From 7d6fd8af5e81d68d8938b23f8e18047a49b26fda Mon Sep 17 00:00:00 2001 From: Virgil Date: Mon, 30 Mar 2026 19:40:02 +0000 Subject: [PATCH] fix(ax): make agentic status reads Result-native Co-Authored-By: Virgil --- pkg/agentic/actions.go | 24 ++++++++++++++++-------- pkg/agentic/auto_pr.go | 23 ++++++++++++++++++----- pkg/agentic/commands_workspace.go | 10 ++++++---- pkg/agentic/dispatch.go | 28 ++++++++++++++++------------ pkg/agentic/dispatch_sync.go | 6 ++++-- pkg/agentic/handlers.go | 11 ++++++++--- pkg/agentic/ingest.go | 5 +++-- pkg/agentic/pr.go | 6 ++++-- pkg/agentic/prep.go | 5 +++-- pkg/agentic/queue.go | 15 +++++++++------ pkg/agentic/resume.go | 6 ++++-- pkg/agentic/status.go | 17 +++++++++++++++-- pkg/agentic/verify.go | 30 ++++++++++++++++++++---------- pkg/agentic/watch.go | 10 ++++++---- 14 files changed, 132 insertions(+), 64 deletions(-) diff --git a/pkg/agentic/actions.go b/pkg/agentic/actions.go index 463896b..1491826 100644 --- a/pkg/agentic/actions.go +++ b/pkg/agentic/actions.go @@ -189,17 +189,21 @@ func (s *PrepSubsystem) handleQA(ctx context.Context, opts core.Options) core.Re } passed := s.runQA(wsDir) if !passed { - if st, err := ReadStatus(wsDir); err == nil { - st.Status = "failed" - st.Question = "QA check failed — build or tests did not pass" - writeStatusResult(wsDir, st) + if result := ReadStatusResult(wsDir); result.OK { + st, ok := workspaceStatusValue(result) + if ok { + st.Status = "failed" + st.Question = "QA check failed — build or tests did not pass" + writeStatusResult(wsDir, st) + } } } // Emit QA result for observability (monitor picks this up) if s.ServiceRuntime != nil { - st, _ := ReadStatus(wsDir) + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) repo := "" - if st != nil { + if ok { repo = st.Repo } s.Core().ACTION(messages.QAResult{ @@ -228,7 +232,9 @@ func (s *PrepSubsystem) handleAutoPR(ctx context.Context, opts core.Options) cor // Emit PRCreated for observability if s.ServiceRuntime != nil { - if st, err := ReadStatus(wsDir); err == nil && st.PRURL != "" { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if ok && st.PRURL != "" { s.Core().ACTION(messages.PRCreated{ Repo: st.Repo, Branch: st.Branch, @@ -257,7 +263,9 @@ func (s *PrepSubsystem) handleVerify(ctx context.Context, opts core.Options) cor // Emit merge/review events for observability if s.ServiceRuntime != nil { - if st, err := ReadStatus(wsDir); err == nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if ok { if st.Status == "merged" { s.Core().ACTION(messages.PRMerged{ Repo: st.Repo, diff --git a/pkg/agentic/auto_pr.go b/pkg/agentic/auto_pr.go index 29991b8..713e500 100644 --- a/pkg/agentic/auto_pr.go +++ b/pkg/agentic/auto_pr.go @@ -12,8 +12,9 @@ import ( // autoCreatePR pushes the agent's branch and creates a PR on Forge // if the agent made any commits beyond the initial clone. func (s *PrepSubsystem) autoCreatePR(wsDir string) { - st, err := ReadStatus(wsDir) - if err != nil || st.Branch == "" || st.Repo == "" { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok || st.Branch == "" || st.Repo == "" { return } @@ -43,7 +44,11 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) { // Push the branch to forge forgeRemote := core.Sprintf("ssh://git@forge.lthn.ai:2223/%s/%s.git", org, st.Repo) if !process.RunIn(ctx, repoDir, "git", "push", forgeRemote, st.Branch).OK { - if st2, err := ReadStatus(wsDir); err == nil { + if result := ReadStatusResult(wsDir); result.OK { + st2, ok := workspaceStatusValue(result) + if !ok { + return + } st2.Question = "PR push failed" writeStatusResult(wsDir, st2) } @@ -59,7 +64,11 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) { prURL, _, err := s.forgeCreatePR(ctx, org, st.Repo, st.Branch, base, title, body) if err != nil { - if st2, err := ReadStatus(wsDir); err == nil { + if result := ReadStatusResult(wsDir); result.OK { + st2, ok := workspaceStatusValue(result) + if !ok { + return + } st2.Question = core.Sprintf("PR creation failed: %v", err) writeStatusResult(wsDir, st2) } @@ -67,7 +76,11 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) { } // Update status with PR URL - if st2, err := ReadStatus(wsDir); err == nil { + if result := ReadStatusResult(wsDir); result.OK { + st2, ok := workspaceStatusValue(result) + if !ok { + return + } st2.PRURL = prURL writeStatusResult(wsDir, st2) } diff --git a/pkg/agentic/commands_workspace.go b/pkg/agentic/commands_workspace.go index 9fc2d96..f3c311d 100644 --- a/pkg/agentic/commands_workspace.go +++ b/pkg/agentic/commands_workspace.go @@ -24,8 +24,9 @@ func (s *PrepSubsystem) cmdWorkspaceList(_ core.Options) core.Result { for _, sf := range statusFiles { wsDir := core.PathDir(sf) wsName := WorkspaceName(wsDir) - st, err := ReadStatus(wsDir) - if err != nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { continue } core.Print(nil, " %-8s %-8s %-10s %s", st.Status, st.Agent, st.Repo, wsName) @@ -51,8 +52,9 @@ func (s *PrepSubsystem) cmdWorkspaceClean(opts core.Options) core.Result { for _, sf := range statusFiles { wsDir := core.PathDir(sf) wsName := WorkspaceName(wsDir) - st, err := ReadStatus(wsDir) - if err != nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { continue } status := st.Status diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index d208606..77c19ed 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -282,8 +282,9 @@ func (s *PrepSubsystem) startIssueTracking(wsDir string) { if s.forge == nil { return } - st, _ := ReadStatus(wsDir) - if st == nil || st.Issue == 0 { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok || st.Issue == 0 { return } org := st.Org @@ -298,8 +299,9 @@ func (s *PrepSubsystem) stopIssueTracking(wsDir string) { if s.forge == nil { return } - st, _ := ReadStatus(wsDir) - if st == nil || st.Issue == 0 { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok || st.Issue == 0 { return } org := st.Org @@ -312,9 +314,10 @@ func (s *PrepSubsystem) stopIssueTracking(wsDir string) { // broadcastStart emits IPC + audit events for agent start. func (s *PrepSubsystem) broadcastStart(agent, wsDir string) { wsName := WorkspaceName(wsDir) - st, _ := ReadStatus(wsDir) + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) repo := "" - if st != nil { + if ok { repo = st.Repo } if s.ServiceRuntime != nil { @@ -330,9 +333,10 @@ func (s *PrepSubsystem) broadcastComplete(agent, wsDir, finalStatus string) { wsName := WorkspaceName(wsDir) emitCompletionEvent(agent, wsName, finalStatus) if s.ServiceRuntime != nil { - st, _ := ReadStatus(wsDir) + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) repo := "" - if st != nil { + if ok { repo = st.Repo } s.Core().ACTION(messages.AgentCompleted{ @@ -354,16 +358,16 @@ func (s *PrepSubsystem) onAgentComplete(agent, wsDir, outputFile string, exitCod finalStatus, question := detectFinalStatus(repoDir, exitCode, procStatus) // Update workspace status (disk + registry) - if st, err := ReadStatus(wsDir); err == nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if ok { st.Status = finalStatus st.PID = 0 st.Question = question writeStatusResult(wsDir, st) s.TrackWorkspace(WorkspaceName(wsDir), st) - } - // Rate-limit tracking - if st, _ := ReadStatus(wsDir); st != nil { + // Rate-limit tracking s.trackFailureRate(agent, finalStatus, st.StartedAt) } diff --git a/pkg/agentic/dispatch_sync.go b/pkg/agentic/dispatch_sync.go index c35e969..1a96f98 100644 --- a/pkg/agentic/dispatch_sync.go +++ b/pkg/agentic/dispatch_sync.go @@ -87,8 +87,10 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu case <-ticker.C: if pid > 0 && !ProcessAlive(runtime, processID, pid) { // Process exited — read final status - st, err := ReadStatus(wsDir) - if err != nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { + err, _ := result.Value.(error) return DispatchSyncResult{Err: core.E("agentic.DispatchSync", "can't read final status", err)} } return DispatchSyncResult{ diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go index 06ca30b..f4d4afc 100644 --- a/pkg/agentic/handlers.go +++ b/pkg/agentic/handlers.go @@ -38,7 +38,11 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res break } // Update status with real PID - if st, serr := ReadStatus(wsDir); serr == nil { + if result := ReadStatusResult(wsDir); result.OK { + st, ok := workspaceStatusValue(result) + if !ok { + break + } st.PID = pid st.ProcessID = processID writeStatusResult(wsDir, st) @@ -84,8 +88,9 @@ func resolveWorkspace(name string) string { func findWorkspaceByPR(repo, branch string) string { for _, path := range WorkspaceStatusPaths() { wsDir := core.PathDir(path) - st, err := ReadStatus(wsDir) - if err != nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { continue } if st.Repo == repo && st.Branch == branch { diff --git a/pkg/agentic/ingest.go b/pkg/agentic/ingest.go index 907bf4e..1d03785 100644 --- a/pkg/agentic/ingest.go +++ b/pkg/agentic/ingest.go @@ -15,8 +15,9 @@ func agentHomeDir() string { // ingestFindings reads the agent output log and creates issues via the API // for scan/audit results. Only runs for conventions and security templates. func (s *PrepSubsystem) ingestFindings(wsDir string) { - st, err := ReadStatus(wsDir) - if err != nil || st.Status != "completed" { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok || st.Status != "completed" { return } diff --git a/pkg/agentic/pr.go b/pkg/agentic/pr.go index 3e36ecc..5274572 100644 --- a/pkg/agentic/pr.go +++ b/pkg/agentic/pr.go @@ -59,8 +59,10 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in } // Read workspace status for repo, branch, issue context - st, err := ReadStatus(wsDir) - if err != nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { + err, _ := result.Value.(error) return nil, CreatePROutput{}, core.E("createPR", "no status.json", err) } diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index c1bfd05..54c78dd 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -235,8 +235,9 @@ func (s *PrepSubsystem) hydrateWorkspaces() { } for _, path := range WorkspaceStatusPaths() { wsDir := core.PathDir(path) - st, err := ReadStatus(wsDir) - if err != nil || st == nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { continue } s.workspaces.Set(WorkspaceName(wsDir), st) diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index 7d5e1ee..2705224 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -186,8 +186,9 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { func (s *PrepSubsystem) countRunningByAgentDisk(runtime *core.Core, agent string) int { count := 0 for _, statusPath := range WorkspaceStatusPaths() { - st, err := ReadStatus(core.PathDir(statusPath)) - if err != nil || st.Status != "running" { + result := ReadStatusResult(core.PathDir(statusPath)) + st, ok := workspaceStatusValue(result) + if !ok || st.Status != "running" { continue } if baseAgent(st.Agent) != agent { @@ -228,8 +229,9 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int { func (s *PrepSubsystem) countRunningByModelDisk(runtime *core.Core, agent string) int { count := 0 for _, statusPath := range WorkspaceStatusPaths() { - st, err := ReadStatus(core.PathDir(statusPath)) - if err != nil || st.Status != "running" { + result := ReadStatusResult(core.PathDir(statusPath)) + st, ok := workspaceStatusValue(result) + if !ok || st.Status != "running" { continue } if st.Agent != agent { @@ -326,8 +328,9 @@ func (s *PrepSubsystem) drainQueue() { func (s *PrepSubsystem) drainOne() bool { for _, statusPath := range WorkspaceStatusPaths() { wsDir := core.PathDir(statusPath) - st, err := ReadStatus(wsDir) - if err != nil || st.Status != "queued" { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok || st.Status != "queued" { continue } diff --git a/pkg/agentic/resume.go b/pkg/agentic/resume.go index 77b0a95..1333946 100644 --- a/pkg/agentic/resume.go +++ b/pkg/agentic/resume.go @@ -52,8 +52,10 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu } // Read current status - st, err := ReadStatus(wsDir) - if err != nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { + err, _ := result.Value.(error) return nil, ResumeOutput{}, core.E("resume", "no status.json in workspace", err) } diff --git a/pkg/agentic/status.go b/pkg/agentic/status.go index a213e34..aeeb187 100644 --- a/pkg/agentic/status.go +++ b/pkg/agentic/status.go @@ -136,6 +136,18 @@ func ReadStatusResult(wsDir string) core.Result { return core.Result{Value: &s, OK: true} } +// workspaceStatusValue extracts a WorkspaceStatus from a Result. +// +// r := ReadStatusResult("/path/to/workspace") +// st, ok := workspaceStatusValue(r) +func workspaceStatusValue(result core.Result) (*WorkspaceStatus, bool) { + st, ok := result.Value.(*WorkspaceStatus) + if !ok || st == nil { + return nil, false + } + return st, true +} + // --- agentic_status tool --- // StatusInput is the input for agentic_status. @@ -190,8 +202,9 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu wsDir := core.PathDir(statusPath) name := WorkspaceName(wsDir) - st, err := ReadStatus(wsDir) - if err != nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { out.Total++ out.Failed++ continue diff --git a/pkg/agentic/verify.go b/pkg/agentic/verify.go index b6ee261..ad73b7c 100644 --- a/pkg/agentic/verify.go +++ b/pkg/agentic/verify.go @@ -17,8 +17,9 @@ import ( // // agentic_dispatch repo=go-crypt template=verify persona=engineering/engineering-security-engineer func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) { - st, err := ReadStatus(wsDir) - if err != nil || st.PRURL == "" || st.Repo == "" { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok || st.PRURL == "" || st.Repo == "" { return } @@ -35,21 +36,25 @@ func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) { // markMerged is a helper to avoid repeating the status update. markMerged := func() { - if st2, err := ReadStatus(wsDir); err == nil { + if result := ReadStatusResult(wsDir); result.OK { + st2, ok := workspaceStatusValue(result) + if !ok { + return + } st2.Status = "merged" writeStatusResult(wsDir, st2) } } // Attempt 1: run tests and try to merge - result := s.attemptVerifyAndMerge(repoDir, org, st.Repo, st.Branch, prNum) - if result == mergeSuccess { + mergeOutcome := s.attemptVerifyAndMerge(repoDir, org, st.Repo, st.Branch, prNum) + if mergeOutcome == mergeSuccess { markMerged() return } // Attempt 2: rebase onto main and retry - if result == mergeConflict || result == testFailed { + if mergeOutcome == mergeConflict || mergeOutcome == testFailed { if s.rebaseBranch(repoDir, st.Branch) { if s.attemptVerifyAndMerge(repoDir, org, st.Repo, st.Branch, prNum) == mergeSuccess { markMerged() @@ -59,9 +64,13 @@ func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) { } // Both attempts failed — flag for human review - s.flagForReview(org, st.Repo, prNum, result) + s.flagForReview(org, st.Repo, prNum, mergeOutcome) - if st2, err := ReadStatus(wsDir); err == nil { + if result := ReadStatusResult(wsDir); result.OK { + st2, ok := workspaceStatusValue(result) + if !ok { + return + } st2.Question = "Flagged for review — auto-merge failed after retry" writeStatusResult(wsDir, st2) } @@ -116,10 +125,11 @@ func (s *PrepSubsystem) rebaseBranch(repoDir, branch string) bool { return false } - st, _ := ReadStatus(core.PathDir(repoDir)) + result := ReadStatusResult(core.PathDir(repoDir)) + st, ok := workspaceStatusValue(result) org := "core" repo := "" - if st != nil { + if ok { if st.Org != "" { org = st.Org } diff --git a/pkg/agentic/watch.go b/pkg/agentic/watch.go index 8945401..4d82a77 100644 --- a/pkg/agentic/watch.go +++ b/pkg/agentic/watch.go @@ -112,8 +112,9 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp for ws := range remaining { wsDir := s.resolveWorkspaceDir(ws) - st, err := ReadStatus(wsDir) - if err != nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { continue } @@ -196,8 +197,9 @@ func (s *PrepSubsystem) findActiveWorkspaces() []string { var active []string for _, entry := range WorkspaceStatusPaths() { wsDir := core.PathDir(entry) - st, err := ReadStatus(wsDir) - if err != nil { + result := ReadStatusResult(wsDir) + st, ok := workspaceStatusValue(result) + if !ok { continue } if st.Status == "running" || st.Status == "queued" {