diff --git a/pkg/agentic/ingest.go b/pkg/agentic/ingest.go
index 2c12dba..9cccd73 100644
--- a/pkg/agentic/ingest.go
+++ b/pkg/agentic/ingest.go
@@ -8,65 +8,55 @@ import (
core "dappco.re/go/core"
)
-// ingestFindings reads the agent output log and creates issues via the API
-// for scan/audit results. Only runs for conventions and security templates.
func (s *PrepSubsystem) ingestFindings(wsDir string) {
- result := ReadStatusResult(wsDir)
- st, ok := workspaceStatusValue(result)
- if !ok || st.Status != "completed" {
+ statusResult := ReadStatusResult(wsDir)
+ workspaceStatus, ok := workspaceStatusValue(statusResult)
+ if !ok || workspaceStatus.Status != "completed" {
return
}
- // Read the log file
logFiles := workspaceLogFiles(wsDir)
if len(logFiles) == 0 {
return
}
- r := fs.Read(logFiles[0])
- if !r.OK || len(r.Value.(string)) < 100 {
+ logResult := fs.Read(logFiles[0])
+ if !logResult.OK || len(logResult.Value.(string)) < 100 {
return
}
- body := r.Value.(string)
+ logBody := logResult.Value.(string)
- // Skip quota errors
- if core.Contains(body, "QUOTA_EXHAUSTED") || core.Contains(body, "QuotaError") {
+ if core.Contains(logBody, "QUOTA_EXHAUSTED") || core.Contains(logBody, "QuotaError") {
return
}
- // Only ingest if there are actual findings (file:line references)
- findings := countFileRefs(body)
+ findings := countFileRefs(logBody)
if findings < 2 {
- return // No meaningful findings
+ return
}
- // Determine issue type from the template used
issueType := "task"
priority := "normal"
- if core.Contains(body, "security") || core.Contains(body, "Security") {
+ if core.Contains(logBody, "security") || core.Contains(logBody, "Security") {
issueType = "bug"
priority = "high"
}
- // Create a single issue per repo with all findings in the body
- title := core.Sprintf("Scan findings for %s (%d items)", st.Repo, findings)
+ title := core.Sprintf("Scan findings for %s (%d items)", workspaceStatus.Repo, findings)
- // Truncate body to reasonable size for issue description
- description := body
- if len(description) > 10000 {
- description = core.Concat(description[:10000], "\n\n... (truncated, see full log in workspace)")
+ issueDescription := logBody
+ if len(issueDescription) > 10000 {
+ issueDescription = core.Concat(issueDescription[:10000], "\n\n... (truncated, see full log in workspace)")
}
- s.createIssueViaAPI(st.Repo, title, description, issueType, priority, "scan")
+ s.createIssueViaAPI(title, issueDescription, issueType, priority)
}
-// countFileRefs counts file:line references in the output (indicates real findings)
func countFileRefs(body string) int {
count := 0
for i := 0; i < len(body)-5; i++ {
if body[i] == '`' {
- // Look for pattern: `file.go:123`
j := i + 1
for j < len(body) && body[j] != '`' && j-i < 100 {
j++
@@ -82,20 +72,18 @@ func countFileRefs(body string) int {
return count
}
-// createIssueViaAPI posts an issue to the lthn.sh API
-func (s *PrepSubsystem) createIssueViaAPI(repo, title, description, issueType, priority, source string) {
+func (s *PrepSubsystem) createIssueViaAPI(title, description, issueType, priority string) {
if s.brainKey == "" {
return
}
- // Read the agent API key from file
- r := fs.Read(core.JoinPath(HomeDir(), ".claude", "agent-api.key"))
- if !r.OK {
+ apiKeyResult := fs.Read(core.JoinPath(HomeDir(), ".claude", "agent-api.key"))
+ if !apiKeyResult.OK {
return
}
- apiKey := core.Trim(r.Value.(string))
+ apiKey := core.Trim(apiKeyResult.Value.(string))
- payload := core.JSONMarshalString(map[string]string{
+ issuePayload := core.JSONMarshalString(map[string]string{
"title": title,
"description": description,
"type": issueType,
@@ -103,5 +91,5 @@ func (s *PrepSubsystem) createIssueViaAPI(repo, title, description, issueType, p
"reporter": "cladius",
})
- HTTPPost(context.Background(), core.Concat(s.brainURL, "/v1/issues"), payload, apiKey, "Bearer")
+ HTTPPost(context.Background(), core.Concat(s.brainURL, "/v1/issues"), issuePayload, apiKey, "Bearer")
}
diff --git a/pkg/agentic/ingest_test.go b/pkg/agentic/ingest_test.go
index 592b11a..765ea0a 100644
--- a/pkg/agentic/ingest_test.go
+++ b/pkg/agentic/ingest_test.go
@@ -220,7 +220,7 @@ func TestIngest_CreateIssueViaAPI_Good_Success(t *testing.T) {
failCount: make(map[string]int),
}
- s.createIssueViaAPI("go-io", "Test Issue", "Description", "bug", "high", "scan")
+ s.createIssueViaAPI("Test Issue", "Description", "bug", "high")
assert.True(t, called)
}
@@ -234,7 +234,7 @@ func TestIngest_CreateIssueViaAPI_Bad_NoBrainKey(t *testing.T) {
// Should return early without panic
assert.NotPanics(t, func() {
- s.createIssueViaAPI("go-io", "Title", "Body", "task", "normal", "scan")
+ s.createIssueViaAPI("Title", "Body", "task", "normal")
})
}
@@ -253,7 +253,7 @@ func TestIngest_CreateIssueViaAPI_Bad_NoAPIKey(t *testing.T) {
// Should return early — no API key file
assert.NotPanics(t, func() {
- s.createIssueViaAPI("go-io", "Title", "Body", "task", "normal", "scan")
+ s.createIssueViaAPI("Title", "Body", "task", "normal")
})
}
@@ -278,7 +278,7 @@ func TestIngest_CreateIssueViaAPI_Bad_ServerError(t *testing.T) {
// Should not panic even on server error
assert.NotPanics(t, func() {
- s.createIssueViaAPI("go-io", "Title", "Body", "task", "normal", "scan")
+ s.createIssueViaAPI("Title", "Body", "task", "normal")
})
}
@@ -344,7 +344,7 @@ func TestIngest_CreateIssueViaAPI_Ugly(t *testing.T) {
failCount: make(map[string]int),
}
- s.createIssueViaAPI("go-io", "XSS Test", "bold&", "bug", "high", "scan")
+ s.createIssueViaAPI("XSS Test", "bold&", "bug", "high")
assert.True(t, called)
}
diff --git a/pkg/agentic/scan.go b/pkg/agentic/scan.go
index 93d252f..7475ab9 100644
--- a/pkg/agentic/scan.go
+++ b/pkg/agentic/scan.go
@@ -10,27 +10,21 @@ import (
"github.com/modelcontextprotocol/go-sdk/mcp"
)
-// ScanInput is the input for agentic_scan.
-//
-// input := agentic.ScanInput{Org: "core", Labels: []string{"agentic", "bug"}, Limit: 20}
+// ScanInput is the input for agentic_scan, e.g. agentic.ScanInput{Org: "core", Labels: []string{"agentic", "bug"}, Limit: 20}.
type ScanInput struct {
- Org string `json:"org,omitempty"` // default "core"
- Labels []string `json:"labels,omitempty"` // filter by labels (default: agentic, help-wanted, bug)
- Limit int `json:"limit,omitempty"` // max issues to return
+ Org string `json:"org,omitempty"`
+ Labels []string `json:"labels,omitempty"`
+ Limit int `json:"limit,omitempty"`
}
-// ScanOutput is the output for agentic_scan.
-//
-// out := agentic.ScanOutput{Success: true, Count: 1, Issues: []agentic.ScanIssue{{Repo: "go-io", Number: 12}}}
+// ScanOutput is the output for agentic_scan, e.g. agentic.ScanOutput{Success: true, Count: 1, Issues: []agentic.ScanIssue{{Repo: "go-io", Number: 12}}}.
type ScanOutput struct {
Success bool `json:"success"`
Count int `json:"count"`
Issues []ScanIssue `json:"issues"`
}
-// ScanIssue is a single actionable issue.
-//
-// issue := agentic.ScanIssue{Repo: "go-io", Number: 12, Title: "Replace fmt.Errorf"}
+// ScanIssue is a single actionable issue, e.g. agentic.ScanIssue{Repo: "go-io", Number: 12, Title: "Replace fmt.Errorf"}.
type ScanIssue struct {
Repo string `json:"repo"`
Number int `json:"number"`
@@ -57,15 +51,14 @@ func (s *PrepSubsystem) scan(ctx context.Context, _ *mcp.CallToolRequest, input
var allIssues []ScanIssue
- // Get repos for the org
repos, err := s.listOrgRepos(ctx, input.Org)
if err != nil {
return nil, ScanOutput{}, err
}
- for _, repo := range repos {
+ for _, repoName := range repos {
for _, label := range input.Labels {
- issues, err := s.listRepoIssues(ctx, input.Org, repo, label)
+ issues, err := s.listRepoIssues(ctx, input.Org, repoName, label)
if err != nil {
continue
}
@@ -80,7 +73,6 @@ func (s *PrepSubsystem) scan(ctx context.Context, _ *mcp.CallToolRequest, input
}
}
- // Deduplicate by repo+number
seen := make(map[string]bool)
var unique []ScanIssue
for _, issue := range allIssues {
@@ -109,20 +101,20 @@ func (s *PrepSubsystem) listOrgRepos(ctx context.Context, org string) ([]string,
}
var allNames []string
- for _, r := range repos {
- allNames = append(allNames, r.Name)
+ for _, repoInfo := range repos {
+ allNames = append(allNames, repoInfo.Name)
}
return allNames, nil
}
func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label string) ([]ScanIssue, error) {
- u := core.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=10&type=issues",
+ requestURL := core.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=10&type=issues",
s.forgeURL, org, repo)
if label != "" {
- u = core.Concat(u, "&labels=", url.QueryEscape(label))
+ requestURL = core.Concat(requestURL, "&labels=", url.QueryEscape(label))
}
- r := HTTPGet(ctx, u, s.forgeToken, "token")
- if !r.OK {
+ httpResult := HTTPGet(ctx, requestURL, s.forgeToken, "token")
+ if !httpResult.OK {
return nil, core.E("scan.listRepoIssues", core.Concat("failed to list issues for ", repo), nil)
}
@@ -137,16 +129,16 @@ func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label str
} `json:"assignee"`
HTMLURL string `json:"html_url"`
}
- if ur := core.JSONUnmarshalString(r.Value.(string), &issues); !ur.OK {
- err, _ := ur.Value.(error)
+ if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &issues); !parseResult.OK {
+ err, _ := parseResult.Value.(error)
return nil, core.E("scan.listRepoIssues", "parse issues response", err)
}
var result []ScanIssue
for _, issue := range issues {
var labels []string
- for _, l := range issue.Labels {
- labels = append(labels, l.Name)
+ for _, labelInfo := range issue.Labels {
+ labels = append(labels, labelInfo.Name)
}
assignee := ""
if issue.Assignee != nil {
diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go
index c489eda..1a00844 100644
--- a/pkg/monitor/monitor.go
+++ b/pkg/monitor/monitor.go
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: EUPL-1.2
-// Package monitor polls workspace state, repo drift, and agent inboxes, then
-// pushes the current view to connected MCP clients.
+// Package monitor keeps workspace state and inbox status visible to MCP clients.
//
// mon := monitor.New(monitor.Options{Interval: 30 * time.Second})
// mon.RegisterTools(server)
@@ -20,10 +19,8 @@ import (
"github.com/modelcontextprotocol/go-sdk/mcp"
)
-// fs provides unrestricted filesystem access (root "/" = no sandbox).
-//
-// r := fs.Read(core.JoinPath(wsRoot, name, "status.json"))
-// if text, ok := resultString(r); ok { _ = core.JSONUnmarshalString(text, &st) }
+// r := fs.Read(core.JoinPath(wsRoot, name, "status.json"))
+// if text, ok := resultString(r); ok { _ = core.JSONUnmarshalString(text, &st) }
var fs = agentic.LocalFs()
type channelSender interface {
@@ -61,10 +58,8 @@ func resultString(r core.Result) (string, bool) {
return value, true
}
-// Subsystem owns the long-running monitor loop and MCP resource surface.
-//
-// mon := monitor.New()
-// mon.Start(context.Background())
+// Subsystem owns the long-running monitor loop and MCP resource surface.
+// mon := monitor.New(); mon.Start(context.Background())
type Subsystem struct {
*core.ServiceRuntime[Options]
server *mcp.Server
@@ -72,23 +67,19 @@ type Subsystem struct {
cancel context.CancelFunc
wg sync.WaitGroup
- // Track last seen state to only notify on changes
- lastCompletedCount int // completed workspaces seen on the last scan
- seenCompleted map[string]bool // workspace names we've already notified about
- seenRunning map[string]bool // workspace names we've already sent start notification for
- completionsSeeded bool // true after first completions check
- lastInboxMaxID int // highest message ID seen
- inboxSeeded bool // true after first inbox check
- lastSyncTimestamp int64
- mu sync.Mutex
+ seenCompleted map[string]bool
+ seenRunning map[string]bool
+ completionsSeeded bool
+ lastInboxMaxID int
+ inboxSeeded bool
+ lastSyncTimestamp int64
+ mu sync.Mutex
- // Event-driven poke channel — dispatch goroutine sends here on completion
poke chan struct{}
}
var _ coremcp.Subsystem = (*Subsystem)(nil)
-// SetCore preserves direct test setup without going through core.WithService.
// Deprecated: prefer Register with core.WithService(monitor.Register).
//
// mon.SetCore(c)
@@ -96,14 +87,12 @@ func (m *Subsystem) SetCore(c *core.Core) {
m.ServiceRuntime = core.NewServiceRuntime(c, Options{})
}
-// handleAgentStarted tracks started agents.
func (m *Subsystem) handleAgentStarted(ev messages.AgentStarted) {
m.mu.Lock()
m.seenRunning[ev.Workspace] = true
m.mu.Unlock()
}
-// handleAgentCompleted processes agent completion — emits notifications and checks queue drain.
func (m *Subsystem) handleAgentCompleted(ev messages.AgentCompleted) {
m.mu.Lock()
m.seenCompleted[ev.Workspace] = true
@@ -113,10 +102,8 @@ func (m *Subsystem) handleAgentCompleted(ev messages.AgentCompleted) {
go m.checkIdleAfterDelay()
}
-// HandleIPCEvents lets Core auto-wire monitor side-effects for IPC messages.
-//
-// c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"})
-// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"})
+// HandleIPCEvents lets Core auto-wire monitor side-effects for IPC messages.
+// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"})
func (m *Subsystem) HandleIPCEvents(_ *core.Core, msg core.Message) core.Result {
switch ev := msg.(type) {
case messages.AgentCompleted:
@@ -127,24 +114,18 @@ func (m *Subsystem) HandleIPCEvents(_ *core.Core, msg core.Message) core.Result
return core.Result{OK: true}
}
-// Options configures the monitor polling interval.
-//
-// opts := monitor.Options{Interval: 30 * time.Second}
-// mon := monitor.New(opts)
+// Options configures the monitor polling interval.
+// opts := monitor.Options{Interval: 30 * time.Second}; mon := monitor.New(opts)
type Options struct {
- // Interval between checks (default: 2 minutes)
Interval time.Duration
}
-// New builds the monitor with a polling interval and poke channel.
-//
-// mon := monitor.New(monitor.Options{Interval: 30 * time.Second})
+// New builds the monitor with a polling interval, e.g. monitor.New(monitor.Options{Interval: 30 * time.Second}).
func New(opts ...Options) *Subsystem {
interval := 2 * time.Minute
if len(opts) > 0 && opts[0].Interval > 0 {
interval = opts[0].Interval
}
- // Override via env for debugging
if envInterval := core.Env("MONITOR_INTERVAL"); envInterval != "" {
if d, err := time.ParseDuration(envInterval); err == nil {
interval = d
@@ -158,23 +139,17 @@ func New(opts ...Options) *Subsystem {
}
}
-// debugChannel logs a debug message.
-func (m *Subsystem) debugChannel(msg string) {
+func (m *Subsystem) debug(msg string) {
core.Debug(msg)
}
-// Name keeps the monitor address stable for MCP and core.WithService lookups.
-//
-// name := mon.Name() // "monitor"
+// Name keeps the monitor address stable for MCP and core.WithService lookups; returns "monitor".
func (m *Subsystem) Name() string { return "monitor" }
-// RegisterTools publishes the monitor status resource on an MCP server.
-//
-// mon.RegisterTools(server)
+// RegisterTools publishes the monitor status resource on an MCP server.
func (m *Subsystem) RegisterTools(server *mcp.Server) {
m.server = server
- // Register a resource that clients can read for current status
server.AddResource(&mcp.Resource{
Name: "Agent Status",
URI: "status://agents",
@@ -183,11 +158,9 @@ func (m *Subsystem) RegisterTools(server *mcp.Server) {
}, m.agentStatusResource)
}
-// Start launches the background polling loop.
-//
-// mon.Start(ctx)
+// Start launches the background polling loop.
func (m *Subsystem) Start(ctx context.Context) {
- monCtx, cancel := context.WithCancel(ctx)
+ monitorCtx, cancel := context.WithCancel(ctx)
m.cancel = cancel
core.Info("monitor: started (interval=%s)", m.interval)
@@ -195,31 +168,25 @@ func (m *Subsystem) Start(ctx context.Context) {
m.wg.Add(1)
go func() {
defer m.wg.Done()
- m.loop(monCtx)
+ m.loop(monitorCtx)
}()
}
-// OnStartup starts the monitor when Core starts the service lifecycle.
-//
-// r := mon.OnStartup(context.Background())
-// core.Println(r.OK)
+// OnStartup starts the monitor when Core starts the service lifecycle.
+// r := mon.OnStartup(context.Background()); core.Println(r.OK)
func (m *Subsystem) OnStartup(ctx context.Context) core.Result {
m.Start(ctx)
return core.Result{OK: true}
}
-// OnShutdown stops the monitor through the Core lifecycle hook.
-//
-// r := mon.OnShutdown(context.Background())
-// core.Println(r.OK)
+// OnShutdown stops the monitor through the Core lifecycle hook.
+// r := mon.OnShutdown(context.Background()); core.Println(r.OK)
func (m *Subsystem) OnShutdown(ctx context.Context) core.Result {
_ = m.Shutdown(ctx)
return core.Result{OK: true}
}
-// Shutdown cancels the monitor loop and waits for the goroutine to exit.
-//
-// _ = mon.Shutdown(ctx)
+// Shutdown cancels the monitor loop and waits for the goroutine to exit.
func (m *Subsystem) Shutdown(_ context.Context) error {
if m.cancel != nil {
m.cancel()
@@ -228,9 +195,7 @@ func (m *Subsystem) Shutdown(_ context.Context) error {
return nil
}
-// Poke asks the loop to run a check immediately instead of waiting for the ticker.
-//
-// mon.Poke()
+// Poke asks the loop to run a check immediately instead of waiting for the ticker.
func (m *Subsystem) Poke() {
select {
case m.poke <- struct{}{}:
@@ -238,11 +203,8 @@ func (m *Subsystem) Poke() {
}
}
-// checkIdleAfterDelay waits briefly then checks if the fleet is genuinely idle.
-// Only emits queue.drained when there are truly zero running or queued agents,
-// verified by checking managed processes are alive, not just trusting status files.
func (m *Subsystem) checkIdleAfterDelay() {
- time.Sleep(5 * time.Second) // wait for queue drain to fill slots
+ time.Sleep(5 * time.Second)
if m.ServiceRuntime == nil {
return
}
@@ -253,8 +215,6 @@ func (m *Subsystem) checkIdleAfterDelay() {
}
}
-// countLiveWorkspaces counts workspaces that are genuinely active.
-// For "running" status, verifies the managed process is still alive.
func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
var runtime *core.Core
if m.ServiceRuntime != nil {
@@ -262,17 +222,17 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
}
for _, path := range agentic.WorkspaceStatusPaths() {
wsDir := core.PathDir(path)
- r := agentic.ReadStatusResult(wsDir)
- if !r.OK {
+ statusResult := agentic.ReadStatusResult(wsDir)
+ if !statusResult.OK {
continue
}
- st, ok := r.Value.(*agentic.WorkspaceStatus)
- if !ok || st == nil {
+ workspaceStatus, ok := statusResult.Value.(*agentic.WorkspaceStatus)
+ if !ok || workspaceStatus == nil {
continue
}
- switch st.Status {
+ switch workspaceStatus.Status {
case "running":
- if st.PID > 0 && processAlive(runtime, st.ProcessID, st.PID) {
+ if workspaceStatus.PID > 0 && processAlive(runtime, workspaceStatus.ProcessID, workspaceStatus.PID) {
running++
}
case "queued":
@@ -282,23 +242,19 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
return
}
-// processAlive checks whether a managed process is still running.
func processAlive(c *core.Core, processID string, pid int) bool {
return agentic.ProcessAlive(c, processID, pid)
}
func (m *Subsystem) loop(ctx context.Context) {
- // Initial check after short delay (let server fully start)
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
}
- // Initialise sync timestamp to now (don't pull everything on first run)
m.initSyncTimestamp()
- // Run first check immediately
m.check(ctx)
ticker := time.NewTicker(m.interval)
@@ -319,27 +275,22 @@ func (m *Subsystem) loop(ctx context.Context) {
func (m *Subsystem) check(ctx context.Context) {
var messages []string
- // Check agent completions
if msg := m.checkCompletions(); msg != "" {
messages = append(messages, msg)
}
- // Harvest completed workspaces — push branches, check for binaries
if msg := m.harvestCompleted(); msg != "" {
messages = append(messages, msg)
}
- // Check inbox
if msg := m.checkInbox(); msg != "" {
messages = append(messages, msg)
}
- // Sync repos from other agents' changes
if msg := m.syncRepos(); msg != "" {
messages = append(messages, msg)
}
- // Only notify if there's something new
if len(messages) == 0 {
return
}
@@ -347,7 +298,6 @@ func (m *Subsystem) check(ctx context.Context) {
combined := core.Join("\n", messages...)
m.notify(ctx, combined)
- // Notify resource subscribers that agent status changed
if m.server != nil {
m.server.ResourceUpdated(ctx, &mcp.ResourceUpdatedNotificationParams{
URI: "status://agents",
@@ -355,9 +305,6 @@ func (m *Subsystem) check(ctx context.Context) {
}
}
-// checkCompletions scans workspace for newly completed agents.
-// Tracks by workspace name (not count) so harvest status rewrites
-// don't suppress future notifications.
func (m *Subsystem) checkCompletions() string {
entries := agentic.WorkspaceStatusPaths()
@@ -369,39 +316,38 @@ func (m *Subsystem) checkCompletions() string {
m.mu.Lock()
seeded := m.completionsSeeded
for _, entry := range entries {
- r := fs.Read(entry)
- if !r.OK {
+ entryResult := fs.Read(entry)
+ if !entryResult.OK {
continue
}
- entryData, ok := resultString(r)
+ entryData, ok := resultString(entryResult)
if !ok {
continue
}
- var st struct {
+ var workspaceStatus struct {
Status string `json:"status"`
Repo string `json:"repo"`
Agent string `json:"agent"`
}
- if r := core.JSONUnmarshalString(entryData, &st); !r.OK {
+ if parseResult := core.JSONUnmarshalString(entryData, &workspaceStatus); !parseResult.OK {
continue
}
wsName := agentic.WorkspaceName(core.PathDir(entry))
- switch st.Status {
+ switch workspaceStatus.Status {
case "completed":
completed++
if !m.seenCompleted[wsName] {
m.seenCompleted[wsName] = true
if seeded {
- newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s)", st.Repo, st.Agent))
+ newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s)", workspaceStatus.Repo, workspaceStatus.Agent))
}
}
case "running":
running++
if !m.seenRunning[wsName] && seeded {
m.seenRunning[wsName] = true
- // No individual start notification — too noisy
}
case "queued":
queued++
@@ -409,12 +355,11 @@ func (m *Subsystem) checkCompletions() string {
if !m.seenCompleted[wsName] {
m.seenCompleted[wsName] = true
if seeded {
- newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s) [%s]", st.Repo, st.Agent, st.Status))
+ newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s) [%s]", workspaceStatus.Repo, workspaceStatus.Agent, workspaceStatus.Status))
}
}
}
}
- m.lastCompletedCount = completed
m.completionsSeeded = true
m.mu.Unlock()
@@ -422,7 +367,6 @@ func (m *Subsystem) checkCompletions() string {
return ""
}
- // Only emit queue.drained when genuinely empty — verified by live process checks
liveRunning, liveQueued := m.countLiveWorkspaces()
if m.ServiceRuntime != nil && liveRunning == 0 && liveQueued == 0 {
m.Core().ACTION(messages.QueueDrained{Completed: len(newlyCompleted)})
@@ -438,18 +382,16 @@ func (m *Subsystem) checkCompletions() string {
return msg
}
-// checkInbox checks for unread messages.
func (m *Subsystem) checkInbox() string {
apiKeyStr := monitorBrainKey()
if apiKeyStr == "" {
return ""
}
- // Call the API to check inbox
apiURL := monitorAPIURL()
inboxURL := core.Concat(apiURL, "/v1/messages/inbox?agent=", url.QueryEscape(agentic.AgentName()))
- hr := agentic.HTTPGet(context.Background(), inboxURL, core.Trim(apiKeyStr), "Bearer")
- if !hr.OK {
+ httpResult := agentic.HTTPGet(context.Background(), inboxURL, core.Trim(apiKeyStr), "Bearer")
+ if !httpResult.OK {
return ""
}
@@ -462,12 +404,11 @@ func (m *Subsystem) checkInbox() string {
Content string `json:"content"`
} `json:"data"`
}
- if r := core.JSONUnmarshalString(hr.Value.(string), &resp); !r.OK {
- m.debugChannel("checkInbox: failed to decode response")
+ if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &resp); !parseResult.OK {
+ m.debug("checkInbox: failed to decode response")
return ""
}
- // Find max ID, count unread, collect new messages
maxID := 0
unread := 0
@@ -484,20 +425,19 @@ func (m *Subsystem) checkInbox() string {
}
var newMessages []newMessage
- for _, msg := range resp.Data {
- if msg.ID > maxID {
- maxID = msg.ID
+ for _, message := range resp.Data {
+ if message.ID > maxID {
+ maxID = message.ID
}
- if !msg.Read {
+ if !message.Read {
unread++
}
- // Collect messages newer than what we've seen
- if msg.ID > prevMaxID {
+ if message.ID > prevMaxID {
newMessages = append(newMessages, newMessage{
- ID: msg.ID,
- From: msg.From,
- Subject: msg.Subject,
- Content: msg.Content,
+ ID: message.ID,
+ From: message.From,
+ Subject: message.Subject,
+ Content: message.Content,
})
}
}
@@ -507,24 +447,21 @@ func (m *Subsystem) checkInbox() string {
m.inboxSeeded = true
m.mu.Unlock()
- // First check after startup: seed, don't fire
if !seeded {
return ""
}
- // Only fire if there are new messages (higher ID than last seen)
if maxID <= prevMaxID || len(newMessages) == 0 {
return ""
}
- // Push each message as a channel event so it lands in the session
if m.ServiceRuntime != nil {
if notifier, ok := core.ServiceFor[channelSender](m.Core(), "mcp"); ok {
- for _, msg := range newMessages {
+ for _, message := range newMessages {
notifier.ChannelSend(context.Background(), "inbox.message", map[string]any{
- "from": msg.From,
- "subject": msg.Subject,
- "content": msg.Content,
+ "from": message.From,
+ "subject": message.Subject,
+ "content": message.Content,
})
}
}
@@ -533,15 +470,13 @@ func (m *Subsystem) checkInbox() string {
return core.Sprintf("%d unread message(s) in inbox", unread)
}
-// notify sends a log notification to all connected MCP sessions.
func (m *Subsystem) notify(ctx context.Context, message string) {
if m.server == nil {
return
}
- // Use the server's session list to broadcast
- for ss := range m.server.Sessions() {
- ss.Log(ctx, &mcp.LoggingMessageParams{
+ for session := range m.server.Sessions() {
+ session.Log(ctx, &mcp.LoggingMessageParams{
Level: "info",
Logger: "monitor",
Data: message,
@@ -549,11 +484,10 @@ func (m *Subsystem) notify(ctx context.Context, message string) {
}
}
-// agentStatusResource returns current workspace status as a JSON resource.
-func (m *Subsystem) agentStatusResource(ctx context.Context, req *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
+func (m *Subsystem) agentStatusResource(_ context.Context, _ *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
entries := agentic.WorkspaceStatusPaths()
- type wsInfo struct {
+ type workspaceInfo struct {
Name string `json:"name"`
Status string `json:"status"`
Repo string `json:"repo"`
@@ -561,31 +495,31 @@ func (m *Subsystem) agentStatusResource(ctx context.Context, req *mcp.ReadResour
PRURL string `json:"pr_url,omitempty"`
}
- var workspaces []wsInfo
+ var workspaces []workspaceInfo
for _, entry := range entries {
- r := fs.Read(entry)
- if !r.OK {
+ entryResult := fs.Read(entry)
+ if !entryResult.OK {
continue
}
- entryData, ok := resultString(r)
+ entryData, ok := resultString(entryResult)
if !ok {
continue
}
- var st struct {
+ var workspaceStatus struct {
Status string `json:"status"`
Repo string `json:"repo"`
Agent string `json:"agent"`
PRURL string `json:"pr_url"`
}
- if r := core.JSONUnmarshalString(entryData, &st); !r.OK {
+ if parseResult := core.JSONUnmarshalString(entryData, &workspaceStatus); !parseResult.OK {
continue
}
- workspaces = append(workspaces, wsInfo{
+ workspaces = append(workspaces, workspaceInfo{
Name: agentic.WorkspaceName(core.PathDir(entry)),
- Status: st.Status,
- Repo: st.Repo,
- Agent: st.Agent,
- PRURL: st.PRURL,
+ Status: workspaceStatus.Status,
+ Repo: workspaceStatus.Repo,
+ Agent: workspaceStatus.Agent,
+ PRURL: workspaceStatus.PRURL,
})
}
diff --git a/pkg/monitor/sync.go b/pkg/monitor/sync.go
index 79e69b5..3ce662c 100644
--- a/pkg/monitor/sync.go
+++ b/pkg/monitor/sync.go
@@ -11,51 +11,41 @@ import (
core "dappco.re/go/core"
)
-// CheckinResponse is what the API returns for an agent checkin.
-//
-// resp := monitor.CheckinResponse{Changed: []monitor.ChangedRepo{{Repo: "core-agent", Branch: "main", SHA: "abc123"}}, Timestamp: 1712345678}
+// CheckinResponse is what the API returns for an agent checkin, e.g. monitor.CheckinResponse{Changed: []monitor.ChangedRepo{{Repo: "core-agent", Branch: "main", SHA: "abc123"}}, Timestamp: 1712345678}.
type CheckinResponse struct {
- // Repos that have new commits since the agent's last checkin.
- Changed []ChangedRepo `json:"changed,omitempty"`
- // Server timestamp — use as "since" on next checkin.
- Timestamp int64 `json:"timestamp"`
+ Changed []ChangedRepo `json:"changed,omitempty"`
+ Timestamp int64 `json:"timestamp"`
}
-// ChangedRepo is a repo that has new commits.
-//
-// repo := monitor.ChangedRepo{Repo: "core-agent", Branch: "main", SHA: "abc123"}
+// ChangedRepo is a repo that has new commits, e.g. monitor.ChangedRepo{Repo: "core-agent", Branch: "main", SHA: "abc123"}.
type ChangedRepo struct {
Repo string `json:"repo"`
Branch string `json:"branch"`
SHA string `json:"sha"`
}
-// syncRepos calls the checkin API and pulls any repos that changed.
-// Returns a human-readable message if repos were updated, empty string otherwise.
func (m *Subsystem) syncRepos() string {
agentName := agentic.AgentName()
checkinURL := core.Sprintf("%s/v1/agent/checkin?agent=%s&since=%d", monitorAPIURL(), url.QueryEscape(agentName), m.lastSyncTimestamp)
brainKey := monitorBrainKey()
- hr := agentic.HTTPGet(context.Background(), checkinURL, brainKey, "Bearer")
- if !hr.OK {
+ httpResult := agentic.HTTPGet(context.Background(), checkinURL, brainKey, "Bearer")
+ if !httpResult.OK {
return ""
}
var checkin CheckinResponse
- if r := core.JSONUnmarshalString(hr.Value.(string), &checkin); !r.OK {
+ if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &checkin); !parseResult.OK {
return ""
}
if len(checkin.Changed) == 0 {
- // No changes — safe to advance timestamp
m.mu.Lock()
m.lastSyncTimestamp = checkin.Timestamp
m.mu.Unlock()
return ""
}
- // Pull changed repos
basePath := core.Env("CODE_PATH")
if basePath == "" {
basePath = core.JoinPath(agentic.HomeDir(), "Code", "core")
@@ -63,7 +53,6 @@ func (m *Subsystem) syncRepos() string {
var pulled []string
for _, repo := range checkin.Changed {
- // Sanitise repo name to prevent path traversal from API response
repoName := core.PathBase(core.Replace(repo.Repo, "\\", "/"))
if repoName == "." || repoName == ".." || repoName == "" {
continue
@@ -73,38 +62,30 @@ func (m *Subsystem) syncRepos() string {
continue
}
- // Check if on the default branch and clean
current := m.gitOutput(repoDir, "rev-parse", "--abbrev-ref", "HEAD")
if current == "" {
continue
}
- // Determine which branch to pull — use server-reported branch,
- // fall back to current if server didn't specify
targetBranch := repo.Branch
if targetBranch == "" {
targetBranch = current
}
- // Only pull if we're on the target branch (or it's a default branch)
if current != targetBranch {
- continue // On a different branch — skip
+ continue
}
status := m.gitOutput(repoDir, "status", "--porcelain")
if len(status) > 0 {
- continue // Don't pull if dirty
+ continue
}
- // Fast-forward pull the target branch
if m.gitOK(repoDir, "pull", "--ff-only", "origin", targetBranch) {
pulled = append(pulled, repo.Repo)
}
}
- // Only advance timestamp if we handled all reported repos.
- // If any were skipped (dirty, wrong branch, missing), keep the
- // old timestamp so the server reports them again next cycle.
skipped := len(checkin.Changed) - len(pulled)
if skipped == 0 {
m.mu.Lock()
@@ -119,8 +100,6 @@ func (m *Subsystem) syncRepos() string {
return core.Sprintf("Synced %d repo(s): %s", len(pulled), core.Join(", ", pulled...))
}
-// lastSyncTimestamp is stored on the subsystem — add it via the check cycle.
-// Initialised to "now" on first run so we don't pull everything on startup.
func (m *Subsystem) initSyncTimestamp() {
m.mu.Lock()
if m.lastSyncTimestamp == 0 {