From 6d239d5b956b51afc59602c937866476e65b4be3 Mon Sep 17 00:00:00 2001 From: Virgil Date: Mon, 30 Mar 2026 16:01:32 +0000 Subject: [PATCH] fix(ax): remove pid syscall wrappers Co-Authored-By: Virgil --- pkg/agentic/dispatch.go | 14 +-- pkg/agentic/dispatch_sync.go | 9 +- pkg/agentic/handlers.go | 5 +- pkg/agentic/pid.go | 70 +++++++++++-- pkg/agentic/pid_example_test.go | 34 ++++-- pkg/agentic/pid_test.go | 68 +++++------- pkg/agentic/queue.go | 29 +++-- pkg/agentic/queue_extra_test.go | 180 ++++++++++++++++++-------------- pkg/agentic/resume.go | 3 +- pkg/agentic/status.go | 10 +- pkg/monitor/monitor.go | 18 ++-- pkg/monitor/monitor_test.go | 53 +++++++--- pkg/runner/queue.go | 12 ++- pkg/runner/runner.go | 6 +- 14 files changed, 328 insertions(+), 183 deletions(-) diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index a1f144b..75f9397 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -387,10 +387,10 @@ func (s *PrepSubsystem) onAgentComplete(agent, wsDir, outputFile string, exitCod // spawnAgent launches an agent inside a Docker container. // The repo/ directory is mounted at /workspace, agent runs sandboxed. // Output is captured and written to .meta/agent-{agent}.log on completion. -func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, error) { +func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, string, error) { command, args, err := agentCommand(agent, prompt) if err != nil { - return 0, "", err + return 0, "", "", err } repoDir := WorkspaceRepoDir(wsDir) @@ -406,7 +406,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er procSvc, ok := core.ServiceFor[*process.Service](s.Core(), "process") if !ok { - return 0, "", core.E("dispatch.spawnAgent", "process service not registered", nil) + return 0, "", "", core.E("dispatch.spawnAgent", "process service not registered", nil) } proc, err := procSvc.StartWithOptions(context.Background(), process.RunOptions{ Command: command, @@ -415,11 +415,12 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er Detach: true, }) if err != nil { - return 0, "", core.E("dispatch.spawnAgent", core.Concat("failed to spawn ", agent), err) + return 0, "", "", core.E("dispatch.spawnAgent", core.Concat("failed to spawn ", agent), err) } proc.CloseStdin() pid := proc.Info().PID + processID := proc.ID s.broadcastStart(agent, wsDir) s.startIssueTracking(wsDir) @@ -435,7 +436,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er }) s.Core().PerformAsync(monitorAction, core.NewOptions()) - return pid, outputFile, nil + return pid, processID, outputFile, nil } // runQA runs build + test checks on the repo after agent completion. @@ -560,7 +561,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, } // Step 3: Spawn agent in repo/ directory - pid, outputFile, err := s.spawnAgent(input.Agent, prompt, wsDir) + pid, processID, outputFile, err := s.spawnAgent(input.Agent, prompt, wsDir) if err != nil { return nil, DispatchOutput{}, err } @@ -573,6 +574,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, Task: input.Task, Branch: prepOut.Branch, PID: pid, + ProcessID: processID, StartedAt: time.Now(), Runs: 1, } diff --git a/pkg/agentic/dispatch_sync.go b/pkg/agentic/dispatch_sync.go index f1c9607..80b2558 100644 --- a/pkg/agentic/dispatch_sync.go +++ b/pkg/agentic/dispatch_sync.go @@ -62,7 +62,7 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu core.Print(nil, " branch: %s", prepOut.Branch) // Spawn agent directly — no queue, no concurrency check - pid, _, err := s.spawnAgent(input.Agent, prompt, wsDir) + pid, processID, _, err := s.spawnAgent(input.Agent, prompt, wsDir) if err != nil { return DispatchSyncResult{Error: err.Error()} } @@ -70,6 +70,11 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu core.Print(nil, " pid: %d", pid) core.Print(nil, " waiting for completion...") + var runtime *core.Core + if s.ServiceRuntime != nil { + runtime = s.Core() + } + // Poll for process exit ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() @@ -79,7 +84,7 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu case <-ctx.Done(): return DispatchSyncResult{Error: "cancelled"} case <-ticker.C: - if pid > 0 && !PIDAlive(pid) { + if pid > 0 && !ProcessAlive(runtime, processID, pid) { // Process exited — read final status st, err := ReadStatus(wsDir) if err != nil { diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go index 4538c02..88be7a9 100644 --- a/pkg/agentic/handlers.go +++ b/pkg/agentic/handlers.go @@ -33,13 +33,14 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res break } prompt := core.Concat("TASK: ", ev.Task, "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done.") - pid, outputFile, err := s.spawnAgent(ev.Agent, prompt, wsDir) + pid, processID, outputFile, err := s.spawnAgent(ev.Agent, prompt, wsDir) if err != nil { break } // Update status with real PID if st, serr := ReadStatus(wsDir); serr == nil { st.PID = pid + st.ProcessID = processID writeStatus(wsDir, st) if runnerSvc, ok := core.ServiceFor[workspaceTracker](c, "runner"); ok { runnerSvc.TrackWorkspace(WorkspaceName(wsDir), st) @@ -57,7 +58,7 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res // r := prep.SpawnFromQueue("codex", prompt, wsDir) // pid := r.Value.(int) func (s *PrepSubsystem) SpawnFromQueue(agent, prompt, wsDir string) core.Result { - pid, _, err := s.spawnAgent(agent, prompt, wsDir) + pid, _, _, err := s.spawnAgent(agent, prompt, wsDir) if err != nil { return core.Result{ Value: core.E("agentic.SpawnFromQueue", "failed to spawn queued agent", err), diff --git a/pkg/agentic/pid.go b/pkg/agentic/pid.go index d077165..f73a375 100644 --- a/pkg/agentic/pid.go +++ b/pkg/agentic/pid.go @@ -2,24 +2,72 @@ package agentic -import "syscall" +import ( + core "dappco.re/go/core" + "dappco.re/go/core/process" +) -// PIDAlive checks if an OS process is still alive via PID signal check. +// ProcessAlive checks whether a managed process is still running. // -// if agentic.PIDAlive(pid) { ... } -func PIDAlive(pid int) bool { - if pid > 0 { - return syscall.Kill(pid, 0) == nil +// alive := agentic.ProcessAlive(c, proc.ID, proc.Info().PID) +// alive := agentic.ProcessAlive(c, "", 12345) // legacy PID fallback +func ProcessAlive(c *core.Core, processID string, pid int) bool { + if c == nil { + return false } + + svc, ok := core.ServiceFor[*process.Service](c, "process") + if !ok { + return false + } + + if processID != "" { + if proc, err := svc.Get(processID); err == nil { + return proc.IsRunning() + } + } + + if pid <= 0 { + return false + } + + for _, proc := range svc.Running() { + if proc.Info().PID == pid { + return true + } + } + return false } -// PIDTerminate terminates a process via SIGTERM. +// ProcessTerminate stops a managed process. // -// if agentic.PIDTerminate(pid) { ... } -func PIDTerminate(pid int) bool { - if pid > 0 { - return syscall.Kill(pid, syscall.SIGTERM) == nil +// _ = agentic.ProcessTerminate(c, proc.ID, proc.Info().PID) +func ProcessTerminate(c *core.Core, processID string, pid int) bool { + if c == nil { + return false } + + svc, ok := core.ServiceFor[*process.Service](c, "process") + if !ok { + return false + } + + if processID != "" { + if err := svc.Kill(processID); err == nil { + return true + } + } + + if pid <= 0 { + return false + } + + for _, proc := range svc.Running() { + if proc.Info().PID == pid { + return svc.Kill(proc.ID) == nil + } + } + return false } diff --git a/pkg/agentic/pid_example_test.go b/pkg/agentic/pid_example_test.go index 19e1764..cebdb3c 100644 --- a/pkg/agentic/pid_example_test.go +++ b/pkg/agentic/pid_example_test.go @@ -3,17 +3,39 @@ package agentic import ( - "os" + "context" core "dappco.re/go/core" + "dappco.re/go/core/process" ) -func ExamplePIDAlive() { - core.Println(PIDAlive(os.Getpid())) +func ExampleProcessAlive() { + r := testCore.Process().Start(context.Background(), core.NewOptions( + core.Option{Key: "command", Value: "sleep"}, + core.Option{Key: "args", Value: []string{"1"}}, + core.Option{Key: "detach", Value: true}, + )) + if !r.OK { + return + } + proc := r.Value.(*process.Process) + defer proc.Kill() + + core.Println(ProcessAlive(testCore, proc.ID, proc.Info().PID)) // Output: true } -func ExamplePIDTerminate() { - core.Println(PIDTerminate(0)) - // Output: false +func ExampleProcessTerminate() { + r := testCore.Process().Start(context.Background(), core.NewOptions( + core.Option{Key: "command", Value: "sleep"}, + core.Option{Key: "args", Value: []string{"1"}}, + core.Option{Key: "detach", Value: true}, + )) + if !r.OK { + return + } + proc := r.Value.(*process.Process) + defer proc.Kill() + core.Println(ProcessTerminate(testCore, proc.ID, proc.Info().PID)) + // Output: true } diff --git a/pkg/agentic/pid_test.go b/pkg/agentic/pid_test.go index b76aad8..3bf4f70 100644 --- a/pkg/agentic/pid_test.go +++ b/pkg/agentic/pid_test.go @@ -5,14 +5,11 @@ package agentic import ( "context" "os" - "strconv" "testing" "time" core "dappco.re/go/core" - "dappco.re/go/core/process" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // testPrep is the package-level PrepSubsystem for tests that need process execution. @@ -52,56 +49,45 @@ func newPrepWithProcess() *PrepSubsystem { } } -// --- PIDAlive --- - -func TestPid_PIDAlive_Good(t *testing.T) { - pid, _ := strconv.Atoi(core.Env("PID")) - assert.True(t, PIDAlive(pid)) -} - -func TestPid_PIDAlive_Bad(t *testing.T) { - assert.False(t, PIDAlive(999999)) -} - -func TestPid_PIDAlive_Ugly(t *testing.T) { - assert.False(t, PIDAlive(0)) -} - -// --- PIDTerminate --- - -func TestPid_PIDTerminate_Good(t *testing.T) { - r := testCore.Process().Start(context.Background(), core.NewOptions( - core.Option{Key: "command", Value: "sleep"}, - core.Option{Key: "args", Value: []string{"30"}}, - core.Option{Key: "detach", Value: true}, - )) - require.True(t, r.OK) - - proc, ok := r.Value.(*process.Process) - require.True(t, ok) +// --- ProcessAlive --- +func TestPid_ProcessAlive_Good(t *testing.T) { + proc := startManagedProcess(t, testCore) pid := proc.Info().PID - require.NotZero(t, pid) - defer func() { - _ = proc.Kill() - }() + assert.True(t, ProcessAlive(testCore, proc.ID, pid)) + assert.True(t, ProcessAlive(testCore, "", pid)) +} - assert.True(t, PIDTerminate(pid)) +func TestPid_ProcessAlive_Bad(t *testing.T) { + assert.False(t, ProcessAlive(testCore, "", 999999)) +} + +func TestPid_ProcessAlive_Ugly(t *testing.T) { + assert.False(t, ProcessAlive(nil, "", 0)) +} + +// --- ProcessTerminate --- + +func TestPid_ProcessTerminate_Good(t *testing.T) { + proc := startManagedProcess(t, testCore) + pid := proc.Info().PID + + assert.True(t, ProcessTerminate(testCore, proc.ID, pid)) select { case <-proc.Done(): case <-time.After(5 * time.Second): - t.Fatal("PIDTerminate did not stop the process") + t.Fatal("ProcessTerminate did not stop the process") } - assert.False(t, PIDAlive(pid)) + assert.False(t, ProcessAlive(testCore, proc.ID, pid)) } -func TestPid_PIDTerminate_Bad(t *testing.T) { - assert.False(t, PIDTerminate(999999)) +func TestPid_ProcessTerminate_Bad(t *testing.T) { + assert.False(t, ProcessTerminate(testCore, "", 999999)) } -func TestPid_PIDTerminate_Ugly(t *testing.T) { - assert.False(t, PIDTerminate(0)) +func TestPid_ProcessTerminate_Ugly(t *testing.T) { + assert.False(t, ProcessTerminate(nil, "", 0)) } diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index f625329..3ac3f66 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -163,10 +163,14 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration { // // n := s.countRunningByAgent("codex") // counts all codex:* variants func (s *PrepSubsystem) countRunningByAgent(agent string) int { + var runtime *core.Core + if s.ServiceRuntime != nil { + runtime = s.Core() + } if s.workspaces != nil && s.workspaces.Len() > 0 { count := 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { - if st.Status == "running" && baseAgent(st.Agent) == agent && PIDAlive(st.PID) { + if st.Status == "running" && baseAgent(st.Agent) == agent && ProcessAlive(runtime, st.ProcessID, st.PID) { count++ } }) @@ -174,12 +178,12 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { } // Fallback: scan disk (cold start before hydration) - return s.countRunningByAgentDisk(agent) + return s.countRunningByAgentDisk(runtime, agent) } // countRunningByAgentDisk scans workspace status.json files on disk. // Used only as fallback before Registry hydration completes. -func (s *PrepSubsystem) countRunningByAgentDisk(agent string) int { +func (s *PrepSubsystem) countRunningByAgentDisk(runtime *core.Core, agent string) int { count := 0 for _, statusPath := range WorkspaceStatusPaths() { st, err := ReadStatus(core.PathDir(statusPath)) @@ -189,7 +193,7 @@ func (s *PrepSubsystem) countRunningByAgentDisk(agent string) int { if baseAgent(st.Agent) != agent { continue } - if PIDAlive(st.PID) { + if ProcessAlive(runtime, st.ProcessID, st.PID) { count++ } } @@ -201,10 +205,14 @@ func (s *PrepSubsystem) countRunningByAgentDisk(agent string) int { // // n := s.countRunningByModel("codex:gpt-5.4") // counts only that model func (s *PrepSubsystem) countRunningByModel(agent string) int { + var runtime *core.Core + if s.ServiceRuntime != nil { + runtime = s.Core() + } if s.workspaces != nil && s.workspaces.Len() > 0 { count := 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { - if st.Status == "running" && st.Agent == agent && PIDAlive(st.PID) { + if st.Status == "running" && st.Agent == agent && ProcessAlive(runtime, st.ProcessID, st.PID) { count++ } }) @@ -212,6 +220,12 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int { } // Fallback: scan disk + return s.countRunningByModelDisk(runtime, agent) +} + +// countRunningByModelDisk scans workspace status.json files on disk. +// Used only as fallback before Registry hydration completes. +func (s *PrepSubsystem) countRunningByModelDisk(runtime *core.Core, agent string) int { count := 0 for _, statusPath := range WorkspaceStatusPaths() { st, err := ReadStatus(core.PathDir(statusPath)) @@ -221,7 +235,7 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int { if st.Agent != agent { continue } - if PIDAlive(st.PID) { + if ProcessAlive(runtime, st.ProcessID, st.PID) { count++ } } @@ -340,13 +354,14 @@ func (s *PrepSubsystem) drainOne() bool { prompt := core.Concat("TASK: ", st.Task, "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done.") - pid, _, err := s.spawnAgent(st.Agent, prompt, wsDir) + pid, processID, _, err := s.spawnAgent(st.Agent, prompt, wsDir) if err != nil { continue } st.Status = "running" st.PID = pid + st.ProcessID = processID st.Runs++ writeStatus(wsDir, st) s.TrackWorkspace(WorkspaceName(wsDir), st) diff --git a/pkg/agentic/queue_extra_test.go b/pkg/agentic/queue_extra_test.go index 61a340f..3fe7254 100644 --- a/pkg/agentic/queue_extra_test.go +++ b/pkg/agentic/queue_extra_test.go @@ -3,20 +3,33 @@ package agentic import ( - "strconv" + "context" "testing" "time" core "dappco.re/go/core" + "dappco.re/go/core/process" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" ) -// mustPID returns the current process PID as int via Core's cached Env. -func mustPID() int { - pid, _ := strconv.Atoi(core.Env("PID")) - return pid +func startManagedProcess(t *testing.T, c *core.Core) *process.Process { + t.Helper() + + r := c.Process().Start(context.Background(), core.NewOptions( + core.Option{Key: "command", Value: "sleep"}, + core.Option{Key: "args", Value: []string{"30"}}, + core.Option{Key: "detach", Value: true}, + )) + require.True(t, r.OK) + + proc, ok := r.Value.(*process.Process) + require.True(t, ok) + t.Cleanup(func() { + _ = proc.Kill() + }) + return proc } // --- UnmarshalYAML for ConcurrencyLimit --- @@ -99,9 +112,9 @@ rates: s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } d := s.delayForAgent("codex:gpt-5.4") @@ -118,8 +131,8 @@ func TestQueue_CountRunningByModel_Good_NoWorkspaces(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } assert.Equal(t, 0, s.countRunningByModel("codex:gpt-5.4")) } @@ -133,9 +146,9 @@ func TestQueue_DrainQueue_Good_NoCoreFallsBackToMutex(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: nil, - frozen: false, - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + frozen: false, + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } assert.NotPanics(t, func() { s.drainQueue() }) } @@ -147,9 +160,9 @@ func TestQueue_DrainOne_Good_NoWorkspaces(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } assert.False(t, s.drainOne()) } @@ -166,9 +179,9 @@ func TestQueue_DrainOne_Good_SkipsNonQueued(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } assert.False(t, s.drainOne()) } @@ -185,7 +198,7 @@ func TestQueue_DrainOne_Good_SkipsBackedOffPool(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), + codePath: t.TempDir(), backoff: map[string]time.Time{ "codex": time.Now().Add(1 * time.Hour), }, @@ -210,8 +223,8 @@ func TestQueue_CanDispatchAgent_Ugly(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // No running workspaces → should be able to dispatch @@ -231,9 +244,9 @@ func TestQueue_DrainQueue_Ugly(t *testing.T) { c := core.New() s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), - frozen: false, - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + frozen: false, + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Not frozen, Core is present, empty workspace → drainQueue runs the Core lock path without panic @@ -247,26 +260,30 @@ func TestQueue_CanDispatchAgent_Bad_AgentAtLimit(t *testing.T) { t.Setenv("CORE_WORKSPACE", root) wsRoot := core.JoinPath(root, "workspace") - // Create a running workspace with a valid-looking PID (use our own PID) + c := core.New(core.WithService(ProcessRegister)) + c.ServiceStartup(context.Background(), nil) + + // Create a running workspace backed by a managed process. ws := core.JoinPath(wsRoot, "ws-running") fs.EnsureDir(ws) + proc := startManagedProcess(t, c) st := &WorkspaceStatus{ - Status: "running", - Agent: "claude", - Repo: "go-io", - PID: mustPID(), // Our own PID so Kill(pid, 0) succeeds + Status: "running", + Agent: "claude", + Repo: "go-io", + PID: proc.Info().PID, + ProcessID: proc.ID, } fs.Write(core.JoinPath(ws, "status.json"), core.JSONMarshalString(st)) - c := core.New() c.Config().Set("agents.concurrency", map[string]ConcurrencyLimit{ "claude": {Total: 1}, }) s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Agent at limit (1 running, limit is 1) — cannot dispatch @@ -283,18 +300,20 @@ func TestQueue_CountRunningByAgent_Bad_WrongAgentType(t *testing.T) { // Create a running workspace for a different agent type ws := core.JoinPath(wsRoot, "ws-gemini") fs.EnsureDir(ws) + proc := startManagedProcess(t, testCore) st := &WorkspaceStatus{ - Status: "running", - Agent: "gemini", - Repo: "go-io", - PID: mustPID(), + Status: "running", + Agent: "gemini", + Repo: "go-io", + PID: proc.Info().PID, + ProcessID: proc.ID, } fs.Write(core.JoinPath(ws, "status.json"), core.JSONMarshalString(st)) s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Counting for "claude" when only "gemini" is running → 0 @@ -313,8 +332,8 @@ func TestQueue_CountRunningByAgent_Ugly_CorruptStatusJSON(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Corrupt status.json → ReadStatus fails → skipped → count is 0 @@ -330,18 +349,20 @@ func TestQueue_CountRunningByModel_Bad_NoMatchingModel(t *testing.T) { ws := core.JoinPath(wsRoot, "ws-1") fs.EnsureDir(ws) + proc := startManagedProcess(t, testCore) st := &WorkspaceStatus{ - Status: "running", - Agent: "codex:gpt-5.4", - Repo: "go-io", - PID: mustPID(), + Status: "running", + Agent: "codex:gpt-5.4", + Repo: "go-io", + PID: proc.Info().PID, + ProcessID: proc.ID, } fs.Write(core.JoinPath(ws, "status.json"), core.JSONMarshalString(st)) s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Looking for a model that doesn't match any running workspace @@ -362,14 +383,15 @@ func TestQueue_CountRunningByModel_Ugly_ModelMismatch(t *testing.T) { } { d := core.JoinPath(wsRoot, ws.name) fs.EnsureDir(d) - st := &WorkspaceStatus{Status: "running", Agent: ws.agent, Repo: "test", PID: mustPID()} + proc := startManagedProcess(t, testCore) + st := &WorkspaceStatus{Status: "running", Agent: ws.agent, Repo: "test", PID: proc.Info().PID, ProcessID: proc.ID} fs.Write(core.JoinPath(d, "status.json"), core.JSONMarshalString(st)) } s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // countRunningByModel does exact match on agent string @@ -395,9 +417,9 @@ rates: s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // sustained_delay is 0 → delayForAgent returns 0 @@ -420,9 +442,9 @@ rates: s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Malformed reset_utc — strconv.Atoi fails → defaults to hour=6, min=0 @@ -439,14 +461,16 @@ func TestQueue_DrainOne_Bad_QueuedButAtConcurrencyLimit(t *testing.T) { t.Setenv("CORE_WORKSPACE", root) wsRoot := core.JoinPath(root, "workspace") - // Create a running workspace that uses our PID + // Create a running workspace backed by a managed process. wsRunning := core.JoinPath(wsRoot, "ws-running") fs.EnsureDir(wsRunning) + proc := startManagedProcess(t, testCore) stRunning := &WorkspaceStatus{ - Status: "running", - Agent: "claude", - Repo: "go-io", - PID: mustPID(), + Status: "running", + Agent: "claude", + Repo: "go-io", + PID: proc.Info().PID, + ProcessID: proc.ID, } fs.Write(core.JoinPath(wsRunning, "status.json"), core.JSONMarshalString(stRunning)) @@ -463,9 +487,9 @@ func TestQueue_DrainOne_Bad_QueuedButAtConcurrencyLimit(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Queued workspace exists but agent is at concurrency limit → drainOne returns false @@ -485,7 +509,7 @@ func TestQueue_DrainOne_Ugly_QueuedButInBackoffWindow(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), + codePath: t.TempDir(), backoff: map[string]time.Time{ "codex": time.Now().Add(1 * time.Hour), // pool is backed off }, @@ -547,9 +571,9 @@ rates: s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } loaded := s.loadAgentsConfig() @@ -569,9 +593,9 @@ func TestQueue_LoadAgentsConfig_Bad(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Should return defaults when YAML is corrupt @@ -587,9 +611,9 @@ func TestQueue_LoadAgentsConfig_Ugly(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } loaded := s.loadAgentsConfig() @@ -614,10 +638,10 @@ func TestQueue_DrainQueue_Bad_FrozenQueueDoesNothing(t *testing.T) { s := &PrepSubsystem{ ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), - frozen: true, // queue is frozen - codePath: t.TempDir(), - backoff: make(map[string]time.Time), - failCount: make(map[string]int), + frozen: true, // queue is frozen + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), } // Frozen queue returns immediately without draining diff --git a/pkg/agentic/resume.go b/pkg/agentic/resume.go index 12f54cc..5042409 100644 --- a/pkg/agentic/resume.go +++ b/pkg/agentic/resume.go @@ -94,7 +94,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu } // Spawn agent via go-process - pid, _, err := s.spawnAgent(agent, prompt, wsDir) + pid, processID, _, err := s.spawnAgent(agent, prompt, wsDir) if err != nil { return nil, ResumeOutput{}, err } @@ -102,6 +102,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu // Update status st.Status = "running" st.PID = pid + st.ProcessID = processID st.Runs++ st.Question = "" writeStatus(wsDir, st) diff --git a/pkg/agentic/status.go b/pkg/agentic/status.go index fda3498..3fca547 100644 --- a/pkg/agentic/status.go +++ b/pkg/agentic/status.go @@ -175,6 +175,10 @@ func (s *PrepSubsystem) registerStatusTool(server *mcp.Server) { func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, input StatusInput) (*mcp.CallToolResult, StatusOutput, error) { statusFiles := WorkspaceStatusPaths() + var runtime *core.Core + if s.ServiceRuntime != nil { + runtime = s.Core() + } var out StatusOutput @@ -189,9 +193,9 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu continue } - // If status is "running", check if PID is still alive - if st.Status == "running" && st.PID > 0 { - if !PIDAlive(st.PID) { + // If status is "running", check whether the managed process is still alive. + if st.Status == "running" && (st.ProcessID != "" || st.PID > 0) { + if !ProcessAlive(runtime, st.ProcessID, st.PID) { blockedPath := workspaceBlockedPath(wsDir) if r := fs.Read(blockedPath); r.OK { st.Status = "blocked" diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 8b2b0b5..2519f58 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -260,7 +260,7 @@ func (m *Subsystem) Poke() { // checkIdleAfterDelay waits briefly then checks if the fleet is genuinely idle. // Only emits queue.drained when there are truly zero running or queued agents, -// verified by checking PIDs are alive, not just trusting status files. +// verified by checking managed processes are alive, not just trusting status files. func (m *Subsystem) checkIdleAfterDelay() { time.Sleep(5 * time.Second) // wait for queue drain to fill slots if m.ServiceRuntime == nil { @@ -274,8 +274,12 @@ func (m *Subsystem) checkIdleAfterDelay() { } // countLiveWorkspaces counts workspaces that are genuinely active. -// For "running" status, verifies the PID is still alive. +// For "running" status, verifies the managed process is still alive. func (m *Subsystem) countLiveWorkspaces() (running, queued int) { + var runtime *core.Core + if m.ServiceRuntime != nil { + runtime = m.Core() + } for _, path := range agentic.WorkspaceStatusPaths() { wsDir := core.PathDir(path) r := agentic.ReadStatusResult(wsDir) @@ -288,7 +292,7 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) { } switch st.Status { case "running": - if st.PID > 0 && pidAlive(st.PID) { + if st.PID > 0 && processAlive(runtime, st.ProcessID, st.PID) { running++ } case "queued": @@ -298,9 +302,9 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) { return } -// pidAlive checks whether a process is still running. -func pidAlive(pid int) bool { - return agentic.PIDAlive(pid) +// processAlive checks whether a managed process is still running. +func processAlive(c *core.Core, processID string, pid int) bool { + return agentic.ProcessAlive(c, processID, pid) } func (m *Subsystem) loop(ctx context.Context) { @@ -438,7 +442,7 @@ func (m *Subsystem) checkCompletions() string { return "" } - // Only emit queue.drained when genuinely empty — verified by live PID check + // Only emit queue.drained when genuinely empty — verified by live process checks liveRunning, liveQueued := m.countLiveWorkspaces() if m.ServiceRuntime != nil && liveRunning == 0 && liveQueued == 0 { m.Core().ACTION(messages.QueueDrained{Completed: len(newlyCompleted)}) diff --git a/pkg/monitor/monitor_test.go b/pkg/monitor/monitor_test.go index 94b97f1..805556d 100644 --- a/pkg/monitor/monitor_test.go +++ b/pkg/monitor/monitor_test.go @@ -8,13 +8,13 @@ import ( "net/http" "net/http/httptest" "os" - "strconv" "testing" "time" "dappco.re/go/agent/pkg/agentic" "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" + "dappco.re/go/core/process" "github.com/modelcontextprotocol/go-sdk/mcp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -71,6 +71,24 @@ func writeWorkspaceStatus(t *testing.T, wsRoot, name string, fields map[string]a return dir } +func startManagedProcess(t *testing.T, c *core.Core) *process.Process { + t.Helper() + + r := c.Process().Start(context.Background(), core.NewOptions( + core.Option{Key: "command", Value: "sleep"}, + core.Option{Key: "args", Value: []string{"30"}}, + core.Option{Key: "detach", Value: true}, + )) + require.True(t, r.OK) + + proc, ok := r.Value.(*process.Process) + require.True(t, ok) + t.Cleanup(func() { + _ = proc.Kill() + }) + return proc +} + // --- New --- func TestMonitor_New_Good_Defaults(t *testing.T) { @@ -328,37 +346,40 @@ func TestMonitor_CountLiveWorkspaces_Good_RunningLivePID(t *testing.T) { wsRoot := t.TempDir() t.Setenv("CORE_WORKSPACE", wsRoot) - pid, _ := strconv.Atoi(core.Env("PID")) + proc := startManagedProcess(t, testMon.Core()) + pid := proc.Info().PID writeWorkspaceStatus(t, wsRoot, "ws-live", map[string]any{ - "status": "running", - "repo": "go-io", - "agent": "codex", - "pid": pid, + "status": "running", + "repo": "go-io", + "agent": "codex", + "pid": pid, + "process_id": proc.ID, }) mon := New() + mon.ServiceRuntime = testMon.ServiceRuntime running, queued := mon.countLiveWorkspaces() assert.Equal(t, 1, running) assert.Equal(t, 0, queued) } -// --- pidAlive --- +// --- processAlive --- -func TestMonitor_PidAlive_Good_CurrentProcess(t *testing.T) { - pid, _ := strconv.Atoi(core.Env("PID")) - assert.True(t, pidAlive(pid), "current process must be alive") +func TestMonitor_ProcessAlive_Good_ManagedProcess(t *testing.T) { + proc := startManagedProcess(t, testMon.Core()) + assert.True(t, processAlive(testMon.Core(), proc.ID, proc.Info().PID), "managed process must be alive") } -func TestMonitor_PidAlive_Bad_DeadPID(t *testing.T) { - assert.False(t, pidAlive(99999999)) +func TestMonitor_ProcessAlive_Bad_DeadPID(t *testing.T) { + assert.False(t, processAlive(testMon.Core(), "", 99999999)) } -func TestMonitor_PidAlive_Ugly_ZeroPID(t *testing.T) { - assert.NotPanics(t, func() { pidAlive(0) }) +func TestMonitor_ProcessAlive_Ugly_ZeroPID(t *testing.T) { + assert.NotPanics(t, func() { processAlive(testMon.Core(), "", 0) }) } -func TestMonitor_PidAlive_Ugly_NegativePID(t *testing.T) { - assert.NotPanics(t, func() { pidAlive(-1) }) +func TestMonitor_ProcessAlive_Ugly_NegativePID(t *testing.T) { + assert.NotPanics(t, func() { processAlive(testMon.Core(), "", -1) }) } // --- OnStartup / OnShutdown --- diff --git a/pkg/runner/queue.go b/pkg/runner/queue.go index 6be1080..684e0ec 100644 --- a/pkg/runner/queue.go +++ b/pkg/runner/queue.go @@ -161,6 +161,10 @@ func (s *Service) canDispatchAgent(agent string) (bool, string) { // // n := s.countRunningByAgent("codex") func (s *Service) countRunningByAgent(agent string) int { + var runtime *core.Core + if s.ServiceRuntime != nil { + runtime = s.Core() + } count := 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { if st.Status != "running" || baseAgent(st.Agent) != agent { @@ -169,7 +173,7 @@ func (s *Service) countRunningByAgent(agent string) int { switch { case st.PID < 0: count++ - case st.PID > 0 && agentic.PIDAlive(st.PID): + case st.PID > 0 && agentic.ProcessAlive(runtime, "", st.PID): count++ } }) @@ -180,6 +184,10 @@ func (s *Service) countRunningByAgent(agent string) int { // // n := s.countRunningByModel("codex:gpt-5.4") func (s *Service) countRunningByModel(agent string) int { + var runtime *core.Core + if s.ServiceRuntime != nil { + runtime = s.Core() + } count := 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { if st.Status != "running" || st.Agent != agent { @@ -188,7 +196,7 @@ func (s *Service) countRunningByModel(agent string) int { switch { case st.PID < 0: count++ - case st.PID > 0 && agentic.PIDAlive(st.PID): + case st.PID > 0 && agentic.ProcessAlive(runtime, "", st.PID): count++ } }) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 8115af6..689741d 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -334,10 +334,14 @@ func (s *Service) actionStop(_ context.Context, _ core.Options) core.Result { func (s *Service) actionKill(_ context.Context, _ core.Options) core.Result { s.frozen = true + var runtime *core.Core + if s.ServiceRuntime != nil { + runtime = s.Core() + } killed := 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { if st.Status == "running" && st.PID > 0 { - if agentic.PIDTerminate(st.PID) { + if agentic.ProcessTerminate(runtime, "", st.PID) { killed++ } st.Status = "failed"