diff --git a/pkg/agentic/platform.go b/pkg/agentic/platform.go index afe9153..593524a 100644 --- a/pkg/agentic/platform.go +++ b/pkg/agentic/platform.go @@ -4,6 +4,7 @@ package agentic import ( "context" + "time" core "dappco.re/go/core" ) @@ -11,6 +12,7 @@ import ( // node := agentic.FleetNode{AgentID: "charon", Platform: "linux", Status: "online"} type FleetNode struct { ID int `json:"id"` + WorkspaceID int `json:"workspace_id,omitempty"` AgentID string `json:"agent_id"` Platform string `json:"platform"` Models []string `json:"models,omitempty"` @@ -25,6 +27,8 @@ type FleetNode struct { // task := agentic.FleetTask{ID: 7, Repo: "go-io", Task: "Fix tests", Status: "assigned"} type FleetTask struct { ID int `json:"id"` + WorkspaceID int `json:"workspace_id,omitempty"` + FleetNodeID int `json:"fleet_node_id,omitempty"` Repo string `json:"repo"` Branch string `json:"branch,omitempty"` Task string `json:"task"` @@ -76,6 +80,8 @@ type CreditBalance struct { // entry := agentic.CreditEntry{ID: 4, TaskType: "fleet-task", Amount: 2} type CreditEntry struct { ID int `json:"id"` + WorkspaceID int `json:"workspace_id,omitempty"` + FleetNodeID int `json:"fleet_node_id,omitempty"` TaskType string `json:"task_type"` Amount int `json:"amount"` BalanceAfter int `json:"balance_after"` @@ -108,6 +114,13 @@ func (s *PrepSubsystem) handleSyncStatus(ctx context.Context, options core.Optio Queued: len(readSyncQueue()), ContextCount: len(readSyncContext()), } + localStatus := readSyncStatusState() + if !localStatus.LastPushAt.IsZero() { + output.LastPushAt = localStatus.LastPushAt.Format(time.RFC3339) + } + if !localStatus.LastPullAt.IsZero() { + output.LastPullAt = localStatus.LastPullAt.Format(time.RFC3339) + } if s.syncToken() == "" { return core.Result{Value: output, OK: true} @@ -123,14 +136,27 @@ func (s *PrepSubsystem) handleSyncStatus(ctx context.Context, options core.Optio return core.Result{Value: output, OK: true} } - data := payloadDataMap(result.Value.(map[string]any)) + data := payloadResourceMap(result.Value.(map[string]any), "status") if len(data) == 0 { return core.Result{Value: output, OK: true} } + if remoteAgentID := stringValue(data["agent_id"]); remoteAgentID != "" { + output.AgentID = remoteAgentID + } output.Status = stringValue(data["status"]) - output.LastPushAt = stringValue(data["last_push_at"]) - output.LastPullAt = stringValue(data["last_pull_at"]) + if lastPushAt := stringValue(data["last_push_at"]); lastPushAt != "" { + output.LastPushAt = lastPushAt + } + if lastPullAt := stringValue(data["last_pull_at"]); lastPullAt != "" { + output.LastPullAt = lastPullAt + } + if queued, ok := intValueOK(data["queued"]); ok { + output.Queued = queued + } + if contextCount, ok := intValueOK(data["context_count"]); ok { + output.ContextCount = contextCount + } if output.Status == "" { output.Status = "online" } @@ -166,7 +192,7 @@ func (s *PrepSubsystem) handleFleetRegister(ctx context.Context, options core.Op return result } - return core.Result{Value: parseFleetNode(payloadDataMap(result.Value.(map[string]any))), OK: true} + return core.Result{Value: parseFleetNode(payloadResourceMap(result.Value.(map[string]any), "node")), OK: true} } // result := c.Action("agentic.fleet.heartbeat").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -190,7 +216,7 @@ func (s *PrepSubsystem) handleFleetHeartbeat(ctx context.Context, options core.O return result } - return core.Result{Value: parseFleetNode(payloadDataMap(result.Value.(map[string]any))), OK: true} + return core.Result{Value: parseFleetNode(payloadResourceMap(result.Value.(map[string]any), "node")), OK: true} } // result := c.Action("agentic.fleet.deregister").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -256,7 +282,7 @@ func (s *PrepSubsystem) handleFleetAssignTask(ctx context.Context, options core. return result } - return core.Result{Value: parseFleetTask(payloadDataMap(result.Value.(map[string]any))), OK: true} + return core.Result{Value: parseFleetTask(payloadResourceMap(result.Value.(map[string]any), "task")), OK: true} } // result := c.Action("agentic.fleet.task.complete").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -289,7 +315,7 @@ func (s *PrepSubsystem) handleFleetCompleteTask(ctx context.Context, options cor return result } - return core.Result{Value: parseFleetTask(payloadDataMap(result.Value.(map[string]any))), OK: true} + return core.Result{Value: parseFleetTask(payloadResourceMap(result.Value.(map[string]any), "task")), OK: true} } // result := c.Action("agentic.fleet.task.next").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -307,7 +333,7 @@ func (s *PrepSubsystem) handleFleetNextTask(ctx context.Context, options core.Op return result } - data := payloadDataMap(result.Value.(map[string]any)) + data := payloadResourceMap(result.Value.(map[string]any), "task") if len(data) == 0 { var task *FleetTask return core.Result{Value: task, OK: true} @@ -324,7 +350,7 @@ func (s *PrepSubsystem) handleFleetStats(ctx context.Context, options core.Optio return result } - return core.Result{Value: parseFleetStats(payloadDataMap(result.Value.(map[string]any))), OK: true} + return core.Result{Value: parseFleetStats(payloadResourceMap(result.Value.(map[string]any), "stats")), OK: true} } // result := c.Action("agentic.credits.award").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -353,7 +379,7 @@ func (s *PrepSubsystem) handleCreditsAward(ctx context.Context, options core.Opt return result } - return core.Result{Value: parseCreditEntry(payloadDataMap(result.Value.(map[string]any))), OK: true} + return core.Result{Value: parseCreditEntry(payloadResourceMap(result.Value.(map[string]any), "entry")), OK: true} } // result := c.Action("agentic.credits.balance").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -369,7 +395,7 @@ func (s *PrepSubsystem) handleCreditsBalance(ctx context.Context, options core.O return result } - return core.Result{Value: parseCreditBalance(payloadDataMap(result.Value.(map[string]any))), OK: true} + return core.Result{Value: parseCreditBalance(payloadResourceMap(result.Value.(map[string]any), "balance")), OK: true} } // result := c.Action("agentic.credits.history").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -404,7 +430,7 @@ func (s *PrepSubsystem) handleSubscriptionDetect(ctx context.Context, options co return result } - return core.Result{Value: parseSubscriptionCapabilities(payloadDataMap(result.Value.(map[string]any))), OK: true} + return core.Result{Value: parseSubscriptionCapabilities(payloadResourceMap(result.Value.(map[string]any), "capabilities", "subscription")), OK: true} } // result := c.Action("agentic.subscription.budget").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -420,7 +446,7 @@ func (s *PrepSubsystem) handleSubscriptionBudget(ctx context.Context, options co return result } - return core.Result{Value: payloadDataMap(result.Value.(map[string]any)), OK: true} + return core.Result{Value: payloadResourceMap(result.Value.(map[string]any), "budget"), OK: true} } // result := c.Action("agentic.subscription.budget.update").Run(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"})) @@ -439,7 +465,7 @@ func (s *PrepSubsystem) handleSubscriptionBudgetUpdate(ctx context.Context, opti return result } - return core.Result{Value: payloadDataMap(result.Value.(map[string]any)), OK: true} + return core.Result{Value: payloadResourceMap(result.Value.(map[string]any), "budget"), OK: true} } func (s *PrepSubsystem) platformPayload(ctx context.Context, action, method, path string, body any) core.Result { @@ -492,13 +518,88 @@ func payloadDataMap(payload map[string]any) map[string]any { return anyMapValue(payload["data"]) } -func payloadDataSlice(payload map[string]any) []map[string]any { - return anyMapSliceValue(payload["data"]) +func payloadDataSlice(payload map[string]any, keys ...string) []map[string]any { + if values := anyMapSliceValue(payload["data"]); len(values) > 0 { + return values + } + + if data := payloadDataMap(payload); len(data) > 0 { + for _, key := range keys { + if values := anyMapSliceValue(data[key]); len(values) > 0 { + return values + } + } + } + + for _, key := range keys { + if values := anyMapSliceValue(payload[key]); len(values) > 0 { + return values + } + } + + return nil +} + +func payloadResourceMap(payload map[string]any, keys ...string) map[string]any { + if data := payloadDataMap(payload); len(data) > 0 { + for _, key := range keys { + if values := anyMapValue(data[key]); len(values) > 0 { + return values + } + } + return data + } + + for _, key := range keys { + if values := anyMapValue(payload[key]); len(values) > 0 { + return values + } + } + + for key, value := range payload { + switch key { + case "data", "error", "code", "message": + continue + } + if value != nil { + return payload + } + } + + return nil +} + +func mapIntValue(values map[string]any, keys ...string) int { + for _, key := range keys { + if value, ok := values[key]; ok { + return intValue(value) + } + } + return 0 +} + +func intValueOK(value any) (int, bool) { + switch typed := value.(type) { + case int: + return typed, true + case int64: + return int(typed), true + case float64: + return int(typed), true + case string: + trimmed := core.Trim(typed) + parsed := parseInt(trimmed) + if parsed != 0 || trimmed == "0" { + return parsed, true + } + } + return 0, false } func parseFleetNode(values map[string]any) FleetNode { return FleetNode{ ID: intValue(values["id"]), + WorkspaceID: intValue(values["workspace_id"]), AgentID: stringValue(values["agent_id"]), Platform: stringValue(values["platform"]), Models: listValue(values["models"]), @@ -514,6 +615,8 @@ func parseFleetNode(values map[string]any) FleetNode { func parseFleetTask(values map[string]any) FleetTask { return FleetTask{ ID: intValue(values["id"]), + WorkspaceID: intValue(values["workspace_id"]), + FleetNodeID: intValue(values["fleet_node_id"]), Repo: stringValue(values["repo"]), Branch: stringValue(values["branch"]), Task: stringValue(values["task"]), @@ -530,13 +633,16 @@ func parseFleetTask(values map[string]any) FleetTask { } func parseFleetNodesOutput(payload map[string]any) FleetNodesOutput { - nodesData := payloadDataSlice(payload) + nodesData := payloadDataSlice(payload, "nodes") nodes := make([]FleetNode, 0, len(nodesData)) for _, values := range nodesData { nodes = append(nodes, parseFleetNode(values)) } - total := intValue(payload["total"]) + total := mapIntValue(payload, "total", "count") + if total == 0 { + total = mapIntValue(payloadDataMap(payload), "total", "count") + } if total == 0 { total = len(nodes) } @@ -561,6 +667,8 @@ func parseFleetStats(values map[string]any) FleetStats { func parseCreditEntry(values map[string]any) CreditEntry { return CreditEntry{ ID: intValue(values["id"]), + WorkspaceID: intValue(values["workspace_id"]), + FleetNodeID: intValue(values["fleet_node_id"]), TaskType: stringValue(values["task_type"]), Amount: intValue(values["amount"]), BalanceAfter: intValue(values["balance_after"]), @@ -578,13 +686,16 @@ func parseCreditBalance(values map[string]any) CreditBalance { } func parseCreditsHistoryOutput(payload map[string]any) CreditsHistoryOutput { - entriesData := payloadDataSlice(payload) + entriesData := payloadDataSlice(payload, "entries", "history") entries := make([]CreditEntry, 0, len(entriesData)) for _, values := range entriesData { entries = append(entries, parseCreditEntry(values)) } - total := intValue(payload["total"]) + total := mapIntValue(payload, "total", "count") + if total == 0 { + total = mapIntValue(payloadDataMap(payload), "total", "count") + } if total == 0 { total = len(entries) } @@ -596,10 +707,19 @@ func parseCreditsHistoryOutput(payload map[string]any) CreditsHistoryOutput { } func parseSubscriptionCapabilities(values map[string]any) SubscriptionCapabilities { - return SubscriptionCapabilities{ + capabilities := SubscriptionCapabilities{ Providers: boolMapValue(values["providers"]), Available: listValue(values["available"]), } + if len(capabilities.Available) == 0 && len(capabilities.Providers) > 0 { + for name, enabled := range capabilities.Providers { + if enabled { + capabilities.Available = append(capabilities.Available, name) + } + } + capabilities.Available = cleanStrings(capabilities.Available) + } + return capabilities } func appendQueryParam(path, key, value string) string { diff --git a/pkg/agentic/platform_test.go b/pkg/agentic/platform_test.go index 1f8f700..badcbf1 100644 --- a/pkg/agentic/platform_test.go +++ b/pkg/agentic/platform_test.go @@ -194,6 +194,43 @@ func TestPlatform_HandleCreditsHistory_Good(t *testing.T) { assert.Equal(t, 7, output.Entries[0].BalanceAfter) } +func TestPlatform_HandleFleetNodes_Good_NestedEnvelope(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"data":{"nodes":[{"id":1,"workspace_id":7,"agent_id":"charon","platform":"linux","models":["codex"],"status":"online"}],"total":1}}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleFleetNodes(context.Background(), core.NewOptions()) + require.True(t, result.OK) + + output, ok := result.Value.(FleetNodesOutput) + require.True(t, ok) + require.Len(t, output.Nodes, 1) + assert.Equal(t, 1, output.Total) + assert.Equal(t, 7, output.Nodes[0].WorkspaceID) +} + +func TestPlatform_HandleCreditsHistory_Good_NestedEnvelope(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"data":{"entries":[{"id":1,"workspace_id":3,"fleet_node_id":9,"task_type":"fleet-task","amount":2,"balance_after":7}],"total":1}}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleCreditsHistory(context.Background(), core.NewOptions( + core.Option{Key: "agent_id", Value: "charon"}, + )) + require.True(t, result.OK) + + output, ok := result.Value.(CreditsHistoryOutput) + require.True(t, ok) + require.Len(t, output.Entries, 1) + assert.Equal(t, 1, output.Total) + assert.Equal(t, 3, output.Entries[0].WorkspaceID) + assert.Equal(t, 9, output.Entries[0].FleetNodeID) +} + func TestPlatform_HandleSubscriptionDetect_Good(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, "/v1/subscription/detect", r.URL.Path) @@ -222,3 +259,43 @@ func TestPlatform_HandleSubscriptionDetect_Good(t *testing.T) { assert.True(t, capabilities.Providers["claude"]) assert.Equal(t, []string{"claude", "openai"}, capabilities.Available) } + +func TestPlatform_HandleSubscriptionDetect_Good_ProvidersOnly(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"data":{"providers":{"claude":true,"openai":false,"gemini":true}}}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleSubscriptionDetect(context.Background(), core.NewOptions()) + require.True(t, result.OK) + + capabilities, ok := result.Value.(SubscriptionCapabilities) + require.True(t, ok) + assert.ElementsMatch(t, []string{"claude", "gemini"}, capabilities.Available) +} + +func TestPlatform_HandleSyncStatus_Good_LocalStateFallback(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + t.Setenv("CORE_AGENT_API_KEY", "") + t.Setenv("CORE_BRAIN_KEY", "") + recordSyncPush(time.Date(2026, 3, 31, 8, 0, 0, 0, time.UTC)) + recordSyncPull(time.Date(2026, 3, 31, 8, 5, 0, 0, time.UTC)) + + c := core.New() + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + result := subsystem.handleSyncStatus(context.Background(), core.NewOptions( + core.Option{Key: "agent_id", Value: "charon"}, + )) + require.True(t, result.OK) + + status, ok := result.Value.(SyncStatusOutput) + require.True(t, ok) + assert.Equal(t, "2026-03-31T08:00:00Z", status.LastPushAt) + assert.Equal(t, "2026-03-31T08:05:00Z", status.LastPullAt) +} diff --git a/pkg/agentic/session.go b/pkg/agentic/session.go index 93bbd94..a231248 100644 --- a/pkg/agentic/session.go +++ b/pkg/agentic/session.go @@ -288,7 +288,7 @@ func (s *PrepSubsystem) sessionEnd(ctx context.Context, _ *mcp.CallToolRequest, } func sessionDataMap(payload map[string]any) map[string]any { - data := payloadDataMap(payload) + data := payloadResourceMap(payload, "session") if len(data) > 0 { return data } @@ -320,15 +320,15 @@ func parseSession(values map[string]any) Session { } func parseSessionListOutput(payload map[string]any) SessionListOutput { - sessionData := payloadDataSlice(payload) + sessionData := payloadDataSlice(payload, "sessions") sessions := make([]Session, 0, len(sessionData)) for _, values := range sessionData { sessions = append(sessions, parseSession(values)) } - count := intValue(payload["count"]) + count := mapIntValue(payload, "count", "total") if count == 0 { - count = intValue(payload["total"]) + count = mapIntValue(payloadDataMap(payload), "count", "total") } if count == 0 { count = len(sessions) diff --git a/pkg/agentic/session_test.go b/pkg/agentic/session_test.go index badb579..8a861f1 100644 --- a/pkg/agentic/session_test.go +++ b/pkg/agentic/session_test.go @@ -87,6 +87,24 @@ func TestSession_HandleSessionGet_Good(t *testing.T) { assert.Equal(t, "ax-follow-up", output.Session.Plan) } +func TestSession_HandleSessionGet_Good_NestedEnvelope(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"data":{"session":{"session_id":"ses_nested","plan":"ax-follow-up","agent_type":"codex","status":"active"}}}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleSessionGet(context.Background(), core.NewOptions( + core.Option{Key: "session_id", Value: "ses_nested"}, + )) + require.True(t, result.OK) + + output, ok := result.Value.(SessionOutput) + require.True(t, ok) + assert.Equal(t, "ses_nested", output.Session.SessionID) + assert.Equal(t, "active", output.Session.Status) +} + func TestSession_HandleSessionList_Good(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, "/v1/sessions", r.URL.Path) @@ -112,6 +130,23 @@ func TestSession_HandleSessionList_Good(t *testing.T) { assert.Equal(t, "ses_1", output.Sessions[0].SessionID) } +func TestSession_HandleSessionList_Good_NestedEnvelope(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"data":{"sessions":[{"session_id":"ses_1","agent_type":"codex","status":"active"}],"total":1}}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleSessionList(context.Background(), core.NewOptions()) + require.True(t, result.OK) + + output, ok := result.Value.(SessionListOutput) + require.True(t, ok) + assert.Equal(t, 1, output.Count) + require.Len(t, output.Sessions, 1) + assert.Equal(t, "ses_1", output.Sessions[0].SessionID) +} + func TestSession_HandleSessionContinue_Good(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, "/v1/sessions/ses_abc123/continue", r.URL.Path) diff --git a/pkg/agentic/sync.go b/pkg/agentic/sync.go index 19f760f..628694a 100644 --- a/pkg/agentic/sync.go +++ b/pkg/agentic/sync.go @@ -34,6 +34,11 @@ type syncQueuedPush struct { QueuedAt time.Time `json:"queued_at"` } +type syncStatusState struct { + LastPushAt time.Time `json:"last_push_at,omitempty"` + LastPullAt time.Time `json:"last_pull_at,omitempty"` +} + // result := c.Action("agentic.sync.push").Run(ctx, core.NewOptions()) func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options) core.Result { output, err := s.syncPush(ctx, options.String("agent_id")) @@ -84,6 +89,7 @@ func (s *PrepSubsystem) syncPush(ctx context.Context, agentID string) (SyncPushO return SyncPushOutput{Success: true, Count: synced}, nil } synced += len(queued.Dispatches) + recordSyncPush(time.Now()) } writeSyncQueue(nil) @@ -107,20 +113,21 @@ func (s *PrepSubsystem) syncPull(ctx context.Context, agentID string) (SyncPullO return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil } - var response struct { - Data []map[string]any `json:"data"` - } + var response map[string]any parseResult := core.JSONUnmarshalString(result.Value.(string), &response) if !parseResult.OK { cached := readSyncContext() return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil } - writeSyncContext(response.Data) + + contextData := syncContextPayload(response) + writeSyncContext(contextData) + recordSyncPull(time.Now()) return SyncPullOutput{ Success: true, - Count: len(response.Data), - Context: response.Data, + Count: len(contextData), + Context: contextData, }, nil } @@ -159,18 +166,7 @@ func collectSyncDispatches() []map[string]any { if !shouldSyncStatus(workspaceStatus.Status) { continue } - dispatches = append(dispatches, map[string]any{ - "workspace": WorkspaceName(workspaceDir), - "repo": workspaceStatus.Repo, - "org": workspaceStatus.Org, - "task": workspaceStatus.Task, - "agent": workspaceStatus.Agent, - "branch": workspaceStatus.Branch, - "status": workspaceStatus.Status, - "pr_url": workspaceStatus.PRURL, - "started_at": workspaceStatus.StartedAt, - "updated_at": workspaceStatus.UpdatedAt, - }) + dispatches = append(dispatches, syncDispatchRecord(workspaceDir, workspaceStatus)) } return dispatches } @@ -213,6 +209,10 @@ func syncContextPath() string { return core.JoinPath(syncStateDir(), "context.json") } +func syncStatusPath() string { + return core.JoinPath(syncStateDir(), "status.json") +} + func readSyncQueue() []syncQueuedPush { var queued []syncQueuedPush result := fs.Read(syncQueuePath()) @@ -252,3 +252,89 @@ func writeSyncContext(contextData []map[string]any) { fs.EnsureDir(syncStateDir()) fs.WriteAtomic(syncContextPath(), core.JSONMarshalString(contextData)) } + +func syncContextPayload(payload map[string]any) []map[string]any { + if contextData := payloadDataSlice(payload, "context", "items", "memories"); len(contextData) > 0 { + return contextData + } + return nil +} + +func syncDispatchRecord(workspaceDir string, workspaceStatus *WorkspaceStatus) map[string]any { + record := map[string]any{ + "workspace": WorkspaceName(workspaceDir), + "repo": workspaceStatus.Repo, + "org": workspaceStatus.Org, + "task": workspaceStatus.Task, + "agent": workspaceStatus.Agent, + "branch": workspaceStatus.Branch, + "status": workspaceStatus.Status, + "question": workspaceStatus.Question, + "issue": workspaceStatus.Issue, + "runs": workspaceStatus.Runs, + "process_id": workspaceStatus.ProcessID, + "pr_url": workspaceStatus.PRURL, + "started_at": workspaceStatus.StartedAt, + "updated_at": workspaceStatus.UpdatedAt, + } + + if report := readSyncWorkspaceReport(workspaceDir); len(report) > 0 { + record["report"] = report + if findings := anyMapSliceValue(report["findings"]); len(findings) > 0 { + record["findings"] = findings + } + if changes := anyMapValue(report["changes"]); len(changes) > 0 { + record["changes"] = changes + } + } + + return record +} + +func readSyncWorkspaceReport(workspaceDir string) map[string]any { + reportPath := core.JoinPath(WorkspaceMetaDir(workspaceDir), "report.json") + result := fs.Read(reportPath) + if !result.OK { + return nil + } + + var report map[string]any + parseResult := core.JSONUnmarshalString(result.Value.(string), &report) + if !parseResult.OK { + return nil + } + + return report +} + +func readSyncStatusState() syncStatusState { + var state syncStatusState + result := fs.Read(syncStatusPath()) + if !result.OK { + return state + } + + parseResult := core.JSONUnmarshalString(result.Value.(string), &state) + if !parseResult.OK { + return syncStatusState{} + } + + return state +} + +func writeSyncStatusState(state syncStatusState) { + fs.EnsureDir(syncStateDir()) + fs.WriteAtomic(syncStatusPath(), core.JSONMarshalString(state)) +} + +func recordSyncPush(at time.Time) { + state := readSyncStatusState() + state.LastPushAt = at + writeSyncStatusState(state) +} + +func recordSyncPull(at time.Time) { + state := readSyncStatusState() + state.LastPullAt = at + writeSyncStatusState(state) +} diff --git a/pkg/agentic/sync_test.go b/pkg/agentic/sync_test.go index 190edb0..7700ace 100644 --- a/pkg/agentic/sync_test.go +++ b/pkg/agentic/sync_test.go @@ -59,6 +59,7 @@ func TestSync_HandleSyncPush_Good(t *testing.T) { require.NoError(t, err) assert.True(t, output.Success) assert.Equal(t, 1, output.Count) + assert.False(t, readSyncStatusState().LastPushAt.IsZero()) } func TestSync_HandleSyncPush_Bad(t *testing.T) { @@ -155,6 +156,88 @@ func TestSync_HandleSyncPull_Good(t *testing.T) { cached := readSyncContext() require.Len(t, cached, 1) assert.Equal(t, "mem-1", cached[0]["id"]) + assert.False(t, readSyncStatusState().LastPullAt.IsZero()) +} + +func TestSync_HandleSyncPush_Good_ReportMetadata(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + t.Setenv("CORE_AGENT_API_KEY", "secret-token") + + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5") + fs.EnsureDir(WorkspaceMetaDir(workspaceDir)) + require.True(t, fs.Write(core.JoinPath(WorkspaceMetaDir(workspaceDir), "report.json"), `{"findings":[{"file":"main.go"}],"changes":{"files_changed":1}}`).OK) + writeStatusResult(workspaceDir, &WorkspaceStatus{ + Status: "blocked", + Agent: "codex", + Repo: "go-io", + Org: "core", + Task: "Fix tests", + Branch: "agent/fix-tests", + Issue: 42, + Question: "Which API version?", + ProcessID: "proc-1", + Runs: 2, + StartedAt: time.Now(), + UpdatedAt: time.Now(), + }) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + bodyResult := core.ReadAll(r.Body) + require.True(t, bodyResult.OK) + + var payload map[string]any + parseResult := core.JSONUnmarshalString(bodyResult.Value.(string), &payload) + require.True(t, parseResult.OK) + + dispatches, ok := payload["dispatches"].([]any) + require.True(t, ok) + require.Len(t, dispatches, 1) + + record, ok := dispatches[0].(map[string]any) + require.True(t, ok) + require.Equal(t, "Which API version?", record["question"]) + require.Equal(t, float64(42), record["issue"]) + require.Equal(t, float64(2), record["runs"]) + require.Equal(t, "proc-1", record["process_id"]) + require.NotNil(t, record["report"]) + require.NotNil(t, record["findings"]) + require.NotNil(t, record["changes"]) + + _, _ = w.Write([]byte(`{"data":{"synced":1}}`)) + })) + defer server.Close() + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + brainURL: server.URL, + } + output, err := subsystem.syncPush(context.Background(), "") + require.NoError(t, err) + assert.True(t, output.Success) + assert.Equal(t, 1, output.Count) +} + +func TestSync_HandleSyncPull_Good_NestedEnvelope(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + t.Setenv("CORE_AGENT_API_KEY", "secret-token") + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"data":{"context":[{"id":"ctx-1","content":"Known pattern"}]}}`)) + })) + defer server.Close() + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + brainURL: server.URL, + } + output, err := subsystem.syncPull(context.Background(), "codex") + require.NoError(t, err) + assert.True(t, output.Success) + assert.Equal(t, 1, output.Count) + require.Len(t, output.Context, 1) + assert.Equal(t, "ctx-1", output.Context[0]["id"]) } func TestSync_HandleSyncPull_Bad(t *testing.T) {