feat(agentic): persist runtime backoff state
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
a783f9023c
commit
1cc318e2e8
4 changed files with 176 additions and 1 deletions
|
|
@ -236,6 +236,7 @@ func (s *PrepSubsystem) trackFailureRate(agent, status string, startedAt time.Ti
|
|||
s.failCount[pool]++
|
||||
if s.failCount[pool] >= 3 {
|
||||
s.backoff[pool] = time.Now().Add(30 * time.Minute)
|
||||
s.persistRuntimeState()
|
||||
core.Print(nil, "rate-limit detected for %s — pausing pool for 30 minutes", pool)
|
||||
return true
|
||||
}
|
||||
|
|
@ -245,6 +246,7 @@ func (s *PrepSubsystem) trackFailureRate(agent, status string, startedAt time.Ti
|
|||
} else {
|
||||
s.failCount[pool] = 0
|
||||
}
|
||||
s.persistRuntimeState()
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ func NewPrep() *PrepSubsystem {
|
|||
|
||||
forgeURL := envOr("FORGE_URL", "https://forge.lthn.ai")
|
||||
|
||||
return &PrepSubsystem{
|
||||
subsystem := &PrepSubsystem{
|
||||
forge: forge.NewForge(forgeURL, forgeToken),
|
||||
forgeURL: forgeURL,
|
||||
forgeToken: forgeToken,
|
||||
|
|
@ -70,6 +70,8 @@ func NewPrep() *PrepSubsystem {
|
|||
failCount: make(map[string]int),
|
||||
workspaces: core.NewRegistry[*WorkspaceStatus](),
|
||||
}
|
||||
subsystem.loadRuntimeState()
|
||||
return subsystem
|
||||
}
|
||||
|
||||
// c.Action("agentic.dispatch").Run(ctx, options)
|
||||
|
|
|
|||
98
pkg/agentic/runtime_state.go
Normal file
98
pkg/agentic/runtime_state.go
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package agentic
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
// runtimeState is the JSON document stored at runtimeStatePath. Both maps are
// keyed by pool name and are omitted from the encoded output when empty.
type runtimeState struct {
	// Backoff holds, per pool, the instant stored in PrepSubsystem.backoff
	// (set to "now + 30 minutes" when a pool is paused).
	Backoff map[string]time.Time `json:"backoff,omitempty"`

	// FailCount holds, per pool, the failure counter kept in
	// PrepSubsystem.failCount.
	FailCount map[string]int `json:"fail_count,omitempty"`
}
|
||||
|
||||
func runtimeStateDir() string {
|
||||
return core.JoinPath(CoreRoot(), "runtime")
|
||||
}
|
||||
|
||||
func runtimeStatePath() string {
|
||||
return core.JoinPath(runtimeStateDir(), "dispatch.json")
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) loadRuntimeState() {
|
||||
result := readRuntimeState()
|
||||
if !result.OK {
|
||||
return
|
||||
}
|
||||
|
||||
state, ok := result.Value.(runtimeState)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if s.backoff == nil {
|
||||
s.backoff = make(map[string]time.Time)
|
||||
}
|
||||
for pool, value := range state.Backoff {
|
||||
s.backoff[pool] = value
|
||||
}
|
||||
|
||||
if s.failCount == nil {
|
||||
s.failCount = make(map[string]int)
|
||||
}
|
||||
for pool, count := range state.FailCount {
|
||||
s.failCount[pool] = count
|
||||
}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) persistRuntimeState() {
|
||||
state := runtimeState{
|
||||
Backoff: make(map[string]time.Time),
|
||||
FailCount: make(map[string]int),
|
||||
}
|
||||
|
||||
for pool, until := range s.backoff {
|
||||
if until.IsZero() {
|
||||
continue
|
||||
}
|
||||
state.Backoff[pool] = until.UTC()
|
||||
}
|
||||
for pool, count := range s.failCount {
|
||||
if count <= 0 {
|
||||
continue
|
||||
}
|
||||
state.FailCount[pool] = count
|
||||
}
|
||||
|
||||
if len(state.Backoff) == 0 && len(state.FailCount) == 0 {
|
||||
fs.Delete(runtimeStatePath())
|
||||
return
|
||||
}
|
||||
|
||||
fs.EnsureDir(runtimeStateDir())
|
||||
fs.WriteAtomic(runtimeStatePath(), core.JSONMarshalString(state))
|
||||
}
|
||||
|
||||
func readRuntimeState() core.Result {
|
||||
result := fs.Read(runtimeStatePath())
|
||||
if !result.OK {
|
||||
return core.Result{Value: runtimeState{}, OK: false}
|
||||
}
|
||||
|
||||
var state runtimeState
|
||||
parseResult := core.JSONUnmarshalString(result.Value.(string), &state)
|
||||
if !parseResult.OK {
|
||||
return core.Result{Value: runtimeState{}, OK: false}
|
||||
}
|
||||
|
||||
if state.Backoff == nil {
|
||||
state.Backoff = make(map[string]time.Time)
|
||||
}
|
||||
if state.FailCount == nil {
|
||||
state.FailCount = make(map[string]int)
|
||||
}
|
||||
|
||||
return core.Result{Value: state, OK: true}
|
||||
}
|
||||
73
pkg/agentic/runtime_state_test.go
Normal file
73
pkg/agentic/runtime_state_test.go
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package agentic
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRuntimeState_PersistLoad_Good_RoundTrip(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
|
||||
expectedBackoff := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)
|
||||
subsystem := &PrepSubsystem{
|
||||
backoff: map[string]time.Time{
|
||||
"codex": expectedBackoff,
|
||||
},
|
||||
failCount: map[string]int{
|
||||
"codex": 2,
|
||||
},
|
||||
}
|
||||
|
||||
subsystem.persistRuntimeState()
|
||||
|
||||
loaded := &PrepSubsystem{
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
}
|
||||
loaded.loadRuntimeState()
|
||||
|
||||
require.Len(t, loaded.backoff, 1)
|
||||
assert.True(t, loaded.backoff["codex"].Equal(expectedBackoff))
|
||||
assert.Equal(t, 2, loaded.failCount["codex"])
|
||||
}
|
||||
|
||||
func TestRuntimeState_Read_Bad_InvalidJSON(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
|
||||
require.True(t, fs.EnsureDir(runtimeStateDir()).OK)
|
||||
require.True(t, fs.WriteAtomic(runtimeStatePath(), "{not-json").OK)
|
||||
|
||||
result := readRuntimeState()
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestRuntimeState_Persist_Ugly_EmptyStateDeletesFile(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
|
||||
require.True(t, fs.EnsureDir(runtimeStateDir()).OK)
|
||||
require.True(t, fs.WriteAtomic(runtimeStatePath(), core.JSONMarshalString(runtimeState{
|
||||
Backoff: map[string]time.Time{
|
||||
"codex": time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
},
|
||||
FailCount: map[string]int{
|
||||
"codex": 1,
|
||||
},
|
||||
})).OK)
|
||||
|
||||
subsystem := &PrepSubsystem{
|
||||
backoff: map[string]time.Time{},
|
||||
failCount: map[string]int{},
|
||||
}
|
||||
subsystem.persistRuntimeState()
|
||||
|
||||
assert.False(t, fs.Read(runtimeStatePath()).OK)
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue