feat(framework): add QUERY/QUERYALL/PERFORM dispatch patterns

Implements the Core IPC design with four dispatch patterns:
- ACTION: fire-and-forget broadcast (existing)
- QUERY: first responder returns data
- QUERYALL: all responders return data
- PERFORM: first responder executes task

Updates git and agentic services to use Query/Task patterns.
Adds dev service for workflow orchestration.
Refactors dev work command to use worker bundles.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Snider 2026-01-30 10:18:54 +00:00
parent eecf267935
commit 6ed025d3e6
10 changed files with 912 additions and 131 deletions

89
cmd/dev/bundles.go Normal file
View file

@ -0,0 +1,89 @@
package dev
import (
"context"
"github.com/host-uk/core/pkg/agentic"
devpkg "github.com/host-uk/core/pkg/dev"
"github.com/host-uk/core/pkg/framework"
"github.com/host-uk/core/pkg/git"
)
// WorkBundle contains the Core instance for dev work operations.
type WorkBundle struct {
Core *framework.Core
}
// WorkBundleOptions configures the work bundle.
type WorkBundleOptions struct {
RegistryPath string
AllowEdit bool // Allow agentic to use Write/Edit tools
}
// NewWorkBundle creates a bundle for dev work operations.
// Includes: dev (orchestration), git, agentic services.
func NewWorkBundle(opts WorkBundleOptions) (*WorkBundle, error) {
c, err := framework.New(
framework.WithService(devpkg.NewService(devpkg.ServiceOptions{
RegistryPath: opts.RegistryPath,
})),
framework.WithService(git.NewService(git.ServiceOptions{})),
framework.WithService(agentic.NewService(agentic.ServiceOptions{
AllowEdit: opts.AllowEdit,
})),
framework.WithServiceLock(),
)
if err != nil {
return nil, err
}
return &WorkBundle{Core: c}, nil
}
// Start initialises the bundle services.
func (b *WorkBundle) Start(ctx context.Context) error {
return b.Core.ServiceStartup(ctx, nil)
}
// Stop shuts down the bundle services.
func (b *WorkBundle) Stop(ctx context.Context) error {
return b.Core.ServiceShutdown(ctx)
}
// StatusBundle contains the Core instance for status-only operations.
type StatusBundle struct {
Core *framework.Core
}
// StatusBundleOptions configures the status bundle.
type StatusBundleOptions struct {
RegistryPath string
}
// NewStatusBundle creates a bundle for status-only operations.
// Includes: dev (orchestration), git services. No agentic - commits not available.
func NewStatusBundle(opts StatusBundleOptions) (*StatusBundle, error) {
c, err := framework.New(
framework.WithService(devpkg.NewService(devpkg.ServiceOptions{
RegistryPath: opts.RegistryPath,
})),
framework.WithService(git.NewService(git.ServiceOptions{})),
// No agentic service - TaskCommit will be unhandled
framework.WithServiceLock(),
)
if err != nil {
return nil, err
}
return &StatusBundle{Core: c}, nil
}
// Start initialises the bundle services.
func (b *StatusBundle) Start(ctx context.Context) error {
return b.Core.ServiceStartup(ctx, nil)
}
// Stop shuts down the bundle services.
func (b *StatusBundle) Stop(ctx context.Context) error {
return b.Core.ServiceShutdown(ctx)
}

View file

@ -44,44 +44,24 @@ func addWorkCommand(parent *cobra.Command) {
func runWork(registryPath string, statusOnly, autoCommit bool) error {
ctx := context.Background()
// Find or use provided registry, fall back to directory scan
var reg *repos.Registry
var err error
if registryPath != "" {
reg, err = repos.LoadRegistry(registryPath)
// Build worker bundle with required services
bundle, err := NewWorkBundle(WorkBundleOptions{
RegistryPath: registryPath,
})
if err != nil {
return fmt.Errorf("failed to load registry: %w", err)
}
fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.registry_label")), registryPath)
} else {
registryPath, err = repos.FindRegistry()
if err == nil {
reg, err = repos.LoadRegistry(registryPath)
if err != nil {
return fmt.Errorf("failed to load registry: %w", err)
}
fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.registry_label")), registryPath)
} else {
// Fallback: scan current directory
cwd, _ := os.Getwd()
reg, err = repos.ScanDirectory(cwd)
if err != nil {
return fmt.Errorf("failed to scan directory: %w", err)
}
fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.scanning_label")), cwd)
}
return err
}
// Build paths and names for git operations
var paths []string
names := make(map[string]string)
for _, repo := range reg.List() {
if repo.IsGitRepo() {
paths = append(paths, repo.Path)
names[repo.Path] = repo.Name
// Start services (registers handlers)
if err := bundle.Start(ctx); err != nil {
return err
}
defer bundle.Stop(ctx)
// Load registry and get paths
paths, names, err := loadRegistry(registryPath)
if err != nil {
return err
}
if len(paths) == 0 {
@ -89,11 +69,18 @@ func runWork(registryPath string, statusOnly, autoCommit bool) error {
return nil
}
// Get status for all repos
statuses := git.Status(ctx, git.StatusOptions{
// QUERY git status
result, handled, err := bundle.Core.QUERY(git.QueryStatus{
Paths: paths,
Names: names,
})
if !handled {
return fmt.Errorf("git service not available")
}
if err != nil {
return err
}
statuses := result.([]git.RepoStatus)
// Sort by repo name for consistent output
sort.Slice(statuses, func(i, j int) bool {
@ -126,18 +113,28 @@ func runWork(registryPath string, statusOnly, autoCommit bool) error {
fmt.Println()
for _, s := range dirtyRepos {
if err := claudeCommit(ctx, s.Path, s.Name, registryPath); err != nil {
// PERFORM commit via agentic service
_, handled, err := bundle.Core.PERFORM(agentic.TaskCommit{
Path: s.Path,
Name: s.Name,
})
if !handled {
fmt.Printf(" %s %s: %s\n", warningStyle.Render("!"), s.Name, "agentic service not available")
continue
}
if err != nil {
fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, err)
} else {
fmt.Printf(" %s %s\n", successStyle.Render("v"), s.Name)
}
}
// Re-check status after commits
statuses = git.Status(ctx, git.StatusOptions{
// Re-QUERY status after commits
result, _, _ = bundle.Core.QUERY(git.QueryStatus{
Paths: paths,
Names: names,
})
statuses = result.([]git.RepoStatus)
// Rebuild ahead repos list
aheadRepos = nil
@ -178,27 +175,27 @@ func runWork(registryPath string, statusOnly, autoCommit bool) error {
fmt.Println()
// Push sequentially (SSH passphrase needs interaction)
var pushPaths []string
// PERFORM push for each repo
var divergedRepos []git.RepoStatus
for _, s := range aheadRepos {
pushPaths = append(pushPaths, s.Path)
_, handled, err := bundle.Core.PERFORM(git.TaskPush{
Path: s.Path,
Name: s.Name,
})
if !handled {
fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, "git service not available")
continue
}
results := git.PushMultiple(ctx, pushPaths, names)
var divergedRepos []git.PushResult
for _, r := range results {
if r.Success {
fmt.Printf(" %s %s\n", successStyle.Render("v"), r.Name)
if err != nil {
if git.IsNonFastForward(err) {
fmt.Printf(" %s %s: %s\n", warningStyle.Render("!"), s.Name, i18n.T("cmd.dev.push.diverged"))
divergedRepos = append(divergedRepos, s)
} else {
// Check if this is a non-fast-forward error (diverged branch)
if git.IsNonFastForward(r.Error) {
fmt.Printf(" %s %s: %s\n", warningStyle.Render("!"), r.Name, i18n.T("cmd.dev.push.diverged"))
divergedRepos = append(divergedRepos, r)
} else {
fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), r.Name, r.Error)
fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, err)
}
} else {
fmt.Printf(" %s %s\n", successStyle.Render("v"), s.Name)
}
}
@ -208,18 +205,26 @@ func runWork(registryPath string, statusOnly, autoCommit bool) error {
fmt.Printf("%s\n", i18n.T("cmd.dev.push.diverged_help"))
if shared.Confirm(i18n.T("cmd.dev.push.pull_and_retry")) {
fmt.Println()
for _, r := range divergedRepos {
fmt.Printf(" %s %s...\n", dimStyle.Render("↓"), r.Name)
if err := git.Pull(ctx, r.Path); err != nil {
fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), r.Name, err)
for _, s := range divergedRepos {
fmt.Printf(" %s %s...\n", dimStyle.Render("↓"), s.Name)
// PERFORM pull
_, _, err := bundle.Core.PERFORM(git.TaskPull{Path: s.Path, Name: s.Name})
if err != nil {
fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, err)
continue
}
fmt.Printf(" %s %s...\n", dimStyle.Render("↑"), r.Name)
if err := git.Push(ctx, r.Path); err != nil {
fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), r.Name, err)
fmt.Printf(" %s %s...\n", dimStyle.Render("↑"), s.Name)
// PERFORM push
_, _, err = bundle.Core.PERFORM(git.TaskPush{Path: s.Path, Name: s.Name})
if err != nil {
fmt.Printf(" %s %s: %s\n", errorStyle.Render("x"), s.Name, err)
continue
}
fmt.Printf(" %s %s\n", successStyle.Render("v"), r.Name)
fmt.Printf(" %s %s\n", successStyle.Render("v"), s.Name)
}
}
}
@ -301,6 +306,7 @@ func printStatusTable(statuses []git.RepoStatus) {
}
}
// claudeCommit shells out to claude for committing (legacy helper for other commands)
func claudeCommit(ctx context.Context, repoPath, repoName, registryPath string) error {
prompt := agentic.Prompt("commit")
@ -313,6 +319,7 @@ func claudeCommit(ctx context.Context, repoPath, repoName, registryPath string)
return cmd.Run()
}
// claudeEditCommit shells out to claude with edit permissions (legacy helper)
func claudeEditCommit(ctx context.Context, repoPath, repoName, registryPath string) error {
prompt := agentic.Prompt("commit")
@ -324,3 +331,45 @@ func claudeEditCommit(ctx context.Context, repoPath, repoName, registryPath stri
return cmd.Run()
}
func loadRegistry(registryPath string) ([]string, map[string]string, error) {
var reg *repos.Registry
var err error
if registryPath != "" {
reg, err = repos.LoadRegistry(registryPath)
if err != nil {
return nil, nil, fmt.Errorf("failed to load registry: %w", err)
}
fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.registry_label")), registryPath)
} else {
registryPath, err = repos.FindRegistry()
if err == nil {
reg, err = repos.LoadRegistry(registryPath)
if err != nil {
return nil, nil, fmt.Errorf("failed to load registry: %w", err)
}
fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.registry_label")), registryPath)
} else {
// Fallback: scan current directory
cwd, _ := os.Getwd()
reg, err = repos.ScanDirectory(cwd)
if err != nil {
return nil, nil, fmt.Errorf("failed to scan directory: %w", err)
}
fmt.Printf("%s %s\n\n", dimStyle.Render(i18n.T("cmd.dev.scanning_label")), cwd)
}
}
var paths []string
names := make(map[string]string)
for _, repo := range reg.List() {
if repo.IsGitRepo() {
paths = append(paths, repo.Path)
names[repo.Path] = repo.Name
}
}
return paths, names, nil
}

View file

@ -4,21 +4,22 @@ import (
"context"
"os"
"os/exec"
"strings"
"github.com/host-uk/core/pkg/framework"
)
// Actions for AI service IPC
// Tasks for AI service
// ActionCommit requests Claude to create a commit.
type ActionCommit struct {
// TaskCommit requests Claude to create a commit.
type TaskCommit struct {
Path string
Name string
CanEdit bool // allow Write/Edit tools
}
// ActionPrompt sends a custom prompt to Claude.
type ActionPrompt struct {
// TaskPrompt sends a custom prompt to Claude.
type TaskPrompt struct {
Prompt string
WorkDir string
AllowedTools []string
@ -27,12 +28,14 @@ type ActionPrompt struct {
// ServiceOptions for configuring the AI service.
type ServiceOptions struct {
DefaultTools []string
AllowEdit bool // global permission for Write/Edit tools
}
// DefaultServiceOptions returns sensible defaults.
func DefaultServiceOptions() ServiceOptions {
return ServiceOptions{
DefaultTools: []string{"Bash", "Read", "Glob", "Grep"},
AllowEdit: false,
}
}
@ -50,32 +53,35 @@ func NewService(opts ServiceOptions) func(*framework.Core) (any, error) {
}
}
// OnStartup registers action handlers.
// OnStartup registers task handlers.
func (s *Service) OnStartup(ctx context.Context) error {
s.Core().RegisterAction(s.handle)
s.Core().RegisterTask(s.handleTask)
return nil
}
func (s *Service) handle(c *framework.Core, msg framework.Message) error {
switch m := msg.(type) {
case ActionCommit:
return s.handleCommit(m)
case ActionPrompt:
return s.handlePrompt(m)
func (s *Service) handleTask(c *framework.Core, t framework.Task) (any, bool, error) {
switch m := t.(type) {
case TaskCommit:
err := s.doCommit(m)
return nil, true, err
case TaskPrompt:
err := s.doPrompt(m)
return nil, true, err
}
return nil
return nil, false, nil
}
func (s *Service) handleCommit(action ActionCommit) error {
func (s *Service) doCommit(task TaskCommit) error {
prompt := Prompt("commit")
tools := "Bash,Read,Glob,Grep"
if action.CanEdit {
tools = "Bash,Read,Write,Edit,Glob,Grep"
tools := []string{"Bash", "Read", "Glob", "Grep"}
if task.CanEdit {
tools = []string{"Bash", "Read", "Write", "Edit", "Glob", "Grep"}
}
cmd := exec.CommandContext(context.Background(), "claude", "-p", prompt, "--allowedTools", tools)
cmd.Dir = action.Path
cmd := exec.CommandContext(context.Background(), "claude", "-p", prompt, "--allowedTools", strings.Join(tools, ","))
cmd.Dir = task.Path
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
@ -83,21 +89,20 @@ func (s *Service) handleCommit(action ActionCommit) error {
return cmd.Run()
}
func (s *Service) handlePrompt(action ActionPrompt) error {
tools := "Bash,Read,Glob,Grep"
if len(action.AllowedTools) > 0 {
tools = ""
for i, t := range action.AllowedTools {
if i > 0 {
tools += ","
}
tools += t
}
func (s *Service) doPrompt(task TaskPrompt) error {
opts := s.Opts()
tools := opts.DefaultTools
if len(tools) == 0 {
tools = []string{"Bash", "Read", "Glob", "Grep"}
}
cmd := exec.CommandContext(context.Background(), "claude", "-p", action.Prompt, "--allowedTools", tools)
if action.WorkDir != "" {
cmd.Dir = action.WorkDir
if len(task.AllowedTools) > 0 {
tools = task.AllowedTools
}
cmd := exec.CommandContext(context.Background(), "claude", "-p", task.Prompt, "--allowedTools", strings.Join(tools, ","))
if task.WorkDir != "" {
cmd.Dir = task.WorkDir
}
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

313
pkg/dev/service.go Normal file
View file

@ -0,0 +1,313 @@
package dev
import (
"context"
"fmt"
"os"
"sort"
"strings"
"github.com/host-uk/core/pkg/agentic"
"github.com/host-uk/core/pkg/framework"
"github.com/host-uk/core/pkg/git"
"github.com/host-uk/core/pkg/repos"
)
// Tasks for dev service
// TaskWork runs the full dev workflow: status, commit, push.
type TaskWork struct {
RegistryPath string
StatusOnly bool
AutoCommit bool
}
// TaskStatus displays git status for all repos.
type TaskStatus struct {
RegistryPath string
}
// ServiceOptions for configuring the dev service.
type ServiceOptions struct {
RegistryPath string
}
// Service provides dev workflow orchestration as a Core service.
type Service struct {
*framework.ServiceRuntime[ServiceOptions]
}
// NewService creates a dev service factory.
func NewService(opts ServiceOptions) func(*framework.Core) (any, error) {
return func(c *framework.Core) (any, error) {
return &Service{
ServiceRuntime: framework.NewServiceRuntime(c, opts),
}, nil
}
}
// OnStartup registers task handlers.
func (s *Service) OnStartup(ctx context.Context) error {
s.Core().RegisterTask(s.handleTask)
return nil
}
func (s *Service) handleTask(c *framework.Core, t framework.Task) (any, bool, error) {
switch m := t.(type) {
case TaskWork:
err := s.runWork(m)
return nil, true, err
case TaskStatus:
err := s.runStatus(m)
return nil, true, err
}
return nil, false, nil
}
func (s *Service) runWork(task TaskWork) error {
// Load registry
paths, names, err := s.loadRegistry(task.RegistryPath)
if err != nil {
return err
}
if len(paths) == 0 {
fmt.Println("No git repositories found")
return nil
}
// QUERY git status
result, handled, err := s.Core().QUERY(git.QueryStatus{
Paths: paths,
Names: names,
})
if !handled {
return fmt.Errorf("git service not available")
}
if err != nil {
return err
}
statuses := result.([]git.RepoStatus)
// Sort by name
sort.Slice(statuses, func(i, j int) bool {
return statuses[i].Name < statuses[j].Name
})
// Display status table
s.printStatusTable(statuses)
// Collect dirty and ahead repos
var dirtyRepos []git.RepoStatus
var aheadRepos []git.RepoStatus
for _, st := range statuses {
if st.Error != nil {
continue
}
if st.IsDirty() {
dirtyRepos = append(dirtyRepos, st)
}
if st.HasUnpushed() {
aheadRepos = append(aheadRepos, st)
}
}
// Auto-commit dirty repos if requested
if task.AutoCommit && len(dirtyRepos) > 0 {
fmt.Println()
fmt.Println("Committing changes...")
fmt.Println()
for _, repo := range dirtyRepos {
_, handled, err := s.Core().PERFORM(agentic.TaskCommit{
Path: repo.Path,
Name: repo.Name,
})
if !handled {
// Agentic service not available - skip silently
fmt.Printf(" - %s: agentic service not available\n", repo.Name)
continue
}
if err != nil {
fmt.Printf(" x %s: %s\n", repo.Name, err)
} else {
fmt.Printf(" v %s\n", repo.Name)
}
}
// Re-query status after commits
result, _, _ = s.Core().QUERY(git.QueryStatus{
Paths: paths,
Names: names,
})
statuses = result.([]git.RepoStatus)
// Rebuild ahead repos list
aheadRepos = nil
for _, st := range statuses {
if st.Error == nil && st.HasUnpushed() {
aheadRepos = append(aheadRepos, st)
}
}
}
// If status only, we're done
if task.StatusOnly {
if len(dirtyRepos) > 0 && !task.AutoCommit {
fmt.Println()
fmt.Println("Use --commit flag to auto-commit dirty repos")
}
return nil
}
// Push repos with unpushed commits
if len(aheadRepos) == 0 {
fmt.Println()
fmt.Println("All repositories are up to date")
return nil
}
fmt.Println()
fmt.Printf("%d repos with unpushed commits:\n", len(aheadRepos))
for _, st := range aheadRepos {
fmt.Printf(" %s: %d commits\n", st.Name, st.Ahead)
}
fmt.Println()
fmt.Print("Push all? [y/N] ")
var answer string
fmt.Scanln(&answer)
if strings.ToLower(answer) != "y" {
fmt.Println("Aborted")
return nil
}
fmt.Println()
// Push each repo
for _, st := range aheadRepos {
_, handled, err := s.Core().PERFORM(git.TaskPush{
Path: st.Path,
Name: st.Name,
})
if !handled {
fmt.Printf(" x %s: git service not available\n", st.Name)
continue
}
if err != nil {
if git.IsNonFastForward(err) {
fmt.Printf(" ! %s: branch has diverged\n", st.Name)
} else {
fmt.Printf(" x %s: %s\n", st.Name, err)
}
} else {
fmt.Printf(" v %s\n", st.Name)
}
}
return nil
}
func (s *Service) runStatus(task TaskStatus) error {
paths, names, err := s.loadRegistry(task.RegistryPath)
if err != nil {
return err
}
if len(paths) == 0 {
fmt.Println("No git repositories found")
return nil
}
result, handled, err := s.Core().QUERY(git.QueryStatus{
Paths: paths,
Names: names,
})
if !handled {
return fmt.Errorf("git service not available")
}
if err != nil {
return err
}
statuses := result.([]git.RepoStatus)
sort.Slice(statuses, func(i, j int) bool {
return statuses[i].Name < statuses[j].Name
})
s.printStatusTable(statuses)
return nil
}
func (s *Service) loadRegistry(registryPath string) ([]string, map[string]string, error) {
var reg *repos.Registry
var err error
if registryPath != "" {
reg, err = repos.LoadRegistry(registryPath)
if err != nil {
return nil, nil, fmt.Errorf("failed to load registry: %w", err)
}
fmt.Printf("Registry: %s\n\n", registryPath)
} else {
registryPath, err = repos.FindRegistry()
if err == nil {
reg, err = repos.LoadRegistry(registryPath)
if err != nil {
return nil, nil, fmt.Errorf("failed to load registry: %w", err)
}
fmt.Printf("Registry: %s\n\n", registryPath)
} else {
// Fallback: scan current directory
cwd, _ := os.Getwd()
reg, err = repos.ScanDirectory(cwd)
if err != nil {
return nil, nil, fmt.Errorf("failed to scan directory: %w", err)
}
fmt.Printf("Scanning: %s\n\n", cwd)
}
}
var paths []string
names := make(map[string]string)
for _, repo := range reg.List() {
if repo.IsGitRepo() {
paths = append(paths, repo.Path)
names[repo.Path] = repo.Name
}
}
return paths, names, nil
}
func (s *Service) printStatusTable(statuses []git.RepoStatus) {
// Calculate column widths
nameWidth := 4 // "Repo"
for _, st := range statuses {
if len(st.Name) > nameWidth {
nameWidth = len(st.Name)
}
}
// Print header
fmt.Printf("%-*s %8s %9s %6s %5s\n",
nameWidth, "Repo", "Modified", "Untracked", "Staged", "Ahead")
// Print separator
fmt.Println(strings.Repeat("-", nameWidth+2+10+11+8+7))
// Print rows
for _, st := range statuses {
if st.Error != nil {
fmt.Printf("%-*s error: %s\n", nameWidth, st.Name, st.Error)
continue
}
fmt.Printf("%-*s %8d %9d %6d %5d\n",
nameWidth, st.Name,
st.Modified, st.Untracked, st.Staged, st.Ahead)
}
}

View file

@ -200,6 +200,73 @@ func (c *Core) RegisterActions(handlers ...func(*Core, Message) error) {
c.ipcMu.Unlock()
}
// QUERY dispatches a query to handlers until one responds.
// Returns (result, handled, error). If no handler responds, handled is false.
func (c *Core) QUERY(q Query) (any, bool, error) {
c.queryMu.RLock()
handlers := append([]QueryHandler(nil), c.queryHandlers...)
c.queryMu.RUnlock()
for _, h := range handlers {
result, handled, err := h(c, q)
if handled {
return result, true, err
}
}
return nil, false, nil
}
// QUERYALL dispatches a query to all handlers and collects all responses.
// Returns all results from handlers that responded.
func (c *Core) QUERYALL(q Query) ([]any, error) {
c.queryMu.RLock()
handlers := append([]QueryHandler(nil), c.queryHandlers...)
c.queryMu.RUnlock()
var results []any
var agg error
for _, h := range handlers {
result, handled, err := h(c, q)
if err != nil {
agg = errors.Join(agg, err)
}
if handled && result != nil {
results = append(results, result)
}
}
return results, agg
}
// PERFORM dispatches a task to handlers until one executes it.
// Returns (result, handled, error). If no handler responds, handled is false.
func (c *Core) PERFORM(t Task) (any, bool, error) {
c.taskMu.RLock()
handlers := append([]TaskHandler(nil), c.taskHandlers...)
c.taskMu.RUnlock()
for _, h := range handlers {
result, handled, err := h(c, t)
if handled {
return result, true, err
}
}
return nil, false, nil
}
// RegisterQuery adds a query handler to the Core.
func (c *Core) RegisterQuery(handler QueryHandler) {
c.queryMu.Lock()
c.queryHandlers = append(c.queryHandlers, handler)
c.queryMu.Unlock()
}
// RegisterTask adds a task handler to the Core.
func (c *Core) RegisterTask(handler TaskHandler) {
c.taskMu.Lock()
c.taskHandlers = append(c.taskHandlers, handler)
c.taskMu.Unlock()
}
// RegisterService adds a new service to the Core.
func (c *Core) RegisterService(name string, api any) error {
if c.servicesLocked {

View file

@ -42,8 +42,25 @@ type Option func(*Core) error
// Message is the interface for all messages that can be sent through the Core's IPC system.
// Any struct can be a message, allowing for structured data to be passed between services.
// Used with ACTION for fire-and-forget broadcasts.
type Message interface{}
// Query is the interface for read-only requests that return data.
// Used with QUERY (first responder) or QUERYALL (all responders).
type Query interface{}
// Task is the interface for requests that perform side effects.
// Used with PERFORM (first responder executes).
type Task interface{}
// QueryHandler handles Query requests. Returns (result, handled, error).
// If handled is false, the query will be passed to the next handler.
type QueryHandler func(*Core, Query) (any, bool, error)
// TaskHandler handles Task requests. Returns (result, handled, error).
// If handled is false, the task will be passed to the next handler.
type TaskHandler func(*Core, Task) (any, bool, error)
// Startable is an interface for services that need to perform initialization.
type Startable interface {
OnStartup(ctx context.Context) error
@ -64,6 +81,10 @@ type Core struct {
serviceLock bool
ipcMu sync.RWMutex
ipcHandlers []func(*Core, Message) error
queryMu sync.RWMutex
queryHandlers []QueryHandler
taskMu sync.RWMutex
taskHandlers []TaskHandler
serviceMu sync.RWMutex
services map[string]any
servicesLocked bool

View file

@ -0,0 +1,201 @@
package core
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
)
type TestQuery struct {
Value string
}
type TestTask struct {
Value string
}
func TestCore_QUERY_Good(t *testing.T) {
c, err := New()
assert.NoError(t, err)
// Register a handler that responds to TestQuery
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
if tq, ok := q.(TestQuery); ok {
return "result-" + tq.Value, true, nil
}
return nil, false, nil
})
result, handled, err := c.QUERY(TestQuery{Value: "test"})
assert.NoError(t, err)
assert.True(t, handled)
assert.Equal(t, "result-test", result)
}
func TestCore_QUERY_NotHandled(t *testing.T) {
c, err := New()
assert.NoError(t, err)
// No handlers registered
result, handled, err := c.QUERY(TestQuery{Value: "test"})
assert.NoError(t, err)
assert.False(t, handled)
assert.Nil(t, result)
}
func TestCore_QUERY_FirstResponder(t *testing.T) {
c, err := New()
assert.NoError(t, err)
// First handler responds
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
return "first", true, nil
})
// Second handler would respond but shouldn't be called
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
return "second", true, nil
})
result, handled, err := c.QUERY(TestQuery{})
assert.NoError(t, err)
assert.True(t, handled)
assert.Equal(t, "first", result)
}
func TestCore_QUERY_SkipsNonHandlers(t *testing.T) {
c, err := New()
assert.NoError(t, err)
// First handler doesn't handle
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
return nil, false, nil
})
// Second handler responds
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
return "second", true, nil
})
result, handled, err := c.QUERY(TestQuery{})
assert.NoError(t, err)
assert.True(t, handled)
assert.Equal(t, "second", result)
}
func TestCore_QUERYALL_Good(t *testing.T) {
c, err := New()
assert.NoError(t, err)
// Multiple handlers respond
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
return "first", true, nil
})
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
return "second", true, nil
})
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
return nil, false, nil // Doesn't handle
})
results, err := c.QUERYALL(TestQuery{})
assert.NoError(t, err)
assert.Len(t, results, 2)
assert.Contains(t, results, "first")
assert.Contains(t, results, "second")
}
func TestCore_QUERYALL_AggregatesErrors(t *testing.T) {
c, err := New()
assert.NoError(t, err)
err1 := errors.New("error1")
err2 := errors.New("error2")
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
return "result1", true, err1
})
c.RegisterQuery(func(c *Core, q Query) (any, bool, error) {
return "result2", true, err2
})
results, err := c.QUERYALL(TestQuery{})
assert.Error(t, err)
assert.ErrorIs(t, err, err1)
assert.ErrorIs(t, err, err2)
assert.Len(t, results, 2)
}
func TestCore_PERFORM_Good(t *testing.T) {
c, err := New()
assert.NoError(t, err)
executed := false
c.RegisterTask(func(c *Core, t Task) (any, bool, error) {
if tt, ok := t.(TestTask); ok {
executed = true
return "done-" + tt.Value, true, nil
}
return nil, false, nil
})
result, handled, err := c.PERFORM(TestTask{Value: "work"})
assert.NoError(t, err)
assert.True(t, handled)
assert.True(t, executed)
assert.Equal(t, "done-work", result)
}
func TestCore_PERFORM_NotHandled(t *testing.T) {
c, err := New()
assert.NoError(t, err)
// No handlers registered
result, handled, err := c.PERFORM(TestTask{Value: "work"})
assert.NoError(t, err)
assert.False(t, handled)
assert.Nil(t, result)
}
func TestCore_PERFORM_FirstResponder(t *testing.T) {
c, err := New()
assert.NoError(t, err)
callCount := 0
c.RegisterTask(func(c *Core, t Task) (any, bool, error) {
callCount++
return "first", true, nil
})
c.RegisterTask(func(c *Core, t Task) (any, bool, error) {
callCount++
return "second", true, nil
})
result, handled, err := c.PERFORM(TestTask{})
assert.NoError(t, err)
assert.True(t, handled)
assert.Equal(t, "first", result)
assert.Equal(t, 1, callCount) // Only first handler called
}
func TestCore_PERFORM_WithError(t *testing.T) {
c, err := New()
assert.NoError(t, err)
expectedErr := errors.New("task failed")
c.RegisterTask(func(c *Core, t Task) (any, bool, error) {
return nil, true, expectedErr
})
result, handled, err := c.PERFORM(TestTask{})
assert.Error(t, err)
assert.ErrorIs(t, err, expectedErr)
assert.True(t, handled)
assert.Nil(t, result)
}

View file

@ -27,6 +27,11 @@ func (r *ServiceRuntime[T]) Core() *Core {
return r.core
}
// Opts returns the service-specific options.
func (r *ServiceRuntime[T]) Opts() T {
return r.opts
}
// Config returns the registered Config service from the core application.
// This is a convenience method for accessing the application's configuration.
func (r *ServiceRuntime[T]) Config() Config {

View file

@ -19,6 +19,10 @@ type (
Core = core.Core
Option = core.Option
Message = core.Message
Query = core.Query
Task = core.Task
QueryHandler = core.QueryHandler
TaskHandler = core.TaskHandler
Startable = core.Startable
Stoppable = core.Stoppable
Config = core.Config

View file

@ -6,19 +6,39 @@ import (
"github.com/host-uk/core/pkg/framework"
)
// Actions for git service IPC
// Queries for git service
// ActionStatus requests git status for paths.
type ActionStatus struct {
// QueryStatus requests git status for paths.
type QueryStatus struct {
Paths []string
Names map[string]string
}
// ActionPush requests git push for a path.
type ActionPush struct{ Path, Name string }
// QueryDirtyRepos requests repos with uncommitted changes.
type QueryDirtyRepos struct{}
// ActionPull requests git pull for a path.
type ActionPull struct{ Path, Name string }
// QueryAheadRepos requests repos with unpushed commits.
type QueryAheadRepos struct{}
// Tasks for git service
// TaskPush requests git push for a path.
type TaskPush struct {
Path string
Name string
}
// TaskPull requests git pull for a path.
type TaskPull struct {
Path string
Name string
}
// TaskPushMultiple requests git push for multiple paths.
type TaskPushMultiple struct {
Paths []string
Names map[string]string
}
// ServiceOptions for configuring the git service.
type ServiceOptions struct {
@ -40,40 +60,47 @@ func NewService(opts ServiceOptions) func(*framework.Core) (any, error) {
}
}
// OnStartup registers action handlers.
// OnStartup registers query and task handlers.
func (s *Service) OnStartup(ctx context.Context) error {
s.Core().RegisterAction(s.handle)
s.Core().RegisterQuery(s.handleQuery)
s.Core().RegisterTask(s.handleTask)
return nil
}
func (s *Service) handle(c *framework.Core, msg framework.Message) error {
switch m := msg.(type) {
case ActionStatus:
return s.handleStatus(m)
case ActionPush:
return s.handlePush(m)
case ActionPull:
return s.handlePull(m)
}
return nil
}
func (s *Service) handleStatus(action ActionStatus) error {
ctx := context.Background()
statuses := Status(ctx, StatusOptions{
Paths: action.Paths,
Names: action.Names,
func (s *Service) handleQuery(c *framework.Core, q framework.Query) (any, bool, error) {
switch m := q.(type) {
case QueryStatus:
statuses := Status(context.Background(), StatusOptions{
Paths: m.Paths,
Names: m.Names,
})
s.lastStatus = statuses
return nil
return statuses, true, nil
case QueryDirtyRepos:
return s.DirtyRepos(), true, nil
case QueryAheadRepos:
return s.AheadRepos(), true, nil
}
return nil, false, nil
}
func (s *Service) handlePush(action ActionPush) error {
return Push(context.Background(), action.Path)
}
func (s *Service) handleTask(c *framework.Core, t framework.Task) (any, bool, error) {
switch m := t.(type) {
case TaskPush:
err := Push(context.Background(), m.Path)
return nil, true, err
func (s *Service) handlePull(action ActionPull) error {
return Pull(context.Background(), action.Path)
case TaskPull:
err := Pull(context.Background(), m.Path)
return nil, true, err
case TaskPushMultiple:
results := PushMultiple(context.Background(), m.Paths, m.Names)
return results, true, nil
}
return nil, false, nil
}
// Status returns last status result.