fix(sync): preserve corrupt dispatch reports

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-04-18 08:47:50 +01:00
parent b338e12fbf
commit a7c16de971
2 changed files with 66 additions and 12 deletions

View file

@ -335,11 +335,18 @@ func readSyncLedger() map[string]string {
// can skip workspaces that have already been pushed.
func writeSyncLedger(ledger map[string]string) {
if len(ledger) == 0 {
fs.Delete(syncLedgerPath())
if deleteResult := fs.Delete(syncLedgerPath()); !deleteResult.OK {
core.Warn("agentic: failed to delete sync ledger", "path", syncLedgerPath(), "reason", deleteResult.Value)
}
return
}
fs.EnsureDir(syncStateDir())
fs.WriteAtomic(syncLedgerPath(), core.JSONMarshalString(ledger))
if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK {
core.Warn("agentic: failed to prepare sync ledger directory", "path", syncStateDir(), "reason", ensureResult.Value)
return
}
if writeResult := fs.WriteAtomic(syncLedgerPath(), core.JSONMarshalString(ledger)); !writeResult.OK {
core.Warn("agentic: failed to write sync ledger", "path", syncLedgerPath(), "reason", writeResult.Value)
}
}
// markDispatchesSynced records which dispatches were successfully pushed so
@ -438,11 +445,18 @@ func readSyncQueue() []syncQueuedPush {
func writeSyncQueue(queued []syncQueuedPush) {
if len(queued) == 0 {
fs.Delete(syncQueuePath())
if deleteResult := fs.Delete(syncQueuePath()); !deleteResult.OK {
core.Warn("agentic: failed to delete sync queue", "path", syncQueuePath(), "reason", deleteResult.Value)
}
return
}
fs.EnsureDir(syncStateDir())
fs.WriteAtomic(syncQueuePath(), core.JSONMarshalString(queued))
if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK {
core.Warn("agentic: failed to prepare sync queue directory", "path", syncStateDir(), "reason", ensureResult.Value)
return
}
if writeResult := fs.WriteAtomic(syncQueuePath(), core.JSONMarshalString(queued)); !writeResult.OK {
core.Warn("agentic: failed to write sync queue", "path", syncQueuePath(), "reason", writeResult.Value)
}
}
// syncQueueStoreKey is the canonical key for the sync queue inside go-store —
@ -499,8 +513,13 @@ func readSyncContext() []map[string]any {
}
func writeSyncContext(contextData []map[string]any) {
fs.EnsureDir(syncStateDir())
fs.WriteAtomic(syncContextPath(), core.JSONMarshalString(contextData))
if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK {
core.Warn("agentic: failed to prepare sync context directory", "path", syncStateDir(), "reason", ensureResult.Value)
return
}
if writeResult := fs.WriteAtomic(syncContextPath(), core.JSONMarshalString(contextData)); !writeResult.OK {
core.Warn("agentic: failed to write sync context", "path", syncContextPath(), "reason", writeResult.Value)
}
}
func syncContextPayload(payload map[string]any) []map[string]any {
@ -551,6 +570,11 @@ func readSyncWorkspaceReport(workspaceDir string) map[string]any {
var report map[string]any
parseResult := core.JSONUnmarshalString(result.Value.(string), &report)
if !parseResult.OK {
backupPath := core.Concat(reportPath, ".corrupt-", time.Now().UTC().Format("20060102T150405Z"))
core.Warn("agentic: corrupt dispatch report", "path", reportPath, "backup", backupPath, "reason", parseResult.Value)
if renameResult := fs.Rename(reportPath, backupPath); !renameResult.OK {
core.Warn("agentic: failed to preserve corrupt dispatch report", "path", reportPath, "backup", backupPath, "reason", renameResult.Value)
}
return nil
}
@ -573,8 +597,13 @@ func readSyncStatusState() syncStatusState {
}
func writeSyncStatusState(state syncStatusState) {
fs.EnsureDir(syncStateDir())
fs.WriteAtomic(syncStatusPath(), core.JSONMarshalString(state))
if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK {
core.Warn("agentic: failed to prepare sync status directory", "path", syncStateDir(), "reason", ensureResult.Value)
return
}
if writeResult := fs.WriteAtomic(syncStatusPath(), core.JSONMarshalString(state)); !writeResult.OK {
core.Warn("agentic: failed to write sync status", "path", syncStatusPath(), "reason", writeResult.Value)
}
}
func syncRecordsPath() string {
@ -597,8 +626,13 @@ func readSyncRecords() []SyncRecord {
}
func writeSyncRecords(records []SyncRecord) {
fs.EnsureDir(syncStateDir())
fs.WriteAtomic(syncRecordsPath(), core.JSONMarshalString(records))
if ensureResult := fs.EnsureDir(syncStateDir()); !ensureResult.OK {
core.Warn("agentic: failed to prepare sync records directory", "path", syncStateDir(), "reason", ensureResult.Value)
return
}
if writeResult := fs.WriteAtomic(syncRecordsPath(), core.JSONMarshalString(records)); !writeResult.OK {
core.Warn("agentic: failed to write sync records", "path", syncRecordsPath(), "reason", writeResult.Value)
}
}
func recordSyncHistory(direction, agentID string, fleetNodeID, payloadSize, itemsCount int, at time.Time) {

View file

@ -367,6 +367,26 @@ func TestSync_HandleSyncPush_Good_ReportMetadata(t *testing.T) {
assert.Equal(t, 1, output.Count)
}
func TestSync_ReadSyncWorkspaceReport_Ugly_CorruptJSONPreservesArtifact(t *testing.T) {
root := t.TempDir()
setTestWorkspace(t, root)
workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5")
metaDir := WorkspaceMetaDir(workspaceDir)
require.True(t, fs.EnsureDir(metaDir).OK)
reportPath := core.JoinPath(metaDir, "report.json")
require.True(t, fs.Write(reportPath, `{"findings":[{"file":"main.go"}],"changes":`).OK)
report := readSyncWorkspaceReport(workspaceDir)
require.Nil(t, report)
assert.False(t, fs.Exists(reportPath))
entries := listDirNames(fs.List(metaDir))
require.Len(t, entries, 1)
assert.True(t, core.HasPrefix(entries[0], "report.json.corrupt-"))
}
func TestSync_HandleSyncPull_Good_NestedEnvelope(t *testing.T) {
root := t.TempDir()
setTestWorkspace(t, root)