agent/pkg/monitor/monitor.go
Snider f83c753277 feat(v0.8.0): full AX migration — ServiceRuntime, Actions, quality gates, transport
go-process:
- Register factory, Result lifecycle, 5 named Action handlers
- Start/Run/StartWithOptions/RunWithOptions all return core.Result
- core.ID() replaces fmt.Sprintf, core.As replaces errors.As

core/agent:
- PrepSubsystem + monitor.Subsystem + setup.Service embed ServiceRuntime[T]
- 22 named Actions + agent.completion Task pipeline in OnStartup
- ChannelNotifier removed — all IPC via c.ACTION(messages.X{})
- proc.go: all methods via s.Core().Process(), returns core.Result
- status.go: WriteAtomic + JSONMarshalString
- paths.go: Fs.NewUnrestricted() replaces unsafe.Pointer
- transport.go: ONE net/http file — HTTPGet/HTTPPost/HTTPDo/MCP transport
- All disallowed imports eliminated from source files (13 quality gates)
- String concat eliminated — core.Concat() throughout
- 1:1 _test.go + _example_test.go for every source file
- Reference docs synced from core/go v0.8.0
- RFC-025 updated with net/http, net/url, io/fs quality gates
- lib.go: io/fs eliminated via Data.ListNames, Array[T].Deduplicate

Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-26 01:27:46 +00:00

619 lines
15 KiB
Go

// SPDX-License-Identifier: EUPL-1.2
// Package monitor provides a background subsystem that watches the ecosystem
// and pushes notifications to connected MCP clients.
//
// Checks performed on each tick:
// - Agent completions: scans workspace for newly completed agents
// - Repo drift: checks forge for repos with unpushed/unpulled changes
// - Inbox: checks for unread agent messages
package monitor
import (
"context"
"sync"
"syscall"
"time"
"dappco.re/go/agent/pkg/agentic"
"dappco.re/go/agent/pkg/messages"
core "dappco.re/go/core"
coremcp "dappco.re/go/mcp/pkg/mcp"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// fs provides unrestricted filesystem access (root "/" = no sandbox).
//
// r := fs.Read(core.Concat(wsRoot, "/", name, "/status.json"))
// if text, ok := resultString(r); ok { json.Unmarshal([]byte(text), &st) }
//
// NOTE(review): the name shadows the stdlib io/fs package name inside this
// file; io/fs is not imported here so this is safe, but keep it in mind
// before adding that import.
var fs = agentic.LocalFs()
// workspaceStatusPaths returns all status.json files across both old and new workspace layouts.
// Old: workspace/{name}/status.json (1 level)
// New: workspace/{org}/{repo}/{identifier}/status.json (3 levels)
//
// Shallow (legacy) matches come first, then the 3-level layout.
func workspaceStatusPaths(wsRoot string) []string {
	// Renamed from `old`/`new`: `new` shadowed the Go builtin of the same name.
	shallow := core.PathGlob(core.Concat(wsRoot, "/*/status.json"))
	deep := core.PathGlob(core.Concat(wsRoot, "/*/*/*/status.json"))
	return append(shallow, deep...)
}
// workspaceStatusPath returns the status.json path inside the given
// workspace directory.
func workspaceStatusPath(wsDir string) string {
	const statusFile = "/status.json"
	return core.Concat(wsDir, statusFile)
}
// brainKeyPath returns the on-disk location of the brain API key,
// ~/.claude/brain.key relative to the given home directory.
func brainKeyPath(homeDir string) string {
	return core.JoinPath(homeDir, ".claude", "brain.key")
}
// monitorPath normalises a path to the platform directory separator taken
// from the DS environment variable: backslashes and forward slashes are
// both rewritten to DS.
func monitorPath(path string) string {
	sep := core.Env("DS")
	normalised := core.Replace(path, "\\", sep)
	return core.Replace(normalised, "/", sep)
}
// monitorHomeDir resolves the home directory, preferring CORE_HOME and
// falling back to DIR_HOME (which may itself be empty).
func monitorHomeDir() string {
	for _, key := range []string{"CORE_HOME", "DIR_HOME"} {
		if dir := core.Env(key); dir != "" {
			return dir
		}
	}
	return ""
}
// monitorAPIURL resolves the API base URL: CORE_API_URL when set,
// otherwise the default production endpoint.
func monitorAPIURL() string {
	const fallback = "https://api.lthn.sh"
	u := core.Env("CORE_API_URL")
	if u == "" {
		return fallback
	}
	return u
}
// monitorBrainKey resolves the brain API key: the CORE_BRAIN_KEY env var
// wins; otherwise the key file under the home directory is read and
// trimmed. Returns "" when neither source yields a key.
func monitorBrainKey() string {
	if key := core.Env("CORE_BRAIN_KEY"); key != "" {
		return key
	}
	r := fs.Read(brainKeyPath(monitorHomeDir()))
	if !r.OK {
		return ""
	}
	text, ok := resultString(r)
	if !ok {
		return ""
	}
	return core.Trim(text)
}
// resultString extracts the string payload from a core.Result.
// The second return is false when the result's Value is not a string.
func resultString(r core.Result) (string, bool) {
	if s, ok := r.Value.(string); ok {
		return s, true
	}
	return "", false
}
// MonitorOptions configures the monitor service.
// Currently empty: runtime configuration comes from Options (interval) and
// environment variables; this type exists to satisfy the
// core.ServiceRuntime[T] type parameter.
type MonitorOptions struct{}
// Subsystem implements mcp.Subsystem for background monitoring.
//
// core.New(core.WithService(monitor.Register))
type Subsystem struct {
	*core.ServiceRuntime[MonitorOptions]

	server   *mcp.Server        // set by RegisterTools; nil until then
	interval time.Duration      // period between background checks
	cancel   context.CancelFunc // stops the loop goroutine; set by Start
	wg       sync.WaitGroup     // waits for the loop goroutine on Shutdown

	// Track last seen state to only notify on changes
	lastCompletedCount int             // completed workspaces seen on the last scan
	seenCompleted      map[string]bool // workspace names we've already notified about
	seenRunning        map[string]bool // workspace names we've already sent start notification for
	completionsSeeded  bool            // true after first completions check
	lastInboxMaxID     int             // highest message ID seen
	inboxSeeded        bool            // true after first inbox check
	lastSyncTimestamp  int64           // presumably last repo-sync time; managed by initSyncTimestamp/syncRepos (defined elsewhere) — confirm
	mu                 sync.Mutex      // guards the seen*/seeded/last* fields above
	// Event-driven poke channel — dispatch goroutine sends here on completion
	poke chan struct{}
}

// Compile-time check that *Subsystem satisfies coremcp.Subsystem.
var _ coremcp.Subsystem = (*Subsystem)(nil)
// SetCore wires the Core framework instance via ServiceRuntime.
// Any existing runtime is replaced; options are the zero MonitorOptions.
//
// Deprecated: Use Register with core.WithService(monitor.Register) instead.
//
// mon.SetCore(c)
func (m *Subsystem) SetCore(c *core.Core) {
	m.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{})
}
// handleAgentStarted records the workspace of a newly started agent so the
// periodic scan does not re-announce it.
func (m *Subsystem) handleAgentStarted(ev messages.AgentStarted) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.seenRunning[ev.Workspace] = true
}
// handleAgentCompleted processes agent completion — marks the workspace as
// seen, triggers an immediate check cycle, and schedules an idle check.
func (m *Subsystem) handleAgentCompleted(ev messages.AgentCompleted) {
	func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		m.seenCompleted[ev.Workspace] = true
	}()
	m.Poke()
	go m.checkIdleAfterDelay()
}
// Options configures the monitor interval.
//
// monitor.New(monitor.Options{Interval: 30 * time.Second})
type Options struct {
	// Interval between checks (default: 2 minutes).
	// Values <= 0 fall back to the default.
	Interval time.Duration
}
// New creates a monitor subsystem.
//
// The interval defaults to 2 minutes, may be overridden by Options, and
// finally by the MONITOR_INTERVAL environment variable (a time.Duration
// string such as "30s") for debugging.
//
// mon := monitor.New(monitor.Options{Interval: 30 * time.Second})
func New(opts ...Options) *Subsystem {
	interval := 2 * time.Minute
	if len(opts) > 0 && opts[0].Interval > 0 {
		interval = opts[0].Interval
	}
	// Override via env for debugging. Reject non-positive durations:
	// time.NewTicker panics on d <= 0, which would crash the loop goroutine.
	if envInterval := core.Env("MONITOR_INTERVAL"); envInterval != "" {
		if d, err := time.ParseDuration(envInterval); err == nil && d > 0 {
			interval = d
		}
	}
	return &Subsystem{
		interval:      interval,
		poke:          make(chan struct{}, 1), // buffer of 1 so Poke never blocks
		seenCompleted: make(map[string]bool),
		seenRunning:   make(map[string]bool),
	}
}
// debugChannel forwards a debug message to the core logger.
func (m *Subsystem) debugChannel(msg string) { core.Debug(msg) }
// Name returns the subsystem identifier used by MCP registration.
//
// mon.Name() // "monitor"
func (m *Subsystem) Name() string {
	return "monitor"
}
// RegisterTools binds the monitor resource to an MCP server and remembers
// the server so later checks can push notifications to it.
//
// mon.RegisterTools(server)
func (m *Subsystem) RegisterTools(server *mcp.Server) {
	m.server = server
	// Expose a readable resource reflecting current workspace status.
	resource := &mcp.Resource{
		Name:        "Agent Status",
		URI:         "status://agents",
		Description: "Current status of all agent workspaces",
		MIMEType:    "application/json",
	}
	server.AddResource(resource, m.agentStatusResource)
}
// Start begins the background monitoring loop after MCP startup.
// The loop runs until ctx is cancelled or Shutdown is called.
//
// mon.Start(ctx)
func (m *Subsystem) Start(ctx context.Context) {
	monCtx, cancel := context.WithCancel(ctx)
	m.cancel = cancel
	// gofmt fix: removed stray space after the opening paren.
	core.Info("monitor: started (interval=%s)", m.interval)
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		m.loop(monCtx)
	}()
}
// OnStartup implements core.Startable — starts the monitoring loop and
// always reports success.
func (m *Subsystem) OnStartup(ctx context.Context) core.Result {
	m.Start(ctx)
	ok := core.Result{OK: true}
	return ok
}
// OnShutdown implements core.Stoppable — stops the monitoring loop and
// always reports success (Shutdown itself never returns an error).
func (m *Subsystem) OnShutdown(ctx context.Context) core.Result {
	_ = m.Shutdown(ctx) // error is always nil
	ok := core.Result{OK: true}
	return ok
}
// Shutdown stops the monitoring loop and waits for it to exit.
// Safe to call before Start (cancel is nil, the WaitGroup is empty).
// Always returns nil.
//
// _ = mon.Shutdown(ctx)
func (m *Subsystem) Shutdown(_ context.Context) error {
	if cancel := m.cancel; cancel != nil {
		cancel()
	}
	m.wg.Wait()
	return nil
}
// Poke triggers an immediate check cycle (legacy — prefer AgentStarted/AgentCompleted).
// Non-blocking: if a poke is already pending in the 1-slot buffer, this
// one is dropped — the loop will check anyway.
func (m *Subsystem) Poke() {
	select {
	case m.poke <- struct{}{}:
		// wake-up queued
	default:
		// a poke is already pending; coalesce
	}
}
// checkIdleAfterDelay waits briefly then checks if the fleet is genuinely idle.
// Only emits queue.drained when there are truly zero running or queued agents,
// verified by checking PIDs are alive, not just trusting status files.
func (m *Subsystem) checkIdleAfterDelay() {
	time.Sleep(5 * time.Second) // wait for queue drain to fill slots
	if m.ServiceRuntime == nil {
		return
	}
	running, queued := m.countLiveWorkspaces()
	idle := running == 0 && queued == 0
	if !idle {
		return
	}
	m.Core().ACTION(messages.QueueDrained{Completed: 0})
}
// countLiveWorkspaces counts workspaces that are genuinely active.
// For "running" status, verifies the PID is still alive.
func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
	wsRoot := agentic.WorkspaceRoot()
	// Reuse the shared glob helper so all workspace scans in this file
	// (completions, status resource, liveness) agree on the two layouts,
	// instead of duplicating the old/deep glob patterns here.
	for _, path := range workspaceStatusPaths(wsRoot) {
		wsDir := core.PathDir(path)
		st, err := agentic.ReadStatus(wsDir)
		if err != nil {
			continue // unreadable/malformed status — not counted
		}
		switch st.Status {
		case "running":
			// Status files can go stale; only count a running agent whose
			// process is verifiably alive.
			if st.PID > 0 && pidAlive(st.PID) {
				running++
			}
		case "queued":
			queued++
		}
	}
	return
}
// pidAlive checks whether a process is still running.
//
// Signal 0 performs error checking without delivering a signal. A nil
// error means the process exists; EPERM means it exists but is owned by
// another user (the previous version misreported that case as dead).
// Any other error (notably ESRCH) means no such process.
func pidAlive(pid int) bool {
	if pid <= 0 {
		return false
	}
	err := syscall.Kill(pid, 0)
	return err == nil || err == syscall.EPERM
}
// loop is the monitor's main goroutine: one delayed initial check, then a
// check on every ticker interval and on every poke signal, until ctx is
// cancelled.
func (m *Subsystem) loop(ctx context.Context) {
	// Initial check after short delay (let server fully start)
	select {
	case <-ctx.Done():
		return
	case <-time.After(5 * time.Second):
	}
	// Initialise sync timestamp to now (don't pull everything on first run)
	m.initSyncTimestamp()
	// Run first check immediately
	m.check(ctx)
	ticker := time.NewTicker(m.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			m.check(ctx)
		case <-m.poke:
			// Event-driven wake-up from Poke()/completion handlers.
			m.check(ctx)
		}
	}
}
// check runs one monitoring cycle: completions, harvest, inbox, and repo
// sync. Accumulated summary lines are broadcast to connected MCP sessions,
// and resource subscribers are told the agent-status resource changed.
// Silent when nothing new happened.
func (m *Subsystem) check(ctx context.Context) {
	// Renamed from `messages`: that local shadowed the imported
	// messages package within this function.
	var notes []string
	// Check agent completions
	if msg := m.checkCompletions(); msg != "" {
		notes = append(notes, msg)
	}
	// Harvest completed workspaces — push branches, check for binaries
	if msg := m.harvestCompleted(); msg != "" {
		notes = append(notes, msg)
	}
	// Check inbox
	if msg := m.checkInbox(); msg != "" {
		notes = append(notes, msg)
	}
	// Sync repos from other agents' changes
	if msg := m.syncRepos(); msg != "" {
		notes = append(notes, msg)
	}
	// Only notify if there's something new
	if len(notes) == 0 {
		return
	}
	combined := core.Join("\n", notes...)
	m.notify(ctx, combined)
	// Notify resource subscribers that agent status changed
	if m.server != nil {
		m.server.ResourceUpdated(ctx, &mcp.ResourceUpdatedNotificationParams{
			URI: "status://agents",
		})
	}
}
// checkCompletions scans workspace for newly completed agents.
// Tracks by workspace name (not count) so harvest status rewrites
// don't suppress future notifications.
//
// Returns a one-line summary ("N agent(s) completed, ...") or "" when
// nothing new was found. The first scan after startup only seeds the
// dedup maps and never reports anything.
func (m *Subsystem) checkCompletions() string {
	wsRoot := agentic.WorkspaceRoot()
	entries := workspaceStatusPaths(wsRoot)
	running := 0
	queued := 0
	completed := 0
	var newlyCompleted []string
	// NOTE(review): the mutex is held across the filesystem reads and JSON
	// decoding below — a long critical section, but it guards the seen*
	// maps against the AgentStarted/AgentCompleted handlers.
	m.mu.Lock()
	seeded := m.completionsSeeded
	for _, entry := range entries {
		r := fs.Read(entry)
		if !r.OK {
			continue // unreadable status file — skip
		}
		entryData, ok := resultString(r)
		if !ok {
			continue
		}
		var st struct {
			Status string `json:"status"`
			Repo   string `json:"repo"`
			Agent  string `json:"agent"`
		}
		if r := core.JSONUnmarshalString(entryData, &st); !r.OK {
			continue // malformed status JSON — skip
		}
		// Use full relative path as dedup key — "core/go/main" not just "main"
		wsDir := core.PathDir(entry)
		wsName := wsDir
		if len(wsDir) > len(wsRoot)+1 {
			wsName = wsDir[len(wsRoot)+1:]
		}
		switch st.Status {
		case "completed":
			completed++
			if !m.seenCompleted[wsName] {
				m.seenCompleted[wsName] = true
				if seeded {
					newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s)", st.Repo, st.Agent))
				}
			}
		case "running":
			running++
			if !m.seenRunning[wsName] && seeded {
				m.seenRunning[wsName] = true
				// No individual start notification — too noisy
			}
		case "queued":
			queued++
		case "blocked", "failed":
			// Terminal-but-unsuccessful states are reported once, tagged
			// with the status, then deduped like completions.
			if !m.seenCompleted[wsName] {
				m.seenCompleted[wsName] = true
				if seeded {
					newlyCompleted = append(newlyCompleted, core.Sprintf("%s (%s) [%s]", st.Repo, st.Agent, st.Status))
				}
			}
		}
	}
	m.lastCompletedCount = completed
	m.completionsSeeded = true
	m.mu.Unlock()
	if len(newlyCompleted) == 0 {
		return ""
	}
	// Only emit queue.drained when genuinely empty — verified by live PID check
	liveRunning, liveQueued := m.countLiveWorkspaces()
	if m.ServiceRuntime != nil && liveRunning == 0 && liveQueued == 0 {
		m.Core().ACTION(messages.QueueDrained{Completed: len(newlyCompleted)})
	}
	msg := core.Sprintf("%d agent(s) completed", len(newlyCompleted))
	if running > 0 {
		msg = core.Concat(msg, core.Sprintf(", %d still running", running))
	}
	if queued > 0 {
		msg = core.Concat(msg, core.Sprintf(", %d queued", queued))
	}
	return msg
}
// checkInbox checks for unread messages via the inbox API.
//
// Returns a one-line summary ("N unread message(s) in inbox") when there
// are messages newer than the last seen ID, otherwise "". The first check
// after startup only seeds the high-water mark and never fires.
func (m *Subsystem) checkInbox() string {
	// Use the shared helpers instead of duplicating key/URL resolution
	// inline. The previous inline copy read DIR_HOME directly and so
	// ignored CORE_HOME when locating the key file.
	apiKey := monitorBrainKey()
	if apiKey == "" {
		return "" // no credentials — nothing to check
	}
	inboxURL := core.Concat(monitorAPIURL(), "/v1/messages/inbox?agent=", core.Replace(agentic.AgentName(), " ", "%20"))
	hr := agentic.HTTPGet(context.Background(), inboxURL, core.Trim(apiKey), "Bearer")
	if !hr.OK {
		return ""
	}
	// Safe extraction: the previous hr.Value.(string) assertion would
	// panic on an unexpected payload type.
	body, ok := resultString(hr)
	if !ok {
		m.debugChannel("checkInbox: failed to decode response")
		return ""
	}
	var resp struct {
		Data []struct {
			ID      int    `json:"id"`
			Read    bool   `json:"read"`
			From    string `json:"from"`
			Subject string `json:"subject"`
			Content string `json:"content"`
		} `json:"data"`
	}
	if r := core.JSONUnmarshalString(body, &resp); !r.OK {
		m.debugChannel("checkInbox: failed to decode response")
		return ""
	}
	// Find max ID, count unread, collect new messages
	maxID := 0
	unread := 0
	m.mu.Lock()
	prevMaxID := m.lastInboxMaxID
	seeded := m.inboxSeeded
	m.mu.Unlock()
	type newMessage struct {
		ID      int    `json:"id"`
		From    string `json:"from"`
		Subject string `json:"subject"`
		Content string `json:"content"`
	}
	var newMessages []newMessage
	for _, msg := range resp.Data {
		if msg.ID > maxID {
			maxID = msg.ID
		}
		if !msg.Read {
			unread++
		}
		// Collect messages newer than what we've seen
		if msg.ID > prevMaxID {
			newMessages = append(newMessages, newMessage{
				ID:      msg.ID,
				From:    msg.From,
				Subject: msg.Subject,
				Content: msg.Content,
			})
		}
	}
	m.mu.Lock()
	m.lastInboxMaxID = maxID
	m.inboxSeeded = true
	m.mu.Unlock()
	// First check after startup: seed, don't fire
	if !seeded {
		return ""
	}
	// Only fire if there are new messages (higher ID than last seen)
	if maxID <= prevMaxID || len(newMessages) == 0 {
		return ""
	}
	// Push channel event with full message content
	if m.ServiceRuntime != nil {
		m.Core().ACTION(messages.InboxMessage{New: len(newMessages), Total: unread})
	}
	return core.Sprintf("%d unread message(s) in inbox", unread)
}
// notify sends a log notification to all connected MCP sessions.
// No-op when no MCP server has been registered yet.
func (m *Subsystem) notify(ctx context.Context, message string) {
	server := m.server
	if server == nil {
		return
	}
	// Broadcast via the server's session list.
	for session := range server.Sessions() {
		session.Log(ctx, &mcp.LoggingMessageParams{
			Level:  "info",
			Logger: "monitor",
			Data:   message,
		})
	}
}
// agentStatusResource returns current workspace status as a JSON resource.
// Unreadable or malformed status files are skipped silently.
func (m *Subsystem) agentStatusResource(ctx context.Context, req *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
	type wsInfo struct {
		Name   string `json:"name"`
		Status string `json:"status"`
		Repo   string `json:"repo"`
		Agent  string `json:"agent"`
		PRURL  string `json:"pr_url,omitempty"`
	}
	wsRoot := agentic.WorkspaceRoot()
	var workspaces []wsInfo
	for _, statusPath := range workspaceStatusPaths(wsRoot) {
		readResult := fs.Read(statusPath)
		if !readResult.OK {
			continue
		}
		raw, ok := resultString(readResult)
		if !ok {
			continue
		}
		var st struct {
			Status string `json:"status"`
			Repo   string `json:"repo"`
			Agent  string `json:"agent"`
			PRURL  string `json:"pr_url"`
		}
		if decoded := core.JSONUnmarshalString(raw, &st); !decoded.OK {
			continue
		}
		// Workspace name is the path relative to the workspace root,
		// e.g. "core/go/main", falling back to the absolute dir.
		dir := core.PathDir(statusPath)
		name := dir
		if len(dir) > len(wsRoot)+1 {
			name = dir[len(wsRoot)+1:]
		}
		workspaces = append(workspaces, wsInfo{
			Name:   name,
			Status: st.Status,
			Repo:   st.Repo,
			Agent:  st.Agent,
			PRURL:  st.PRURL,
		})
	}
	contents := &mcp.ResourceContents{
		URI:      "status://agents",
		MIMEType: "application/json",
		Text:     core.JSONMarshalString(workspaces),
	}
	return &mcp.ReadResourceResult{
		Contents: []*mcp.ResourceContents{contents},
	}, nil
}