feat(agent): go-store backing for dispatch state per RFC §15

Introduce an optional go-store persistence layer for the three state
groups described in RFC §15.3 — queue, concurrency, registry — plus
runtime_state and dispatch_history used by the sync pipeline.

- statestore.go lazily opens `.core/db.duckdb` via go-store when
  available; nil-safe helpers return cleanly so in-memory/file-based
  fallbacks survive when the store cannot open (graceful degradation,
  RFC §15.6)
- prep.go tracks the store reference on the subsystem and closes it on
  shutdown; hydrateWorkspaces now consults the registry group before
  the filesystem scan so ghost agents are marked failed across
  restarts, and TrackWorkspace mirrors updates back into the cache
- runtime_state.go persists backoff + fail-count snapshots into the
  go-store runtime group so dispatch backoff survives restarts even
  when the JSON file rotates
- commit.go writes the completed dispatch record into dispatch_history
  for RFC §16.3 sync push to drain without rescanning workspaces
- statestore_test.go covers lazy-once init, restore/delete round trip,
  ghost-agent failure marking, and runtime-state replay across
  subsystem instances

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-04-14 12:17:08 +01:00
parent 83364a6080
commit 4684ae725a
7 changed files with 517 additions and 10 deletions

9
go.mod
View file

@ -7,6 +7,7 @@ require (
dappco.re/go/core/api v0.3.0
dappco.re/go/core/forge v0.3.1
dappco.re/go/core/process v0.5.1
dappco.re/go/core/store v0.3.0
dappco.re/go/core/ws v0.4.0
dappco.re/go/mcp v0.5.6
github.com/gin-gonic/gin v1.12.0
@ -39,6 +40,7 @@ require (
github.com/coreos/go-oidc/v3 v3.17.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.13 // indirect
github.com/gin-contrib/authz v1.0.6 // indirect
github.com/gin-contrib/cors v1.7.6 // indirect
@ -79,12 +81,14 @@ require (
github.com/gorilla/sessions v1.4.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.5 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mailru/easyjson v0.9.2 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/ollama/ollama v0.18.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
@ -92,6 +96,7 @@ require (
github.com/quic-go/qpack v0.6.0 // indirect
github.com/quic-go/quic-go v0.59.0 // indirect
github.com/redis/go-redis/v9 v9.18.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/segmentio/asm v1.2.1 // indirect
github.com/segmentio/encoding v0.5.4 // indirect
github.com/sosodev/duration v1.4.0 // indirect
@ -124,6 +129,10 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20260319201613-d00831a3d3e7 // indirect
google.golang.org/grpc v1.79.3 // indirect
google.golang.org/protobuf v1.36.11 // indirect
modernc.org/libc v1.70.0 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
modernc.org/sqlite v1.47.0 // indirect
)
replace dappco.re/go/mcp => ../mcp

44
go.sum
View file

@ -14,14 +14,12 @@ dappco.re/go/core/process v0.5.1 h1:USnVQRzbfGolgju4/L/gyAU7anzgHzr//z8vmR9ppug=
dappco.re/go/core/process v0.5.1/go.mod h1:Zh8H+Rw6LCmjFmO0X6zxy9Z6O4EUKXrE6+XiPDuWuuA=
dappco.re/go/core/rag v0.1.13 h1:R2Q+Xw5YenT4uFemXLBu+xQYtyUIYGSmMln5/Z+nol4=
dappco.re/go/core/rag v0.1.13/go.mod h1:wthXtCqYEChjlGIHcJXetlgk49lPDmzG6jFWd1PEIZc=
dappco.re/go/core/store v0.3.0 h1:DECJB0A8dovqtX7w0/nGCV1XZLGI1/1pUt4SMM6GHh0=
dappco.re/go/core/store v0.3.0/go.mod h1:mirctw1g2ZfZRrALz43bomurXJFSQwd+rZdfIwPVqF8=
dappco.re/go/core/webview v0.2.1 h1:rdy2sV+MS6RZsav8BiARJxtWhfx7eOAJp3b1Ynp1sYs=
dappco.re/go/core/webview v0.2.1/go.mod h1:Qdo1V/sJJwOnL0hYd3+vzVUJxWYC8eGyILZROya6KoM=
dappco.re/go/core/ws v0.4.0 h1:yEDV9whXyo+GWzBSjuB3NiLiH2bmBPBWD6rydwHyBn8=
dappco.re/go/core/ws v0.4.0/go.mod h1:L1rrgW6zU+DztcVBJW2yO5Lm3rGXpyUMOA8OL9zsAok=
dappco.re/go/mcp v0.5.5 h1:RY0Ip5Xs6G2wXUdgCARp9vgnqKdBrIXTiwOAsukzRJk=
dappco.re/go/mcp v0.5.5/go.mod h1:xqPT0qER3oJeQNC7xHjkdg2VYcBXJunJKr6Vk/0moes=
dappco.re/go/mcp v0.5.6 h1:F9yuV9e1vc3vEQwNzhfnKSZcgk7C2wDQr81UcmlxWfU=
dappco.re/go/mcp v0.5.6/go.mod h1:xqPT0qER3oJeQNC7xHjkdg2VYcBXJunJKr6Vk/0moes=
github.com/99designs/gqlgen v0.17.88 h1:neMQDgehMwT1vYIOx/w5ZYPUU/iMNAJzRO44I5Intoc=
github.com/99designs/gqlgen v0.17.88/go.mod h1:qeqYFEgOeSKqWedOjogPizimp2iu4E23bdPvl4jTYic=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
@ -74,6 +72,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54 h1:SG7nF6SRlWhcT7cNTs5R6Hk4V2lcmLz2NsG2VnInyNo=
github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM=
github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s=
github.com/gin-contrib/authz v1.0.6 h1:qAO4sSSzOPCwYRZI6YtubC+h2tZVwhwSJeyEZn2W+5k=
@ -165,6 +165,8 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/jsonschema-go v0.4.2 h1:tmrUohrwoLZZS/P3x7ex0WAVknEkBZM46iALbcqoRA8=
github.com/google/jsonschema-go v0.4.2/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/context v1.1.2 h1:WRkNAv2uoa03QNIc1A6u4O7DAGMUVoopZhkiXWA2V1o=
@ -179,6 +181,8 @@ github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@ -198,6 +202,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/ollama/ollama v0.18.2 h1:RsOY8oZ6TufRiPgsSlKJp4/V/X+oBREscUlEHZfd554=
github.com/ollama/ollama v0.18.2/go.mod h1:tCX4IMV8DHjl3zY0THxuEkpWDZSOchJpzTuLACpMwFw=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
@ -213,6 +219,8 @@ github.com/quic-go/quic-go v0.59.0 h1:OLJkp1Mlm/aS7dpKgTc6cnpynnD2Xg7C1pwL6vy/SA
github.com/quic-go/quic-go v0.59.0/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU=
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0=
@ -346,3 +354,31 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis=
modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw=
modernc.org/ccgo/v4 v4.32.0/go.mod h1:6F08EBCx5uQc38kMGl+0Nm0oWczoo1c7cgpzEry7Uc0=
modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM=
modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU=
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo=
modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY=
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
modernc.org/libc v1.70.0 h1:U58NawXqXbgpZ/dcdS9kMshu08aiA6b7gusEusqzNkw=
modernc.org/libc v1.70.0/go.mod h1:OVmxFGP1CI/Z4L3E0Q3Mf1PDE0BucwMkcXjjLntvHJo=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.47.0 h1:R1XyaNpoW4Et9yly+I2EeX7pBza/w+pmYee/0HJDyKk=
modernc.org/sqlite v1.47.0/go.mod h1:hWjRO6Tj/5Ik8ieqxQybiEOUXy0NJFNp2tpvVpKlvig=
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

View file

@ -120,6 +120,14 @@ func (s *PrepSubsystem) commitWorkspace(ctx context.Context, input CommitInput)
return CommitOutput{}, err
}
// Mirror the dispatch record to the top-level dispatch_history group so
// sync push can drain completed dispatches without re-scanning the
// workspace tree — RFC §15.5 + §16.3. The record carries the same
// shape expected by `POST /v1/agent/sync`.
record["id"] = WorkspaceName(workspaceDir)
record["synced"] = false
s.stateStoreSet(stateDispatchHistoryGroup, WorkspaceName(workspaceDir), record)
return CommitOutput{
Success: true,
Workspace: input.Workspace,

View file

@ -8,6 +8,7 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"sync"
"time"
"dappco.re/go/agent/pkg/lib"
@ -37,6 +38,8 @@ type PrepSubsystem struct {
failCount map[string]int
providers *ProviderManager
workspaces *core.Registry[*WorkspaceStatus]
stateOnce sync.Once
state *stateStoreRef
}
var _ coremcp.Subsystem = (*PrepSubsystem)(nil)
@ -367,6 +370,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
// _ = subsystem.OnShutdown(context.Background())
func (s *PrepSubsystem) OnShutdown(ctx context.Context) core.Result {
s.frozen = true
s.closeStateStore()
return core.Result{OK: true}
}
@ -376,6 +380,31 @@ func (s *PrepSubsystem) hydrateWorkspaces() {
if s.workspaces == nil {
s.workspaces = core.NewRegistry[*WorkspaceStatus]()
}
// Registry hydration is filesystem-first — workspace status.json is
// authoritative. The go-store registry group caches last-known status
// so ghost agents can be detected after crashes even when the JSON
// status file has been rotated. Filesystem wins on conflict (§15.3).
s.stateStoreRestore(stateRegistryGroup, func(key, value string) bool {
if s.workspaces.Get(key).OK {
return true
}
var status WorkspaceStatus
if result := core.JSONUnmarshalString(value, &status); !result.OK {
return true
}
// Dead agents are marked failed so the next dispatch does not
// block on a PID that disappeared.
if status.Status == "running" && !ProcessAlive(nil, status.ProcessID, status.PID) {
status.Status = "failed"
if status.Question == "" {
status.Question = "Agent process died during restart"
}
}
s.workspaces.Set(key, &status)
return true
})
for _, path := range WorkspaceStatusPaths() {
workspaceDir := core.PathDir(path)
result := ReadStatusResult(workspaceDir)
@ -392,6 +421,11 @@ func (s *PrepSubsystem) TrackWorkspace(name string, st *WorkspaceStatus) {
if s.workspaces != nil {
s.workspaces.Set(name, st)
}
if st == nil {
s.stateStoreDelete(stateRegistryGroup, name)
return
}
s.stateStoreSet(stateRegistryGroup, name, st)
}
// s.Workspaces().Names() // all workspace names

View file

@ -22,14 +22,48 @@ func runtimeStatePath() string {
}
func (s *PrepSubsystem) loadRuntimeState() {
result := readRuntimeState()
if !result.OK {
return
state := runtimeState{
Backoff: make(map[string]time.Time),
FailCount: make(map[string]int),
}
state, ok := result.Value.(runtimeState)
if !ok {
return
// Read the go-store cached runtime state first — when go-store is
// unavailable the read is a no-op and we fall back to the JSON file.
s.stateStoreRestore(stateRuntimeGroup, func(key, value string) bool {
switch key {
case "backoff":
backoff := map[string]time.Time{}
if result := core.JSONUnmarshalString(value, &backoff); result.OK {
for pool, deadline := range backoff {
state.Backoff[pool] = deadline
}
}
case "fail_count":
failCount := map[string]int{}
if result := core.JSONUnmarshalString(value, &failCount); result.OK {
for pool, count := range failCount {
state.FailCount[pool] = count
}
}
}
return true
})
// The JSON file remains authoritative when go-store is missing so
// existing deployments do not regress during the rollout.
if result := readRuntimeState(); result.OK {
if fileState, ok := result.Value.(runtimeState); ok {
for pool, deadline := range fileState.Backoff {
if _, seen := state.Backoff[pool]; !seen {
state.Backoff[pool] = deadline
}
}
for pool, count := range fileState.FailCount {
if _, seen := state.FailCount[pool]; !seen {
state.FailCount[pool] = count
}
}
}
}
if s.backoff == nil {
@ -68,11 +102,26 @@ func (s *PrepSubsystem) persistRuntimeState() {
if len(state.Backoff) == 0 && len(state.FailCount) == 0 {
fs.Delete(runtimeStatePath())
s.stateStoreDelete(stateRuntimeGroup, "backoff")
s.stateStoreDelete(stateRuntimeGroup, "fail_count")
return
}
fs.EnsureDir(runtimeStateDir())
fs.WriteAtomic(runtimeStatePath(), core.JSONMarshalString(state))
// Mirror the authoritative JSON to the go-store cache so restarts see
// the same state even when the JSON file is archived or rotated.
if len(state.Backoff) > 0 {
s.stateStoreSet(stateRuntimeGroup, "backoff", state.Backoff)
} else {
s.stateStoreDelete(stateRuntimeGroup, "backoff")
}
if len(state.FailCount) > 0 {
s.stateStoreSet(stateRuntimeGroup, "fail_count", state.FailCount)
} else {
s.stateStoreDelete(stateRuntimeGroup, "fail_count")
}
}
func readRuntimeState() core.Result {

198
pkg/agentic/statestore.go Normal file
View file

@ -0,0 +1,198 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"sync"
core "dappco.re/go/core"
store "dappco.re/go/core/store"
)
// Usage example: `groupName := queueGroup` // "queue"
const (
stateQueueGroup = "queue"
stateConcurrencyGroup = "concurrency"
stateRegistryGroup = "registry"
stateDispatchHistoryGroup = "dispatch_history"
stateSyncQueueGroup = "sync_queue"
stateRuntimeGroup = "runtime"
)
// stateStorePath returns the canonical path for the top-level agent DuckDB
// state file described in RFC §15.2 — `.core/db.duckdb` relative to CoreRoot.
//
// Usage example: `path := stateStorePath()`
func stateStorePath() string {
return core.JoinPath(CoreRoot(), "db.duckdb")
}
// stateStoreRef keeps the store instance, its initialisation error, and a
// sync.Once so multiple callers observe the same lazily-initialised value.
type stateStoreRef struct {
once sync.Once
instance *store.Store
err error
}
// stateStoreReference is a subsystem-scoped handle that exposes the lazily
// initialised go-store Store. The agent works fully offline when go-store
// cannot be initialised — RFC §15.6.
//
// Usage example: `st := s.stateStoreInstance(); if st == nil { return } // in-memory fallback`
func (s *PrepSubsystem) stateStoreInstance() *store.Store {
if s == nil {
return nil
}
ref := s.stateStoreRef()
if ref == nil {
return nil
}
ref.once.Do(func() {
ref.instance, ref.err = openStateStore()
})
if ref.err != nil {
return nil
}
return ref.instance
}
// stateStoreErr reports the last error observed while opening the go-store
// backend, so callers can decide whether to log or silently fall back.
//
// Usage example: `if err := s.stateStoreErr(); err != nil { core.Warn("state store unavailable", "err", err) }`
func (s *PrepSubsystem) stateStoreErr() error {
if s == nil {
return nil
}
ref := s.stateStoreRef()
if ref == nil {
return nil
}
_ = s.stateStoreInstance()
return ref.err
}
// stateStoreRef returns the subsystem-scoped reference, allocating it lazily
// so zero-value subsystems (used by tests) do not crash.
func (s *PrepSubsystem) stateStoreRef() *stateStoreRef {
if s == nil {
return nil
}
s.stateOnce.Do(func() {
s.state = &stateStoreRef{}
})
return s.state
}
// closeStateStore releases the go-store handle. Safe to call multiple times.
//
// Usage example: `s.closeStateStore()`
func (s *PrepSubsystem) closeStateStore() {
if s == nil {
return
}
ref := s.state
if ref == nil {
return
}
if ref.instance != nil {
_ = ref.instance.Close()
ref.instance = nil
}
ref.err = nil
s.state = nil
s.stateOnce = sync.Once{}
}
// openStateStore attempts to open the canonical state store at
// `.core/db.duckdb`. The filesystem is prepared first so new workspaces do
// not fail the first call. Errors are returned but never cause a panic — the
// caller falls back to in-memory or file-based state per RFC §15.6.
//
// Usage example: `st, err := openStateStore()`
func openStateStore() (*store.Store, error) {
path := stateStorePath()
directory := core.PathDir(path)
if ensureResult := fs.EnsureDir(directory); !ensureResult.OK {
if err, ok := ensureResult.Value.(error); ok {
return nil, core.E("agentic.stateStore", "prepare state directory", err)
}
return nil, core.E("agentic.stateStore", "prepare state directory", nil)
}
storeInstance, err := store.New(path)
if err != nil {
return nil, core.E("agentic.stateStore", "open state store", err)
}
return storeInstance, nil
}
// stateStoreSet writes a JSON-encoded value to the given group+key if the
// store is available. No-op when go-store is not initialised.
//
// Usage example: `s.stateStoreSet(stateQueueGroup, "core/go-io", queueEntry)`
func (s *PrepSubsystem) stateStoreSet(group, key string, value any) {
st := s.stateStoreInstance()
if st == nil {
return
}
payload := core.JSONMarshalString(value)
_ = st.Set(group, key, payload)
}
// stateStoreDelete removes a key from the given group if the store is
// available. No-op when go-store is not initialised.
//
// Usage example: `s.stateStoreDelete(stateRegistryGroup, "core/go-io/task-5")`
func (s *PrepSubsystem) stateStoreDelete(group, key string) {
st := s.stateStoreInstance()
if st == nil {
return
}
_ = st.Delete(group, key)
}
// stateStoreRestore iterates every entry in the given group and invokes
// the visitor with the decoded JSON payload. The visitor must return true
// to continue iteration or false to stop early. No-op when go-store is not
// initialised — callers continue to use file-based/in-memory state.
//
// Usage example:
//
// s.stateStoreRestore(stateQueueGroup, func(key, value string) bool {
// var task QueuedTask
// core.JSONUnmarshalString(value, &task)
// s.queue.Enqueue(task)
// return true
// })
func (s *PrepSubsystem) stateStoreRestore(group string, visit func(key, value string) bool) {
st := s.stateStoreInstance()
if st == nil || visit == nil {
return
}
for entry, err := range st.AllSeq(group) {
if err != nil {
return
}
if !visit(entry.Key, entry.Value) {
return
}
}
}
// stateStoreCount reports the number of entries in a group. Returns 0 when
// the store is unavailable so call sites can compare to zero without guards.
//
// Usage example: `if s.stateStoreCount(stateRegistryGroup) > 0 { /* restore workspaces */ }`
func (s *PrepSubsystem) stateStoreCount(group string) int {
st := s.stateStoreInstance()
if st == nil {
return 0
}
count, err := st.Count(group)
if err != nil {
return 0
}
return count
}

View file

@ -0,0 +1,173 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"testing"
"time"
core "dappco.re/go/core"
)
// withStateStoreTempDir redirects CORE_WORKSPACE to a fresh temporary
// directory so statestore tests can open `.core/db.duckdb` in isolation.
func withStateStoreTempDir(t *testing.T) {
t.Helper()
dir := t.TempDir()
t.Setenv("CORE_WORKSPACE", dir)
t.Setenv("CORE_HOME", dir)
t.Setenv("HOME", dir)
t.Setenv("DIR_HOME", dir)
}
// TestStatestore_StateStoreInstance_Good verifies the DuckDB-backed store can
// be initialised inside a temporary workspace and that the same instance is
// returned on subsequent calls (lazy once semantics).
//
// Usage example: `go test ./pkg/agentic -run TestStatestore_StateStoreInstance_Good`
func TestStatestore_StateStoreInstance_Good(t *testing.T) {
withStateStoreTempDir(t)
subsystem := &PrepSubsystem{}
defer subsystem.closeStateStore()
first := subsystem.stateStoreInstance()
if first == nil {
t.Fatalf("expected store instance, got nil; err=%v", subsystem.stateStoreErr())
}
second := subsystem.stateStoreInstance()
if second != first {
t.Fatalf("expected lazy-once to return same instance, got different pointers")
}
}
// TestStatestore_StateStoreSet_Good_WritesAndRestores verifies the helpers
// round-trip JSON entries through the store and that stateStoreRestore walks
// every entry.
//
// Usage example: `go test ./pkg/agentic -run TestStatestore_StateStoreSet_Good_WritesAndRestores`
func TestStatestore_StateStoreSet_Good_WritesAndRestores(t *testing.T) {
withStateStoreTempDir(t)
subsystem := &PrepSubsystem{}
defer subsystem.closeStateStore()
subsystem.stateStoreSet(stateRegistryGroup, "core/go-io", map[string]any{"status": "running"})
subsystem.stateStoreSet(stateRegistryGroup, "core/go-store", map[string]any{"status": "queued"})
entries := map[string]map[string]any{}
subsystem.stateStoreRestore(stateRegistryGroup, func(key, value string) bool {
decoded := map[string]any{}
if result := core.JSONUnmarshalString(value, &decoded); !result.OK {
t.Fatalf("unmarshal %s: %v", key, result.Value)
}
entries[key] = decoded
return true
})
if len(entries) != 2 {
t.Fatalf("expected 2 entries, got %d: %v", len(entries), entries)
}
if status, ok := entries["core/go-io"]["status"].(string); !ok || status != "running" {
t.Fatalf("expected core/go-io status=running, got %v", entries["core/go-io"])
}
}
// TestStatestore_CloseStateStore_Bad_SafeOnNilSubsystem verifies close helpers
// do not panic on nil receivers — critical for test teardown paths and the
// graceful-degradation requirement in RFC §15.6.
//
// Usage example: `go test ./pkg/agentic -run TestStatestore_CloseStateStore_Bad_SafeOnNilSubsystem`
func TestStatestore_CloseStateStore_Bad_SafeOnNilSubsystem(t *testing.T) {
var subsystem *PrepSubsystem
subsystem.closeStateStore()
if instance := subsystem.stateStoreInstance(); instance != nil {
t.Fatalf("expected nil instance on nil subsystem, got %v", instance)
}
}
// TestStatestore_StateStoreDelete_Ugly_DeletingUnknownKey verifies delete is a
// no-op for missing keys so call sites never need to guard against misses.
//
// Usage example: `go test ./pkg/agentic -run TestStatestore_StateStoreDelete_Ugly_DeletingUnknownKey`
func TestStatestore_StateStoreDelete_Ugly_DeletingUnknownKey(t *testing.T) {
withStateStoreTempDir(t)
subsystem := &PrepSubsystem{}
defer subsystem.closeStateStore()
subsystem.stateStoreDelete(stateRegistryGroup, "never-existed")
subsystem.stateStoreSet(stateRegistryGroup, "real", map[string]any{"value": 1})
subsystem.stateStoreDelete(stateRegistryGroup, "real")
count := subsystem.stateStoreCount(stateRegistryGroup)
if count != 0 {
t.Fatalf("expected registry empty after delete, got count=%d", count)
}
}
// TestStatestore_HydrateWorkspaces_Good_RestoresFromStore mirrors RFC §15.3 —
// the registry group is populated before hydrateWorkspaces runs, and the
// subsystem must restore those entries so ghost agents are detectable across
// restarts without reading the status.json filesystem tree.
//
// Usage example: `go test ./pkg/agentic -run TestStatestore_HydrateWorkspaces_Good_RestoresFromStore`
func TestStatestore_HydrateWorkspaces_Good_RestoresFromStore(t *testing.T) {
withStateStoreTempDir(t)
subsystem := &PrepSubsystem{}
subsystem.workspaces = core.NewRegistry[*WorkspaceStatus]()
defer subsystem.closeStateStore()
subsystem.stateStoreSet(stateRegistryGroup, "core/go-io/task-5", WorkspaceStatus{
Status: "running",
Agent: "codex:gpt-5.4",
PID: 0,
})
subsystem.hydrateWorkspaces()
result := subsystem.Workspaces().Get("core/go-io/task-5")
if !result.OK {
t.Fatalf("expected workspace restored, got miss")
}
status, ok := result.Value.(*WorkspaceStatus)
if !ok {
t.Fatalf("expected *WorkspaceStatus, got %T", result.Value)
}
// Dead PID should be marked failed, per §15.3.
if status.Status != "failed" {
t.Fatalf("expected ghost agent marked failed, got status=%s", status.Status)
}
}
// TestStatestore_RuntimeState_Good_PersistsAcrossReloads mirrors RFC §15 —
// backoff deadlines saved via persistRuntimeState must replay when a new
// subsystem instance calls loadRuntimeState, enabling seamless resume after
// dispatch crashes.
//
// Usage example: `go test ./pkg/agentic -run TestStatestore_RuntimeState_Good_PersistsAcrossReloads`
func TestStatestore_RuntimeState_Good_PersistsAcrossReloads(t *testing.T) {
withStateStoreTempDir(t)
original := &PrepSubsystem{
backoff: map[string]time.Time{
"codex": time.Now().Add(15 * time.Minute),
},
failCount: map[string]int{"codex": 3},
}
original.persistRuntimeState()
original.closeStateStore()
replay := &PrepSubsystem{}
defer replay.closeStateStore()
replay.loadRuntimeState()
if _, ok := replay.backoff["codex"]; !ok {
t.Fatalf("expected replay to restore codex backoff, got map=%v", replay.backoff)
}
if replay.failCount["codex"] != 3 {
t.Fatalf("expected replay fail count=3, got %d", replay.failCount["codex"])
}
}