From 780b22a3f70b8e747f3b401f5d98f5ee2f0f03c4 Mon Sep 17 00:00:00 2001 From: Virgil Date: Wed, 1 Apr 2026 16:20:06 +0000 Subject: [PATCH] feat(runner): clear queued workspaces on hard shutdown Co-Authored-By: Virgil --- pkg/runner/runner.go | 43 ++++++++++++++++++++++++------ pkg/runner/runner_test.go | 56 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 8 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index f64c63d..1174c22 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -324,19 +324,46 @@ func (s *Service) actionKill(_ context.Context, _ core.Options) core.Result { runtime = s.Core() } killed := 0 - s.workspaces.Each(func(_ string, workspaceStatus *WorkspaceStatus) { - if workspaceStatus.Status == "running" && workspaceStatus.PID > 0 { - if agentic.ProcessTerminate(runtime, "", workspaceStatus.PID) { + cleared := 0 + seenQueued := make(map[string]bool) + + for _, statusPath := range agentic.WorkspaceStatusPaths() { + workspaceDir := core.PathDir(statusPath) + statusResult := ReadStatusResult(workspaceDir) + workspaceStatus, ok := statusResult.Value.(*WorkspaceStatus) + if !ok || workspaceStatus == nil { + continue + } + + switch workspaceStatus.Status { + case "running": + if workspaceStatus.PID > 0 && agentic.ProcessTerminate(runtime, "", workspaceStatus.PID) { killed++ } workspaceStatus.Status = "failed" workspaceStatus.PID = 0 + _ = WriteStatus(workspaceDir, workspaceStatus) + if s.workspaces != nil { + s.workspaces.Set(agentic.WorkspaceName(workspaceDir), workspaceStatus) + } + case "queued": + workspaceName := agentic.WorkspaceName(workspaceDir) + if seenQueued[workspaceName] { + continue + } + seenQueued[workspaceName] = true + if deleteResult := fs.DeleteAll(workspaceDir); !deleteResult.OK { + core.Warn("runner.actionKill: failed to delete queued workspace", "workspace", workspaceName, "reason", core.Sprint(deleteResult.Value)) + continue + } + cleared++ + if s.workspaces != nil { + s.workspaces.Delete(workspaceName) + } } - if workspaceStatus.Status == "queued" { - workspaceStatus.Status = "failed" - } - }) - return core.Result{Value: core.Sprintf("killed %d agents", killed), OK: true} + } + + return core.Result{Value: core.Sprintf("killed %d agents, cleared %d queued", killed, cleared), OK: true} } func (s *Service) actionPoke(_ context.Context, _ core.Options) core.Result { diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index d31d294..d831e53 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -226,6 +226,62 @@ func TestRunner_ActionStop_Good(t *testing.T) { assert.True(t, svc.IsFrozen()) } +func TestRunner_ActionKill_Good_ClearsQueuedWorkspaces(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + wsRoot := core.JoinPath(root, "workspace") + + queuedDir := core.JoinPath(wsRoot, "task-queued") + runningDir := core.JoinPath(wsRoot, "task-running") + require.True(t, fs.EnsureDir(queuedDir).OK) + require.True(t, fs.EnsureDir(runningDir).OK) + + require.True(t, WriteStatus(queuedDir, &WorkspaceStatus{ + Status: "queued", + Agent: "codex", + Repo: "go-io", + Task: "Queued work", + }).OK) + require.True(t, WriteStatus(runningDir, &WorkspaceStatus{ + Status: "running", + Agent: "claude", + Repo: "go-log", + Task: "Running work", + }).OK) + + svc := New() + result := svc.actionKill(context.Background(), core.NewOptions()) + require.True(t, result.OK) + assert.Contains(t, result.Value.(string), "cleared 1 queued") + + assert.False(t, fs.Read(core.JoinPath(queuedDir, "status.json")).OK, "queued workspace should be deleted") + running := mustReadStatus(t, runningDir) + assert.Equal(t, "failed", running.Status) + assert.Equal(t, 0, running.PID) +} + +func TestRunner_ActionKill_Bad_EmptyWorkspaceRoot(t *testing.T) { + t.Setenv("CORE_WORKSPACE", "") + svc := New() + result := svc.actionKill(context.Background(), core.NewOptions()) + require.True(t, result.OK) + assert.Contains(t, result.Value.(string), "cleared 0 queued") +} + +func TestRunner_ActionKill_Ugly_InvalidStatusFile(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + wsRoot := core.JoinPath(root, "workspace") + wsDir := core.JoinPath(wsRoot, "task-bad") + require.True(t, fs.EnsureDir(wsDir).OK) + require.True(t, fs.Write(core.JoinPath(wsDir, "status.json"), "{not-json").OK) + + svc := New() + result := svc.actionKill(context.Background(), core.NewOptions()) + require.True(t, result.OK) + assert.Contains(t, result.Value.(string), "cleared 0 queued") +} + func TestRunner_ActionDispatch_Bad_Frozen(t *testing.T) { svc := New() svc.frozen = true