feat(agentic): harden RFC platform sync handling

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-31 13:19:18 +00:00
parent 671a0872ef
commit bad6d66abf
6 changed files with 444 additions and 43 deletions

View file

@ -4,6 +4,7 @@ package agentic
import (
"context"
"time"
core "dappco.re/go/core"
)
@ -11,6 +12,7 @@ import (
// node := agentic.FleetNode{AgentID: "charon", Platform: "linux", Status: "online"}
type FleetNode struct {
ID int `json:"id"`
WorkspaceID int `json:"workspace_id,omitempty"`
AgentID string `json:"agent_id"`
Platform string `json:"platform"`
Models []string `json:"models,omitempty"`
@ -25,6 +27,8 @@ type FleetNode struct {
// task := agentic.FleetTask{ID: 7, Repo: "go-io", Task: "Fix tests", Status: "assigned"}
type FleetTask struct {
ID int `json:"id"`
WorkspaceID int `json:"workspace_id,omitempty"`
FleetNodeID int `json:"fleet_node_id,omitempty"`
Repo string `json:"repo"`
Branch string `json:"branch,omitempty"`
Task string `json:"task"`
@ -76,6 +80,8 @@ type CreditBalance struct {
// entry := agentic.CreditEntry{ID: 4, TaskType: "fleet-task", Amount: 2}
type CreditEntry struct {
ID int `json:"id"`
WorkspaceID int `json:"workspace_id,omitempty"`
FleetNodeID int `json:"fleet_node_id,omitempty"`
TaskType string `json:"task_type"`
Amount int `json:"amount"`
BalanceAfter int `json:"balance_after"`
@ -108,6 +114,13 @@ func (s *PrepSubsystem) handleSyncStatus(ctx context.Context, options core.Optio
Queued: len(readSyncQueue()),
ContextCount: len(readSyncContext()),
}
localStatus := readSyncStatusState()
if !localStatus.LastPushAt.IsZero() {
output.LastPushAt = localStatus.LastPushAt.Format(time.RFC3339)
}
if !localStatus.LastPullAt.IsZero() {
output.LastPullAt = localStatus.LastPullAt.Format(time.RFC3339)
}
if s.syncToken() == "" {
return core.Result{Value: output, OK: true}
@ -123,14 +136,27 @@ func (s *PrepSubsystem) handleSyncStatus(ctx context.Context, options core.Optio
return core.Result{Value: output, OK: true}
}
data := payloadDataMap(result.Value.(map[string]any))
data := payloadResourceMap(result.Value.(map[string]any), "status")
if len(data) == 0 {
return core.Result{Value: output, OK: true}
}
if remoteAgentID := stringValue(data["agent_id"]); remoteAgentID != "" {
output.AgentID = remoteAgentID
}
output.Status = stringValue(data["status"])
output.LastPushAt = stringValue(data["last_push_at"])
output.LastPullAt = stringValue(data["last_pull_at"])
if lastPushAt := stringValue(data["last_push_at"]); lastPushAt != "" {
output.LastPushAt = lastPushAt
}
if lastPullAt := stringValue(data["last_pull_at"]); lastPullAt != "" {
output.LastPullAt = lastPullAt
}
if queued, ok := intValueOK(data["queued"]); ok {
output.Queued = queued
}
if contextCount, ok := intValueOK(data["context_count"]); ok {
output.ContextCount = contextCount
}
if output.Status == "" {
output.Status = "online"
}
@ -166,7 +192,7 @@ func (s *PrepSubsystem) handleFleetRegister(ctx context.Context, options core.Op
return result
}
return core.Result{Value: parseFleetNode(payloadDataMap(result.Value.(map[string]any))), OK: true}
return core.Result{Value: parseFleetNode(payloadResourceMap(result.Value.(map[string]any), "node")), OK: true}
}
// result := c.Action("agentic.fleet.heartbeat").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -190,7 +216,7 @@ func (s *PrepSubsystem) handleFleetHeartbeat(ctx context.Context, options core.O
return result
}
return core.Result{Value: parseFleetNode(payloadDataMap(result.Value.(map[string]any))), OK: true}
return core.Result{Value: parseFleetNode(payloadResourceMap(result.Value.(map[string]any), "node")), OK: true}
}
// result := c.Action("agentic.fleet.deregister").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -256,7 +282,7 @@ func (s *PrepSubsystem) handleFleetAssignTask(ctx context.Context, options core.
return result
}
return core.Result{Value: parseFleetTask(payloadDataMap(result.Value.(map[string]any))), OK: true}
return core.Result{Value: parseFleetTask(payloadResourceMap(result.Value.(map[string]any), "task")), OK: true}
}
// result := c.Action("agentic.fleet.task.complete").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -289,7 +315,7 @@ func (s *PrepSubsystem) handleFleetCompleteTask(ctx context.Context, options cor
return result
}
return core.Result{Value: parseFleetTask(payloadDataMap(result.Value.(map[string]any))), OK: true}
return core.Result{Value: parseFleetTask(payloadResourceMap(result.Value.(map[string]any), "task")), OK: true}
}
// result := c.Action("agentic.fleet.task.next").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -307,7 +333,7 @@ func (s *PrepSubsystem) handleFleetNextTask(ctx context.Context, options core.Op
return result
}
data := payloadDataMap(result.Value.(map[string]any))
data := payloadResourceMap(result.Value.(map[string]any), "task")
if len(data) == 0 {
var task *FleetTask
return core.Result{Value: task, OK: true}
@ -324,7 +350,7 @@ func (s *PrepSubsystem) handleFleetStats(ctx context.Context, options core.Optio
return result
}
return core.Result{Value: parseFleetStats(payloadDataMap(result.Value.(map[string]any))), OK: true}
return core.Result{Value: parseFleetStats(payloadResourceMap(result.Value.(map[string]any), "stats")), OK: true}
}
// result := c.Action("agentic.credits.award").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -353,7 +379,7 @@ func (s *PrepSubsystem) handleCreditsAward(ctx context.Context, options core.Opt
return result
}
return core.Result{Value: parseCreditEntry(payloadDataMap(result.Value.(map[string]any))), OK: true}
return core.Result{Value: parseCreditEntry(payloadResourceMap(result.Value.(map[string]any), "entry")), OK: true}
}
// result := c.Action("agentic.credits.balance").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -369,7 +395,7 @@ func (s *PrepSubsystem) handleCreditsBalance(ctx context.Context, options core.O
return result
}
return core.Result{Value: parseCreditBalance(payloadDataMap(result.Value.(map[string]any))), OK: true}
return core.Result{Value: parseCreditBalance(payloadResourceMap(result.Value.(map[string]any), "balance")), OK: true}
}
// result := c.Action("agentic.credits.history").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -404,7 +430,7 @@ func (s *PrepSubsystem) handleSubscriptionDetect(ctx context.Context, options co
return result
}
return core.Result{Value: parseSubscriptionCapabilities(payloadDataMap(result.Value.(map[string]any))), OK: true}
return core.Result{Value: parseSubscriptionCapabilities(payloadResourceMap(result.Value.(map[string]any), "capabilities", "subscription")), OK: true}
}
// result := c.Action("agentic.subscription.budget").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -420,7 +446,7 @@ func (s *PrepSubsystem) handleSubscriptionBudget(ctx context.Context, options co
return result
}
return core.Result{Value: payloadDataMap(result.Value.(map[string]any)), OK: true}
return core.Result{Value: payloadResourceMap(result.Value.(map[string]any), "budget"), OK: true}
}
// result := c.Action("agentic.subscription.budget.update").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
@ -439,7 +465,7 @@ func (s *PrepSubsystem) handleSubscriptionBudgetUpdate(ctx context.Context, opti
return result
}
return core.Result{Value: payloadDataMap(result.Value.(map[string]any)), OK: true}
return core.Result{Value: payloadResourceMap(result.Value.(map[string]any), "budget"), OK: true}
}
func (s *PrepSubsystem) platformPayload(ctx context.Context, action, method, path string, body any) core.Result {
@ -492,13 +518,88 @@ func payloadDataMap(payload map[string]any) map[string]any {
return anyMapValue(payload["data"])
}
func payloadDataSlice(payload map[string]any) []map[string]any {
return anyMapSliceValue(payload["data"])
func payloadDataSlice(payload map[string]any, keys ...string) []map[string]any {
if values := anyMapSliceValue(payload["data"]); len(values) > 0 {
return values
}
if data := payloadDataMap(payload); len(data) > 0 {
for _, key := range keys {
if values := anyMapSliceValue(data[key]); len(values) > 0 {
return values
}
}
}
for _, key := range keys {
if values := anyMapSliceValue(payload[key]); len(values) > 0 {
return values
}
}
return nil
}
func payloadResourceMap(payload map[string]any, keys ...string) map[string]any {
if data := payloadDataMap(payload); len(data) > 0 {
for _, key := range keys {
if values := anyMapValue(data[key]); len(values) > 0 {
return values
}
}
return data
}
for _, key := range keys {
if values := anyMapValue(payload[key]); len(values) > 0 {
return values
}
}
for key, value := range payload {
switch key {
case "data", "error", "code", "message":
continue
}
if value != nil {
return payload
}
}
return nil
}
func mapIntValue(values map[string]any, keys ...string) int {
for _, key := range keys {
if value, ok := values[key]; ok {
return intValue(value)
}
}
return 0
}
func intValueOK(value any) (int, bool) {
switch typed := value.(type) {
case int:
return typed, true
case int64:
return int(typed), true
case float64:
return int(typed), true
case string:
trimmed := core.Trim(typed)
parsed := parseInt(trimmed)
if parsed != 0 || trimmed == "0" {
return parsed, true
}
}
return 0, false
}
func parseFleetNode(values map[string]any) FleetNode {
return FleetNode{
ID: intValue(values["id"]),
WorkspaceID: intValue(values["workspace_id"]),
AgentID: stringValue(values["agent_id"]),
Platform: stringValue(values["platform"]),
Models: listValue(values["models"]),
@ -514,6 +615,8 @@ func parseFleetNode(values map[string]any) FleetNode {
func parseFleetTask(values map[string]any) FleetTask {
return FleetTask{
ID: intValue(values["id"]),
WorkspaceID: intValue(values["workspace_id"]),
FleetNodeID: intValue(values["fleet_node_id"]),
Repo: stringValue(values["repo"]),
Branch: stringValue(values["branch"]),
Task: stringValue(values["task"]),
@ -530,13 +633,16 @@ func parseFleetTask(values map[string]any) FleetTask {
}
func parseFleetNodesOutput(payload map[string]any) FleetNodesOutput {
nodesData := payloadDataSlice(payload)
nodesData := payloadDataSlice(payload, "nodes")
nodes := make([]FleetNode, 0, len(nodesData))
for _, values := range nodesData {
nodes = append(nodes, parseFleetNode(values))
}
total := intValue(payload["total"])
total := mapIntValue(payload, "total", "count")
if total == 0 {
total = mapIntValue(payloadDataMap(payload), "total", "count")
}
if total == 0 {
total = len(nodes)
}
@ -561,6 +667,8 @@ func parseFleetStats(values map[string]any) FleetStats {
func parseCreditEntry(values map[string]any) CreditEntry {
return CreditEntry{
ID: intValue(values["id"]),
WorkspaceID: intValue(values["workspace_id"]),
FleetNodeID: intValue(values["fleet_node_id"]),
TaskType: stringValue(values["task_type"]),
Amount: intValue(values["amount"]),
BalanceAfter: intValue(values["balance_after"]),
@ -578,13 +686,16 @@ func parseCreditBalance(values map[string]any) CreditBalance {
}
func parseCreditsHistoryOutput(payload map[string]any) CreditsHistoryOutput {
entriesData := payloadDataSlice(payload)
entriesData := payloadDataSlice(payload, "entries", "history")
entries := make([]CreditEntry, 0, len(entriesData))
for _, values := range entriesData {
entries = append(entries, parseCreditEntry(values))
}
total := intValue(payload["total"])
total := mapIntValue(payload, "total", "count")
if total == 0 {
total = mapIntValue(payloadDataMap(payload), "total", "count")
}
if total == 0 {
total = len(entries)
}
@ -596,10 +707,19 @@ func parseCreditsHistoryOutput(payload map[string]any) CreditsHistoryOutput {
}
func parseSubscriptionCapabilities(values map[string]any) SubscriptionCapabilities {
return SubscriptionCapabilities{
capabilities := SubscriptionCapabilities{
Providers: boolMapValue(values["providers"]),
Available: listValue(values["available"]),
}
if len(capabilities.Available) == 0 && len(capabilities.Providers) > 0 {
for name, enabled := range capabilities.Providers {
if enabled {
capabilities.Available = append(capabilities.Available, name)
}
}
capabilities.Available = cleanStrings(capabilities.Available)
}
return capabilities
}
func appendQueryParam(path, key, value string) string {

View file

@ -194,6 +194,43 @@ func TestPlatform_HandleCreditsHistory_Good(t *testing.T) {
assert.Equal(t, 7, output.Entries[0].BalanceAfter)
}
func TestPlatform_HandleFleetNodes_Good_NestedEnvelope(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"data":{"nodes":[{"id":1,"workspace_id":7,"agent_id":"charon","platform":"linux","models":["codex"],"status":"online"}],"total":1}}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
result := subsystem.handleFleetNodes(context.Background(), core.NewOptions())
require.True(t, result.OK)
output, ok := result.Value.(FleetNodesOutput)
require.True(t, ok)
require.Len(t, output.Nodes, 1)
assert.Equal(t, 1, output.Total)
assert.Equal(t, 7, output.Nodes[0].WorkspaceID)
}
func TestPlatform_HandleCreditsHistory_Good_NestedEnvelope(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"data":{"entries":[{"id":1,"workspace_id":3,"fleet_node_id":9,"task_type":"fleet-task","amount":2,"balance_after":7}],"total":1}}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
result := subsystem.handleCreditsHistory(context.Background(), core.NewOptions(
core.Option{Key: "agent_id", Value: "charon"},
))
require.True(t, result.OK)
output, ok := result.Value.(CreditsHistoryOutput)
require.True(t, ok)
require.Len(t, output.Entries, 1)
assert.Equal(t, 1, output.Total)
assert.Equal(t, 3, output.Entries[0].WorkspaceID)
assert.Equal(t, 9, output.Entries[0].FleetNodeID)
}
func TestPlatform_HandleSubscriptionDetect_Good(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/subscription/detect", r.URL.Path)
@ -222,3 +259,43 @@ func TestPlatform_HandleSubscriptionDetect_Good(t *testing.T) {
assert.True(t, capabilities.Providers["claude"])
assert.Equal(t, []string{"claude", "openai"}, capabilities.Available)
}
func TestPlatform_HandleSubscriptionDetect_Good_ProvidersOnly(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"data":{"providers":{"claude":true,"openai":false,"gemini":true}}}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
result := subsystem.handleSubscriptionDetect(context.Background(), core.NewOptions())
require.True(t, result.OK)
capabilities, ok := result.Value.(SubscriptionCapabilities)
require.True(t, ok)
assert.ElementsMatch(t, []string{"claude", "gemini"}, capabilities.Available)
}
func TestPlatform_HandleSyncStatus_Good_LocalStateFallback(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
t.Setenv("CORE_AGENT_API_KEY", "")
t.Setenv("CORE_BRAIN_KEY", "")
recordSyncPush(time.Date(2026, 3, 31, 8, 0, 0, 0, time.UTC))
recordSyncPull(time.Date(2026, 3, 31, 8, 5, 0, 0, time.UTC))
c := core.New()
subsystem := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
result := subsystem.handleSyncStatus(context.Background(), core.NewOptions(
core.Option{Key: "agent_id", Value: "charon"},
))
require.True(t, result.OK)
status, ok := result.Value.(SyncStatusOutput)
require.True(t, ok)
assert.Equal(t, "2026-03-31T08:00:00Z", status.LastPushAt)
assert.Equal(t, "2026-03-31T08:05:00Z", status.LastPullAt)
}

View file

@ -288,7 +288,7 @@ func (s *PrepSubsystem) sessionEnd(ctx context.Context, _ *mcp.CallToolRequest,
}
func sessionDataMap(payload map[string]any) map[string]any {
data := payloadDataMap(payload)
data := payloadResourceMap(payload, "session")
if len(data) > 0 {
return data
}
@ -320,15 +320,15 @@ func parseSession(values map[string]any) Session {
}
func parseSessionListOutput(payload map[string]any) SessionListOutput {
sessionData := payloadDataSlice(payload)
sessionData := payloadDataSlice(payload, "sessions")
sessions := make([]Session, 0, len(sessionData))
for _, values := range sessionData {
sessions = append(sessions, parseSession(values))
}
count := intValue(payload["count"])
count := mapIntValue(payload, "count", "total")
if count == 0 {
count = intValue(payload["total"])
count = mapIntValue(payloadDataMap(payload), "count", "total")
}
if count == 0 {
count = len(sessions)

View file

@ -87,6 +87,24 @@ func TestSession_HandleSessionGet_Good(t *testing.T) {
assert.Equal(t, "ax-follow-up", output.Session.Plan)
}
func TestSession_HandleSessionGet_Good_NestedEnvelope(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"data":{"session":{"session_id":"ses_nested","plan":"ax-follow-up","agent_type":"codex","status":"active"}}}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
result := subsystem.handleSessionGet(context.Background(), core.NewOptions(
core.Option{Key: "session_id", Value: "ses_nested"},
))
require.True(t, result.OK)
output, ok := result.Value.(SessionOutput)
require.True(t, ok)
assert.Equal(t, "ses_nested", output.Session.SessionID)
assert.Equal(t, "active", output.Session.Status)
}
func TestSession_HandleSessionList_Good(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/sessions", r.URL.Path)
@ -112,6 +130,23 @@ func TestSession_HandleSessionList_Good(t *testing.T) {
assert.Equal(t, "ses_1", output.Sessions[0].SessionID)
}
func TestSession_HandleSessionList_Good_NestedEnvelope(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"data":{"sessions":[{"session_id":"ses_1","agent_type":"codex","status":"active"}],"total":1}}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
result := subsystem.handleSessionList(context.Background(), core.NewOptions())
require.True(t, result.OK)
output, ok := result.Value.(SessionListOutput)
require.True(t, ok)
assert.Equal(t, 1, output.Count)
require.Len(t, output.Sessions, 1)
assert.Equal(t, "ses_1", output.Sessions[0].SessionID)
}
func TestSession_HandleSessionContinue_Good(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/sessions/ses_abc123/continue", r.URL.Path)

View file

@ -34,6 +34,11 @@ type syncQueuedPush struct {
QueuedAt time.Time `json:"queued_at"`
}
type syncStatusState struct {
LastPushAt time.Time `json:"last_push_at,omitempty"`
LastPullAt time.Time `json:"last_pull_at,omitempty"`
}
// result := c.Action("agentic.sync.push").Run(ctx, core.NewOptions())
func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options) core.Result {
output, err := s.syncPush(ctx, options.String("agent_id"))
@ -84,6 +89,7 @@ func (s *PrepSubsystem) syncPush(ctx context.Context, agentID string) (SyncPushO
return SyncPushOutput{Success: true, Count: synced}, nil
}
synced += len(queued.Dispatches)
recordSyncPush(time.Now())
}
writeSyncQueue(nil)
@ -107,20 +113,21 @@ func (s *PrepSubsystem) syncPull(ctx context.Context, agentID string) (SyncPullO
return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil
}
var response struct {
Data []map[string]any `json:"data"`
}
var response map[string]any
parseResult := core.JSONUnmarshalString(result.Value.(string), &response)
if !parseResult.OK {
cached := readSyncContext()
return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil
}
writeSyncContext(response.Data)
contextData := syncContextPayload(response)
writeSyncContext(contextData)
recordSyncPull(time.Now())
return SyncPullOutput{
Success: true,
Count: len(response.Data),
Context: response.Data,
Count: len(contextData),
Context: contextData,
}, nil
}
@ -159,18 +166,7 @@ func collectSyncDispatches() []map[string]any {
if !shouldSyncStatus(workspaceStatus.Status) {
continue
}
dispatches = append(dispatches, map[string]any{
"workspace": WorkspaceName(workspaceDir),
"repo": workspaceStatus.Repo,
"org": workspaceStatus.Org,
"task": workspaceStatus.Task,
"agent": workspaceStatus.Agent,
"branch": workspaceStatus.Branch,
"status": workspaceStatus.Status,
"pr_url": workspaceStatus.PRURL,
"started_at": workspaceStatus.StartedAt,
"updated_at": workspaceStatus.UpdatedAt,
})
dispatches = append(dispatches, syncDispatchRecord(workspaceDir, workspaceStatus))
}
return dispatches
}
@ -213,6 +209,10 @@ func syncContextPath() string {
return core.JoinPath(syncStateDir(), "context.json")
}
func syncStatusPath() string {
return core.JoinPath(syncStateDir(), "status.json")
}
func readSyncQueue() []syncQueuedPush {
var queued []syncQueuedPush
result := fs.Read(syncQueuePath())
@ -252,3 +252,89 @@ func writeSyncContext(contextData []map[string]any) {
fs.EnsureDir(syncStateDir())
fs.WriteAtomic(syncContextPath(), core.JSONMarshalString(contextData))
}
func syncContextPayload(payload map[string]any) []map[string]any {
if contextData := payloadDataSlice(payload, "context", "items", "memories"); len(contextData) > 0 {
return contextData
}
return nil
}
func syncDispatchRecord(workspaceDir string, workspaceStatus *WorkspaceStatus) map[string]any {
record := map[string]any{
"workspace": WorkspaceName(workspaceDir),
"repo": workspaceStatus.Repo,
"org": workspaceStatus.Org,
"task": workspaceStatus.Task,
"agent": workspaceStatus.Agent,
"branch": workspaceStatus.Branch,
"status": workspaceStatus.Status,
"question": workspaceStatus.Question,
"issue": workspaceStatus.Issue,
"runs": workspaceStatus.Runs,
"process_id": workspaceStatus.ProcessID,
"pr_url": workspaceStatus.PRURL,
"started_at": workspaceStatus.StartedAt,
"updated_at": workspaceStatus.UpdatedAt,
}
if report := readSyncWorkspaceReport(workspaceDir); len(report) > 0 {
record["report"] = report
if findings := anyMapSliceValue(report["findings"]); len(findings) > 0 {
record["findings"] = findings
}
if changes := anyMapValue(report["changes"]); len(changes) > 0 {
record["changes"] = changes
}
}
return record
}
func readSyncWorkspaceReport(workspaceDir string) map[string]any {
reportPath := core.JoinPath(WorkspaceMetaDir(workspaceDir), "report.json")
result := fs.Read(reportPath)
if !result.OK {
return nil
}
var report map[string]any
parseResult := core.JSONUnmarshalString(result.Value.(string), &report)
if !parseResult.OK {
return nil
}
return report
}
func readSyncStatusState() syncStatusState {
var state syncStatusState
result := fs.Read(syncStatusPath())
if !result.OK {
return state
}
parseResult := core.JSONUnmarshalString(result.Value.(string), &state)
if !parseResult.OK {
return syncStatusState{}
}
return state
}
func writeSyncStatusState(state syncStatusState) {
fs.EnsureDir(syncStateDir())
fs.WriteAtomic(syncStatusPath(), core.JSONMarshalString(state))
}
func recordSyncPush(at time.Time) {
state := readSyncStatusState()
state.LastPushAt = at
writeSyncStatusState(state)
}
func recordSyncPull(at time.Time) {
state := readSyncStatusState()
state.LastPullAt = at
writeSyncStatusState(state)
}

View file

@ -59,6 +59,7 @@ func TestSync_HandleSyncPush_Good(t *testing.T) {
require.NoError(t, err)
assert.True(t, output.Success)
assert.Equal(t, 1, output.Count)
assert.False(t, readSyncStatusState().LastPushAt.IsZero())
}
func TestSync_HandleSyncPush_Bad(t *testing.T) {
@ -155,6 +156,88 @@ func TestSync_HandleSyncPull_Good(t *testing.T) {
cached := readSyncContext()
require.Len(t, cached, 1)
assert.Equal(t, "mem-1", cached[0]["id"])
assert.False(t, readSyncStatusState().LastPullAt.IsZero())
}
func TestSync_HandleSyncPush_Good_ReportMetadata(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
t.Setenv("CORE_AGENT_API_KEY", "secret-token")
workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5")
fs.EnsureDir(WorkspaceMetaDir(workspaceDir))
require.True(t, fs.Write(core.JoinPath(WorkspaceMetaDir(workspaceDir), "report.json"), `{"findings":[{"file":"main.go"}],"changes":{"files_changed":1}}`).OK)
writeStatusResult(workspaceDir, &WorkspaceStatus{
Status: "blocked",
Agent: "codex",
Repo: "go-io",
Org: "core",
Task: "Fix tests",
Branch: "agent/fix-tests",
Issue: 42,
Question: "Which API version?",
ProcessID: "proc-1",
Runs: 2,
StartedAt: time.Now(),
UpdatedAt: time.Now(),
})
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
bodyResult := core.ReadAll(r.Body)
require.True(t, bodyResult.OK)
var payload map[string]any
parseResult := core.JSONUnmarshalString(bodyResult.Value.(string), &payload)
require.True(t, parseResult.OK)
dispatches, ok := payload["dispatches"].([]any)
require.True(t, ok)
require.Len(t, dispatches, 1)
record, ok := dispatches[0].(map[string]any)
require.True(t, ok)
require.Equal(t, "Which API version?", record["question"])
require.Equal(t, float64(42), record["issue"])
require.Equal(t, float64(2), record["runs"])
require.Equal(t, "proc-1", record["process_id"])
require.NotNil(t, record["report"])
require.NotNil(t, record["findings"])
require.NotNil(t, record["changes"])
_, _ = w.Write([]byte(`{"data":{"synced":1}}`))
}))
defer server.Close()
subsystem := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
brainURL: server.URL,
}
output, err := subsystem.syncPush(context.Background(), "")
require.NoError(t, err)
assert.True(t, output.Success)
assert.Equal(t, 1, output.Count)
}
func TestSync_HandleSyncPull_Good_NestedEnvelope(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
t.Setenv("CORE_AGENT_API_KEY", "secret-token")
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"data":{"context":[{"id":"ctx-1","content":"Known pattern"}]}}`))
}))
defer server.Close()
subsystem := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
brainURL: server.URL,
}
output, err := subsystem.syncPull(context.Background(), "codex")
require.NoError(t, err)
assert.True(t, output.Success)
assert.Equal(t, 1, output.Count)
require.Len(t, output.Context, 1)
assert.Equal(t, "ctx-1", output.Context[0]["id"])
}
func TestSync_HandleSyncPull_Bad(t *testing.T) {