From 1cc318e2e86e8577783ea01f8a9d6edcb0d0223e Mon Sep 17 00:00:00 2001
From: Virgil
Date: Wed, 1 Apr 2026 17:54:26 +0000
Subject: [PATCH] feat(agentic): persist runtime backoff state

Co-Authored-By: Virgil
---
Review note: line breaks and indentation were restored from a
newline-collapsed copy of this patch; context-line whitespace follows gofmt
conventions and should be checked against the target tree. The index hash for
runtime_state.go predates the reviewed content and was not recomputed.

 pkg/agentic/dispatch.go           |   2 +
 pkg/agentic/prep.go               |   4 +-
 pkg/agentic/runtime_state.go      | 120 ++++++++++++++++++++++++++++++
 pkg/agentic/runtime_state_test.go |  73 ++++++++++++++++++
 4 files changed, 198 insertions(+), 1 deletion(-)
 create mode 100644 pkg/agentic/runtime_state.go
 create mode 100644 pkg/agentic/runtime_state_test.go

diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go
index 9f8c744..b7e5edd 100644
--- a/pkg/agentic/dispatch.go
+++ b/pkg/agentic/dispatch.go
@@ -236,6 +236,7 @@ func (s *PrepSubsystem) trackFailureRate(agent, status string, startedAt time.Ti
 			s.failCount[pool]++
 			if s.failCount[pool] >= 3 {
 				s.backoff[pool] = time.Now().Add(30 * time.Minute)
+				s.persistRuntimeState()
 				core.Print(nil, "rate-limit detected for %s — pausing pool for 30 minutes", pool)
 				return true
 			}
@@ -245,6 +246,7 @@ func (s *PrepSubsystem) trackFailureRate(agent, status string, startedAt time.Ti
 	} else {
 		s.failCount[pool] = 0
 	}
+	s.persistRuntimeState()
 	return false
 }
 
diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go
index 57119f6..1944c95 100644
--- a/pkg/agentic/prep.go
+++ b/pkg/agentic/prep.go
@@ -59,7 +59,7 @@ func NewPrep() *PrepSubsystem {
 
 	forgeURL := envOr("FORGE_URL", "https://forge.lthn.ai")
 
-	return &PrepSubsystem{
+	subsystem := &PrepSubsystem{
 		forge:      forge.NewForge(forgeURL, forgeToken),
 		forgeURL:   forgeURL,
 		forgeToken: forgeToken,
@@ -70,6 +70,8 @@ func NewPrep() *PrepSubsystem {
 		failCount:  make(map[string]int),
 		workspaces: core.NewRegistry[*WorkspaceStatus](),
 	}
+	subsystem.loadRuntimeState()
+	return subsystem
 }
 
 // c.Action("agentic.dispatch").Run(ctx, options)
diff --git a/pkg/agentic/runtime_state.go b/pkg/agentic/runtime_state.go
new file mode 100644
index 0000000..49affcd
--- /dev/null
+++ b/pkg/agentic/runtime_state.go
@@ -0,0 +1,120 @@
+// SPDX-License-Identifier: EUPL-1.2
+
+package agentic
+
+import (
+	"time"
+
+	core "dappco.re/go/core"
+)
+
+// runtimeState is the JSON document stored in the dispatch state file. Both
+// maps are keyed by pool name, matching PrepSubsystem.backoff and failCount.
+type runtimeState struct {
+	Backoff   map[string]time.Time `json:"backoff,omitempty"`
+	FailCount map[string]int       `json:"fail_count,omitempty"`
+}
+
+// runtimeStateDir returns the "runtime" directory under CoreRoot().
+func runtimeStateDir() string {
+	return core.JoinPath(CoreRoot(), "runtime")
+}
+
+// runtimeStatePath returns the path of dispatch.json in runtimeStateDir().
+func runtimeStatePath() string {
+	return core.JoinPath(runtimeStateDir(), "dispatch.json")
+}
+
+// loadRuntimeState copies the persisted backoff and fail-count entries into s,
+// allocating the maps if needed. Entries already in s are kept unless the
+// file holds the same pool. A missing or invalid state file leaves s as is.
+func (s *PrepSubsystem) loadRuntimeState() {
+	result := readRuntimeState()
+	if !result.OK {
+		return
+	}
+
+	state, ok := result.Value.(runtimeState)
+	if !ok {
+		return
+	}
+
+	if s.backoff == nil {
+		s.backoff = make(map[string]time.Time)
+	}
+	for pool, value := range state.Backoff {
+		s.backoff[pool] = value
+	}
+
+	if s.failCount == nil {
+		s.failCount = make(map[string]int)
+	}
+	for pool, count := range state.FailCount {
+		s.failCount[pool] = count
+	}
+}
+
+// persistRuntimeState writes the current backoff and fail-count entries of s to
+// the state file, skipping zero times and non-positive counts. When nothing
+// remains the file is deleted instead. Results of the file operations are
+// ignored, so persistence is best-effort.
+func (s *PrepSubsystem) persistRuntimeState() {
+	state := runtimeState{
+		Backoff:   make(map[string]time.Time),
+		FailCount: make(map[string]int),
+	}
+
+	for pool, until := range s.backoff {
+		if until.IsZero() {
+			continue
+		}
+		state.Backoff[pool] = until.UTC()
+	}
+	for pool, count := range s.failCount {
+		if count <= 0 {
+			continue
+		}
+		state.FailCount[pool] = count
+	}
+
+	if len(state.Backoff) == 0 && len(state.FailCount) == 0 {
+		fs.Delete(runtimeStatePath())
+		return
+	}
+
+	fs.EnsureDir(runtimeStateDir())
+	fs.WriteAtomic(runtimeStatePath(), core.JSONMarshalString(state))
+}
+
+// readRuntimeState reads and decodes the dispatch state file. On success the
+// Result is OK and holds a runtimeState whose maps are non-nil. When the file
+// cannot be read, its content is not a string, or the JSON does not parse, the
+// Result is not OK and holds an empty runtimeState.
+func readRuntimeState() core.Result {
+	result := fs.Read(runtimeStatePath())
+	if !result.OK {
+		return core.Result{Value: runtimeState{}, OK: false}
+	}
+
+	// Checked assertion: loadRuntimeState runs inside NewPrep, so an
+	// unexpected Value type must yield a failed Result instead of a panic.
+	content, ok := result.Value.(string)
+	if !ok {
+		return core.Result{Value: runtimeState{}, OK: false}
+	}
+
+	var state runtimeState
+	parseResult := core.JSONUnmarshalString(content, &state)
+	if !parseResult.OK {
+		return core.Result{Value: runtimeState{}, OK: false}
+	}
+
+	if state.Backoff == nil {
+		state.Backoff = make(map[string]time.Time)
+	}
+	if state.FailCount == nil {
+		state.FailCount = make(map[string]int)
+	}
+
+	return core.Result{Value: state, OK: true}
+}
diff --git a/pkg/agentic/runtime_state_test.go b/pkg/agentic/runtime_state_test.go
new file mode 100644
index 0000000..6df87e7
--- /dev/null
+++ b/pkg/agentic/runtime_state_test.go
@@ -0,0 +1,73 @@
+// SPDX-License-Identifier: EUPL-1.2
+
+package agentic
+
+import (
+	"testing"
+	"time"
+
+	core "dappco.re/go/core"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestRuntimeState_PersistLoad_Good_RoundTrip(t *testing.T) {
+	root := t.TempDir()
+	t.Setenv("CORE_WORKSPACE", root)
+
+	expectedBackoff := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)
+	subsystem := &PrepSubsystem{
+		backoff: map[string]time.Time{
+			"codex": expectedBackoff,
+		},
+		failCount: map[string]int{
+			"codex": 2,
+		},
+	}
+
+	subsystem.persistRuntimeState()
+
+	loaded := &PrepSubsystem{
+		backoff:   make(map[string]time.Time),
+		failCount: make(map[string]int),
+	}
+	loaded.loadRuntimeState()
+
+	require.Len(t, loaded.backoff, 1)
+	assert.True(t, loaded.backoff["codex"].Equal(expectedBackoff))
+	assert.Equal(t, 2, loaded.failCount["codex"])
+}
+
+func TestRuntimeState_Read_Bad_InvalidJSON(t *testing.T) {
+	root := t.TempDir()
+	t.Setenv("CORE_WORKSPACE", root)
+
+	require.True(t, fs.EnsureDir(runtimeStateDir()).OK)
+	require.True(t, fs.WriteAtomic(runtimeStatePath(), "{not-json").OK)
+
+	result := readRuntimeState()
+	assert.False(t, result.OK)
+}
+
+func TestRuntimeState_Persist_Ugly_EmptyStateDeletesFile(t *testing.T) {
+	root := t.TempDir()
+	t.Setenv("CORE_WORKSPACE", root)
+
+	require.True(t, fs.EnsureDir(runtimeStateDir()).OK)
+	require.True(t, fs.WriteAtomic(runtimeStatePath(), core.JSONMarshalString(runtimeState{
+		Backoff: map[string]time.Time{
+			"codex": time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC),
+		},
+		FailCount: map[string]int{
+			"codex": 1,
+		},
+	})).OK)
+
+	subsystem := &PrepSubsystem{
+		backoff:   map[string]time.Time{},
+		failCount: map[string]int{},
+	}
+	subsystem.persistRuntimeState()
+
+	assert.False(t, fs.Read(runtimeStatePath()).OK)
+}