feat(agent): sync backoff + ledger + auto-flush loop
- sync.go: syncBackoffSchedule (1s/5s/15s/60s/5min) with per-push Attempts and NextAttempt honoured on retry (RFC §16.5)
- runSyncFlushLoop: ticks every minute from OnStartup when API key present, drains the queue without re-scanning workspaces
- SyncPushInput.QueueOnly: lets flush loop drain without triggering a full workspace scan (prevents duplicate pushes)
- Sync ledger at .core/sync/ledger.json: fingerprints keyed by workspace name + (updated_at, runs); skips already-synced workspaces until fresh activity
- docs/RFC-AGENT.md: synced from plans/code/core/agent/RFC.md with latest AgentPlan status enum, complete capability, pr.close/branch.delete, indexed_at/org brain fields

Co-Authored-By: Virgil <virgil@lethean.io>
parent edfcb1bdfe
commit 83364a6080
5 changed files with 512 additions and 29 deletions
@ -1,3 +1,21 @@
---
module: core/agent
repo: core/agent
lang: multi
tier: consumer
depends:
- code/core/go/process
- code/core/go/store
- code/core/mcp
- code/snider/poindexter
tags:
- dispatch
- orchestration
- pipeline
- agents
- memory
---

# core/agent RFC — Agentic Dispatch, Orchestration, and Pipeline Management

> The cross-cutting contract for the agent system.
@ -20,7 +38,7 @@ The contract is language-agnostic. Go implements the local MCP server and dispat

| Model | Purpose |
|-------|---------|
| `AgentPlan` | Structured work plan with phases, soft-deleted, activity-logged. Status enum: `draft`, `active`, `in_progress`, `needs_verification`, `verified`, `completed`, `archived`. Both Go and PHP must accept all values. |
| `AgentPhase` | Individual phase within a plan (tasks, dependencies, status) |
| `AgentSession` | Agent work session (context, work_log, artefacts, handoff) |
| `AgentMessage` | Direct agent-to-agent messaging (chronological, not semantic) |
@ -59,6 +77,7 @@ Both implementations provide these capabilities, registered as named actions:
| `resume` | Resume a paused or failed agent session |
| `scan` | Scan Forge repos for actionable issues |
| `watch` | Watch workspace for agent output changes |
| `complete` | Run the full completion pipeline (QA → PR → Verify → Ingest → Poke) |

### Pipeline
@ -81,6 +100,8 @@ Both implementations provide these capabilities, registered as named actions:
| `pr.get` | Get a single pull request |
| `pr.list` | List pull requests |
| `pr.merge` | Merge a pull request |
| `pr.close` | Close a pull request without merging |
| `branch.delete` | Delete a feature branch after merge or close |

### Brain
@ -143,7 +164,9 @@ Shared semantic knowledge store. All agents read and write via `brain_*` tools.
| `type` | enum | decision, observation, convention, research, plan, bug, architecture |
| `content` | text | The knowledge (markdown) |
| `tags` | JSON | Topic tags for filtering |
| `org` | string nullable | Organisation scope (e.g. "core", "lthn", "ofm" — null = global) |
| `project` | string nullable | Repo/project scope (null = cross-project) |
| `indexed_at` | timestamp nullable | When Qdrant/ES indexing completed (null = pending async embed) |
| `confidence` | float | 0.0-1.0 |
| `supersedes_id` | UUID nullable | FK to older memory this replaces |
| `expires_at` | timestamp nullable | TTL for session-scoped context |
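As a reading aid, the schema above maps naturally onto a Go struct, with pointer fields for the nullable columns. A hypothetical sketch (field names inferred from the table, not taken from either implementation):

```go
package main

import (
	"fmt"
	"time"
)

// BrainMemory is a hypothetical mapping of the schema table above.
// Pointer fields model the nullable columns.
type BrainMemory struct {
	Type         string     // decision, observation, convention, ...
	Content      string     // the knowledge, markdown
	Tags         []string   // topic tags for filtering
	Org          *string    // nil = global scope
	Project      *string    // nil = cross-project
	IndexedAt    *time.Time // nil = pending async embed
	Confidence   float64    // 0.0-1.0
	SupersedesID *string    // older memory this replaces
	ExpiresAt    *time.Time // TTL for session-scoped context
}

func main() {
	org := "core"
	m := BrainMemory{
		Type:       "convention",
		Content:    "Agent branches (`agent/*`) are ephemeral.",
		Tags:       []string{"dispatch"},
		Org:        &org,
		Confidence: 0.9,
	}
	// IndexedAt == nil means the async embed has not completed yet.
	fmt.Println(m.Type, m.Confidence, m.IndexedAt == nil)
}
```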
@ -174,7 +197,13 @@ brain_recall(query, filters)

## 5. API Surface

Both implementations expose these capabilities but with different storage backends:

- **Go** operates on **local workspace state** — plans, sessions, and findings live in `.core/` filesystem and DuckDB. Go is the local agent runtime.
- **PHP** operates on **persistent database state** — MariaDB, Qdrant, Elasticsearch. PHP is the fleet coordination platform.
- **Sync** connects them: `POST /v1/agent/sync` pushes Go's local dispatch history/findings to PHP's persistent store. `GET /v1/agent/context` pulls fleet-wide intelligence back to Go.

Plans created locally by Go are workspace artefacts. Plans created via PHP are persistent. Cross-agent plan handoff requires syncing through the API. Go MCP tools operate on local plans; PHP REST endpoints operate on database plans.

### Brain (`/v1/brain/*`)
@ -221,7 +250,7 @@ Standard CRUD patterns matching the domain model.

## 6. MCP Tools

Go exposes all tools via the core-agent MCP server binary. PHP exposes Brain, Plan, Session, and Message tools via the AgentToolRegistry. Dispatch, Workspace, and Forge tools are Go-only (PHP handles these via REST endpoints, not MCP tools).

### Brain Tools
@ -243,7 +272,8 @@ Both implementations register these tools. Go exposes them via the core-agent MC
| `agentic_resume` | Resume agent |
| `agentic_review_queue` | List review queue |
| `agentic_dispatch_start` | Start dispatch service |
| `agentic_dispatch_shutdown` | Graceful shutdown (drain queue) |
| `agentic_dispatch_shutdown_now` | Immediate shutdown (kill running agents) |

### Workspace Tools
@ -316,7 +346,10 @@ The QA step captures EVERYTHING — the agent does not filter what it thinks is
// QA handler — runs lint, captures all findings to workspace store
func (s *QASubsystem) runQA(ctx context.Context, wsDir, repoDir string) QAResult {
	// Open workspace buffer for this dispatch cycle
	ws, err := s.store.NewWorkspace(core.Concat("qa-", core.PathBase(wsDir)))
	if err != nil {
		return QAResult{Error: core.E("qa.workspace", "create", err)}
	}

	// Run core/lint — capture every finding
	lintResult := s.core.Action("lint.run").Run(ctx, s.core, core.Options{
@ -535,7 +568,7 @@ core-agent fleet --api=https://api.lthn.ai --agent-id=charon

### Connection

- AgentApiKey authentication. Bootstrap: `core login CODE` exchanges a 6-digit pairing code (generated at app.lthn.ai/device by a logged-in user) for an AgentApiKey. See lthn.ai RFC §11.7 Device Pairing. No OAuth needed — session auth on the web side, code exchange on the agent side.
- SSE connection for real-time job push
- Polling fallback for NAT'd nodes (`GET /v1/fleet/task/next`)
- Heartbeat and capability registration (`POST /v1/fleet/heartbeat`)
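The pairing bootstrap above is a one-shot code exchange. A stdlib sketch of the agent-side half; the endpoint path and form field names here are assumptions, since the real route is specified in the lthn.ai RFC:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// pairingRequest sketches the `core login CODE` exchange: a 6-digit pairing
// code is posted to the platform and an AgentApiKey comes back. The
// "/v1/device/pair" path is hypothetical, chosen only for illustration.
func pairingRequest(apiBase, code string) (string, url.Values, error) {
	code = strings.TrimSpace(code)
	if len(code) != 6 {
		return "", nil, fmt.Errorf("pairing code must be 6 digits, got %q", code)
	}
	form := url.Values{}
	form.Set("code", code)
	// No OAuth dance: session auth on the web side issued the code,
	// the agent side exchanges it once for a long-lived key.
	return apiBase + "/v1/device/pair", form, nil
}

func main() {
	endpoint, form, err := pairingRequest("https://api.lthn.ai", "123456")
	if err != nil {
		panic(err)
	}
	fmt.Println(endpoint, form.Encode())
}
```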
@ -755,19 +788,22 @@ If go-store is not loaded as a service, agent falls back to in-memory state (cur
// OnStartup restores state from go-store. store.New is used directly —
// agent owns its own store instance, it does not use the Core DI service registry for this.
func (s *Service) OnStartup(ctx context.Context) core.Result {
	st, err := store.New(".core/db.duckdb")
	if err != nil {
		return core.Result{Value: core.E("agent.startup", "state store", err), OK: false}
	}

	// Restore queue — values are JSON strings stored via store.Set
	for key, val := range st.AllSeq("queue") {
		var task QueuedTask
		core.JSONUnmarshalString(val, &task)
		s.queue.Enqueue(task)
	}

	// Restore registry — check PIDs, mark dead agents as failed
	for key, val := range st.AllSeq("registry") {
		var ws WorkspaceStatus
		core.JSONUnmarshalString(val, &ws)
		if ws.Status == "running" && !pidAlive(ws.PID) {
			ws.Status = "failed"
			ws.Question = "Agent process died during restart"
@ -824,7 +860,7 @@ After successful push or merge, delete the agent branch on Forge:

```go
// Clean up Forge branch after push
func (s *Service) cleanupBranch(ctx context.Context, repo, branch string) {
	s.core.Action("agentic.branch.delete").Run(ctx, s.core, core.Options{
		"repo":   repo,
		"branch": branch,
	})
}
```
@ -833,20 +869,70 @@ func (s *Service) cleanupBranch(ctx context.Context, repo, branch string) {

Agent branches (`agent/*`) are ephemeral — they exist only during the dispatch cycle. Accumulation of stale branches pollutes the workspace prep and causes clone confusion.

### 15.5.2 Workspace Mount

The dispatch container mounts the workspace directory as the agent's home. The repo is at `repo/` within the workspace. Specs are baked into the Docker image at `~/spec/` (read-only, COPY at build time). The entrypoint handles auth symlinks and spec availability.

### 15.5.3 Apple Container Dispatch

On macOS 26+, agent dispatch uses Apple Containers instead of Docker. Apple Containers provide hardware VM isolation with sub-second startup — no Docker Desktop required, no cold-start penalty, and agents cannot escape the sandbox even with root.

The container runtime is auto-detected via go-container's `Detect()` function, which probes available runtimes in preference order: Apple Container, Docker, Podman. The first available runtime is used unless overridden in `agents.yaml` or per-dispatch options.

The container image is immutable — built by go-build's LinuxKit builder, not by the agent. The OS environment (toolchains, dependencies, linters) is enforced at build time. Agents work inside a known environment regardless of host configuration.

```go
// Dispatch an agent to an Apple Container workspace
//
//	agent.Dispatch(task, agent.WithRuntime(container.Apple),
//		agent.WithImage(build.LinuxKit("core-dev")),
//		agent.WithMount("~/Code/project", "/workspace"),
//		agent.WithGPU(true), // Metal passthrough when available
//	)
func (s *Service) dispatchAppleContainer(ctx context.Context, task DispatchTask) core.Result {
	// Detect runtime — prefers Apple → Docker → Podman
	rt := s.Core().Action("container.detect").Run(ctx, s.Core(), core.Options{})
	runtime := rt.Value.(string) // "apple", "docker", "podman"

	// Resolve immutable image — built by go-build LinuxKit
	image := s.Core().Action("build.linuxkit.resolve").Run(ctx, s.Core(), core.Options{
		"base": task.Image, // "core-dev", "core-ml", "core-minimal"
	})

	return s.Core().Action("container.run").Run(ctx, s.Core(), core.Options{
		"runtime": runtime,
		"image":   image.Value.(string),
		"mount":   core.Concat(task.WorkspaceDir, ":/workspace"),
		"gpu":     task.GPU,
		"env":     task.Env,
		"command": task.Command,
	})
}
```

**Runtime behaviour:**

| Property | Apple Container | Docker | Podman |
|----------|----------------|--------|--------|
| Isolation | Hardware VM (Virtualisation.framework) | Namespace/cgroup | Namespace/cgroup |
| Startup | Sub-second | 2-5 seconds (cold) | 2-5 seconds (cold) |
| GPU | Metal passthrough (roadmap) | NVIDIA only | NVIDIA only |
| Root escape | Impossible (VM boundary) | Possible (misconfigured) | Possible (rootless mitigates) |
| macOS native | Yes | Requires Docker Desktop | Requires Podman Machine |

**Fallback chain:** If Apple Containers are unavailable (macOS < 26, Linux host, CI environment), dispatch falls back to Docker automatically. The agent code is runtime-agnostic — the same `container.run` action handles all three runtimes.

**GPU passthrough:** Metal GPU passthrough is on Apple's roadmap. When available, `agent.WithGPU(true)` enables it — go-mlx works inside the container for local inference during agent tasks. Until then, `WithGPU(true)` is a no-op on Apple Containers and enables NVIDIA passthrough on Docker.

**Configuration:**

```yaml
# agents.yaml — runtime preference override
dispatch:
  runtime: auto   # auto | apple | docker | podman
  image: core-dev # default LinuxKit image
  gpu: false      # Metal passthrough (when available)
```

### 15.6 Graceful Degradation

@ -895,8 +981,16 @@ Agents authenticated with api.lthn.ai can sync local state to the platform. Loca

```
Local agent (.core/db.duckdb)
  → auth: api.lthn.ai (AgentApiKey)
  → POST /v1/agent/sync (dispatches[] — see DispatchHistoryItem below)
  → core/php/agent receives state

DispatchHistoryItem payload shape (Go produces, PHP consumes):
  { id (UUID, generated at dispatch time), repo, branch, agent_model, task, template, status, started_at, completed_at,
    findings: [{tool, severity, file, category, message}],
    changes: {files_changed, insertions, deletions},
    report: {clusters_count, new_count, resolved_count, persistent_count},
    synced: false }

  → OpenBrain: embed findings as BrainMemory records
  → WorkspaceState: update managed workflow progress
  → Notify: alert subscribers of new findings
```
@ -930,14 +1024,14 @@ func (s *Service) OnStartup(ctx context.Context) core.Result {
func (s *Service) handleSyncPush(ctx context.Context, opts core.Options) core.Result {
	st := s.stateStore()
	if st == nil {
		return core.Result{OK: false, Value: core.E("agent.sync.push", "no store", nil)}
	}

	// Collect unsync'd dispatch records
	var payload []map[string]any
	for key, val := range st.AllSeq("dispatch_history") {
		var record map[string]any
		core.JSONUnmarshalString(val, &record)
		if synced, _ := record["synced"].(bool); !synced {
			payload = append(payload, record)
		}
@ -950,7 +1044,7 @@ func (s *Service) handleSyncPush(ctx context.Context, opts core.Options) core.Re
	// POST to lthn.ai
	result := s.Core().Action("api.post").Run(ctx, s.Core(), core.Options{
		"url":  core.Concat(s.apiURL, "/v1/agent/sync"),
		"body": core.JSONMarshalString(payload),
		"auth": s.apiKey,
	})
@ -958,7 +1052,7 @@ func (s *Service) handleSyncPush(ctx context.Context, opts core.Options) core.Re
	if result.OK {
		for _, record := range payload {
			record["synced"] = true
			st.Set("dispatch_history", record["id"].(string), core.JSONMarshalString(record))
		}
	}
@ -983,12 +1077,12 @@ func (s *Service) handleSyncPull(ctx context.Context, opts core.Options) core.Re

	// Merge fleet context into local store
	var context []map[string]any
	core.JSONUnmarshalString(result.Value.(string), &context)

	st := s.stateStore()
	for _, entry := range context {
		if id, ok := entry["id"].(string); ok {
			st.Set("fleet_context", id, core.JSONMarshalString(entry))
		}
	}
@ -1057,12 +1151,13 @@ See `code/core/php/agent/RFC.md` § "API Endpoints" and § "OpenBrain" for the P
| Poindexter (spatial analysis) | `code/snider/poindexter/RFC.md` |
| Lint (QA gate) | `code/core/lint/RFC.md` |
| MCP spec | `code/core/mcp/RFC.md` |
| lthn.ai platform RFC | `project/lthn/ai/RFC.md` |
| RAG RFC | `code/core/go/rag/RFC.md` |

---

## Changelog

- 2026-04-08: Added §15.5.3 Apple Container Dispatch — native macOS 26 hardware VM isolation, auto-detected runtime fallback chain (Apple → Docker → Podman), immutable LinuxKit images from go-build, Metal GPU passthrough (roadmap).
- 2026-03-29: Restructured as language-agnostic contract. Go-specific code moved to `code/core/go/agent/RFC.md`. PHP-specific code stays in `code/core/php/agent/RFC.md`. Polyglot mapping, OpenBrain architecture, and completion pipeline consolidated here.
- 2026-03-26: WIP — net/http consolidated to transport.go.
- 2026-03-25: Initial spec — written with full core/go v0.8.0 domain context.
@ -347,6 +347,9 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
	if s.forgeToken != "" {
		go s.runPRManageLoop(ctx, prManageScheduleInterval)
	}
	if s.syncToken() != "" {
		go s.runSyncFlushLoop(ctx, syncFlushScheduleInterval)
	}

	c.RegisterQuery(s.handleWorkspaceQuery)
@ -13,6 +13,10 @@ type SyncPushInput struct {
	AgentID     string           `json:"agent_id,omitempty"`
	FleetNodeID int              `json:"fleet_node_id,omitempty"`
	Dispatches  []map[string]any `json:"dispatches,omitempty"`
	// QueueOnly skips the collectSyncDispatches() scan so the caller only
	// drains entries already queued. Used by the flush loop to avoid
	// re-adding the same completed workspaces on every tick.
	QueueOnly bool `json:"-"`
}

type SyncPushOutput struct {
@ -47,6 +51,8 @@ type syncQueuedPush struct {
	FleetNodeID int              `json:"fleet_node_id,omitempty"`
	Dispatches  []map[string]any `json:"dispatches"`
	QueuedAt    time.Time        `json:"queued_at"`
	Attempts    int              `json:"attempts,omitempty"`
	NextAttempt time.Time        `json:"next_attempt,omitempty"`
}

type syncStatusState struct {
@ -54,6 +60,62 @@ type syncStatusState struct {
	LastPullAt time.Time `json:"last_pull_at,omitempty"`
}

// syncBackoffSchedule implements RFC §16.5 — 1s → 5s → 15s → 60s → 5min max.
//
//	schedule := syncBackoffSchedule(3) // 15s
//	next := time.Now().Add(schedule)
func syncBackoffSchedule(attempts int) time.Duration {
	switch {
	case attempts <= 0:
		return 0
	case attempts == 1:
		return time.Second
	case attempts == 2:
		return 5 * time.Second
	case attempts == 3:
		return 15 * time.Second
	case attempts == 4:
		return 60 * time.Second
	default:
		return 5 * time.Minute
	}
}

// syncFlushScheduleInterval is the cadence at which queued pushes are retried
// when the agent has been unable to reach the platform. Per RFC §16.5 the
// retry window max is 5 minutes, so the scheduler wakes every minute and
// each queued entry enforces its own NextAttempt gate.
const syncFlushScheduleInterval = time.Minute

//	ctx, cancel := context.WithCancel(context.Background())
//	go s.runSyncFlushLoop(ctx, time.Minute)
func (s *PrepSubsystem) runSyncFlushLoop(ctx context.Context, interval time.Duration) {
	if ctx == nil || interval <= 0 {
		return
	}
	if s == nil || s.syncToken() == "" {
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if len(readSyncQueue()) == 0 {
				continue
			}
			// QueueOnly keeps syncPushInput from re-scanning workspaces — the
			// flush loop only drains entries already queued.
			if _, err := s.syncPushInput(ctx, SyncPushInput{QueueOnly: true}); err != nil {
				core.Warn("sync flush loop failed", "error", err)
			}
		}
	}
}

//	result := c.Action("agentic.sync.push").Run(ctx, core.NewOptions())
func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options) core.Result {
	output, err := s.syncPushInput(ctx, SyncPushInput{
@ -90,7 +152,7 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput)
		agentID = AgentName()
	}
	dispatches := input.Dispatches
	if len(dispatches) == 0 && !input.QueueOnly {
		dispatches = collectSyncDispatches()
	}
	token := s.syncToken()
@ -114,15 +176,25 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput)
	}

	synced := 0
	now := time.Now()
	for i, queued := range queuedPushes {
		if len(queued.Dispatches) == 0 {
			continue
		}
		if !queued.NextAttempt.IsZero() && queued.NextAttempt.After(now) {
			// Respect backoff — persist remaining tail so queue survives restart.
			writeSyncQueue(queuedPushes[i:])
			return SyncPushOutput{Success: true, Count: synced}, nil
		}
		if err := s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, token); err != nil {
			remaining := append([]syncQueuedPush(nil), queuedPushes[i:]...)
			remaining[0].Attempts = queued.Attempts + 1
			remaining[0].NextAttempt = time.Now().Add(syncBackoffSchedule(remaining[0].Attempts))
			writeSyncQueue(remaining)
			return SyncPushOutput{Success: true, Count: synced}, nil
		}
		synced += len(queued.Dispatches)
		markDispatchesSynced(queued.Dispatches)
		recordSyncPush(time.Now())
		recordSyncHistory("push", queued.AgentID, queued.FleetNodeID, len(core.JSONMarshalString(map[string]any{
			"agent_id": queued.AgentID,
@ -201,6 +273,7 @@ func (s *PrepSubsystem) syncToken() string {
}

func collectSyncDispatches() []map[string]any {
	ledger := readSyncLedger()
	var dispatches []map[string]any
	for _, path := range WorkspaceStatusPaths() {
		workspaceDir := core.PathDir(path)
@ -212,11 +285,102 @@ func collectSyncDispatches() []map[string]any {
		if !shouldSyncStatus(workspaceStatus.Status) {
			continue
		}
		dispatchID := syncDispatchID(workspaceDir, workspaceStatus)
		if synced, ok := ledger[dispatchID]; ok && synced == syncDispatchFingerprint(workspaceStatus) {
			continue
		}
		record := syncDispatchRecord(workspaceDir, workspaceStatus)
		record["id"] = dispatchID
		dispatches = append(dispatches, record)
	}
	return dispatches
}

//	id := syncDispatchID(workspaceDir, workspaceStatus) // "core/go-io/task-5"
func syncDispatchID(workspaceDir string, workspaceStatus *WorkspaceStatus) string {
	if workspaceStatus == nil {
		return WorkspaceName(workspaceDir)
	}
	return WorkspaceName(workspaceDir)
}

//	fingerprint := syncDispatchFingerprint(workspaceStatus) // "2026-04-14T12:00:00Z#3"
//
// A dispatch is considered unchanged when (updated_at, runs) matches.
// Any new activity (re-dispatch, status change) generates a fresh fingerprint.
func syncDispatchFingerprint(workspaceStatus *WorkspaceStatus) string {
	if workspaceStatus == nil {
		return ""
	}
	return core.Concat(workspaceStatus.UpdatedAt.UTC().Format(time.RFC3339), "#", core.Sprintf("%d", workspaceStatus.Runs))
}

//	ledger := readSyncLedger() // map[dispatchID]fingerprint of last push
func readSyncLedger() map[string]string {
	ledger := map[string]string{}
	result := fs.Read(syncLedgerPath())
	if !result.OK {
		return ledger
	}
	content := core.Trim(result.Value.(string))
	if content == "" {
		return ledger
	}
	if parseResult := core.JSONUnmarshalString(content, &ledger); !parseResult.OK {
		return map[string]string{}
	}
	return ledger
}

// writeSyncLedger persists the dispatched fingerprints so the next scan
// can skip workspaces that have already been pushed.
func writeSyncLedger(ledger map[string]string) {
	if len(ledger) == 0 {
		fs.Delete(syncLedgerPath())
		return
	}
	fs.EnsureDir(syncStateDir())
	fs.WriteAtomic(syncLedgerPath(), core.JSONMarshalString(ledger))
}

// markDispatchesSynced records which dispatches were successfully pushed so
// collectSyncDispatches skips them on the next scan.
func markDispatchesSynced(dispatches []map[string]any) {
	if len(dispatches) == 0 {
		return
	}
	ledger := readSyncLedger()
	changed := false
	for _, record := range dispatches {
		id := stringValue(record["id"])
		if id == "" {
			id = stringValue(record["workspace"])
		}
		if id == "" {
			continue
		}
		updatedAt := ""
		switch v := record["updated_at"].(type) {
		case time.Time:
			updatedAt = v.UTC().Format(time.RFC3339)
		case string:
			updatedAt = v
		}
		runs := 0
		if v, ok := record["runs"].(int); ok {
			runs = v
		}
		ledger[id] = core.Concat(updatedAt, "#", core.Sprintf("%d", runs))
		changed = true
	}
	if changed {
		writeSyncLedger(ledger)
	}
}

func syncLedgerPath() string {
	return core.JoinPath(syncStateDir(), "ledger.json")
}

func shouldSyncStatus(status string) bool {
	switch status {
	case "completed", "merged", "failed", "blocked":
|||
|
|
@ -11,3 +11,17 @@ func Example_shouldSyncStatus() {
|
|||
// true
|
||||
// false
|
||||
}
|
||||
|
||||
func Example_syncBackoffSchedule() {
|
||||
fmt.Println(syncBackoffSchedule(1))
|
||||
fmt.Println(syncBackoffSchedule(2))
|
||||
fmt.Println(syncBackoffSchedule(3))
|
||||
fmt.Println(syncBackoffSchedule(4))
|
||||
fmt.Println(syncBackoffSchedule(5))
|
||||
// Output:
|
||||
// 1s
|
||||
// 5s
|
||||
// 15s
|
||||
// 1m0s
|
||||
// 5m0s
|
||||
}
|
||||
|
|
|
|||
|
|
@ -442,3 +442,210 @@ func TestSync_HandleSyncPull_Ugly(t *testing.T) {
	require.Len(t, output.Context, 1)
	assert.Equal(t, "cached-2", output.Context[0]["id"])
}

// schedule := syncBackoffSchedule(3) // 15s
func TestSync_SyncBackoffSchedule_Good(t *testing.T) {
	assert.Equal(t, time.Duration(0), syncBackoffSchedule(0))
	assert.Equal(t, time.Second, syncBackoffSchedule(1))
	assert.Equal(t, 5*time.Second, syncBackoffSchedule(2))
	assert.Equal(t, 15*time.Second, syncBackoffSchedule(3))
	assert.Equal(t, 60*time.Second, syncBackoffSchedule(4))
	assert.Equal(t, 5*time.Minute, syncBackoffSchedule(5))
	assert.Equal(t, 5*time.Minute, syncBackoffSchedule(100))
}

func TestSync_SyncBackoffSchedule_Bad_NegativeAttempts(t *testing.T) {
	assert.Equal(t, time.Duration(0), syncBackoffSchedule(-1))
	assert.Equal(t, time.Duration(0), syncBackoffSchedule(-5))
}

func TestSync_HandleSyncPush_Ugly_IncrementsBackoffOnFailure(t *testing.T) {
	root := t.TempDir()
	setTestWorkspace(t, root)
	t.Setenv("CORE_AGENT_API_KEY", "secret-token")

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusBadGateway)
	}))
	defer server.Close()

	subsystem := &PrepSubsystem{
		ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
		brainURL:       server.URL,
	}

	// First failure — attempt 1, backoff 1s
	_, err := subsystem.syncPushInput(context.Background(), SyncPushInput{
		AgentID:    "charon",
		Dispatches: []map[string]any{{"workspace": "w-1", "status": "completed"}},
	})
	require.NoError(t, err)
	queued := readSyncQueue()
	require.Len(t, queued, 1)
	assert.Equal(t, 1, queued[0].Attempts)
	assert.False(t, queued[0].NextAttempt.IsZero())
	assert.True(t, queued[0].NextAttempt.After(time.Now()))
	assert.True(t, queued[0].NextAttempt.Before(time.Now().Add(2*time.Second)))
}

func TestSync_RunSyncFlushLoop_Good_DrainsQueuedPushes(t *testing.T) {
	root := t.TempDir()
	setTestWorkspace(t, root)
	t.Setenv("CORE_AGENT_API_KEY", "secret-token")

	writeSyncQueue([]syncQueuedPush{{
		AgentID:    "charon",
		Dispatches: []map[string]any{{"workspace": "w-1", "status": "completed"}},
		QueuedAt:   time.Now().Add(-1 * time.Minute),
	}})

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		require.Equal(t, "/v1/agent/sync", r.URL.Path)
		_, _ = w.Write([]byte(`{"data":{"synced":1}}`))
	}))
	defer server.Close()

	subsystem := &PrepSubsystem{
		ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
		brainURL:       server.URL,
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go subsystem.runSyncFlushLoop(ctx, 10*time.Millisecond)

	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) {
		if len(readSyncQueue()) == 0 {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Fatalf("sync flush loop did not drain queue: %v", readSyncQueue())
}

func TestSync_CollectSyncDispatches_Good_SkipsAlreadySynced(t *testing.T) {
	root := t.TempDir()
	setTestWorkspace(t, root)

	workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5")
	fs.EnsureDir(workspaceDir)
	updatedAt := time.Date(2026, 4, 14, 12, 0, 0, 0, time.UTC)
	writeStatusResult(workspaceDir, &WorkspaceStatus{
		Status:    "completed",
		Repo:      "go-io",
		Org:       "core",
		Runs:      1,
		UpdatedAt: updatedAt,
	})

	// First scan picks it up.
	first := collectSyncDispatches()
	require.Len(t, first, 1)

	// Mark as synced — next scan skips it.
	markDispatchesSynced(first)
	second := collectSyncDispatches()
	assert.Empty(t, second)

	// When the workspace gets a new run, fingerprint changes → rescan.
	writeStatusResult(workspaceDir, &WorkspaceStatus{
		Status:    "completed",
		Repo:      "go-io",
		Org:       "core",
		Runs:      2,
		UpdatedAt: updatedAt.Add(time.Hour),
	})
	third := collectSyncDispatches()
	assert.Len(t, third, 1)
}

func TestSync_SyncPushInput_Good_QueueOnlySkipsWorkspaceScan(t *testing.T) {
	root := t.TempDir()
	setTestWorkspace(t, root)
	t.Setenv("CORE_AGENT_API_KEY", "secret-token")

	// Seed a completed workspace that would normally be picked up by scan.
	workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5")
	fs.EnsureDir(workspaceDir)
	writeStatusResult(workspaceDir, &WorkspaceStatus{
		Status:    "completed",
		Agent:     "codex",
		Repo:      "go-io",
		Org:       "core",
		Task:      "Fix tests",
		Branch:    "agent/fix-tests",
		StartedAt: time.Now(),
		UpdatedAt: time.Now(),
	})

	called := 0
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		called++
		_, _ = w.Write([]byte(`{"data":{"synced":1}}`))
	}))
	defer server.Close()

	subsystem := &PrepSubsystem{
		ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
		brainURL:       server.URL,
	}

	// With an empty queue and no scan, nothing to push.
	output, err := subsystem.syncPushInput(context.Background(), SyncPushInput{QueueOnly: true})
	require.NoError(t, err)
	assert.True(t, output.Success)
	assert.Equal(t, 0, output.Count)
	assert.Equal(t, 0, called)
	assert.Empty(t, readSyncQueue())
}

func TestSync_RunSyncFlushLoop_Bad_NoopWithoutToken(t *testing.T) {
	root := t.TempDir()
	setTestWorkspace(t, root)
	t.Setenv("CORE_AGENT_API_KEY", "")

	subsystem := &PrepSubsystem{
		ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	// Should return immediately, no goroutine leak.
	subsystem.runSyncFlushLoop(ctx, 10*time.Millisecond)
}

func TestSync_HandleSyncPush_Ugly_RespectsBackoffWindow(t *testing.T) {
	root := t.TempDir()
	setTestWorkspace(t, root)
	t.Setenv("CORE_AGENT_API_KEY", "secret-token")

	// Prime queue with a push that's still inside its backoff window
	writeSyncQueue([]syncQueuedPush{{
		AgentID:     "charon",
		Dispatches:  []map[string]any{{"workspace": "w-1", "status": "completed"}},
		QueuedAt:    time.Now().Add(-2 * time.Minute),
		Attempts:    3,
		NextAttempt: time.Now().Add(5 * time.Minute),
	}})

	called := 0
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		called++
		_, _ = w.Write([]byte(`{"data":{"synced":1}}`))
	}))
	defer server.Close()

	subsystem := &PrepSubsystem{
		ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
		brainURL:       server.URL,
	}
	output, err := subsystem.syncPush(context.Background(), "")
	require.NoError(t, err)
	assert.True(t, output.Success)
	assert.Equal(t, 0, output.Count)
	assert.Equal(t, 0, called, "backoff must skip the HTTP call")

	queued := readSyncQueue()
	require.Len(t, queued, 1)
	assert.Equal(t, 3, queued[0].Attempts)
}