From 6ed025d3e64eebe8c64bfb990aa4e21550dd5af9 Mon Sep 17 00:00:00 2001 From: Snider Date: Fri, 30 Jan 2026 10:18:54 +0000 Subject: [PATCH] feat(framework): add QUERY/QUERYALL/PERFORM dispatch patterns Implements the Core IPC design with four dispatch patterns: - ACTION: fire-and-forget broadcast (existing) - QUERY: first responder returns data - QUERYALL: all responders return data - PERFORM: first responder executes task Updates git and agentic services to use Query/Task patterns. Adds dev service for workflow orchestration. Refactors dev work command to use worker bundles. Co-Authored-By: Claude Opus 4.5 --- cmd/dev/bundles.go | 89 +++++++++ cmd/dev/dev_work.go | 181 ++++++++++------- pkg/agentic/service.go | 71 +++---- pkg/dev/service.go | 313 ++++++++++++++++++++++++++++++ pkg/framework/core/core.go | 67 +++++++ pkg/framework/core/interfaces.go | 21 ++ pkg/framework/core/query_test.go | 201 +++++++++++++++++++ pkg/framework/core/runtime_pkg.go | 5 + pkg/framework/framework.go | 4 + pkg/git/service.go | 91 ++++++--- 10 files changed, 912 insertions(+), 131 deletions(-) create mode 100644 cmd/dev/bundles.go create mode 100644 pkg/dev/service.go create mode 100644 pkg/framework/core/query_test.go diff --git a/cmd/dev/bundles.go b/cmd/dev/bundles.go new file mode 100644 index 00000000..bd9cccf4 --- /dev/null +++ b/cmd/dev/bundles.go @@ -0,0 +1,89 @@ +package dev + +import ( + "context" + + "github.com/host-uk/core/pkg/agentic" + devpkg "github.com/host-uk/core/pkg/dev" + "github.com/host-uk/core/pkg/framework" + "github.com/host-uk/core/pkg/git" +) + +// WorkBundle contains the Core instance for dev work operations. +type WorkBundle struct { + Core *framework.Core +} + +// WorkBundleOptions configures the work bundle. +type WorkBundleOptions struct { + RegistryPath string + AllowEdit bool // Allow agentic to use Write/Edit tools +} + +// NewWorkBundle creates a bundle for dev work operations. +// Includes: dev (orchestration), git, agentic services. +func NewWorkBundle(opts WorkBundleOptions) (*WorkBundle, error) { + c, err := framework.New( + framework.WithService(devpkg.NewService(devpkg.ServiceOptions{ + RegistryPath: opts.RegistryPath, + })), + framework.WithService(git.NewService(git.ServiceOptions{})), + framework.WithService(agentic.NewService(agentic.ServiceOptions{ + AllowEdit: opts.AllowEdit, + })), + framework.WithServiceLock(), + ) + if err != nil { + return nil, err + } + + return &WorkBundle{Core: c}, nil +} + +// Start initialises the bundle services. +func (b *WorkBundle) Start(ctx context.Context) error { + return b.Core.ServiceStartup(ctx, nil) +} + +// Stop shuts down the bundle services. +func (b *WorkBundle) Stop(ctx context.Context) error { + return b.Core.ServiceShutdown(ctx) +} + +// StatusBundle contains the Core instance for status-only operations. +type StatusBundle struct { + Core *framework.Core +} + +// StatusBundleOptions configures the status bundle. +type StatusBundleOptions struct { + RegistryPath string +} + +// NewStatusBundle creates a bundle for status-only operations. +// Includes: dev (orchestration), git services. No agentic - commits not available. +func NewStatusBundle(opts StatusBundleOptions) (*StatusBundle, error) { + c, err := framework.New( + framework.WithService(devpkg.NewService(devpkg.ServiceOptions{ + RegistryPath: opts.RegistryPath, + })), + framework.WithService(git.NewService(git.ServiceOptions{})), + // No agentic service - TaskCommit will be unhandled + framework.WithServiceLock(), + ) + if err != nil { + return nil, err + } + + return &StatusBundle{Core: c}, nil +} + +// Start initialises the bundle services. +func (b *StatusBundle) Start(ctx context.Context) error { + return b.Core.ServiceStartup(ctx, nil) +} + +// Stop shuts down the bundle services. +func (b *StatusBundle) Stop(ctx context.Context) error { + return b.Core.ServiceShutdown(ctx) +} diff --git a/cmd/dev/dev_work.go b/cmd/dev/dev_work.go index 8bba6dac..d2fc68ce 100644 --- a/cmd/dev/dev_work.go +++ b/cmd/dev/dev_work.go @@ -44,44 +44,24 @@ func addWorkCommand(parent *cobra.Command) { func runWork(registryPath string, statusOnly, autoCommit bool) error { ctx := context.Background() - // Find or use provided registry, fall back to directory scan - var reg *repos.Registry - var err error - - if registryPath != "" { - reg, err = repos.LoadRegistry(registryPath) - if err != nil { - return fmt.Errorf("failed to load registry: %w", err) - } - fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.registry_label")), registryPath) - } else { - registryPath, err = repos.FindRegistry() - if err == nil { - reg, err = repos.LoadRegistry(registryPath) - if err != nil { - return fmt.Errorf("failed to load registry: %w", err) - } - fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.registry_label")), registryPath) - } else { - // Fallback: scan current directory - cwd, _ := os.Getwd() - reg, err = repos.ScanDirectory(cwd) - if err != nil { - return fmt.Errorf("failed to scan directory: %w", err) - } - fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.scanning_label")), cwd) - } + // Build worker bundle with required services + bundle, err := NewWorkBundle(WorkBundleOptions{ + RegistryPath: registryPath, + }) + if err != nil { + return err } - // Build paths and names for git operations - var paths []string - names := make(map[string]string) + // Start services (registers handlers) + if err := bundle.Start(ctx); err != nil { + return err + } + defer bundle.Stop(ctx) - for _, repo := range reg.List() { - if repo.IsGitRepo() { - paths = append(paths, repo.Path) - names[repo.Path] = repo.Name - } + // Load registry and get paths + paths, names, err := loadRegistry(registryPath) + if err != nil { + return err } if len(paths) == 0 { @@ -89,11 +69,18 @@ func runWork(registryPath string, statusOnly, autoCommit bool) error { return nil } - // Get status for all repos - statuses := git.Status(ctx, git.StatusOptions{ + // QUERY git status + result, handled, err := bundle.Core.QUERY(git.QueryStatus{ Paths: paths, Names: names, }) + if !handled { + return fmt.Errorf("git service not available") + } + if err != nil { + return err + } + statuses := result.([]git.RepoStatus) // Sort by repo name for consistent output sort.Slice(statuses, func(i, j int) bool { @@ -126,18 +113,28 @@ func runWork(registryPath string, statusOnly, autoCommit bool) error { fmt.Println() for _, s := range dirtyRepos { - if err := claudeCommit(ctx, s.Path, s.Name, registryPath); err != nil { + // PERFORM commit via agentic service + _, handled, err := bundle.Core.PERFORM(agentic.TaskCommit{ + Path: s.Path, + Name: s.Name, + }) + if !handled { + fmt.Printf(" %s %s: %s\n", warningStyle.Render("!"), s.Name, "agentic service not available") + continue + } + if err != nil { fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, err) } else { fmt.Printf(" %s %s\n", successStyle.Render("v"), s.Name) } } - // Re-check status after commits - statuses = git.Status(ctx, git.StatusOptions{ + // Re-QUERY status after commits + result, _, _ = bundle.Core.QUERY(git.QueryStatus{ Paths: paths, Names: names, }) + statuses = result.([]git.RepoStatus) // Rebuild ahead repos list aheadRepos = nil @@ -178,27 +175,27 @@ func runWork(registryPath string, statusOnly, autoCommit bool) error { fmt.Println() - // Push sequentially (SSH passphrase needs interaction) - var pushPaths []string + // PERFORM push for each repo + var divergedRepos []git.RepoStatus + for _, s := range aheadRepos { - pushPaths = append(pushPaths, s.Path) - } - - results := git.PushMultiple(ctx, pushPaths, names) - - var divergedRepos []git.PushResult - - for _, r := range results { - if r.Success { - fmt.Printf(" %s %s\n", successStyle.Render("v"), r.Name) - } else { - // Check if this is a non-fast-forward error (diverged branch) - if git.IsNonFastForward(r.Error) { - fmt.Printf(" %s %s: %s\n", warningStyle.Render("!"), r.Name, i18n.T("cmd.dev.push.diverged")) - divergedRepos = append(divergedRepos, r) + _, handled, err := bundle.Core.PERFORM(git.TaskPush{ + Path: s.Path, + Name: s.Name, + }) + if !handled { + fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, "git service not available") + continue + } + if err != nil { + if git.IsNonFastForward(err) { + fmt.Printf(" %s %s: %s\n", warningStyle.Render("!"), s.Name, i18n.T("cmd.dev.push.diverged")) + divergedRepos = append(divergedRepos, s) } else { - fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), r.Name, r.Error) + fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, err) } + } else { + fmt.Printf(" %s %s\n", successStyle.Render("v"), s.Name) } } @@ -208,18 +205,26 @@ func runWork(registryPath string, statusOnly, autoCommit bool) error { fmt.Printf("%s\n", i18n.T("cmd.dev.push.diverged_help")) if shared.Confirm(i18n.T("cmd.dev.push.pull_and_retry")) { fmt.Println() - for _, r := range divergedRepos { - fmt.Printf(" %s %s...\n", dimStyle.Render("↓"), r.Name) - if err := git.Pull(ctx, r.Path); err != nil { - fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), r.Name, err) + for _, s := range divergedRepos { + fmt.Printf(" %s %s...\n", dimStyle.Render("↓"), s.Name) + + // PERFORM pull + _, _, err := bundle.Core.PERFORM(git.TaskPull{Path: s.Path, Name: s.Name}) + if err != nil { + fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, err) continue } - fmt.Printf(" %s %s...\n", dimStyle.Render("↑"), r.Name) - if err := git.Push(ctx, r.Path); err != nil { - fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), r.Name, err) + + fmt.Printf(" %s %s...\n", dimStyle.Render("↑"), s.Name) + + // PERFORM push + _, _, err = bundle.Core.PERFORM(git.TaskPush{Path: s.Path, Name: s.Name}) + if err != nil { + fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, err) continue } - fmt.Printf(" %s %s\n", successStyle.Render("v"), r.Name) + + fmt.Printf(" %s %s\n", successStyle.Render("v"), s.Name) } } } @@ -301,6 +306,7 @@ func printStatusTable(statuses []git.RepoStatus) { } } +// claudeCommit shells out to claude for committing (legacy helper for other commands) func claudeCommit(ctx context.Context, repoPath, repoName, registryPath string) error { prompt := agentic.Prompt("commit") @@ -313,6 +319,7 @@ func claudeCommit(ctx context.Context, repoPath, repoName, registryPath string) return cmd.Run() } +// claudeEditCommit shells out to claude with edit permissions (legacy helper) func claudeEditCommit(ctx context.Context, repoPath, repoName, registryPath string) error { prompt := agentic.Prompt("commit") @@ -324,3 +331,45 @@ func claudeEditCommit(ctx context.Context, repoPath, repoName, registryPath stri return cmd.Run() } + +func loadRegistry(registryPath string) ([]string, map[string]string, error) { + var reg *repos.Registry + var err error + + if registryPath != "" { + reg, err = repos.LoadRegistry(registryPath) + if err != nil { + return nil, nil, fmt.Errorf("failed to load registry: %w", err) + } + fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.registry_label")), registryPath) + } else { + registryPath, err = repos.FindRegistry() + if err == nil { + reg, err = repos.LoadRegistry(registryPath) + if err != nil { + return nil, nil, fmt.Errorf("failed to load registry: %w", err) + } + fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.registry_label")), registryPath) + } else { + // Fallback: scan current directory + cwd, _ := os.Getwd() + reg, err = repos.ScanDirectory(cwd) + if err != nil { + return nil, nil, fmt.Errorf("failed to scan directory: %w", err) + } + fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.scanning_label")), cwd) + } + } + + var paths []string + names := make(map[string]string) + + for _, repo := range reg.List() { + if repo.IsGitRepo() { + paths = append(paths, repo.Path) + names[repo.Path] = repo.Name + } + } + + return paths, names, nil +} diff --git a/pkg/agentic/service.go b/pkg/agentic/service.go index f1972004..11364744 100644 --- a/pkg/agentic/service.go +++ b/pkg/agentic/service.go @@ -4,21 +4,22 @@ import ( "context" "os" "os/exec" + "strings" "github.com/host-uk/core/pkg/framework" ) -// Actions for AI service IPC +// Tasks for AI service -// ActionCommit requests Claude to create a commit. -type ActionCommit struct { +// TaskCommit requests Claude to create a commit. +type TaskCommit struct { Path string Name string CanEdit bool // allow Write/Edit tools } -// ActionPrompt sends a custom prompt to Claude. -type ActionPrompt struct { +// TaskPrompt sends a custom prompt to Claude. +type TaskPrompt struct { Prompt string WorkDir string AllowedTools []string @@ -27,12 +28,14 @@ type ActionPrompt struct { // ServiceOptions for configuring the AI service. type ServiceOptions struct { DefaultTools []string + AllowEdit bool // global permission for Write/Edit tools } // DefaultServiceOptions returns sensible defaults. func DefaultServiceOptions() ServiceOptions { return ServiceOptions{ DefaultTools: []string{"Bash", "Read", "Glob", "Grep"}, + AllowEdit: false, } } @@ -50,32 +53,35 @@ func NewService(opts ServiceOptions) func(*framework.Core) (any, error) { } } -// OnStartup registers action handlers. +// OnStartup registers task handlers. func (s *Service) OnStartup(ctx context.Context) error { - s.Core().RegisterAction(s.handle) + s.Core().RegisterTask(s.handleTask) return nil } -func (s *Service) handle(c *framework.Core, msg framework.Message) error { - switch m := msg.(type) { - case ActionCommit: - return s.handleCommit(m) - case ActionPrompt: - return s.handlePrompt(m) +func (s *Service) handleTask(c *framework.Core, t framework.Task) (any, bool, error) { + switch m := t.(type) { + case TaskCommit: + err := s.doCommit(m) + return nil, true, err + + case TaskPrompt: + err := s.doPrompt(m) + return nil, true, err } - return nil + return nil, false, nil } -func (s *Service) handleCommit(action ActionCommit) error { +func (s *Service) doCommit(task TaskCommit) error { prompt := Prompt("commit") - tools := "Bash,Read,Glob,Grep" - if action.CanEdit { - tools = "Bash,Read,Write,Edit,Glob,Grep" + tools := []string{"Bash", "Read", "Glob", "Grep"} + if task.CanEdit { + tools = []string{"Bash", "Read", "Write", "Edit", "Glob", "Grep"} } - cmd := exec.CommandContext(context.Background(), "claude", "-p", prompt, "--allowedTools", tools) - cmd.Dir = action.Path + cmd := exec.CommandContext(context.Background(), "claude", "-p", prompt, "--allowedTools", strings.Join(tools, ",")) + cmd.Dir = task.Path cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.Stdin = os.Stdin @@ -83,21 +89,20 @@ func (s *Service) handleCommit(action ActionCommit) error { return cmd.Run() } -func (s *Service) handlePrompt(action ActionPrompt) error { - tools := "Bash,Read,Glob,Grep" - if len(action.AllowedTools) > 0 { - tools = "" - for i, t := range action.AllowedTools { - if i > 0 { - tools += "," - } - tools += t - } +func (s *Service) doPrompt(task TaskPrompt) error { + opts := s.Opts() + tools := opts.DefaultTools + if len(tools) == 0 { + tools = []string{"Bash", "Read", "Glob", "Grep"} } - cmd := exec.CommandContext(context.Background(), "claude", "-p", action.Prompt, "--allowedTools", tools) - if action.WorkDir != "" { - cmd.Dir = action.WorkDir + if len(task.AllowedTools) > 0 { + tools = task.AllowedTools + } + + cmd := exec.CommandContext(context.Background(), "claude", "-p", task.Prompt, "--allowedTools", strings.Join(tools, ",")) + if task.WorkDir != "" { + cmd.Dir = task.WorkDir } cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr diff --git a/pkg/dev/service.go b/pkg/dev/service.go new file mode 100644 index 00000000..d7786397 --- /dev/null +++ b/pkg/dev/service.go @@ -0,0 +1,313 @@ +package dev + +import ( + "context" + "fmt" + "os" + "sort" + "strings" + + "github.com/host-uk/core/pkg/agentic" + "github.com/host-uk/core/pkg/framework" + "github.com/host-uk/core/pkg/git" + "github.com/host-uk/core/pkg/repos" +) + +// Tasks for dev service + +// TaskWork runs the full dev workflow: status, commit, push. +type TaskWork struct { + RegistryPath string + StatusOnly bool + AutoCommit bool +} + +// TaskStatus displays git status for all repos. +type TaskStatus struct { + RegistryPath string +} + +// ServiceOptions for configuring the dev service. +type ServiceOptions struct { + RegistryPath string +} + +// Service provides dev workflow orchestration as a Core service. +type Service struct { + *framework.ServiceRuntime[ServiceOptions] +} + +// NewService creates a dev service factory. +func NewService(opts ServiceOptions) func(*framework.Core) (any, error) { + return func(c *framework.Core) (any, error) { + return &Service{ + ServiceRuntime: framework.NewServiceRuntime(c, opts), + }, nil + } +} + +// OnStartup registers task handlers. +func (s *Service) OnStartup(ctx context.Context) error { + s.Core().RegisterTask(s.handleTask) + return nil +} + +func (s *Service) handleTask(c *framework.Core, t framework.Task) (any, bool, error) { + switch m := t.(type) { + case TaskWork: + err := s.runWork(m) + return nil, true, err + + case TaskStatus: + err := s.runStatus(m) + return nil, true, err + } + return nil, false, nil +} + +func (s *Service) runWork(task TaskWork) error { + // Load registry + paths, names, err := s.loadRegistry(task.RegistryPath) + if err != nil { + return err + } + + if len(paths) == 0 { + fmt.Println("No git repositories found") + return nil + } + + // QUERY git status + result, handled, err := s.Core().QUERY(git.QueryStatus{ + Paths: paths, + Names: names, + }) + if !handled { + return fmt.Errorf("git service not available") + } + if err != nil { + return err + } + statuses := result.([]git.RepoStatus) + + // Sort by name + sort.Slice(statuses, func(i, j int) bool { + return statuses[i].Name < statuses[j].Name + }) + + // Display status table + s.printStatusTable(statuses) + + // Collect dirty and ahead repos + var dirtyRepos []git.RepoStatus + var aheadRepos []git.RepoStatus + + for _, st := range statuses { + if st.Error != nil { + continue + } + if st.IsDirty() { + dirtyRepos = append(dirtyRepos, st) + } + if st.HasUnpushed() { + aheadRepos = append(aheadRepos, st) + } + } + + // Auto-commit dirty repos if requested + if task.AutoCommit && len(dirtyRepos) > 0 { + fmt.Println() + fmt.Println("Committing changes...") + fmt.Println() + + for _, repo := range dirtyRepos { + _, handled, err := s.Core().PERFORM(agentic.TaskCommit{ + Path: repo.Path, + Name: repo.Name, + }) + if !handled { + // Agentic service not available - skip silently + fmt.Printf(" - %s: agentic service not available\n", repo.Name) + continue + } + if err != nil { + fmt.Printf(" x %s: %s\n", repo.Name, err) + } else { + fmt.Printf(" v %s\n", repo.Name) + } + } + + // Re-query status after commits + result, _, _ = s.Core().QUERY(git.QueryStatus{ + Paths: paths, + Names: names, + }) + statuses = result.([]git.RepoStatus) + + // Rebuild ahead repos list + aheadRepos = nil + for _, st := range statuses { + if st.Error == nil && st.HasUnpushed() { + aheadRepos = append(aheadRepos, st) + } + } + } + + // If status only, we're done + if task.StatusOnly { + if len(dirtyRepos) > 0 && !task.AutoCommit { + fmt.Println() + fmt.Println("Use --commit flag to auto-commit dirty repos") + } + return nil + } + + // Push repos with unpushed commits + if len(aheadRepos) == 0 { + fmt.Println() + fmt.Println("All repositories are up to date") + return nil + } + + fmt.Println() + fmt.Printf("%d repos with unpushed commits:\n", len(aheadRepos)) + for _, st := range aheadRepos { + fmt.Printf(" %s: %d commits\n", st.Name, st.Ahead) + } + + fmt.Println() + fmt.Print("Push all? [y/N] ") + var answer string + fmt.Scanln(&answer) + if strings.ToLower(answer) != "y" { + fmt.Println("Aborted") + return nil + } + + fmt.Println() + + // Push each repo + for _, st := range aheadRepos { + _, handled, err := s.Core().PERFORM(git.TaskPush{ + Path: st.Path, + Name: st.Name, + }) + if !handled { + fmt.Printf(" x %s: git service not available\n", st.Name) + continue + } + if err != nil { + if git.IsNonFastForward(err) { + fmt.Printf(" ! %s: branch has diverged\n", st.Name) + } else { + fmt.Printf(" x %s: %s\n", st.Name, err) + } + } else { + fmt.Printf(" v %s\n", st.Name) + } + } + + return nil +} + +func (s *Service) runStatus(task TaskStatus) error { + paths, names, err := s.loadRegistry(task.RegistryPath) + if err != nil { + return err + } + + if len(paths) == 0 { + fmt.Println("No git repositories found") + return nil + } + + result, handled, err := s.Core().QUERY(git.QueryStatus{ + Paths: paths, + Names: names, + }) + if !handled { + return fmt.Errorf("git service not available") + } + if err != nil { + return err + } + + statuses := result.([]git.RepoStatus) + sort.Slice(statuses, func(i, j int) bool { + return statuses[i].Name < statuses[j].Name + }) + + s.printStatusTable(statuses) + return nil +} + +func (s *Service) loadRegistry(registryPath string) ([]string, map[string]string, error) { + var reg *repos.Registry + var err error + + if registryPath != "" { + reg, err = repos.LoadRegistry(registryPath) + if err != nil { + return nil, nil, fmt.Errorf("failed to load registry: %w", err) + } + fmt.Printf("Registry: %s\n\n", registryPath) + } else { + registryPath, err = repos.FindRegistry() + if err == nil { + reg, err = repos.LoadRegistry(registryPath) + if err != nil { + return nil, nil, fmt.Errorf("failed to load registry: %w", err) + } + fmt.Printf("Registry: %s\n\n", registryPath) + } else { + // Fallback: scan current directory + cwd, _ := os.Getwd() + reg, err = repos.ScanDirectory(cwd) + if err != nil { + return nil, nil, fmt.Errorf("failed to scan directory: %w", err) + } + fmt.Printf("Scanning: %s\n\n", cwd) + } + } + + var paths []string + names := make(map[string]string) + + for _, repo := range reg.List() { + if repo.IsGitRepo() { + paths = append(paths, repo.Path) + names[repo.Path] = repo.Name + } + } + + return paths, names, nil +} + +func (s *Service) printStatusTable(statuses []git.RepoStatus) { + // Calculate column widths + nameWidth := 4 // "Repo" + for _, st := range statuses { + if len(st.Name) > nameWidth { + nameWidth = len(st.Name) + } + } + + // Print header + fmt.Printf("%-*s %8s %9s %6s %5s\n", + nameWidth, "Repo", "Modified", "Untracked", "Staged", "Ahead") + + // Print separator + fmt.Println(strings.Repeat("-", nameWidth+2+10+11+8+7)) + + // Print rows + for _, st := range statuses { + if st.Error != nil { + fmt.Printf("%-*s error: %s\n", nameWidth, st.Name, st.Error) + continue + } + + fmt.Printf("%-*s %8d %9d %6d %5d\n", + nameWidth, st.Name, + st.Modified, st.Untracked, st.Staged, st.Ahead) + } +} diff --git a/pkg/framework/core/core.go b/pkg/framework/core/core.go index 29395fb2..ade5b94f 100644 --- a/pkg/framework/core/core.go +++ b/pkg/framework/core/core.go @@ -200,6 +200,73 @@ func (c *Core) RegisterActions(handlers ...func(*Core, Message) error) { c.ipcMu.Unlock() } +// QUERY dispatches a query to handlers until one responds. +// Returns (result, handled, error). If no handler responds, handled is false. +func (c *Core) QUERY(q Query) (any, bool, error) { + c.queryMu.RLock() + handlers := append([]QueryHandler(nil), c.queryHandlers...) + c.queryMu.RUnlock() + + for _, h := range handlers { + result, handled, err := h(c, q) + if handled { + return result, true, err + } + } + return nil, false, nil +} + +// QUERYALL dispatches a query to all handlers and collects all responses. +// Returns all results from handlers that responded. +func (c *Core) QUERYALL(q Query) ([]any, error) { + c.queryMu.RLock() + handlers := append([]QueryHandler(nil), c.queryHandlers...) + c.queryMu.RUnlock() + + var results []any + var agg error + for _, h := range handlers { + result, handled, err := h(c, q) + if err != nil { + agg = errors.Join(agg, err) + } + if handled && result != nil { + results = append(results, result) + } + } + return results, agg +} + +// PERFORM dispatches a task to handlers until one executes it. +// Returns (result, handled, error). If no handler responds, handled is false. +func (c *Core) PERFORM(t Task) (any, bool, error) { + c.taskMu.RLock() + handlers := append([]TaskHandler(nil), c.taskHandlers...) + c.taskMu.RUnlock() + + for _, h := range handlers { + result, handled, err := h(c, t) + if handled { + return result, true, err + } + } + return nil, false, nil +} + +// RegisterQuery adds a query handler to the Core. +func (c *Core) RegisterQuery(handler QueryHandler) { + c.queryMu.Lock() + c.queryHandlers = append(c.queryHandlers, handler) + c.queryMu.Unlock() +} + +// RegisterTask adds a task handler to the Core. +func (c *Core) RegisterTask(handler TaskHandler) { + c.taskMu.Lock() + c.taskHandlers = append(c.taskHandlers, handler) + c.taskMu.Unlock() +} + // RegisterService adds a new service to the Core. func (c *Core) RegisterService(name string, api any) error { if c.servicesLocked { diff --git a/pkg/framework/core/interfaces.go b/pkg/framework/core/interfaces.go index 3a803847..f382cff6 100644 --- a/pkg/framework/core/interfaces.go +++ b/pkg/framework/core/interfaces.go @@ -42,8 +42,25 @@ type Option func(*Core) error // Message is the interface for all messages that can be sent through the Core's IPC system. // Any struct can be a message, allowing for structured data to be passed between services. +// Used with ACTION for fire-and-forget broadcasts. type Message interface{} +// Query is the interface for read-only requests that return data. +// Used with QUERY (first responder) or QUERYALL (all responders). +type Query interface{} + +// Task is the interface for requests that perform side effects. +// Used with PERFORM (first responder executes). +type Task interface{} + +// QueryHandler handles Query requests. Returns (result, handled, error). +// If handled is false, the query will be passed to the next handler. +type QueryHandler func(*Core, Query) (any, bool, error) + +// TaskHandler handles Task requests. Returns (result, handled, error). +// If handled is false, the task will be passed to the next handler. +type TaskHandler func(*Core, Task) (any, bool, error) + // Startable is an interface for services that need to perform initialization. type Startable interface { OnStartup(ctx context.Context) error @@ -64,6 +81,10 @@ type Core struct { serviceLock bool ipcMu sync.RWMutex ipcHandlers []func(*Core, Message) error + queryMu sync.RWMutex + queryHandlers []QueryHandler + taskMu sync.RWMutex + taskHandlers []TaskHandler serviceMu sync.RWMutex services map[string]any servicesLocked bool diff --git a/pkg/framework/core/query_test.go b/pkg/framework/core/query_test.go new file mode 100644 index 00000000..43b00fb6 --- /dev/null +++ b/pkg/framework/core/query_test.go @@ -0,0 +1,201 @@ +package core + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +type TestQuery struct { + Value string +} + +type TestTask struct { + Value string +} + +func TestCore_QUERY_Good(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + // Register a handler that responds to TestQuery + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + if tq, ok := q.(TestQuery); ok { + return "result-" + tq.Value, true, nil + } + return nil, false, nil + }) + + result, handled, err := c.QUERY(TestQuery{Value: "test"}) + assert.NoError(t, err) + assert.True(t, handled) + assert.Equal(t, "result-test", result) +} + +func TestCore_QUERY_NotHandled(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + // No handlers registered + result, handled, err := c.QUERY(TestQuery{Value: "test"}) + assert.NoError(t, err) + assert.False(t, handled) + assert.Nil(t, result) +} + +func TestCore_QUERY_FirstResponder(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + // First handler responds + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + return "first", true, nil + }) + + // Second handler would respond but shouldn't be called + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + return "second", true, nil + }) + + result, handled, err := c.QUERY(TestQuery{}) + assert.NoError(t, err) + assert.True(t, handled) + assert.Equal(t, "first", result) +} + +func TestCore_QUERY_SkipsNonHandlers(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + // First handler doesn't handle + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + return nil, false, nil + }) + + // Second handler responds + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + return "second", true, nil + }) + + result, handled, err := c.QUERY(TestQuery{}) + assert.NoError(t, err) + assert.True(t, handled) + assert.Equal(t, "second", result) +} + +func TestCore_QUERYALL_Good(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + // Multiple handlers respond + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + return "first", true, nil + }) + + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + return "second", true, nil + }) + + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + return nil, false, nil // Doesn't handle + }) + + results, err := c.QUERYALL(TestQuery{}) + assert.NoError(t, err) + assert.Len(t, results, 2) + assert.Contains(t, results, "first") + assert.Contains(t, results, "second") +} + +func TestCore_QUERYALL_AggregatesErrors(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + err1 := errors.New("error1") + err2 := errors.New("error2") + + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + return "result1", true, err1 + }) + + c.RegisterQuery(func(c *Core, q Query) (any, bool, error) { + return "result2", true, err2 + }) + + results, err := c.QUERYALL(TestQuery{}) + assert.Error(t, err) + assert.ErrorIs(t, err, err1) + assert.ErrorIs(t, err, err2) + assert.Len(t, results, 2) +} + +func TestCore_PERFORM_Good(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + executed := false + c.RegisterTask(func(c *Core, t Task) (any, bool, error) { + if tt, ok := t.(TestTask); ok { + executed = true + return "done-" + tt.Value, true, nil + } + return nil, false, nil + }) + + result, handled, err := c.PERFORM(TestTask{Value: "work"}) + assert.NoError(t, err) + assert.True(t, handled) + assert.True(t, executed) + assert.Equal(t, "done-work", result) +} + +func TestCore_PERFORM_NotHandled(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + // No handlers registered + result, handled, err := c.PERFORM(TestTask{Value: "work"}) + assert.NoError(t, err) + assert.False(t, handled) + assert.Nil(t, result) +} + +func TestCore_PERFORM_FirstResponder(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + callCount := 0 + + c.RegisterTask(func(c *Core, t Task) (any, bool, error) { + callCount++ + return "first", true, nil + }) + + c.RegisterTask(func(c *Core, t Task) (any, bool, error) { + callCount++ + return "second", true, nil + }) + + result, handled, err := c.PERFORM(TestTask{}) + assert.NoError(t, err) + assert.True(t, handled) + assert.Equal(t, "first", result) + assert.Equal(t, 1, callCount) // Only first handler called +} + +func TestCore_PERFORM_WithError(t *testing.T) { + c, err := New() + assert.NoError(t, err) + + expectedErr := errors.New("task failed") + c.RegisterTask(func(c *Core, t Task) (any, bool, error) { + return nil, true, expectedErr + }) + + result, handled, err := c.PERFORM(TestTask{}) + assert.Error(t, err) + assert.ErrorIs(t, err, expectedErr) + assert.True(t, handled) + assert.Nil(t, result) +} diff --git a/pkg/framework/core/runtime_pkg.go b/pkg/framework/core/runtime_pkg.go index 04a3bd93..71199f6a 100644 --- a/pkg/framework/core/runtime_pkg.go +++ b/pkg/framework/core/runtime_pkg.go @@ -27,6 +27,11 @@ func (r *ServiceRuntime[T]) Core() *Core { return r.core } +// Opts returns the service-specific options. +func (r *ServiceRuntime[T]) Opts() T { + return r.opts +} + // Config returns the registered Config service from the core application. // This is a convenience method for accessing the application's configuration. func (r *ServiceRuntime[T]) Config() Config { diff --git a/pkg/framework/framework.go b/pkg/framework/framework.go index 64540364..1ce53fb8 100644 --- a/pkg/framework/framework.go +++ b/pkg/framework/framework.go @@ -19,6 +19,10 @@ type ( Core = core.Core Option = core.Option Message = core.Message + Query = core.Query + Task = core.Task + QueryHandler = core.QueryHandler + TaskHandler = core.TaskHandler Startable = core.Startable Stoppable = core.Stoppable Config = core.Config diff --git a/pkg/git/service.go b/pkg/git/service.go index e26d7d48..2ed11da2 100644 --- a/pkg/git/service.go +++ b/pkg/git/service.go @@ -6,19 +6,39 @@ import ( "github.com/host-uk/core/pkg/framework" ) -// Actions for git service IPC +// Queries for git service -// ActionStatus requests git status for paths. -type ActionStatus struct { +// QueryStatus requests git status for paths. +type QueryStatus struct { Paths []string Names map[string]string } -// ActionPush requests git push for a path. -type ActionPush struct{ Path, Name string } +// QueryDirtyRepos requests repos with uncommitted changes. +type QueryDirtyRepos struct{} -// ActionPull requests git pull for a path. -type ActionPull struct{ Path, Name string } +// QueryAheadRepos requests repos with unpushed commits. +type QueryAheadRepos struct{} + +// Tasks for git service + +// TaskPush requests git push for a path. +type TaskPush struct { + Path string + Name string +} + +// TaskPull requests git pull for a path. +type TaskPull struct { + Path string + Name string +} + +// TaskPushMultiple requests git push for multiple paths. +type TaskPushMultiple struct { + Paths []string + Names map[string]string +} // ServiceOptions for configuring the git service. type ServiceOptions struct { @@ -40,40 +60,47 @@ func NewService(opts ServiceOptions) func(*framework.Core) (any, error) { } } -// OnStartup registers action handlers. +// OnStartup registers query and task handlers. func (s *Service) OnStartup(ctx context.Context) error { - s.Core().RegisterAction(s.handle) + s.Core().RegisterQuery(s.handleQuery) + s.Core().RegisterTask(s.handleTask) return nil } -func (s *Service) handle(c *framework.Core, msg framework.Message) error { - switch m := msg.(type) { - case ActionStatus: - return s.handleStatus(m) - case ActionPush: - return s.handlePush(m) - case ActionPull: - return s.handlePull(m) +func (s *Service) handleQuery(c *framework.Core, q framework.Query) (any, bool, error) { + switch m := q.(type) { + case QueryStatus: + statuses := Status(context.Background(), StatusOptions{ + Paths: m.Paths, + Names: m.Names, + }) + s.lastStatus = statuses + return statuses, true, nil + + case QueryDirtyRepos: + return s.DirtyRepos(), true, nil + + case QueryAheadRepos: + return s.AheadRepos(), true, nil } - return nil + return nil, false, nil } -func (s *Service) handleStatus(action ActionStatus) error { - ctx := context.Background() - statuses := Status(ctx, StatusOptions{ - Paths: action.Paths, - Names: action.Names, - }) - s.lastStatus = statuses - return nil -} +func (s *Service) handleTask(c *framework.Core, t framework.Task) (any, bool, error) { + switch m := t.(type) { + case TaskPush: + err := Push(context.Background(), m.Path) + return nil, true, err -func (s *Service) handlePush(action ActionPush) error { - return Push(context.Background(), action.Path) -} + case TaskPull: + err := Pull(context.Background(), m.Path) + return nil, true, err -func (s *Service) handlePull(action ActionPull) error { - return Pull(context.Background(), action.Path) + case TaskPushMultiple: + results := PushMultiple(context.Background(), m.Paths, m.Names) + return results, true, nil + } + return nil, false, nil } // Status returns last status result.