From 4684ae725a2e44ccbbaf58a8a428237ac9022a30 Mon Sep 17 00:00:00 2001 From: Snider Date: Tue, 14 Apr 2026 12:17:08 +0100 Subject: [PATCH] =?UTF-8?q?feat(agent):=20go-store=20backing=20for=20dispa?= =?UTF-8?q?tch=20state=20per=20RFC=20=C2=A715?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce an optional go-store persistence layer for the three state groups described in RFC §15.3 — queue, concurrency, registry — plus runtime_state and dispatch_history used by the sync pipeline. - statestore.go lazily opens `.core/db.duckdb` via go-store when available; nil-safe helpers return cleanly so in-memory/file-based fallbacks survive when the store cannot open (graceful degradation, RFC §15.6) - prep.go tracks the store reference on the subsystem and closes it on shutdown; hydrateWorkspaces now consults the registry group before the filesystem scan so ghost agents are marked failed across restarts, and TrackWorkspace mirrors updates back into the cache - runtime_state.go persists backoff + fail-count snapshots into the go-store runtime group so dispatch backoff survives restarts even when the JSON file rotates - commit.go writes the completed dispatch record into dispatch_history for RFC §16.3 sync push to drain without rescanning workspaces - statestore_test.go covers lazy-once init, restore/delete round trip, ghost-agent failure marking, and runtime-state replay across subsystem instances Co-Authored-By: Virgil --- go.mod | 9 ++ go.sum | 44 +++++++- pkg/agentic/commit.go | 8 ++ pkg/agentic/prep.go | 34 ++++++ pkg/agentic/runtime_state.go | 61 +++++++++- pkg/agentic/statestore.go | 198 +++++++++++++++++++++++++++++++++ pkg/agentic/statestore_test.go | 173 ++++++++++++++++++++++++++++ 7 files changed, 517 insertions(+), 10 deletions(-) create mode 100644 pkg/agentic/statestore.go create mode 100644 pkg/agentic/statestore_test.go diff --git a/go.mod b/go.mod index a85bd9d..a914411 100644 --- a/go.mod +++ b/go.mod @@ -7,6 
+7,7 @@ require ( dappco.re/go/core/api v0.3.0 dappco.re/go/core/forge v0.3.1 dappco.re/go/core/process v0.5.1 + dappco.re/go/core/store v0.3.0 dappco.re/go/core/ws v0.4.0 dappco.re/go/mcp v0.5.6 github.com/gin-gonic/gin v1.12.0 @@ -39,6 +40,7 @@ require ( github.com/coreos/go-oidc/v3 v3.17.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/gabriel-vasile/mimetype v1.4.13 // indirect github.com/gin-contrib/authz v1.0.6 // indirect github.com/gin-contrib/cors v1.7.6 // indirect @@ -79,12 +81,14 @@ require ( github.com/gorilla/sessions v1.4.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.5 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/mailru/easyjson v0.9.2 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect github.com/ollama/ollama v0.18.2 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -92,6 +96,7 @@ require ( github.com/quic-go/qpack v0.6.0 // indirect github.com/quic-go/quic-go v0.59.0 // indirect github.com/redis/go-redis/v9 v9.18.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/segmentio/asm v1.2.1 // indirect github.com/segmentio/encoding v0.5.4 // indirect github.com/sosodev/duration v1.4.0 // indirect @@ -124,6 +129,10 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20260319201613-d00831a3d3e7 // indirect google.golang.org/grpc v1.79.3 // indirect 
google.golang.org/protobuf v1.36.11 // indirect + modernc.org/libc v1.70.0 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect + modernc.org/sqlite v1.47.0 // indirect ) replace dappco.re/go/mcp => ../mcp diff --git a/go.sum b/go.sum index 7400a3a..5a4c37b 100644 --- a/go.sum +++ b/go.sum @@ -14,14 +14,12 @@ dappco.re/go/core/process v0.5.1 h1:USnVQRzbfGolgju4/L/gyAU7anzgHzr//z8vmR9ppug= dappco.re/go/core/process v0.5.1/go.mod h1:Zh8H+Rw6LCmjFmO0X6zxy9Z6O4EUKXrE6+XiPDuWuuA= dappco.re/go/core/rag v0.1.13 h1:R2Q+Xw5YenT4uFemXLBu+xQYtyUIYGSmMln5/Z+nol4= dappco.re/go/core/rag v0.1.13/go.mod h1:wthXtCqYEChjlGIHcJXetlgk49lPDmzG6jFWd1PEIZc= +dappco.re/go/core/store v0.3.0 h1:DECJB0A8dovqtX7w0/nGCV1XZLGI1/1pUt4SMM6GHh0= +dappco.re/go/core/store v0.3.0/go.mod h1:mirctw1g2ZfZRrALz43bomurXJFSQwd+rZdfIwPVqF8= dappco.re/go/core/webview v0.2.1 h1:rdy2sV+MS6RZsav8BiARJxtWhfx7eOAJp3b1Ynp1sYs= dappco.re/go/core/webview v0.2.1/go.mod h1:Qdo1V/sJJwOnL0hYd3+vzVUJxWYC8eGyILZROya6KoM= dappco.re/go/core/ws v0.4.0 h1:yEDV9whXyo+GWzBSjuB3NiLiH2bmBPBWD6rydwHyBn8= dappco.re/go/core/ws v0.4.0/go.mod h1:L1rrgW6zU+DztcVBJW2yO5Lm3rGXpyUMOA8OL9zsAok= -dappco.re/go/mcp v0.5.5 h1:RY0Ip5Xs6G2wXUdgCARp9vgnqKdBrIXTiwOAsukzRJk= -dappco.re/go/mcp v0.5.5/go.mod h1:xqPT0qER3oJeQNC7xHjkdg2VYcBXJunJKr6Vk/0moes= -dappco.re/go/mcp v0.5.6 h1:F9yuV9e1vc3vEQwNzhfnKSZcgk7C2wDQr81UcmlxWfU= -dappco.re/go/mcp v0.5.6/go.mod h1:xqPT0qER3oJeQNC7xHjkdg2VYcBXJunJKr6Vk/0moes= github.com/99designs/gqlgen v0.17.88 h1:neMQDgehMwT1vYIOx/w5ZYPUU/iMNAJzRO44I5Intoc= github.com/99designs/gqlgen v0.17.88/go.mod h1:qeqYFEgOeSKqWedOjogPizimp2iu4E23bdPvl4jTYic= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= @@ -74,6 +72,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/trifles 
v0.0.0-20230903005119-f50d829f2e54 h1:SG7nF6SRlWhcT7cNTs5R6Hk4V2lcmLz2NsG2VnInyNo= github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM= github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= github.com/gin-contrib/authz v1.0.6 h1:qAO4sSSzOPCwYRZI6YtubC+h2tZVwhwSJeyEZn2W+5k= @@ -165,6 +165,8 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/jsonschema-go v0.4.2 h1:tmrUohrwoLZZS/P3x7ex0WAVknEkBZM46iALbcqoRA8= github.com/google/jsonschema-go v0.4.2/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/context v1.1.2 h1:WRkNAv2uoa03QNIc1A6u4O7DAGMUVoopZhkiXWA2V1o= @@ -179,6 +181,8 @@ github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod 
h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -198,6 +202,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/ollama/ollama v0.18.2 h1:RsOY8oZ6TufRiPgsSlKJp4/V/X+oBREscUlEHZfd554= github.com/ollama/ollama v0.18.2/go.mod h1:tCX4IMV8DHjl3zY0THxuEkpWDZSOchJpzTuLACpMwFw= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= @@ -213,6 +219,8 @@ github.com/quic-go/quic-go v0.59.0 h1:OLJkp1Mlm/aS7dpKgTc6cnpynnD2Xg7C1pwL6vy/SA github.com/quic-go/quic-go v0.59.0/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU= github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/segmentio/asm v1.2.1 
h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= @@ -346,3 +354,31 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw= +modernc.org/ccgo/v4 v4.32.0/go.mod h1:6F08EBCx5uQc38kMGl+0Nm0oWczoo1c7cgpzEry7Uc0= +modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= +modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= +modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.70.0 h1:U58NawXqXbgpZ/dcdS9kMshu08aiA6b7gusEusqzNkw= +modernc.org/libc v1.70.0/go.mod h1:OVmxFGP1CI/Z4L3E0Q3Mf1PDE0BucwMkcXjjLntvHJo= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= 
+modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.47.0 h1:R1XyaNpoW4Et9yly+I2EeX7pBza/w+pmYee/0HJDyKk= +modernc.org/sqlite v1.47.0/go.mod h1:hWjRO6Tj/5Ik8ieqxQybiEOUXy0NJFNp2tpvVpKlvig= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/pkg/agentic/commit.go b/pkg/agentic/commit.go index 188add5..3106168 100644 --- a/pkg/agentic/commit.go +++ b/pkg/agentic/commit.go @@ -120,6 +120,14 @@ func (s *PrepSubsystem) commitWorkspace(ctx context.Context, input CommitInput) return CommitOutput{}, err } + // Mirror the dispatch record to the top-level dispatch_history group so + // sync push can drain completed dispatches without re-scanning the + // workspace tree — RFC §15.5 + §16.3. The record carries the same + // shape expected by `POST /v1/agent/sync`. 
+ record["id"] = WorkspaceName(workspaceDir) + record["synced"] = false + s.stateStoreSet(stateDispatchHistoryGroup, WorkspaceName(workspaceDir), record) + return CommitOutput{ Success: true, Workspace: input.Workspace, diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 0597350..78301ca 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -8,6 +8,7 @@ import ( "crypto/sha256" "encoding/base64" "encoding/hex" + "sync" "time" "dappco.re/go/agent/pkg/lib" @@ -37,6 +38,8 @@ type PrepSubsystem struct { failCount map[string]int providers *ProviderManager workspaces *core.Registry[*WorkspaceStatus] + stateOnce sync.Once + state *stateStoreRef } var _ coremcp.Subsystem = (*PrepSubsystem)(nil) @@ -367,6 +370,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { // _ = subsystem.OnShutdown(context.Background()) func (s *PrepSubsystem) OnShutdown(ctx context.Context) core.Result { s.frozen = true + s.closeStateStore() return core.Result{OK: true} } @@ -376,6 +380,31 @@ func (s *PrepSubsystem) hydrateWorkspaces() { if s.workspaces == nil { s.workspaces = core.NewRegistry[*WorkspaceStatus]() } + + // Registry hydration is filesystem-first — workspace status.json is + // authoritative. The go-store registry group caches last-known status + // so ghost agents can be detected after crashes even when the JSON + // status file has been rotated. Filesystem wins on conflict (§15.3). + s.stateStoreRestore(stateRegistryGroup, func(key, value string) bool { + if s.workspaces.Get(key).OK { + return true + } + var status WorkspaceStatus + if result := core.JSONUnmarshalString(value, &status); !result.OK { + return true + } + // Dead agents are marked failed so the next dispatch does not + // block on a PID that disappeared. 
+ if status.Status == "running" && !ProcessAlive(nil, status.ProcessID, status.PID) { + status.Status = "failed" + if status.Question == "" { + status.Question = "Agent process died during restart" + } + } + s.workspaces.Set(key, &status) + return true + }) + for _, path := range WorkspaceStatusPaths() { workspaceDir := core.PathDir(path) result := ReadStatusResult(workspaceDir) @@ -392,6 +421,11 @@ func (s *PrepSubsystem) TrackWorkspace(name string, st *WorkspaceStatus) { if s.workspaces != nil { s.workspaces.Set(name, st) } + if st == nil { + s.stateStoreDelete(stateRegistryGroup, name) + return + } + s.stateStoreSet(stateRegistryGroup, name, st) } // s.Workspaces().Names() // all workspace names diff --git a/pkg/agentic/runtime_state.go b/pkg/agentic/runtime_state.go index 49affcd..9d971d9 100644 --- a/pkg/agentic/runtime_state.go +++ b/pkg/agentic/runtime_state.go @@ -22,14 +22,48 @@ func runtimeStatePath() string { } func (s *PrepSubsystem) loadRuntimeState() { - result := readRuntimeState() - if !result.OK { - return + state := runtimeState{ + Backoff: make(map[string]time.Time), + FailCount: make(map[string]int), } - state, ok := result.Value.(runtimeState) - if !ok { - return + // Read the go-store cached runtime state first — when go-store is + // unavailable the read is a no-op and we fall back to the JSON file. + s.stateStoreRestore(stateRuntimeGroup, func(key, value string) bool { + switch key { + case "backoff": + backoff := map[string]time.Time{} + if result := core.JSONUnmarshalString(value, &backoff); result.OK { + for pool, deadline := range backoff { + state.Backoff[pool] = deadline + } + } + case "fail_count": + failCount := map[string]int{} + if result := core.JSONUnmarshalString(value, &failCount); result.OK { + for pool, count := range failCount { + state.FailCount[pool] = count + } + } + } + return true + }) + + // The JSON file remains authoritative when go-store is missing so + // existing deployments do not regress during the rollout. 
+ if result := readRuntimeState(); result.OK { + if fileState, ok := result.Value.(runtimeState); ok { + for pool, deadline := range fileState.Backoff { + if _, seen := state.Backoff[pool]; !seen { + state.Backoff[pool] = deadline + } + } + for pool, count := range fileState.FailCount { + if _, seen := state.FailCount[pool]; !seen { + state.FailCount[pool] = count + } + } + } } if s.backoff == nil { @@ -68,11 +102,26 @@ func (s *PrepSubsystem) persistRuntimeState() { if len(state.Backoff) == 0 && len(state.FailCount) == 0 { fs.Delete(runtimeStatePath()) + s.stateStoreDelete(stateRuntimeGroup, "backoff") + s.stateStoreDelete(stateRuntimeGroup, "fail_count") return } fs.EnsureDir(runtimeStateDir()) fs.WriteAtomic(runtimeStatePath(), core.JSONMarshalString(state)) + + // Mirror the authoritative JSON to the go-store cache so restarts see + // the same state even when the JSON file is archived or rotated. + if len(state.Backoff) > 0 { + s.stateStoreSet(stateRuntimeGroup, "backoff", state.Backoff) + } else { + s.stateStoreDelete(stateRuntimeGroup, "backoff") + } + if len(state.FailCount) > 0 { + s.stateStoreSet(stateRuntimeGroup, "fail_count", state.FailCount) + } else { + s.stateStoreDelete(stateRuntimeGroup, "fail_count") + } } func readRuntimeState() core.Result { diff --git a/pkg/agentic/statestore.go b/pkg/agentic/statestore.go new file mode 100644 index 0000000..1de9da4 --- /dev/null +++ b/pkg/agentic/statestore.go @@ -0,0 +1,198 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "sync" + + core "dappco.re/go/core" + store "dappco.re/go/core/store" +) + +// Usage example: `groupName := stateQueueGroup` // "queue" +const ( + stateQueueGroup = "queue" + stateConcurrencyGroup = "concurrency" + stateRegistryGroup = "registry" + stateDispatchHistoryGroup = "dispatch_history" + stateSyncQueueGroup = "sync_queue" + stateRuntimeGroup = "runtime" +) + +// stateStorePath returns the canonical path for the top-level agent DuckDB +// state file described in
RFC §15.2 — `.core/db.duckdb` relative to CoreRoot. +// +// Usage example: `path := stateStorePath()` +func stateStorePath() string { + return core.JoinPath(CoreRoot(), "db.duckdb") +} + +// stateStoreRef keeps the store instance, its initialisation error, and a +// sync.Once so multiple callers observe the same lazily-initialised value. +type stateStoreRef struct { + once sync.Once + instance *store.Store + err error +} + +// stateStoreInstance returns the subsystem-scoped, lazily initialised +// go-store Store, or nil when it cannot be opened. The agent works fully +// offline when go-store cannot be initialised — RFC §15.6. +// +// Usage example: `st := s.stateStoreInstance(); if st == nil { return } // in-memory fallback` +func (s *PrepSubsystem) stateStoreInstance() *store.Store { + if s == nil { + return nil + } + ref := s.stateStoreRef() + if ref == nil { + return nil + } + ref.once.Do(func() { + ref.instance, ref.err = openStateStore() + }) + if ref.err != nil { + return nil + } + return ref.instance +} + +// stateStoreErr reports the last error observed while opening the go-store +// backend, so callers can decide whether to log or silently fall back. +// +// Usage example: `if err := s.stateStoreErr(); err != nil { core.Warn("state store unavailable", "err", err) }` +func (s *PrepSubsystem) stateStoreErr() error { + if s == nil { + return nil + } + ref := s.stateStoreRef() + if ref == nil { + return nil + } + _ = s.stateStoreInstance() + return ref.err +} + +// stateStoreRef returns the subsystem-scoped reference, allocating it lazily +// so zero-value subsystems (used by tests) do not crash. +func (s *PrepSubsystem) stateStoreRef() *stateStoreRef { + if s == nil { + return nil + } + s.stateOnce.Do(func() { + s.state = &stateStoreRef{} + }) + return s.state +} + +// closeStateStore releases the go-store handle. Safe to call multiple times.
+// +// Usage example: `s.closeStateStore()` +func (s *PrepSubsystem) closeStateStore() { + if s == nil { + return + } + ref := s.state + if ref == nil { + return + } + if ref.instance != nil { + _ = ref.instance.Close() + ref.instance = nil + } + ref.err = nil + s.state = nil + s.stateOnce = sync.Once{} +} + +// openStateStore attempts to open the canonical state store at +// `.core/db.duckdb`. The filesystem is prepared first so new workspaces do +// not fail the first call. Errors are returned but never cause a panic — the +// caller falls back to in-memory or file-based state per RFC §15.6. +// +// Usage example: `st, err := openStateStore()` +func openStateStore() (*store.Store, error) { + path := stateStorePath() + directory := core.PathDir(path) + if ensureResult := fs.EnsureDir(directory); !ensureResult.OK { + if err, ok := ensureResult.Value.(error); ok { + return nil, core.E("agentic.stateStore", "prepare state directory", err) + } + return nil, core.E("agentic.stateStore", "prepare state directory", nil) + } + + storeInstance, err := store.New(path) + if err != nil { + return nil, core.E("agentic.stateStore", "open state store", err) + } + return storeInstance, nil +} + +// stateStoreSet writes a JSON-encoded value to the given group+key if the +// store is available. No-op when go-store is not initialised. +// +// Usage example: `s.stateStoreSet(stateQueueGroup, "core/go-io", queueEntry)` +func (s *PrepSubsystem) stateStoreSet(group, key string, value any) { + st := s.stateStoreInstance() + if st == nil { + return + } + payload := core.JSONMarshalString(value) + _ = st.Set(group, key, payload) +} + +// stateStoreDelete removes a key from the given group if the store is +// available. No-op when go-store is not initialised. 
+// +// Usage example: `s.stateStoreDelete(stateRegistryGroup, "core/go-io/task-5")` +func (s *PrepSubsystem) stateStoreDelete(group, key string) { + st := s.stateStoreInstance() + if st == nil { + return + } + _ = st.Delete(group, key) +} + +// stateStoreRestore iterates every entry in the given group and invokes +// the visitor with the decoded JSON payload. The visitor must return true +// to continue iteration or false to stop early. No-op when go-store is not +// initialised — callers continue to use file-based/in-memory state. +// +// Usage example: +// +// s.stateStoreRestore(stateQueueGroup, func(key, value string) bool { +// var task QueuedTask +// core.JSONUnmarshalString(value, &task) +// s.queue.Enqueue(task) +// return true +// }) +func (s *PrepSubsystem) stateStoreRestore(group string, visit func(key, value string) bool) { + st := s.stateStoreInstance() + if st == nil || visit == nil { + return + } + for entry, err := range st.AllSeq(group) { + if err != nil { + return + } + if !visit(entry.Key, entry.Value) { + return + } + } +} + +// stateStoreCount reports the number of entries in a group. Returns 0 when +// the store is unavailable so call sites can compare to zero without guards. +// +// Usage example: `if s.stateStoreCount(stateRegistryGroup) > 0 { /* restore workspaces */ }` +func (s *PrepSubsystem) stateStoreCount(group string) int { + st := s.stateStoreInstance() + if st == nil { + return 0 + } + count, err := st.Count(group) + if err != nil { + return 0 + } + return count +} diff --git a/pkg/agentic/statestore_test.go b/pkg/agentic/statestore_test.go new file mode 100644 index 0000000..b1cd49d --- /dev/null +++ b/pkg/agentic/statestore_test.go @@ -0,0 +1,173 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "testing" + "time" + + core "dappco.re/go/core" +) + +// withStateStoreTempDir redirects CORE_WORKSPACE to a fresh temporary +// directory so statestore tests can open `.core/db.duckdb` in isolation. 
+func withStateStoreTempDir(t *testing.T) { + t.Helper() + dir := t.TempDir() + t.Setenv("CORE_WORKSPACE", dir) + t.Setenv("CORE_HOME", dir) + t.Setenv("HOME", dir) + t.Setenv("DIR_HOME", dir) +} + +// TestStatestore_StateStoreInstance_Good verifies the DuckDB-backed store can +// be initialised inside a temporary workspace and that the same instance is +// returned on subsequent calls (lazy once semantics). +// +// Usage example: `go test ./pkg/agentic -run TestStatestore_StateStoreInstance_Good` +func TestStatestore_StateStoreInstance_Good(t *testing.T) { + withStateStoreTempDir(t) + + subsystem := &PrepSubsystem{} + defer subsystem.closeStateStore() + + first := subsystem.stateStoreInstance() + if first == nil { + t.Fatalf("expected store instance, got nil; err=%v", subsystem.stateStoreErr()) + } + + second := subsystem.stateStoreInstance() + if second != first { + t.Fatalf("expected lazy-once to return same instance, got different pointers") + } +} + +// TestStatestore_StateStoreSet_Good_WritesAndRestores verifies the helpers +// round-trip JSON entries through the store and that stateStoreRestore walks +// every entry. 
+// +// Usage example: `go test ./pkg/agentic -run TestStatestore_StateStoreSet_Good_WritesAndRestores` +func TestStatestore_StateStoreSet_Good_WritesAndRestores(t *testing.T) { + withStateStoreTempDir(t) + + subsystem := &PrepSubsystem{} + defer subsystem.closeStateStore() + + subsystem.stateStoreSet(stateRegistryGroup, "core/go-io", map[string]any{"status": "running"}) + subsystem.stateStoreSet(stateRegistryGroup, "core/go-store", map[string]any{"status": "queued"}) + + entries := map[string]map[string]any{} + subsystem.stateStoreRestore(stateRegistryGroup, func(key, value string) bool { + decoded := map[string]any{} + if result := core.JSONUnmarshalString(value, &decoded); !result.OK { + t.Fatalf("unmarshal %s: %v", key, result.Value) + } + entries[key] = decoded + return true + }) + + if len(entries) != 2 { + t.Fatalf("expected 2 entries, got %d: %v", len(entries), entries) + } + if status, ok := entries["core/go-io"]["status"].(string); !ok || status != "running" { + t.Fatalf("expected core/go-io status=running, got %v", entries["core/go-io"]) + } +} + +// TestStatestore_CloseStateStore_Bad_SafeOnNilSubsystem verifies close helpers +// do not panic on nil receivers — critical for test teardown paths and the +// graceful-degradation requirement in RFC §15.6. +// +// Usage example: `go test ./pkg/agentic -run TestStatestore_CloseStateStore_Bad_SafeOnNilSubsystem` +func TestStatestore_CloseStateStore_Bad_SafeOnNilSubsystem(t *testing.T) { + var subsystem *PrepSubsystem + subsystem.closeStateStore() + if instance := subsystem.stateStoreInstance(); instance != nil { + t.Fatalf("expected nil instance on nil subsystem, got %v", instance) + } +} + +// TestStatestore_StateStoreDelete_Ugly_DeletingUnknownKey verifies delete is a +// no-op for missing keys so call sites never need to guard against misses. 
+// +// Usage example: `go test ./pkg/agentic -run TestStatestore_StateStoreDelete_Ugly_DeletingUnknownKey` +func TestStatestore_StateStoreDelete_Ugly_DeletingUnknownKey(t *testing.T) { + withStateStoreTempDir(t) + + subsystem := &PrepSubsystem{} + defer subsystem.closeStateStore() + + subsystem.stateStoreDelete(stateRegistryGroup, "never-existed") + subsystem.stateStoreSet(stateRegistryGroup, "real", map[string]any{"value": 1}) + subsystem.stateStoreDelete(stateRegistryGroup, "real") + + count := subsystem.stateStoreCount(stateRegistryGroup) + if count != 0 { + t.Fatalf("expected registry empty after delete, got count=%d", count) + } +} + +// TestStatestore_HydrateWorkspaces_Good_RestoresFromStore mirrors RFC §15.3 — +// the registry group is populated before hydrateWorkspaces runs, and the +// subsystem must restore those entries so ghost agents are detectable across +// restarts without reading the status.json filesystem tree. +// +// Usage example: `go test ./pkg/agentic -run TestStatestore_HydrateWorkspaces_Good_RestoresFromStore` +func TestStatestore_HydrateWorkspaces_Good_RestoresFromStore(t *testing.T) { + withStateStoreTempDir(t) + + subsystem := &PrepSubsystem{} + subsystem.workspaces = core.NewRegistry[*WorkspaceStatus]() + defer subsystem.closeStateStore() + + subsystem.stateStoreSet(stateRegistryGroup, "core/go-io/task-5", WorkspaceStatus{ + Status: "running", + Agent: "codex:gpt-5.4", + PID: 0, + }) + + subsystem.hydrateWorkspaces() + + result := subsystem.Workspaces().Get("core/go-io/task-5") + if !result.OK { + t.Fatalf("expected workspace restored, got miss") + } + status, ok := result.Value.(*WorkspaceStatus) + if !ok { + t.Fatalf("expected *WorkspaceStatus, got %T", result.Value) + } + // Dead PID should be marked failed, per §15.3. 
+ if status.Status != "failed" { + t.Fatalf("expected ghost agent marked failed, got status=%s", status.Status) + } +} + +// TestStatestore_RuntimeState_Good_PersistsAcrossReloads mirrors RFC §15 — +// backoff deadlines saved via persistRuntimeState must replay when a new +// subsystem instance calls loadRuntimeState, enabling seamless resume after +// dispatch crashes. +// +// Usage example: `go test ./pkg/agentic -run TestStatestore_RuntimeState_Good_PersistsAcrossReloads` +func TestStatestore_RuntimeState_Good_PersistsAcrossReloads(t *testing.T) { + withStateStoreTempDir(t) + + original := &PrepSubsystem{ + backoff: map[string]time.Time{ + "codex": time.Now().Add(15 * time.Minute), + }, + failCount: map[string]int{"codex": 3}, + } + original.persistRuntimeState() + original.closeStateStore() + + replay := &PrepSubsystem{} + defer replay.closeStateStore() + replay.loadRuntimeState() + + if _, ok := replay.backoff["codex"]; !ok { + t.Fatalf("expected replay to restore codex backoff, got map=%v", replay.backoff) + } + if replay.failCount["codex"] != 3 { + t.Fatalf("expected replay fail count=3, got %d", replay.failCount["codex"]) + } +}