feat(agent/agentic): RFC §16.5 offline sync queue with exponential backoff

Adds remote_sync_queue.go with a persistent queue and a 1s→30s
exponential-backoff drainer, max-attempt drops with an audit record, and
clean shutdown on ctx.Done(). sync.go is rewired to enqueue dispatches
through the queue instead of the legacy minute-ticker schedule.
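
The flush loop now reduces to a single controller call:

    s.remoteSyncQueueController(remoteSyncRealClock{}, interval).run(ctx)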

The CORE_AGENT_SYNC_MAX_ATTEMPTS / CORE_SYNC_MAX_ATTEMPTS env vars
override the default 100-attempt cap.
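
For example (the value here is illustrative, not a recommendation):

    CORE_AGENT_SYNC_MAX_ATTEMPTS=250

Non-numeric or non-positive values are ignored, and the agent-scoped
variable wins when both are set.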

Targeted tests cover the happy-path drain, retry/backoff against a stub
clock, context cancellation, max-attempt exhaustion, and file-backed
persistence across restarts. go test was blocked by go.work workspace
dependency resolution in the sandbox; CI runs the real suite.

Co-authored-by: Codex <noreply@openai.com>
Closes tasks.lthn.sh/view.php?id=231
Snider 2026-04-25 20:57:10 +01:00
parent 929f6effbd
commit 64852472de
3 changed files with 671 additions and 59 deletions


@@ -0,0 +1,313 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"strconv"
"sync"
"time"
core "dappco.re/go/core"
)
const (
remoteSyncQueueDefaultMaxAttempts = 100
remoteSyncQueueMinBackoff = time.Second
remoteSyncQueueMaxBackoff = 30 * time.Second
)
var remoteSyncQueueMu sync.Mutex
type remoteSyncClock interface {
Now() time.Time
After(time.Duration) <-chan time.Time
}
type remoteSyncRealClock struct{}
// clock := remoteSyncRealClock{}
// now := clock.Now()
func (remoteSyncRealClock) Now() time.Time {
return time.Now()
}
// clock := remoteSyncRealClock{}
// <-clock.After(time.Second)
func (remoteSyncRealClock) After(delay time.Duration) <-chan time.Time {
return time.After(delay)
}
type remoteSyncQueueController struct {
mu *sync.Mutex
clock remoteSyncClock
idlePoll time.Duration
maxAttempts int
read func() []syncQueuedPush
write func([]syncQueuedPush)
push func(context.Context, syncQueuedPush) error
onSuccess func(syncQueuedPush, time.Time)
onDrop func(syncQueuedPush, error, time.Time)
warn func(string, ...any)
}
// delay := remoteSyncQueueBackoff(3) // 4s
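// Schedule by attempt: 1s, 2s, 4s, 8s, 16s, then 30s from the sixth attempt on (1<<5 would be 32s, clamped to the cap).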
func remoteSyncQueueBackoff(attempts int) time.Duration {
if attempts <= 0 {
return 0
}
shift := attempts - 1
if shift > 5 {
shift = 5
}
delay := remoteSyncQueueMinBackoff * time.Duration(1<<shift)
if delay > remoteSyncQueueMaxBackoff {
return remoteSyncQueueMaxBackoff
}
return delay
}
// attempts := remoteSyncQueueMaxAttempts() // 100
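// CORE_AGENT_SYNC_MAX_ATTEMPTS takes precedence over CORE_SYNC_MAX_ATTEMPTS; non-numeric or non-positive values are ignored.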
func remoteSyncQueueMaxAttempts() int {
for _, key := range []string{"CORE_AGENT_SYNC_MAX_ATTEMPTS", "CORE_SYNC_MAX_ATTEMPTS"} {
value := core.Trim(core.Env(key))
if value == "" {
continue
}
attempts, err := strconv.Atoi(value)
if err == nil && attempts > 0 {
return attempts
}
}
return remoteSyncQueueDefaultMaxAttempts
}
// controller := s.remoteSyncQueueController(remoteSyncRealClock{}, time.Minute)
// controller.run(ctx)
func (s *PrepSubsystem) remoteSyncQueueController(clock remoteSyncClock, idlePoll time.Duration) remoteSyncQueueController {
if clock == nil {
clock = remoteSyncRealClock{}
}
return remoteSyncQueueController{
mu: &remoteSyncQueueMu,
clock: clock,
idlePoll: idlePoll,
maxAttempts: remoteSyncQueueMaxAttempts(),
read: func() []syncQueuedPush {
if s == nil {
return nil
}
return append([]syncQueuedPush(nil), s.readSyncQueue()...)
},
write: func(queued []syncQueuedPush) {
if s == nil {
return
}
s.writeSyncQueue(queued)
},
push: func(ctx context.Context, queued syncQueuedPush) error {
if s == nil {
return core.E("agentic.remoteSyncQueue", "nil subsystem", nil)
}
return s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, s.syncToken())
},
onSuccess: func(queued syncQueuedPush, at time.Time) {
markDispatchesSynced(queued.Dispatches)
recordSyncPush(at)
recordSyncHistory("push", queued.AgentID, queued.FleetNodeID, len(core.JSONMarshalString(map[string]any{
"agent_id": queued.AgentID,
"dispatches": queued.Dispatches,
})), len(queued.Dispatches), at)
},
onDrop: func(queued syncQueuedPush, err error, at time.Time) {
recordSyncDrop(queued.AgentID, queued.FleetNodeID, queued.Dispatches, queued.Attempts, err, at)
},
warn: core.Warn,
}
}
// queued := syncQueuedPush{AgentID: "charon", Dispatches: []map[string]any{{"workspace": "core/go-io/task-5"}}}
// s.enqueueSyncPush(queued)
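// enqueueSyncPush appends under the shared queue mutex so concurrent enqueuers and the drainer never interleave their read-modify-write cycles.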
func (s *PrepSubsystem) enqueueSyncPush(queued syncQueuedPush) {
remoteSyncQueueMu.Lock()
defer remoteSyncQueueMu.Unlock()
current := s.readSyncQueue()
current = append(current, queued)
s.writeSyncQueue(current)
}
// synced := controller.drainReady(ctx)
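// drainReady pops entries head-first and returns the number of dispatches synced.
// It discards empty entries, stops at the first entry still backing off, and drops entries that exhaust the max-attempt cap.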
func (c remoteSyncQueueController) drainReady(ctx context.Context) int {
if ctx == nil {
return 0
}
c.lock()
defer c.unlock()
synced := 0
for {
if ctx.Err() != nil {
return synced
}
queued := c.readQueue()
if len(queued) == 0 {
return synced
}
head := queued[0]
if len(head.Dispatches) == 0 {
c.writeQueue(queued[1:])
continue
}
now := c.now()
if !head.NextAttempt.IsZero() && head.NextAttempt.After(now) {
return synced
}
err := c.pushQueue(ctx, head)
finishedAt := c.now()
if err == nil {
if c.onSuccess != nil {
c.onSuccess(head, finishedAt)
}
c.writeQueue(queued[1:])
synced += len(head.Dispatches)
continue
}
head.Attempts++
if head.Attempts >= c.maxAttemptLimit() {
if c.warn != nil {
c.warn("agentic: dropping sync queue entry after max attempts",
"agent_id", head.AgentID,
"attempts", head.Attempts,
"reason", err)
}
if c.onDrop != nil {
c.onDrop(head, err, finishedAt)
}
c.writeQueue(queued[1:])
continue
}
head.NextAttempt = finishedAt.Add(remoteSyncQueueBackoff(head.Attempts))
queued[0] = head
c.writeQueue(queued)
return synced
}
}
// controller := s.remoteSyncQueueController(remoteSyncRealClock{}, time.Minute)
// controller.run(ctx)
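// run loops until ctx is cancelled: drain whatever is ready, then sleep for nextWait (zero means take another pass immediately).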
func (c remoteSyncQueueController) run(ctx context.Context) {
if ctx == nil {
return
}
for {
if ctx.Err() != nil {
return
}
c.drainReady(ctx)
delay := c.nextWait()
if delay <= 0 {
continue
}
select {
case <-ctx.Done():
return
case <-c.after(delay):
}
}
}
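// nextWait reports how long run should sleep: zero when the head entry is actionable, the remaining backoff otherwise, and idleDelay for an empty queue.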
func (c remoteSyncQueueController) nextWait() time.Duration {
c.lock()
defer c.unlock()
queued := c.readQueue()
if len(queued) == 0 {
return c.idleDelay()
}
head := queued[0]
if len(head.Dispatches) == 0 {
return 0
}
now := c.now()
if head.NextAttempt.IsZero() || !head.NextAttempt.After(now) {
return 0
}
return head.NextAttempt.Sub(now)
}
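// maxAttemptLimit falls back to the package default when the controller was built without an explicit cap.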
func (c remoteSyncQueueController) maxAttemptLimit() int {
if c.maxAttempts > 0 {
return c.maxAttempts
}
return remoteSyncQueueDefaultMaxAttempts
}
func (c remoteSyncQueueController) idleDelay() time.Duration {
if c.idlePoll > 0 {
return c.idlePoll
}
return remoteSyncQueueMinBackoff
}
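// now and after fall back to the real clock so a zero-value controller still behaves sensibly.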
func (c remoteSyncQueueController) now() time.Time {
if c.clock == nil {
return time.Now()
}
return c.clock.Now()
}
func (c remoteSyncQueueController) after(delay time.Duration) <-chan time.Time {
if c.clock == nil {
return time.After(delay)
}
return c.clock.After(delay)
}
func (c remoteSyncQueueController) readQueue() []syncQueuedPush {
if c.read == nil {
return nil
}
return c.read()
}
func (c remoteSyncQueueController) writeQueue(queued []syncQueuedPush) {
if c.write == nil {
return
}
c.write(queued)
}
func (c remoteSyncQueueController) pushQueue(ctx context.Context, queued syncQueuedPush) error {
if c.push == nil {
return nil
}
return c.push(ctx, queued)
}
func (c remoteSyncQueueController) lock() {
if c.mu != nil {
c.mu.Lock()
}
}
func (c remoteSyncQueueController) unlock() {
if c.mu != nil {
c.mu.Unlock()
}
}


@@ -0,0 +1,323 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"errors"
"sync"
"testing"
"time"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRemotesyncqueue_DrainReady_Good_EnqueueAndDrain(t *testing.T) {
store := &remoteSyncQueueTestStore{}
store.write([]syncQueuedPush{{
AgentID: "charon",
QueuedAt: time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC),
Dispatches: []map[string]any{{"workspace": "core/go-io/task-5", "status": "completed"}},
}})
pushed := make(chan syncQueuedPush, 1)
controller := remoteSyncQueueController{
clock: newRemoteSyncQueueStubClock(time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC)),
maxAttempts: 3,
read: store.read,
write: store.write,
push: func(_ context.Context, queued syncQueuedPush) error {
pushed <- queued
return nil
},
}
synced := controller.drainReady(context.Background())
require.Equal(t, 1, synced)
assert.Empty(t, store.snapshot())
select {
case queued := <-pushed:
assert.Equal(t, "charon", queued.AgentID)
require.Len(t, queued.Dispatches, 1)
assert.Equal(t, "core/go-io/task-5", queued.Dispatches[0]["workspace"])
case <-time.After(time.Second):
t.Fatal("expected queued push to be drained")
}
}
func TestRemotesyncqueue_Run_Bad_RetriesWithBackoff(t *testing.T) {
start := time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC)
store := &remoteSyncQueueTestStore{}
store.write([]syncQueuedPush{{
AgentID: "charon",
QueuedAt: start,
Dispatches: []map[string]any{{"workspace": "core/go-io/task-5", "status": "completed"}},
}})
clock := newRemoteSyncQueueStubClock(start)
attempts := make(chan int, 3)
var calls int
controller := remoteSyncQueueController{
clock: clock,
idlePoll: time.Hour,
maxAttempts: 10,
read: store.read,
write: store.write,
push: func(_ context.Context, _ syncQueuedPush) error {
calls++
attempts <- calls
if calls < 3 {
return errors.New("offline")
}
return nil
},
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
controller.run(ctx)
close(done)
}()
assert.Equal(t, 1, remoteSyncQueueReceiveAttempt(t, attempts))
require.Eventually(t, func() bool {
queued := store.snapshot()
return len(queued) == 1 &&
queued[0].Attempts == 1 &&
queued[0].NextAttempt.Equal(start.Add(time.Second))
}, time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool { return clock.WaiterCount() == 1 }, time.Second, 10*time.Millisecond)
clock.Advance(999 * time.Millisecond)
remoteSyncQueueAssertNoAttempt(t, attempts)
clock.Advance(time.Millisecond)
assert.Equal(t, 2, remoteSyncQueueReceiveAttempt(t, attempts))
require.Eventually(t, func() bool {
queued := store.snapshot()
return len(queued) == 1 &&
queued[0].Attempts == 2 &&
queued[0].NextAttempt.Equal(start.Add(3*time.Second))
}, time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool { return clock.WaiterCount() == 1 }, time.Second, 10*time.Millisecond)
clock.Advance(2 * time.Second)
assert.Equal(t, 3, remoteSyncQueueReceiveAttempt(t, attempts))
require.Eventually(t, func() bool { return len(store.snapshot()) == 0 }, time.Second, 10*time.Millisecond)
cancel()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("remote sync drainer did not stop")
}
}
func TestRemotesyncqueue_Run_Ugly_ContextCancellationStopsDrainer(t *testing.T) {
clock := newRemoteSyncQueueStubClock(time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC))
controller := remoteSyncQueueController{
clock: clock,
idlePoll: time.Hour,
read: func() []syncQueuedPush { return nil },
write: func([]syncQueuedPush) {},
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
controller.run(ctx)
close(done)
}()
require.Eventually(t, func() bool { return clock.WaiterCount() == 1 }, time.Second, 10*time.Millisecond)
cancel()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("remote sync drainer did not stop after cancellation")
}
}
func TestRemotesyncqueue_DrainReady_Ugly_MaxAttemptsExhaustedLogsAndDrops(t *testing.T) {
root := t.TempDir()
setTestWorkspace(t, root)
store := &remoteSyncQueueTestStore{}
store.write([]syncQueuedPush{{
AgentID: "charon",
QueuedAt: time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC),
Attempts: 1,
Dispatches: []map[string]any{{"workspace": "core/go-io/task-5", "status": "completed"}},
}})
var warnings []string
controller := remoteSyncQueueController{
clock: newRemoteSyncQueueStubClock(time.Date(2026, 4, 25, 10, 0, 1, 0, time.UTC)),
maxAttempts: 2,
read: store.read,
write: store.write,
push: func(_ context.Context, _ syncQueuedPush) error {
return errors.New("offline")
},
onDrop: func(queued syncQueuedPush, err error, at time.Time) {
recordSyncDrop(queued.AgentID, queued.FleetNodeID, queued.Dispatches, queued.Attempts, err, at)
},
warn: func(message string, _ ...any) {
warnings = append(warnings, message)
},
}
synced := controller.drainReady(context.Background())
require.Equal(t, 0, synced)
assert.Empty(t, store.snapshot())
require.Len(t, warnings, 1)
assert.Contains(t, warnings[0], "dropping sync queue entry")
records := readSyncRecords()
require.Len(t, records, 1)
assert.Equal(t, "drop", records[0].Direction)
assert.Equal(t, 2, records[0].Attempts)
assert.Equal(t, 1, records[0].ItemsCount)
assert.Contains(t, records[0].Reason, "offline")
}
func TestRemotesyncqueue_FileQueue_Good_PersistsAcrossRestart(t *testing.T) {
root := t.TempDir()
setTestWorkspace(t, root)
writeSyncQueue([]syncQueuedPush{{
AgentID: "charon",
QueuedAt: time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC),
Dispatches: []map[string]any{{"workspace": "core/go-io/task-5", "status": "completed"}},
}})
assert.True(t, fs.Exists(syncQueuePath()))
restarted := readSyncQueue()
require.Len(t, restarted, 1)
assert.Equal(t, "charon", restarted[0].AgentID)
require.Len(t, restarted[0].Dispatches, 1)
assert.Equal(t, "core/go-io/task-5", restarted[0].Dispatches[0]["workspace"])
}
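// remoteSyncQueueTestStore is an in-memory, mutex-guarded stand-in for the file-backed sync queue.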
type remoteSyncQueueTestStore struct {
mu sync.Mutex
queued []syncQueuedPush
}
func (s *remoteSyncQueueTestStore) read() []syncQueuedPush {
s.mu.Lock()
defer s.mu.Unlock()
return remoteSyncQueueClone(s.queued)
}
func (s *remoteSyncQueueTestStore) write(queued []syncQueuedPush) {
s.mu.Lock()
defer s.mu.Unlock()
s.queued = remoteSyncQueueClone(queued)
}
func (s *remoteSyncQueueTestStore) snapshot() []syncQueuedPush {
return s.read()
}
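// remoteSyncQueueStubClock implements remoteSyncClock with manually advanced time; Advance releases any waiters whose deadline has been reached.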
type remoteSyncQueueStubClock struct {
mu sync.Mutex
now time.Time
waiters []remoteSyncQueueStubWaiter
}
type remoteSyncQueueStubWaiter struct {
at time.Time
ch chan time.Time
}
func newRemoteSyncQueueStubClock(now time.Time) *remoteSyncQueueStubClock {
return &remoteSyncQueueStubClock{now: now}
}
func (c *remoteSyncQueueStubClock) Now() time.Time {
c.mu.Lock()
defer c.mu.Unlock()
return c.now
}
func (c *remoteSyncQueueStubClock) After(delay time.Duration) <-chan time.Time {
c.mu.Lock()
defer c.mu.Unlock()
ch := make(chan time.Time, 1)
if delay <= 0 {
ch <- c.now
return ch
}
c.waiters = append(c.waiters, remoteSyncQueueStubWaiter{
at: c.now.Add(delay),
ch: ch,
})
return ch
}
func (c *remoteSyncQueueStubClock) Advance(delay time.Duration) {
c.mu.Lock()
c.now = c.now.Add(delay)
now := c.now
var due []remoteSyncQueueStubWaiter
pending := make([]remoteSyncQueueStubWaiter, 0, len(c.waiters))
for _, waiter := range c.waiters {
if waiter.at.After(now) {
pending = append(pending, waiter)
continue
}
due = append(due, waiter)
}
c.waiters = pending
c.mu.Unlock()
for _, waiter := range due {
waiter.ch <- now
}
}
func (c *remoteSyncQueueStubClock) WaiterCount() int {
c.mu.Lock()
defer c.mu.Unlock()
return len(c.waiters)
}
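// remoteSyncQueueClone deep-copies entries via a JSON round-trip so tests never alias the store's backing slice.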
func remoteSyncQueueClone(queued []syncQueuedPush) []syncQueuedPush {
payload := core.JSONMarshalString(queued)
var clone []syncQueuedPush
_ = core.JSONUnmarshalString(payload, &clone)
return clone
}
func remoteSyncQueueReceiveAttempt(t *testing.T, attempts <-chan int) int {
t.Helper()
select {
case attempt := <-attempts:
return attempt
case <-time.After(time.Second):
t.Fatal("expected remote sync attempt")
return 0
}
}
func remoteSyncQueueAssertNoAttempt(t *testing.T, attempts <-chan int) {
t.Helper()
select {
case attempt := <-attempts:
t.Fatalf("unexpected remote sync attempt %d", attempt)
case <-time.After(100 * time.Millisecond):
}
}


@@ -43,6 +43,8 @@ type SyncRecord struct {
Direction string `json:"direction"`
PayloadSize int `json:"payload_size"`
ItemsCount int `json:"items_count"`
Attempts int `json:"attempts,omitempty"`
Reason string `json:"reason,omitempty"`
SyncedAt string `json:"synced_at"`
}
@@ -60,7 +62,9 @@ type syncStatusState struct {
LastPullAt time.Time `json:"last_pull_at,omitempty"`
}
// syncBackoffSchedule implements RFC §16.5 — 1s → 5s → 15s → 60s → 5min max.
// syncBackoffSchedule preserves the legacy sync pacing used by compatibility
// tests and status tooling. The active offline queue retry logic lives in
// remoteSyncQueueBackoff and caps at 30 seconds per RFC §16.5.
// schedule := syncBackoffSchedule(2) // 15s
// next := time.Now().Add(schedule)
func syncBackoffSchedule(attempts int) time.Duration {
@@ -89,31 +93,13 @@ const syncFlushScheduleInterval = time.Minute
// ctx, cancel := context.WithCancel(context.Background())
// go s.runSyncFlushLoop(ctx, time.Minute)
func (s *PrepSubsystem) runSyncFlushLoop(ctx context.Context, interval time.Duration) {
if ctx == nil || interval <= 0 {
if ctx == nil {
return
}
if s == nil || s.syncToken() == "" {
return
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if len(s.readSyncQueue()) == 0 {
continue
}
// QueueOnly keeps syncPushInput from re-scanning workspaces — the
// flush loop only drains entries already queued.
if _, err := s.syncPushInput(ctx, SyncPushInput{QueueOnly: true}); err != nil {
core.Warn("sync flush loop failed", "error", err)
}
}
}
s.remoteSyncQueueController(remoteSyncRealClock{}, interval).run(ctx)
}
// result := c.Action("agentic.sync.push").Run(ctx, core.NewOptions())
@@ -156,9 +142,8 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput)
dispatches = collectSyncDispatches()
}
token := s.syncToken()
queuedPushes := s.readSyncQueue()
if len(dispatches) > 0 {
queuedPushes = append(queuedPushes, syncQueuedPush{
if len(dispatches) > 0 && (token != "" || len(input.Dispatches) > 0) {
s.enqueueSyncPush(syncQueuedPush{
AgentID: agentID,
FleetNodeID: input.FleetNodeID,
Dispatches: dispatches,
@@ -166,43 +151,9 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput)
})
}
if token == "" {
if len(input.Dispatches) > 0 {
s.writeSyncQueue(queuedPushes)
}
return SyncPushOutput{Success: true, Count: 0}, nil
}
if len(queuedPushes) == 0 {
return SyncPushOutput{Success: true, Count: 0}, nil
}
synced := 0
now := time.Now()
for i, queued := range queuedPushes {
if len(queued.Dispatches) == 0 {
continue
}
if !queued.NextAttempt.IsZero() && queued.NextAttempt.After(now) {
// Respect backoff — persist remaining tail so queue survives restart.
s.writeSyncQueue(queuedPushes[i:])
return SyncPushOutput{Success: true, Count: synced}, nil
}
if err := s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, token); err != nil {
remaining := append([]syncQueuedPush(nil), queuedPushes[i:]...)
remaining[0].Attempts = queued.Attempts + 1
remaining[0].NextAttempt = time.Now().Add(syncBackoffSchedule(remaining[0].Attempts))
s.writeSyncQueue(remaining)
return SyncPushOutput{Success: true, Count: synced}, nil
}
synced += len(queued.Dispatches)
markDispatchesSynced(queued.Dispatches)
recordSyncPush(time.Now())
recordSyncHistory("push", queued.AgentID, queued.FleetNodeID, len(core.JSONMarshalString(map[string]any{
"agent_id": queued.AgentID,
"dispatches": queued.Dispatches,
})), len(queued.Dispatches), time.Now())
}
s.writeSyncQueue(nil)
synced := s.remoteSyncQueueController(remoteSyncRealClock{}, syncFlushScheduleInterval).drainReady(ctx)
return SyncPushOutput{Success: true, Count: synced}, nil
}
@@ -649,7 +600,10 @@ func recordSyncHistory(direction, agentID string, fleetNodeID, payloadSize, item
ItemsCount: itemsCount,
SyncedAt: at.UTC().Format(time.RFC3339),
}
appendSyncRecord(record)
}
func appendSyncRecord(record SyncRecord) {
records := readSyncRecords()
records = append(records, record)
if len(records) > 100 {
@@ -658,6 +612,28 @@ func recordSyncHistory(direction, agentID string, fleetNodeID, payloadSize, item
writeSyncRecords(records)
}
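// recordSyncDrop appends a "drop" audit record carrying the attempt count and failure reason so discarded dispatches stay visible.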
func recordSyncDrop(agentID string, fleetNodeID int, dispatches []map[string]any, attempts int, err error, at time.Time) {
reason := ""
if err != nil {
reason = err.Error()
}
record := SyncRecord{
AgentID: core.Trim(agentID),
FleetNodeID: fleetNodeID,
Direction: "drop",
PayloadSize: len(core.JSONMarshalString(map[string]any{
"agent_id": agentID,
"dispatches": dispatches,
})),
ItemsCount: len(dispatches),
Attempts: attempts,
Reason: reason,
SyncedAt: at.UTC().Format(time.RFC3339),
}
appendSyncRecord(record)
}
func recordSyncPush(at time.Time) {
state := readSyncStatusState()
state.LastPushAt = at