diff --git a/pkg/monitor/harvest.go b/pkg/monitor/harvest.go index 8166ba0..8f6fad3 100644 --- a/pkg/monitor/harvest.go +++ b/pkg/monitor/harvest.go @@ -12,9 +12,7 @@ package monitor import ( "context" "encoding/json" - "os" "os/exec" - "path/filepath" "strconv" "dappco.re/go/agent/pkg/agentic" @@ -33,15 +31,12 @@ type harvestResult struct { // branches back to the source repos. Returns a summary message. func (m *Subsystem) harvestCompleted() string { wsRoot := agentic.WorkspaceRoot() - entries, err := filepath.Glob(workspaceStatusGlob(wsRoot)) - if err != nil { - return "" - } + entries := core.PathGlob(workspaceStatusGlob(wsRoot)) var harvested []harvestResult for _, entry := range entries { - wsDir := filepath.Dir(entry) + wsDir := core.PathDir(monitorPath(entry)) result := m.harvestWorkspace(wsDir) if result != nil { harvested = append(harvested, *result) @@ -103,7 +98,7 @@ func (m *Subsystem) harvestWorkspace(wsDir string) *harvestResult { } srcDir := core.Concat(wsDir, "/src") - if _, err := os.Stat(srcDir); err != nil { + if !fs.Exists(srcDir) || fs.IsFile(srcDir) { return nil } @@ -205,12 +200,8 @@ func countUnpushed(srcDir, branch string) int { // Checks ALL changed files (added, modified, renamed), not just new. // Fails closed: if git diff fails, rejects the workspace. func checkSafety(srcDir string) string { - // Check all changed files — added, modified, renamed - base := defaultBranch(srcDir) - cmd := exec.Command("git", "diff", "--name-only", core.Concat(base, "...HEAD")) - cmd.Dir = srcDir - out, err := cmd.Output() - if err != nil { + files, ok := changedFilesSinceDefault(srcDir) + if !ok { return "safety check failed: git diff error" } @@ -224,20 +215,18 @@ func checkSafety(srcDir string) string { ".db": true, ".sqlite": true, ".sqlite3": true, } - for _, file := range core.Split(core.Trim(string(out)), "\n") { - if file == "" { - continue - } - ext := core.Lower(filepath.Ext(file)) + for _, file := range files { + ext := core.Lower(core.PathExt(file)) if binaryExts[ext] { return core.Sprintf("binary file added: %s", file) } // Check file size (reject > 1MB) fullPath := core.Concat(srcDir, "/", file) - info, err := os.Stat(fullPath) - if err == nil && info.Size() > 1024*1024 { - return core.Sprintf("large file: %s (%d bytes)", file, info.Size()) + r := fs.Stat(fullPath) + size, ok := resultFileSize(r) + if ok && size > 1024*1024 { + return core.Sprintf("large file: %s (%d bytes)", file, size) } } @@ -246,18 +235,26 @@ func checkSafety(srcDir string) string { // countChangedFiles returns the number of files changed vs the default branch. func countChangedFiles(srcDir string) int { + files, ok := changedFilesSinceDefault(srcDir) + if !ok { + return 0 + } + return len(files) +} + +func changedFilesSinceDefault(srcDir string) ([]string, bool) { base := defaultBranch(srcDir) cmd := exec.Command("git", "diff", "--name-only", core.Concat(base, "...HEAD")) cmd.Dir = srcDir out, err := cmd.Output() if err != nil { - return 0 + return nil, false } lines := core.Split(core.Trim(string(out)), "\n") if len(lines) == 1 && lines[0] == "" { - return 0 + return nil, true } - return len(lines) + return lines, true } // pushBranch pushes the agent's branch to origin. @@ -271,6 +268,19 @@ func pushBranch(srcDir, branch string) error { return nil } +func resultFileSize(r core.Result) (int64, bool) { + type sizable interface { + Size() int64 + } + + switch value := r.Value.(type) { + case sizable: + return value.Size(), true + default: + return 0, false + } +} + // updateStatus updates the workspace status.json. func updateStatus(wsDir, status, question string) { r := fs.Read(workspaceStatusPath(wsDir)) diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 16b23bc..461bdf0 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -15,7 +15,6 @@ import ( "net/http" "net/url" "os" - "path/filepath" "sync" "time" @@ -40,7 +39,43 @@ func workspaceStatusPath(wsDir string) string { } func brainKeyPath(home string) string { - return filepath.Join(home, ".claude", "brain.key") + return core.JoinPath(home, ".claude", "brain.key") +} + +func monitorPath(path string) string { + ds := core.Env("DS") + return core.Replace(core.Replace(path, "\\", ds), "/", ds) +} + +func monitorHomeDir() string { + if coreHome, ok := os.LookupEnv("CORE_HOME"); ok && coreHome != "" { + return coreHome + } + if home, ok := os.LookupEnv("HOME"); ok && home != "" { + return home + } + return core.Env("DIR_HOME") +} + +func monitorAPIURL() string { + apiURL := core.Env("CORE_API_URL") + if apiURL == "" { + return "https://api.lthn.sh" + } + return apiURL +} + +func monitorBrainKey() string { + brainKey := core.Env("CORE_BRAIN_KEY") + if brainKey != "" { + return brainKey + } + if r := fs.Read(brainKeyPath(monitorHomeDir())); r.OK { + if value, ok := resultString(r); ok { + return core.Trim(value) + } + } + return "" } func resultString(r core.Result) (string, bool) { @@ -110,7 +145,7 @@ func New(opts ...Options) *Subsystem { interval = opts[0].Interval } // Override via env for debugging - if envInterval := os.Getenv("MONITOR_INTERVAL"); envInterval != "" { + if envInterval := core.Env("MONITOR_INTERVAL"); envInterval != "" { if d, err := time.ParseDuration(envInterval); err == nil { interval = d } @@ -259,10 +294,7 @@ func (m *Subsystem) check(ctx context.Context) { // don't suppress future notifications. func (m *Subsystem) checkCompletions() string { wsRoot := agentic.WorkspaceRoot() - entries, err := filepath.Glob(workspaceStatusGlob(wsRoot)) - if err != nil { - return "" - } + entries := core.PathGlob(workspaceStatusGlob(wsRoot)) running := 0 queued := 0 @@ -289,7 +321,7 @@ func (m *Subsystem) checkCompletions() string { continue } - wsName := filepath.Base(filepath.Dir(entry)) + wsName := core.PathBase(core.PathDir(monitorPath(entry))) switch st.Status { case "completed": @@ -343,26 +375,11 @@ func (m *Subsystem) checkCompletions() string { // checkInbox checks for unread messages. func (m *Subsystem) checkInbox() string { - apiKeyStr := os.Getenv("CORE_BRAIN_KEY") + apiKeyStr := monitorBrainKey() if apiKeyStr == "" { - home, _ := os.UserHomeDir() - keyFile := brainKeyPath(home) - r := fs.Read(keyFile) - if !r.OK { - return "" - } - value, ok := resultString(r) - if !ok { - return "" - } - apiKeyStr = value - } - - // Call the API to check inbox - apiURL := os.Getenv("CORE_API_URL") - if apiURL == "" { - apiURL = "https://api.lthn.sh" + return "" } + apiURL := monitorAPIURL() req, err := http.NewRequest("GET", core.Concat(apiURL, "/v1/messages/inbox?agent=", url.QueryEscape(agentic.AgentName())), nil) if err != nil { return "" @@ -475,10 +492,7 @@ func (m *Subsystem) notify(ctx context.Context, message string) { // agentStatusResource returns current workspace status as a JSON resource. func (m *Subsystem) agentStatusResource(ctx context.Context, req *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) { wsRoot := agentic.WorkspaceRoot() - entries, err := filepath.Glob(workspaceStatusGlob(wsRoot)) - if err != nil { - return nil, core.E("monitor.agentStatus", "failed to scan workspaces", err) - } + entries := core.PathGlob(workspaceStatusGlob(wsRoot)) type wsInfo struct { Name string `json:"name"` @@ -508,7 +522,7 @@ func (m *Subsystem) agentStatusResource(ctx context.Context, req *mcp.ReadResour continue } workspaces = append(workspaces, wsInfo{ - Name: filepath.Base(filepath.Dir(entry)), + Name: core.PathBase(core.PathDir(monitorPath(entry))), Status: st.Status, Repo: st.Repo, Agent: st.Agent, diff --git a/pkg/monitor/sync.go b/pkg/monitor/sync.go index d9e12ee..55b1045 100644 --- a/pkg/monitor/sync.go +++ b/pkg/monitor/sync.go @@ -6,9 +6,7 @@ import ( "encoding/json" "net/http" neturl "net/url" - "os" "os/exec" - "path/filepath" "time" "dappco.re/go/agent/pkg/agentic" @@ -37,14 +35,8 @@ type ChangedRepo struct { // syncRepos calls the checkin API and pulls any repos that changed. // Returns a human-readable message if repos were updated, empty string otherwise. func (m *Subsystem) syncRepos() string { - apiURL := os.Getenv("CORE_API_URL") - if apiURL == "" { - apiURL = "https://api.lthn.sh" - } - agentName := agentic.AgentName() - - checkinURL := core.Sprintf("%s/v1/agent/checkin?agent=%s&since=%d", apiURL, neturl.QueryEscape(agentName), m.lastSyncTimestamp) + checkinURL := core.Sprintf("%s/v1/agent/checkin?agent=%s&since=%d", monitorAPIURL(), neturl.QueryEscape(agentName), m.lastSyncTimestamp) req, err := http.NewRequest("GET", checkinURL, nil) if err != nil { @@ -52,15 +44,7 @@ func (m *Subsystem) syncRepos() string { } // Use brain key for auth - brainKey := os.Getenv("CORE_BRAIN_KEY") - if brainKey == "" { - home, _ := os.UserHomeDir() - if r := fs.Read(brainKeyPath(home)); r.OK { - if value, ok := resultString(r); ok { - brainKey = core.Trim(value) - } - } - } + brainKey := monitorBrainKey() if brainKey != "" { req.Header.Set("Authorization", core.Concat("Bearer ", brainKey)) } @@ -90,21 +74,20 @@ func (m *Subsystem) syncRepos() string { } // Pull changed repos - basePath := os.Getenv("CODE_PATH") + basePath := core.Env("CODE_PATH") if basePath == "" { - home, _ := os.UserHomeDir() - basePath = filepath.Join(home, "Code", "core") + basePath = core.JoinPath(monitorHomeDir(), "Code", "core") } var pulled []string for _, repo := range checkin.Changed { // Sanitise repo name to prevent path traversal from API response - repoName := filepath.Base(repo.Repo) + repoName := core.PathBase(monitorPath(repo.Repo)) if repoName == "." || repoName == ".." || repoName == "" { continue } repoDir := core.Concat(basePath, "/", repoName) - if _, err := os.Stat(repoDir); err != nil { + if !fs.Exists(repoDir) || fs.IsFile(repoDir) { continue }