fix(ax): replace PID syscall wrappers with process-service checks

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-30 16:01:32 +00:00
parent 3c2575f45b
commit 6d239d5b95
14 changed files with 328 additions and 183 deletions

View file

@ -387,10 +387,10 @@ func (s *PrepSubsystem) onAgentComplete(agent, wsDir, outputFile string, exitCod
// spawnAgent launches an agent inside a Docker container.
// The repo/ directory is mounted at /workspace, agent runs sandboxed.
// Output is captured and written to .meta/agent-{agent}.log on completion.
func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, error) {
func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, string, error) {
command, args, err := agentCommand(agent, prompt)
if err != nil {
return 0, "", err
return 0, "", "", err
}
repoDir := WorkspaceRepoDir(wsDir)
@ -406,7 +406,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
procSvc, ok := core.ServiceFor[*process.Service](s.Core(), "process")
if !ok {
return 0, "", core.E("dispatch.spawnAgent", "process service not registered", nil)
return 0, "", "", core.E("dispatch.spawnAgent", "process service not registered", nil)
}
proc, err := procSvc.StartWithOptions(context.Background(), process.RunOptions{
Command: command,
@ -415,11 +415,12 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
Detach: true,
})
if err != nil {
return 0, "", core.E("dispatch.spawnAgent", core.Concat("failed to spawn ", agent), err)
return 0, "", "", core.E("dispatch.spawnAgent", core.Concat("failed to spawn ", agent), err)
}
proc.CloseStdin()
pid := proc.Info().PID
processID := proc.ID
s.broadcastStart(agent, wsDir)
s.startIssueTracking(wsDir)
@ -435,7 +436,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
})
s.Core().PerformAsync(monitorAction, core.NewOptions())
return pid, outputFile, nil
return pid, processID, outputFile, nil
}
// runQA runs build + test checks on the repo after agent completion.
@ -560,7 +561,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
}
// Step 3: Spawn agent in repo/ directory
pid, outputFile, err := s.spawnAgent(input.Agent, prompt, wsDir)
pid, processID, outputFile, err := s.spawnAgent(input.Agent, prompt, wsDir)
if err != nil {
return nil, DispatchOutput{}, err
}
@ -573,6 +574,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
Task: input.Task,
Branch: prepOut.Branch,
PID: pid,
ProcessID: processID,
StartedAt: time.Now(),
Runs: 1,
}

View file

@ -62,7 +62,7 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu
core.Print(nil, " branch: %s", prepOut.Branch)
// Spawn agent directly — no queue, no concurrency check
pid, _, err := s.spawnAgent(input.Agent, prompt, wsDir)
pid, processID, _, err := s.spawnAgent(input.Agent, prompt, wsDir)
if err != nil {
return DispatchSyncResult{Error: err.Error()}
}
@ -70,6 +70,11 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu
core.Print(nil, " pid: %d", pid)
core.Print(nil, " waiting for completion...")
var runtime *core.Core
if s.ServiceRuntime != nil {
runtime = s.Core()
}
// Poll for process exit
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
@ -79,7 +84,7 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu
case <-ctx.Done():
return DispatchSyncResult{Error: "cancelled"}
case <-ticker.C:
if pid > 0 && !PIDAlive(pid) {
if pid > 0 && !ProcessAlive(runtime, processID, pid) {
// Process exited — read final status
st, err := ReadStatus(wsDir)
if err != nil {

View file

@ -33,13 +33,14 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res
break
}
prompt := core.Concat("TASK: ", ev.Task, "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done.")
pid, outputFile, err := s.spawnAgent(ev.Agent, prompt, wsDir)
pid, processID, outputFile, err := s.spawnAgent(ev.Agent, prompt, wsDir)
if err != nil {
break
}
// Update status with real PID
if st, serr := ReadStatus(wsDir); serr == nil {
st.PID = pid
st.ProcessID = processID
writeStatus(wsDir, st)
if runnerSvc, ok := core.ServiceFor[workspaceTracker](c, "runner"); ok {
runnerSvc.TrackWorkspace(WorkspaceName(wsDir), st)
@ -57,7 +58,7 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res
// r := prep.SpawnFromQueue("codex", prompt, wsDir)
// pid := r.Value.(int)
func (s *PrepSubsystem) SpawnFromQueue(agent, prompt, wsDir string) core.Result {
pid, _, err := s.spawnAgent(agent, prompt, wsDir)
pid, _, _, err := s.spawnAgent(agent, prompt, wsDir)
if err != nil {
return core.Result{
Value: core.E("agentic.SpawnFromQueue", "failed to spawn queued agent", err),

View file

@ -2,24 +2,72 @@
package agentic
import "syscall"
import (
core "dappco.re/go/core"
"dappco.re/go/core/process"
)
// PIDAlive checks if an OS process is still alive via PID signal check.
// ProcessAlive checks whether a managed process is still running.
//
// if agentic.PIDAlive(pid) { ... }
func PIDAlive(pid int) bool {
if pid > 0 {
return syscall.Kill(pid, 0) == nil
// alive := agentic.ProcessAlive(c, proc.ID, proc.Info().PID)
// alive := agentic.ProcessAlive(c, "", 12345) // legacy PID fallback
func ProcessAlive(c *core.Core, processID string, pid int) bool {
if c == nil {
return false
}
svc, ok := core.ServiceFor[*process.Service](c, "process")
if !ok {
return false
}
if processID != "" {
if proc, err := svc.Get(processID); err == nil {
return proc.IsRunning()
}
}
if pid <= 0 {
return false
}
for _, proc := range svc.Running() {
if proc.Info().PID == pid {
return true
}
}
return false
}
// PIDTerminate terminates a process via SIGTERM.
// ProcessTerminate stops a managed process.
//
// if agentic.PIDTerminate(pid) { ... }
func PIDTerminate(pid int) bool {
if pid > 0 {
return syscall.Kill(pid, syscall.SIGTERM) == nil
// _ = agentic.ProcessTerminate(c, proc.ID, proc.Info().PID)
func ProcessTerminate(c *core.Core, processID string, pid int) bool {
if c == nil {
return false
}
svc, ok := core.ServiceFor[*process.Service](c, "process")
if !ok {
return false
}
if processID != "" {
if err := svc.Kill(processID); err == nil {
return true
}
}
if pid <= 0 {
return false
}
for _, proc := range svc.Running() {
if proc.Info().PID == pid {
return svc.Kill(proc.ID) == nil
}
}
return false
}

View file

@ -3,17 +3,39 @@
package agentic
import (
"os"
"context"
core "dappco.re/go/core"
"dappco.re/go/core/process"
)
func ExamplePIDAlive() {
core.Println(PIDAlive(os.Getpid()))
func ExampleProcessAlive() {
r := testCore.Process().Start(context.Background(), core.NewOptions(
core.Option{Key: "command", Value: "sleep"},
core.Option{Key: "args", Value: []string{"1"}},
core.Option{Key: "detach", Value: true},
))
if !r.OK {
return
}
proc := r.Value.(*process.Process)
defer proc.Kill()
core.Println(ProcessAlive(testCore, proc.ID, proc.Info().PID))
// Output: true
}
func ExamplePIDTerminate() {
core.Println(PIDTerminate(0))
// Output: false
func ExampleProcessTerminate() {
r := testCore.Process().Start(context.Background(), core.NewOptions(
core.Option{Key: "command", Value: "sleep"},
core.Option{Key: "args", Value: []string{"1"}},
core.Option{Key: "detach", Value: true},
))
if !r.OK {
return
}
proc := r.Value.(*process.Process)
defer proc.Kill()
core.Println(ProcessTerminate(testCore, proc.ID, proc.Info().PID))
// Output: true
}

View file

@ -5,14 +5,11 @@ package agentic
import (
"context"
"os"
"strconv"
"testing"
"time"
core "dappco.re/go/core"
"dappco.re/go/core/process"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testPrep is the package-level PrepSubsystem for tests that need process execution.
@ -52,56 +49,45 @@ func newPrepWithProcess() *PrepSubsystem {
}
}
// --- PIDAlive ---
func TestPid_PIDAlive_Good(t *testing.T) {
pid, _ := strconv.Atoi(core.Env("PID"))
assert.True(t, PIDAlive(pid))
}
func TestPid_PIDAlive_Bad(t *testing.T) {
assert.False(t, PIDAlive(999999))
}
func TestPid_PIDAlive_Ugly(t *testing.T) {
assert.False(t, PIDAlive(0))
}
// --- PIDTerminate ---
func TestPid_PIDTerminate_Good(t *testing.T) {
r := testCore.Process().Start(context.Background(), core.NewOptions(
core.Option{Key: "command", Value: "sleep"},
core.Option{Key: "args", Value: []string{"30"}},
core.Option{Key: "detach", Value: true},
))
require.True(t, r.OK)
proc, ok := r.Value.(*process.Process)
require.True(t, ok)
// --- ProcessAlive ---
func TestPid_ProcessAlive_Good(t *testing.T) {
proc := startManagedProcess(t, testCore)
pid := proc.Info().PID
require.NotZero(t, pid)
defer func() {
_ = proc.Kill()
}()
assert.True(t, ProcessAlive(testCore, proc.ID, pid))
assert.True(t, ProcessAlive(testCore, "", pid))
}
assert.True(t, PIDTerminate(pid))
// TestPid_ProcessAlive_Bad: a PID that matches no managed process must
// report not-alive.
func TestPid_ProcessAlive_Bad(t *testing.T) {
	assert.False(t, ProcessAlive(testCore, "", 999999))
}
// TestPid_ProcessAlive_Ugly: a nil Core (no process service) must report
// not-alive rather than panic.
func TestPid_ProcessAlive_Ugly(t *testing.T) {
	assert.False(t, ProcessAlive(nil, "", 0))
}
// --- ProcessTerminate ---
// TestPid_ProcessTerminate_Good terminates a managed process and confirms
// it actually exits and is no longer reported alive.
func TestPid_ProcessTerminate_Good(t *testing.T) {
	proc := startManagedProcess(t, testCore)
	pid := proc.Info().PID
	assert.True(t, ProcessTerminate(testCore, proc.ID, pid))
	// Wait for the process to exit; bound the wait so a stuck kill fails fast.
	select {
	case <-proc.Done():
	case <-time.After(5 * time.Second):
		t.Fatal("ProcessTerminate did not stop the process")
	}
	assert.False(t, ProcessAlive(testCore, proc.ID, pid))
}
func TestPid_PIDTerminate_Bad(t *testing.T) {
assert.False(t, PIDTerminate(999999))
func TestPid_ProcessTerminate_Bad(t *testing.T) {
assert.False(t, ProcessTerminate(testCore, "", 999999))
}
func TestPid_PIDTerminate_Ugly(t *testing.T) {
assert.False(t, PIDTerminate(0))
func TestPid_ProcessTerminate_Ugly(t *testing.T) {
assert.False(t, ProcessTerminate(nil, "", 0))
}

View file

@ -163,10 +163,14 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
//
// n := s.countRunningByAgent("codex") // counts all codex:* variants
func (s *PrepSubsystem) countRunningByAgent(agent string) int {
var runtime *core.Core
if s.ServiceRuntime != nil {
runtime = s.Core()
}
if s.workspaces != nil && s.workspaces.Len() > 0 {
count := 0
s.workspaces.Each(func(_ string, st *WorkspaceStatus) {
if st.Status == "running" && baseAgent(st.Agent) == agent && PIDAlive(st.PID) {
if st.Status == "running" && baseAgent(st.Agent) == agent && ProcessAlive(runtime, st.ProcessID, st.PID) {
count++
}
})
@ -174,12 +178,12 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
}
// Fallback: scan disk (cold start before hydration)
return s.countRunningByAgentDisk(agent)
return s.countRunningByAgentDisk(runtime, agent)
}
// countRunningByAgentDisk scans workspace status.json files on disk.
// Used only as fallback before Registry hydration completes.
func (s *PrepSubsystem) countRunningByAgentDisk(agent string) int {
func (s *PrepSubsystem) countRunningByAgentDisk(runtime *core.Core, agent string) int {
count := 0
for _, statusPath := range WorkspaceStatusPaths() {
st, err := ReadStatus(core.PathDir(statusPath))
@ -189,7 +193,7 @@ func (s *PrepSubsystem) countRunningByAgentDisk(agent string) int {
if baseAgent(st.Agent) != agent {
continue
}
if PIDAlive(st.PID) {
if ProcessAlive(runtime, st.ProcessID, st.PID) {
count++
}
}
@ -201,10 +205,14 @@ func (s *PrepSubsystem) countRunningByAgentDisk(agent string) int {
//
// n := s.countRunningByModel("codex:gpt-5.4") // counts only that model
func (s *PrepSubsystem) countRunningByModel(agent string) int {
var runtime *core.Core
if s.ServiceRuntime != nil {
runtime = s.Core()
}
if s.workspaces != nil && s.workspaces.Len() > 0 {
count := 0
s.workspaces.Each(func(_ string, st *WorkspaceStatus) {
if st.Status == "running" && st.Agent == agent && PIDAlive(st.PID) {
if st.Status == "running" && st.Agent == agent && ProcessAlive(runtime, st.ProcessID, st.PID) {
count++
}
})
@ -212,6 +220,12 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int {
}
// Fallback: scan disk
return s.countRunningByModelDisk(runtime, agent)
}
// countRunningByModelDisk scans workspace status.json files on disk.
// Used only as fallback before Registry hydration completes.
func (s *PrepSubsystem) countRunningByModelDisk(runtime *core.Core, agent string) int {
count := 0
for _, statusPath := range WorkspaceStatusPaths() {
st, err := ReadStatus(core.PathDir(statusPath))
@ -221,7 +235,7 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int {
if st.Agent != agent {
continue
}
if PIDAlive(st.PID) {
if ProcessAlive(runtime, st.ProcessID, st.PID) {
count++
}
}
@ -340,13 +354,14 @@ func (s *PrepSubsystem) drainOne() bool {
prompt := core.Concat("TASK: ", st.Task, "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done.")
pid, _, err := s.spawnAgent(st.Agent, prompt, wsDir)
pid, processID, _, err := s.spawnAgent(st.Agent, prompt, wsDir)
if err != nil {
continue
}
st.Status = "running"
st.PID = pid
st.ProcessID = processID
st.Runs++
writeStatus(wsDir, st)
s.TrackWorkspace(WorkspaceName(wsDir), st)

View file

@ -3,20 +3,33 @@
package agentic
import (
"strconv"
"context"
"testing"
"time"
core "dappco.re/go/core"
"dappco.re/go/core/process"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
// mustPID returns the current process PID as int via Core's cached Env.
func mustPID() int {
pid, _ := strconv.Atoi(core.Env("PID"))
return pid
// startManagedProcess launches a detached "sleep 30" through the process
// service, fails the test on startup error, and registers a cleanup kill.
func startManagedProcess(t *testing.T, c *core.Core) *process.Process {
	t.Helper()
	opts := core.NewOptions(
		core.Option{Key: "command", Value: "sleep"},
		core.Option{Key: "args", Value: []string{"30"}},
		core.Option{Key: "detach", Value: true},
	)
	res := c.Process().Start(context.Background(), opts)
	require.True(t, res.OK)
	p, ok := res.Value.(*process.Process)
	require.True(t, ok)
	t.Cleanup(func() {
		_ = p.Kill()
	})
	return p
}
// --- UnmarshalYAML for ConcurrencyLimit ---
@ -99,9 +112,9 @@ rates:
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
d := s.delayForAgent("codex:gpt-5.4")
@ -118,8 +131,8 @@ func TestQueue_CountRunningByModel_Good_NoWorkspaces(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
assert.Equal(t, 0, s.countRunningByModel("codex:gpt-5.4"))
}
@ -133,9 +146,9 @@ func TestQueue_DrainQueue_Good_NoCoreFallsBackToMutex(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: nil,
frozen: false,
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
frozen: false,
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
assert.NotPanics(t, func() { s.drainQueue() })
}
@ -147,9 +160,9 @@ func TestQueue_DrainOne_Good_NoWorkspaces(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
assert.False(t, s.drainOne())
}
@ -166,9 +179,9 @@ func TestQueue_DrainOne_Good_SkipsNonQueued(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
assert.False(t, s.drainOne())
}
@ -185,7 +198,7 @@ func TestQueue_DrainOne_Good_SkipsBackedOffPool(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
codePath: t.TempDir(),
backoff: map[string]time.Time{
"codex": time.Now().Add(1 * time.Hour),
},
@ -210,8 +223,8 @@ func TestQueue_CanDispatchAgent_Ugly(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// No running workspaces → should be able to dispatch
@ -231,9 +244,9 @@ func TestQueue_DrainQueue_Ugly(t *testing.T) {
c := core.New()
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
frozen: false,
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
frozen: false,
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// Not frozen, Core is present, empty workspace → drainQueue runs the Core lock path without panic
@ -247,26 +260,30 @@ func TestQueue_CanDispatchAgent_Bad_AgentAtLimit(t *testing.T) {
t.Setenv("CORE_WORKSPACE", root)
wsRoot := core.JoinPath(root, "workspace")
// Create a running workspace with a valid-looking PID (use our own PID)
c := core.New(core.WithService(ProcessRegister))
c.ServiceStartup(context.Background(), nil)
// Create a running workspace backed by a managed process.
ws := core.JoinPath(wsRoot, "ws-running")
fs.EnsureDir(ws)
proc := startManagedProcess(t, c)
st := &WorkspaceStatus{
Status: "running",
Agent: "claude",
Repo: "go-io",
PID: mustPID(), // Our own PID so Kill(pid, 0) succeeds
Status: "running",
Agent: "claude",
Repo: "go-io",
PID: proc.Info().PID,
ProcessID: proc.ID,
}
fs.Write(core.JoinPath(ws, "status.json"), core.JSONMarshalString(st))
c := core.New()
c.Config().Set("agents.concurrency", map[string]ConcurrencyLimit{
"claude": {Total: 1},
})
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// Agent at limit (1 running, limit is 1) — cannot dispatch
@ -283,18 +300,20 @@ func TestQueue_CountRunningByAgent_Bad_WrongAgentType(t *testing.T) {
// Create a running workspace for a different agent type
ws := core.JoinPath(wsRoot, "ws-gemini")
fs.EnsureDir(ws)
proc := startManagedProcess(t, testCore)
st := &WorkspaceStatus{
Status: "running",
Agent: "gemini",
Repo: "go-io",
PID: mustPID(),
Status: "running",
Agent: "gemini",
Repo: "go-io",
PID: proc.Info().PID,
ProcessID: proc.ID,
}
fs.Write(core.JoinPath(ws, "status.json"), core.JSONMarshalString(st))
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// Counting for "claude" when only "gemini" is running → 0
@ -313,8 +332,8 @@ func TestQueue_CountRunningByAgent_Ugly_CorruptStatusJSON(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// Corrupt status.json → ReadStatus fails → skipped → count is 0
@ -330,18 +349,20 @@ func TestQueue_CountRunningByModel_Bad_NoMatchingModel(t *testing.T) {
ws := core.JoinPath(wsRoot, "ws-1")
fs.EnsureDir(ws)
proc := startManagedProcess(t, testCore)
st := &WorkspaceStatus{
Status: "running",
Agent: "codex:gpt-5.4",
Repo: "go-io",
PID: mustPID(),
Status: "running",
Agent: "codex:gpt-5.4",
Repo: "go-io",
PID: proc.Info().PID,
ProcessID: proc.ID,
}
fs.Write(core.JoinPath(ws, "status.json"), core.JSONMarshalString(st))
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// Looking for a model that doesn't match any running workspace
@ -362,14 +383,15 @@ func TestQueue_CountRunningByModel_Ugly_ModelMismatch(t *testing.T) {
} {
d := core.JoinPath(wsRoot, ws.name)
fs.EnsureDir(d)
st := &WorkspaceStatus{Status: "running", Agent: ws.agent, Repo: "test", PID: mustPID()}
proc := startManagedProcess(t, testCore)
st := &WorkspaceStatus{Status: "running", Agent: ws.agent, Repo: "test", PID: proc.Info().PID, ProcessID: proc.ID}
fs.Write(core.JoinPath(d, "status.json"), core.JSONMarshalString(st))
}
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// countRunningByModel does exact match on agent string
@ -395,9 +417,9 @@ rates:
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// sustained_delay is 0 → delayForAgent returns 0
@ -420,9 +442,9 @@ rates:
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// Malformed reset_utc — strconv.Atoi fails → defaults to hour=6, min=0
@ -439,14 +461,16 @@ func TestQueue_DrainOne_Bad_QueuedButAtConcurrencyLimit(t *testing.T) {
t.Setenv("CORE_WORKSPACE", root)
wsRoot := core.JoinPath(root, "workspace")
// Create a running workspace that uses our PID
// Create a running workspace backed by a managed process.
wsRunning := core.JoinPath(wsRoot, "ws-running")
fs.EnsureDir(wsRunning)
proc := startManagedProcess(t, testCore)
stRunning := &WorkspaceStatus{
Status: "running",
Agent: "claude",
Repo: "go-io",
PID: mustPID(),
Status: "running",
Agent: "claude",
Repo: "go-io",
PID: proc.Info().PID,
ProcessID: proc.ID,
}
fs.Write(core.JoinPath(wsRunning, "status.json"), core.JSONMarshalString(stRunning))
@ -463,9 +487,9 @@ func TestQueue_DrainOne_Bad_QueuedButAtConcurrencyLimit(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// Queued workspace exists but agent is at concurrency limit → drainOne returns false
@ -485,7 +509,7 @@ func TestQueue_DrainOne_Ugly_QueuedButInBackoffWindow(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
codePath: t.TempDir(),
backoff: map[string]time.Time{
"codex": time.Now().Add(1 * time.Hour), // pool is backed off
},
@ -547,9 +571,9 @@ rates:
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
loaded := s.loadAgentsConfig()
@ -569,9 +593,9 @@ func TestQueue_LoadAgentsConfig_Bad(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// Should return defaults when YAML is corrupt
@ -587,9 +611,9 @@ func TestQueue_LoadAgentsConfig_Ugly(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
loaded := s.loadAgentsConfig()
@ -614,10 +638,10 @@ func TestQueue_DrainQueue_Bad_FrozenQueueDoesNothing(t *testing.T) {
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
frozen: true, // queue is frozen
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
frozen: true, // queue is frozen
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
// Frozen queue returns immediately without draining

View file

@ -94,7 +94,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
}
// Spawn agent via go-process
pid, _, err := s.spawnAgent(agent, prompt, wsDir)
pid, processID, _, err := s.spawnAgent(agent, prompt, wsDir)
if err != nil {
return nil, ResumeOutput{}, err
}
@ -102,6 +102,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
// Update status
st.Status = "running"
st.PID = pid
st.ProcessID = processID
st.Runs++
st.Question = ""
writeStatus(wsDir, st)

View file

@ -175,6 +175,10 @@ func (s *PrepSubsystem) registerStatusTool(server *mcp.Server) {
func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, input StatusInput) (*mcp.CallToolResult, StatusOutput, error) {
statusFiles := WorkspaceStatusPaths()
var runtime *core.Core
if s.ServiceRuntime != nil {
runtime = s.Core()
}
var out StatusOutput
@ -189,9 +193,9 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
continue
}
// If status is "running", check if PID is still alive
if st.Status == "running" && st.PID > 0 {
if !PIDAlive(st.PID) {
// If status is "running", check whether the managed process is still alive.
if st.Status == "running" && (st.ProcessID != "" || st.PID > 0) {
if !ProcessAlive(runtime, st.ProcessID, st.PID) {
blockedPath := workspaceBlockedPath(wsDir)
if r := fs.Read(blockedPath); r.OK {
st.Status = "blocked"

View file

@ -260,7 +260,7 @@ func (m *Subsystem) Poke() {
// checkIdleAfterDelay waits briefly then checks if the fleet is genuinely idle.
// Only emits queue.drained when there are truly zero running or queued agents,
// verified by checking PIDs are alive, not just trusting status files.
// verified by checking managed processes are alive, not just trusting status files.
func (m *Subsystem) checkIdleAfterDelay() {
time.Sleep(5 * time.Second) // wait for queue drain to fill slots
if m.ServiceRuntime == nil {
@ -274,8 +274,12 @@ func (m *Subsystem) checkIdleAfterDelay() {
}
// countLiveWorkspaces counts workspaces that are genuinely active.
// For "running" status, verifies the PID is still alive.
// For "running" status, verifies the managed process is still alive.
func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
var runtime *core.Core
if m.ServiceRuntime != nil {
runtime = m.Core()
}
for _, path := range agentic.WorkspaceStatusPaths() {
wsDir := core.PathDir(path)
r := agentic.ReadStatusResult(wsDir)
@ -288,7 +292,7 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
}
switch st.Status {
case "running":
if st.PID > 0 && pidAlive(st.PID) {
if st.PID > 0 && processAlive(runtime, st.ProcessID, st.PID) {
running++
}
case "queued":
@ -298,9 +302,9 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
return
}
// pidAlive checks whether a process is still running.
func pidAlive(pid int) bool {
return agentic.PIDAlive(pid)
// processAlive checks whether a managed process is still running.
func processAlive(c *core.Core, processID string, pid int) bool {
return agentic.ProcessAlive(c, processID, pid)
}
func (m *Subsystem) loop(ctx context.Context) {
@ -438,7 +442,7 @@ func (m *Subsystem) checkCompletions() string {
return ""
}
// Only emit queue.drained when genuinely empty — verified by live PID check
// Only emit queue.drained when genuinely empty — verified by live process checks
liveRunning, liveQueued := m.countLiveWorkspaces()
if m.ServiceRuntime != nil && liveRunning == 0 && liveQueued == 0 {
m.Core().ACTION(messages.QueueDrained{Completed: len(newlyCompleted)})

View file

@ -8,13 +8,13 @@ import (
"net/http"
"net/http/httptest"
"os"
"strconv"
"testing"
"time"
"dappco.re/go/agent/pkg/agentic"
"dappco.re/go/agent/pkg/messages"
core "dappco.re/go/core"
"dappco.re/go/core/process"
"github.com/modelcontextprotocol/go-sdk/mcp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -71,6 +71,24 @@ func writeWorkspaceStatus(t *testing.T, wsRoot, name string, fields map[string]a
return dir
}
// startManagedProcess spawns a detached "sleep 30" via the process service
// for tests that need a genuinely live managed process. The test fails if
// startup does not succeed; the process is killed on test cleanup.
// NOTE(review): duplicated in the agentic package tests — packages differ,
// so the helper cannot be shared without exporting it.
func startManagedProcess(t *testing.T, c *core.Core) *process.Process {
	t.Helper()
	opts := core.NewOptions(
		core.Option{Key: "command", Value: "sleep"},
		core.Option{Key: "args", Value: []string{"30"}},
		core.Option{Key: "detach", Value: true},
	)
	res := c.Process().Start(context.Background(), opts)
	require.True(t, res.OK)
	p, ok := res.Value.(*process.Process)
	require.True(t, ok)
	t.Cleanup(func() {
		_ = p.Kill()
	})
	return p
}
// --- New ---
func TestMonitor_New_Good_Defaults(t *testing.T) {
@ -328,37 +346,40 @@ func TestMonitor_CountLiveWorkspaces_Good_RunningLivePID(t *testing.T) {
wsRoot := t.TempDir()
t.Setenv("CORE_WORKSPACE", wsRoot)
pid, _ := strconv.Atoi(core.Env("PID"))
proc := startManagedProcess(t, testMon.Core())
pid := proc.Info().PID
writeWorkspaceStatus(t, wsRoot, "ws-live", map[string]any{
"status": "running",
"repo": "go-io",
"agent": "codex",
"pid": pid,
"status": "running",
"repo": "go-io",
"agent": "codex",
"pid": pid,
"process_id": proc.ID,
})
mon := New()
mon.ServiceRuntime = testMon.ServiceRuntime
running, queued := mon.countLiveWorkspaces()
assert.Equal(t, 1, running)
assert.Equal(t, 0, queued)
}
// --- pidAlive ---
// --- processAlive ---
func TestMonitor_PidAlive_Good_CurrentProcess(t *testing.T) {
pid, _ := strconv.Atoi(core.Env("PID"))
assert.True(t, pidAlive(pid), "current process must be alive")
func TestMonitor_ProcessAlive_Good_ManagedProcess(t *testing.T) {
proc := startManagedProcess(t, testMon.Core())
assert.True(t, processAlive(testMon.Core(), proc.ID, proc.Info().PID), "managed process must be alive")
}
func TestMonitor_PidAlive_Bad_DeadPID(t *testing.T) {
assert.False(t, pidAlive(99999999))
func TestMonitor_ProcessAlive_Bad_DeadPID(t *testing.T) {
assert.False(t, processAlive(testMon.Core(), "", 99999999))
}
func TestMonitor_PidAlive_Ugly_ZeroPID(t *testing.T) {
assert.NotPanics(t, func() { pidAlive(0) })
func TestMonitor_ProcessAlive_Ugly_ZeroPID(t *testing.T) {
assert.NotPanics(t, func() { processAlive(testMon.Core(), "", 0) })
}
func TestMonitor_PidAlive_Ugly_NegativePID(t *testing.T) {
assert.NotPanics(t, func() { pidAlive(-1) })
func TestMonitor_ProcessAlive_Ugly_NegativePID(t *testing.T) {
assert.NotPanics(t, func() { processAlive(testMon.Core(), "", -1) })
}
// --- OnStartup / OnShutdown ---

View file

@ -161,6 +161,10 @@ func (s *Service) canDispatchAgent(agent string) (bool, string) {
//
// n := s.countRunningByAgent("codex")
func (s *Service) countRunningByAgent(agent string) int {
var runtime *core.Core
if s.ServiceRuntime != nil {
runtime = s.Core()
}
count := 0
s.workspaces.Each(func(_ string, st *WorkspaceStatus) {
if st.Status != "running" || baseAgent(st.Agent) != agent {
@ -169,7 +173,7 @@ func (s *Service) countRunningByAgent(agent string) int {
switch {
case st.PID < 0:
count++
case st.PID > 0 && agentic.PIDAlive(st.PID):
case st.PID > 0 && agentic.ProcessAlive(runtime, "", st.PID):
count++
}
})
@ -180,6 +184,10 @@ func (s *Service) countRunningByAgent(agent string) int {
//
// n := s.countRunningByModel("codex:gpt-5.4")
func (s *Service) countRunningByModel(agent string) int {
var runtime *core.Core
if s.ServiceRuntime != nil {
runtime = s.Core()
}
count := 0
s.workspaces.Each(func(_ string, st *WorkspaceStatus) {
if st.Status != "running" || st.Agent != agent {
@ -188,7 +196,7 @@ func (s *Service) countRunningByModel(agent string) int {
switch {
case st.PID < 0:
count++
case st.PID > 0 && agentic.PIDAlive(st.PID):
case st.PID > 0 && agentic.ProcessAlive(runtime, "", st.PID):
count++
}
})

View file

@ -334,10 +334,14 @@ func (s *Service) actionStop(_ context.Context, _ core.Options) core.Result {
func (s *Service) actionKill(_ context.Context, _ core.Options) core.Result {
s.frozen = true
var runtime *core.Core
if s.ServiceRuntime != nil {
runtime = s.Core()
}
killed := 0
s.workspaces.Each(func(_ string, st *WorkspaceStatus) {
if st.Status == "running" && st.PID > 0 {
if agentic.PIDTerminate(st.PID) {
if agentic.ProcessTerminate(runtime, "", st.PID) {
killed++
}
st.Status = "failed"