diff --git a/docs/RFC-AGENT.md b/docs/RFC-AGENT.md index fca395a..3cbcc6a 100644 --- a/docs/RFC-AGENT.md +++ b/docs/RFC-AGENT.md @@ -1,3 +1,21 @@ +--- +module: core/agent +repo: core/agent +lang: multi +tier: consumer +depends: + - code/core/go/process + - code/core/go/store + - code/core/mcp + - code/snider/poindexter +tags: + - dispatch + - orchestration + - pipeline + - agents + - memory +--- + # core/agent RFC — Agentic Dispatch, Orchestration, and Pipeline Management > The cross-cutting contract for the agent system. @@ -20,7 +38,7 @@ The contract is language-agnostic. Go implements the local MCP server and dispat | Model | Purpose | |-------|---------| -| `AgentPlan` | Structured work plan with phases, soft-deleted, activity-logged | +| `AgentPlan` | Structured work plan with phases, soft-deleted, activity-logged. Status enum: `draft`, `active`, `in_progress`, `needs_verification`, `verified`, `completed`, `archived`. Both Go and PHP must accept all values. | | `AgentPhase` | Individual phase within a plan (tasks, dependencies, status) | | `AgentSession` | Agent work session (context, work_log, artefacts, handoff) | | `AgentMessage` | Direct agent-to-agent messaging (chronological, not semantic) | @@ -59,6 +77,7 @@ Both implementations provide these capabilities, registered as named actions: | `resume` | Resume a paused or failed agent session | | `scan` | Scan Forge repos for actionable issues | | `watch` | Watch workspace for agent output changes | +| `complete` | Run the full completion pipeline (QA → PR → Verify → Ingest → Poke) | ### Pipeline @@ -81,6 +100,8 @@ Both implementations provide these capabilities, registered as named actions: | `pr.get` | Get a single pull request | | `pr.list` | List pull requests | | `pr.merge` | Merge a pull request | +| `pr.close` | Close a pull request without merging | +| `branch.delete` | Delete a feature branch after merge or close | ### Brain @@ -143,7 +164,9 @@ Shared semantic knowledge store. All agents read and write via `brain_*` tools. | `type` | enum | decision, observation, convention, research, plan, bug, architecture | | `content` | text | The knowledge (markdown) | | `tags` | JSON | Topic tags for filtering | +| `org` | string nullable | Organisation scope (e.g. "core", "lthn", "ofm" — null = global) | | `project` | string nullable | Repo/project scope (null = cross-project) | +| `indexed_at` | timestamp nullable | When Qdrant/ES indexing completed (null = pending async embed) | | `confidence` | float | 0.0-1.0 | | `supersedes_id` | UUID nullable | FK to older memory this replaces | | `expires_at` | timestamp nullable | TTL for session-scoped context | @@ -174,7 +197,13 @@ brain_recall(query, filters) ## 5. API Surface -Both implementations expose these endpoints. PHP serves them as REST routes; Go exposes equivalent capabilities via MCP tools and local IPC. +Both implementations expose these capabilities but with different storage backends: + +- **Go** operates on **local workspace state** — plans, sessions, and findings live in `.core/` filesystem and DuckDB. Go is the local agent runtime. +- **PHP** operates on **persistent database state** — MariaDB, Qdrant, Elasticsearch. PHP is the fleet coordination platform. +- **Sync** connects them: `POST /v1/agent/sync` pushes Go's local dispatch history/findings to PHP's persistent store. `GET /v1/agent/context` pulls fleet-wide intelligence back to Go. + +Plans created locally by Go are workspace artifacts. Plans created via PHP are persistent. Cross-agent plan handoff requires syncing through the API. Go MCP tools operate on local plans; PHP REST endpoints operate on database plans. ### Brain (`/v1/brain/*`) @@ -221,7 +250,7 @@ Standard CRUD patterns matching the domain model. ## 6. MCP Tools -Both implementations register these tools. Go exposes them via the core-agent MCP server binary. PHP exposes them via the AgentToolRegistry. +Go exposes all tools via the core-agent MCP server binary. PHP exposes Brain, Plan, Session, and Message tools via the AgentToolRegistry. Dispatch, Workspace, and Forge tools are Go-only (PHP handles these via REST endpoints, not MCP tools). ### Brain Tools @@ -243,7 +272,8 @@ Both implementations register these tools. Go exposes them via the core-agent MC | `agentic_resume` | Resume agent | | `agentic_review_queue` | List review queue | | `agentic_dispatch_start` | Start dispatch service | -| `agentic_dispatch_shutdown` | Graceful shutdown | +| `agentic_dispatch_shutdown` | Graceful shutdown (drain queue) | +| `agentic_dispatch_shutdown_now` | Immediate shutdown (kill running agents) | ### Workspace Tools @@ -316,7 +346,10 @@ The QA step captures EVERYTHING — the agent does not filter what it thinks is // QA handler — runs lint, captures all findings to workspace store func (s *QASubsystem) runQA(ctx context.Context, wsDir, repoDir string) QAResult { // Open workspace buffer for this dispatch cycle - ws, _ := s.store.NewWorkspace(core.JoinPath(wsDir, "db.duckdb")) + ws, err := s.store.NewWorkspace(core.Concat("qa-", core.PathBase(wsDir))) + if err != nil { + return QAResult{Error: core.E("qa.workspace", "create", err)} + } // Run core/lint — capture every finding lintResult := s.core.Action("lint.run").Run(ctx, s.core, core.Options{ @@ -535,7 +568,7 @@ core-agent fleet --api=https://api.lthn.ai --agent-id=charon ### Connection -- AgentApiKey authentication (provisioned via OAuth flow on first login) +- AgentApiKey authentication. Bootstrap: `core login CODE` exchanges a 6-digit pairing code (generated at app.lthn.ai/device by a logged-in user) for an AgentApiKey. See lthn.ai RFC §11.7 Device Pairing. No OAuth needed — session auth on the web side, code exchange on the agent side. - SSE connection for real-time job push - Polling fallback for NAT'd nodes (`GET /v1/fleet/task/next`) - Heartbeat and capability registration (`POST /v1/fleet/heartbeat`) @@ -755,19 +788,22 @@ If go-store is not loaded as a service, agent falls back to in-memory state (cur // OnStartup restores state from go-store. store.New is used directly — // agent owns its own store instance, it does not use the Core DI service registry for this. func (s *Service) OnStartup(ctx context.Context) core.Result { - st, _ := store.New(".core/db.duckdb") + st, err := store.New(".core/db.duckdb") + if err != nil { + return core.Result{Value: core.E("agent.startup", "state store", err), OK: false} + } // Restore queue — values are JSON strings stored via store.Set for key, val := range st.AllSeq("queue") { var task QueuedTask - core.JSON.Unmarshal(val, &task) + core.JSONUnmarshalString(val, &task) s.queue.Enqueue(task) } // Restore registry — check PIDs, mark dead agents as failed for key, val := range st.AllSeq("registry") { var ws WorkspaceStatus - core.JSON.Unmarshal(val, &ws) + core.JSONUnmarshalString(val, &ws) if ws.Status == "running" && !pidAlive(ws.PID) { ws.Status = "failed" ws.Question = "Agent process died during restart" @@ -824,7 +860,7 @@ After successful push or merge, delete the agent branch on Forge: ```go // Clean up Forge branch after push func (s *Service) cleanupBranch(ctx context.Context, repo, branch string) { - s.core.Action("forge.branch.delete").Run(ctx, s.core, core.Options{ + s.core.Action("agentic.branch.delete").Run(ctx, s.core, core.Options{ "repo": repo, "branch": branch, }) @@ -833,20 +869,70 @@ func (s *Service) cleanupBranch(ctx context.Context, repo, branch string) { Agent branches (`agent/*`) are ephemeral — they exist only during the dispatch cycle. Accumulation of stale branches pollutes the workspace prep and causes clone confusion. -### 15.5.2 Docker Mount Fix +### 15.5.2 Workspace Mount -The dispatch container must mount the full workspace root, not just the repo: +The dispatch container mounts the workspace directory as the agent's home. The repo is at `repo/` within the workspace. Specs are baked into the Docker image at `~/spec/` (read-only, COPY at build time). The entrypoint handles auth symlinks and spec availability. + +### 15.5.3 Apple Container Dispatch + +On macOS 26+, agent dispatch uses Apple Containers instead of Docker. Apple Containers provide hardware VM isolation with sub-second startup — no Docker Desktop required, no cold-start penalty, and agents cannot escape the sandbox even with root. + +The container runtime is auto-detected via go-container's `Detect()` function, which probes available runtimes in preference order: Apple Container, Docker, Podman. The first available runtime is used unless overridden in `agents.yaml` or per-dispatch options. + +The container image is immutable — built by go-build's LinuxKit builder, not by the agent. The OS environment (toolchains, dependencies, linters) is enforced at build time. Agents work inside a known environment regardless of host configuration. ```go -// Current (broken): only repo visible inside container -"-v", core.Concat(repoDir, ":/workspace"), +// Dispatch an agent to an Apple Container workspace +// +// agent.Dispatch(task, agent.WithRuntime(container.Apple), +// agent.WithImage(build.LinuxKit("core-dev")), +// agent.WithMount("~/Code/project", "/workspace"), +// agent.WithGPU(true), // Metal passthrough when available +// ) +func (s *Service) dispatchAppleContainer(ctx context.Context, task DispatchTask) core.Result { + // Detect runtime — prefers Apple → Docker → Podman + rt := s.Core().Action("container.detect").Run(ctx, s.Core(), core.Options{}) + runtime := rt.Value.(string) // "apple", "docker", "podman" -// Fixed: full workspace visible — specs/, CODEX.md, .meta/ all accessible -"-v", core.Concat(wsDir, ":/workspace"), -"-w", "/workspace/repo", // working directory is still the repo + // Resolve immutable image — built by go-build LinuxKit + image := s.Core().Action("build.linuxkit.resolve").Run(ctx, s.Core(), core.Options{ + "base": task.Image, // "core-dev", "core-ml", "core-minimal" + }) + + return s.Core().Action("container.run").Run(ctx, s.Core(), core.Options{ + "runtime": runtime, + "image": image.Value.(string), + "mount": core.Concat(task.WorkspaceDir, ":/workspace"), + "gpu": task.GPU, + "env": task.Env, + "command": task.Command, + }) +} ``` -This allows agents to read `../specs/RFC.md` and `../CODEX.md` from within the repo. The entrypoint validates `/workspace/repo` exists. +**Runtime behaviour:** + +| Property | Apple Container | Docker | Podman | +|----------|----------------|--------|--------| +| Isolation | Hardware VM (Virtualisation.framework) | Namespace/cgroup | Namespace/cgroup | +| Startup | Sub-second | 2-5 seconds (cold) | 2-5 seconds (cold) | +| GPU | Metal passthrough (roadmap) | NVIDIA only | NVIDIA only | +| Root escape | Impossible (VM boundary) | Possible (misconfigured) | Possible (rootless mitigates) | +| macOS native | Yes | Requires Docker Desktop | Requires Podman Machine | + +**Fallback chain:** If Apple Containers are unavailable (macOS < 26, Linux host, CI environment), dispatch falls back to Docker automatically. The agent code is runtime-agnostic — the same `container.run` action handles all three runtimes. + +**GPU passthrough:** Metal GPU passthrough is on Apple's roadmap. When available, `agent.WithGPU(true)` enables it — go-mlx works inside the container for local inference during agent tasks. Until then, `WithGPU(true)` is a no-op on Apple Containers and enables NVIDIA passthrough on Docker. + +**Configuration:** + +```yaml +# agents.yaml — runtime preference override +dispatch: + runtime: auto # auto | apple | docker | podman + image: core-dev # default LinuxKit image + gpu: false # Metal passthrough (when available) +``` ### 15.6 Graceful Degradation @@ -895,8 +981,16 @@ Agents authenticated with api.lthn.ai can sync local state to the platform. Loca ``` Local agent (.core/db.duckdb) → auth: api.lthn.ai (AgentApiKey) - → POST /v1/agent/sync (dispatch history, findings, reports) + → POST /v1/agent/sync (dispatches[] — see DispatchHistoryItem below) → core/php/agent receives state + +DispatchHistoryItem payload shape (Go produces, PHP consumes): + { id (UUID, generated at dispatch time), repo, branch, agent_model, task, template, status, started_at, completed_at, + findings: [{tool, severity, file, category, message}], + changes: {files_changed, insertions, deletions}, + report: {clusters_count, new_count, resolved_count, persistent_count}, + synced: false } + → OpenBrain: embed findings as BrainMemory records → WorkspaceState: update managed workflow progress → Notify: alert subscribers of new findings @@ -930,14 +1024,14 @@ func (s *Service) OnStartup(ctx context.Context) core.Result { func (s *Service) handleSyncPush(ctx context.Context, opts core.Options) core.Result { st := s.stateStore() if st == nil { - return core.Result{OK: false, Error: core.E("agent.sync.push", "no store", nil)} + return core.Result{OK: false, Value: core.E("agent.sync.push", "no store", nil)} } // Collect unsync'd dispatch records var payload []map[string]any for key, val := range st.AllSeq("dispatch_history") { var record map[string]any - core.JSON.Unmarshal(val, &record) + core.JSONUnmarshalString(val, &record) if synced, _ := record["synced"].(bool); !synced { payload = append(payload, record) } @@ -950,7 +1044,7 @@ func (s *Service) handleSyncPush(ctx context.Context, opts core.Options) core.Re // POST to lthn.ai result := s.Core().Action("api.post").Run(ctx, s.Core(), core.Options{ "url": core.Concat(s.apiURL, "/v1/agent/sync"), - "body": core.JSON.Marshal(payload), + "body": core.JSONMarshalString(payload), "auth": s.apiKey, }) @@ -958,7 +1052,7 @@ func (s *Service) handleSyncPush(ctx context.Context, opts core.Options) core.Re if result.OK { for _, record := range payload { record["synced"] = true - st.Set("dispatch_history", record["id"].(string), core.JSON.Marshal(record)) + st.Set("dispatch_history", record["id"].(string), core.JSONMarshalString(record)) } } @@ -983,12 +1077,12 @@ func (s *Service) handleSyncPull(ctx context.Context, opts core.Options) core.Re // Merge fleet context into local store var context []map[string]any - core.JSON.Unmarshal(result.Value.(string), &context) + core.JSONUnmarshalString(result.Value.(string), &context) st := s.stateStore() for _, entry := range context { if id, ok := entry["id"].(string); ok { - st.Set("fleet_context", id, core.JSON.Marshal(entry)) + st.Set("fleet_context", id, core.JSONMarshalString(entry)) } } @@ -1057,12 +1151,13 @@ See `code/core/php/agent/RFC.md` § "API Endpoints" and § "OpenBrain" for the P | Poindexter (spatial analysis) | `code/snider/poindexter/RFC.md` | | Lint (QA gate) | `code/core/lint/RFC.md` | | MCP spec | `code/core/mcp/RFC.md` | -| lthn.ai platform RFC | `project/lthn/ai/RFC.md` | +| RAG RFC | `code/core/go/rag/RFC.md` | --- ## Changelog +- 2026-04-08: Added §15.5.3 Apple Container Dispatch — native macOS 26 hardware VM isolation, auto-detected runtime fallback chain (Apple → Docker → Podman), immutable LinuxKit images from go-build, Metal GPU passthrough (roadmap). - 2026-03-29: Restructured as language-agnostic contract. Go-specific code moved to `code/core/go/agent/RFC.md`. PHP-specific code stays in `code/core/php/agent/RFC.md`. Polyglot mapping, OpenBrain architecture, and completion pipeline consolidated here. - 2026-03-26: WIP — net/http consolidated to transport.go. - 2026-03-25: Initial spec — written with full core/go v0.8.0 domain context. diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 8f950fb..0597350 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -347,6 +347,9 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { if s.forgeToken != "" { go s.runPRManageLoop(ctx, prManageScheduleInterval) } + if s.syncToken() != "" { + go s.runSyncFlushLoop(ctx, syncFlushScheduleInterval) + } c.RegisterQuery(s.handleWorkspaceQuery) diff --git a/pkg/agentic/sync.go b/pkg/agentic/sync.go index 310119c..0428781 100644 --- a/pkg/agentic/sync.go +++ b/pkg/agentic/sync.go @@ -13,6 +13,10 @@ type SyncPushInput struct { AgentID string `json:"agent_id,omitempty"` FleetNodeID int `json:"fleet_node_id,omitempty"` Dispatches []map[string]any `json:"dispatches,omitempty"` + // QueueOnly skips the collectSyncDispatches() scan so the caller only + // drains entries already queued. Used by the flush loop to avoid + // re-adding the same completed workspaces on every tick. + QueueOnly bool `json:"-"` } type SyncPushOutput struct { @@ -47,6 +51,8 @@ type syncQueuedPush struct { FleetNodeID int `json:"fleet_node_id,omitempty"` Dispatches []map[string]any `json:"dispatches"` QueuedAt time.Time `json:"queued_at"` + Attempts int `json:"attempts,omitempty"` + NextAttempt time.Time `json:"next_attempt,omitempty"` } type syncStatusState struct { @@ -54,6 +60,62 @@ type syncStatusState struct { LastPullAt time.Time `json:"last_pull_at,omitempty"` } +// syncBackoffSchedule implements RFC §16.5 — 1s → 5s → 15s → 60s → 5min max. +// schedule := syncBackoffSchedule(2) // 15s +// next := time.Now().Add(schedule) +func syncBackoffSchedule(attempts int) time.Duration { + switch { + case attempts <= 0: + return 0 + case attempts == 1: + return time.Second + case attempts == 2: + return 5 * time.Second + case attempts == 3: + return 15 * time.Second + case attempts == 4: + return 60 * time.Second + default: + return 5 * time.Minute + } +} + +// syncFlushScheduleInterval is the cadence at which queued pushes are retried +// when the agent has been unable to reach the platform. Per RFC §16.5 the +// retry window max is 5 minutes, so the scheduler wakes at that cadence and +// each queued entry enforces its own NextAttempt gate. +const syncFlushScheduleInterval = time.Minute + +// ctx, cancel := context.WithCancel(context.Background()) +// go s.runSyncFlushLoop(ctx, time.Minute) +func (s *PrepSubsystem) runSyncFlushLoop(ctx context.Context, interval time.Duration) { + if ctx == nil || interval <= 0 { + return + } + if s == nil || s.syncToken() == "" { + return + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if len(readSyncQueue()) == 0 { + continue + } + // QueueOnly keeps syncPushInput from re-scanning workspaces — the + // flush loop only drains entries already queued. + if _, err := s.syncPushInput(ctx, SyncPushInput{QueueOnly: true}); err != nil { + core.Warn("sync flush loop failed", "error", err) + } + } + } +} + // result := c.Action("agentic.sync.push").Run(ctx, core.NewOptions()) func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options) core.Result { output, err := s.syncPushInput(ctx, SyncPushInput{ @@ -90,7 +152,7 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput) agentID = AgentName() } dispatches := input.Dispatches - if len(dispatches) == 0 { + if len(dispatches) == 0 && !input.QueueOnly { dispatches = collectSyncDispatches() } token := s.syncToken() @@ -114,15 +176,25 @@ func (s *PrepSubsystem) syncPushInput(ctx context.Context, input SyncPushInput) } synced := 0 + now := time.Now() for i, queued := range queuedPushes { if len(queued.Dispatches) == 0 { continue } - if err := s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, token); err != nil { + if !queued.NextAttempt.IsZero() && queued.NextAttempt.After(now) { + // Respect backoff — persist remaining tail so queue survives restart. writeSyncQueue(queuedPushes[i:]) return SyncPushOutput{Success: true, Count: synced}, nil } + if err := s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, token); err != nil { + remaining := append([]syncQueuedPush(nil), queuedPushes[i:]...) + remaining[0].Attempts = queued.Attempts + 1 + remaining[0].NextAttempt = time.Now().Add(syncBackoffSchedule(remaining[0].Attempts)) + writeSyncQueue(remaining) + return SyncPushOutput{Success: true, Count: synced}, nil + } synced += len(queued.Dispatches) + markDispatchesSynced(queued.Dispatches) recordSyncPush(time.Now()) recordSyncHistory("push", queued.AgentID, queued.FleetNodeID, len(core.JSONMarshalString(map[string]any{ "agent_id": queued.AgentID, @@ -201,6 +273,7 @@ func (s *PrepSubsystem) syncToken() string { } func collectSyncDispatches() []map[string]any { + ledger := readSyncLedger() var dispatches []map[string]any for _, path := range WorkspaceStatusPaths() { workspaceDir := core.PathDir(path) @@ -212,11 +285,102 @@ func collectSyncDispatches() []map[string]any { if !shouldSyncStatus(workspaceStatus.Status) { continue } - dispatches = append(dispatches, syncDispatchRecord(workspaceDir, workspaceStatus)) + dispatchID := syncDispatchID(workspaceDir, workspaceStatus) + if synced, ok := ledger[dispatchID]; ok && synced == syncDispatchFingerprint(workspaceStatus) { + continue + } + record := syncDispatchRecord(workspaceDir, workspaceStatus) + record["id"] = dispatchID + dispatches = append(dispatches, record) } return dispatches } +// id := syncDispatchID(workspaceDir, workspaceStatus) // "core/go-io/task-5" +func syncDispatchID(workspaceDir string, workspaceStatus *WorkspaceStatus) string { + if workspaceStatus == nil { + return WorkspaceName(workspaceDir) + } + return WorkspaceName(workspaceDir) +} + +// fingerprint := syncDispatchFingerprint(workspaceStatus) // "2026-04-14T12:00:00Z#3" +// A dispatch is considered unchanged when (updated_at, runs) matches. +// Any new activity (re-dispatch, status change) generates a fresh fingerprint. +func syncDispatchFingerprint(workspaceStatus *WorkspaceStatus) string { + if workspaceStatus == nil { + return "" + } + return core.Concat(workspaceStatus.UpdatedAt.UTC().Format(time.RFC3339), "#", core.Sprintf("%d", workspaceStatus.Runs)) +} + +// ledger := readSyncLedger() // map[dispatchID]fingerprint of last push +func readSyncLedger() map[string]string { + ledger := map[string]string{} + result := fs.Read(syncLedgerPath()) + if !result.OK { + return ledger + } + content := core.Trim(result.Value.(string)) + if content == "" { + return ledger + } + if parseResult := core.JSONUnmarshalString(content, &ledger); !parseResult.OK { + return map[string]string{} + } + return ledger +} + +// writeSyncLedger persists the dispatched fingerprints so the next scan +// can skip workspaces that have already been pushed. +func writeSyncLedger(ledger map[string]string) { + if len(ledger) == 0 { + fs.Delete(syncLedgerPath()) + return + } + fs.EnsureDir(syncStateDir()) + fs.WriteAtomic(syncLedgerPath(), core.JSONMarshalString(ledger)) +} + +// markDispatchesSynced records which dispatches were successfully pushed so +// collectSyncDispatches skips them on the next scan. +func markDispatchesSynced(dispatches []map[string]any) { + if len(dispatches) == 0 { + return + } + ledger := readSyncLedger() + changed := false + for _, record := range dispatches { + id := stringValue(record["id"]) + if id == "" { + id = stringValue(record["workspace"]) + } + if id == "" { + continue + } + updatedAt := "" + switch v := record["updated_at"].(type) { + case time.Time: + updatedAt = v.UTC().Format(time.RFC3339) + case string: + updatedAt = v + } + runs := 0 + if v, ok := record["runs"].(int); ok { + runs = v + } + ledger[id] = core.Concat(updatedAt, "#", core.Sprintf("%d", runs)) + changed = true + } + if changed { + writeSyncLedger(ledger) + } +} + +func syncLedgerPath() string { + return core.JoinPath(syncStateDir(), "ledger.json") +} + func shouldSyncStatus(status string) bool { switch status { case "completed", "merged", "failed", "blocked": diff --git a/pkg/agentic/sync_example_test.go b/pkg/agentic/sync_example_test.go index bd77ebc..38a989b 100644 --- a/pkg/agentic/sync_example_test.go +++ b/pkg/agentic/sync_example_test.go @@ -11,3 +11,17 @@ func Example_shouldSyncStatus() { // true // false } + +func Example_syncBackoffSchedule() { + fmt.Println(syncBackoffSchedule(1)) + fmt.Println(syncBackoffSchedule(2)) + fmt.Println(syncBackoffSchedule(3)) + fmt.Println(syncBackoffSchedule(4)) + fmt.Println(syncBackoffSchedule(5)) + // Output: + // 1s + // 5s + // 15s + // 1m0s + // 5m0s +} diff --git a/pkg/agentic/sync_test.go b/pkg/agentic/sync_test.go index 3e09df1..976cfda 100644 --- a/pkg/agentic/sync_test.go +++ b/pkg/agentic/sync_test.go @@ -442,3 +442,210 @@ func TestSync_HandleSyncPull_Ugly(t *testing.T) { require.Len(t, output.Context, 1) assert.Equal(t, "cached-2", output.Context[0]["id"]) } + +// schedule := syncBackoffSchedule(3) // 15s +func TestSync_SyncBackoffSchedule_Good(t *testing.T) { + assert.Equal(t, time.Duration(0), syncBackoffSchedule(0)) + assert.Equal(t, time.Second, syncBackoffSchedule(1)) + assert.Equal(t, 5*time.Second, syncBackoffSchedule(2)) + assert.Equal(t, 15*time.Second, syncBackoffSchedule(3)) + assert.Equal(t, 60*time.Second, syncBackoffSchedule(4)) + assert.Equal(t, 5*time.Minute, syncBackoffSchedule(5)) + assert.Equal(t, 5*time.Minute, syncBackoffSchedule(100)) +} + +func TestSync_SyncBackoffSchedule_Bad_NegativeAttempts(t *testing.T) { + assert.Equal(t, time.Duration(0), syncBackoffSchedule(-1)) + assert.Equal(t, time.Duration(0), syncBackoffSchedule(-5)) +} + +func TestSync_HandleSyncPush_Ugly_IncrementsBackoffOnFailure(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + t.Setenv("CORE_AGENT_API_KEY", "secret-token") + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer server.Close() + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + brainURL: server.URL, + } + + // First failure — attempt 1, backoff 1s + _, err := subsystem.syncPushInput(context.Background(), SyncPushInput{ + AgentID: "charon", + Dispatches: []map[string]any{{"workspace": "w-1", "status": "completed"}}, + }) + require.NoError(t, err) + queued := readSyncQueue() + require.Len(t, queued, 1) + assert.Equal(t, 1, queued[0].Attempts) + assert.False(t, queued[0].NextAttempt.IsZero()) + assert.True(t, queued[0].NextAttempt.After(time.Now())) + assert.True(t, queued[0].NextAttempt.Before(time.Now().Add(2*time.Second))) +} + +func TestSync_RunSyncFlushLoop_Good_DrainsQueuedPushes(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + t.Setenv("CORE_AGENT_API_KEY", "secret-token") + + writeSyncQueue([]syncQueuedPush{{ + AgentID: "charon", + Dispatches: []map[string]any{{"workspace": "w-1", "status": "completed"}}, + QueuedAt: time.Now().Add(-1 * time.Minute), + }}) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/agent/sync", r.URL.Path) + _, _ = w.Write([]byte(`{"data":{"synced":1}}`)) + })) + defer server.Close() + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + brainURL: server.URL, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go subsystem.runSyncFlushLoop(ctx, 10*time.Millisecond) + + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + if len(readSyncQueue()) == 0 { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("sync flush loop did not drain queue: %v", readSyncQueue()) +} + +func TestSync_CollectSyncDispatches_Good_SkipsAlreadySynced(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5") + fs.EnsureDir(workspaceDir) + updatedAt := time.Date(2026, 4, 14, 12, 0, 0, 0, time.UTC) + writeStatusResult(workspaceDir, &WorkspaceStatus{ + Status: "completed", + Repo: "go-io", + Org: "core", + Runs: 1, + UpdatedAt: updatedAt, + }) + + // First scan picks it up. + first := collectSyncDispatches() + require.Len(t, first, 1) + + // Mark as synced — next scan skips it. + markDispatchesSynced(first) + second := collectSyncDispatches() + assert.Empty(t, second) + + // When the workspace gets a new run, fingerprint changes → rescan. + writeStatusResult(workspaceDir, &WorkspaceStatus{ + Status: "completed", + Repo: "go-io", + Org: "core", + Runs: 2, + UpdatedAt: updatedAt.Add(time.Hour), + }) + third := collectSyncDispatches() + assert.Len(t, third, 1) +} + +func TestSync_SyncPushInput_Good_QueueOnlySkipsWorkspaceScan(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + t.Setenv("CORE_AGENT_API_KEY", "secret-token") + + // Seed a completed workspace that would normally be picked up by scan. + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5") + fs.EnsureDir(workspaceDir) + writeStatusResult(workspaceDir, &WorkspaceStatus{ + Status: "completed", + Agent: "codex", + Repo: "go-io", + Org: "core", + Task: "Fix tests", + Branch: "agent/fix-tests", + StartedAt: time.Now(), + UpdatedAt: time.Now(), + }) + + called := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called++ + _, _ = w.Write([]byte(`{"data":{"synced":1}}`)) + })) + defer server.Close() + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + brainURL: server.URL, + } + + // With an empty queue and no scan, nothing to push. + output, err := subsystem.syncPushInput(context.Background(), SyncPushInput{QueueOnly: true}) + require.NoError(t, err) + assert.True(t, output.Success) + assert.Equal(t, 0, output.Count) + assert.Equal(t, 0, called) + assert.Empty(t, readSyncQueue()) +} + +func TestSync_RunSyncFlushLoop_Bad_NoopWithoutToken(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + t.Setenv("CORE_AGENT_API_KEY", "") + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() + // Should return immediately, no goroutine leak. + subsystem.runSyncFlushLoop(ctx, 10*time.Millisecond) +} + +func TestSync_HandleSyncPush_Ugly_RespectsBackoffWindow(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + t.Setenv("CORE_AGENT_API_KEY", "secret-token") + + // Prime queue with a push that's still inside its backoff window + writeSyncQueue([]syncQueuedPush{{ + AgentID: "charon", + Dispatches: []map[string]any{{"workspace": "w-1", "status": "completed"}}, + QueuedAt: time.Now().Add(-2 * time.Minute), + Attempts: 3, + NextAttempt: time.Now().Add(5 * time.Minute), + }}) + + called := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called++ + _, _ = w.Write([]byte(`{"data":{"synced":1}}`)) + })) + defer server.Close() + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + brainURL: server.URL, + } + output, err := subsystem.syncPush(context.Background(), "") + require.NoError(t, err) + assert.True(t, output.Success) + assert.Equal(t, 0, output.Count) + assert.Equal(t, 0, called, "backoff must skip the HTTP call") + + queued := readSyncQueue() + require.Len(t, queued, 1) + assert.Equal(t, 3, queued[0].Attempts) +}