From e8a46c2f95e02566f87aa365304f4965c22c8ee3 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 29 Mar 2026 21:27:32 +0000 Subject: [PATCH] fix(ax): align runner helper layer and examples Co-Authored-By: Virgil --- pkg/runner/paths.go | 41 +++++++----- pkg/runner/paths_example_test.go | 38 +++++++++++ pkg/runner/paths_test.go | 107 ++++++++++++++++++++++++++++++ pkg/runner/queue_example_test.go | 23 +++++++ pkg/runner/runner.go | 11 +-- pkg/runner/runner_example_test.go | 20 ++++++ 6 files changed, 221 insertions(+), 19 deletions(-) create mode 100644 pkg/runner/paths_example_test.go create mode 100644 pkg/runner/paths_test.go create mode 100644 pkg/runner/queue_example_test.go create mode 100644 pkg/runner/runner_example_test.go diff --git a/pkg/runner/paths.go b/pkg/runner/paths.go index e3c298f..803cb9e 100644 --- a/pkg/runner/paths.go +++ b/pkg/runner/paths.go @@ -3,41 +3,43 @@ package runner import ( + "time" + + "dappco.re/go/agent/pkg/agentic" core "dappco.re/go/core" ) -// fs is the file I/O medium for the runner package. -var fs = (&core.Fs{}).NewUnrestricted() +// fs reuses the shared unrestricted filesystem used by agentic. +var fs = agentic.LocalFs() // WorkspaceRoot returns the root directory for agent workspaces. // // root := runner.WorkspaceRoot() // ~/Code/.core/workspace func WorkspaceRoot() string { - return core.JoinPath(CoreRoot(), "workspace") + return agentic.WorkspaceRoot() } // CoreRoot returns the root directory for core ecosystem files. // // root := runner.CoreRoot() // ~/Code/.core func CoreRoot() string { - if root := core.Env("CORE_WORKSPACE"); root != "" { - return root - } - return core.JoinPath(core.Env("DIR_HOME"), "Code", ".core") + return agentic.CoreRoot() } // ReadStatus reads a workspace status.json. // // st, err := runner.ReadStatus("/path/to/workspace") func ReadStatus(wsDir string) (*WorkspaceStatus, error) { - path := core.JoinPath(wsDir, "status.json") - r := fs.Read(path) - if !r.OK { - return nil, core.E("runner.ReadStatus", "failed to read status", nil) + status, err := agentic.ReadStatus(wsDir) + if err != nil { + return nil, core.E("runner.ReadStatus", "failed to read status", err) } + + json := core.JSONMarshalString(status) var st WorkspaceStatus - if result := core.JSONUnmarshalString(r.Value.(string), &st); !result.OK { - return nil, core.E("runner.ReadStatus", "failed to parse status", nil) + if result := core.JSONUnmarshalString(json, &st); !result.OK { + parseErr, _ := result.Value.(error) + return nil, core.E("runner.ReadStatus", "failed to parse status", parseErr) } return &st, nil } @@ -46,6 +48,15 @@ func ReadStatus(wsDir string) (*WorkspaceStatus, error) { // // runner.WriteStatus(wsDir, &runner.WorkspaceStatus{Status: "running", Agent: "codex"}) func WriteStatus(wsDir string, st *WorkspaceStatus) { - path := core.JoinPath(wsDir, "status.json") - fs.Write(path, core.JSONMarshalString(st)) + if st == nil { + return + } + + json := core.JSONMarshalString(st) + var status agentic.WorkspaceStatus + if result := core.JSONUnmarshalString(json, &status); !result.OK { + return + } + status.UpdatedAt = time.Now() + fs.WriteAtomic(agentic.WorkspaceStatusPath(wsDir), core.JSONMarshalString(&status)) } diff --git a/pkg/runner/paths_example_test.go b/pkg/runner/paths_example_test.go new file mode 100644 index 0000000..97eb9b5 --- /dev/null +++ b/pkg/runner/paths_example_test.go @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package runner + +import ( + core "dappco.re/go/core" +) + +func ExampleCoreRoot() { + root := CoreRoot() + core.Println(core.HasSuffix(root, ".core")) + // Output: true +} + +func ExampleWorkspaceRoot() { + root := WorkspaceRoot() + core.Println(core.HasSuffix(root, "workspace")) + // Output: true +} + +func ExampleWriteStatus() { + fsys := (&core.Fs{}).NewUnrestricted() + dir := fsys.TempDir("runner-paths") + defer fsys.DeleteAll(dir) + + WriteStatus(dir, &WorkspaceStatus{ + Status: "running", + Agent: "codex", + Repo: "go-io", + }) + + st, err := ReadStatus(dir) + core.Println(err == nil) + core.Println(st.Status) + // Output: + // true + // running +} diff --git a/pkg/runner/paths_test.go b/pkg/runner/paths_test.go new file mode 100644 index 0000000..b8951cd --- /dev/null +++ b/pkg/runner/paths_test.go @@ -0,0 +1,107 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package runner + +import ( + "testing" + "time" + + "dappco.re/go/agent/pkg/agentic" + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPaths_CoreRoot_Good_EnvVar(t *testing.T) { + t.Setenv("CORE_WORKSPACE", "/tmp/core-root") + assert.Equal(t, "/tmp/core-root", CoreRoot()) +} + +func TestPaths_CoreRoot_Bad_Fallback(t *testing.T) { + t.Setenv("CORE_WORKSPACE", "") + home := core.Env("DIR_HOME") + assert.Equal(t, home+"/Code/.core", CoreRoot()) +} + +func TestPaths_CoreRoot_Ugly_UnicodePath(t *testing.T) { + t.Setenv("CORE_WORKSPACE", "/tmp/core-røot") + assert.Equal(t, "/tmp/core-røot", CoreRoot()) +} + +func TestPaths_WorkspaceRoot_Good(t *testing.T) { + t.Setenv("CORE_WORKSPACE", "/tmp/core-root") + assert.Equal(t, "/tmp/core-root/workspace", WorkspaceRoot()) +} + +func TestPaths_WorkspaceRoot_Bad_EmptyEnv(t *testing.T) { + t.Setenv("CORE_WORKSPACE", "") + home := core.Env("DIR_HOME") + assert.Equal(t, home+"/Code/.core/workspace", WorkspaceRoot()) +} + +func TestPaths_WorkspaceRoot_Ugly_NestedCoreRoot(t *testing.T) { + t.Setenv("CORE_WORKSPACE", "/srv/core/tenant-a") + assert.Equal(t, "/srv/core/tenant-a/workspace", WorkspaceRoot()) +} + +func TestPaths_ReadStatus_Good_AgenticShape(t *testing.T) { + wsDir := t.TempDir() + status := &agentic.WorkspaceStatus{ + Status: "completed", + Agent: "codex", + Repo: "go-io", + Task: "Finish AX cleanup", + Branch: "agent/ax-cleanup", + PID: 42, + ProcessID: "proc-123", + StartedAt: time.Now(), + UpdatedAt: time.Now(), + Question: "Ready?", + Runs: 2, + PRURL: "https://forge.test/core/go-io/pulls/12", + } + require.True(t, agentic.LocalFs().WriteAtomic(agentic.WorkspaceStatusPath(wsDir), core.JSONMarshalString(status)).OK) + + st, err := ReadStatus(wsDir) + require.NoError(t, err) + assert.Equal(t, "completed", st.Status) + assert.Equal(t, "codex", st.Agent) + assert.Equal(t, "go-io", st.Repo) + assert.Equal(t, "agent/ax-cleanup", st.Branch) + assert.Equal(t, 2, st.Runs) +} + +func TestPaths_ReadStatus_Bad_InvalidJSON(t *testing.T) { + wsDir := t.TempDir() + require.True(t, agentic.LocalFs().WriteAtomic(agentic.WorkspaceStatusPath(wsDir), "{not-json").OK) + + _, err := ReadStatus(wsDir) + assert.Error(t, err) +} + +func TestPaths_WriteStatus_Ugly_AtomicOverwrite(t *testing.T) { + wsDir := t.TempDir() + + WriteStatus(wsDir, &WorkspaceStatus{ + Status: "running", + Agent: "codex", + Repo: "go-io", + Task: "First run", + }) + WriteStatus(wsDir, &WorkspaceStatus{ + Status: "completed", + Agent: "claude", + Repo: "go-io", + Task: "Second run", + Branch: "agent/ax-cleanup", + StartedAt: time.Now(), + Runs: 3, + }) + + st, err := ReadStatus(wsDir) + require.NoError(t, err) + assert.Equal(t, "completed", st.Status) + assert.Equal(t, "claude", st.Agent) + assert.Equal(t, "agent/ax-cleanup", st.Branch) + assert.Equal(t, 3, st.Runs) +} diff --git a/pkg/runner/queue_example_test.go b/pkg/runner/queue_example_test.go new file mode 100644 index 0000000..ae7915b --- /dev/null +++ b/pkg/runner/queue_example_test.go @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package runner + +import ( + core "dappco.re/go/core" + "gopkg.in/yaml.v3" +) + +func ExampleConcurrencyLimit_UnmarshalYAML() { + input := ` +total: 5 +gpt-5.4: 1 +` + var limit ConcurrencyLimit + _ = yaml.Unmarshal([]byte(input), &limit) + + core.Println(limit.Total) + core.Println(limit.Models["gpt-5.4"]) + // Output: + // 5 + // 1 +} diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 386dfe9..821eaf5 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -25,8 +25,8 @@ type Options struct{} // Manages concurrency limits, queue drain, workspace lifecycle, and frozen state. // All dispatch requests — MCP tool, CLI, or IPC — go through this service. // -// r := runner.New() -// r.Dispatch(ctx, input) // checks frozen + concurrency, spawns or queues +// svc := runner.New() +// svc.TrackWorkspace("core/go-io/task-5", &runner.WorkspaceStatus{Status: "running", Agent: "codex"}) type Service struct { *core.ServiceRuntime[Options] dispatchMu sync.Mutex @@ -77,8 +77,11 @@ func Register(c *core.Core) core.Result { // OnStartup registers Actions and starts the queue runner. // -// c.Perform("runner.dispatch", opts) // dispatch an agent -// c.Perform("runner.status", opts) // query workspace status +// c.Action("runner.dispatch").Run(ctx, core.NewOptions( +// core.Option{Key: "repo", Value: "go-io"}, +// core.Option{Key: "agent", Value: "codex"}, +// )) +// c.Action("runner.status").Run(ctx, core.NewOptions()) func (s *Service) OnStartup(ctx context.Context) core.Result { c := s.Core() diff --git a/pkg/runner/runner_example_test.go b/pkg/runner/runner_example_test.go new file mode 100644 index 0000000..fd75334 --- /dev/null +++ b/pkg/runner/runner_example_test.go @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package runner + +import ( + core "dappco.re/go/core" +) + +func ExampleNew() { + svc := New() + core.Println(svc.Workspaces().Len()) + // Output: 0 +} + +func ExampleRegister() { + c := core.New(core.WithOption("name", "runner-example")) + r := Register(c) + core.Println(r.OK) + // Output: true +}