- prep.go TrackWorkspace mirrors into queue + concurrency store groups
(previously only registry); hydrateWorkspaces reaps filesystem ghosts
(dead PID → failed, persisted back to status.json) so cmdStatus and
out-of-process consumers see coherent state (RFC §15.3)
- sync.go queue read/write goes through go-store first per RFC §16.5
("Queue persists across restarts in db.duckdb"), file remains fallback
for graceful degradation
- statestore.go stateStoreGet helper for go-store-first reads
- tests/cli/restart — new CLI test for RFC §15.7 "dispatch → kill →
restart → no ghost agents" dead-PID reap flow
- 4 new statestore tests: queue group mirror, concurrency refresh,
sync queue persistence, fs ghost reap with disk write-back
Co-Authored-By: Virgil <virgil@lethean.io>
637 lines
18 KiB
Go
637 lines
18 KiB
Go
// SPDX-License-Identifier: EUPL-1.2
|
|
|
|
package agentic
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
core "dappco.re/go/core"
|
|
)
|
|
|
|
// SyncPushInput carries the arguments of a sync push.
//
//	input := agentic.SyncPushInput{AgentID: "codex", Dispatches: dispatches}
type SyncPushInput struct {
	// AgentID names the pushing agent; syncPushInput falls back to
	// AgentName() when it is empty.
	AgentID string `json:"agent_id,omitempty"`
	// FleetNodeID is copied onto the queued entry and the sync history record.
	FleetNodeID int `json:"fleet_node_id,omitempty"`
	// Dispatches are the records to push. When empty and QueueOnly is false,
	// syncPushInput gathers them with collectSyncDispatches().
	Dispatches []map[string]any `json:"dispatches,omitempty"`
	// QueueOnly disables the collectSyncDispatches() scan so that only
	// entries already sitting in the queue are drained. The flush loop sets
	// it to avoid re-adding the same completed workspaces on every tick.
	// It is never serialised.
	QueueOnly bool `json:"-"`
}
|
|
|
|
// SyncPushOutput reports the outcome of a sync push.
//
//	output := agentic.SyncPushOutput{Success: true, Count: 3}
type SyncPushOutput struct {
	// Success is the completion flag returned to the caller.
	Success bool `json:"success"`
	// Count is the number of dispatch records delivered during the call.
	Count int `json:"count"`
}
|
|
|
|
// SyncPullInput carries the arguments of a sync pull.
//
//	input := agentic.SyncPullInput{AgentID: "codex", Since: "2026-03-31T12:00:00Z"}
type SyncPullInput struct {
	// AgentID names the pulling agent; syncPullInput falls back to
	// AgentName() when it is empty.
	AgentID string `json:"agent_id,omitempty"`
	// FleetNodeID is recorded in the sync history entry for the pull.
	FleetNodeID int `json:"fleet_node_id,omitempty"`
	// Since is forwarded as the "since" query parameter of the context request.
	Since string `json:"since,omitempty"`
}
|
|
|
|
// SyncPullOutput reports the outcome of a sync pull.
//
//	output := agentic.SyncPullOutput{Success: true, Count: len(items), Context: items}
type SyncPullOutput struct {
	// Success is the completion flag returned to the caller.
	Success bool `json:"success"`
	// Count is the number of entries in Context.
	Count int `json:"count"`
	// Context holds the pulled (or locally cached) context items.
	Context []map[string]any `json:"context"`
}
|
|
|
|
// SyncRecord is one entry of the sync history kept by recordSyncHistory.
//
//	record := agentic.SyncRecord{AgentID: "codex", Direction: "push", ItemsCount: 3, PayloadSize: 512, SyncedAt: "2026-03-31T12:00:00Z"}
type SyncRecord struct {
	// AgentID is the agent the sync was performed for.
	AgentID string `json:"agent_id,omitempty"`
	// FleetNodeID is the fleet node supplied with the sync request, if any.
	FleetNodeID int `json:"fleet_node_id,omitempty"`
	// Direction is "push" or "pull" in the calls visible in this file.
	Direction string `json:"direction"`
	// PayloadSize is the length of the serialised payload.
	PayloadSize int `json:"payload_size"`
	// ItemsCount is the number of dispatches pushed or context items pulled.
	ItemsCount int `json:"items_count"`
	// SyncedAt is the sync time; recordSyncHistory formats it as UTC RFC 3339.
	SyncedAt string `json:"synced_at"`
}
|
|
|
|
// syncQueuedPush is one pending push held in the sync queue until it has been
// delivered.
//
//	entry := syncQueuedPush{AgentID: "codex", Dispatches: dispatches, QueuedAt: time.Now()}
type syncQueuedPush struct {
	// AgentID is the agent the dispatches are pushed for.
	AgentID string `json:"agent_id"`
	// FleetNodeID is carried through to the sync history record.
	FleetNodeID int `json:"fleet_node_id,omitempty"`
	// Dispatches are the records delivered together in one request.
	Dispatches []map[string]any `json:"dispatches"`
	// QueuedAt is when the entry was appended to the queue.
	QueuedAt time.Time `json:"queued_at"`
	// Attempts counts failed delivery attempts for this entry.
	Attempts int `json:"attempts,omitempty"`
	// NextAttempt is the earliest time the entry may be retried; the zero
	// value means "no backoff".
	// NOTE(review): with encoding/json, omitempty does not omit a zero
	// time.Time — confirm how core.JSONMarshalString serialises it.
	NextAttempt time.Time `json:"next_attempt,omitempty"`
}
|
|
|
|
// syncStatusState is the persisted record of the most recent push and pull.
//
//	state := syncStatusState{LastPushAt: time.Now()}
type syncStatusState struct {
	// LastPushAt is stamped by recordSyncPush.
	// NOTE(review): with encoding/json, omitempty does not omit a zero
	// time.Time — confirm how core.JSONMarshalString serialises it.
	LastPushAt time.Time `json:"last_push_at,omitempty"`
	// LastPullAt is stamped by recordSyncPull.
	LastPullAt time.Time `json:"last_pull_at,omitempty"`
}
|
|
|
|
// syncBackoffSchedule implements the RFC §16.5 retry ladder:
// 1s → 5s → 15s → 60s → 5min max.
//
// attempts is the number of failed deliveries so far. Zero or a negative
// value yields no delay; anything past the fourth failure is capped at five
// minutes.
//
//	schedule := syncBackoffSchedule(2) // 5s
//	next := time.Now().Add(schedule)
func syncBackoffSchedule(attempts int) time.Duration {
	// Index n holds the delay after the n-th consecutive failure.
	ladder := [...]time.Duration{
		0,
		time.Second,
		5 * time.Second,
		15 * time.Second,
		60 * time.Second,
	}
	if attempts <= 0 {
		return ladder[0]
	}
	if attempts < len(ladder) {
		return ladder[attempts]
	}
	return 5 * time.Minute
}
|
|
|
|
// syncFlushScheduleInterval is the one-minute cadence intended for retrying
// queued pushes while the agent has been unable to reach the platform.
//
// The tick is deliberately shorter than the RFC §16.5 backoff ceiling of five
// minutes: the scheduler only wakes the flush, and each queued entry still
// enforces its own NextAttempt gate before it is retried.
const syncFlushScheduleInterval time.Duration = time.Minute
|
|
|
|
// ctx, cancel := context.WithCancel(context.Background())
|
|
// go s.runSyncFlushLoop(ctx, time.Minute)
|
|
func (s *PrepSubsystem) runSyncFlushLoop(ctx context.Context, interval time.Duration) {
|
|
if ctx == nil || interval <= 0 {
|
|
return
|
|
}
|
|
if s == nil || s.syncToken() == "" {
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if len(s.readSyncQueue()) == 0 {
|
|
continue
|
|
}
|
|
// QueueOnly keeps syncPushInput from re-scanning workspaces — the
|
|
// flush loop only drains entries already queued.
|
|
if _, err := s.syncPushInput(ctx, SyncPushInput{QueueOnly: true}); err != nil {
|
|
core.Warn("sync flush loop failed", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// result := c.Action("agentic.sync.push").Run(ctx, core.NewOptions())
|
|
func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options) core.Result {
|
|
output, err := s.syncPushInput(ctx, SyncPushInput{
|
|
AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"),
|
|
FleetNodeID: optionIntValue(options, "fleet_node_id", "fleet-node-id"),
|
|
Dispatches: optionAnyMapSliceValue(options, "dispatches"),
|
|
})
|
|
if err != nil {
|
|
return core.Result{Value: err, OK: false}
|
|
}
|
|
return core.Result{Value: output, OK: true}
|
|
}
|
|
|
|
// result := c.Action("agentic.sync.pull").Run(ctx, core.NewOptions())
|
|
func (s *PrepSubsystem) handleSyncPull(ctx context.Context, options core.Options) core.Result {
|
|
output, err := s.syncPullInput(ctx, SyncPullInput{
|
|
AgentID: optionStringValue(options, "agent_id", "agent-id", "_arg"),
|
|
FleetNodeID: optionIntValue(options, "fleet_node_id", "fleet-node-id"),
|
|
Since: optionStringValue(options, "since"),
|
|
})
|
|
if err != nil {
|
|
return core.Result{Value: err, OK: false}
|
|
}
|
|
return core.Result{Value: output, OK: true}
|
|
}
|
|
|
|
func (s *PrepSubsystem) syncPush(ctx context.Context, agentID string) (SyncPushOutput, error) {
|
|
return s.syncPushInput(ctx, SyncPushInput{AgentID: agentID})
|
|
}
|
|
|
|
func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput) (SyncPushOutput, error) {
|
|
agentID := input.AgentID
|
|
if agentID == "" {
|
|
agentID = AgentName()
|
|
}
|
|
dispatches := input.Dispatches
|
|
if len(dispatches) == 0 && !input.QueueOnly {
|
|
dispatches = collectSyncDispatches()
|
|
}
|
|
token := s.syncToken()
|
|
queuedPushes := s.readSyncQueue()
|
|
if len(dispatches) > 0 {
|
|
queuedPushes = append(queuedPushes, syncQueuedPush{
|
|
AgentID: agentID,
|
|
FleetNodeID: input.FleetNodeID,
|
|
Dispatches: dispatches,
|
|
QueuedAt: time.Now(),
|
|
})
|
|
}
|
|
if token == "" {
|
|
if len(input.Dispatches) > 0 {
|
|
s.writeSyncQueue(queuedPushes)
|
|
}
|
|
return SyncPushOutput{Success: true, Count: 0}, nil
|
|
}
|
|
if len(queuedPushes) == 0 {
|
|
return SyncPushOutput{Success: true, Count: 0}, nil
|
|
}
|
|
|
|
synced := 0
|
|
now := time.Now()
|
|
for i, queued := range queuedPushes {
|
|
if len(queued.Dispatches) == 0 {
|
|
continue
|
|
}
|
|
if !queued.NextAttempt.IsZero() && queued.NextAttempt.After(now) {
|
|
// Respect backoff — persist remaining tail so queue survives restart.
|
|
s.writeSyncQueue(queuedPushes[i:])
|
|
return SyncPushOutput{Success: true, Count: synced}, nil
|
|
}
|
|
if err := s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, token); err != nil {
|
|
remaining := append([]syncQueuedPush(nil), queuedPushes[i:]...)
|
|
remaining[0].Attempts = queued.Attempts + 1
|
|
remaining[0].NextAttempt = time.Now().Add(syncBackoffSchedule(remaining[0].Attempts))
|
|
s.writeSyncQueue(remaining)
|
|
return SyncPushOutput{Success: true, Count: synced}, nil
|
|
}
|
|
synced += len(queued.Dispatches)
|
|
markDispatchesSynced(queued.Dispatches)
|
|
recordSyncPush(time.Now())
|
|
recordSyncHistory("push", queued.AgentID, queued.FleetNodeID, len(core.JSONMarshalString(map[string]any{
|
|
"agent_id": queued.AgentID,
|
|
"dispatches": queued.Dispatches,
|
|
})), len(queued.Dispatches), time.Now())
|
|
}
|
|
|
|
s.writeSyncQueue(nil)
|
|
return SyncPushOutput{Success: true, Count: synced}, nil
|
|
}
|
|
|
|
func (s *PrepSubsystem) syncPull(ctx context.Context, agentID string) (SyncPullOutput, error) {
|
|
return s.syncPullInput(ctx, SyncPullInput{AgentID: agentID})
|
|
}
|
|
|
|
func (s *PrepSubsystem) syncPullInput(ctx context.Context, input SyncPullInput) (SyncPullOutput, error) {
|
|
agentID := input.AgentID
|
|
if agentID == "" {
|
|
agentID = AgentName()
|
|
}
|
|
token := s.syncToken()
|
|
if token == "" {
|
|
cached := readSyncContext()
|
|
return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil
|
|
}
|
|
|
|
path := appendQueryParam("/v1/agent/context", "agent_id", agentID)
|
|
path = appendQueryParam(path, "since", input.Since)
|
|
endpoint := core.Concat(s.syncAPIURL(), path)
|
|
result := HTTPGet(ctx, endpoint, token, "Bearer")
|
|
if !result.OK {
|
|
cached := readSyncContext()
|
|
return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil
|
|
}
|
|
|
|
var response map[string]any
|
|
parseResult := core.JSONUnmarshalString(result.Value.(string), &response)
|
|
if !parseResult.OK {
|
|
cached := readSyncContext()
|
|
return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil
|
|
}
|
|
|
|
contextData := syncContextPayload(response)
|
|
writeSyncContext(contextData)
|
|
recordSyncPull(time.Now())
|
|
recordSyncHistory("pull", agentID, input.FleetNodeID, len(result.Value.(string)), len(contextData), time.Now())
|
|
|
|
return SyncPullOutput{
|
|
Success: true,
|
|
Count: len(contextData),
|
|
Context: contextData,
|
|
}, nil
|
|
}
|
|
|
|
func (s *PrepSubsystem) syncAPIURL() string {
|
|
if value := core.Env("CORE_API_URL"); value != "" {
|
|
return value
|
|
}
|
|
if s != nil && s.brainURL != "" {
|
|
return s.brainURL
|
|
}
|
|
return "https://api.lthn.sh"
|
|
}
|
|
|
|
func (s *PrepSubsystem) syncToken() string {
|
|
if value := core.Env("CORE_AGENT_API_KEY"); value != "" {
|
|
return value
|
|
}
|
|
if value := core.Env("CORE_BRAIN_KEY"); value != "" {
|
|
return value
|
|
}
|
|
if s != nil && s.brainKey != "" {
|
|
return s.brainKey
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func collectSyncDispatches() []map[string]any {
|
|
ledger := readSyncLedger()
|
|
var dispatches []map[string]any
|
|
for _, path := range WorkspaceStatusPaths() {
|
|
workspaceDir := core.PathDir(path)
|
|
statusResult := ReadStatusResult(workspaceDir)
|
|
workspaceStatus, ok := workspaceStatusValue(statusResult)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if !shouldSyncStatus(workspaceStatus.Status) {
|
|
continue
|
|
}
|
|
dispatchID := syncDispatchID(workspaceDir, workspaceStatus)
|
|
if synced, ok := ledger[dispatchID]; ok && synced == syncDispatchFingerprint(workspaceStatus) {
|
|
continue
|
|
}
|
|
record := syncDispatchRecord(workspaceDir, workspaceStatus)
|
|
record["id"] = dispatchID
|
|
dispatches = append(dispatches, record)
|
|
}
|
|
return dispatches
|
|
}
|
|
|
|
// id := syncDispatchID(workspaceDir, workspaceStatus) // "core/go-io/task-5"
|
|
func syncDispatchID(workspaceDir string, workspaceStatus *WorkspaceStatus) string {
|
|
if workspaceStatus == nil {
|
|
return WorkspaceName(workspaceDir)
|
|
}
|
|
return WorkspaceName(workspaceDir)
|
|
}
|
|
|
|
// fingerprint := syncDispatchFingerprint(workspaceStatus) // "2026-04-14T12:00:00Z#3"
|
|
// A dispatch is considered unchanged when (updated_at, runs) matches.
|
|
// Any new activity (re-dispatch, status change) generates a fresh fingerprint.
|
|
func syncDispatchFingerprint(workspaceStatus *WorkspaceStatus) string {
|
|
if workspaceStatus == nil {
|
|
return ""
|
|
}
|
|
return core.Concat(workspaceStatus.UpdatedAt.UTC().Format(time.RFC3339), "#", core.Sprintf("%d", workspaceStatus.Runs))
|
|
}
|
|
|
|
// ledger := readSyncLedger() // map[dispatchID]fingerprint of last push
|
|
func readSyncLedger() map[string]string {
|
|
ledger := map[string]string{}
|
|
result := fs.Read(syncLedgerPath())
|
|
if !result.OK {
|
|
return ledger
|
|
}
|
|
content := core.Trim(result.Value.(string))
|
|
if content == "" {
|
|
return ledger
|
|
}
|
|
if parseResult := core.JSONUnmarshalString(content, &ledger); !parseResult.OK {
|
|
return map[string]string{}
|
|
}
|
|
return ledger
|
|
}
|
|
|
|
// writeSyncLedger persists the dispatched fingerprints so the next scan
|
|
// can skip workspaces that have already been pushed.
|
|
func writeSyncLedger(ledger map[string]string) {
|
|
if len(ledger) == 0 {
|
|
fs.Delete(syncLedgerPath())
|
|
return
|
|
}
|
|
fs.EnsureDir(syncStateDir())
|
|
fs.WriteAtomic(syncLedgerPath(), core.JSONMarshalString(ledger))
|
|
}
|
|
|
|
// markDispatchesSynced records which dispatches were successfully pushed so
|
|
// collectSyncDispatches skips them on the next scan.
|
|
func markDispatchesSynced(dispatches []map[string]any) {
|
|
if len(dispatches) == 0 {
|
|
return
|
|
}
|
|
ledger := readSyncLedger()
|
|
changed := false
|
|
for _, record := range dispatches {
|
|
id := stringValue(record["id"])
|
|
if id == "" {
|
|
id = stringValue(record["workspace"])
|
|
}
|
|
if id == "" {
|
|
continue
|
|
}
|
|
updatedAt := ""
|
|
switch v := record["updated_at"].(type) {
|
|
case time.Time:
|
|
updatedAt = v.UTC().Format(time.RFC3339)
|
|
case string:
|
|
updatedAt = v
|
|
}
|
|
runs := 0
|
|
if v, ok := record["runs"].(int); ok {
|
|
runs = v
|
|
}
|
|
ledger[id] = core.Concat(updatedAt, "#", core.Sprintf("%d", runs))
|
|
changed = true
|
|
}
|
|
if changed {
|
|
writeSyncLedger(ledger)
|
|
}
|
|
}
|
|
|
|
func syncLedgerPath() string {
|
|
return core.JoinPath(syncStateDir(), "ledger.json")
|
|
}
|
|
|
|
// shouldSyncStatus reports whether a workspace in the given status is pushed
// by sync. Only the exact, lowercase values "completed", "merged", "failed"
// and "blocked" qualify.
//
//	ok := shouldSyncStatus("completed") // true
func shouldSyncStatus(status string) bool {
	return status == "completed" ||
		status == "merged" ||
		status == "failed" ||
		status == "blocked"
}
|
|
|
|
func (s *PrepSubsystem) postSyncPush(ctx context.Context, agentID string, dispatches []map[string]any, token string) error {
|
|
payload := map[string]any{
|
|
"agent_id": agentID,
|
|
"dispatches": dispatches,
|
|
}
|
|
|
|
result := HTTPPost(ctx, core.Concat(s.syncAPIURL(), "/v1/agent/sync"), core.JSONMarshalString(payload), token, "Bearer")
|
|
if result.OK {
|
|
return nil
|
|
}
|
|
|
|
err, _ := result.Value.(error)
|
|
if err == nil {
|
|
err = core.E("agentic.sync.push", "sync push failed", nil)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func syncStateDir() string {
|
|
return core.JoinPath(CoreRoot(), "sync")
|
|
}
|
|
|
|
func syncQueuePath() string {
|
|
return core.JoinPath(syncStateDir(), "queue.json")
|
|
}
|
|
|
|
func syncContextPath() string {
|
|
return core.JoinPath(syncStateDir(), "context.json")
|
|
}
|
|
|
|
func syncStatusPath() string {
|
|
return core.JoinPath(syncStateDir(), "status.json")
|
|
}
|
|
|
|
func readSyncQueue() []syncQueuedPush {
|
|
var queued []syncQueuedPush
|
|
result := fs.Read(syncQueuePath())
|
|
if !result.OK {
|
|
return queued
|
|
}
|
|
parseResult := core.JSONUnmarshalString(result.Value.(string), &queued)
|
|
if !parseResult.OK {
|
|
return []syncQueuedPush{}
|
|
}
|
|
return queued
|
|
}
|
|
|
|
func writeSyncQueue(queued []syncQueuedPush) {
|
|
if len(queued) == 0 {
|
|
fs.Delete(syncQueuePath())
|
|
return
|
|
}
|
|
fs.EnsureDir(syncStateDir())
|
|
fs.WriteAtomic(syncQueuePath(), core.JSONMarshalString(queued))
|
|
}
|
|
|
|
const (
	// syncQueueStoreKey is the canonical key for the sync queue inside
	// go-store. The whole queue is stored as a single JSON blob under
	// stateSyncQueueGroup so that RFC §16.5 "Queue persists across restarts
	// in db.duckdb" holds.
	//
	// Usage example: `key := syncQueueStoreKey // "queue"`
	syncQueueStoreKey = "queue"
)
|
|
|
|
// readSyncQueue reads the queued sync pushes from go-store first (RFC §16.5)
|
|
// and falls back to the JSON file when the store is unavailable. Falling back
|
|
// keeps offline deployments working through the rollout.
|
|
//
|
|
// Usage example: `queued := s.readSyncQueue()`
|
|
func (s *PrepSubsystem) readSyncQueue() []syncQueuedPush {
|
|
if s != nil {
|
|
if value, ok := s.stateStoreGet(stateSyncQueueGroup, syncQueueStoreKey); ok {
|
|
var queued []syncQueuedPush
|
|
if result := core.JSONUnmarshalString(value, &queued); result.OK {
|
|
return queued
|
|
}
|
|
}
|
|
}
|
|
return readSyncQueue()
|
|
}
|
|
|
|
// writeSyncQueue persists the queued sync pushes to go-store (RFC §16.5) and
|
|
// mirrors the JSON file so file-only consumers (debug tooling, manual recovery)
|
|
// continue to work.
|
|
//
|
|
// Usage example: `s.writeSyncQueue(queued)`
|
|
func (s *PrepSubsystem) writeSyncQueue(queued []syncQueuedPush) {
|
|
if s != nil {
|
|
if len(queued) == 0 {
|
|
s.stateStoreDelete(stateSyncQueueGroup, syncQueueStoreKey)
|
|
} else {
|
|
s.stateStoreSet(stateSyncQueueGroup, syncQueueStoreKey, queued)
|
|
}
|
|
}
|
|
writeSyncQueue(queued)
|
|
}
|
|
|
|
func readSyncContext() []map[string]any {
|
|
var contextData []map[string]any
|
|
result := fs.Read(syncContextPath())
|
|
if !result.OK {
|
|
return contextData
|
|
}
|
|
parseResult := core.JSONUnmarshalString(result.Value.(string), &contextData)
|
|
if !parseResult.OK {
|
|
return []map[string]any{}
|
|
}
|
|
return contextData
|
|
}
|
|
|
|
func writeSyncContext(contextData []map[string]any) {
|
|
fs.EnsureDir(syncStateDir())
|
|
fs.WriteAtomic(syncContextPath(), core.JSONMarshalString(contextData))
|
|
}
|
|
|
|
func syncContextPayload(payload map[string]any) []map[string]any {
|
|
if contextData := payloadDataSlice(payload, "context", "items", "memories"); len(contextData) > 0 {
|
|
return contextData
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func syncDispatchRecord(workspaceDir string, workspaceStatus *WorkspaceStatus) map[string]any {
|
|
record := map[string]any{
|
|
"workspace": WorkspaceName(workspaceDir),
|
|
"repo": workspaceStatus.Repo,
|
|
"org": workspaceStatus.Org,
|
|
"task": workspaceStatus.Task,
|
|
"agent": workspaceStatus.Agent,
|
|
"branch": workspaceStatus.Branch,
|
|
"status": workspaceStatus.Status,
|
|
"question": workspaceStatus.Question,
|
|
"issue": workspaceStatus.Issue,
|
|
"runs": workspaceStatus.Runs,
|
|
"process_id": workspaceStatus.ProcessID,
|
|
"pr_url": workspaceStatus.PRURL,
|
|
"started_at": workspaceStatus.StartedAt,
|
|
"updated_at": workspaceStatus.UpdatedAt,
|
|
}
|
|
|
|
if report := readSyncWorkspaceReport(workspaceDir); len(report) > 0 {
|
|
record["report"] = report
|
|
if findings := anyMapSliceValue(report["findings"]); len(findings) > 0 {
|
|
record["findings"] = findings
|
|
}
|
|
if changes := anyMapValue(report["changes"]); len(changes) > 0 {
|
|
record["changes"] = changes
|
|
}
|
|
}
|
|
|
|
return record
|
|
}
|
|
|
|
func readSyncWorkspaceReport(workspaceDir string) map[string]any {
|
|
reportPath := core.JoinPath(WorkspaceMetaDir(workspaceDir), "report.json")
|
|
result := fs.Read(reportPath)
|
|
if !result.OK {
|
|
return nil
|
|
}
|
|
|
|
var report map[string]any
|
|
parseResult := core.JSONUnmarshalString(result.Value.(string), &report)
|
|
if !parseResult.OK {
|
|
return nil
|
|
}
|
|
|
|
return report
|
|
}
|
|
|
|
func readSyncStatusState() syncStatusState {
|
|
var state syncStatusState
|
|
result := fs.Read(syncStatusPath())
|
|
if !result.OK {
|
|
return state
|
|
}
|
|
|
|
parseResult := core.JSONUnmarshalString(result.Value.(string), &state)
|
|
if !parseResult.OK {
|
|
return syncStatusState{}
|
|
}
|
|
|
|
return state
|
|
}
|
|
|
|
func writeSyncStatusState(state syncStatusState) {
|
|
fs.EnsureDir(syncStateDir())
|
|
fs.WriteAtomic(syncStatusPath(), core.JSONMarshalString(state))
|
|
}
|
|
|
|
func syncRecordsPath() string {
|
|
return core.JoinPath(syncStateDir(), "records.json")
|
|
}
|
|
|
|
func readSyncRecords() []SyncRecord {
|
|
var records []SyncRecord
|
|
result := fs.Read(syncRecordsPath())
|
|
if !result.OK {
|
|
return records
|
|
}
|
|
|
|
parseResult := core.JSONUnmarshalString(result.Value.(string), &records)
|
|
if !parseResult.OK {
|
|
return []SyncRecord{}
|
|
}
|
|
|
|
return records
|
|
}
|
|
|
|
func writeSyncRecords(records []SyncRecord) {
|
|
fs.EnsureDir(syncStateDir())
|
|
fs.WriteAtomic(syncRecordsPath(), core.JSONMarshalString(records))
|
|
}
|
|
|
|
func recordSyncHistory(direction, agentID string, fleetNodeID, payloadSize, itemsCount int, at time.Time) {
|
|
direction = core.Trim(direction)
|
|
if direction == "" {
|
|
return
|
|
}
|
|
|
|
record := SyncRecord{
|
|
AgentID: core.Trim(agentID),
|
|
FleetNodeID: fleetNodeID,
|
|
Direction: direction,
|
|
PayloadSize: payloadSize,
|
|
ItemsCount: itemsCount,
|
|
SyncedAt: at.UTC().Format(time.RFC3339),
|
|
}
|
|
|
|
records := readSyncRecords()
|
|
records = append(records, record)
|
|
if len(records) > 100 {
|
|
records = records[len(records)-100:]
|
|
}
|
|
writeSyncRecords(records)
|
|
}
|
|
|
|
func recordSyncPush(at time.Time) {
|
|
state := readSyncStatusState()
|
|
state.LastPushAt = at
|
|
writeSyncStatusState(state)
|
|
}
|
|
|
|
func recordSyncPull(at time.Time) {
|
|
state := readSyncStatusState()
|
|
state.LastPullAt = at
|
|
writeSyncStatusState(state)
|
|
}
|