From 64852472debaa00624bcd32d710f51008890ec66 Mon Sep 17 00:00:00 2001 From: Snider Date: Sat, 25 Apr 2026 20:57:10 +0100 Subject: [PATCH] =?UTF-8?q?feat(agent/agentic):=20RFC=20=C2=A716.5=20offli?= =?UTF-8?q?ne=20sync=20queue=20with=20exponential=20backoff?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds remote_sync_queue.go with a persistent queue, a 1s→30s exponential-backoff drainer, max-attempt drops with an audit record, and clean ctx.Done() shutdown. sync.go is rewired to enqueue dispatches through the queue instead of the legacy minute-ticker schedule. CORE_AGENT_SYNC_MAX_ATTEMPTS / CORE_SYNC_MAX_ATTEMPTS env vars override the default 100-attempt cap. Targeted tests cover happy-path drain, retry/backoff with a stub clock, cancellation, max-attempt exhaustion, and file-backed restart persistence. go test could not run in the sandbox because of go.work workspace dependency resolution; CI runs the real suite. Co-authored-by: Codex Closes tasks.lthn.sh/view.php?id=231 --- pkg/agentic/remote_sync_queue.go | 313 +++++++++++++++++++++++++ pkg/agentic/remote_sync_queue_test.go | 323 ++++++++++++++++++++++++++ pkg/agentic/sync.go | 94 +++----- 3 files changed, 671 insertions(+), 59 deletions(-) create mode 100644 pkg/agentic/remote_sync_queue.go create mode 100644 pkg/agentic/remote_sync_queue_test.go diff --git a/pkg/agentic/remote_sync_queue.go b/pkg/agentic/remote_sync_queue.go new file mode 100644 index 0000000..7e3c6c3 --- /dev/null +++ b/pkg/agentic/remote_sync_queue.go @@ -0,0 +1,313 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "strconv" + "sync" + "time" + + core "dappco.re/go/core" +) + +const ( + remoteSyncQueueDefaultMaxAttempts = 100 + remoteSyncQueueMinBackoff = time.Second + remoteSyncQueueMaxBackoff = 30 * time.Second +) + +var remoteSyncQueueMu sync.Mutex + +type remoteSyncClock interface { + Now() time.Time + After(time.Duration) <-chan time.Time +} + +type remoteSyncRealClock struct{} + +// clock := remoteSyncRealClock{} +// now := clock.Now() +func (remoteSyncRealClock) Now() time.Time { + return time.Now() +} + +// clock := remoteSyncRealClock{} +// <-clock.After(time.Second) +func (remoteSyncRealClock) After(delay time.Duration) <-chan time.Time { + return time.After(delay) +} + +type remoteSyncQueueController struct { + mu *sync.Mutex + clock remoteSyncClock + idlePoll time.Duration + maxAttempts int + read func() []syncQueuedPush + write func([]syncQueuedPush) + push func(context.Context, syncQueuedPush) error + onSuccess func(syncQueuedPush, time.Time) + onDrop func(syncQueuedPush, error, time.Time) + warn func(string, ...any) +} + +// delay := remoteSyncQueueBackoff(3) // 4s
func remoteSyncQueueBackoff(attempts int) time.Duration { + if attempts <= 0 { + return 0 + } + + shift := attempts - 1 + if shift > 5 { + shift = 5 + } + + delay := remoteSyncQueueMinBackoff * time.Duration(1<<shift) + if delay > remoteSyncQueueMaxBackoff { + return remoteSyncQueueMaxBackoff + } + return delay +} + +// attempts := remoteSyncQueueMaxAttempts() // 100 +func remoteSyncQueueMaxAttempts() int { + for _, key := range []string{"CORE_AGENT_SYNC_MAX_ATTEMPTS", "CORE_SYNC_MAX_ATTEMPTS"} { + value := core.Trim(core.Env(key)) + if value == "" { + continue + } + attempts, err := strconv.Atoi(value) + if err == nil && attempts > 0 { + return attempts + } + } + return remoteSyncQueueDefaultMaxAttempts +} + +// controller := s.remoteSyncQueueController(remoteSyncRealClock{}, time.Minute) +// controller.run(ctx) +func (s *PrepSubsystem) 
remoteSyncQueueController(clock remoteSyncClock, idlePoll time.Duration) remoteSyncQueueController { + if clock == nil { + clock = remoteSyncRealClock{} + } + + return remoteSyncQueueController{ + mu: &remoteSyncQueueMu, + clock: clock, + idlePoll: idlePoll, + maxAttempts: remoteSyncQueueMaxAttempts(), + read: func() []syncQueuedPush { + if s == nil { + return nil + } + return append([]syncQueuedPush(nil), s.readSyncQueue()...) + }, + write: func(queued []syncQueuedPush) { + if s == nil { + return + } + s.writeSyncQueue(queued) + }, + push: func(ctx context.Context, queued syncQueuedPush) error { + if s == nil { + return core.E("agentic.remoteSyncQueue", "nil subsystem", nil) + } + return s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, s.syncToken()) + }, + onSuccess: func(queued syncQueuedPush, at time.Time) { + markDispatchesSynced(queued.Dispatches) + recordSyncPush(at) + recordSyncHistory("push", queued.AgentID, queued.FleetNodeID, len(core.JSONMarshalString(map[string]any{ + "agent_id": queued.AgentID, + "dispatches": queued.Dispatches, + })), len(queued.Dispatches), at) + }, + onDrop: func(queued syncQueuedPush, err error, at time.Time) { + recordSyncDrop(queued.AgentID, queued.FleetNodeID, queued.Dispatches, queued.Attempts, err, at) + }, + warn: core.Warn, + } +} + +// queued := syncQueuedPush{AgentID: "charon", Dispatches: []map[string]any{{"workspace": "core/go-io/task-5"}}} +// s.enqueueSyncPush(queued) +func (s *PrepSubsystem) enqueueSyncPush(queued syncQueuedPush) { + remoteSyncQueueMu.Lock() + defer remoteSyncQueueMu.Unlock() + + current := s.readSyncQueue() + current = append(current, queued) + s.writeSyncQueue(current) +} + +// synced := controller.drainReady(ctx) +func (c remoteSyncQueueController) drainReady(ctx context.Context) int { + if ctx == nil { + return 0 + } + + c.lock() + defer c.unlock() + + synced := 0 + for { + if ctx.Err() != nil { + return synced + } + + queued := c.readQueue() + if len(queued) == 0 { + return synced + } + + head := queued[0] + if len(head.Dispatches) == 0 { + c.writeQueue(queued[1:]) + continue + } + + now := c.now() + if !head.NextAttempt.IsZero() && head.NextAttempt.After(now) { + return synced + } + + err := c.pushQueue(ctx, head) + finishedAt := c.now() + if err == nil { + if c.onSuccess != nil { + c.onSuccess(head, finishedAt) + } + c.writeQueue(queued[1:]) + synced += len(head.Dispatches) + continue + } + + head.Attempts++ + if head.Attempts >= c.maxAttemptLimit() { + if c.warn != nil { + c.warn("agentic: dropping sync queue entry after max attempts", + "agent_id", head.AgentID, + "attempts", head.Attempts, + "reason", err) + } + if c.onDrop != nil { + c.onDrop(head, err, finishedAt) + } + c.writeQueue(queued[1:]) + continue + } + + head.NextAttempt = finishedAt.Add(remoteSyncQueueBackoff(head.Attempts)) + queued[0] = head + c.writeQueue(queued) + return synced + } +} + +// controller := s.remoteSyncQueueController(remoteSyncRealClock{}, time.Minute) +// controller.run(ctx) +func (c remoteSyncQueueController) run(ctx context.Context) { + if ctx == nil { + return + } + + for { + if ctx.Err() != nil { + return + } + + c.drainReady(ctx) + delay := c.nextWait() + if delay <= 0 { + continue + } + + select { + case <-ctx.Done(): + return + case <-c.after(delay): + } + } +} + +func (c remoteSyncQueueController) nextWait() time.Duration { + c.lock() + defer c.unlock() + + queued := c.readQueue() + if len(queued) == 0 { + return c.idleDelay() + } + + head := queued[0] + if len(head.Dispatches) == 0 { + return 0 + } + + now := c.now() + 
if head.NextAttempt.IsZero() || !head.NextAttempt.After(now) { + return 0 + } + return head.NextAttempt.Sub(now) +} + +func (c remoteSyncQueueController) maxAttemptLimit() int { + if c.maxAttempts > 0 { + return c.maxAttempts + } + return remoteSyncQueueDefaultMaxAttempts +} + +func (c remoteSyncQueueController) idleDelay() time.Duration { + if c.idlePoll > 0 { + return c.idlePoll + } + return remoteSyncQueueMinBackoff +} + +func (c remoteSyncQueueController) now() time.Time { + if c.clock == nil { + return time.Now() + } + return c.clock.Now() +} + +func (c remoteSyncQueueController) after(delay time.Duration) <-chan time.Time { + if c.clock == nil { + return time.After(delay) + } + return c.clock.After(delay) +} + +func (c remoteSyncQueueController) readQueue() []syncQueuedPush { + if c.read == nil { + return nil + } + return c.read() +} + +func (c remoteSyncQueueController) writeQueue(queued []syncQueuedPush) { + if c.write == nil { + return + } + c.write(queued) +} + +func (c remoteSyncQueueController) pushQueue(ctx context.Context, queued syncQueuedPush) error { + if c.push == nil { + return nil + } + return c.push(ctx, queued) +} + +func (c remoteSyncQueueController) lock() { + if c.mu != nil { + c.mu.Lock() + } +} + +func (c remoteSyncQueueController) unlock() { + if c.mu != nil { + c.mu.Unlock() + } +} diff --git a/pkg/agentic/remote_sync_queue_test.go b/pkg/agentic/remote_sync_queue_test.go new file mode 100644 index 0000000..5b1d697 --- /dev/null +++ b/pkg/agentic/remote_sync_queue_test.go @@ -0,0 +1,323 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRemotesyncqueue_DrainReady_Good_EnqueueAndDrain(t *testing.T) { + store := &remoteSyncQueueTestStore{} + store.write([]syncQueuedPush{{ + AgentID: "charon", + QueuedAt: time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC), + Dispatches: []map[string]any{{"workspace": "core/go-io/task-5", "status": "completed"}}, + }}) + + pushed := make(chan syncQueuedPush, 1) + controller := remoteSyncQueueController{ + clock: newRemoteSyncQueueStubClock(time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC)), + maxAttempts: 3, + read: store.read, + write: store.write, + push: func(_ context.Context, queued syncQueuedPush) error { + pushed <- queued + return nil + }, + } + + synced := controller.drainReady(context.Background()) + require.Equal(t, 1, synced) + assert.Empty(t, store.snapshot()) + + select { + case queued := <-pushed: + assert.Equal(t, "charon", queued.AgentID) + require.Len(t, queued.Dispatches, 1) + assert.Equal(t, "core/go-io/task-5", queued.Dispatches[0]["workspace"]) + case <-time.After(time.Second): + t.Fatal("expected queued push to be drained") + } +} + +func TestRemotesyncqueue_Run_Bad_RetriesWithBackoff(t *testing.T) { + start := time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC) + store := &remoteSyncQueueTestStore{} + store.write([]syncQueuedPush{{ + AgentID: "charon", + QueuedAt: start, + Dispatches: []map[string]any{{"workspace": "core/go-io/task-5", "status": "completed"}}, + }}) + + clock := newRemoteSyncQueueStubClock(start) + attempts := make(chan int, 3) + var calls int + + controller := remoteSyncQueueController{ + clock: clock, + idlePoll: time.Hour, + maxAttempts: 10, + read: store.read, + write: store.write, + push: func(_ context.Context, _ syncQueuedPush) error { + calls++ + attempts <- calls + if calls < 3 { + return 
errors.New("offline") + } + return nil + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + controller.run(ctx) + close(done) + }() + + assert.Equal(t, 1, remoteSyncQueueReceiveAttempt(t, attempts)) + require.Eventually(t, func() bool { + queued := store.snapshot() + return len(queued) == 1 && + queued[0].Attempts == 1 && + queued[0].NextAttempt.Equal(start.Add(time.Second)) + }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return clock.WaiterCount() == 1 }, time.Second, 10*time.Millisecond) + + clock.Advance(999 * time.Millisecond) + remoteSyncQueueAssertNoAttempt(t, attempts) + + clock.Advance(time.Millisecond) + assert.Equal(t, 2, remoteSyncQueueReceiveAttempt(t, attempts)) + require.Eventually(t, func() bool { + queued := store.snapshot() + return len(queued) == 1 && + queued[0].Attempts == 2 && + queued[0].NextAttempt.Equal(start.Add(3*time.Second)) + }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return clock.WaiterCount() == 1 }, time.Second, 10*time.Millisecond) + + clock.Advance(2 * time.Second) + assert.Equal(t, 3, remoteSyncQueueReceiveAttempt(t, attempts)) + require.Eventually(t, func() bool { return len(store.snapshot()) == 0 }, time.Second, 10*time.Millisecond) + + cancel() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("remote sync drainer did not stop") + } +} + +func TestRemotesyncqueue_Run_Ugly_ContextCancellationStopsDrainer(t *testing.T) { + clock := newRemoteSyncQueueStubClock(time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC)) + controller := remoteSyncQueueController{ + clock: clock, + idlePoll: time.Hour, + read: func() []syncQueuedPush { return nil }, + write: func([]syncQueuedPush) {}, + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + controller.run(ctx) + close(done) + }() + + require.Eventually(t, func() bool { return clock.WaiterCount() == 1 }, time.Second, 10*time.Millisecond) + cancel() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("remote sync drainer did not stop after cancellation") + } +} + +func TestRemotesyncqueue_DrainReady_Ugly_MaxAttemptsExhaustedLogsAndDrops(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + + store := &remoteSyncQueueTestStore{} + store.write([]syncQueuedPush{{ + AgentID: "charon", + QueuedAt: time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC), + Attempts: 1, + Dispatches: []map[string]any{{"workspace": "core/go-io/task-5", "status": "completed"}}, + }}) + + var warnings []string + controller := remoteSyncQueueController{ + clock: newRemoteSyncQueueStubClock(time.Date(2026, 4, 25, 10, 0, 1, 0, time.UTC)), + maxAttempts: 2, + read: store.read, + write: store.write, + push: func(_ context.Context, _ syncQueuedPush) error { + return errors.New("offline") + }, + onDrop: func(queued syncQueuedPush, err error, at time.Time) { + recordSyncDrop(queued.AgentID, queued.FleetNodeID, queued.Dispatches, queued.Attempts, err, at) + }, + warn: func(message string, _ ...any) { + warnings = append(warnings, message) + }, + } + + synced := controller.drainReady(context.Background()) + require.Equal(t, 0, synced) + assert.Empty(t, store.snapshot()) + require.Len(t, warnings, 1) + assert.Contains(t, warnings[0], "dropping sync queue entry") + + records := readSyncRecords() + require.Len(t, records, 1) + assert.Equal(t, "drop", records[0].Direction) + assert.Equal(t, 2, records[0].Attempts) + assert.Equal(t, 1, 
records[0].ItemsCount) + assert.Contains(t, records[0].Reason, "offline") +} + +func TestRemotesyncqueue_FileQueue_Good_PersistsAcrossRestart(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + + writeSyncQueue([]syncQueuedPush{{ + AgentID: "charon", + QueuedAt: time.Date(2026, 4, 25, 10, 0, 0, 0, time.UTC), + Dispatches: []map[string]any{{"workspace": "core/go-io/task-5", "status": "completed"}}, + }}) + + assert.True(t, fs.Exists(syncQueuePath())) + + restarted := readSyncQueue() + require.Len(t, restarted, 1) + assert.Equal(t, "charon", restarted[0].AgentID) + require.Len(t, restarted[0].Dispatches, 1) + assert.Equal(t, "core/go-io/task-5", restarted[0].Dispatches[0]["workspace"]) +} + +type remoteSyncQueueTestStore struct { + mu sync.Mutex + queued []syncQueuedPush +} + +func (s *remoteSyncQueueTestStore) read() []syncQueuedPush { + s.mu.Lock() + defer s.mu.Unlock() + return remoteSyncQueueClone(s.queued) +} + +func (s *remoteSyncQueueTestStore) write(queued []syncQueuedPush) { + s.mu.Lock() + defer s.mu.Unlock() + s.queued = remoteSyncQueueClone(queued) +} + +func (s *remoteSyncQueueTestStore) snapshot() []syncQueuedPush { + return s.read() +} + +type remoteSyncQueueStubClock struct { + mu sync.Mutex + now time.Time + waiters []remoteSyncQueueStubWaiter +} + +type remoteSyncQueueStubWaiter struct { + at time.Time + ch chan time.Time +} + +func newRemoteSyncQueueStubClock(now time.Time) *remoteSyncQueueStubClock { + return &remoteSyncQueueStubClock{now: now} +} + +func (c *remoteSyncQueueStubClock) Now() time.Time { + c.mu.Lock() + defer c.mu.Unlock() + return c.now +} + +func (c *remoteSyncQueueStubClock) After(delay time.Duration) <-chan time.Time { + c.mu.Lock() + defer c.mu.Unlock() + + ch := make(chan time.Time, 1) + if delay <= 0 { + ch <- c.now + return ch + } + + c.waiters = append(c.waiters, remoteSyncQueueStubWaiter{ + at: c.now.Add(delay), + ch: ch, + }) + return ch +} + +func (c *remoteSyncQueueStubClock) Advance(delay time.Duration) { + c.mu.Lock() + c.now = c.now.Add(delay) + now := c.now + + var due []remoteSyncQueueStubWaiter + pending := make([]remoteSyncQueueStubWaiter, 0, len(c.waiters)) + for _, waiter := range c.waiters { + if waiter.at.After(now) { + pending = append(pending, waiter) + continue + } + due = append(due, waiter) + } + c.waiters = pending + c.mu.Unlock() + + for _, waiter := range due { + waiter.ch <- now + } +} + +func (c *remoteSyncQueueStubClock) WaiterCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.waiters) +} + +func remoteSyncQueueClone(queued []syncQueuedPush) []syncQueuedPush { + payload := core.JSONMarshalString(queued) + var clone []syncQueuedPush + _ = core.JSONUnmarshalString(payload, &clone) + return clone +} + +func remoteSyncQueueReceiveAttempt(t *testing.T, attempts <-chan int) int { + t.Helper() + + select { + case attempt := <-attempts: + return attempt + case <-time.After(time.Second): + t.Fatal("expected remote sync attempt") + return 0 + } +} + +func remoteSyncQueueAssertNoAttempt(t *testing.T, attempts <-chan int) { + t.Helper() + + select { + case attempt := <-attempts: + t.Fatalf("unexpected remote sync attempt %d", attempt) + case <-time.After(100 * time.Millisecond): + } +} diff --git a/pkg/agentic/sync.go b/pkg/agentic/sync.go index 5d6e93f..2b35c0a 100644 --- a/pkg/agentic/sync.go +++ b/pkg/agentic/sync.go @@ -43,6 +43,8 @@ type SyncRecord struct { Direction string `json:"direction"` PayloadSize int `json:"payload_size"` ItemsCount int `json:"items_count"` + Attempts int 
`json:"attempts,omitempty"` + Reason string `json:"reason,omitempty"` SyncedAt string `json:"synced_at"` } @@ -60,7 +62,9 @@ type syncStatusState struct { LastPullAt time.Time `json:"last_pull_at,omitempty"` } -// syncBackoffSchedule implements RFC §16.5 — 1s → 5s → 15s → 60s → 5min max. +// syncBackoffSchedule preserves the legacy sync pacing used by compatibility +// tests and status tooling. The active offline queue retry logic lives in +// remoteSyncQueueBackoff and caps at 30 seconds per RFC §16.5. // schedule := syncBackoffSchedule(2) // 15s // next := time.Now().Add(schedule) func syncBackoffSchedule(attempts int) time.Duration { @@ -89,31 +93,13 @@ const syncFlushScheduleInterval = time.Minute // ctx, cancel := context.WithCancel(context.Background()) // go s.runSyncFlushLoop(ctx, time.Minute) func (s *PrepSubsystem) runSyncFlushLoop(ctx context.Context, interval time.Duration) { - if ctx == nil || interval <= 0 { + if ctx == nil { return } if s == nil || s.syncToken() == "" { return } - - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if len(s.readSyncQueue()) == 0 { - continue - } - // QueueOnly keeps syncPushInput from re-scanning workspaces — the - // flush loop only drains entries already queued. - if _, err := s.syncPushInput(ctx, SyncPushInput{QueueOnly: true}); err != nil { - core.Warn("sync flush loop failed", "error", err) - } - } - } + s.remoteSyncQueueController(remoteSyncRealClock{}, interval).run(ctx) } // result := c.Action("agentic.sync.push").Run(ctx, core.NewOptions()) @@ -156,9 +142,8 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput) dispatches = collectSyncDispatches() } token := s.syncToken() - queuedPushes := s.readSyncQueue() - if len(dispatches) > 0 { - queuedPushes = append(queuedPushes, syncQueuedPush{ + if len(dispatches) > 0 && (token != "" || len(input.Dispatches) > 0) { + s.enqueueSyncPush(syncQueuedPush{ AgentID: agentID, FleetNodeID: input.FleetNodeID, Dispatches: dispatches, @@ -166,43 +151,9 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput) }) } if token == "" { - if len(input.Dispatches) > 0 { - s.writeSyncQueue(queuedPushes) - } return SyncPushOutput{Success: true, Count: 0}, nil } - if len(queuedPushes) == 0 { - return SyncPushOutput{Success: true, Count: 0}, nil - } - - synced := 0 - now := time.Now() - for i, queued := range queuedPushes { - if len(queued.Dispatches) == 0 { - continue - } - if !queued.NextAttempt.IsZero() && queued.NextAttempt.After(now) { - // Respect backoff — persist remaining tail so queue survives restart. - s.writeSyncQueue(queuedPushes[i:]) - return SyncPushOutput{Success: true, Count: synced}, nil - } - if err := s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, token); err != nil { - remaining := append([]syncQueuedPush(nil), queuedPushes[i:]...) 
- remaining[0].Attempts = queued.Attempts + 1 - remaining[0].NextAttempt = time.Now().Add(syncBackoffSchedule(remaining[0].Attempts)) - s.writeSyncQueue(remaining) - return SyncPushOutput{Success: true, Count: synced}, nil - } - synced += len(queued.Dispatches) - markDispatchesSynced(queued.Dispatches) - recordSyncPush(time.Now()) - recordSyncHistory("push", queued.AgentID, queued.FleetNodeID, len(core.JSONMarshalString(map[string]any{ - "agent_id": queued.AgentID, - "dispatches": queued.Dispatches, - })), len(queued.Dispatches), time.Now()) - } - - s.writeSyncQueue(nil) + synced := s.remoteSyncQueueController(remoteSyncRealClock{}, syncFlushScheduleInterval).drainReady(ctx) return SyncPushOutput{Success: true, Count: synced}, nil } @@ -649,7 +600,10 @@ func recordSyncHistory(direction, agentID string, fleetNodeID, payloadSize, item ItemsCount: itemsCount, SyncedAt: at.UTC().Format(time.RFC3339), } + appendSyncRecord(record) +} +func appendSyncRecord(record SyncRecord) { records := readSyncRecords() records = append(records, record) if len(records) > 100 { @@ -658,6 +612,28 @@ func recordSyncHistory(direction, agentID string, fleetNodeID, payloadSize, item writeSyncRecords(records) } +func recordSyncDrop(agentID string, fleetNodeID int, dispatches []map[string]any, attempts int, err error, at time.Time) { + reason := "" + if err != nil { + reason = err.Error() + } + + record := SyncRecord{ + AgentID: core.Trim(agentID), + FleetNodeID: fleetNodeID, + Direction: "drop", + PayloadSize: len(core.JSONMarshalString(map[string]any{ + "agent_id": agentID, + "dispatches": dispatches, + })), + ItemsCount: len(dispatches), + Attempts: attempts, + Reason: reason, + SyncedAt: at.UTC().Format(time.RFC3339), + } + appendSyncRecord(record) +} + func recordSyncPush(at time.Time) { state := readSyncStatusState() state.LastPushAt = at
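
Reviewer note, appended after the diff and not applied by it: a minimal Go wiring sketch for the new queue, assuming code inside package agentic with a live *PrepSubsystem s whose sync token is configured; the agent ID and workspace values are illustrative only.

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Producers enqueue dispatches; the file-backed queue persists them
	// across restarts.
	s.enqueueSyncPush(syncQueuedPush{
		AgentID:    "charon",
		QueuedAt:   time.Now(),
		Dispatches: []map[string]any{{"workspace": "core/go-io/task-5", "status": "completed"}},
	})

	// One long-lived drainer per process: it retries the queue head with
	// 1s, 2s, 4s, 8s, 16s, then 30s backoff, and drops the entry with a
	// "drop" audit record once the 100-attempt cap (or its env override)
	// is exhausted.
	go s.remoteSyncQueueController(remoteSyncRealClock{}, time.Minute).run(ctx)

In production this pairing is already wired by runSyncFlushLoop, so a second manual drainer should not be started.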