diff --git a/pkg/agentic/persist.go b/pkg/agentic/persist.go new file mode 100644 index 0000000..2aa72d2 --- /dev/null +++ b/pkg/agentic/persist.go @@ -0,0 +1,274 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "errors" + "syscall" + "time" + + core "dappco.re/go/core" +) + +const deadWorkerOnRestartQuestion = "dead worker on restart" + +type concurrencySnapshot struct { + Running int `json:"running,omitempty"` + Tracked int `json:"tracked,omitempty"` + SnapshotAt time.Time `json:"snapshot_at,omitempty"` +} + +// result := s.restorePersistedState(context.Background()) +func (s *PrepSubsystem) restorePersistedState(_ context.Context) core.Result { + if s == nil { + return core.Result{OK: true} + } + + s.workspaces = core.NewRegistry[*WorkspaceStatus]() + _ = s.stateStoreInstance() + + restored := map[string]*WorkspaceStatus{} + s.restoreRegistrySnapshot(restored) + s.restoreQueueSnapshot(restored) + s.restoreConcurrencySnapshot() + s.restoreWorkspaceSnapshot(restored) + + registryKeep := map[string]struct{}{} + queueKeep := map[string]struct{}{} + + for name, workspaceStatus := range restored { + if workspaceStatus == nil { + continue + } + + registryKeep[name] = struct{}{} + workspaceDir := core.JoinPath(WorkspaceRoot(), name) + changed := s.normaliseRestoredWorkspace(workspaceStatus) + + if workspaceStatus.Status == "completed" && fs.IsDir(workspaceDir) { + fs.DeleteAll(workspaceDir) + } else if fs.IsDir(workspaceDir) && (changed || !fs.IsFile(WorkspaceStatusPath(workspaceDir))) { + s.writePersistedWorkspaceStatus(workspaceDir, workspaceStatus) + } + + s.workspaces.Set(name, cloneWorkspaceStatus(workspaceStatus)) + s.stateStoreSet(stateRegistryGroup, name, workspaceStatus) + + if workspaceStatus.Status == "queued" { + queueKeep[name] = struct{}{} + s.stateStoreSet(stateQueueGroup, name, queueEntryFromStatus(workspaceStatus)) + } else { + s.stateStoreDelete(stateQueueGroup, name) + } + } + + s.pruneStateGroup(stateRegistryGroup, registryKeep) + s.pruneStateGroup(stateQueueGroup, queueKeep) + s.refreshConcurrencySnapshot() + + return core.Result{OK: true} +} + +// result := s.flushPersistedState(context.Background()) +func (s *PrepSubsystem) flushPersistedState(_ context.Context) core.Result { + if s == nil { + return core.Result{OK: true} + } + + if len(s.backoff) > 0 || len(s.failCount) > 0 { + s.persistRuntimeState() + } + if s.workspaces == nil { + return core.Result{OK: true} + } + + _ = s.stateStoreInstance() + registryKeep := map[string]struct{}{} + queueKeep := map[string]struct{}{} + + s.workspaces.Each(func(name string, workspaceStatus *WorkspaceStatus) { + if name == "" { + return + } + if workspaceStatus == nil { + s.stateStoreDelete(stateRegistryGroup, name) + s.stateStoreDelete(stateQueueGroup, name) + return + } + + registryKeep[name] = struct{}{} + s.stateStoreSet(stateRegistryGroup, name, workspaceStatus) + + if workspaceStatus.Status == "queued" { + queueKeep[name] = struct{}{} + s.stateStoreSet(stateQueueGroup, name, queueEntryFromStatus(workspaceStatus)) + } else { + s.stateStoreDelete(stateQueueGroup, name) + } + }) + + s.pruneStateGroup(stateRegistryGroup, registryKeep) + s.pruneStateGroup(stateQueueGroup, queueKeep) + s.refreshConcurrencySnapshot() + + return core.Result{OK: true} +} + +func (s *PrepSubsystem) restoreRegistrySnapshot(restored map[string]*WorkspaceStatus) { + if restored == nil { + return + } + s.stateStoreRestore(stateRegistryGroup, func(key, value string) bool { + var workspaceStatus WorkspaceStatus + if result := core.JSONUnmarshalString(value, &workspaceStatus); !result.OK { + return true + } + restored[key] = cloneWorkspaceStatus(&workspaceStatus) + return true + }) +} + +func (s *PrepSubsystem) restoreQueueSnapshot(restored map[string]*WorkspaceStatus) { + if restored == nil { + return + } + s.stateStoreRestore(stateQueueGroup, func(key, value string) bool { + if _, ok := restored[key]; ok { + return true + } + + var entry queueEntry + if result := core.JSONUnmarshalString(value, &entry); !result.OK { + return true + } + + restored[key] = workspaceStatusFromQueueEntry(entry) + return true + }) +} + +func (s *PrepSubsystem) restoreConcurrencySnapshot() { + s.stateStoreRestore(stateConcurrencyGroup, func(_ string, value string) bool { + var snapshot concurrencySnapshot + _ = core.JSONUnmarshalString(value, &snapshot) + return true + }) +} + +func (s *PrepSubsystem) restoreWorkspaceSnapshot(restored map[string]*WorkspaceStatus) { + if restored == nil { + return + } + for _, statusPath := range WorkspaceStatusPaths() { + workspaceDir := core.PathDir(statusPath) + result := ReadStatusResult(workspaceDir) + workspaceStatus, ok := workspaceStatusValue(result) + if !ok { + continue + } + restored[WorkspaceName(workspaceDir)] = cloneWorkspaceStatus(workspaceStatus) + } +} + +func (s *PrepSubsystem) normaliseRestoredWorkspace(workspaceStatus *WorkspaceStatus) bool { + if workspaceStatus == nil { + return false + } + + if workspaceStatus.Status != "running" { + if workspaceStatus.UpdatedAt.IsZero() { + if workspaceStatus.StartedAt.IsZero() { + workspaceStatus.UpdatedAt = time.Now().UTC() + } else { + workspaceStatus.UpdatedAt = workspaceStatus.StartedAt.UTC() + } + return true + } + return false + } + + if s.pidAlive(workspaceStatus.ProcessID, workspaceStatus.PID) { + if workspaceStatus.UpdatedAt.IsZero() { + workspaceStatus.UpdatedAt = time.Now().UTC() + return true + } + return false + } + + workspaceStatus.Status = "failed" + workspaceStatus.Question = deadWorkerOnRestartQuestion + workspaceStatus.PID = 0 + workspaceStatus.ProcessID = "" + workspaceStatus.UpdatedAt = time.Now().UTC() + return true +} + +func (s *PrepSubsystem) pidAlive(processID string, pid int) bool { + if s != nil && s.ServiceRuntime != nil && ProcessAlive(s.Core(), processID, pid) { + return true + } + if pid <= 0 { + return false + } + + err := syscall.Kill(pid, 0) + if err == nil { + return true + } + + return errors.Is(err, syscall.EPERM) +} + +func (s *PrepSubsystem) pruneStateGroup(group string, keep map[string]struct{}) { + if keep == nil { + keep = map[string]struct{}{} + } + + var stale []string + s.stateStoreRestore(group, func(key, _ string) bool { + if _, ok := keep[key]; !ok { + stale = append(stale, key) + } + return true + }) + + for _, key := range stale { + s.stateStoreDelete(group, key) + } +} + +func (s *PrepSubsystem) writePersistedWorkspaceStatus(workspaceDir string, workspaceStatus *WorkspaceStatus) { + if workspaceDir == "" || workspaceStatus == nil || !fs.IsDir(workspaceDir) { + return + } + if workspaceStatus.UpdatedAt.IsZero() { + workspaceStatus.UpdatedAt = time.Now().UTC() + } + _ = fs.WriteAtomic(WorkspaceStatusPath(workspaceDir), core.JSONMarshalString(workspaceStatus)) +} + +func cloneWorkspaceStatus(workspaceStatus *WorkspaceStatus) *WorkspaceStatus { + if workspaceStatus == nil { + return nil + } + cloned := *workspaceStatus + return &cloned +} + +func workspaceStatusFromQueueEntry(entry queueEntry) *WorkspaceStatus { + queuedAt := entry.QueuedAt.UTC() + if queuedAt.IsZero() { + queuedAt = time.Now().UTC() + } + return &WorkspaceStatus{ + Status: "queued", + Agent: entry.Agent, + Repo: entry.Repo, + Org: entry.Org, + Task: entry.Task, + Branch: entry.Branch, + StartedAt: queuedAt, + UpdatedAt: queuedAt, + } +} diff --git a/pkg/agentic/persist_test.go b/pkg/agentic/persist_test.go new file mode 100644 index 0000000..5ea9ac3 --- /dev/null +++ b/pkg/agentic/persist_test.go @@ -0,0 +1,245 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "testing" + "time" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPersist_OnStartup_Good_RestoresQueue(t *testing.T) { + root := t.TempDir() + setPersistTestWorkspace(t, root) + + workspaceName := "core/go-io/task-restore" + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-restore") + require.True(t, fs.EnsureDir(workspaceDir).OK) + + subsystem := &PrepSubsystem{} + defer subsystem.closeStateStore() + if subsystem.stateStoreInstance() == nil { + t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation") + } + + queuedAt := time.Now().UTC().Add(-5 * time.Minute) + subsystem.stateStoreSet(stateQueueGroup, workspaceName, queueEntry{ + Repo: "go-io", + Org: "core", + Task: "restore queue", + Branch: "agent/restore-queue", + Agent: "codex:gpt-5.4", + Status: "queued", + QueuedAt: queuedAt, + }) + + result := subsystem.restorePersistedState(context.Background()) + require.True(t, result.OK) + assert.True(t, fs.IsFile(core.JoinPath(root, "db.duckdb"))) + + registryResult := subsystem.Workspaces().Get(workspaceName) + require.True(t, registryResult.OK) + workspaceStatus, ok := registryResult.Value.(*WorkspaceStatus) + require.True(t, ok) + assert.Equal(t, "queued", workspaceStatus.Status) + assert.Equal(t, "go-io", workspaceStatus.Repo) + assert.Equal(t, "core", workspaceStatus.Org) + assert.Equal(t, "agent/restore-queue", workspaceStatus.Branch) + + statusResult := ReadStatusResult(workspaceDir) + require.True(t, statusResult.OK) + restoredStatus, ok := workspaceStatusValue(statusResult) + require.True(t, ok) + assert.Equal(t, "queued", restoredStatus.Status) + assert.Equal(t, "go-io", restoredStatus.Repo) +} + +func TestPersist_OnStartup_Good_MarksDeadWorkers(t *testing.T) { + root := t.TempDir() + setPersistTestWorkspace(t, root) + + workspaceName := "core/go-io/task-dead" + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-dead") + require.True(t, fs.EnsureDir(workspaceDir).OK) + + subsystem := &PrepSubsystem{} + defer subsystem.closeStateStore() + if subsystem.stateStoreInstance() == nil { + t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation") + } + + subsystem.stateStoreSet(stateRegistryGroup, workspaceName, WorkspaceStatus{ + Status: "running", + Agent: "codex:gpt-5.4", + Repo: "go-io", + Org: "core", + Task: "reap ghost", + Branch: "agent/reap-ghost", + PID: 999999, + ProcessID: "process-dead", + StartedAt: time.Now().UTC().Add(-10 * time.Minute), + UpdatedAt: time.Now().UTC().Add(-9 * time.Minute), + Runs: 1, + }) + + result := subsystem.restorePersistedState(context.Background()) + require.True(t, result.OK) + + registryResult := subsystem.Workspaces().Get(workspaceName) + require.True(t, registryResult.OK) + workspaceStatus, ok := registryResult.Value.(*WorkspaceStatus) + require.True(t, ok) + assert.Equal(t, "failed", workspaceStatus.Status) + assert.Equal(t, deadWorkerOnRestartQuestion, workspaceStatus.Question) + assert.Zero(t, workspaceStatus.PID) + assert.Empty(t, workspaceStatus.ProcessID) + + statusResult := ReadStatusResult(workspaceDir) + require.True(t, statusResult.OK) + restoredStatus, ok := workspaceStatusValue(statusResult) + require.True(t, ok) + assert.Equal(t, "failed", restoredStatus.Status) + assert.Equal(t, deadWorkerOnRestartQuestion, restoredStatus.Question) +} + +func TestPersist_OnShutdown_Good_PersistsQueue(t *testing.T) { + root := t.TempDir() + setPersistTestWorkspace(t, root) + + subsystem := &PrepSubsystem{ + workspaces: core.NewRegistry[*WorkspaceStatus](), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + if subsystem.stateStoreInstance() == nil { + t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation") + } + + now := time.Now().UTC() + subsystem.workspaces.Set("core/go-io/task-queue", &WorkspaceStatus{ + Status: "queued", + Agent: "codex:gpt-5.4", + Repo: "go-io", + Org: "core", + Task: "persist queue", + Branch: "agent/persist-queue", + StartedAt: now, + UpdatedAt: now, + }) + subsystem.workspaces.Set("core/go-store/task-running", &WorkspaceStatus{ + Status: "running", + Agent: "codex:gpt-5.4-mini", + Repo: "go-store", + Org: "core", + Task: "persist registry", + Branch: "agent/persist-registry", + PID: 4242, + StartedAt: now, + UpdatedAt: now, + }) + + result := subsystem.OnShutdown(context.Background()) + require.True(t, result.OK) + + replay := &PrepSubsystem{} + defer replay.closeStateStore() + + queueValue, ok := replay.stateStoreGet(stateQueueGroup, "core/go-io/task-queue") + require.True(t, ok) + var entry queueEntry + require.True(t, core.JSONUnmarshalString(queueValue, &entry).OK) + assert.Equal(t, "go-io", entry.Repo) + assert.Equal(t, "agent/persist-queue", entry.Branch) + + registryValue, ok := replay.stateStoreGet(stateRegistryGroup, "core/go-store/task-running") + require.True(t, ok) + var stored WorkspaceStatus + require.True(t, core.JSONUnmarshalString(registryValue, &stored).OK) + assert.Equal(t, "running", stored.Status) + assert.Equal(t, "go-store", stored.Repo) +} + +func TestPersist_OnStartup_Bad_IgnoresInvalidStorePayload(t *testing.T) { + root := t.TempDir() + setPersistTestWorkspace(t, root) + + validWorkspace := "core/go-io/task-valid" + validWorkspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-valid") + require.True(t, fs.EnsureDir(validWorkspaceDir).OK) + + subsystem := &PrepSubsystem{} + defer subsystem.closeStateStore() + storeInstance := subsystem.stateStoreInstance() + if storeInstance == nil { + t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation") + } + + require.NoError(t, storeInstance.Set(stateRegistryGroup, "broken", "{")) + subsystem.stateStoreSet(stateQueueGroup, validWorkspace, queueEntry{ + Repo: "go-io", + Org: "core", + Task: "valid queue", + Branch: "agent/valid-queue", + Agent: "codex:gpt-5.4", + QueuedAt: time.Now().UTC(), + }) + + result := subsystem.restorePersistedState(context.Background()) + require.True(t, result.OK) + assert.False(t, subsystem.Workspaces().Get("broken").OK) + assert.True(t, subsystem.Workspaces().Get(validWorkspace).OK) +} + +func TestPersist_OnStartup_Ugly_CleansCompletedOrphanedWorkspace(t *testing.T) { + root := t.TempDir() + setPersistTestWorkspace(t, root) + + workspaceName := "core/go-io/task-completed" + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-completed") + require.True(t, fs.EnsureDir(workspaceDir).OK) + require.True(t, fs.WriteAtomic(WorkspaceStatusPath(workspaceDir), core.JSONMarshalString(WorkspaceStatus{ + Status: "completed", + Agent: "codex:gpt-5.4", + Repo: "go-io", + Org: "core", + Task: "cleanup orphan", + Branch: "agent/cleanup-orphan", + StartedAt: time.Now().UTC().Add(-2 * time.Hour), + UpdatedAt: time.Now().UTC().Add(-time.Hour), + Runs: 1, + })).OK) + + subsystem := &PrepSubsystem{} + defer subsystem.closeStateStore() + if subsystem.stateStoreInstance() == nil { + t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation") + } + + subsystem.stateStoreSet(stateRegistryGroup, workspaceName, WorkspaceStatus{ + Status: "completed", + Agent: "codex:gpt-5.4", + Repo: "go-io", + Org: "core", + Task: "cleanup orphan", + Branch: "agent/cleanup-orphan", + StartedAt: time.Now().UTC().Add(-2 * time.Hour), + UpdatedAt: time.Now().UTC().Add(-time.Hour), + Runs: 1, + }) + + result := subsystem.restorePersistedState(context.Background()) + require.True(t, result.OK) + assert.False(t, fs.IsDir(workspaceDir)) +} + +func setPersistTestWorkspace(t *testing.T, root string) { + t.Helper() + setTestWorkspace(t, root) + t.Setenv("CORE_HOME", root) + t.Setenv("DIR_HOME", root) + t.Setenv("HOME", root) +} diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index c5fdecc..c2545d1 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -353,7 +353,9 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c.Action("agentic.complete", s.handleComplete).Description = "Run completion pipeline (QA → PR → Verify → Commit → Ingest → Poke) in background" - s.hydrateWorkspaces() + if result := s.restorePersistedState(ctx); !result.OK { + return result + } // RFC §15.5 — startup scans `.core/state/` for orphaned QA workspace // buffers (leftover DuckDB files from dispatches that crashed before // commit) and releases them so the next cycle starts clean. @@ -385,6 +387,9 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { // _ = subsystem.OnShutdown(context.Background()) func (s *PrepSubsystem) OnShutdown(ctx context.Context) core.Result { s.frozen = true + if result := s.flushPersistedState(ctx); !result.OK { + return result + } s.closeStateStore() s.closeWorkspaceStatsStore() return core.Result{OK: true} diff --git a/tests/cli/restart/Taskfile.yaml b/tests/cli/restart/Taskfile.yaml index d406c1f..1f51dc9 100644 --- a/tests/cli/restart/Taskfile.yaml +++ b/tests/cli/restart/Taskfile.yaml @@ -22,50 +22,81 @@ tasks: export CORE_HOME="$workspace" export DIR_HOME="$workspace" - # Pre-seed a workspace with status.json showing a process that is - # not alive — simulating the "dispatch -> kill -> restart" cycle. - ghost_dir="$workspace/workspace/core/go-io/task-restart" - mkdir -p "$ghost_dir" - cat >"$ghost_dir/status.json" <"$seed" <<'GO' + package main + + import ( + "os" + + store "dappco.re/go/store" + ) + + func main() { + st, err := store.New(os.Args[1]) + if err != nil { + panic(err) + } + defer st.Close() + + must := func(err error) { + if err != nil { + panic(err) + } + } + + must(st.Set("queue", "core/go-io/task-queued", `{"repo":"go-io","org":"core","task":"restart-queued","branch":"agent/restart-queued","agent":"codex:gpt-5.4","status":"queued","queued_at":"2026-04-14T12:00:00Z"}`)) + must(st.Set("registry", "core/go-io/task-queued", `{"status":"queued","agent":"codex:gpt-5.4","repo":"go-io","org":"core","task":"restart-queued","branch":"agent/restart-queued","started_at":"2026-04-14T12:00:00Z","updated_at":"2026-04-14T12:00:00Z","runs":0}`)) + must(st.Set("registry", "core/go-io/task-dead", `{"status":"running","agent":"codex:gpt-5.4","repo":"go-io","org":"core","task":"restart-dead","branch":"agent/restart-dead","pid":99999,"process_id":"dead-proc","started_at":"2026-04-14T12:00:00Z","updated_at":"2026-04-14T12:00:00Z","runs":1}`)) + must(st.Set("concurrency", "codex", `{"running":1,"tracked":2,"snapshot_at":"2026-04-14T12:00:00Z"}`)) + } + GO + go run "$seed" "$workspace/db.duckdb" + + # Restart into a fresh process. Startup must restore the queued + # workspace from db.duckdb and reap the dead worker to failed. output="$(mktemp)" run_capture_all 0 "$output" ./bin/core-agent status - assert_contains "core/go-io/task-restart" "$output" + assert_contains "core/go-io/task-queued" "$output" + assert_contains "core/go-io/task-dead" "$output" - # Verify the ghost did not survive as `running`. The status command - # surfaces both the workspace and its current state — accepting any - # non-running label (failed, completed, queued, blocked) confirms - # the ghost was reaped. - if grep -E 'core/go-io/task-restart.*\brunning\b' "$output"; then - printf 'expected ghost agent reaped on restart, still reports running\n' >&2 + if ! grep -E 'queued[[:space:]]+codex:gpt-5.4[[:space:]]+go-io[[:space:]]+core/go-io/task-queued' "$output"; then + printf 'expected queued workspace restored from db.duckdb\n' >&2 cat "$output" >&2 exit 1 fi - assert_contains "failed" "$output" - # The reaped status should be persisted back to disk per RFC §15.3 — - # cross-process consumers (other tooling reading status.json) must - # see the same coherent state. - status_file="$ghost_dir/status.json" - if grep -q '"status":[[:space:]]*"running"' "$status_file"; then - printf 'expected reaped status persisted to %s\n' "$status_file" >&2 - cat "$status_file" >&2 + if grep -E 'core/go-io/task-dead.*\brunning\b' "$output"; then + printf 'expected dead worker reaped on restart, still reports running\n' >&2 + cat "$output" >&2 + exit 1 + fi + + dead_status="$dead_dir/status.json" + queued_status="$queued_dir/status.json" + + if ! grep -q '"status":[[:space:]]*"failed"' "$dead_status"; then + printf 'expected failed status persisted to %s\n' "$dead_status" >&2 + cat "$dead_status" >&2 + exit 1 + fi + + if ! grep -q '"question":[[:space:]]*"dead worker on restart"' "$dead_status"; then + printf 'expected dead-worker question persisted to %s\n' "$dead_status" >&2 + cat "$dead_status" >&2 + exit 1 + fi + + if ! grep -q '"status":[[:space:]]*"queued"' "$queued_status"; then + printf 'expected queued status restored to %s\n' "$queued_status" >&2 + cat "$queued_status" >&2 exit 1 fi EOF