diff --git a/cmd/core-agent/commands.go b/cmd/core-agent/commands.go index 07ba367..2e1c15f 100644 --- a/cmd/core-agent/commands.go +++ b/cmd/core-agent/commands.go @@ -11,7 +11,7 @@ import ( ) type applicationCommandSet struct { - core *core.Core + coreApp *core.Core } // args := startupArgs() @@ -67,7 +67,7 @@ func applyLogLevel(args []string) []string { // c.Command("check", core.Command{Description: "Verify workspace, deps, and config", Action: commands.check}) // c.Command("env", core.Command{Description: "Show all core.Env() keys and values", Action: commands.env}) func registerApplicationCommands(c *core.Core) { - commands := applicationCommandSet{core: c} + commands := applicationCommandSet{coreApp: c} c.Command("version", core.Command{ Description: "Print version and build info", @@ -86,7 +86,7 @@ func registerApplicationCommands(c *core.Core) { } func (commands applicationCommandSet) version(_ core.Options) core.Result { - core.Print(nil, "core-agent %s", commands.core.App().Version) + core.Print(nil, "core-agent %s", commands.coreApp.App().Version) core.Print(nil, " go: %s", core.Env("GO")) core.Print(nil, " os: %s/%s", core.Env("OS"), core.Env("ARCH")) core.Print(nil, " home: %s", agentic.HomeDir()) @@ -97,8 +97,8 @@ func (commands applicationCommandSet) version(_ core.Options) core.Result { } func (commands applicationCommandSet) check(_ core.Options) core.Result { - fs := commands.core.Fs() - core.Print(nil, "core-agent %s health check", commands.core.App().Version) + fs := commands.coreApp.Fs() + core.Print(nil, "core-agent %s health check", commands.coreApp.App().Version) core.Print(nil, "") core.Print(nil, " binary: core-agent") @@ -117,9 +117,9 @@ func (commands applicationCommandSet) check(_ core.Options) core.Result { core.Print(nil, " workspace: %s (MISSING)", workspaceRoot) } - core.Print(nil, " services: %d registered", len(commands.core.Services())) - core.Print(nil, " actions: %d registered", len(commands.core.Actions())) - core.Print(nil, " commands: %d registered", len(commands.core.Commands())) + core.Print(nil, " services: %d registered", len(commands.coreApp.Services())) + core.Print(nil, " actions: %d registered", len(commands.coreApp.Actions())) + core.Print(nil, " commands: %d registered", len(commands.coreApp.Commands())) core.Print(nil, " env keys: %d loaded", len(core.EnvKeys())) core.Print(nil, "") core.Print(nil, "ok") diff --git a/pkg/agentic/actions.go b/pkg/agentic/actions.go index d1d2d37..c085e9f 100644 --- a/pkg/agentic/actions.go +++ b/pkg/agentic/actions.go @@ -1,9 +1,5 @@ // SPDX-License-Identifier: EUPL-1.2 -// Named Action handlers for the agentic service. -// Each handler adapts (ctx, Options) → Result to call the existing MCP tool method. -// Registered during OnStartup — the Action registry IS the capability map. -// // c.Action("agentic.dispatch").Run(ctx, options) // c.Actions() // all registered capabilities @@ -171,7 +167,6 @@ func (s *PrepSubsystem) handleComplete(ctx context.Context, options core.Options // // )) func (s *PrepSubsystem) handleQA(ctx context.Context, options core.Options) core.Result { - // Feature flag gate — skip QA if disabled if s.ServiceRuntime != nil && !s.Config().Enabled("auto-qa") { return core.Result{Value: true, OK: true} } @@ -190,7 +185,6 @@ func (s *PrepSubsystem) handleQA(ctx context.Context, options core.Options) core } } } - // Emit QA result for observability (monitor picks this up) if s.ServiceRuntime != nil { result := ReadStatusResult(workspaceDir) workspaceStatus, ok := workspaceStatusValue(result) @@ -222,7 +216,6 @@ func (s *PrepSubsystem) handleAutoPR(ctx context.Context, options core.Options) } s.autoCreatePR(workspaceDir) - // Emit PRCreated for observability if s.ServiceRuntime != nil { result := ReadStatusResult(workspaceDir) workspaceStatus, ok := workspaceStatusValue(result) @@ -253,7 +246,6 @@ func (s *PrepSubsystem) handleVerify(ctx context.Context, options core.Options) } s.autoVerifyAndMerge(workspaceDir) - // Emit merge/review events for observability if s.ServiceRuntime != nil { result := ReadStatusResult(workspaceDir) workspaceStatus, ok := workspaceStatusValue(result) @@ -291,9 +283,7 @@ func (s *PrepSubsystem) handleIngest(ctx context.Context, options core.Options) return core.Result{OK: true} } -// handlePoke drains the dispatch queue. -// -// result := c.Action("agentic.poke").Run(ctx, core.NewOptions()) +// result := c.Action("agentic.poke").Run(ctx, core.NewOptions()) func (s *PrepSubsystem) handlePoke(ctx context.Context, _ core.Options) core.Result { s.Poke() return core.Result{OK: true} @@ -315,12 +305,12 @@ func (s *PrepSubsystem) handleMirror(ctx context.Context, options core.Options) return core.Result{Value: out, OK: true} } -// handleIssueGet retrieves a forge issue. +// result := c.Action("agentic.issue.get").Run(ctx, core.NewOptions( // -// result := c.Action("agentic.issue.get").Run(ctx, core.NewOptions( -// core.Option{Key: "repo", Value: "go-io"}, -// core.Option{Key: "number", Value: "42"}, -// )) +// core.Option{Key: "repo", Value: "go-io"}, +// core.Option{Key: "number", Value: "42"}, +// +// )) func (s *PrepSubsystem) handleIssueGet(ctx context.Context, options core.Options) core.Result { return s.cmdIssueGet(options) } @@ -344,12 +334,12 @@ func (s *PrepSubsystem) handleIssueCreate(ctx context.Context, options core.Opti return s.cmdIssueCreate(options) } -// handlePRGet retrieves a forge PR. +// result := c.Action("agentic.pr.get").Run(ctx, core.NewOptions( // -// result := c.Action("agentic.pr.get").Run(ctx, core.NewOptions( -// core.Option{Key: "_arg", Value: "go-io"}, -// core.Option{Key: "number", Value: "12"}, -// )) +// core.Option{Key: "_arg", Value: "go-io"}, +// core.Option{Key: "number", Value: "12"}, +// +// )) func (s *PrepSubsystem) handlePRGet(ctx context.Context, options core.Options) core.Result { return s.cmdPRGet(options) } @@ -363,12 +353,12 @@ func (s *PrepSubsystem) handlePRList(ctx context.Context, options core.Options) return s.cmdPRList(options) } -// handlePRMerge merges a forge PR. +// result := c.Action("agentic.pr.merge").Run(ctx, core.NewOptions( // -// result := c.Action("agentic.pr.merge").Run(ctx, core.NewOptions( -// core.Option{Key: "_arg", Value: "go-io"}, -// core.Option{Key: "number", Value: "12"}, -// )) +// core.Option{Key: "_arg", Value: "go-io"}, +// core.Option{Key: "number", Value: "12"}, +// +// )) func (s *PrepSubsystem) handlePRMerge(ctx context.Context, options core.Options) core.Result { return s.cmdPRMerge(options) } @@ -410,10 +400,8 @@ func (s *PrepSubsystem) handleEpic(ctx context.Context, options core.Options) co return core.Result{Value: out, OK: true} } -// handleWorkspaceQuery answers workspace state queries from Core QUERY calls. -// -// result := c.QUERY(agentic.WorkspaceQuery{Name: "core/go-io/task-42"}) -// result := c.QUERY(agentic.WorkspaceQuery{Status: "blocked"}) +// result := c.QUERY(agentic.WorkspaceQuery{Name: "core/go-io/task-42"}) +// result := c.QUERY(agentic.WorkspaceQuery{Status: "blocked"}) func (s *PrepSubsystem) handleWorkspaceQuery(_ *core.Core, query core.Query) core.Result { workspaceQuery, ok := query.(WorkspaceQuery) if !ok { diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index 41a6568..322af96 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -304,7 +304,6 @@ func (s *PrepSubsystem) stopIssueTracking(workspaceDir string) { s.forge.Issues.StopStopwatch(context.Background(), org, workspaceStatus.Repo, int64(workspaceStatus.Issue)) } -// broadcastStart emits IPC + audit events for agent start. func (s *PrepSubsystem) broadcastStart(agent, workspaceDir string) { workspaceName := WorkspaceName(workspaceDir) result := ReadStatusResult(workspaceDir) @@ -321,7 +320,6 @@ func (s *PrepSubsystem) broadcastStart(agent, workspaceDir string) { emitStartEvent(agent, workspaceName) } -// broadcastComplete emits IPC + audit events for agent completion. func (s *PrepSubsystem) broadcastComplete(agent, workspaceDir, finalStatus string) { workspaceName := WorkspaceName(workspaceDir) emitCompletionEvent(agent, workspaceName, finalStatus) @@ -340,7 +338,6 @@ func (s *PrepSubsystem) broadcastComplete(agent, workspaceDir, finalStatus strin } func (s *PrepSubsystem) onAgentComplete(agent, workspaceDir, outputFile string, exitCode int, processStatus, output string) { - // Save output if output != "" { fs.Write(outputFile, output) } @@ -348,7 +345,6 @@ func (s *PrepSubsystem) onAgentComplete(agent, workspaceDir, outputFile string, repoDir := WorkspaceRepoDir(workspaceDir) finalStatus, question := detectFinalStatus(repoDir, exitCode, processStatus) - // Update workspace status (disk + registry) result := ReadStatusResult(workspaceDir) workspaceStatus, ok := workspaceStatusValue(result) if ok { @@ -358,14 +354,11 @@ func (s *PrepSubsystem) onAgentComplete(agent, workspaceDir, outputFile string, writeStatusResult(workspaceDir, workspaceStatus) s.TrackWorkspace(WorkspaceName(workspaceDir), workspaceStatus) - // Rate-limit tracking s.trackFailureRate(agent, finalStatus, workspaceStatus.StartedAt) } - // Forge time tracking s.stopIssueTracking(workspaceDir) - // Broadcast completion s.broadcastComplete(agent, workspaceDir, finalStatus) // Run completion pipeline via PerformAsync for successful agents. @@ -519,7 +512,6 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, callRequest *mcp.CallToolR input.Template = "coding" } - // Step 1: Prep workspace — clone + build prompt prepInput := PrepInput{ Repo: input.Repo, Org: input.Org, @@ -552,15 +544,12 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, callRequest *mcp.CallToolR }, nil } - // Step 2: Ask runner service for permission (frozen + concurrency check). - // Runner owns the gate — agentic owns the spawn. if s.ServiceRuntime != nil { dispatchResult := s.Core().Action("runner.dispatch").Run(ctx, core.NewOptions( core.Option{Key: "agent", Value: input.Agent}, core.Option{Key: "repo", Value: input.Repo}, )) if !dispatchResult.OK { - // Runner denied — queue it workspaceStatus := &WorkspaceStatus{ Status: "queued", Agent: input.Agent, @@ -585,7 +574,6 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, callRequest *mcp.CallToolR } } - // Step 3: Spawn agent in repo/ directory pid, processID, outputFile, err := s.spawnAgent(input.Agent, prompt, workspaceDir) if err != nil { return nil, DispatchOutput{}, err diff --git a/pkg/agentic/events.go b/pkg/agentic/events.go index 99927c5..2727161 100644 --- a/pkg/agentic/events.go +++ b/pkg/agentic/events.go @@ -8,10 +8,7 @@ import ( core "dappco.re/go/core" ) -// CompletionEvent is emitted when a dispatched agent finishes. -// Written to ~/.core/workspace/events.jsonl as append-only log. -// -// event := agentic.CompletionEvent{Type: "agent_completed", Agent: "codex", Workspace: "go-io-123", Status: "completed"} +// event := agentic.CompletionEvent{Type: "agent_completed", Agent: "codex", Workspace: "go-io-123", Status: "completed"} type CompletionEvent struct { Type string `json:"type"` Agent string `json:"agent"` @@ -20,7 +17,6 @@ type CompletionEvent struct { Timestamp string `json:"timestamp"` } -// emitEvent appends an event to the events log. func emitEvent(eventType, agent, workspace, status string) { eventsFile := core.JoinPath(WorkspaceRoot(), "events.jsonl") @@ -34,7 +30,6 @@ func emitEvent(eventType, agent, workspace, status string) { line := core.Concat(core.JSONMarshalString(event), "\n") - // Append to events log r := fs.Append(eventsFile) if !r.OK { return @@ -42,12 +37,10 @@ func emitEvent(eventType, agent, workspace, status string) { core.WriteAll(r.Value, line) } -// emitStartEvent logs that an agent has been spawned. func emitStartEvent(agent, workspace string) { emitEvent("agent_started", agent, workspace, "running") } -// emitCompletionEvent logs that an agent has finished. func emitCompletionEvent(agent, workspace, status string) { emitEvent("agent_completed", agent, workspace, status) } diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go index fb77975..3108596 100644 --- a/pkg/agentic/handlers.go +++ b/pkg/agentic/handlers.go @@ -7,14 +7,11 @@ import ( core "dappco.re/go/core" ) -// HandleIPCEvents applies agent lifecycle messages to the prep subsystem. -// -// _ = prep.HandleIPCEvents(c, messages.AgentCompleted{Workspace: "core/go-io/task-5", Status: "completed"}) -// _ = prep.HandleIPCEvents(c, messages.SpawnQueued{Workspace: "core/go-io/task-5", Agent: "codex", Task: "fix tests"}) +// _ = prep.HandleIPCEvents(c, messages.AgentCompleted{Workspace: "core/go-io/task-5", Status: "completed"}) +// _ = prep.HandleIPCEvents(c, messages.SpawnQueued{Workspace: "core/go-io/task-5", Agent: "codex", Task: "fix tests"}) func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Result { switch ev := msg.(type) { case messages.AgentCompleted: - // Ingest findings (feature-flag gated) if c.Config().Enabled("auto-ingest") { if workspaceDir := resolveWorkspace(ev.Workspace); workspaceDir != "" { s.ingestFindings(workspaceDir) @@ -22,7 +19,6 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res } case messages.SpawnQueued: - // Runner asks agentic to spawn a queued workspace workspaceDir := resolveWorkspace(ev.Workspace) if workspaceDir == "" { break @@ -32,7 +28,6 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res if err != nil { break } - // Update status with real PID if result := ReadStatusResult(workspaceDir); result.OK { workspaceStatus, ok := workspaceStatusValue(result) if !ok { @@ -51,11 +46,8 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res return core.Result{OK: true} } -// SpawnFromQueue spawns an agent in a pre-prepped workspace. -// Called by runner.Service via ServiceFor interface matching. -// -// spawnResult := prep.SpawnFromQueue("codex", prompt, workspaceDir) -// pid := spawnResult.Value.(int) +// spawnResult := prep.SpawnFromQueue("codex", prompt, workspaceDir) +// pid := spawnResult.Value.(int) func (s *PrepSubsystem) SpawnFromQueue(agent, prompt, workspaceDir string) core.Result { pid, _, _, err := s.spawnAgent(agent, prompt, workspaceDir) if err != nil { @@ -78,8 +70,6 @@ func resolveWorkspace(name string) string { return "" } -// findWorkspaceByPR finds a workspace directory by repo name and branch. -// Scans running/completed workspaces for a matching repo+branch combination. func findWorkspaceByPR(repo, branch string) string { for _, path := range WorkspaceStatusPaths() { workspaceDir := core.PathDir(path) diff --git a/pkg/agentic/queue.go b/pkg/agentic/queue.go index fa2efb4..882c147 100644 --- a/pkg/agentic/queue.go +++ b/pkg/agentic/queue.go @@ -157,7 +157,6 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int { return count } - // Fallback: scan disk (cold start before hydration) return s.countRunningByAgentDisk(runtime, agent) } @@ -195,7 +194,6 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int { return count } - // Fallback: scan disk return s.countRunningByModelDisk(runtime, agent) } @@ -286,7 +284,6 @@ func (s *PrepSubsystem) drainQueue() { } for s.drainOne() { - // keep filling slots } } diff --git a/pkg/agentic/register.go b/pkg/agentic/register.go index 4779e2b..c75c71d 100644 --- a/pkg/agentic/register.go +++ b/pkg/agentic/register.go @@ -6,24 +6,22 @@ import ( core "dappco.re/go/core" ) -// c := core.New( -// core.WithService(agentic.ProcessRegister), -// core.WithService(agentic.Register), -// ) -// prep, _ := core.ServiceFor[*agentic.PrepSubsystem](c, "agentic") +// c := core.New( +// +// core.WithService(agentic.ProcessRegister), +// core.WithService(agentic.Register), +// +// ) +// prep, _ := core.ServiceFor[*agentic.PrepSubsystem](c, "agentic") func Register(c *core.Core) core.Result { subsystem := NewPrep() subsystem.ServiceRuntime = core.NewServiceRuntime(c, AgentOptions{}) - // Load agents config once into Core shared config config := subsystem.loadAgentsConfig() c.Config().Set("agents.concurrency", config.Concurrency) c.Config().Set("agents.rates", config.Rates) c.Config().Set("agents.dispatch", config.Dispatch) - // Pipeline feature flags — all enabled by default. - // Disable with c.Config().Disable("auto-qa") etc. - // // c.Config().Enabled("auto-qa") // true — run QA after completion // c.Config().Enabled("auto-pr") // true — create PR on QA pass // c.Config().Enabled("auto-merge") // true — verify + merge PR diff --git a/pkg/agentic/remote.go b/pkg/agentic/remote.go index 3bf672d..6f76446 100644 --- a/pkg/agentic/remote.go +++ b/pkg/agentic/remote.go @@ -53,13 +53,10 @@ func (s *PrepSubsystem) dispatchRemote(ctx context.Context, _ *mcp.CallToolReque return nil, RemoteDispatchOutput{}, core.E("dispatchRemote", "task is required", nil) } - // Resolve host aliases addr := resolveHost(input.Host) - // Get auth token for remote agent token := remoteToken(input.Host) - // Build the MCP JSON-RPC call to agentic_dispatch on the remote callParams := map[string]any{ "repo": input.Repo, "task": input.Task, @@ -93,7 +90,6 @@ func (s *PrepSubsystem) dispatchRemote(ctx context.Context, _ *mcp.CallToolReque url := core.Sprintf("http://%s/mcp", addr) - // Step 1: Initialize session sessionResult := mcpInitializeResult(ctx, url, token) if !sessionResult.OK { err, _ := sessionResult.Value.(error) @@ -134,7 +130,6 @@ func (s *PrepSubsystem) dispatchRemote(ctx context.Context, _ *mcp.CallToolReque }, err } - // Parse result output := RemoteDispatchOutput{ Success: true, Host: input.Host, @@ -172,7 +167,6 @@ func (s *PrepSubsystem) dispatchRemote(ctx context.Context, _ *mcp.CallToolReque // resolveHost maps friendly names to addresses. func resolveHost(host string) string { - // Known hosts aliases := map[string]string{ "charon": "10.69.69.165:9101", "cladius": "127.0.0.1:9101", @@ -183,7 +177,6 @@ func resolveHost(host string) string { return addr } - // If no port specified, add default if !core.Contains(host, ":") { return core.Concat(host, ":9101") } @@ -193,18 +186,15 @@ func resolveHost(host string) string { // remoteToken gets the auth token for a remote agent. func remoteToken(host string) string { - // Check environment first envKey := core.Sprintf("AGENT_TOKEN_%s", core.Upper(host)) if token := core.Env(envKey); token != "" { return token } - // Fallback to shared agent token if token := core.Env("MCP_AUTH_TOKEN"); token != "" { return token } - // Try reading from file home := HomeDir() tokenFiles := []string{ core.Sprintf("%s/.core/tokens/%s.token", home, core.Lower(host)), diff --git a/pkg/agentic/remote_status.go b/pkg/agentic/remote_status.go index fa1afc4..3fe9298 100644 --- a/pkg/agentic/remote_status.go +++ b/pkg/agentic/remote_status.go @@ -8,16 +8,12 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) -// RemoteStatusInput queries a remote core-agent for workspace status. -// -// input := agentic.RemoteStatusInput{Host: "charon"} +// input := agentic.RemoteStatusInput{Host: "charon"} type RemoteStatusInput struct { Host string `json:"host"` // Remote agent host (e.g. "charon") } -// RemoteStatusOutput is the response from a remote status check. -// -// out := agentic.RemoteStatusOutput{Success: true, Host: "charon"} +// out := agentic.RemoteStatusOutput{Success: true, Host: "charon"} type RemoteStatusOutput struct { Success bool `json:"success"` Host string `json:"host"` diff --git a/pkg/agentic/shutdown.go b/pkg/agentic/shutdown.go index 0d7a1f3..b81319f 100644 --- a/pkg/agentic/shutdown.go +++ b/pkg/agentic/shutdown.go @@ -9,14 +9,10 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) -// ShutdownInput is the input for agentic_dispatch_shutdown. -// -// input := agentic.ShutdownInput{} +// input := agentic.ShutdownInput{} type ShutdownInput struct{} -// ShutdownOutput is the output for agentic_dispatch_shutdown. -// -// out := agentic.ShutdownOutput{Success: true, Running: 3, Message: "draining"} +// out := agentic.ShutdownOutput{Success: true, Running: 3, Message: "draining"} type ShutdownOutput struct { Success bool `json:"success"` Running int `json:"running"` @@ -41,7 +37,6 @@ func (s *PrepSubsystem) registerShutdownTools(server *mcp.Server) { }, s.shutdownNow) } -// dispatchStart delegates to runner.start Action. func (s *PrepSubsystem) dispatchStart(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) { if s.ServiceRuntime != nil { s.Core().Action("runner.start").Run(ctx, core.NewOptions()) @@ -52,7 +47,6 @@ func (s *PrepSubsystem) dispatchStart(ctx context.Context, _ *mcp.CallToolReques }, nil } -// shutdownGraceful delegates to runner.stop Action. func (s *PrepSubsystem) shutdownGraceful(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) { if s.ServiceRuntime != nil { s.Core().Action("runner.stop").Run(ctx, core.NewOptions()) @@ -63,7 +57,6 @@ func (s *PrepSubsystem) shutdownGraceful(ctx context.Context, _ *mcp.CallToolReq }, nil } -// shutdownNow delegates to runner.kill Action. func (s *PrepSubsystem) shutdownNow(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) { if s.ServiceRuntime != nil { s.Core().Action("runner.kill").Run(ctx, core.NewOptions()) diff --git a/pkg/agentic/transport.go b/pkg/agentic/transport.go index 296bb30..43ad783 100644 --- a/pkg/agentic/transport.go +++ b/pkg/agentic/transport.go @@ -1,7 +1,5 @@ // SPDX-License-Identifier: EUPL-1.2 -// HTTP transport for Core API streams. - package agentic import ( @@ -12,10 +10,8 @@ import ( core "dappco.re/go/core" ) -// defaultClient is the shared HTTP client for all transport calls. var defaultClient = &http.Client{Timeout: 30 * time.Second} -// httpStream implements core.Stream over HTTP request/response. type httpStream struct { client *http.Client url string @@ -24,8 +20,6 @@ type httpStream struct { response []byte } -// stream := &httpStream{client: defaultClient, url: "https://forge.lthn.ai/api/v1/version", method: "GET"} -// _ = stream.Send(nil) func (s *httpStream) Send(data []byte) error { request, err := http.NewRequestWithContext(context.Background(), s.method, s.url, core.NewReader(string(data))) if err != nil { @@ -51,15 +45,10 @@ func (s *httpStream) Send(data []byte) error { return nil } -// stream := &httpStream{response: []byte(`{"ok":true}`)} -// data, _ := stream.Receive() -// _ = data func (s *httpStream) Receive() ([]byte, error) { return s.response, nil } -// stream := &httpStream{} -// _ = stream.Close() func (s *httpStream) Close() error { return nil } @@ -227,10 +216,8 @@ func mcpInitializeResult(ctx context.Context, url, token string) core.Result { sessionID := response.Header.Get("Mcp-Session-Id") - // Drain SSE response drainSSE(response) - // Send initialised notification notification := core.JSONMarshalString(map[string]any{ "jsonrpc": "2.0", "method": "notifications/initialized", diff --git a/pkg/brain/messaging.go b/pkg/brain/messaging.go index 7df400a..643a16d 100644 --- a/pkg/brain/messaging.go +++ b/pkg/brain/messaging.go @@ -29,8 +29,6 @@ func (s *DirectSubsystem) RegisterMessagingTools(server *mcp.Server) { }, s.conversation) } -// Input/Output types - // brain.SendInput{To: "charon", Subject: "status update", Content: "deploy complete"} type SendInput struct { To string `json:"to"` @@ -78,8 +76,6 @@ type ConversationOutput struct { Messages []MessageItem `json:"messages"` } -// Handlers - func (s *DirectSubsystem) sendMessage(ctx context.Context, _ *mcp.CallToolRequest, input SendInput) (*mcp.CallToolResult, SendOutput, error) { if input.To == "" || input.Content == "" { return nil, SendOutput{}, core.E("brain.sendMessage", "to and content are required", nil) @@ -112,7 +108,6 @@ func (s *DirectSubsystem) inbox(ctx context.Context, _ *mcp.CallToolRequest, inp if agent == "" { agent = agentic.AgentName() } - // Agent names are validated identifiers — no URL escaping needed. result := s.apiCall(ctx, "GET", core.Concat("/v1/messages/inbox?agent=", agent), nil) if !result.OK { err, _ := result.Value.(error) diff --git a/pkg/brain/provider.go b/pkg/brain/provider.go index 3b4d1b8..9e1a237 100644 --- a/pkg/brain/provider.go +++ b/pkg/brain/provider.go @@ -189,8 +189,6 @@ func (p *BrainProvider) Describe() []api.RouteDescription { } } -// -- Handlers ----------------------------------------------------------------- - func (p *BrainProvider) remember(c *gin.Context) { if p.bridge == nil { p.respondBridgeUnavailable(c) @@ -355,7 +353,6 @@ func (p *BrainProvider) respondBridgeError(c *gin.Context, err error) { c.JSON(statusInternalServerError, api.Fail("bridge_error", err.Error())) } -// emitEvent sends a WS event if the hub is available. func (p *BrainProvider) emitEvent(channel string, data any) { if p.hub == nil { return diff --git a/pkg/monitor/harvest.go b/pkg/monitor/harvest.go index 50a2117..18917d0 100644 --- a/pkg/monitor/harvest.go +++ b/pkg/monitor/harvest.go @@ -76,7 +76,6 @@ func (m *Subsystem) harvestWorkspace(workspaceDir string) *harvestResult { return nil } - // Only harvest completed workspaces (not merged, running, etc.) if workspaceStatus.Status != "completed" { return nil } @@ -86,7 +85,6 @@ func (m *Subsystem) harvestWorkspace(workspaceDir string) *harvestResult { return nil } - // Check if there are commits ahead of the default branch branch := workspaceStatus.Branch if branch == "" { branch = m.detectBranch(repoDir) @@ -96,24 +94,18 @@ func (m *Subsystem) harvestWorkspace(workspaceDir string) *harvestResult { return nil } - // Check for unpushed commits unpushed := m.countUnpushed(repoDir, branch) if unpushed == 0 { - return nil // already on origin or no commits + return nil } - // Safety checks before marking ready-for-review if reason := m.checkSafety(repoDir); reason != "" { updateStatus(workspaceDir, "rejected", reason) return &harvestResult{repo: workspaceStatus.Repo, branch: branch, rejected: reason} } - // Count changed files files := m.countChangedFiles(repoDir) - // Mark ready for review — do NOT auto-push. - // Pushing is a high-impact mutation that should happen during - // explicit review (/review command), not silently in the background. updateStatus(workspaceDir, "ready-for-review", "") return &harvestResult{repo: workspaceStatus.Repo, branch: branch, files: files} diff --git a/pkg/runner/queue.go b/pkg/runner/queue.go index 31a2075..8312a15 100644 --- a/pkg/runner/queue.go +++ b/pkg/runner/queue.go @@ -196,7 +196,6 @@ func (s *Service) drainQueue() { defer s.drainMu.Unlock() for s.drainOne() { - // keep filling slots } } diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 97cb99c..2f830ed 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -50,7 +50,6 @@ func Register(coreApp *core.Core) core.Result { service := New() service.ServiceRuntime = core.NewServiceRuntime(coreApp, Options{}) - // Load agents config config := service.loadAgentsConfig() coreApp.Config().Set("agents.concurrency", config.Concurrency) coreApp.Config().Set("agents.rates", config.Rates) @@ -133,7 +132,6 @@ func (s *Service) HandleIPCEvents(coreApp *core.Core, msg core.Message) core.Res } case messages.AgentCompleted: - // Update workspace status in Registry so concurrency count drops if ev.Workspace != "" { if workspaceResult := s.workspaces.Get(ev.Workspace); workspaceResult.OK { if workspaceStatus, ok := workspaceResult.Value.(*WorkspaceStatus); ok && workspaceStatus.Status == "running" { @@ -219,7 +217,6 @@ func (s *Service) TrackWorkspace(name string, status any) { return } s.workspaces.Set(name, workspaceStatus) - // Remove pending reservation now that the real workspace is tracked s.workspaces.Delete(core.Concat("pending/", workspaceStatus.Repo)) } @@ -386,14 +383,7 @@ func (s *Service) hydrateWorkspaces() { } } -// AgentNotification is the channel payload sent on `agent.status`. -// -// n := runner.AgentNotification{ -// Status: "started", Repo: "go-io", Agent: "codex", Workspace: "core/go-io/task-5", Running: 1, Limit: 2, -// } -// -// Field order is guaranteed by json tags so truncated notifications still show -// status and repo first. +// notification := runner.AgentNotification{Status: "started", Repo: "go-io", Agent: "codex", Workspace: "core/go-io/task-5", Running: 1, Limit: 2} type AgentNotification struct { Status string `json:"status"` Repo string `json:"repo"` diff --git a/pkg/setup/setup.go b/pkg/setup/setup.go index cd09981..86a3f5d 100644 --- a/pkg/setup/setup.go +++ b/pkg/setup/setup.go @@ -48,12 +48,10 @@ func (s *Service) Run(options Options) core.Result { } } - // Generate .core/ config files if result := setupCoreDir(options, projectType); !result.OK { return result } - // Scaffold from dir template if requested if templateName != "" { return s.scaffoldTemplate(options, projectType, templateName) } @@ -78,7 +76,6 @@ func setupCoreDir(options Options, projectType ProjectType) core.Result { } } - // build.yaml buildConfig := GenerateBuildConfig(options.Path, projectType) if !buildConfig.OK { err, _ := buildConfig.Value.(error) @@ -91,7 +88,6 @@ func setupCoreDir(options Options, projectType ProjectType) core.Result { return result } - // test.yaml testConfig := GenerateTestConfig(projectType) if !testConfig.OK { err, _ := testConfig.Value.(error)