feat(agent/process): add Timeout + GracePeriod + KillGroup to dispatch (#540)
Per RFC.pipeline.md "go-process Improvements Needed": hung agent processes blocked dispatch slots forever. They are now killed after the configured timeout, with a SIGTERM-then-SIGKILL grace period and a process-group kill to prevent orphaned subprocesses. Lands: * pkg/agentic/dispatch.go — every c.Process().Run() that spawns an agent now passes Timeout (DispatchConfig.TimeoutMinutes, default 60), GracePeriod: 30s, KillGroup: true. Watchdog writes a timeout-specific failure reason into workspace status. * pkg/agentic/queue.go — DispatchConfig adds TimeoutMinutes int (YAML: timeout_minutes, default 60) so operators can tune per-deployment. * dispatch_test.go — TestDispatch_Run_Bad_Timeout asserts a slow process transitions to the failed state with a timeout reason. * queue_test.go — TestQueue_Config_Good_TimeoutDefault asserts the default of 60. Verified go-process exposes timeout/gracePeriod/killGroup option keys in the local checkout — no BLOCKED sibling needed. A plain go build is blocked by an unrelated go.work conflict plus the sibling go-ws coreerr.Warn missing (out of allowlist); the supervisor's clean-workspace build will catch any remaining compile errors. Co-authored-by: Codex <noreply@openai.com> Closes tasks.lthn.sh/view.php?id=540
This commit is contained in:
parent
f96bd67bd6
commit
d47946ff82
4 changed files with 188 additions and 7 deletions
|
|
@ -13,6 +13,8 @@ import (
|
|||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
)
|
||||
|
||||
// dispatchGracePeriod is how long go-process waits between SIGTERM and
// SIGKILL when tearing down a timed-out agent process.
const dispatchGracePeriod = 30 * time.Second
|
||||
|
||||
// workspaceTracker receives workspace status updates from dispatch.
type workspaceTracker interface {
	// TrackWorkspace records the latest status value for the named workspace.
	TrackWorkspace(name string, status any)
}
|
||||
|
|
@ -335,6 +337,85 @@ func (s *PrepSubsystem) dispatchGPU() bool {
|
|||
return dispatchConfig.GPU
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) dispatchTimeout() time.Duration {
|
||||
timeoutMinutes := defaultDispatchTimeoutMinutes
|
||||
if s != nil && s.ServiceRuntime != nil {
|
||||
dispatchConfig, ok := s.Core().Config().Get("agents.dispatch").Value.(DispatchConfig)
|
||||
if ok {
|
||||
timeoutMinutes = normaliseDispatchConfig(dispatchConfig).TimeoutMinutes
|
||||
}
|
||||
}
|
||||
return time.Duration(timeoutMinutes) * time.Minute
|
||||
}
|
||||
|
||||
func dispatchRunOptions(command string, args []string, runDir string, timeout time.Duration) process.RunOptions {
|
||||
return process.RunOptions{
|
||||
Command: command,
|
||||
Args: args,
|
||||
Dir: runDir,
|
||||
Detach: true,
|
||||
Timeout: timeout,
|
||||
GracePeriod: dispatchGracePeriod,
|
||||
KillGroup: true,
|
||||
}
|
||||
}
|
||||
|
||||
func dispatchTimeoutReason(timeout time.Duration) string {
|
||||
switch {
|
||||
case timeout > 0 && timeout%time.Minute == 0:
|
||||
return core.Sprintf("Agent timed out after %dm", int(timeout/time.Minute))
|
||||
case timeout > 0 && timeout%time.Second == 0:
|
||||
return core.Sprintf("Agent timed out after %ds", int(timeout/time.Second))
|
||||
default:
|
||||
return core.Sprintf("Agent timed out after %s", timeout.String())
|
||||
}
|
||||
}
|
||||
|
||||
func workspaceTimeoutPath(workspaceDir string) string {
|
||||
return core.JoinPath(WorkspaceMetaDir(workspaceDir), "timeout.reason")
|
||||
}
|
||||
|
||||
func dispatchTimeoutReasonFromWorkspace(workspaceDir string) string {
|
||||
result := fs.Read(workspaceTimeoutPath(workspaceDir))
|
||||
if !result.OK {
|
||||
return ""
|
||||
}
|
||||
return core.Trim(result.Value.(string))
|
||||
}
|
||||
|
||||
func clearDispatchTimeoutReason(workspaceDir string) {
|
||||
deleteResult := fs.Delete(workspaceTimeoutPath(workspaceDir))
|
||||
if !deleteResult.OK && fs.Exists(workspaceTimeoutPath(workspaceDir)) {
|
||||
core.Warn("agentic: failed to remove timeout marker", "path", workspaceTimeoutPath(workspaceDir), "reason", deleteResult.Value)
|
||||
}
|
||||
}
|
||||
|
||||
// startDispatchTimeoutWatch spawns a watchdog goroutine that writes a
// timeout marker into the workspace if proc is still running when the
// timeout elapses. The marker is later read by completion handling to
// turn the run into a "failed" status with a timeout reason. A
// non-positive timeout or nil process disables the watch.
func startDispatchTimeoutWatch(workspaceDir string, timeout time.Duration, proc completionProcess) {
	if timeout <= 0 || proc == nil {
		return
	}

	go func() {
		timer := time.NewTimer(timeout)
		defer timer.Stop()

		select {
		case <-proc.Done():
			// Process finished before the deadline: nothing to record.
			return
		case <-timer.C:
			// Re-check Done non-blockingly: the process may have finished
			// in the instant between the timer firing and this point, in
			// which case no marker should be written.
			select {
			case <-proc.Done():
				return
			default:
			}
			// NOTE(review): a narrow race remains — the process can
			// complete between this check and the write below, leaving a
			// stale marker; spawnAgent clears markers on the next run.
			writeResult := fs.WriteAtomic(workspaceTimeoutPath(workspaceDir), dispatchTimeoutReason(timeout))
			if !writeResult.OK {
				core.Warn("agentic: failed to write timeout marker", "path", workspaceTimeoutPath(workspaceDir), "reason", writeResult.Value)
			}
		}
	}()
}
|
||||
|
||||
// command, args := containerCommand("codex", []string{"exec", "--model", "gpt-5.4"}, "/srv/.core/workspace/core/go-io/task-5", "/srv/.core/workspace/core/go-io/task-5/.meta")
|
||||
func containerCommand(command string, args []string, workspaceDir, metaDir string) (string, []string) {
|
||||
return containerCommandFor(RuntimeDocker, defaultDockerImage, false, command, args, workspaceDir, metaDir)
|
||||
|
|
@ -564,6 +645,13 @@ func (s *PrepSubsystem) onAgentComplete(agent, workspaceDir, outputFile string,
|
|||
|
||||
repoDir := WorkspaceRepoDir(workspaceDir)
|
||||
finalStatus, question := detectFinalStatus(repoDir, exitCode, processStatus)
|
||||
if finalStatus != "blocked" {
|
||||
if timeoutReason := dispatchTimeoutReasonFromWorkspace(workspaceDir); timeoutReason != "" {
|
||||
finalStatus = "failed"
|
||||
question = timeoutReason
|
||||
}
|
||||
}
|
||||
clearDispatchTimeoutReason(workspaceDir)
|
||||
|
||||
result := ReadStatusResult(workspaceDir)
|
||||
workspaceStatus, ok := workspaceStatusValue(result)
|
||||
|
|
@ -595,6 +683,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, workspaceDir string) (int, str
|
|||
if deleteResult := fs.Delete(WorkspaceBlockedPath(workspaceDir)); !deleteResult.OK {
|
||||
core.Warn("agentic: failed to remove blocked marker", "path", WorkspaceBlockedPath(workspaceDir), "reason", deleteResult.Value)
|
||||
}
|
||||
clearDispatchTimeoutReason(workspaceDir)
|
||||
|
||||
if !isNativeAgent(agent) {
|
||||
runtimeName := resolveContainerRuntime(s.dispatchRuntime())
|
||||
|
|
@ -616,17 +705,13 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, workspaceDir string) (int, str
|
|||
runDir = WorkspaceRepoDir(workspaceDir)
|
||||
}
|
||||
|
||||
proc, err := procSvc.StartWithOptions(context.Background(), process.RunOptions{
|
||||
Command: command,
|
||||
Args: args,
|
||||
Dir: runDir,
|
||||
Detach: true,
|
||||
})
|
||||
proc, err := procSvc.StartWithOptions(context.Background(), dispatchRunOptions(command, args, runDir, s.dispatchTimeout()))
|
||||
if err != nil {
|
||||
return 0, "", "", core.E("dispatch.spawnAgent", core.Concat("failed to spawn ", agent), err)
|
||||
}
|
||||
|
||||
proc.CloseStdin()
|
||||
startDispatchTimeoutWatch(workspaceDir, s.dispatchTimeout(), proc)
|
||||
pid := proc.Info().PID
|
||||
processID := proc.ID
|
||||
|
||||
|
|
|
|||
|
|
@ -437,6 +437,73 @@ func TestDispatch_OnAgentComplete_Ugly(t *testing.T) {
|
|||
assert.False(t, fs.Exists(core.JoinPath(metaDir, "agent-codex.log")))
|
||||
}
|
||||
|
||||
// TestDispatch_Run_Bad_Timeout exercises the full timeout path: a real
// long-running process is started with dispatch run options, the
// watchdog writes the timeout marker, and the completion monitor turns
// the workspace status into "failed" with the timeout reason, clearing
// the marker afterwards.
func TestDispatch_Run_Bad_Timeout(t *testing.T) {
	root := t.TempDir()
	setTestWorkspace(t, root)

	// Lay out a minimal workspace: repo/ and .meta/ under ws-timeout.
	wsDir := core.JoinPath(root, "ws-timeout")
	repoDir := core.JoinPath(wsDir, "repo")
	metaDir := core.JoinPath(wsDir, ".meta")
	require.True(t, fs.EnsureDir(repoDir).OK)
	require.True(t, fs.EnsureDir(metaDir).OK)

	// Seed a running status so completion handling has state to update.
	st := &WorkspaceStatus{
		Status:    "running",
		Agent:     "codex",
		Repo:      "go-io",
		StartedAt: time.Now(),
	}
	require.NoError(t, writeStatus(wsDir, st))

	processResult := testCore.Service("process")
	require.True(t, processResult.OK)
	procSvc, ok := processResult.Value.(*process.Service)
	require.True(t, ok)

	// 100ms timeout against `sleep 60` guarantees the watchdog fires.
	timeout := 100 * time.Millisecond
	opts := dispatchRunOptions("sleep", []string{"60"}, repoDir, timeout)
	assert.Equal(t, timeout, opts.Timeout)
	assert.Equal(t, dispatchGracePeriod, opts.GracePeriod)
	assert.True(t, opts.KillGroup)
	assert.True(t, opts.Detach)

	proc, err := procSvc.StartWithOptions(context.Background(), opts)
	require.NoError(t, err)
	proc.CloseStdin()

	s := newPrepWithProcess()
	s.workspaces = core.NewRegistry[*WorkspaceStatus]()
	// Start the watchdog before waiting on completion, mirroring spawnAgent.
	startDispatchTimeoutWatch(wsDir, timeout, proc)

	monitor := &agentCompletionMonitor{
		service:      s,
		agent:        "codex",
		workspaceDir: wsDir,
		outputFile:   core.JoinPath(metaDir, "agent-codex.log"),
		process:      proc,
	}

	// Blocks until the process is killed by the timeout machinery.
	r := monitor.run(context.Background(), core.NewOptions())
	assert.True(t, r.OK)

	info := proc.Info()
	assert.Equal(t, process.StatusKilled, info.Status)

	// On-disk status reflects the timeout failure with PID cleared.
	updated := mustReadStatus(t, wsDir)
	assert.Equal(t, "failed", updated.Status)
	assert.Equal(t, dispatchTimeoutReason(timeout), updated.Question)
	assert.Equal(t, 0, updated.PID)

	// The in-memory registry mirrors the same failed state.
	registryResult := s.workspaces.Get(WorkspaceName(wsDir))
	require.True(t, registryResult.OK)
	registryStatus, ok := registryResult.Value.(*WorkspaceStatus)
	require.True(t, ok)
	assert.Equal(t, "failed", registryStatus.Status)
	assert.Equal(t, dispatchTimeoutReason(timeout), registryStatus.Question)

	// Completion handling must have removed the timeout marker.
	assert.False(t, fs.Exists(workspaceTimeoutPath(wsDir)))
}
|
||||
|
||||
// --- runQA ---
|
||||
|
||||
func TestDispatch_RunQA_Good(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -10,11 +10,16 @@ import (
|
|||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// config := agentic.DispatchConfig{DefaultAgent: "claude", DefaultTemplate: "coding", Runtime: "auto", Image: "core-dev"}
|
||||
const defaultDispatchTimeoutMinutes = 60
|
||||
|
||||
// config := agentic.DispatchConfig{DefaultAgent: "claude", DefaultTemplate: "coding", Runtime: "auto", Image: "core-dev", TimeoutMinutes: 60}
|
||||
type DispatchConfig struct {
|
||||
DefaultAgent string `yaml:"default_agent"`
|
||||
DefaultTemplate string `yaml:"default_template"`
|
||||
WorkspaceRoot string `yaml:"workspace_root"`
|
||||
// TimeoutMinutes bounds agent runtime before dispatch marks the workspace
|
||||
// failed and go-process shuts the process tree down.
|
||||
TimeoutMinutes int `yaml:"timeout_minutes"`
|
||||
// Runtime selects the container runtime — auto | apple | docker | podman.
|
||||
// auto detects in preference order: Apple Container -> Docker -> Podman.
|
||||
// Apple Containers (macOS 26+) provide hardware VM isolation and sub-second
|
||||
|
|
@ -103,6 +108,13 @@ type AgentsConfig struct {
|
|||
Agents map[string]AgentIdentity `yaml:"agents"`
|
||||
}
|
||||
|
||||
func normaliseDispatchConfig(config DispatchConfig) DispatchConfig {
|
||||
if config.TimeoutMinutes <= 0 {
|
||||
config.TimeoutMinutes = defaultDispatchTimeoutMinutes
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
// config := s.loadAgentsConfig()
|
||||
func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
|
||||
paths := []string{
|
||||
|
|
@ -119,6 +131,7 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
|
|||
if err := yaml.Unmarshal([]byte(readResult.Value.(string)), &config); err != nil {
|
||||
continue
|
||||
}
|
||||
config.Dispatch = normaliseDispatchConfig(config.Dispatch)
|
||||
setWorkspaceRootOverride(config.Dispatch.WorkspaceRoot)
|
||||
return &config
|
||||
}
|
||||
|
|
@ -128,6 +141,7 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
|
|||
Dispatch: DispatchConfig{
|
||||
DefaultAgent: "claude",
|
||||
DefaultTemplate: "coding",
|
||||
TimeoutMinutes: defaultDispatchTimeoutMinutes,
|
||||
},
|
||||
Concurrency: map[string]ConcurrencyLimit{
|
||||
"claude": {Total: 1},
|
||||
|
|
|
|||
|
|
@ -26,10 +26,23 @@ func TestQueue_DispatchConfig_Good_Defaults(t *testing.T) {
|
|||
cfg := s.loadAgentsConfig()
|
||||
assert.Equal(t, "claude", cfg.Dispatch.DefaultAgent)
|
||||
assert.Equal(t, "coding", cfg.Dispatch.DefaultTemplate)
|
||||
assert.Equal(t, 60, cfg.Dispatch.TimeoutMinutes)
|
||||
assert.Equal(t, 1, cfg.Concurrency["claude"].Total)
|
||||
assert.Equal(t, 3, cfg.Concurrency["gemini"].Total)
|
||||
}
|
||||
|
||||
func TestQueue_Config_Good_TimeoutDefault(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
setTestWorkspace(t, root)
|
||||
require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), "version: 1\ndispatch:\n default_agent: codex\n").OK)
|
||||
t.Cleanup(func() { setWorkspaceRootOverride("") })
|
||||
|
||||
s := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), codePath: t.TempDir()}
|
||||
cfg := s.loadAgentsConfig()
|
||||
|
||||
assert.Equal(t, 60, cfg.Dispatch.TimeoutMinutes)
|
||||
}
|
||||
|
||||
func TestQueue_DispatchConfig_Good_RuntimeImageGPUFromYAML(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
setTestWorkspace(t, root)
|
||||
|
|
@ -39,6 +52,7 @@ func TestQueue_DispatchConfig_Good_RuntimeImageGPUFromYAML(t *testing.T) {
|
|||
" runtime: apple\n",
|
||||
" image: core-ml\n",
|
||||
" gpu: true\n",
|
||||
" timeout_minutes: 45\n",
|
||||
)).OK)
|
||||
|
||||
t.Cleanup(func() {
|
||||
|
|
@ -51,6 +65,7 @@ func TestQueue_DispatchConfig_Good_RuntimeImageGPUFromYAML(t *testing.T) {
|
|||
assert.Equal(t, "apple", cfg.Dispatch.Runtime)
|
||||
assert.Equal(t, "core-ml", cfg.Dispatch.Image)
|
||||
assert.True(t, cfg.Dispatch.GPU)
|
||||
assert.Equal(t, 45, cfg.Dispatch.TimeoutMinutes)
|
||||
}
|
||||
|
||||
func TestQueue_DispatchConfig_Bad_OmittedRuntimeFields(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue