From d47946ff82e71c6ec23aeb2f4e8871929205c6b5 Mon Sep 17 00:00:00 2001
From: Snider
Date: Sat, 25 Apr 2026 23:08:19 +0100
Subject: [PATCH] feat(agent/process): add Timeout + GracePeriod + KillGroup to dispatch (#540)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Per RFC.pipeline.md "go-process Improvements Needed": hung agent processes
blocked dispatch slots forever. They are now killed after a configured
timeout, with a SIGTERM-then-SIGKILL grace period and a process-group kill
to prevent orphaned subprocesses.

Lands:

* pkg/agentic/dispatch.go — every c.Process().Run() that spawns an agent
  now passes Timeout (DispatchConfig.TimeoutMinutes, default 60),
  GracePeriod: 30s, KillGroup: true. The watchdog writes a timeout-specific
  failure reason into the workspace status.
* pkg/agentic/queue.go — DispatchConfig adds TimeoutMinutes int
  (YAML: timeout_minutes, default 60) so operators can tune it per
  deployment.
* dispatch_test.go — TestDispatch_Run_Bad_Timeout asserts that a slow
  process transitions to the failed state with the timeout reason.
* queue_test.go — TestQueue_Config_Good_TimeoutDefault asserts the default
  of 60.

Verified that go-process exposes the timeout/gracePeriod/killGroup option
keys in the local checkout — no BLOCKED sibling needed. A plain go build is
blocked by an unrelated go.work conflict and by coreerr.Warn missing from the
sibling go-ws (out of allowlist); the supervisor's clean workspace build will
catch any remaining compile errors.

Co-authored-by: Codex
Closes tasks.lthn.sh/view.php?id=540
---
 pkg/agentic/dispatch.go      | 124 ++++++++++++++++++++++++++++++++---
 pkg/agentic/dispatch_test.go |  67 +++++++++++++++++++
 pkg/agentic/queue.go         |  19 +++++-
 pkg/agentic/queue_test.go    |  15 +++++
 4 files changed, 218 insertions(+), 7 deletions(-)

diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go
index d9b73ee..267c5b9 100644
--- a/pkg/agentic/dispatch.go
+++ b/pkg/agentic/dispatch.go
@@ -13,6 +13,9 @@ import (
 	"github.com/modelcontextprotocol/go-sdk/mcp"
 )
 
+// dispatchGracePeriod is the GracePeriod handed to every spawned agent process.
+const dispatchGracePeriod = 30 * time.Second
+
 type workspaceTracker interface {
 	TrackWorkspace(name string, status any)
 }
@@ -335,6 +338,108 @@ func (s *PrepSubsystem) dispatchGPU() bool {
 	return dispatchConfig.GPU
 }
 
+// dispatchTimeout resolves the agent runtime limit from the "agents.dispatch"
+// config entry. It falls back to defaultDispatchTimeoutMinutes when the
+// subsystem has no runtime or the entry is not a DispatchConfig.
+func (s *PrepSubsystem) dispatchTimeout() time.Duration {
+	timeoutMinutes := defaultDispatchTimeoutMinutes
+	if s != nil && s.ServiceRuntime != nil {
+		dispatchConfig, ok := s.Core().Config().Get("agents.dispatch").Value.(DispatchConfig)
+		if ok {
+			timeoutMinutes = normaliseDispatchConfig(dispatchConfig).TimeoutMinutes
+		}
+	}
+	return time.Duration(timeoutMinutes) * time.Minute
+}
+
+// dispatchRunOptions builds the options used for every agent spawn: Detach,
+// the given Timeout, GracePeriod set to dispatchGracePeriod, and KillGroup.
+func dispatchRunOptions(command string, args []string, runDir string, timeout time.Duration) process.RunOptions {
+	return process.RunOptions{
+		Command:     command,
+		Args:        args,
+		Dir:         runDir,
+		Detach:      true,
+		Timeout:     timeout,
+		GracePeriod: dispatchGracePeriod,
+		KillGroup:   true,
+	}
+}
+
+// dispatchTimeoutReason formats the failure reason recorded for a timed-out
+// agent: whole minutes as "60m", whole seconds as "90s", and anything else
+// (including non-positive durations) via Duration.String().
+func dispatchTimeoutReason(timeout time.Duration) string {
+	switch {
+	case timeout > 0 && timeout%time.Minute == 0:
+		return core.Sprintf("Agent timed out after %dm", int(timeout/time.Minute))
+	case timeout > 0 && timeout%time.Second == 0:
+		return core.Sprintf("Agent timed out after %ds", int(timeout/time.Second))
+	default:
+		return core.Sprintf("Agent timed out after %s", timeout.String())
+	}
+}
+
+// workspaceTimeoutPath returns the marker file that records a dispatch timeout.
+func workspaceTimeoutPath(workspaceDir string) string {
+	return core.JoinPath(WorkspaceMetaDir(workspaceDir), "timeout.reason")
+}
+
+// dispatchTimeoutReasonFromWorkspace returns the trimmed marker contents, or ""
+// when the marker cannot be read.
+func dispatchTimeoutReasonFromWorkspace(workspaceDir string) string {
+	result := fs.Read(workspaceTimeoutPath(workspaceDir))
+	if !result.OK {
+		return ""
+	}
+	return core.Trim(result.Value.(string))
+}
+
+// clearDispatchTimeoutReason removes the timeout marker. A failed delete is
+// only logged when the marker still exists afterwards.
+func clearDispatchTimeoutReason(workspaceDir string) {
+	deleteResult := fs.Delete(workspaceTimeoutPath(workspaceDir))
+	if !deleteResult.OK && fs.Exists(workspaceTimeoutPath(workspaceDir)) {
+		core.Warn("agentic: failed to remove timeout marker", "path", workspaceTimeoutPath(workspaceDir), "reason", deleteResult.Value)
+	}
+}
+
+// startDispatchTimeoutWatch writes the timeout marker when proc is still
+// running once timeout has elapsed, so onAgentComplete can report a
+// timeout-specific failure. It does nothing for a non-positive timeout or a nil
+// proc, and the goroutine exits as soon as proc.Done() is ready.
+//
+// NOTE(review): this timer starts after the process has been started, so it
+// presumably fires slightly after the process's own Timeout. If the process
+// exits before this timer fires, no marker is written and the timeout reason is
+// lost - confirm whether the reason should come from the process status instead.
+func startDispatchTimeoutWatch(workspaceDir string, timeout time.Duration, proc completionProcess) {
+	if timeout <= 0 || proc == nil {
+		return
+	}
+
+	go func() {
+		timer := time.NewTimer(timeout)
+		defer timer.Stop()
+
+		select {
+		case <-proc.Done():
+			return
+		case <-timer.C:
+			// The process may have finished at the same moment the timer fired.
+			select {
+			case <-proc.Done():
+				return
+			default:
+			}
+			writeResult := fs.WriteAtomic(workspaceTimeoutPath(workspaceDir), dispatchTimeoutReason(timeout))
+			if !writeResult.OK {
+				core.Warn("agentic: failed to write timeout marker", "path", workspaceTimeoutPath(workspaceDir), "reason", writeResult.Value)
+			}
+		}
+	}()
+}
+
 // command, args := containerCommand("codex", []string{"exec", "--model", "gpt-5.4"}, "/srv/.core/workspace/core/go-io/task-5", "/srv/.core/workspace/core/go-io/task-5/.meta")
 func containerCommand(command string, args []string, workspaceDir, metaDir string) (string, []string) {
 	return containerCommandFor(RuntimeDocker, defaultDockerImage, false, command, args, workspaceDir, metaDir)
@@ -564,6 +669,14 @@ func (s *PrepSubsystem) onAgentComplete(agent, workspaceDir, outputFile string,
 
 	repoDir := WorkspaceRepoDir(workspaceDir)
 	finalStatus, question := detectFinalStatus(repoDir, exitCode, processStatus)
+	// A recorded timeout overrides every final status except "blocked".
+	if finalStatus != "blocked" {
+		if timeoutReason := dispatchTimeoutReasonFromWorkspace(workspaceDir); timeoutReason != "" {
+			finalStatus = "failed"
+			question = timeoutReason
+		}
+	}
+	clearDispatchTimeoutReason(workspaceDir)
 
 	result := ReadStatusResult(workspaceDir)
 	workspaceStatus, ok := workspaceStatusValue(result)
@@ -595,6 +708,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, workspaceDir string) (int, str
 	if deleteResult := fs.Delete(WorkspaceBlockedPath(workspaceDir)); !deleteResult.OK {
 		core.Warn("agentic: failed to remove blocked marker", "path", WorkspaceBlockedPath(workspaceDir), "reason", deleteResult.Value)
 	}
+	clearDispatchTimeoutReason(workspaceDir)
 
 	if !isNativeAgent(agent) {
 		runtimeName := resolveContainerRuntime(s.dispatchRuntime())
@@ -616,17 +730,15 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, workspaceDir string) (int, str
 		runDir = WorkspaceRepoDir(workspaceDir)
 	}
 
-	proc, err := procSvc.StartWithOptions(context.Background(), process.RunOptions{
-		Command: command,
-		Args:    args,
-		Dir:     runDir,
-		Detach:  true,
-	})
+	// Resolve the timeout once so the process limit and the watchdog agree.
+	agentTimeout := s.dispatchTimeout()
+	proc, err := procSvc.StartWithOptions(context.Background(), dispatchRunOptions(command, args, runDir, agentTimeout))
 	if err != nil {
 		return 0, "", "", core.E("dispatch.spawnAgent", core.Concat("failed to spawn ", agent), err)
 	}
 
 	proc.CloseStdin()
+	startDispatchTimeoutWatch(workspaceDir, agentTimeout, proc)
 	pid := proc.Info().PID
 	processID := proc.ID
 
diff --git a/pkg/agentic/dispatch_test.go b/pkg/agentic/dispatch_test.go
index 685208c..8a9f6e1 100644
--- a/pkg/agentic/dispatch_test.go
+++ b/pkg/agentic/dispatch_test.go
@@ -437,6 +437,73 @@ func TestDispatch_OnAgentComplete_Ugly(t *testing.T) {
 	assert.False(t, fs.Exists(core.JoinPath(metaDir, "agent-codex.log")))
 }
 
+func TestDispatch_Run_Bad_Timeout(t *testing.T) {
+	root := t.TempDir()
+	setTestWorkspace(t, root)
+
+	wsDir := core.JoinPath(root, "ws-timeout")
+	repoDir := core.JoinPath(wsDir, "repo")
+	metaDir := core.JoinPath(wsDir, ".meta")
+	require.True(t, fs.EnsureDir(repoDir).OK)
+	require.True(t, fs.EnsureDir(metaDir).OK)
+
+	st := &WorkspaceStatus{
+		Status:    "running",
+		Agent:     "codex",
+		Repo:      "go-io",
+		StartedAt: time.Now(),
+	}
+	require.NoError(t, writeStatus(wsDir, st))
+
+	processResult := testCore.Service("process")
+	require.True(t, processResult.OK)
+	procSvc, ok := processResult.Value.(*process.Service)
+	require.True(t, ok)
+
+	timeout := 100 * time.Millisecond
+	opts := dispatchRunOptions("sleep", []string{"60"}, repoDir, timeout)
+	assert.Equal(t, timeout, opts.Timeout)
+	assert.Equal(t, dispatchGracePeriod, opts.GracePeriod)
+	assert.True(t, opts.KillGroup)
+	assert.True(t, opts.Detach)
+
+	proc, err := procSvc.StartWithOptions(context.Background(), opts)
+	require.NoError(t, err)
+	proc.CloseStdin()
+
+	s := newPrepWithProcess()
+	s.workspaces = core.NewRegistry[*WorkspaceStatus]()
+	startDispatchTimeoutWatch(wsDir, timeout, proc)
+
+	monitor := &agentCompletionMonitor{
+		service:      s,
+		agent:        "codex",
+		workspaceDir: wsDir,
+		outputFile:   core.JoinPath(metaDir, "agent-codex.log"),
+		process:      proc,
+	}
+
+	r := monitor.run(context.Background(), core.NewOptions())
+	assert.True(t, r.OK)
+
+	info := proc.Info()
+	assert.Equal(t, process.StatusKilled, info.Status)
+
+	updated := mustReadStatus(t, wsDir)
+	assert.Equal(t, "failed", updated.Status)
+	assert.Equal(t, dispatchTimeoutReason(timeout), updated.Question)
+	assert.Equal(t, 0, updated.PID)
+
+	registryResult := s.workspaces.Get(WorkspaceName(wsDir))
+	require.True(t, registryResult.OK)
+	registryStatus, ok := registryResult.Value.(*WorkspaceStatus)
+	require.True(t, ok)
+	assert.Equal(t, "failed", registryStatus.Status)
+	assert.Equal(t, dispatchTimeoutReason(timeout), registryStatus.Question)
+
+	assert.False(t, fs.Exists(workspaceTimeoutPath(wsDir)))
+}
+
 // --- runQA ---
 
 func TestDispatch_RunQA_Good(t *testing.T) {
diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go
index 2b186d9..fdb4e26 100644
--- a/pkg/agentic/queue.go
+++ b/pkg/agentic/queue.go
@@ -10,11 +10,17 @@ import (
 	"gopkg.in/yaml.v3"
 )
 
-// config := agentic.DispatchConfig{DefaultAgent: "claude", DefaultTemplate: "coding", Runtime: "auto", Image: "core-dev"}
+// defaultDispatchTimeoutMinutes applies when timeout_minutes is omitted or not positive.
+const defaultDispatchTimeoutMinutes = 60
+
+// config := agentic.DispatchConfig{DefaultAgent: "claude", DefaultTemplate: "coding", Runtime: "auto", Image: "core-dev", TimeoutMinutes: 60}
 type DispatchConfig struct {
 	DefaultAgent    string `yaml:"default_agent"`
 	DefaultTemplate string `yaml:"default_template"`
 	WorkspaceRoot   string `yaml:"workspace_root"`
+	// TimeoutMinutes bounds agent runtime before dispatch marks the workspace
+	// failed and go-process shuts the process tree down.
+	TimeoutMinutes int `yaml:"timeout_minutes"`
 	// Runtime selects the container runtime — auto | apple | docker | podman.
 	// auto detects in preference order: Apple Container -> Docker -> Podman.
 	// Apple Containers (macOS 26+) provide hardware VM isolation and sub-second
@@ -103,6 +109,15 @@ type AgentsConfig struct {
 	Agents map[string]AgentIdentity `yaml:"agents"`
 }
 
+// normaliseDispatchConfig returns config with a non-positive TimeoutMinutes
+// replaced by defaultDispatchTimeoutMinutes; other fields are left untouched.
+func normaliseDispatchConfig(config DispatchConfig) DispatchConfig {
+	if config.TimeoutMinutes <= 0 {
+		config.TimeoutMinutes = defaultDispatchTimeoutMinutes
+	}
+	return config
+}
+
 // config := s.loadAgentsConfig()
 func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
 	paths := []string{
@@ -119,6 +134,7 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
 		if err := yaml.Unmarshal([]byte(readResult.Value.(string)), &config); err != nil {
 			continue
 		}
+		config.Dispatch = normaliseDispatchConfig(config.Dispatch)
 		setWorkspaceRootOverride(config.Dispatch.WorkspaceRoot)
 		return &config
 	}
@@ -128,6 +144,7 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
 		Dispatch: DispatchConfig{
 			DefaultAgent:    "claude",
 			DefaultTemplate: "coding",
+			TimeoutMinutes:  defaultDispatchTimeoutMinutes,
 		},
 		Concurrency: map[string]ConcurrencyLimit{
 			"claude": {Total: 1},
diff --git a/pkg/agentic/queue_test.go b/pkg/agentic/queue_test.go
index 5e3f8e2..7bbcd4e 100644
--- a/pkg/agentic/queue_test.go
+++ b/pkg/agentic/queue_test.go
@@ -26,10 +26,23 @@ func TestQueue_DispatchConfig_Good_Defaults(t *testing.T) {
 	cfg := s.loadAgentsConfig()
 	assert.Equal(t, "claude", cfg.Dispatch.DefaultAgent)
 	assert.Equal(t, "coding", cfg.Dispatch.DefaultTemplate)
+	assert.Equal(t, 60, cfg.Dispatch.TimeoutMinutes)
 	assert.Equal(t, 1, cfg.Concurrency["claude"].Total)
 	assert.Equal(t, 3, cfg.Concurrency["gemini"].Total)
 }
 
+func TestQueue_Config_Good_TimeoutDefault(t *testing.T) {
+	root := t.TempDir()
+	setTestWorkspace(t, root)
+	require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), "version: 1\ndispatch:\n default_agent: codex\n").OK)
+	t.Cleanup(func() { setWorkspaceRootOverride("") })
+
+	s := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), codePath: t.TempDir()}
+	cfg := s.loadAgentsConfig()
+
+	assert.Equal(t, 60, cfg.Dispatch.TimeoutMinutes)
+}
+
 func TestQueue_DispatchConfig_Good_RuntimeImageGPUFromYAML(t *testing.T) {
 	root := t.TempDir()
 	setTestWorkspace(t, root)
@@ -39,6 +52,7 @@ func TestQueue_DispatchConfig_Good_RuntimeImageGPUFromYAML(t *testing.T) {
 		" runtime: apple\n",
 		" image: core-ml\n",
 		" gpu: true\n",
+		" timeout_minutes: 45\n",
 	)).OK)
 
 	t.Cleanup(func() {
@@ -51,6 +65,7 @@ func TestQueue_DispatchConfig_Good_RuntimeImageGPUFromYAML(t *testing.T) {
 	assert.Equal(t, "apple", cfg.Dispatch.Runtime)
 	assert.Equal(t, "core-ml", cfg.Dispatch.Image)
 	assert.True(t, cfg.Dispatch.GPU)
+	assert.Equal(t, 45, cfg.Dispatch.TimeoutMinutes)
 }
 
 func TestQueue_DispatchConfig_Bad_OmittedRuntimeFields(t *testing.T) {