fix(ax): streamline agentic and monitor helpers

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-30 20:45:23 +00:00
parent b4f5542b21
commit f32edaa17e
5 changed files with 129 additions and 236 deletions

View file

@ -8,65 +8,55 @@ import (
core "dappco.re/go/core"
)
// ingestFindings reads the agent output log and creates issues via the API
// for scan/audit results. Only runs for conventions and security templates.
func (s *PrepSubsystem) ingestFindings(wsDir string) {
result := ReadStatusResult(wsDir)
st, ok := workspaceStatusValue(result)
if !ok || st.Status != "completed" {
statusResult := ReadStatusResult(wsDir)
workspaceStatus, ok := workspaceStatusValue(statusResult)
if !ok || workspaceStatus.Status != "completed" {
return
}
// Read the log file
logFiles := workspaceLogFiles(wsDir)
if len(logFiles) == 0 {
return
}
r := fs.Read(logFiles[0])
if !r.OK || len(r.Value.(string)) < 100 {
logResult := fs.Read(logFiles[0])
if !logResult.OK || len(logResult.Value.(string)) < 100 {
return
}
body := r.Value.(string)
logBody := logResult.Value.(string)
// Skip quota errors
if core.Contains(body, "QUOTA_EXHAUSTED") || core.Contains(body, "QuotaError") {
if core.Contains(logBody, "QUOTA_EXHAUSTED") || core.Contains(logBody, "QuotaError") {
return
}
// Only ingest if there are actual findings (file:line references)
findings := countFileRefs(body)
findings := countFileRefs(logBody)
if findings < 2 {
return // No meaningful findings
return
}
// Determine issue type from the template used
issueType := "task"
priority := "normal"
if core.Contains(body, "security") || core.Contains(body, "Security") {
if core.Contains(logBody, "security") || core.Contains(logBody, "Security") {
issueType = "bug"
priority = "high"
}
// Create a single issue per repo with all findings in the body
title := core.Sprintf("Scan findings for %s (%d items)", st.Repo, findings)
title := core.Sprintf("Scan findings for %s (%d items)", workspaceStatus.Repo, findings)
// Truncate body to reasonable size for issue description
description := body
if len(description) > 10000 {
description = core.Concat(description[:10000], "\n\n... (truncated, see full log in workspace)")
issueDescription := logBody
if len(issueDescription) > 10000 {
issueDescription = core.Concat(issueDescription[:10000], "\n\n... (truncated, see full log in workspace)")
}
s.createIssueViaAPI(st.Repo, title, description, issueType, priority, "scan")
s.createIssueViaAPI(title, issueDescription, issueType, priority)
}
// countFileRefs counts file:line references in the output (indicates real findings)
func countFileRefs(body string) int {
count := 0
for i := 0; i < len(body)-5; i++ {
if body[i] == '`' {
// Look for pattern: `file.go:123`
j := i + 1
for j < len(body) && body[j] != '`' && j-i < 100 {
j++
@ -82,20 +72,18 @@ func countFileRefs(body string) int {
return count
}
// createIssueViaAPI posts an issue to the lthn.sh API
func (s *PrepSubsystem) createIssueViaAPI(repo, title, description, issueType, priority, source string) {
func (s *PrepSubsystem) createIssueViaAPI(title, description, issueType, priority string) {
if s.brainKey == "" {
return
}
// Read the agent API key from file
r := fs.Read(core.JoinPath(HomeDir(), ".claude", "agent-api.key"))
if !r.OK {
apiKeyResult := fs.Read(core.JoinPath(HomeDir(), ".claude", "agent-api.key"))
if !apiKeyResult.OK {
return
}
apiKey := core.Trim(r.Value.(string))
apiKey := core.Trim(apiKeyResult.Value.(string))
payload := core.JSONMarshalString(map[string]string{
issuePayload := core.JSONMarshalString(map[string]string{
"title": title,
"description": description,
"type": issueType,
@ -103,5 +91,5 @@ func (s *PrepSubsystem) createIssueViaAPI(repo, title, description, issueType, p
"reporter": "cladius",
})
HTTPPost(context.Background(), core.Concat(s.brainURL, "/v1/issues"), payload, apiKey, "Bearer")
HTTPPost(context.Background(), core.Concat(s.brainURL, "/v1/issues"), issuePayload, apiKey, "Bearer")
}

View file

@ -220,7 +220,7 @@ func TestIngest_CreateIssueViaAPI_Good_Success(t *testing.T) {
failCount: make(map[string]int),
}
s.createIssueViaAPI("go-io", "Test Issue", "Description", "bug", "high", "scan")
s.createIssueViaAPI("Test Issue", "Description", "bug", "high")
assert.True(t, called)
}
@ -234,7 +234,7 @@ func TestIngest_CreateIssueViaAPI_Bad_NoBrainKey(t *testing.T) {
// Should return early without panic
assert.NotPanics(t, func() {
s.createIssueViaAPI("go-io", "Title", "Body", "task", "normal", "scan")
s.createIssueViaAPI("Title", "Body", "task", "normal")
})
}
@ -253,7 +253,7 @@ func TestIngest_CreateIssueViaAPI_Bad_NoAPIKey(t *testing.T) {
// Should return early — no API key file
assert.NotPanics(t, func() {
s.createIssueViaAPI("go-io", "Title", "Body", "task", "normal", "scan")
s.createIssueViaAPI("Title", "Body", "task", "normal")
})
}
@ -278,7 +278,7 @@ func TestIngest_CreateIssueViaAPI_Bad_ServerError(t *testing.T) {
// Should not panic even on server error
assert.NotPanics(t, func() {
s.createIssueViaAPI("go-io", "Title", "Body", "task", "normal", "scan")
s.createIssueViaAPI("Title", "Body", "task", "normal")
})
}
@ -344,7 +344,7 @@ func TestIngest_CreateIssueViaAPI_Ugly(t *testing.T) {
failCount: make(map[string]int),
}
s.createIssueViaAPI("go-io", "XSS Test", "<script>alert('xss')</script><b>bold</b>&amp;", "bug", "high", "scan")
s.createIssueViaAPI("XSS Test", "<script>alert('xss')</script><b>bold</b>&amp;", "bug", "high")
assert.True(t, called)
}

View file

@ -10,27 +10,21 @@ import (
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// ScanInput is the input for agentic_scan.
//
// input := agentic.ScanInput{Org: "core", Labels: []string{"agentic", "bug"}, Limit: 20}
// input := agentic.ScanInput{Org: "core", Labels: []string{"agentic", "bug"}, Limit: 20}
type ScanInput struct {
Org string `json:"org,omitempty"` // default "core"
Labels []string `json:"labels,omitempty"` // filter by labels (default: agentic, help-wanted, bug)
Limit int `json:"limit,omitempty"` // max issues to return
Org string `json:"org,omitempty"`
Labels []string `json:"labels,omitempty"`
Limit int `json:"limit,omitempty"`
}
// ScanOutput is the output for agentic_scan.
//
// out := agentic.ScanOutput{Success: true, Count: 1, Issues: []agentic.ScanIssue{{Repo: "go-io", Number: 12}}}
// out := agentic.ScanOutput{Success: true, Count: 1, Issues: []agentic.ScanIssue{{Repo: "go-io", Number: 12}}}
type ScanOutput struct {
Success bool `json:"success"`
Count int `json:"count"`
Issues []ScanIssue `json:"issues"`
}
// ScanIssue is a single actionable issue.
//
// issue := agentic.ScanIssue{Repo: "go-io", Number: 12, Title: "Replace fmt.Errorf"}
// issue := agentic.ScanIssue{Repo: "go-io", Number: 12, Title: "Replace fmt.Errorf"}
type ScanIssue struct {
Repo string `json:"repo"`
Number int `json:"number"`
@ -57,15 +51,14 @@ func (s *PrepSubsystem) scan(ctx context.Context, _ *mcp.CallToolRequest, input
var allIssues []ScanIssue
// Get repos for the org
repos, err := s.listOrgRepos(ctx, input.Org)
if err != nil {
return nil, ScanOutput{}, err
}
for _, repo := range repos {
for _, repoName := range repos {
for _, label := range input.Labels {
issues, err := s.listRepoIssues(ctx, input.Org, repo, label)
issues, err := s.listRepoIssues(ctx, input.Org, repoName, label)
if err != nil {
continue
}
@ -80,7 +73,6 @@ func (s *PrepSubsystem) scan(ctx context.Context, _ *mcp.CallToolRequest, input
}
}
// Deduplicate by repo+number
seen := make(map[string]bool)
var unique []ScanIssue
for _, issue := range allIssues {
@ -109,20 +101,20 @@ func (s *PrepSubsystem) listOrgRepos(ctx context.Context, org string) ([]string,
}
var allNames []string
for _, r := range repos {
allNames = append(allNames, r.Name)
for _, repoInfo := range repos {
allNames = append(allNames, repoInfo.Name)
}
return allNames, nil
}
func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label string) ([]ScanIssue, error) {
u := core.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=10&type=issues",
requestURL := core.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=10&type=issues",
s.forgeURL, org, repo)
if label != "" {
u = core.Concat(u, "&labels=", url.QueryEscape(label))
requestURL = core.Concat(requestURL, "&labels=", url.QueryEscape(label))
}
r := HTTPGet(ctx, u, s.forgeToken, "token")
if !r.OK {
httpResult := HTTPGet(ctx, requestURL, s.forgeToken, "token")
if !httpResult.OK {
return nil, core.E("scan.listRepoIssues", core.Concat("failed to list issues for ", repo), nil)
}
@ -137,16 +129,16 @@ func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label str
} `json:"assignee"`
HTMLURL string `json:"html_url"`
}
if ur := core.JSONUnmarshalString(r.Value.(string), &issues); !ur.OK {
err, _ := ur.Value.(error)
if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &issues); !parseResult.OK {
err, _ := parseResult.Value.(error)
return nil, core.E("scan.listRepoIssues", "parse issues response", err)
}
var result []ScanIssue
for _, issue := range issues {
var labels []string
for _, l := range issue.Labels {
labels = append(labels, l.Name)
for _, labelInfo := range issue.Labels {
labels = append(labels, labelInfo.Name)
}
assignee := ""
if issue.Assignee != nil {

View file

@ -1,7 +1,6 @@
// SPDX-License-Identifier: EUPL-1.2
// Package monitor polls workspace state, repo drift, and agent inboxes, then
// pushes the current view to connected MCP clients.
// Package monitor keeps workspace state and inbox status visible to MCP clients.
//
// mon := monitor.New(monitor.Options{Interval: 30 * time.Second})
// mon.RegisterTools(server)
@ -20,10 +19,8 @@ import (
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// fs provides unrestricted filesystem access (root "/" = no sandbox).
//
// r := fs.Read(core.JoinPath(wsRoot, name, "status.json"))
// if text, ok := resultString(r); ok { _ = core.JSONUnmarshalString(text, &st) }
// r := fs.Read(core.JoinPath(wsRoot, name, "status.json"))
// if text, ok := resultString(r); ok { _ = core.JSONUnmarshalString(text, &st) }
var fs = agentic.LocalFs()
type channelSender interface {
@ -61,10 +58,8 @@ func resultString(r core.Result) (string, bool) {
return value, true
}
// Subsystem owns the long-running monitor loop and MCP resource surface.
//
// mon := monitor.New()
// mon.Start(context.Background())
// mon := monitor.New()
// mon.Start(context.Background())
type Subsystem struct {
*core.ServiceRuntime[Options]
server *mcp.Server
@ -72,23 +67,19 @@ type Subsystem struct {
cancel context.CancelFunc
wg sync.WaitGroup
// Track last seen state to only notify on changes
lastCompletedCount int // completed workspaces seen on the last scan
seenCompleted map[string]bool // workspace names we've already notified about
seenRunning map[string]bool // workspace names we've already sent start notification for
completionsSeeded bool // true after first completions check
lastInboxMaxID int // highest message ID seen
inboxSeeded bool // true after first inbox check
lastSyncTimestamp int64
mu sync.Mutex
seenCompleted map[string]bool
seenRunning map[string]bool
completionsSeeded bool
lastInboxMaxID int
inboxSeeded bool
lastSyncTimestamp int64
mu sync.Mutex
// Event-driven poke channel — dispatch goroutine sends here on completion
poke chan struct{}
}
var _ coremcp.Subsystem = (*Subsystem)(nil)
// SetCore preserves direct test setup without going through core.WithService.
// Deprecated: prefer Register with core.WithService(monitor.Register).
//
// mon.SetCore(c)
@ -96,14 +87,12 @@ func (m *Subsystem) SetCore(c *core.Core) {
m.ServiceRuntime = core.NewServiceRuntime(c, Options{})
}
// handleAgentStarted tracks started agents.
func (m *Subsystem) handleAgentStarted(ev messages.AgentStarted) {
m.mu.Lock()
m.seenRunning[ev.Workspace] = true
m.mu.Unlock()
}
// handleAgentCompleted processes agent completion — emits notifications and checks queue drain.
func (m *Subsystem) handleAgentCompleted(ev messages.AgentCompleted) {
m.mu.Lock()
m.seenCompleted[ev.Workspace] = true
@ -113,10 +102,8 @@ func (m *Subsystem) handleAgentCompleted(ev messages.AgentCompleted) {
go m.checkIdleAfterDelay()
}
// HandleIPCEvents lets Core auto-wire monitor side-effects for IPC messages.
//
// c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"})
// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"})
// c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5"})
// c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed"})
func (m *Subsystem) HandleIPCEvents(_ *core.Core, msg core.Message) core.Result {
switch ev := msg.(type) {
case messages.AgentCompleted:
@ -127,24 +114,18 @@ func (m *Subsystem) HandleIPCEvents(_ *core.Core, msg core.Message) core.Result
return core.Result{OK: true}
}
// Options configures the monitor polling interval.
//
// opts := monitor.Options{Interval: 30 * time.Second}
// mon := monitor.New(opts)
// opts := monitor.Options{Interval: 30 * time.Second}
// mon := monitor.New(opts)
type Options struct {
// Interval between checks (default: 2 minutes)
Interval time.Duration
}
// New builds the monitor with a polling interval and poke channel.
//
// mon := monitor.New(monitor.Options{Interval: 30 * time.Second})
// mon := monitor.New(monitor.Options{Interval: 30 * time.Second})
func New(opts ...Options) *Subsystem {
interval := 2 * time.Minute
if len(opts) > 0 && opts[0].Interval > 0 {
interval = opts[0].Interval
}
// Override via env for debugging
if envInterval := core.Env("MONITOR_INTERVAL"); envInterval != "" {
if d, err := time.ParseDuration(envInterval); err == nil {
interval = d
@ -158,23 +139,17 @@ func New(opts ...Options) *Subsystem {
}
}
// debugChannel logs a debug message.
func (m *Subsystem) debugChannel(msg string) {
func (m *Subsystem) debug(msg string) {
core.Debug(msg)
}
// Name keeps the monitor address stable for MCP and core.WithService lookups.
//
// name := mon.Name() // "monitor"
// name := mon.Name() // "monitor"
func (m *Subsystem) Name() string { return "monitor" }
// RegisterTools publishes the monitor status resource on an MCP server.
//
// mon.RegisterTools(server)
// mon.RegisterTools(server)
func (m *Subsystem) RegisterTools(server *mcp.Server) {
m.server = server
// Register a resource that clients can read for current status
server.AddResource(&mcp.Resource{
Name: "Agent Status",
URI: "status://agents",
@ -183,11 +158,9 @@ func (m *Subsystem) RegisterTools(server *mcp.Server) {
}, m.agentStatusResource)
}
// Start launches the background polling loop.
//
// mon.Start(ctx)
// mon.Start(ctx)
func (m *Subsystem) Start(ctx context.Context) {
monCtx, cancel := context.WithCancel(ctx)
monitorCtx, cancel := context.WithCancel(ctx)
m.cancel = cancel
core.Info("monitor: started (interval=%s)", m.interval)
@ -195,31 +168,25 @@ func (m *Subsystem) Start(ctx context.Context) {
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.loop(monCtx)
m.loop(monitorCtx)
}()
}
// OnStartup starts the monitor when Core starts the service lifecycle.
//
// r := mon.OnStartup(context.Background())
// core.Println(r.OK)
// r := mon.OnStartup(context.Background())
// core.Println(r.OK)
func (m *Subsystem) OnStartup(ctx context.Context) core.Result {
m.Start(ctx)
return core.Result{OK: true}
}
// OnShutdown stops the monitor through the Core lifecycle hook.
//
// r := mon.OnShutdown(context.Background())
// core.Println(r.OK)
// r := mon.OnShutdown(context.Background())
// core.Println(r.OK)
func (m *Subsystem) OnShutdown(ctx context.Context) core.Result {
_ = m.Shutdown(ctx)
return core.Result{OK: true}
}
// Shutdown cancels the monitor loop and waits for the goroutine to exit.
//
// _ = mon.Shutdown(ctx)
// _ = mon.Shutdown(ctx)
func (m *Subsystem) Shutdown(_ context.Context) error {
if m.cancel != nil {
m.cancel()
@ -228,9 +195,7 @@ func (m *Subsystem) Shutdown(_ context.Context) error {
return nil
}
// Poke asks the loop to run a check immediately instead of waiting for the ticker.
//
// mon.Poke()
// mon.Poke()
func (m *Subsystem) Poke() {
select {
case m.poke <- struct{}{}:
@ -238,11 +203,8 @@ func (m *Subsystem) Poke() {
}
}
// checkIdleAfterDelay waits briefly then checks if the fleet is genuinely idle.
// Only emits queue.drained when there are truly zero running or queued agents,
// verified by checking managed processes are alive, not just trusting status files.
func (m *Subsystem) checkIdleAfterDelay() {
time.Sleep(5 * time.Second) // wait for queue drain to fill slots
time.Sleep(5 * time.Second)
if m.ServiceRuntime == nil {
return
}
@ -253,8 +215,6 @@ func (m *Subsystem) checkIdleAfterDelay() {
}
}
// countLiveWorkspaces counts workspaces that are genuinely active.
// For "running" status, verifies the managed process is still alive.
func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
var runtime *core.Core
if m.ServiceRuntime != nil {
@ -262,17 +222,17 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
}
for _, path := range agentic.WorkspaceStatusPaths() {
wsDir := core.PathDir(path)
r := agentic.ReadStatusResult(wsDir)
if !r.OK {
statusResult := agentic.ReadStatusResult(wsDir)
if !statusResult.OK {
continue
}
st, ok := r.Value.(*agentic.WorkspaceStatus)
if !ok || st == nil {
workspaceStatus, ok := statusResult.Value.(*agentic.WorkspaceStatus)
if !ok || workspaceStatus == nil {
continue
}
switch st.Status {
switch workspaceStatus.Status {
case "running":
if st.PID > 0 && processAlive(runtime, st.ProcessID, st.PID) {
if workspaceStatus.PID > 0 && processAlive(runtime, workspaceStatus.ProcessID, workspaceStatus.PID) {
running++
}
case "queued":
@ -282,23 +242,19 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
return
}
// processAlive checks whether a managed process is still running.
func processAlive(c *core.Core, processID string, pid int) bool {
return agentic.ProcessAlive(c, processID, pid)
}
func (m *Subsystem) loop(ctx context.Context) {
// Initial check after short delay (let server fully start)
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
}
// Initialise sync timestamp to now (don't pull everything on first run)
m.initSyncTimestamp()
// Run first check immediately
m.check(ctx)
ticker := time.NewTicker(m.interval)
@ -319,27 +275,22 @@ func (m *Subsystem) loop(ctx context.Context) {
func (m *Subsystem) check(ctx context.Context) {
var messages []string
// Check agent completions
if msg := m.checkCompletions(); msg != "" {
messages = append(messages, msg)
}
// Harvest completed workspaces — push branches, check for binaries
if msg := m.harvestCompleted(); msg != "" {
messages = append(messages, msg)
}
// Check inbox
if msg := m.checkInbox(); msg != "" {
messages = append(messages, msg)
}
// Sync repos from other agents' changes
if msg := m.syncRepos(); msg != "" {
messages = append(messages, msg)
}
// Only notify if there's something new
if len(messages) == 0 {
return
}
@ -347,7 +298,6 @@ func (m *Subsystem) check(ctx context.Context) {
combined := core.Join("\n", messages...)
m.notify(ctx, combined)
// Notify resource subscribers that agent status changed
if m.server != nil {
m.server.ResourceUpdated(ctx, &mcp.ResourceUpdatedNotificationParams{
URI: "status://agents",
@ -355,9 +305,6 @@ func (m *Subsystem) check(ctx context.Context) {
}
}
// checkCompletions scans workspace for newly completed agents.
// Tracks by workspace name (not count) so harvest status rewrites
// don't suppress future notifications.
func (m *Subsystem) checkCompletions() string {
entries := agentic.WorkspaceStatusPaths()
@ -369,39 +316,38 @@ func (m *Subsystem) checkCompletions() string {
m.mu.Lock()
seeded := m.completionsSeeded
for _, entry := range entries {
r := fs.Read(entry)
if !r.OK {
entryResult := fs.Read(entry)
if !entryResult.OK {
continue
}
entryData, ok := resultString(r)
entryData, ok := resultString(entryResult)
if !ok {
continue
}
var st struct {
var workspaceStatus struct {
Status string `json:"status"`
Repo string `json:"repo"`
Agent string `json:"agent"`
}
if r := core.JSONUnmarshalString(entryData, &st); !r.OK {
if parseResult := core.JSONUnmarshalString(entryData, &workspaceStatus); !parseResult.OK {
continue
}
wsName := agentic.WorkspaceName(core.PathDir(entry))
switch st.Status {
switch workspaceStatus.Status {
case "completed":
completed++
if !m.seenCompleted[wsName] {
m.seenCompleted[wsName] = true
if seeded {
newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s)", st.Repo, st.Agent))
newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s)", workspaceStatus.Repo, workspaceStatus.Agent))
}
}
case "running":
running++
if !m.seenRunning[wsName] && seeded {
m.seenRunning[wsName] = true
// No individual start notification — too noisy
}
case "queued":
queued++
@ -409,12 +355,11 @@ func (m *Subsystem) checkCompletions() string {
if !m.seenCompleted[wsName] {
m.seenCompleted[wsName] = true
if seeded {
newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s) [%s]", st.Repo, st.Agent, st.Status))
newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s) [%s]", workspaceStatus.Repo, workspaceStatus.Agent, workspaceStatus.Status))
}
}
}
}
m.lastCompletedCount = completed
m.completionsSeeded = true
m.mu.Unlock()
@ -422,7 +367,6 @@ func (m *Subsystem) checkCompletions() string {
return ""
}
// Only emit queue.drained when genuinely empty — verified by live process checks
liveRunning, liveQueued := m.countLiveWorkspaces()
if m.ServiceRuntime != nil && liveRunning == 0 && liveQueued == 0 {
m.Core().ACTION(messages.QueueDrained{Completed: len(newlyCompleted)})
@ -438,18 +382,16 @@ func (m *Subsystem) checkCompletions() string {
return msg
}
// checkInbox checks for unread messages.
func (m *Subsystem) checkInbox() string {
apiKeyStr := monitorBrainKey()
if apiKeyStr == "" {
return ""
}
// Call the API to check inbox
apiURL := monitorAPIURL()
inboxURL := core.Concat(apiURL, "/v1/messages/inbox?agent=", url.QueryEscape(agentic.AgentName()))
hr := agentic.HTTPGet(context.Background(), inboxURL, core.Trim(apiKeyStr), "Bearer")
if !hr.OK {
httpResult := agentic.HTTPGet(context.Background(), inboxURL, core.Trim(apiKeyStr), "Bearer")
if !httpResult.OK {
return ""
}
@ -462,12 +404,11 @@ func (m *Subsystem) checkInbox() string {
Content string `json:"content"`
} `json:"data"`
}
if r := core.JSONUnmarshalString(hr.Value.(string), &resp); !r.OK {
m.debugChannel("checkInbox: failed to decode response")
if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &resp); !parseResult.OK {
m.debug("checkInbox: failed to decode response")
return ""
}
// Find max ID, count unread, collect new messages
maxID := 0
unread := 0
@ -484,20 +425,19 @@ func (m *Subsystem) checkInbox() string {
}
var newMessages []newMessage
for _, msg := range resp.Data {
if msg.ID > maxID {
maxID = msg.ID
for _, message := range resp.Data {
if message.ID > maxID {
maxID = message.ID
}
if !msg.Read {
if !message.Read {
unread++
}
// Collect messages newer than what we've seen
if msg.ID > prevMaxID {
if message.ID > prevMaxID {
newMessages = append(newMessages, newMessage{
ID: msg.ID,
From: msg.From,
Subject: msg.Subject,
Content: msg.Content,
ID: message.ID,
From: message.From,
Subject: message.Subject,
Content: message.Content,
})
}
}
@ -507,24 +447,21 @@ func (m *Subsystem) checkInbox() string {
m.inboxSeeded = true
m.mu.Unlock()
// First check after startup: seed, don't fire
if !seeded {
return ""
}
// Only fire if there are new messages (higher ID than last seen)
if maxID <= prevMaxID || len(newMessages) == 0 {
return ""
}
// Push each message as a channel event so it lands in the session
if m.ServiceRuntime != nil {
if notifier, ok := core.ServiceFor[channelSender](m.Core(), "mcp"); ok {
for _, msg := range newMessages {
for _, message := range newMessages {
notifier.ChannelSend(context.Background(), "inbox.message", map[string]any{
"from": msg.From,
"subject": msg.Subject,
"content": msg.Content,
"from": message.From,
"subject": message.Subject,
"content": message.Content,
})
}
}
@ -533,15 +470,13 @@ func (m *Subsystem) checkInbox() string {
return core.Sprintf("%d unread message(s) in inbox", unread)
}
// notify sends a log notification to all connected MCP sessions.
func (m *Subsystem) notify(ctx context.Context, message string) {
if m.server == nil {
return
}
// Use the server's session list to broadcast
for ss := range m.server.Sessions() {
ss.Log(ctx, &mcp.LoggingMessageParams{
for session := range m.server.Sessions() {
session.Log(ctx, &mcp.LoggingMessageParams{
Level: "info",
Logger: "monitor",
Data: message,
@ -549,11 +484,10 @@ func (m *Subsystem) notify(ctx context.Context, message string) {
}
}
// agentStatusResource returns current workspace status as a JSON resource.
func (m *Subsystem) agentStatusResource(ctx context.Context, req *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
func (m *Subsystem) agentStatusResource(_ context.Context, _ *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
entries := agentic.WorkspaceStatusPaths()
type wsInfo struct {
type workspaceInfo struct {
Name string `json:"name"`
Status string `json:"status"`
Repo string `json:"repo"`
@ -561,31 +495,31 @@ func (m *Subsystem) agentStatusResource(ctx context.Context, req *mcp.ReadResour
PRURL string `json:"pr_url,omitempty"`
}
var workspaces []wsInfo
var workspaces []workspaceInfo
for _, entry := range entries {
r := fs.Read(entry)
if !r.OK {
entryResult := fs.Read(entry)
if !entryResult.OK {
continue
}
entryData, ok := resultString(r)
entryData, ok := resultString(entryResult)
if !ok {
continue
}
var st struct {
var workspaceStatus struct {
Status string `json:"status"`
Repo string `json:"repo"`
Agent string `json:"agent"`
PRURL string `json:"pr_url"`
}
if r := core.JSONUnmarshalString(entryData, &st); !r.OK {
if parseResult := core.JSONUnmarshalString(entryData, &workspaceStatus); !parseResult.OK {
continue
}
workspaces = append(workspaces, wsInfo{
workspaces = append(workspaces, workspaceInfo{
Name: agentic.WorkspaceName(core.PathDir(entry)),
Status: st.Status,
Repo: st.Repo,
Agent: st.Agent,
PRURL: st.PRURL,
Status: workspaceStatus.Status,
Repo: workspaceStatus.Repo,
Agent: workspaceStatus.Agent,
PRURL: workspaceStatus.PRURL,
})
}

View file

@ -11,51 +11,41 @@ import (
core "dappco.re/go/core"
)
// CheckinResponse is what the API returns for an agent checkin.
//
// resp := monitor.CheckinResponse{Changed: []monitor.ChangedRepo{{Repo: "core-agent", Branch: "main", SHA: "abc123"}}, Timestamp: 1712345678}
// resp := monitor.CheckinResponse{Changed: []monitor.ChangedRepo{{Repo: "core-agent", Branch: "main", SHA: "abc123"}}, Timestamp: 1712345678}
type CheckinResponse struct {
// Repos that have new commits since the agent's last checkin.
Changed []ChangedRepo `json:"changed,omitempty"`
// Server timestamp — use as "since" on next checkin.
Timestamp int64 `json:"timestamp"`
Changed []ChangedRepo `json:"changed,omitempty"`
Timestamp int64 `json:"timestamp"`
}
// ChangedRepo is a repo that has new commits.
//
// repo := monitor.ChangedRepo{Repo: "core-agent", Branch: "main", SHA: "abc123"}
// repo := monitor.ChangedRepo{Repo: "core-agent", Branch: "main", SHA: "abc123"}
type ChangedRepo struct {
Repo string `json:"repo"`
Branch string `json:"branch"`
SHA string `json:"sha"`
}
// syncRepos calls the checkin API and pulls any repos that changed.
// Returns a human-readable message if repos were updated, empty string otherwise.
func (m *Subsystem) syncRepos() string {
agentName := agentic.AgentName()
checkinURL := core.Sprintf("%s/v1/agent/checkin?agent=%s&since=%d", monitorAPIURL(), url.QueryEscape(agentName), m.lastSyncTimestamp)
brainKey := monitorBrainKey()
hr := agentic.HTTPGet(context.Background(), checkinURL, brainKey, "Bearer")
if !hr.OK {
httpResult := agentic.HTTPGet(context.Background(), checkinURL, brainKey, "Bearer")
if !httpResult.OK {
return ""
}
var checkin CheckinResponse
if r := core.JSONUnmarshalString(hr.Value.(string), &checkin); !r.OK {
if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &checkin); !parseResult.OK {
return ""
}
if len(checkin.Changed) == 0 {
// No changes — safe to advance timestamp
m.mu.Lock()
m.lastSyncTimestamp = checkin.Timestamp
m.mu.Unlock()
return ""
}
// Pull changed repos
basePath := core.Env("CODE_PATH")
if basePath == "" {
basePath = core.JoinPath(agentic.HomeDir(), "Code", "core")
@ -63,7 +53,6 @@ func (m *Subsystem) syncRepos() string {
var pulled []string
for _, repo := range checkin.Changed {
// Sanitise repo name to prevent path traversal from API response
repoName := core.PathBase(core.Replace(repo.Repo, "\\", "/"))
if repoName == "." || repoName == ".." || repoName == "" {
continue
@ -73,38 +62,30 @@ func (m *Subsystem) syncRepos() string {
continue
}
// Check if on the default branch and clean
current := m.gitOutput(repoDir, "rev-parse", "--abbrev-ref", "HEAD")
if current == "" {
continue
}
// Determine which branch to pull — use server-reported branch,
// fall back to current if server didn't specify
targetBranch := repo.Branch
if targetBranch == "" {
targetBranch = current
}
// Only pull if we're on the target branch (or it's a default branch)
if current != targetBranch {
continue // On a different branch — skip
continue
}
status := m.gitOutput(repoDir, "status", "--porcelain")
if len(status) > 0 {
continue // Don't pull if dirty
continue
}
// Fast-forward pull the target branch
if m.gitOK(repoDir, "pull", "--ff-only", "origin", targetBranch) {
pulled = append(pulled, repo.Repo)
}
}
// Only advance timestamp if we handled all reported repos.
// If any were skipped (dirty, wrong branch, missing), keep the
// old timestamp so the server reports them again next cycle.
skipped := len(checkin.Changed) - len(pulled)
if skipped == 0 {
m.mu.Lock()
@ -119,8 +100,6 @@ func (m *Subsystem) syncRepos() string {
return core.Sprintf("Synced %d repo(s): %s", len(pulled), core.Join(", ", pulled...))
}
// lastSyncTimestamp is stored on the subsystem — add it via the check cycle.
// Initialised to "now" on first run so we don't pull everything on startup.
func (m *Subsystem) initSyncTimestamp() {
m.mu.Lock()
if m.lastSyncTimestamp == 0 {