diff --git a/pkg/agentic/sync.go b/pkg/agentic/sync.go index 2bd2486..5d6e93f 100644 --- a/pkg/agentic/sync.go +++ b/pkg/agentic/sync.go @@ -335,11 +335,18 @@ func readSyncLedger() map[string]string { // can skip workspaces that have already been pushed. func writeSyncLedger(ledger map[string]string) { if len(ledger) == 0 { - fs.Delete(syncLedgerPath()) + if deleteResult := fs.Delete(syncLedgerPath()); !deleteResult.OK { + core.Warn("agentic: failed to delete sync ledger", "path", syncLedgerPath(), "reason", deleteResult.Value) + } return } - fs.EnsureDir(syncStateDir()) - fs.WriteAtomic(syncLedgerPath(), core.JSONMarshalString(ledger)) + if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK { + core.Warn("agentic: failed to prepare sync ledger directory", "path", syncStateDir(), "reason", ensureResult.Value) + return + } + if writeResult := fs.WriteAtomic(syncLedgerPath(), core.JSONMarshalString(ledger)); !writeResult.OK { + core.Warn("agentic: failed to write sync ledger", "path", syncLedgerPath(), "reason", writeResult.Value) + } } // markDispatchesSynced records which dispatches were successfully pushed so @@ -438,11 +445,18 @@ func readSyncQueue() []syncQueuedPush { func writeSyncQueue(queued []syncQueuedPush) { if len(queued) == 0 { - fs.Delete(syncQueuePath()) + if deleteResult := fs.Delete(syncQueuePath()); !deleteResult.OK { + core.Warn("agentic: failed to delete sync queue", "path", syncQueuePath(), "reason", deleteResult.Value) + } return } - fs.EnsureDir(syncStateDir()) - fs.WriteAtomic(syncQueuePath(), core.JSONMarshalString(queued)) + if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK { + core.Warn("agentic: failed to prepare sync queue directory", "path", syncStateDir(), "reason", ensureResult.Value) + return + } + if writeResult := fs.WriteAtomic(syncQueuePath(), core.JSONMarshalString(queued)); !writeResult.OK { + core.Warn("agentic: failed to write sync queue", "path", syncQueuePath(), "reason", writeResult.Value) + } } // syncQueueStoreKey is the canonical key for the sync queue inside go-store — @@ -499,8 +513,13 @@ func readSyncContext() []map[string]any { } func writeSyncContext(contextData []map[string]any) { - fs.EnsureDir(syncStateDir()) - fs.WriteAtomic(syncContextPath(), core.JSONMarshalString(contextData)) + if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK { + core.Warn("agentic: failed to prepare sync context directory", "path", syncStateDir(), "reason", ensureResult.Value) + return + } + if writeResult := fs.WriteAtomic(syncContextPath(), core.JSONMarshalString(contextData)); !writeResult.OK { + core.Warn("agentic: failed to write sync context", "path", syncContextPath(), "reason", writeResult.Value) + } } func syncContextPayload(payload map[string]any) []map[string]any { @@ -551,6 +570,11 @@ func readSyncWorkspaceReport(workspaceDir string) map[string]any { var report map[string]any parseResult := core.JSONUnmarshalString(result.Value.(string), &report) if !parseResult.OK { + backupPath := core.Concat(reportPath, ".corrupt-", time.Now().UTC().Format("20060102T150405Z")) + core.Warn("agentic: corrupt dispatch report", "path", reportPath, "backup", backupPath, "reason", parseResult.Value) + if renameResult := fs.Rename(reportPath, backupPath); !renameResult.OK { + core.Warn("agentic: failed to preserve corrupt dispatch report", "path", reportPath, "backup", backupPath, "reason", renameResult.Value) + } return nil } @@ -573,8 +597,13 @@ func readSyncStatusState() syncStatusState { } func writeSyncStatusState(state syncStatusState) { - fs.EnsureDir(syncStateDir()) - fs.WriteAtomic(syncStatusPath(), core.JSONMarshalString(state)) + if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK { + core.Warn("agentic: failed to prepare sync status directory", "path", syncStateDir(), "reason", ensureResult.Value) + return + } + if writeResult := fs.WriteAtomic(syncStatusPath(), core.JSONMarshalString(state)); !writeResult.OK { + core.Warn("agentic: failed to write sync status", "path", syncStatusPath(), "reason", writeResult.Value) + } } func syncRecordsPath() string { @@ -597,8 +626,13 @@ func readSyncRecords() []SyncRecord { } func writeSyncRecords(records []SyncRecord) { - fs.EnsureDir(syncStateDir()) - fs.WriteAtomic(syncRecordsPath(), core.JSONMarshalString(records)) + if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK { + core.Warn("agentic: failed to prepare sync records directory", "path", syncStateDir(), "reason", ensureResult.Value) + return + } + if writeResult := fs.WriteAtomic(syncRecordsPath(), core.JSONMarshalString(records)); !writeResult.OK { + core.Warn("agentic: failed to write sync records", "path", syncRecordsPath(), "reason", writeResult.Value) + } } func recordSyncHistory(direction, agentID string, fleetNodeID, payloadSize, itemsCount int, at time.Time) { diff --git a/pkg/agentic/sync_test.go b/pkg/agentic/sync_test.go index 976cfda..148dc02 100644 --- a/pkg/agentic/sync_test.go +++ b/pkg/agentic/sync_test.go @@ -367,6 +367,26 @@ func TestSync_HandleSyncPush_Good_ReportMetadata(t *testing.T) { assert.Equal(t, 1, output.Count) } +func TestSync_ReadSyncWorkspaceReport_Ugly_CorruptJSONPreservesArtifact(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5") + metaDir := WorkspaceMetaDir(workspaceDir) + require.True(t, fs.EnsureDir(metaDir).OK) + + reportPath := core.JoinPath(metaDir, "report.json") + require.True(t, fs.Write(reportPath, `{"findings":[{"file":"main.go"}],"changes":`).OK) + + report := readSyncWorkspaceReport(workspaceDir) + require.Nil(t, report) + assert.False(t, fs.Exists(reportPath)) + + entries := listDirNames(fs.List(metaDir)) + require.Len(t, entries, 1) + assert.True(t, core.HasPrefix(entries[0], "report.json.corrupt-")) +} + func TestSync_HandleSyncPull_Good_NestedEnvelope(t *testing.T) { root := t.TempDir() setTestWorkspace(t, root)