diff --git a/pkg/agentic/actions.go b/pkg/agentic/actions.go index f969721..6f6bd6b 100644 --- a/pkg/agentic/actions.go +++ b/pkg/agentic/actions.go @@ -12,6 +12,7 @@ package agentic import ( "context" + "dappco.re/go/agent/pkg/lib" "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" ) @@ -125,8 +126,53 @@ func (s *PrepSubsystem) handleWatch(ctx context.Context, opts core.Options) core return core.Result{Value: out, OK: true} } +// handlePrompt reads an embedded prompt by slug. +// +// r := c.Action("agentic.prompt").Run(ctx, core.NewOptions( +// core.Option{Key: "slug", Value: "coding"}, +// )) +func (s *PrepSubsystem) handlePrompt(_ context.Context, opts core.Options) core.Result { + return lib.Prompt(opts.String("slug")) +} + +// handleTask reads an embedded task plan by slug. +// +// r := c.Action("agentic.task").Run(ctx, core.NewOptions( +// core.Option{Key: "slug", Value: "bug-fix"}, +// )) +func (s *PrepSubsystem) handleTask(_ context.Context, opts core.Options) core.Result { + return lib.Task(opts.String("slug")) +} + +// handleFlow reads an embedded flow by slug. +// +// r := c.Action("agentic.flow").Run(ctx, core.NewOptions( +// core.Option{Key: "slug", Value: "go"}, +// )) +func (s *PrepSubsystem) handleFlow(_ context.Context, opts core.Options) core.Result { + return lib.Flow(opts.String("slug")) +} + +// handlePersona reads an embedded persona by path. +// +// r := c.Action("agentic.persona").Run(ctx, core.NewOptions( +// core.Option{Key: "path", Value: "code/backend-architect"}, +// )) +func (s *PrepSubsystem) handlePersona(_ context.Context, opts core.Options) core.Result { + return lib.Persona(opts.String("path")) +} + // --- Pipeline --- +// handleComplete runs the named completion task. +// +// r := c.Action("agentic.complete").Run(ctx, core.NewOptions( +// core.Option{Key: "workspace", Value: "/srv/.core/workspace/core/go-io/task-42"}, +// )) +func (s *PrepSubsystem) handleComplete(ctx context.Context, opts core.Options) core.Result { + return s.Core().Task("agent.completion").Run(ctx, s.Core(), opts) +} + // handleQA runs build+test on a completed workspace. // // r := c.Action("agentic.qa").Run(ctx, core.NewOptions( @@ -369,3 +415,27 @@ func (s *PrepSubsystem) handleEpic(ctx context.Context, opts core.Options) core. } return core.Result{Value: out, OK: true} } + +// handleWorkspaceQuery answers workspace state queries from Core QUERY calls. +// +// r := c.QUERY(agentic.WorkspaceQuery{Name: "core/go-io/task-42"}) +// r := c.QUERY(agentic.WorkspaceQuery{Status: "blocked"}) +func (s *PrepSubsystem) handleWorkspaceQuery(_ *core.Core, q core.Query) core.Result { + wq, ok := q.(WorkspaceQuery) + if !ok { + return core.Result{} + } + if wq.Name != "" { + return s.workspaces.Get(wq.Name) + } + if wq.Status != "" { + var names []string + s.workspaces.Each(func(name string, st *WorkspaceStatus) { + if st.Status == wq.Status { + names = append(names, name) + } + }) + return core.Result{Value: names, OK: true} + } + return core.Result{Value: s.workspaces, OK: true} +} diff --git a/pkg/agentic/actions_test.go b/pkg/agentic/actions_test.go index 023fc8e..83d3777 100644 --- a/pkg/agentic/actions_test.go +++ b/pkg/agentic/actions_test.go @@ -6,8 +6,10 @@ import ( "context" "testing" + "dappco.re/go/agent/pkg/lib" core "dappco.re/go/core" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestActions_HandleDispatch_Good(t *testing.T) { @@ -27,6 +29,51 @@ func TestActions_HandleStatus_Good(t *testing.T) { assert.True(t, r.OK) } +func TestActions_HandlePrompt_Good(t *testing.T) { + s := newPrepWithProcess() + r := s.handlePrompt(context.Background(), core.NewOptions( + core.Option{Key: "slug", Value: "coding"}, + )) + assert.True(t, r.OK) +} + +func TestActions_HandlePrompt_Bad(t *testing.T) { + s := newPrepWithProcess() + r := s.handlePrompt(context.Background(), core.NewOptions( + core.Option{Key: "slug", Value: "does-not-exist"}, + )) + assert.False(t, r.OK) +} + +func TestActions_HandleTask_Good(t *testing.T) { + s := newPrepWithProcess() + r := s.handleTask(context.Background(), core.NewOptions( + core.Option{Key: "slug", Value: "bug-fix"}, + )) + assert.True(t, r.OK) +} + +func TestActions_HandleFlow_Good(t *testing.T) { + s := newPrepWithProcess() + r := s.handleFlow(context.Background(), core.NewOptions( + core.Option{Key: "slug", Value: "go"}, + )) + assert.True(t, r.OK) +} + +func TestActions_HandlePersona_Good(t *testing.T) { + personas := lib.ListPersonas() + if len(personas) == 0 { + t.Skip("no personas embedded") + } + + s := newPrepWithProcess() + r := s.handlePersona(context.Background(), core.NewOptions( + core.Option{Key: "path", Value: personas[0]}, + )) + assert.True(t, r.OK) +} + func TestActions_HandlePoke_Good(t *testing.T) { s := newPrepWithProcess() s.pokeCh = make(chan struct{}, 1) @@ -51,3 +98,24 @@ func TestActions_HandleIngest_Bad_NoWorkspace(t *testing.T) { r := s.handleIngest(context.Background(), core.NewOptions()) assert.False(t, r.OK) } + +func TestActions_HandleWorkspaceQuery_Good(t *testing.T) { + s := newPrepWithProcess() + s.workspaces = core.NewRegistry[*WorkspaceStatus]() + s.workspaces.Set("core/go-io/task-42", &WorkspaceStatus{Status: "blocked", Repo: "go-io"}) + + r := s.handleWorkspaceQuery(nil, WorkspaceQuery{Status: "blocked"}) + require.True(t, r.OK) + + names, ok := r.Value.([]string) + require.True(t, ok) + require.Len(t, names, 1) + assert.Equal(t, "core/go-io/task-42", names[0]) +} + +func TestActions_HandleWorkspaceQuery_Bad(t *testing.T) { + s := newPrepWithProcess() + r := s.handleWorkspaceQuery(nil, "not-a-workspace-query") + assert.False(t, r.OK) + assert.Nil(t, r.Value) +} diff --git a/pkg/agentic/paths.go b/pkg/agentic/paths.go index 62a5ebb..fef6505 100644 --- a/pkg/agentic/paths.go +++ b/pkg/agentic/paths.go @@ -24,7 +24,7 @@ func LocalFs() *core.Fs { return fs } // WorkspaceRoot returns the root directory for agent workspaces. // Checks CORE_WORKSPACE env var first, falls back to ~/Code/.core/workspace. // -// wsDir := core.JoinPath(agentic.WorkspaceRoot(), "go-io-1774149757") +// wsDir := core.JoinPath(agentic.WorkspaceRoot(), "core", "go-io", "task-42") func WorkspaceRoot() string { return core.JoinPath(CoreRoot(), "workspace") } diff --git a/pkg/agentic/pr.go b/pkg/agentic/pr.go index 8aea37f..d997978 100644 --- a/pkg/agentic/pr.go +++ b/pkg/agentic/pr.go @@ -14,9 +14,9 @@ import ( // CreatePRInput is the input for agentic_create_pr. // -// input := agentic.CreatePRInput{Workspace: "go-io-1773581873", Title: "Fix watcher panic"} +// input := agentic.CreatePRInput{Workspace: "core/go-io/task-42", Title: "Fix watcher panic"} type CreatePRInput struct { - Workspace string `json:"workspace"` // workspace name (e.g. "mcp-1773581873") + Workspace string `json:"workspace"` // workspace name (e.g. "core/go-io/task-42") Title string `json:"title,omitempty"` // PR title (default: task description) Body string `json:"body,omitempty"` // PR body (default: auto-generated) Base string `json:"base,omitempty"` // base branch (default: "main") diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 8cbb9c9..361fb4f 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -173,18 +173,10 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c.Action("agentic.epic", s.handleEpic).Description = "Create sub-issues from an epic plan" // Content — accessible via IPC, no lib import needed - c.Action("agentic.prompt", func(_ context.Context, opts core.Options) core.Result { - return lib.Prompt(opts.String("slug")) - }).Description = "Read a system prompt by slug" - c.Action("agentic.task", func(_ context.Context, opts core.Options) core.Result { - return lib.Task(opts.String("slug")) - }).Description = "Read a task plan by slug" - c.Action("agentic.flow", func(_ context.Context, opts core.Options) core.Result { - return lib.Flow(opts.String("slug")) - }).Description = "Read a build/release flow by slug" - c.Action("agentic.persona", func(_ context.Context, opts core.Options) core.Result { - return lib.Persona(opts.String("path")) - }).Description = "Read a persona by path" + c.Action("agentic.prompt", s.handlePrompt).Description = "Read a system prompt by slug" + c.Action("agentic.task", s.handleTask).Description = "Read a task plan by slug" + c.Action("agentic.flow", s.handleFlow).Description = "Read a build/release flow by slug" + c.Action("agentic.persona", s.handlePersona).Description = "Read a persona by path" // Completion pipeline — Task composition c.Task("agent.completion", core.Task{ @@ -200,9 +192,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { // PerformAsync wrapper — runs the completion Task in background with progress tracking. // c.PerformAsync("agentic.complete", opts) broadcasts ActionTaskStarted/Completed. - c.Action("agentic.complete", func(ctx context.Context, opts core.Options) core.Result { - return c.Task("agent.completion").Run(ctx, c, opts) - }).Description = "Run completion pipeline (QA → PR → Verify) in background" + c.Action("agentic.complete", s.handleComplete).Description = "Run completion pipeline (QA → PR → Verify) in background" // Hydrate workspace registry from disk s.hydrateWorkspaces() @@ -211,28 +201,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { // // r := c.QUERY(agentic.WorkspaceQuery{}) // if r.OK { workspaces := r.Value.(*core.Registry[*WorkspaceStatus]) } - c.RegisterQuery(func(_ *core.Core, q core.Query) core.Result { - wq, ok := q.(WorkspaceQuery) - if !ok { - return core.Result{} - } - // Specific workspace lookup - if wq.Name != "" { - return s.workspaces.Get(wq.Name) - } - // Status filter — return matching names - if wq.Status != "" { - var names []string - s.workspaces.Each(func(name string, st *WorkspaceStatus) { - if st.Status == wq.Status { - names = append(names, name) - } - }) - return core.Result{Value: names, OK: true} - } - // No filter — return full registry - return core.Result{Value: s.workspaces, OK: true} - }) + c.RegisterQuery(s.handleWorkspaceQuery) s.StartRunner() s.registerCommands(ctx) diff --git a/pkg/agentic/prep_test.go b/pkg/agentic/prep_test.go index 30c8463..4942fe5 100644 --- a/pkg/agentic/prep_test.go +++ b/pkg/agentic/prep_test.go @@ -402,7 +402,49 @@ func TestPrep_SetCore_Ugly(t *testing.T) { assert.Equal(t, c2, s.Core(), "second SetCore should overwrite first") } -// --- OnStartup Bad/Ugly --- +// --- OnStartup Good/Bad/Ugly --- + +func TestPrep_OnStartup_Good_CreatesPokeCh(t *testing.T) { + // StartRunner is now a no-op — pokeCh is no longer initialised by OnStartup. + // Verify OnStartup succeeds and pokeCh remains nil. + t.Setenv("CORE_WORKSPACE", t.TempDir()) + t.Setenv("CORE_AGENT_DISPATCH", "") + + c := core.New(core.WithOption("name", "test")) + s := NewPrep() + s.SetCore(c) + + assert.Nil(t, s.pokeCh, "pokeCh should be nil before OnStartup") + + r := s.OnStartup(context.Background()) + assert.True(t, r.OK) + + assert.Nil(t, s.pokeCh, "pokeCh should remain nil — queue drain is owned by pkg/runner") +} + +func TestPrep_OnStartup_Good_FrozenByDefault(t *testing.T) { + // Frozen state is now owned by pkg/runner.Service, not agentic. + // Verify OnStartup succeeds without asserting frozen state. + t.Setenv("CORE_WORKSPACE", t.TempDir()) + t.Setenv("CORE_AGENT_DISPATCH", "") + + c := core.New(core.WithOption("name", "test")) + s := NewPrep() + s.SetCore(c) + + assert.True(t, s.OnStartup(context.Background()).OK) +} + +func TestPrep_OnStartup_Good_NoError(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + t.Setenv("CORE_AGENT_DISPATCH", "") + + c := core.New(core.WithOption("name", "test")) + s := NewPrep() + s.SetCore(c) + + assert.True(t, s.OnStartup(context.Background()).OK) +} func TestPrep_OnStartup_Bad(t *testing.T) { // OnStartup without SetCore (nil ServiceRuntime) — panics because @@ -421,8 +463,8 @@ func TestPrep_OnStartup_Ugly(t *testing.T) { // OnStartup called twice with valid core — second call should not panic s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } c := core.New(core.WithOption("name", "test")) s.SetCore(c) @@ -433,14 +475,45 @@ func TestPrep_OnStartup_Ugly(t *testing.T) { }) } -// --- OnShutdown Bad --- +// --- OnShutdown Good/Bad --- + +func TestPrep_OnShutdown_Good_FreezesQueue(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + + s := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), frozen: false} + r := s.OnShutdown(context.Background()) + assert.True(t, r.OK) + assert.True(t, s.frozen, "OnShutdown must set frozen=true") +} + +func TestPrep_OnShutdown_Good_AlreadyFrozen(t *testing.T) { + // Calling OnShutdown twice must be idempotent + s := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), frozen: true} + r := s.OnShutdown(context.Background()) + assert.True(t, r.OK) + assert.True(t, s.frozen) +} + +func TestPrep_OnShutdown_Good_NoError(t *testing.T) { + s := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{})} + assert.True(t, s.OnShutdown(context.Background()).OK) +} + +func TestPrep_OnShutdown_Ugly_NilCore(t *testing.T) { + // OnShutdown must not panic even if s.core is nil + s := &PrepSubsystem{ServiceRuntime: nil, frozen: false} + assert.NotPanics(t, func() { + _ = s.OnShutdown(context.Background()) + }) + assert.True(t, s.frozen) +} func TestPrep_OnShutdown_Bad(t *testing.T) { // OnShutdown without Core s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } assert.NotPanics(t, func() { r := s.OnShutdown(context.Background()) @@ -455,8 +528,8 @@ func TestPrep_Shutdown_Bad(t *testing.T) { // Shutdown always returns nil s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } err := s.Shutdown(context.Background()) assert.NoError(t, err) @@ -525,9 +598,9 @@ func TestPrep_TestPrepWorkspace_Good(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Valid input but repo won't exist — still exercises the public wrapper delegation @@ -542,9 +615,9 @@ func TestPrep_TestPrepWorkspace_Good(t *testing.T) { func TestPrep_TestPrepWorkspace_Bad(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Missing repo — should return error @@ -559,9 +632,9 @@ func TestPrep_TestPrepWorkspace_Ugly(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Bare ".." is caught as invalid repo name by PathBase check @@ -581,9 +654,9 @@ func TestPrep_TestBuildPrompt_Good(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } prompt, memories, consumers := s.TestBuildPrompt(context.Background(), PrepInput{ @@ -602,9 +675,9 @@ func TestPrep_TestBuildPrompt_Good(t *testing.T) { func TestPrep_TestBuildPrompt_Bad(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Empty inputs — should still return a prompt string without panicking @@ -621,9 +694,9 @@ func TestPrep_TestBuildPrompt_Ugly(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Unicode in all fields — should not panic @@ -685,8 +758,8 @@ func TestPrep_GetGitLog_Good(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } log := s.getGitLog(dir) assert.NotEmpty(t, log) @@ -698,8 +771,8 @@ func TestPrep_GetGitLog_Bad(t *testing.T) { dir := t.TempDir() s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } log := s.getGitLog(dir) assert.Empty(t, log) @@ -712,8 +785,8 @@ func TestPrep_GetGitLog_Ugly(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } log := s.getGitLog(dir) assert.Empty(t, log) @@ -753,10 +826,10 @@ func TestPrep_PrepWorkspace_Good(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - forge: forge.NewForge(srv.URL, "test-token"), - codePath: core.JoinPath(root, "src"), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + forge: forge.NewForge(srv.URL, "test-token"), + codePath: core.JoinPath(root, "src"), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } _, out, err := s.TestPrepWorkspace(context.Background(), PrepInput{ diff --git a/pkg/agentic/process_register.go b/pkg/agentic/process_register.go index eb5c382..1b1fa5d 100644 --- a/pkg/agentic/process_register.go +++ b/pkg/agentic/process_register.go @@ -9,6 +9,10 @@ import ( "dappco.re/go/core/process" ) +type processActionHandlers struct { + service *process.Service +} + // ProcessRegister is the service factory for go-process. // Registers the process service under the canonical "process" name and exposes // the Core `process.*` Actions expected by `c.Process()`. @@ -37,44 +41,49 @@ func ProcessRegister(c *core.Core) core.Result { return r } - c.Action("process.run", func(ctx context.Context, opts core.Options) core.Result { - output, err := svc.RunWithOptions(ctx, process.RunOptions{ - Command: opts.String("command"), - Args: optionStrings(opts, "args"), - Dir: opts.String("dir"), - Env: optionStrings(opts, "env"), - }) - if err != nil { - return core.Result{Value: err, OK: false} - } - return core.Result{Value: output, OK: true} - }) + handlers := &processActionHandlers{service: svc} + c.Action("process.run", handlers.handleRun) + c.Action("process.start", handlers.handleStart) + c.Action("process.kill", handlers.handleKill) - c.Action("process.start", func(ctx context.Context, opts core.Options) core.Result { - proc, err := svc.StartWithOptions(ctx, process.RunOptions{ - Command: opts.String("command"), - Args: optionStrings(opts, "args"), - Dir: opts.String("dir"), - Env: optionStrings(opts, "env"), - Detach: opts.Bool("detach"), - }) - if err != nil { - return core.Result{Value: err, OK: false} - } - return core.Result{Value: proc, OK: true} - }) + return core.Result{OK: true} +} - c.Action("process.kill", func(_ context.Context, opts core.Options) core.Result { - id := opts.String("id") - if id == "" { - return core.Result{Value: core.E("agentic.ProcessRegister", "process id is required", nil), OK: false} - } - if err := svc.Kill(id); err != nil { - return core.Result{Value: err, OK: false} - } - return core.Result{OK: true} +func (h *processActionHandlers) handleRun(ctx context.Context, opts core.Options) core.Result { + output, err := h.service.RunWithOptions(ctx, process.RunOptions{ + Command: opts.String("command"), + Args: optionStrings(opts, "args"), + Dir: opts.String("dir"), + Env: optionStrings(opts, "env"), }) + if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: output, OK: true} +} +func (h *processActionHandlers) handleStart(ctx context.Context, opts core.Options) core.Result { + proc, err := h.service.StartWithOptions(ctx, process.RunOptions{ + Command: opts.String("command"), + Args: optionStrings(opts, "args"), + Dir: opts.String("dir"), + Env: optionStrings(opts, "env"), + Detach: opts.Bool("detach"), + }) + if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: proc, OK: true} +} + +func (h *processActionHandlers) handleKill(_ context.Context, opts core.Options) core.Result { + id := opts.String("id") + if id == "" { + return core.Result{Value: core.E("agentic.ProcessRegister", "process id is required", nil), OK: false} + } + if err := h.service.Kill(id); err != nil { + return core.Result{Value: err, OK: false} + } return core.Result{OK: true} } diff --git a/pkg/agentic/process_register_example_test.go b/pkg/agentic/process_register_example_test.go index b459bfc..7335add 100644 --- a/pkg/agentic/process_register_example_test.go +++ b/pkg/agentic/process_register_example_test.go @@ -1,21 +1,24 @@ // SPDX-License-Identifier: EUPL-1.2 -package agentic_test +package agentic import ( "context" core "dappco.re/go/core" - "dappco.re/go/agent/pkg/agentic" ) -func ExampleProcessRegister_exists() { - c := core.New(core.WithService(agentic.ProcessRegister)) - c.ServiceStartup(context.Background(), nil) +func ExampleProcessRegister() { + c := core.New() + ProcessRegister(c) - core.Println(c.Process().Exists()) - core.Println(c.Action("process.run").Exists()) - // Output: - // true - // true + r := c.Action("process.run").Run(context.Background(), core.NewOptions( + core.Option{Key: "command", Value: "echo"}, + core.Option{Key: "args", Value: []string{"ok"}}, + )) + if r.OK { + core.Println(core.Trim(r.Value.(string))) + } + + // Output: ok } diff --git a/pkg/agentic/process_register_test.go b/pkg/agentic/process_register_test.go index fd4e8e6..2187a3c 100644 --- a/pkg/agentic/process_register_test.go +++ b/pkg/agentic/process_register_test.go @@ -5,30 +5,90 @@ package agentic import ( "context" "testing" + "time" core "dappco.re/go/core" + "dappco.re/go/core/process" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestProcessregister_Register_Good(t *testing.T) { - c := core.New(core.WithService(ProcessRegister)) - c.ServiceStartup(context.Background(), nil) - assert.True(t, c.Process().Exists()) +func TestProcessRegister_ProcessRegister_Good(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + + c := core.New() + result := ProcessRegister(c) + assert.True(t, result.OK, "ProcessRegister should succeed with a real Core instance") + assert.True(t, c.Process().Exists(), "ProcessRegister should register the process service") } -func TestProcessregister_NilCore_Bad_NilCore(t *testing.T) { - // ProcessRegister delegates to process.Register - // which needs a valid Core — verify it doesn't panic - assert.NotPanics(t, func() { - c := core.New() - _ = ProcessRegister(c) - }) +func TestProcessRegister_ProcessRegister_Bad_NilCore(t *testing.T) { + result := ProcessRegister(nil) + assert.False(t, result.OK) } -func TestProcessregister_Actions_Ugly_ActionsRegistered(t *testing.T) { - c := core.New(core.WithService(ProcessRegister)) - c.ServiceStartup(context.Background(), nil) - assert.True(t, c.Action("process.run").Exists()) - assert.True(t, c.Action("process.start").Exists()) - assert.True(t, c.Action("process.kill").Exists()) +func TestProcessRegister_ProcessRegister_Ugly_DoubleRegister(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + + c := core.New() + r1 := ProcessRegister(c) + assert.True(t, r1.OK) + + r2 := ProcessRegister(c) + assert.True(t, r2.OK, "second ProcessRegister call should not fail") +} + +func TestProcessRegister_HandleRun_Good(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + + c := core.New() + require.True(t, ProcessRegister(c).OK) + + r := c.Action("process.run").Run(context.Background(), core.NewOptions( + core.Option{Key: "command", Value: "echo"}, + core.Option{Key: "args", Value: []string{"ok"}}, + )) + require.True(t, r.OK) + assert.Equal(t, "ok", core.Trim(r.Value.(string))) +} + +func TestProcessRegister_HandleKill_Bad_MissingID(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + + c := core.New() + require.True(t, ProcessRegister(c).OK) + + r := c.Action("process.kill").Run(context.Background(), core.NewOptions()) + assert.False(t, r.OK) +} + +func TestProcessRegister_HandleStart_Ugly_StartAndKill(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + + c := core.New() + require.True(t, ProcessRegister(c).OK) + + r := c.Action("process.start").Run(context.Background(), core.NewOptions( + core.Option{Key: "command", Value: "sleep"}, + core.Option{Key: "args", Value: []string{"30"}}, + core.Option{Key: "detach", Value: true}, + )) + require.True(t, r.OK) + + proc, ok := r.Value.(*process.Process) + require.True(t, ok) + require.NotEmpty(t, proc.ID) + + defer proc.Kill() + + kill := c.Action("process.kill").Run(context.Background(), core.NewOptions( + core.Option{Key: "id", Value: proc.ID}, + )) + assert.True(t, kill.OK) + + select { + case <-proc.Done(): + case <-time.After(5 * time.Second): + t.Fatal("process.kill did not stop the managed process") + } } diff --git a/pkg/agentic/register_test.go b/pkg/agentic/register_test.go index 2172e36..2de2d76 100644 --- a/pkg/agentic/register_test.go +++ b/pkg/agentic/register_test.go @@ -3,7 +3,6 @@ package agentic import ( - "context" "testing" core "dappco.re/go/core" @@ -54,110 +53,3 @@ func TestRegister_AgentsConfig_Good(t *testing.T) { concurrency := core.ConfigGet[map[string]ConcurrencyLimit](c.Config(), "agents.concurrency") assert.NotNil(t, concurrency, "Register must store agents.concurrency in Core Config") } - -// --- ProcessRegister --- - -func TestRegister_ProcessRegister_Good(t *testing.T) { - t.Setenv("CORE_WORKSPACE", t.TempDir()) - - c := core.New() - result := ProcessRegister(c) - assert.True(t, result.OK, "ProcessRegister should succeed with a real Core instance") - assert.True(t, c.Process().Exists(), "ProcessRegister should register the process service") -} - -func TestRegister_ProcessRegister_Bad(t *testing.T) { - // nil Core — the process.NewService factory tolerates nil Core, returns a result - result := ProcessRegister(nil) - // Either OK (service created without Core) or not OK (error) — must not panic - _ = result -} - -func TestRegister_ProcessRegister_Ugly(t *testing.T) { - // Call twice with same Core — second call should still succeed - t.Setenv("CORE_WORKSPACE", t.TempDir()) - - c := core.New() - r1 := ProcessRegister(c) - assert.True(t, r1.OK) - - r2 := ProcessRegister(c) - assert.True(t, r2.OK, "second ProcessRegister call should not fail") -} - -// --- OnStartup --- - -func TestPrep_OnStartup_Good_CreatesPokeCh(t *testing.T) { - // StartRunner is now a no-op — pokeCh is no longer initialised by OnStartup. - // Verify OnStartup succeeds and pokeCh remains nil. - t.Setenv("CORE_WORKSPACE", t.TempDir()) - t.Setenv("CORE_AGENT_DISPATCH", "") - - c := core.New(core.WithOption("name", "test")) - s := NewPrep() - s.SetCore(c) - - assert.Nil(t, s.pokeCh, "pokeCh should be nil before OnStartup") - - r := s.OnStartup(context.Background()) - assert.True(t, r.OK) - - assert.Nil(t, s.pokeCh, "pokeCh should remain nil — queue drain is owned by pkg/runner") -} - -func TestPrep_OnStartup_Good_FrozenByDefault(t *testing.T) { - // Frozen state is now owned by pkg/runner.Service, not agentic. - // Verify OnStartup succeeds without asserting frozen state. - t.Setenv("CORE_WORKSPACE", t.TempDir()) - t.Setenv("CORE_AGENT_DISPATCH", "") - - c := core.New(core.WithOption("name", "test")) - s := NewPrep() - s.SetCore(c) - - assert.True(t, s.OnStartup(context.Background()).OK) -} - -func TestPrep_OnStartup_Good_NoError(t *testing.T) { - t.Setenv("CORE_WORKSPACE", t.TempDir()) - t.Setenv("CORE_AGENT_DISPATCH", "") - - c := core.New(core.WithOption("name", "test")) - s := NewPrep() - s.SetCore(c) - - assert.True(t, s.OnStartup(context.Background()).OK) -} - -// --- OnShutdown --- - -func TestPrep_OnShutdown_Good_FreezesQueue(t *testing.T) { - t.Setenv("CORE_WORKSPACE", t.TempDir()) - - s := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), frozen: false} - r := s.OnShutdown(context.Background()) - assert.True(t, r.OK) - assert.True(t, s.frozen, "OnShutdown must set frozen=true") -} - -func TestPrep_OnShutdown_Good_AlreadyFrozen(t *testing.T) { - // Calling OnShutdown twice must be idempotent - s := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), frozen: true} - r := s.OnShutdown(context.Background()) - assert.True(t, r.OK) - assert.True(t, s.frozen) -} - -func TestPrep_OnShutdown_Good_NoError(t *testing.T) { - s := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{})} - assert.True(t, s.OnShutdown(context.Background()).OK) -} - -func TestPrep_OnShutdown_Ugly_NilCore(t *testing.T) { - // OnShutdown must not panic even if s.core is nil - s := &PrepSubsystem{ServiceRuntime: nil, frozen: false} - assert.NotPanics(t, func() { - _ = s.OnShutdown(context.Background()) - }) - assert.True(t, s.frozen) -} diff --git a/pkg/agentic/resume.go b/pkg/agentic/resume.go index b507a21..12f54cc 100644 --- a/pkg/agentic/resume.go +++ b/pkg/agentic/resume.go @@ -11,9 +11,9 @@ import ( // ResumeInput is the input for agentic_resume. // -// input := agentic.ResumeInput{Workspace: "go-scm-1773581173", Answer: "Use the existing queue config"} +// input := agentic.ResumeInput{Workspace: "core/go-scm/task-42", Answer: "Use the existing queue config"} type ResumeInput struct { - Workspace string `json:"workspace"` // workspace name (e.g. "go-scm-1773581173") + Workspace string `json:"workspace"` // workspace name (e.g. "core/go-scm/task-42") Answer string `json:"answer,omitempty"` // answer to the blocked question (written to ANSWER.md) Agent string `json:"agent,omitempty"` // override agent type (default: same as original) DryRun bool `json:"dry_run,omitempty"` // preview without executing @@ -21,7 +21,7 @@ type ResumeInput struct { // ResumeOutput is the output for agentic_resume. // -// out := agentic.ResumeOutput{Success: true, Workspace: "go-scm-1773581173", Agent: "codex"} +// out := agentic.ResumeOutput{Success: true, Workspace: "core/go-scm/task-42", Agent: "codex"} type ResumeOutput struct { Success bool `json:"success"` Workspace string `json:"workspace"` diff --git a/pkg/agentic/status.go b/pkg/agentic/status.go index c7e20be..d73e526 100644 --- a/pkg/agentic/status.go +++ b/pkg/agentic/status.go @@ -87,7 +87,7 @@ func ReadStatus(wsDir string) (*WorkspaceStatus, error) { // StatusInput is the input for agentic_status. // -// input := agentic.StatusInput{Workspace: "go-io-123", Limit: 50} +// input := agentic.StatusInput{Workspace: "core/go-io/task-42", Limit: 50} type StatusInput struct { Workspace string `json:"workspace,omitempty"` // specific workspace name, or empty for all Limit int `json:"limit,omitempty"` // max results (default 100) @@ -109,7 +109,7 @@ type StatusOutput struct { // BlockedInfo shows a workspace that needs human input. // -// info := agentic.BlockedInfo{Name: "go-io/task-4", Repo: "go-io", Question: "Which API version?"} +// info := agentic.BlockedInfo{Name: "core/go-io/task-4", Repo: "go-io", Question: "Which API version?"} type BlockedInfo struct { Name string `json:"name"` Repo string `json:"repo"` diff --git a/pkg/agentic/watch.go b/pkg/agentic/watch.go index eb3492a..55f258b 100644 --- a/pkg/agentic/watch.go +++ b/pkg/agentic/watch.go @@ -12,7 +12,7 @@ import ( // WatchInput is the input for agentic_watch. // -// input := agentic.WatchInput{Workspaces: []string{"go-io-123"}, PollInterval: 5, Timeout: 600} +// input := agentic.WatchInput{Workspaces: []string{"core/go-io/task-42"}, PollInterval: 5, Timeout: 600} type WatchInput struct { // Workspaces to watch. If empty, watches all running/queued workspaces. Workspaces []string `json:"workspaces,omitempty"` @@ -24,7 +24,7 @@ type WatchInput struct { // WatchOutput is the result when all watched workspaces complete. // -// out := agentic.WatchOutput{Success: true, Completed: []agentic.WatchResult{{Workspace: "go-io-123", Status: "completed"}}} +// out := agentic.WatchOutput{Success: true, Completed: []agentic.WatchResult{{Workspace: "core/go-io/task-42", Status: "completed"}}} type WatchOutput struct { Success bool `json:"success"` Completed []WatchResult `json:"completed"` @@ -34,7 +34,7 @@ type WatchOutput struct { // WatchResult describes one completed workspace. // -// result := agentic.WatchResult{Workspace: "go-io-123", Agent: "codex", Repo: "go-io", Status: "completed"} +// result := agentic.WatchResult{Workspace: "core/go-io/task-42", Agent: "codex", Repo: "go-io", Status: "completed"} type WatchResult struct { Workspace string `json:"workspace"` Agent string `json:"agent"` diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 821eaf5..973a2f2 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -97,25 +97,7 @@ func (s *Service) OnStartup(ctx context.Context) core.Result { s.hydrateWorkspaces() // QUERY handler — workspace state queries - c.RegisterQuery(func(_ *core.Core, q core.Query) core.Result { - wq, ok := q.(WorkspaceQuery) - if !ok { - return core.Result{} - } - if wq.Name != "" { - return s.workspaces.Get(wq.Name) - } - if wq.Status != "" { - var names []string - s.workspaces.Each(func(name string, st *WorkspaceStatus) { - if st.Status == wq.Status { - names = append(names, name) - } - }) - return core.Result{Value: names, OK: true} - } - return core.Result{Value: s.workspaces, OK: true} - }) + c.RegisterQuery(s.handleWorkspaceQuery) // Start the background queue runner s.startRunner() @@ -253,6 +235,30 @@ func (s *Service) Workspaces() *core.Registry[*WorkspaceStatus] { return s.workspaces } +// handleWorkspaceQuery answers workspace state queries from Core QUERY calls. +// +// r := c.QUERY(runner.WorkspaceQuery{Name: "core/go-io/task-42"}) +// r := c.QUERY(runner.WorkspaceQuery{Status: "running"}) +func (s *Service) handleWorkspaceQuery(_ *core.Core, q core.Query) core.Result { + wq, ok := q.(WorkspaceQuery) + if !ok { + return core.Result{} + } + if wq.Name != "" { + return s.workspaces.Get(wq.Name) + } + if wq.Status != "" { + var names []string + s.workspaces.Each(func(name string, st *WorkspaceStatus) { + if st.Status == wq.Status { + names = append(names, name) + } + }) + return core.Result{Value: names, OK: true} + } + return core.Result{Value: s.workspaces, OK: true} +} + // --- Actions --- func (s *Service) actionDispatch(_ context.Context, opts core.Options) core.Result {