feat(agent/state): OnStartup queue+registry restore from .core/db.duckdb (#537)

Per RFC §15.3: restart was losing in-flight queue + workspace registry.
"Ghost agents" and "lost queue" pain now fixed.

Lands:
* pkg/agentic/persist.go (NEW):
  - OnStartup(ctx, c): opens .core/db.duckdb via go-store, restores
    registry/queue/concurrency groups
  - Dead-PID detection: registry entries with status=running but
    !pidAlive(PID) → marked failed with question="dead worker on
    restart"; status.json files re-written to disk
  - Orphaned workspace cleanup: walk .core/workspace/, dir-exists +
    registry-says-completed → delete
  - OnShutdown(ctx): flushes in-memory registry + queue back to store
    before close
* pkg/agentic/prep.go — PrepSubsystem.OnStartup/OnShutdown wired
* pkg/agentic/persist_test.go — AX-10 covering queue restore,
  dead-worker reaping, shutdown persistence, invalid-store-payload,
  orphan cleanup
* tests/cli/restart/Taskfile.yaml — extended smoke seeds DuckDB state
  for queued workspace + dead running worker, asserts status.json
  reflects restore correctly

Sandbox blocked from go test by go.work conflicting dappco.re/go/api
replacements (pre-existing); gofmt clean. Supervisor's clean workspace
catches.

Co-authored-by: Codex <noreply@openai.com>
Closes tasks.lthn.sh/view.php?id=537
This commit is contained in:
Snider 2026-04-26 00:04:54 +01:00
parent 2f9ffd5324
commit c6415aa53a
4 changed files with 592 additions and 37 deletions

274
pkg/agentic/persist.go Normal file
View file

@ -0,0 +1,274 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"errors"
"syscall"
"time"
core "dappco.re/go/core"
)
const deadWorkerOnRestartQuestion = "dead worker on restart"
type concurrencySnapshot struct {
Running int `json:"running,omitempty"`
Tracked int `json:"tracked,omitempty"`
SnapshotAt time.Time `json:"snapshot_at,omitempty"`
}
// result := s.restorePersistedState(context.Background())
func (s *PrepSubsystem) restorePersistedState(_ context.Context) core.Result {
if s == nil {
return core.Result{OK: true}
}
s.workspaces = core.NewRegistry[*WorkspaceStatus]()
_ = s.stateStoreInstance()
restored := map[string]*WorkspaceStatus{}
s.restoreRegistrySnapshot(restored)
s.restoreQueueSnapshot(restored)
s.restoreConcurrencySnapshot()
s.restoreWorkspaceSnapshot(restored)
registryKeep := map[string]struct{}{}
queueKeep := map[string]struct{}{}
for name, workspaceStatus := range restored {
if workspaceStatus == nil {
continue
}
registryKeep[name] = struct{}{}
workspaceDir := core.JoinPath(WorkspaceRoot(), name)
changed := s.normaliseRestoredWorkspace(workspaceStatus)
if workspaceStatus.Status == "completed" && fs.IsDir(workspaceDir) {
fs.DeleteAll(workspaceDir)
} else if fs.IsDir(workspaceDir) && (changed || !fs.IsFile(WorkspaceStatusPath(workspaceDir))) {
s.writePersistedWorkspaceStatus(workspaceDir, workspaceStatus)
}
s.workspaces.Set(name, cloneWorkspaceStatus(workspaceStatus))
s.stateStoreSet(stateRegistryGroup, name, workspaceStatus)
if workspaceStatus.Status == "queued" {
queueKeep[name] = struct{}{}
s.stateStoreSet(stateQueueGroup, name, queueEntryFromStatus(workspaceStatus))
} else {
s.stateStoreDelete(stateQueueGroup, name)
}
}
s.pruneStateGroup(stateRegistryGroup, registryKeep)
s.pruneStateGroup(stateQueueGroup, queueKeep)
s.refreshConcurrencySnapshot()
return core.Result{OK: true}
}
// result := s.flushPersistedState(context.Background())
func (s *PrepSubsystem) flushPersistedState(_ context.Context) core.Result {
if s == nil {
return core.Result{OK: true}
}
if len(s.backoff) > 0 || len(s.failCount) > 0 {
s.persistRuntimeState()
}
if s.workspaces == nil {
return core.Result{OK: true}
}
_ = s.stateStoreInstance()
registryKeep := map[string]struct{}{}
queueKeep := map[string]struct{}{}
s.workspaces.Each(func(name string, workspaceStatus *WorkspaceStatus) {
if name == "" {
return
}
if workspaceStatus == nil {
s.stateStoreDelete(stateRegistryGroup, name)
s.stateStoreDelete(stateQueueGroup, name)
return
}
registryKeep[name] = struct{}{}
s.stateStoreSet(stateRegistryGroup, name, workspaceStatus)
if workspaceStatus.Status == "queued" {
queueKeep[name] = struct{}{}
s.stateStoreSet(stateQueueGroup, name, queueEntryFromStatus(workspaceStatus))
} else {
s.stateStoreDelete(stateQueueGroup, name)
}
})
s.pruneStateGroup(stateRegistryGroup, registryKeep)
s.pruneStateGroup(stateQueueGroup, queueKeep)
s.refreshConcurrencySnapshot()
return core.Result{OK: true}
}
func (s *PrepSubsystem) restoreRegistrySnapshot(restored map[string]*WorkspaceStatus) {
if restored == nil {
return
}
s.stateStoreRestore(stateRegistryGroup, func(key, value string) bool {
var workspaceStatus WorkspaceStatus
if result := core.JSONUnmarshalString(value, &workspaceStatus); !result.OK {
return true
}
restored[key] = cloneWorkspaceStatus(&workspaceStatus)
return true
})
}
func (s *PrepSubsystem) restoreQueueSnapshot(restored map[string]*WorkspaceStatus) {
if restored == nil {
return
}
s.stateStoreRestore(stateQueueGroup, func(key, value string) bool {
if _, ok := restored[key]; ok {
return true
}
var entry queueEntry
if result := core.JSONUnmarshalString(value, &entry); !result.OK {
return true
}
restored[key] = workspaceStatusFromQueueEntry(entry)
return true
})
}
func (s *PrepSubsystem) restoreConcurrencySnapshot() {
s.stateStoreRestore(stateConcurrencyGroup, func(_ string, value string) bool {
var snapshot concurrencySnapshot
_ = core.JSONUnmarshalString(value, &snapshot)
return true
})
}
func (s *PrepSubsystem) restoreWorkspaceSnapshot(restored map[string]*WorkspaceStatus) {
if restored == nil {
return
}
for _, statusPath := range WorkspaceStatusPaths() {
workspaceDir := core.PathDir(statusPath)
result := ReadStatusResult(workspaceDir)
workspaceStatus, ok := workspaceStatusValue(result)
if !ok {
continue
}
restored[WorkspaceName(workspaceDir)] = cloneWorkspaceStatus(workspaceStatus)
}
}
func (s *PrepSubsystem) normaliseRestoredWorkspace(workspaceStatus *WorkspaceStatus) bool {
if workspaceStatus == nil {
return false
}
if workspaceStatus.Status != "running" {
if workspaceStatus.UpdatedAt.IsZero() {
if workspaceStatus.StartedAt.IsZero() {
workspaceStatus.UpdatedAt = time.Now().UTC()
} else {
workspaceStatus.UpdatedAt = workspaceStatus.StartedAt.UTC()
}
return true
}
return false
}
if s.pidAlive(workspaceStatus.ProcessID, workspaceStatus.PID) {
if workspaceStatus.UpdatedAt.IsZero() {
workspaceStatus.UpdatedAt = time.Now().UTC()
return true
}
return false
}
workspaceStatus.Status = "failed"
workspaceStatus.Question = deadWorkerOnRestartQuestion
workspaceStatus.PID = 0
workspaceStatus.ProcessID = ""
workspaceStatus.UpdatedAt = time.Now().UTC()
return true
}
func (s *PrepSubsystem) pidAlive(processID string, pid int) bool {
if s != nil && s.ServiceRuntime != nil && ProcessAlive(s.Core(), processID, pid) {
return true
}
if pid <= 0 {
return false
}
err := syscall.Kill(pid, 0)
if err == nil {
return true
}
return errors.Is(err, syscall.EPERM)
}
func (s *PrepSubsystem) pruneStateGroup(group string, keep map[string]struct{}) {
if keep == nil {
keep = map[string]struct{}{}
}
var stale []string
s.stateStoreRestore(group, func(key, _ string) bool {
if _, ok := keep[key]; !ok {
stale = append(stale, key)
}
return true
})
for _, key := range stale {
s.stateStoreDelete(group, key)
}
}
func (s *PrepSubsystem) writePersistedWorkspaceStatus(workspaceDir string, workspaceStatus *WorkspaceStatus) {
if workspaceDir == "" || workspaceStatus == nil || !fs.IsDir(workspaceDir) {
return
}
if workspaceStatus.UpdatedAt.IsZero() {
workspaceStatus.UpdatedAt = time.Now().UTC()
}
_ = fs.WriteAtomic(WorkspaceStatusPath(workspaceDir), core.JSONMarshalString(workspaceStatus))
}
func cloneWorkspaceStatus(workspaceStatus *WorkspaceStatus) *WorkspaceStatus {
if workspaceStatus == nil {
return nil
}
cloned := *workspaceStatus
return &cloned
}
func workspaceStatusFromQueueEntry(entry queueEntry) *WorkspaceStatus {
queuedAt := entry.QueuedAt.UTC()
if queuedAt.IsZero() {
queuedAt = time.Now().UTC()
}
return &WorkspaceStatus{
Status: "queued",
Agent: entry.Agent,
Repo: entry.Repo,
Org: entry.Org,
Task: entry.Task,
Branch: entry.Branch,
StartedAt: queuedAt,
UpdatedAt: queuedAt,
}
}

245
pkg/agentic/persist_test.go Normal file
View file

@ -0,0 +1,245 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"testing"
"time"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPersist_OnStartup_Good_RestoresQueue(t *testing.T) {
root := t.TempDir()
setPersistTestWorkspace(t, root)
workspaceName := "core/go-io/task-restore"
workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-restore")
require.True(t, fs.EnsureDir(workspaceDir).OK)
subsystem := &PrepSubsystem{}
defer subsystem.closeStateStore()
if subsystem.stateStoreInstance() == nil {
t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation")
}
queuedAt := time.Now().UTC().Add(-5 * time.Minute)
subsystem.stateStoreSet(stateQueueGroup, workspaceName, queueEntry{
Repo: "go-io",
Org: "core",
Task: "restore queue",
Branch: "agent/restore-queue",
Agent: "codex:gpt-5.4",
Status: "queued",
QueuedAt: queuedAt,
})
result := subsystem.restorePersistedState(context.Background())
require.True(t, result.OK)
assert.True(t, fs.IsFile(core.JoinPath(root, "db.duckdb")))
registryResult := subsystem.Workspaces().Get(workspaceName)
require.True(t, registryResult.OK)
workspaceStatus, ok := registryResult.Value.(*WorkspaceStatus)
require.True(t, ok)
assert.Equal(t, "queued", workspaceStatus.Status)
assert.Equal(t, "go-io", workspaceStatus.Repo)
assert.Equal(t, "core", workspaceStatus.Org)
assert.Equal(t, "agent/restore-queue", workspaceStatus.Branch)
statusResult := ReadStatusResult(workspaceDir)
require.True(t, statusResult.OK)
restoredStatus, ok := workspaceStatusValue(statusResult)
require.True(t, ok)
assert.Equal(t, "queued", restoredStatus.Status)
assert.Equal(t, "go-io", restoredStatus.Repo)
}
func TestPersist_OnStartup_Good_MarksDeadWorkers(t *testing.T) {
root := t.TempDir()
setPersistTestWorkspace(t, root)
workspaceName := "core/go-io/task-dead"
workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-dead")
require.True(t, fs.EnsureDir(workspaceDir).OK)
subsystem := &PrepSubsystem{}
defer subsystem.closeStateStore()
if subsystem.stateStoreInstance() == nil {
t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation")
}
subsystem.stateStoreSet(stateRegistryGroup, workspaceName, WorkspaceStatus{
Status: "running",
Agent: "codex:gpt-5.4",
Repo: "go-io",
Org: "core",
Task: "reap ghost",
Branch: "agent/reap-ghost",
PID: 999999,
ProcessID: "process-dead",
StartedAt: time.Now().UTC().Add(-10 * time.Minute),
UpdatedAt: time.Now().UTC().Add(-9 * time.Minute),
Runs: 1,
})
result := subsystem.restorePersistedState(context.Background())
require.True(t, result.OK)
registryResult := subsystem.Workspaces().Get(workspaceName)
require.True(t, registryResult.OK)
workspaceStatus, ok := registryResult.Value.(*WorkspaceStatus)
require.True(t, ok)
assert.Equal(t, "failed", workspaceStatus.Status)
assert.Equal(t, deadWorkerOnRestartQuestion, workspaceStatus.Question)
assert.Zero(t, workspaceStatus.PID)
assert.Empty(t, workspaceStatus.ProcessID)
statusResult := ReadStatusResult(workspaceDir)
require.True(t, statusResult.OK)
restoredStatus, ok := workspaceStatusValue(statusResult)
require.True(t, ok)
assert.Equal(t, "failed", restoredStatus.Status)
assert.Equal(t, deadWorkerOnRestartQuestion, restoredStatus.Question)
}
func TestPersist_OnShutdown_Good_PersistsQueue(t *testing.T) {
root := t.TempDir()
setPersistTestWorkspace(t, root)
subsystem := &PrepSubsystem{
workspaces: core.NewRegistry[*WorkspaceStatus](),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
if subsystem.stateStoreInstance() == nil {
t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation")
}
now := time.Now().UTC()
subsystem.workspaces.Set("core/go-io/task-queue", &WorkspaceStatus{
Status: "queued",
Agent: "codex:gpt-5.4",
Repo: "go-io",
Org: "core",
Task: "persist queue",
Branch: "agent/persist-queue",
StartedAt: now,
UpdatedAt: now,
})
subsystem.workspaces.Set("core/go-store/task-running", &WorkspaceStatus{
Status: "running",
Agent: "codex:gpt-5.4-mini",
Repo: "go-store",
Org: "core",
Task: "persist registry",
Branch: "agent/persist-registry",
PID: 4242,
StartedAt: now,
UpdatedAt: now,
})
result := subsystem.OnShutdown(context.Background())
require.True(t, result.OK)
replay := &PrepSubsystem{}
defer replay.closeStateStore()
queueValue, ok := replay.stateStoreGet(stateQueueGroup, "core/go-io/task-queue")
require.True(t, ok)
var entry queueEntry
require.True(t, core.JSONUnmarshalString(queueValue, &entry).OK)
assert.Equal(t, "go-io", entry.Repo)
assert.Equal(t, "agent/persist-queue", entry.Branch)
registryValue, ok := replay.stateStoreGet(stateRegistryGroup, "core/go-store/task-running")
require.True(t, ok)
var stored WorkspaceStatus
require.True(t, core.JSONUnmarshalString(registryValue, &stored).OK)
assert.Equal(t, "running", stored.Status)
assert.Equal(t, "go-store", stored.Repo)
}
func TestPersist_OnStartup_Bad_IgnoresInvalidStorePayload(t *testing.T) {
root := t.TempDir()
setPersistTestWorkspace(t, root)
validWorkspace := "core/go-io/task-valid"
validWorkspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-valid")
require.True(t, fs.EnsureDir(validWorkspaceDir).OK)
subsystem := &PrepSubsystem{}
defer subsystem.closeStateStore()
storeInstance := subsystem.stateStoreInstance()
if storeInstance == nil {
t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation")
}
require.NoError(t, storeInstance.Set(stateRegistryGroup, "broken", "{"))
subsystem.stateStoreSet(stateQueueGroup, validWorkspace, queueEntry{
Repo: "go-io",
Org: "core",
Task: "valid queue",
Branch: "agent/valid-queue",
Agent: "codex:gpt-5.4",
QueuedAt: time.Now().UTC(),
})
result := subsystem.restorePersistedState(context.Background())
require.True(t, result.OK)
assert.False(t, subsystem.Workspaces().Get("broken").OK)
assert.True(t, subsystem.Workspaces().Get(validWorkspace).OK)
}
func TestPersist_OnStartup_Ugly_CleansCompletedOrphanedWorkspace(t *testing.T) {
root := t.TempDir()
setPersistTestWorkspace(t, root)
workspaceName := "core/go-io/task-completed"
workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-completed")
require.True(t, fs.EnsureDir(workspaceDir).OK)
require.True(t, fs.WriteAtomic(WorkspaceStatusPath(workspaceDir), core.JSONMarshalString(WorkspaceStatus{
Status: "completed",
Agent: "codex:gpt-5.4",
Repo: "go-io",
Org: "core",
Task: "cleanup orphan",
Branch: "agent/cleanup-orphan",
StartedAt: time.Now().UTC().Add(-2 * time.Hour),
UpdatedAt: time.Now().UTC().Add(-time.Hour),
Runs: 1,
})).OK)
subsystem := &PrepSubsystem{}
defer subsystem.closeStateStore()
if subsystem.stateStoreInstance() == nil {
t.Skip("go-store unavailable on this platform — RFC §15.6 graceful degradation")
}
subsystem.stateStoreSet(stateRegistryGroup, workspaceName, WorkspaceStatus{
Status: "completed",
Agent: "codex:gpt-5.4",
Repo: "go-io",
Org: "core",
Task: "cleanup orphan",
Branch: "agent/cleanup-orphan",
StartedAt: time.Now().UTC().Add(-2 * time.Hour),
UpdatedAt: time.Now().UTC().Add(-time.Hour),
Runs: 1,
})
result := subsystem.restorePersistedState(context.Background())
require.True(t, result.OK)
assert.False(t, fs.IsDir(workspaceDir))
}
func setPersistTestWorkspace(t *testing.T, root string) {
t.Helper()
setTestWorkspace(t, root)
t.Setenv("CORE_HOME", root)
t.Setenv("DIR_HOME", root)
t.Setenv("HOME", root)
}

View file

@ -353,7 +353,9 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
c.Action("agentic.complete", s.handleComplete).Description = "Run completion pipeline (QA → PR → Verify → Commit → Ingest → Poke) in background"
s.hydrateWorkspaces()
if result := s.restorePersistedState(ctx); !result.OK {
return result
}
// RFC §15.5 — startup scans `.core/state/` for orphaned QA workspace
// buffers (leftover DuckDB files from dispatches that crashed before
// commit) and releases them so the next cycle starts clean.
@ -385,6 +387,9 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
// _ = subsystem.OnShutdown(context.Background())
func (s *PrepSubsystem) OnShutdown(ctx context.Context) core.Result {
s.frozen = true
if result := s.flushPersistedState(ctx); !result.OK {
return result
}
s.closeStateStore()
s.closeWorkspaceStatsStore()
return core.Result{OK: true}

View file

@ -22,50 +22,81 @@ tasks:
export CORE_HOME="$workspace"
export DIR_HOME="$workspace"
# Pre-seed a workspace with status.json showing a process that is
# not alive — simulating the "dispatch -> kill -> restart" cycle.
ghost_dir="$workspace/workspace/core/go-io/task-restart"
mkdir -p "$ghost_dir"
cat >"$ghost_dir/status.json" <<JSON
{
"status": "running",
"agent": "codex:gpt-5.4",
"repo": "go-io",
"org": "core",
"task": "restart-survival",
"branch": "agent/restart-survival",
"pid": 99999,
"started_at": "2026-04-14T12:00:00Z",
"updated_at": "2026-04-14T12:00:00Z",
"runs": 1
}
JSON
# Simulate the persisted state a killed process leaves behind:
# one queued workspace with only db.duckdb state and one dead
# running worker that must be reaped to failed on restart.
queued_dir="$workspace/workspace/core/go-io/task-queued"
dead_dir="$workspace/workspace/core/go-io/task-dead"
mkdir -p "$queued_dir" "$dead_dir"
# Run status — the new agent process must NOT report the ghost as
# "running"; it must mark the workspace failed (RFC §15.3 ghost
# detection on hydrate).
seed="$workspace/seed-restart-state.go"
cat >"$seed" <<'GO'
package main
import (
"os"
store "dappco.re/go/store"
)
func main() {
st, err := store.New(os.Args[1])
if err != nil {
panic(err)
}
defer st.Close()
must := func(err error) {
if err != nil {
panic(err)
}
}
must(st.Set("queue", "core/go-io/task-queued", `{"repo":"go-io","org":"core","task":"restart-queued","branch":"agent/restart-queued","agent":"codex:gpt-5.4","status":"queued","queued_at":"2026-04-14T12:00:00Z"}`))
must(st.Set("registry", "core/go-io/task-queued", `{"status":"queued","agent":"codex:gpt-5.4","repo":"go-io","org":"core","task":"restart-queued","branch":"agent/restart-queued","started_at":"2026-04-14T12:00:00Z","updated_at":"2026-04-14T12:00:00Z","runs":0}`))
must(st.Set("registry", "core/go-io/task-dead", `{"status":"running","agent":"codex:gpt-5.4","repo":"go-io","org":"core","task":"restart-dead","branch":"agent/restart-dead","pid":99999,"process_id":"dead-proc","started_at":"2026-04-14T12:00:00Z","updated_at":"2026-04-14T12:00:00Z","runs":1}`))
must(st.Set("concurrency", "codex", `{"running":1,"tracked":2,"snapshot_at":"2026-04-14T12:00:00Z"}`))
}
GO
go run "$seed" "$workspace/db.duckdb"
# Restart into a fresh process. Startup must restore the queued
# workspace from db.duckdb and reap the dead worker to failed.
output="$(mktemp)"
run_capture_all 0 "$output" ./bin/core-agent status
assert_contains "core/go-io/task-restart" "$output"
assert_contains "core/go-io/task-queued" "$output"
assert_contains "core/go-io/task-dead" "$output"
# Verify the ghost did not survive as `running`. The status command
# surfaces both the workspace and its current state — accepting any
# non-running label (failed, completed, queued, blocked) confirms
# the ghost was reaped.
if grep -E 'core/go-io/task-restart.*\brunning\b' "$output"; then
printf 'expected ghost agent reaped on restart, still reports running\n' >&2
if ! grep -E 'queued[[:space:]]+codex:gpt-5.4[[:space:]]+go-io[[:space:]]+core/go-io/task-queued' "$output"; then
printf 'expected queued workspace restored from db.duckdb\n' >&2
cat "$output" >&2
exit 1
fi
assert_contains "failed" "$output"
# The reaped status should be persisted back to disk per RFC §15.3 —
# cross-process consumers (other tooling reading status.json) must
# see the same coherent state.
status_file="$ghost_dir/status.json"
if grep -q '"status":[[:space:]]*"running"' "$status_file"; then
printf 'expected reaped status persisted to %s\n' "$status_file" >&2
cat "$status_file" >&2
if grep -E 'core/go-io/task-dead.*\brunning\b' "$output"; then
printf 'expected dead worker reaped on restart, still reports running\n' >&2
cat "$output" >&2
exit 1
fi
dead_status="$dead_dir/status.json"
queued_status="$queued_dir/status.json"
if ! grep -q '"status":[[:space:]]*"failed"' "$dead_status"; then
printf 'expected failed status persisted to %s\n' "$dead_status" >&2
cat "$dead_status" >&2
exit 1
fi
if ! grep -q '"question":[[:space:]]*"dead worker on restart"' "$dead_status"; then
printf 'expected dead-worker question persisted to %s\n' "$dead_status" >&2
cat "$dead_status" >&2
exit 1
fi
if ! grep -q '"status":[[:space:]]*"queued"' "$queued_status"; then
printf 'expected queued status restored to %s\n' "$queued_status" >&2
cat "$queued_status" >&2
exit 1
fi
EOF