diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index fe28349..dd4fa80 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -60,6 +60,29 @@ func (s *PrepSubsystem) registerDispatchTool(server *mcp.Server) { // agentCommand returns the command and args for a given agent type. // Supports model variants: "gemini", "gemini:flash", "codex", "claude", "claude:haiku". func agentCommand(agent, prompt string) (string, []string, error) { + r := agentCommandResult(agent, prompt) + if !r.OK { + err, _ := r.Value.(error) + if err == nil { + err = core.E("agentCommand", "failed to resolve command", nil) + } + return "", nil, err + } + + result, ok := r.Value.(agentCommandResultValue) + if !ok { + return "", nil, core.E("agentCommand", "invalid command result", nil) + } + + return result.command, result.args, nil +} + +type agentCommandResultValue struct { + command string + args []string +} + +func agentCommandResult(agent, prompt string) core.Result { parts := core.SplitN(agent, ":", 2) base := parts[0] model := "" @@ -73,16 +96,16 @@ func agentCommand(agent, prompt string) (string, []string, error) { if model != "" { args = append(args, "-m", core.Concat("gemini-2.5-", model)) } - return "gemini", args, nil + return core.Result{Value: agentCommandResultValue{command: "gemini", args: args}, OK: true} case "codex": if model == "review" { // Use exec with bypass — codex review subcommand has its own sandbox that blocks shell // No -o flag — stdout captured by process output, ../.meta path unreliable in sandbox - return "codex", []string{ + return core.Result{Value: agentCommandResultValue{command: "codex", args: []string{ "exec", "--dangerously-bypass-approvals-and-sandbox", "Review the last 2 commits via git diff HEAD~2. Check for bugs, security issues, missing tests, naming issues. Report pass/fail with specifics. Do NOT make changes.", - }, nil + }}, OK: true } // Container IS the sandbox — let codex run unrestricted inside it args := []string{ @@ -94,7 +117,7 @@ func agentCommand(agent, prompt string) (string, []string, error) { args = append(args, "--model", model) } args = append(args, prompt) - return "codex", args, nil + return core.Result{Value: agentCommandResultValue{command: "codex", args: args}, OK: true} case "claude": args := []string{ "-p", prompt, @@ -106,7 +129,7 @@ func agentCommand(agent, prompt string) (string, []string, error) { if model != "" { args = append(args, "--model", model) } - return "claude", args, nil + return core.Result{Value: agentCommandResultValue{command: "claude", args: args}, OK: true} case "coderabbit": args := []string{"review", "--plain", "--base", "HEAD~1"} if model != "" { @@ -115,7 +138,7 @@ func agentCommand(agent, prompt string) (string, []string, error) { if prompt != "" { args = append(args, "--config", "CLAUDE.md") } - return "coderabbit", args, nil + return core.Result{Value: agentCommandResultValue{command: "coderabbit", args: args}, OK: true} case "local": // Local model via codex --oss → Ollama. Default model: devstral-24b // socat proxies localhost:11434 → host.docker.internal:11434 @@ -128,9 +151,9 @@ func agentCommand(agent, prompt string) (string, []string, error) { `socat TCP-LISTEN:11434,fork,reuseaddr TCP:host.docker.internal:11434 & sleep 0.5 && codex exec --dangerously-bypass-approvals-and-sandbox --oss --local-provider ollama -m %s -o ../.meta/agent-codex.log %q`, localModel, prompt, ) - return "sh", []string{"-c", script}, nil + return core.Result{Value: agentCommandResultValue{command: "sh", args: []string{"-c", script}}, OK: true} default: - return "", nil, core.E("agentCommand", core.Concat("unknown agent: ", agent), nil) + return core.Result{Value: core.E("agentCommand", core.Concat("unknown agent: ", agent), nil), OK: false} } } diff --git a/pkg/agentic/plan.go b/pkg/agentic/plan.go index 115cb33..1cd1294 100644 --- a/pkg/agentic/plan.go +++ b/pkg/agentic/plan.go @@ -196,9 +196,17 @@ func (s *PrepSubsystem) planCreate(_ context.Context, _ *mcp.CallToolRequest, in } } - path, err := writePlan(PlansRoot(), &plan) - if err != nil { - return nil, PlanCreateOutput{}, core.E("planCreate", "failed to write plan", err) + writeResult := writePlanResult(PlansRoot(), &plan) + if !writeResult.OK { + err, _ := writeResult.Value.(error) + if err == nil { + err = core.E("planCreate", "failed to write plan", nil) + } + return nil, PlanCreateOutput{}, err + } + path, ok := writeResult.Value.(string) + if !ok { + return nil, PlanCreateOutput{}, core.E("planCreate", "invalid plan write result", nil) } return nil, PlanCreateOutput{ @@ -213,10 +221,18 @@ func (s *PrepSubsystem) planRead(_ context.Context, _ *mcp.CallToolRequest, inpu return nil, PlanReadOutput{}, core.E("planRead", "id is required", nil) } - plan, err := readPlan(PlansRoot(), input.ID) - if err != nil { + planResult := readPlanResult(PlansRoot(), input.ID) + if !planResult.OK { + err, _ := planResult.Value.(error) + if err == nil { + err = core.E("planRead", "failed to read plan", nil) + } return nil, PlanReadOutput{}, err } + plan, ok := planResult.Value.(*Plan) + if !ok || plan == nil { + return nil, PlanReadOutput{}, core.E("planRead", "invalid plan payload", nil) + } return nil, PlanReadOutput{ Success: true, @@ -229,10 +245,18 @@ func (s *PrepSubsystem) planUpdate(_ context.Context, _ *mcp.CallToolRequest, in return nil, PlanUpdateOutput{}, core.E("planUpdate", "id is required", nil) } - plan, err := readPlan(PlansRoot(), input.ID) - if err != nil { + planResult := readPlanResult(PlansRoot(), input.ID) + if !planResult.OK { + err, _ := planResult.Value.(error) + if err == nil { + err = core.E("planUpdate", "failed to read plan", nil) + } return nil, PlanUpdateOutput{}, err } + plan, ok := planResult.Value.(*Plan) + if !ok || plan == nil { + return nil, PlanUpdateOutput{}, core.E("planUpdate", "invalid plan payload", nil) + } // Apply partial updates if input.Status != "" { @@ -259,8 +283,13 @@ func (s *PrepSubsystem) planUpdate(_ context.Context, _ *mcp.CallToolRequest, in plan.UpdatedAt = time.Now() - if _, err := writePlan(PlansRoot(), plan); err != nil { - return nil, PlanUpdateOutput{}, core.E("planUpdate", "failed to write plan", err) + writeResult := writePlanResult(PlansRoot(), plan) + if !writeResult.OK { + err, _ := writeResult.Value.(error) + if err == nil { + err = core.E("planUpdate", "failed to write plan", nil) + } + return nil, PlanUpdateOutput{}, err } return nil, PlanUpdateOutput{ @@ -302,8 +331,12 @@ func (s *PrepSubsystem) planList(_ context.Context, _ *mcp.CallToolRequest, inpu var plans []Plan for _, f := range jsonFiles { id := core.TrimSuffix(core.PathBase(f), ".json") - plan, err := readPlan(dir, id) - if err != nil { + planResult := readPlanResult(dir, id) + if !planResult.OK { + continue + } + plan, ok := planResult.Value.(*Plan) + if !ok || plan == nil { continue } @@ -345,30 +378,93 @@ func generatePlanID(title string) string { return core.Concat(slug, "-", hex.EncodeToString(b)) } -func readPlan(dir, id string) (*Plan, error) { +// readPlanResult reads and decodes a plan file as core.Result. +// +// result := readPlanResult(PlansRoot(), "plan-id") +// if result.OK { plan := result.Value.(*Plan) } +func readPlanResult(dir, id string) core.Result { r := fs.Read(planPath(dir, id)) if !r.OK { - return nil, core.E("readPlan", core.Concat("plan not found: ", id), nil) + err, _ := r.Value.(error) + if err == nil { + return core.Result{Value: core.E("readPlan", core.Concat("plan not found: ", id), nil), OK: false} + } + return core.Result{Value: core.E("readPlan", core.Concat("plan not found: ", id), err), OK: false} } var plan Plan if ur := core.JSONUnmarshalString(r.Value.(string), &plan); !ur.OK { - return nil, core.E("readPlan", core.Concat("failed to parse plan ", id), nil) + err, _ := ur.Value.(error) + if err == nil { + return core.Result{Value: core.E("readPlan", core.Concat("failed to parse plan ", id), nil), OK: false} + } + return core.Result{Value: core.E("readPlan", core.Concat("failed to parse plan ", id), err), OK: false} } - return &plan, nil + return core.Result{Value: &plan, OK: true} } -func writePlan(dir string, plan *Plan) (string, error) { +// readPlan reads a plan file. Kept as compatibility wrapper. +// +// plan, err := readPlan(PlansRoot(), "plan-id") +func readPlan(dir, id string) (*Plan, error) { + r := readPlanResult(dir, id) + if !r.OK { + err, _ := r.Value.(error) + if err == nil { + return nil, core.E("readPlan", "failed to read plan", nil) + } + return nil, err + } + plan, ok := r.Value.(*Plan) + if !ok || plan == nil { + return nil, core.E("readPlan", "invalid plan payload", nil) + } + return plan, nil +} + +// writePlanResult writes a plan file and returns core.Result. +// +// result := writePlanResult(PlansRoot(), plan) +// if result.OK { path := result.Value.(string) } +func writePlanResult(dir string, plan *Plan) core.Result { + if plan == nil { + return core.Result{Value: core.E("writePlan", "plan is required", nil), OK: false} + } if r := fs.EnsureDir(dir); !r.OK { err, _ := r.Value.(error) - return "", core.E("writePlan", "failed to create plans directory", err) + if err == nil { + return core.Result{Value: core.E("writePlan", "failed to create plans directory", nil), OK: false} + } + return core.Result{Value: core.E("writePlan", "failed to create plans directory", err), OK: false} } path := planPath(dir, plan.ID) if r := fs.Write(path, core.JSONMarshalString(plan)); !r.OK { err, _ := r.Value.(error) - return "", core.E("writePlan", "failed to write plan", err) + if err == nil { + return core.Result{Value: core.E("writePlan", "failed to write plan", nil), OK: false} + } + return core.Result{Value: core.E("writePlan", "failed to write plan", err), OK: false} + } + return core.Result{Value: path, OK: true} +} + +// writePlan writes a plan file. Kept as compatibility wrapper. +// +// _, err := writePlan(PlansRoot(), plan) +func writePlan(dir string, plan *Plan) (string, error) { + r := writePlanResult(dir, plan) + if !r.OK { + err, _ := r.Value.(error) + if err == nil { + return "", core.E("writePlan", "failed to write plan", nil) + } + return "", err + } + path, ok := r.Value.(string) + if !ok { + return "", core.E("writePlan", "invalid plan write result", nil) } return path, nil } diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index e18b6e0..a1309ab 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -353,18 +353,38 @@ type PrepOutput struct { // dir := workspaceDir("core", "go-io", PrepInput{Issue: 15}) // // → ".core/workspace/core/go-io/task-15" func workspaceDir(org, repo string, input PrepInput) (string, error) { + r := workspaceDirResult(org, repo, input) + if !r.OK { + err, _ := r.Value.(error) + if err == nil { + err = core.E("workspaceDir", "failed to resolve workspace directory", nil) + } + return "", err + } + wsDir, ok := r.Value.(string) + if !ok || wsDir == "" { + return "", core.E("workspaceDir", "invalid workspace directory result", nil) + } + return wsDir, nil +} + +// workspaceDirResult resolves the workspace path and returns core.Result. +// +// r := workspaceDirResult("core", "go-io", PrepInput{Issue: 15}) +// if r.OK { wsDir := r.Value.(string) } +func workspaceDirResult(org, repo string, input PrepInput) core.Result { base := core.JoinPath(WorkspaceRoot(), org, repo) switch { case input.PR > 0: - return core.JoinPath(base, core.Sprintf("pr-%d", input.PR)), nil + return core.Result{Value: core.JoinPath(base, core.Sprintf("pr-%d", input.PR)), OK: true} case input.Issue > 0: - return core.JoinPath(base, core.Sprintf("task-%d", input.Issue)), nil + return core.Result{Value: core.JoinPath(base, core.Sprintf("task-%d", input.Issue)), OK: true} case input.Branch != "": - return core.JoinPath(base, input.Branch), nil + return core.Result{Value: core.JoinPath(base, input.Branch), OK: true} case input.Tag != "": - return core.JoinPath(base, input.Tag), nil + return core.Result{Value: core.JoinPath(base, input.Tag), OK: true} default: - return "", core.E("workspaceDir", "one of issue, pr, branch, or tag is required", nil) + return core.Result{Value: core.E("workspaceDir", "one of issue, pr, branch, or tag is required", nil), OK: false} } } @@ -380,10 +400,18 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques } // Resolve workspace directory from identifier - wsDir, err := workspaceDir(input.Org, input.Repo, input) - if err != nil { + wsDirResult := workspaceDirResult(input.Org, input.Repo, input) + if !wsDirResult.OK { + err, _ := wsDirResult.Value.(error) + if err == nil { + err = core.E("prepWorkspace", "workspace path not resolved", nil) + } return nil, PrepOutput{}, err } + wsDir, ok := wsDirResult.Value.(string) + if !ok || wsDir == "" { + return nil, PrepOutput{}, core.E("prepWorkspace", "invalid workspace path", nil) + } repoDir := workspaceRepoDir(wsDir) metaDir := workspaceMetaDir(wsDir) diff --git a/pkg/agentic/remote.go b/pkg/agentic/remote.go index 07259d5..856a2ca 100644 --- a/pkg/agentic/remote.go +++ b/pkg/agentic/remote.go @@ -95,23 +95,44 @@ func (s *PrepSubsystem) dispatchRemote(ctx context.Context, _ *mcp.CallToolReque url := core.Sprintf("http://%s/mcp", addr) // Step 1: Initialize session - sessionID, err := mcpInitialize(ctx, url, token) - if err != nil { + sessionResult := mcpInitializeResult(ctx, url, token) + if !sessionResult.OK { + err, _ := sessionResult.Value.(error) + if err == nil { + err = core.E("dispatchRemote", "MCP initialize failed", nil) + } return nil, RemoteDispatchOutput{ Host: input.Host, Error: core.Sprintf("init failed: %v", err), }, core.E("dispatchRemote", "MCP initialize failed", err) } - - // Step 2: Call the tool - body := []byte(core.JSONMarshalString(rpcReq)) - result, err := mcpCall(ctx, url, token, sessionID, body) - if err != nil { + sessionID, ok := sessionResult.Value.(string) + if !ok || sessionID == "" { + err := core.E("dispatchRemote", "invalid session id", nil) + return nil, RemoteDispatchOutput{ + Host: input.Host, + Error: core.Sprintf("init failed: %v", err), + }, err + } + callResult := mcpCallResult(ctx, url, token, sessionID, body) + if !callResult.OK { + err, _ := callResult.Value.(error) + if err == nil { + err = core.E("dispatchRemote", "tool call failed", nil) + } return nil, RemoteDispatchOutput{ Host: input.Host, Error: core.Sprintf("call failed: %v", err), }, core.E("dispatchRemote", "tool call failed", err) } + result, ok := callResult.Value.([]byte) + if !ok { + err := core.E("dispatchRemote", "invalid tool response", nil) + return nil, RemoteDispatchOutput{ + Host: input.Host, + Error: core.Sprintf("call failed: %v", err), + }, err + } // Parse result output := RemoteDispatchOutput{ diff --git a/pkg/agentic/remote_status.go b/pkg/agentic/remote_status.go index 96b852e..da94b71 100644 --- a/pkg/agentic/remote_status.go +++ b/pkg/agentic/remote_status.go @@ -43,8 +43,20 @@ func (s *PrepSubsystem) statusRemote(ctx context.Context, _ *mcp.CallToolRequest token := remoteToken(input.Host) url := core.Concat("http://", addr, "/mcp") - sessionID, err := mcpInitialize(ctx, url, token) - if err != nil { + sessionResult := mcpInitializeResult(ctx, url, token) + if !sessionResult.OK { + err, _ := sessionResult.Value.(error) + if err == nil { + err = core.E("statusRemote", "MCP initialize failed", nil) + } + return nil, RemoteStatusOutput{ + Host: input.Host, + Error: core.Concat("unreachable: ", err.Error()), + }, nil + } + sessionID, ok := sessionResult.Value.(string) + if !ok || sessionID == "" { + err := core.E("statusRemote", "invalid session id", nil) return nil, RemoteStatusOutput{ Host: input.Host, Error: core.Concat("unreachable: ", err.Error()), @@ -62,8 +74,20 @@ func (s *PrepSubsystem) statusRemote(ctx context.Context, _ *mcp.CallToolRequest } body := []byte(core.JSONMarshalString(rpcReq)) - result, err := mcpCall(ctx, url, token, sessionID, body) - if err != nil { + callResult := mcpCallResult(ctx, url, token, sessionID, body) + if !callResult.OK { + err, _ := callResult.Value.(error) + if err == nil { + err = core.E("statusRemote", "tool call failed", nil) + } + return nil, RemoteStatusOutput{ + Host: input.Host, + Error: core.Concat("call failed: ", err.Error()), + }, nil + } + result, ok := callResult.Value.([]byte) + if !ok { + err := core.E("statusRemote", "invalid tool response", nil) return nil, RemoteStatusOutput{ Host: input.Host, Error: core.Concat("call failed: ", err.Error()), diff --git a/pkg/agentic/status.go b/pkg/agentic/status.go index 2d75ebe..fda3498 100644 --- a/pkg/agentic/status.go +++ b/pkg/agentic/status.go @@ -57,29 +57,79 @@ type WorkspaceQuery struct { } func writeStatus(wsDir string, status *WorkspaceStatus) error { + r := writeStatusResult(wsDir, status) + if !r.OK { + err, _ := r.Value.(error) + if err == nil { + err = core.E("writeStatus", "failed to write status", nil) + } + return err + } + return nil +} + +// writeStatusResult writes status.json and returns core.Result. +// +// r := writeStatusResult("/srv/core/workspace/core/go-io/task-5", &WorkspaceStatus{Status: "running"}) +// if r.OK { return } +func writeStatusResult(wsDir string, status *WorkspaceStatus) core.Result { + if status == nil { + return core.Result{Value: core.E("writeStatus", "status is required", nil), OK: false} + } status.UpdatedAt = time.Now() statusPath := WorkspaceStatusPath(wsDir) if r := fs.WriteAtomic(statusPath, core.JSONMarshalString(status)); !r.OK { err, _ := r.Value.(error) - return core.E("writeStatus", "failed to write status", err) + if err == nil { + return core.Result{Value: core.E("writeStatus", "failed to write status", nil), OK: false} + } + return core.Result{Value: core.E("writeStatus", "failed to write status", err), OK: false} } - return nil + return core.Result{OK: true} } // ReadStatus parses the status.json in a workspace directory. // // st, err := agentic.ReadStatus("/path/to/workspace") func ReadStatus(wsDir string) (*WorkspaceStatus, error) { + r := ReadStatusResult(wsDir) + if !r.OK { + err, _ := r.Value.(error) + if err == nil { + return nil, core.E("ReadStatus", "failed to read status", nil) + } + return nil, err + } + + st, ok := r.Value.(*WorkspaceStatus) + if !ok || st == nil { + return nil, core.E("ReadStatus", "invalid status payload", nil) + } + return st, nil +} + +// ReadStatusResult parses status.json and returns a WorkspaceStatus pointer. +// +// r := ReadStatusResult("/path/to/workspace") +// if r.OK { st := r.Value.(*WorkspaceStatus) } +func ReadStatusResult(wsDir string) core.Result { r := fs.Read(WorkspaceStatusPath(wsDir)) if !r.OK { - return nil, core.E("ReadStatus", "status not found", nil) + err, _ := r.Value.(error) + if err == nil { + return core.Result{Value: core.E("ReadStatus", "status not found", nil), OK: false} + } + return core.Result{Value: core.E("ReadStatus", core.Concat("status not found for ", wsDir), err), OK: false} } var s WorkspaceStatus if ur := core.JSONUnmarshalString(r.Value.(string), &s); !ur.OK { err, _ := ur.Value.(error) - return nil, core.E("ReadStatus", "invalid status json", err) + if err == nil { + return core.Result{Value: core.E("ReadStatus", "invalid status json", nil), OK: false} + } + return core.Result{Value: core.E("ReadStatus", "invalid status json", err), OK: false} } - return &s, nil + return core.Result{Value: &s, OK: true} } // --- agentic_status tool --- diff --git a/pkg/agentic/transport.go b/pkg/agentic/transport.go index 19f8993..a15a829 100644 --- a/pkg/agentic/transport.go +++ b/pkg/agentic/transport.go @@ -213,6 +213,22 @@ func httpDo(ctx context.Context, method, url, body, token, authScheme string) co // mcpInitialize performs the MCP initialise handshake over Streamable HTTP. // Returns the session ID from the Mcp-Session-Id header. func mcpInitialize(ctx context.Context, url, token string) (string, error) { + result := mcpInitializeResult(ctx, url, token) + if !result.OK { + err, _ := result.Value.(error) + if err == nil { + return "", core.E("mcpInitialize", "failed", nil) + } + return "", err + } + sessionID, ok := result.Value.(string) + if !ok { + return "", core.E("mcpInitialize", "invalid session id result", nil) + } + return sessionID, nil +} + +func mcpInitializeResult(ctx context.Context, url, token string) core.Result { initReq := map[string]any{ "jsonrpc": "2.0", "id": 1, @@ -230,18 +246,18 @@ func mcpInitialize(ctx context.Context, url, token string) (string, error) { body := core.JSONMarshalString(initReq) req, err := http.NewRequestWithContext(ctx, "POST", url, core.NewReader(body)) if err != nil { - return "", core.E("mcpInitialize", "create request", nil) + return core.Result{Value: core.E("mcpInitialize", "create request", nil), OK: false} } mcpHeaders(req, token, "") resp, err := defaultClient.Do(req) if err != nil { - return "", core.E("mcpInitialize", "request failed", nil) + return core.Result{Value: core.E("mcpInitialize", "request failed", nil), OK: false} } defer resp.Body.Close() if resp.StatusCode != 200 { - return "", core.E("mcpInitialize", core.Sprintf("HTTP %d", resp.StatusCode), nil) + return core.Result{Value: core.E("mcpInitialize", core.Sprintf("HTTP %d", resp.StatusCode), nil), OK: false} } sessionID := resp.Header.Get("Mcp-Session-Id") @@ -261,42 +277,79 @@ func mcpInitialize(ctx context.Context, url, token string) (string, error) { notifResp.Body.Close() } - return sessionID, nil + return core.Result{Value: sessionID, OK: true} } // mcpCall sends a JSON-RPC request and returns the parsed response. func mcpCall(ctx context.Context, url, token, sessionID string, body []byte) ([]byte, error) { + result := mcpCallResult(ctx, url, token, sessionID, body) + if !result.OK { + err, _ := result.Value.(error) + if err == nil { + return nil, core.E("mcpCall", "failed", nil) + } + return nil, err + } + data, ok := result.Value.([]byte) + if !ok { + return nil, core.E("mcpCall", "invalid call result", nil) + } + return data, nil +} + +func mcpCallResult(ctx context.Context, url, token, sessionID string, body []byte) core.Result { req, err := http.NewRequestWithContext(ctx, "POST", url, core.NewReader(string(body))) if err != nil { - return nil, core.E("mcpCall", "create request", nil) + return core.Result{Value: core.E("mcpCall", "create request", nil), OK: false} } mcpHeaders(req, token, sessionID) resp, err := defaultClient.Do(req) if err != nil { - return nil, core.E("mcpCall", "request failed", nil) + return core.Result{Value: core.E("mcpCall", "request failed", nil), OK: false} } defer resp.Body.Close() if resp.StatusCode != 200 { - return nil, core.E("mcpCall", core.Sprintf("HTTP %d", resp.StatusCode), nil) + return core.Result{Value: core.E("mcpCall", core.Sprintf("HTTP %d", resp.StatusCode), nil), OK: false} } - return readSSEData(resp) + return readSSEDataResult(resp) } // readSSEData reads an SSE response and extracts JSON from data: lines. func readSSEData(resp *http.Response) ([]byte, error) { + result := readSSEDataResult(resp) + if !result.OK { + err, _ := result.Value.(error) + if err == nil { + return nil, core.E("readSSEData", "failed", nil) + } + return nil, err + } + data, ok := result.Value.([]byte) + if !ok { + return nil, core.E("readSSEData", "invalid data result", nil) + } + return data, nil +} + +// readSSEDataResult parses an SSE response and extracts the first data: payload as core.Result. +func readSSEDataResult(resp *http.Response) core.Result { r := core.ReadAll(resp.Body) if !r.OK { - return nil, core.E("readSSEData", "failed to read response", nil) + err, _ := r.Value.(error) + if err == nil { + return core.Result{Value: core.E("readSSEData", "failed to read response", nil), OK: false} + } + return core.Result{Value: core.E("readSSEData", "failed to read response", err), OK: false} } for _, line := range core.Split(r.Value.(string), "\n") { if core.HasPrefix(line, "data: ") { - return []byte(core.TrimPrefix(line, "data: ")), nil + return core.Result{Value: []byte(core.TrimPrefix(line, "data: ")), OK: true} } } - return nil, core.E("readSSEData", "no data in SSE response", nil) + return core.Result{Value: core.E("readSSEData", "no data in SSE response", nil), OK: false} } // mcpHeaders applies standard MCP HTTP headers. diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index cf7e1b5..389a775 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -278,8 +278,12 @@ func (m *Subsystem) checkIdleAfterDelay() { func (m *Subsystem) countLiveWorkspaces() (running, queued int) { for _, path := range agentic.WorkspaceStatusPaths() { wsDir := core.PathDir(path) - st, err := agentic.ReadStatus(wsDir) - if err != nil { + r := agentic.ReadStatusResult(wsDir) + if !r.OK { + continue + } + st, ok := r.Value.(*agentic.WorkspaceStatus) + if !ok || st == nil { continue } switch st.Status { diff --git a/pkg/runner/paths.go b/pkg/runner/paths.go index 52c92de..257ae06 100644 --- a/pkg/runner/paths.go +++ b/pkg/runner/paths.go @@ -30,18 +30,41 @@ func CoreRoot() string { // // st, err := runner.ReadStatus("/srv/core/workspace/core/go-io/task-5") func ReadStatus(wsDir string) (*WorkspaceStatus, error) { + r := ReadStatusResult(wsDir) + if !r.OK { + err, _ := r.Value.(error) + if err == nil { + return nil, core.E("runner.ReadStatus", "failed to read status", nil) + } + return nil, err + } + st, ok := r.Value.(*WorkspaceStatus) + if !ok || st == nil { + return nil, core.E("runner.ReadStatus", "invalid status payload", nil) + } + return st, nil +} + +// ReadStatusResult reads status.json as core.Result. +// +// result := ReadStatusResult("/srv/core/workspace/core/go-io/task-5") +// if result.OK { st := result.Value.(*WorkspaceStatus) } +func ReadStatusResult(wsDir string) core.Result { status, err := agentic.ReadStatus(wsDir) if err != nil { - return nil, core.E("runner.ReadStatus", "failed to read status", err) + return core.Result{Value: core.E("runner.ReadStatus", "failed to read status", err), OK: false} } json := core.JSONMarshalString(status) var st WorkspaceStatus if result := core.JSONUnmarshalString(json, &st); !result.OK { parseErr, _ := result.Value.(error) - return nil, core.E("runner.ReadStatus", "failed to parse status", parseErr) + if parseErr == nil { + parseErr = core.E("runner.ReadStatus", "failed to parse status", nil) + } + return core.Result{Value: parseErr, OK: false} } - return &st, nil + return core.Result{Value: &st, OK: true} } // WriteStatus writes `status.json` for one workspace directory.