From f32edaa17ef3cc3896a4545f6023b8a27132c615 Mon Sep 17 00:00:00 2001 From: Virgil Date: Mon, 30 Mar 2026 20:45:23 +0000 Subject: [PATCH] fix(ax): streamline agentic and monitor helpers Co-Authored-By: Virgil --- pkg/agentic/ingest.go | 54 ++++----- pkg/agentic/ingest_test.go | 10 +- pkg/agentic/scan.go | 44 +++----- pkg/monitor/monitor.go | 218 +++++++++++++------------------------ pkg/monitor/sync.go | 39 ++----- 5 files changed, 129 insertions(+), 236 deletions(-) diff --git a/pkg/agentic/ingest.go b/pkg/agentic/ingest.go index 2c12dba..9cccd73 100644 --- a/pkg/agentic/ingest.go +++ b/pkg/agentic/ingest.go @@ -8,65 +8,55 @@ import ( core "dappco.re/go/core" ) -// ingestFindings reads the agent output log and creates issues via the API -// for scan/audit results. Only runs for conventions and security templates. func (s *PrepSubsystem) ingestFindings(wsDir string) { - result := ReadStatusResult(wsDir) - st, ok := workspaceStatusValue(result) - if !ok || st.Status != "completed" { + statusResult := ReadStatusResult(wsDir) + workspaceStatus, ok := workspaceStatusValue(statusResult) + if !ok || workspaceStatus.Status != "completed" { return } - // Read the log file logFiles := workspaceLogFiles(wsDir) if len(logFiles) == 0 { return } - r := fs.Read(logFiles[0]) - if !r.OK || len(r.Value.(string)) < 100 { + logResult := fs.Read(logFiles[0]) + if !logResult.OK || len(logResult.Value.(string)) < 100 { return } - body := r.Value.(string) + logBody := logResult.Value.(string) - // Skip quota errors - if core.Contains(body, "QUOTA_EXHAUSTED") || core.Contains(body, "QuotaError") { + if core.Contains(logBody, "QUOTA_EXHAUSTED") || core.Contains(logBody, "QuotaError") { return } - // Only ingest if there are actual findings (file:line references) - findings := countFileRefs(body) + findings := countFileRefs(logBody) if findings < 2 { - return // No meaningful findings + return } - // Determine issue type from the template used issueType := "task" priority := "normal" - if core.Contains(body, "security") || core.Contains(body, "Security") { + if core.Contains(logBody, "security") || core.Contains(logBody, "Security") { issueType = "bug" priority = "high" } - // Create a single issue per repo with all findings in the body - title := core.Sprintf("Scan findings for %s (%d items)", st.Repo, findings) + title := core.Sprintf("Scan findings for %s (%d items)", workspaceStatus.Repo, findings) - // Truncate body to reasonable size for issue description - description := body - if len(description) > 10000 { - description = core.Concat(description[:10000], "\n\n... (truncated, see full log in workspace)") + issueDescription := logBody + if len(issueDescription) > 10000 { + issueDescription = core.Concat(issueDescription[:10000], "\n\n... (truncated, see full log in workspace)") } - s.createIssueViaAPI(st.Repo, title, description, issueType, priority, "scan") + s.createIssueViaAPI(title, issueDescription, issueType, priority) } -// countFileRefs counts file:line references in the output (indicates real findings) func countFileRefs(body string) int { count := 0 for i := 0; i < len(body)-5; i++ { if body[i] == '`' { - // Look for pattern: `file.go:123` j := i + 1 for j < len(body) && body[j] != '`' && j-i < 100 { j++ @@ -82,20 +72,18 @@ func countFileRefs(body string) int { return count } -// createIssueViaAPI posts an issue to the lthn.sh API -func (s *PrepSubsystem) createIssueViaAPI(repo, title, description, issueType, priority, source string) { +func (s *PrepSubsystem) createIssueViaAPI(title, description, issueType, priority string) { if s.brainKey == "" { return } - // Read the agent API key from file - r := fs.Read(core.JoinPath(HomeDir(), ".claude", "agent-api.key")) - if !r.OK { + apiKeyResult := fs.Read(core.JoinPath(HomeDir(), ".claude", "agent-api.key")) + if !apiKeyResult.OK { return } - apiKey := core.Trim(r.Value.(string)) + apiKey := core.Trim(apiKeyResult.Value.(string)) - payload := core.JSONMarshalString(map[string]string{ + issuePayload := core.JSONMarshalString(map[string]string{ "title": title, "description": description, "type": issueType, @@ -103,5 +91,5 @@ func (s *PrepSubsystem) createIssueViaAPI(repo, title, description, issueType, p "reporter": "cladius", }) - HTTPPost(context.Background(), core.Concat(s.brainURL, "/v1/issues"), payload, apiKey, "Bearer") + HTTPPost(context.Background(), core.Concat(s.brainURL, "/v1/issues"), issuePayload, apiKey, "Bearer") } diff --git a/pkg/agentic/ingest_test.go b/pkg/agentic/ingest_test.go index 592b11a..765ea0a 100644 --- a/pkg/agentic/ingest_test.go +++ b/pkg/agentic/ingest_test.go @@ -220,7 +220,7 @@ func TestIngest_CreateIssueViaAPI_Good_Success(t *testing.T) { failCount: make(map[string]int), } - s.createIssueViaAPI("go-io", "Test Issue", "Description", "bug", "high", "scan") + s.createIssueViaAPI("Test Issue", "Description", "bug", "high") assert.True(t, called) } @@ -234,7 +234,7 @@ func TestIngest_CreateIssueViaAPI_Bad_NoBrainKey(t *testing.T) { // Should return early without panic assert.NotPanics(t, func() { - s.createIssueViaAPI("go-io", "Title", "Body", "task", "normal", "scan") + s.createIssueViaAPI("Title", "Body", "task", "normal") }) } @@ -253,7 +253,7 @@ func TestIngest_CreateIssueViaAPI_Bad_NoAPIKey(t *testing.T) { // Should return early — no API key file assert.NotPanics(t, func() { - s.createIssueViaAPI("go-io", "Title", "Body", "task", "normal", "scan") + s.createIssueViaAPI("Title", "Body", "task", "normal") }) } @@ -278,7 +278,7 @@ func TestIngest_CreateIssueViaAPI_Bad_ServerError(t *testing.T) { // Should not panic even on server error assert.NotPanics(t, func() { - s.createIssueViaAPI("go-io", "Title", "Body", "task", "normal", "scan") + s.createIssueViaAPI("Title", "Body", "task", "normal") }) } @@ -344,7 +344,7 @@ func TestIngest_CreateIssueViaAPI_Ugly(t *testing.T) { failCount: make(map[string]int), } - s.createIssueViaAPI("go-io", "XSS Test", "bold&", "bug", "high", "scan") + s.createIssueViaAPI("XSS Test", "bold&", "bug", "high") assert.True(t, called) } diff --git a/pkg/agentic/scan.go b/pkg/agentic/scan.go index 93d252f..7475ab9 100644 --- a/pkg/agentic/scan.go +++ b/pkg/agentic/scan.go @@ -10,27 +10,21 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) -// ScanInput is the input for agentic_scan. -// -// input := agentic.ScanInput{Org: "core", Labels: []string{"agentic", "bug"}, Limit: 20} +// input := agentic.ScanInput{Org: "core", Labels: []string{"agentic", "bug"}, Limit: 20} type ScanInput struct { - Org string `json:"org,omitempty"` // default "core" - Labels []string `json:"labels,omitempty"` // filter by labels (default: agentic, help-wanted, bug) - Limit int `json:"limit,omitempty"` // max issues to return + Org string `json:"org,omitempty"` + Labels []string `json:"labels,omitempty"` + Limit int `json:"limit,omitempty"` } -// ScanOutput is the output for agentic_scan. -// -// out := agentic.ScanOutput{Success: true, Count: 1, Issues: []agentic.ScanIssue{{Repo: "go-io", Number: 12}}} +// out := agentic.ScanOutput{Success: true, Count: 1, Issues: []agentic.ScanIssue{{Repo: "go-io", Number: 12}}} type ScanOutput struct { Success bool `json:"success"` Count int `json:"count"` Issues []ScanIssue `json:"issues"` } -// ScanIssue is a single actionable issue. -// -// issue := agentic.ScanIssue{Repo: "go-io", Number: 12, Title: "Replace fmt.Errorf"} +// issue := agentic.ScanIssue{Repo: "go-io", Number: 12, Title: "Replace fmt.Errorf"} type ScanIssue struct { Repo string `json:"repo"` Number int `json:"number"` @@ -57,15 +51,14 @@ func (s *PrepSubsystem) scan(ctx context.Context, _ *mcp.CallToolRequest, input var allIssues []ScanIssue - // Get repos for the org repos, err := s.listOrgRepos(ctx, input.Org) if err != nil { return nil, ScanOutput{}, err } - for _, repo := range repos { + for _, repoName := range repos { for _, label := range input.Labels { - issues, err := s.listRepoIssues(ctx, input.Org, repo, label) + issues, err := s.listRepoIssues(ctx, input.Org, repoName, label) if err != nil { continue } @@ -80,7 +73,6 @@ func (s *PrepSubsystem) scan(ctx context.Context, _ *mcp.CallToolRequest, input } } - // Deduplicate by repo+number seen := make(map[string]bool) var unique []ScanIssue for _, issue := range allIssues { @@ -109,20 +101,20 @@ func (s *PrepSubsystem) listOrgRepos(ctx context.Context, org string) ([]string, } var allNames []string - for _, r := range repos { - allNames = append(allNames, r.Name) + for _, repoInfo := range repos { + allNames = append(allNames, repoInfo.Name) } return allNames, nil } func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label string) ([]ScanIssue, error) { - u := core.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=10&type=issues", + requestURL := core.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=10&type=issues", s.forgeURL, org, repo) if label != "" { - u = core.Concat(u, "&labels=", url.QueryEscape(label)) + requestURL = core.Concat(requestURL, "&labels=", url.QueryEscape(label)) } - r := HTTPGet(ctx, u, s.forgeToken, "token") - if !r.OK { + httpResult := HTTPGet(ctx, requestURL, s.forgeToken, "token") + if !httpResult.OK { return nil, core.E("scan.listRepoIssues", core.Concat("failed to list issues for ", repo), nil) } @@ -137,16 +129,16 @@ func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label str } `json:"assignee"` HTMLURL string `json:"html_url"` } - if ur := core.JSONUnmarshalString(r.Value.(string), &issues); !ur.OK { - err, _ := ur.Value.(error) + if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &issues); !parseResult.OK { + err, _ := parseResult.Value.(error) return nil, core.E("scan.listRepoIssues", "parse issues response", err) } var result []ScanIssue for _, issue := range issues { var labels []string - for _, l := range issue.Labels { - labels = append(labels, l.Name) + for _, labelInfo := range issue.Labels { + labels = append(labels, labelInfo.Name) } assignee := "" if issue.Assignee != nil { diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index c489eda..1a00844 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -1,7 +1,6 @@ // SPDX-License-Identifier: EUPL-1.2 -// Package monitor polls workspace state, repo drift, and agent inboxes, then -// pushes the current view to connected MCP clients. +// Package monitor keeps workspace state and inbox status visible to MCP clients. // // mon := monitor.New(monitor.Options{Interval: 30 * time.Second}) // mon.RegisterTools(server) @@ -20,10 +19,8 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) -// fs provides unrestricted filesystem access (root "/" = no sandbox). -// -// r := fs.Read(core.JoinPath(wsRoot, name, "status.json")) -// if text, ok := resultString(r); ok { _ = core.JSONUnmarshalString(text, &st) } +// r := fs.Read(core.JoinPath(wsRoot, name, "status.json")) +// if text, ok := resultString(r); ok { _ = core.JSONUnmarshalString(text, &st) } var fs = agentic.LocalFs() type channelSender interface { @@ -61,10 +58,8 @@ func resultString(r core.Result) (string, bool) { return value, true } -// Subsystem owns the long-running monitor loop and MCP resource surface. -// -// mon := monitor.New() -// mon.Start(context.Background()) +// mon := monitor.New() +// mon.Start(context.Background()) type Subsystem struct { *core.ServiceRuntime[Options] server *mcp.Server @@ -72,23 +67,19 @@ type Subsystem struct { cancel context.CancelFunc wg sync.WaitGroup - // Track last seen state to only notify on changes - lastCompletedCount int // completed workspaces seen on the last scan - seenCompleted map[string]bool // workspace names we've already notified about - seenRunning map[string]bool // workspace names we've already sent start notification for - completionsSeeded bool // true after first completions check - lastInboxMaxID int // highest message ID seen - inboxSeeded bool // true after first inbox check - lastSyncTimestamp int64 - mu sync.Mutex + seenCompleted map[string]bool + seenRunning map[string]bool + completionsSeeded bool + lastInboxMaxID int + inboxSeeded bool + lastSyncTimestamp int64 + mu sync.Mutex - // Event-driven poke channel — dispatch goroutine sends here on completion poke chan struct{} } var _ coremcp.Subsystem = (*Subsystem)(nil) -// SetCore preserves direct test setup without going through core.WithService. // Deprecated: prefer Register with core.WithService(monitor.Register). // // mon.SetCore(c) @@ -96,14 +87,12 @@ func (m *Subsystem) SetCore(c *core.Core) { m.ServiceRuntime = core.NewServiceRuntime(c, Options{}) } -// handleAgentStarted tracks started agents. func (m *Subsystem) handleAgentStarted(ev messages.AgentStarted) { m.mu.Lock() m.seenRunning[ev.Workspace] = true m.mu.Unlock() } -// handleAgentCompleted processes agent completion — emits notifications and checks queue drain. func (m *Subsystem) handleAgentCompleted(ev messages.AgentCompleted) { m.mu.Lock() m.seenCompleted[ev.Workspace] = true @@ -113,10 +102,8 @@ func (m *Subsystem) handleAgentCompleted(ev messages.AgentCompleted) { go m.checkIdleAfterDelay() } -// HandleIPCEvents lets Core auto-wire monitor side-effects for IPC messages. -// -// c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"}) -// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"}) +// c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"}) +// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"}) func (m *Subsystem) HandleIPCEvents(_ *core.Core, msg core.Message) core.Result { switch ev := msg.(type) { case messages.AgentCompleted: @@ -127,24 +114,18 @@ func (m *Subsystem) HandleIPCEvents(_ *core.Core, msg core.Message) core.Result return core.Result{OK: true} } -// Options configures the monitor polling interval. -// -// opts := monitor.Options{Interval: 30 * time.Second} -// mon := monitor.New(opts) +// opts := monitor.Options{Interval: 30 * time.Second} +// mon := monitor.New(opts) type Options struct { - // Interval between checks (default: 2 minutes) Interval time.Duration } -// New builds the monitor with a polling interval and poke channel. -// -// mon := monitor.New(monitor.Options{Interval: 30 * time.Second}) +// mon := monitor.New(monitor.Options{Interval: 30 * time.Second}) func New(opts ...Options) *Subsystem { interval := 2 * time.Minute if len(opts) > 0 && opts[0].Interval > 0 { interval = opts[0].Interval } - // Override via env for debugging if envInterval := core.Env("MONITOR_INTERVAL"); envInterval != "" { if d, err := time.ParseDuration(envInterval); err == nil { interval = d @@ -158,23 +139,17 @@ func New(opts ...Options) *Subsystem { } } -// debugChannel logs a debug message. -func (m *Subsystem) debugChannel(msg string) { +func (m *Subsystem) debug(msg string) { core.Debug(msg) } -// Name keeps the monitor address stable for MCP and core.WithService lookups. -// -// name := mon.Name() // "monitor" +// name := mon.Name() // "monitor" func (m *Subsystem) Name() string { return "monitor" } -// RegisterTools publishes the monitor status resource on an MCP server. -// -// mon.RegisterTools(server) +// mon.RegisterTools(server) func (m *Subsystem) RegisterTools(server *mcp.Server) { m.server = server - // Register a resource that clients can read for current status server.AddResource(&mcp.Resource{ Name: "Agent Status", URI: "status://agents", @@ -183,11 +158,9 @@ func (m *Subsystem) RegisterTools(server *mcp.Server) { }, m.agentStatusResource) } -// Start launches the background polling loop. -// -// mon.Start(ctx) +// mon.Start(ctx) func (m *Subsystem) Start(ctx context.Context) { - monCtx, cancel := context.WithCancel(ctx) + monitorCtx, cancel := context.WithCancel(ctx) m.cancel = cancel core.Info("monitor: started (interval=%s)", m.interval) @@ -195,31 +168,25 @@ func (m *Subsystem) Start(ctx context.Context) { m.wg.Add(1) go func() { defer m.wg.Done() - m.loop(monCtx) + m.loop(monitorCtx) }() } -// OnStartup starts the monitor when Core starts the service lifecycle. -// -// r := mon.OnStartup(context.Background()) -// core.Println(r.OK) +// r := mon.OnStartup(context.Background()) +// core.Println(r.OK) func (m *Subsystem) OnStartup(ctx context.Context) core.Result { m.Start(ctx) return core.Result{OK: true} } -// OnShutdown stops the monitor through the Core lifecycle hook. -// -// r := mon.OnShutdown(context.Background()) -// core.Println(r.OK) +// r := mon.OnShutdown(context.Background()) +// core.Println(r.OK) func (m *Subsystem) OnShutdown(ctx context.Context) core.Result { _ = m.Shutdown(ctx) return core.Result{OK: true} } -// Shutdown cancels the monitor loop and waits for the goroutine to exit. -// -// _ = mon.Shutdown(ctx) +// _ = mon.Shutdown(ctx) func (m *Subsystem) Shutdown(_ context.Context) error { if m.cancel != nil { m.cancel() @@ -228,9 +195,7 @@ func (m *Subsystem) Shutdown(_ context.Context) error { return nil } -// Poke asks the loop to run a check immediately instead of waiting for the ticker. -// -// mon.Poke() +// mon.Poke() func (m *Subsystem) Poke() { select { case m.poke <- struct{}{}: @@ -238,11 +203,8 @@ func (m *Subsystem) Poke() { } } -// checkIdleAfterDelay waits briefly then checks if the fleet is genuinely idle. -// Only emits queue.drained when there are truly zero running or queued agents, -// verified by checking managed processes are alive, not just trusting status files. func (m *Subsystem) checkIdleAfterDelay() { - time.Sleep(5 * time.Second) // wait for queue drain to fill slots + time.Sleep(5 * time.Second) if m.ServiceRuntime == nil { return } @@ -253,8 +215,6 @@ func (m *Subsystem) checkIdleAfterDelay() { } } -// countLiveWorkspaces counts workspaces that are genuinely active. -// For "running" status, verifies the managed process is still alive. func (m *Subsystem) countLiveWorkspaces() (running, queued int) { var runtime *core.Core if m.ServiceRuntime != nil { @@ -262,17 +222,17 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) { } for _, path := range agentic.WorkspaceStatusPaths() { wsDir := core.PathDir(path) - r := agentic.ReadStatusResult(wsDir) - if !r.OK { + statusResult := agentic.ReadStatusResult(wsDir) + if !statusResult.OK { continue } - st, ok := r.Value.(*agentic.WorkspaceStatus) - if !ok || st == nil { + workspaceStatus, ok := statusResult.Value.(*agentic.WorkspaceStatus) + if !ok || workspaceStatus == nil { continue } - switch st.Status { + switch workspaceStatus.Status { case "running": - if st.PID > 0 && processAlive(runtime, st.ProcessID, st.PID) { + if workspaceStatus.PID > 0 && processAlive(runtime, workspaceStatus.ProcessID, workspaceStatus.PID) { running++ } case "queued": @@ -282,23 +242,19 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) { return } -// processAlive checks whether a managed process is still running. func processAlive(c *core.Core, processID string, pid int) bool { return agentic.ProcessAlive(c, processID, pid) } func (m *Subsystem) loop(ctx context.Context) { - // Initial check after short delay (let server fully start) select { case <-ctx.Done(): return case <-time.After(5 * time.Second): } - // Initialise sync timestamp to now (don't pull everything on first run) m.initSyncTimestamp() - // Run first check immediately m.check(ctx) ticker := time.NewTicker(m.interval) @@ -319,27 +275,22 @@ func (m *Subsystem) loop(ctx context.Context) { func (m *Subsystem) check(ctx context.Context) { var messages []string - // Check agent completions if msg := m.checkCompletions(); msg != "" { messages = append(messages, msg) } - // Harvest completed workspaces — push branches, check for binaries if msg := m.harvestCompleted(); msg != "" { messages = append(messages, msg) } - // Check inbox if msg := m.checkInbox(); msg != "" { messages = append(messages, msg) } - // Sync repos from other agents' changes if msg := m.syncRepos(); msg != "" { messages = append(messages, msg) } - // Only notify if there's something new if len(messages) == 0 { return } @@ -347,7 +298,6 @@ func (m *Subsystem) check(ctx context.Context) { combined := core.Join("\n", messages...) m.notify(ctx, combined) - // Notify resource subscribers that agent status changed if m.server != nil { m.server.ResourceUpdated(ctx, &mcp.ResourceUpdatedNotificationParams{ URI: "status://agents", @@ -355,9 +305,6 @@ func (m *Subsystem) check(ctx context.Context) { } } -// checkCompletions scans workspace for newly completed agents. -// Tracks by workspace name (not count) so harvest status rewrites -// don't suppress future notifications. func (m *Subsystem) checkCompletions() string { entries := agentic.WorkspaceStatusPaths() @@ -369,39 +316,38 @@ func (m *Subsystem) checkCompletions() string { m.mu.Lock() seeded := m.completionsSeeded for _, entry := range entries { - r := fs.Read(entry) - if !r.OK { + entryResult := fs.Read(entry) + if !entryResult.OK { continue } - entryData, ok := resultString(r) + entryData, ok := resultString(entryResult) if !ok { continue } - var st struct { + var workspaceStatus struct { Status string `json:"status"` Repo string `json:"repo"` Agent string `json:"agent"` } - if r := core.JSONUnmarshalString(entryData, &st); !r.OK { + if parseResult := core.JSONUnmarshalString(entryData, &workspaceStatus); !parseResult.OK { continue } wsName := agentic.WorkspaceName(core.PathDir(entry)) - switch st.Status { + switch workspaceStatus.Status { case "completed": completed++ if !m.seenCompleted[wsName] { m.seenCompleted[wsName] = true if seeded { - newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s)", st.Repo, st.Agent)) + newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s)", workspaceStatus.Repo, workspaceStatus.Agent)) } } case "running": running++ if !m.seenRunning[wsName] && seeded { m.seenRunning[wsName] = true - // No individual start notification — too noisy } case "queued": queued++ @@ -409,12 +355,11 @@ func (m *Subsystem) checkCompletions() string { if !m.seenCompleted[wsName] { m.seenCompleted[wsName] = true if seeded { - newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s) [%s]", st.Repo, st.Agent, st.Status)) + newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s) [%s]", workspaceStatus.Repo, workspaceStatus.Agent, workspaceStatus.Status)) } } } } - m.lastCompletedCount = completed m.completionsSeeded = true m.mu.Unlock() @@ -422,7 +367,6 @@ func (m *Subsystem) checkCompletions() string { return "" } - // Only emit queue.drained when genuinely empty — verified by live process checks liveRunning, liveQueued := m.countLiveWorkspaces() if m.ServiceRuntime != nil && liveRunning == 0 && liveQueued == 0 { m.Core().ACTION(messages.QueueDrained{Completed: len(newlyCompleted)}) @@ -438,18 +382,16 @@ func (m *Subsystem) checkCompletions() string { return msg } -// checkInbox checks for unread messages. func (m *Subsystem) checkInbox() string { apiKeyStr := monitorBrainKey() if apiKeyStr == "" { return "" } - // Call the API to check inbox apiURL := monitorAPIURL() inboxURL := core.Concat(apiURL, "/v1/messages/inbox?agent=", url.QueryEscape(agentic.AgentName())) - hr := agentic.HTTPGet(context.Background(), inboxURL, core.Trim(apiKeyStr), "Bearer") - if !hr.OK { + httpResult := agentic.HTTPGet(context.Background(), inboxURL, core.Trim(apiKeyStr), "Bearer") + if !httpResult.OK { return "" } @@ -462,12 +404,11 @@ func (m *Subsystem) checkInbox() string { Content string `json:"content"` } `json:"data"` } - if r := core.JSONUnmarshalString(hr.Value.(string), &resp); !r.OK { - m.debugChannel("checkInbox: failed to decode response") + if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &resp); !parseResult.OK { + m.debug("checkInbox: failed to decode response") return "" } - // Find max ID, count unread, collect new messages maxID := 0 unread := 0 @@ -484,20 +425,19 @@ func (m *Subsystem) checkInbox() string { } var newMessages []newMessage - for _, msg := range resp.Data { - if msg.ID > maxID { - maxID = msg.ID + for _, message := range resp.Data { + if message.ID > maxID { + maxID = message.ID } - if !msg.Read { + if !message.Read { unread++ } - // Collect messages newer than what we've seen - if msg.ID > prevMaxID { + if message.ID > prevMaxID { newMessages = append(newMessages, newMessage{ - ID: msg.ID, - From: msg.From, - Subject: msg.Subject, - Content: msg.Content, + ID: message.ID, + From: message.From, + Subject: message.Subject, + Content: message.Content, }) } } @@ -507,24 +447,21 @@ func (m *Subsystem) checkInbox() string { m.inboxSeeded = true m.mu.Unlock() - // First check after startup: seed, don't fire if !seeded { return "" } - // Only fire if there are new messages (higher ID than last seen) if maxID <= prevMaxID || len(newMessages) == 0 { return "" } - // Push each message as a channel event so it lands in the session if m.ServiceRuntime != nil { if notifier, ok := core.ServiceFor[channelSender](m.Core(), "mcp"); ok { - for _, msg := range newMessages { + for _, message := range newMessages { notifier.ChannelSend(context.Background(), "inbox.message", map[string]any{ - "from": msg.From, - "subject": msg.Subject, - "content": msg.Content, + "from": message.From, + "subject": message.Subject, + "content": message.Content, }) } } @@ -533,15 +470,13 @@ func (m *Subsystem) checkInbox() string { return core.Sprintf("%d unread message(s) in inbox", unread) } -// notify sends a log notification to all connected MCP sessions. func (m *Subsystem) notify(ctx context.Context, message string) { if m.server == nil { return } - // Use the server's session list to broadcast - for ss := range m.server.Sessions() { - ss.Log(ctx, &mcp.LoggingMessageParams{ + for session := range m.server.Sessions() { + session.Log(ctx, &mcp.LoggingMessageParams{ Level: "info", Logger: "monitor", Data: message, @@ -549,11 +484,10 @@ func (m *Subsystem) notify(ctx context.Context, message string) { } } -// agentStatusResource returns current workspace status as a JSON resource. -func (m *Subsystem) agentStatusResource(ctx context.Context, req *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) { +func (m *Subsystem) agentStatusResource(_ context.Context, _ *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) { entries := agentic.WorkspaceStatusPaths() - type wsInfo struct { + type workspaceInfo struct { Name string `json:"name"` Status string `json:"status"` Repo string `json:"repo"` @@ -561,31 +495,31 @@ func (m *Subsystem) agentStatusResource(ctx context.Context, req *mcp.ReadResour PRURL string `json:"pr_url,omitempty"` } - var workspaces []wsInfo + var workspaces []workspaceInfo for _, entry := range entries { - r := fs.Read(entry) - if !r.OK { + entryResult := fs.Read(entry) + if !entryResult.OK { continue } - entryData, ok := resultString(r) + entryData, ok := resultString(entryResult) if !ok { continue } - var st struct { + var workspaceStatus struct { Status string `json:"status"` Repo string `json:"repo"` Agent string `json:"agent"` PRURL string `json:"pr_url"` } - if r := core.JSONUnmarshalString(entryData, &st); !r.OK { + if parseResult := core.JSONUnmarshalString(entryData, &workspaceStatus); !parseResult.OK { continue } - workspaces = append(workspaces, wsInfo{ + workspaces = append(workspaces, workspaceInfo{ Name: agentic.WorkspaceName(core.PathDir(entry)), - Status: st.Status, - Repo: st.Repo, - Agent: st.Agent, - PRURL: st.PRURL, + Status: workspaceStatus.Status, + Repo: workspaceStatus.Repo, + Agent: workspaceStatus.Agent, + PRURL: workspaceStatus.PRURL, }) } diff --git a/pkg/monitor/sync.go b/pkg/monitor/sync.go index 79e69b5..3ce662c 100644 --- a/pkg/monitor/sync.go +++ b/pkg/monitor/sync.go @@ -11,51 +11,41 @@ import ( core "dappco.re/go/core" ) -// CheckinResponse is what the API returns for an agent checkin. -// -// resp := monitor.CheckinResponse{Changed: []monitor.ChangedRepo{{Repo: "core-agent", Branch: "main", SHA: "abc123"}}, Timestamp: 1712345678} +// resp := monitor.CheckinResponse{Changed: []monitor.ChangedRepo{{Repo: "core-agent", Branch: "main", SHA: "abc123"}}, Timestamp: 1712345678} type CheckinResponse struct { - // Repos that have new commits since the agent's last checkin. - Changed []ChangedRepo `json:"changed,omitempty"` - // Server timestamp — use as "since" on next checkin. - Timestamp int64 `json:"timestamp"` + Changed []ChangedRepo `json:"changed,omitempty"` + Timestamp int64 `json:"timestamp"` } -// ChangedRepo is a repo that has new commits. -// -// repo := monitor.ChangedRepo{Repo: "core-agent", Branch: "main", SHA: "abc123"} +// repo := monitor.ChangedRepo{Repo: "core-agent", Branch: "main", SHA: "abc123"} type ChangedRepo struct { Repo string `json:"repo"` Branch string `json:"branch"` SHA string `json:"sha"` } -// syncRepos calls the checkin API and pulls any repos that changed. -// Returns a human-readable message if repos were updated, empty string otherwise. func (m *Subsystem) syncRepos() string { agentName := agentic.AgentName() checkinURL := core.Sprintf("%s/v1/agent/checkin?agent=%s&since=%d", monitorAPIURL(), url.QueryEscape(agentName), m.lastSyncTimestamp) brainKey := monitorBrainKey() - hr := agentic.HTTPGet(context.Background(), checkinURL, brainKey, "Bearer") - if !hr.OK { + httpResult := agentic.HTTPGet(context.Background(), checkinURL, brainKey, "Bearer") + if !httpResult.OK { return "" } var checkin CheckinResponse - if r := core.JSONUnmarshalString(hr.Value.(string), &checkin); !r.OK { + if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &checkin); !parseResult.OK { return "" } if len(checkin.Changed) == 0 { - // No changes — safe to advance timestamp m.mu.Lock() m.lastSyncTimestamp = checkin.Timestamp m.mu.Unlock() return "" } - // Pull changed repos basePath := core.Env("CODE_PATH") if basePath == "" { basePath = core.JoinPath(agentic.HomeDir(), "Code", "core") @@ -63,7 +53,6 @@ func (m *Subsystem) syncRepos() string { var pulled []string for _, repo := range checkin.Changed { - // Sanitise repo name to prevent path traversal from API response repoName := core.PathBase(core.Replace(repo.Repo, "\\", "/")) if repoName == "." || repoName == ".." || repoName == "" { continue @@ -73,38 +62,30 @@ func (m *Subsystem) syncRepos() string { continue } - // Check if on the default branch and clean current := m.gitOutput(repoDir, "rev-parse", "--abbrev-ref", "HEAD") if current == "" { continue } - // Determine which branch to pull — use server-reported branch, - // fall back to current if server didn't specify targetBranch := repo.Branch if targetBranch == "" { targetBranch = current } - // Only pull if we're on the target branch (or it's a default branch) if current != targetBranch { - continue // On a different branch — skip + continue } status := m.gitOutput(repoDir, "status", "--porcelain") if len(status) > 0 { - continue // Don't pull if dirty + continue } - // Fast-forward pull the target branch if m.gitOK(repoDir, "pull", "--ff-only", "origin", targetBranch) { pulled = append(pulled, repo.Repo) } } - // Only advance timestamp if we handled all reported repos. - // If any were skipped (dirty, wrong branch, missing), keep the - // old timestamp so the server reports them again next cycle. skipped := len(checkin.Changed) - len(pulled) if skipped == 0 { m.mu.Lock() @@ -119,8 +100,6 @@ func (m *Subsystem) syncRepos() string { return core.Sprintf("Synced %d repo(s): %s", len(pulled), core.Join(", ", pulled...)) } -// lastSyncTimestamp is stored on the subsystem — add it via the check cycle. -// Initialised to "now" on first run so we don't pull everything on startup. func (m *Subsystem) initSyncTimestamp() { m.mu.Lock() if m.lastSyncTimestamp == 0 {