fix(ax): refine workspace naming cleanup

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-30 21:17:33 +00:00
parent f11d1d47a1
commit e82112024c
5 changed files with 156 additions and 158 deletions

View file

@ -50,7 +50,7 @@ func (s *PrepSubsystem) registerWatchTool(server *mcp.Server) {
}, s.watch)
}
func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, input WatchInput) (*mcp.CallToolResult, WatchOutput, error) {
func (s *PrepSubsystem) watch(ctx context.Context, request *mcp.CallToolRequest, input WatchInput) (*mcp.CallToolResult, WatchOutput, error) {
pollInterval := time.Duration(input.PollInterval) * time.Second
if pollInterval <= 0 {
pollInterval = 5 * time.Second
@ -63,13 +63,12 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp
start := time.Now()
deadline := start.Add(timeout)
// Find workspaces to watch
targets := input.Workspaces
if len(targets) == 0 {
targets = s.findActiveWorkspaces()
workspaceNames := input.Workspaces
if len(workspaceNames) == 0 {
workspaceNames = s.findActiveWorkspaces()
}
if len(targets) == 0 {
if len(workspaceNames) == 0 {
return nil, WatchOutput{
Success: true,
Duration: "0s",
@ -78,26 +77,25 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp
var completed []WatchResult
var failed []WatchResult
remaining := make(map[string]bool)
for _, ws := range targets {
remaining[ws] = true
pendingWorkspaces := make(map[string]bool)
for _, workspaceName := range workspaceNames {
pendingWorkspaces[workspaceName] = true
}
progressCount := float64(0)
total := float64(len(targets))
total := float64(len(workspaceNames))
// MCP tests and internal callers may not provide a full request envelope.
progressToken := any(nil)
if req != nil && req.Params != nil {
progressToken = req.Params.GetProgressToken()
if request != nil && request.Params != nil {
progressToken = request.Params.GetProgressToken()
}
// Poll until all complete or timeout
for len(remaining) > 0 {
for len(pendingWorkspaces) > 0 {
if time.Now().After(deadline) {
for ws := range remaining {
for workspaceName := range pendingWorkspaces {
failed = append(failed, WatchResult{
Workspace: ws,
Workspace: workspaceName,
Status: "timeout",
})
}
@ -110,74 +108,74 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp
case <-time.After(pollInterval):
}
for ws := range remaining {
wsDir := s.resolveWorkspaceDir(ws)
result := ReadStatusResult(wsDir)
st, ok := workspaceStatusValue(result)
for workspaceName := range pendingWorkspaces {
workspaceDir := s.resolveWorkspaceDir(workspaceName)
statusResult := ReadStatusResult(workspaceDir)
workspaceStatus, ok := workspaceStatusValue(statusResult)
if !ok {
continue
}
switch st.Status {
switch workspaceStatus.Status {
case "completed":
result := WatchResult{
Workspace: ws,
Agent: st.Agent,
Repo: st.Repo,
watchResult := WatchResult{
Workspace: workspaceName,
Agent: workspaceStatus.Agent,
Repo: workspaceStatus.Repo,
Status: "completed",
PRURL: st.PRURL,
PRURL: workspaceStatus.PRURL,
}
completed = append(completed, result)
delete(remaining, ws)
completed = append(completed, watchResult)
delete(pendingWorkspaces, workspaceName)
progressCount++
if req != nil && progressToken != nil && req.Session != nil {
req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
if request != nil && progressToken != nil && request.Session != nil {
request.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
ProgressToken: progressToken,
Progress: progressCount,
Total: total,
Message: core.Sprintf("%s completed (%s)", st.Repo, st.Agent),
Message: core.Sprintf("%s completed (%s)", workspaceStatus.Repo, workspaceStatus.Agent),
})
}
case "merged", "ready-for-review":
result := WatchResult{
Workspace: ws,
Agent: st.Agent,
Repo: st.Repo,
Status: st.Status,
PRURL: st.PRURL,
watchResult := WatchResult{
Workspace: workspaceName,
Agent: workspaceStatus.Agent,
Repo: workspaceStatus.Repo,
Status: workspaceStatus.Status,
PRURL: workspaceStatus.PRURL,
}
completed = append(completed, result)
delete(remaining, ws)
completed = append(completed, watchResult)
delete(pendingWorkspaces, workspaceName)
progressCount++
if req != nil && progressToken != nil && req.Session != nil {
req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
if request != nil && progressToken != nil && request.Session != nil {
request.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
ProgressToken: progressToken,
Progress: progressCount,
Total: total,
Message: core.Sprintf("%s %s (%s)", st.Repo, st.Status, st.Agent),
Message: core.Sprintf("%s %s (%s)", workspaceStatus.Repo, workspaceStatus.Status, workspaceStatus.Agent),
})
}
case "failed", "blocked":
result := WatchResult{
Workspace: ws,
Agent: st.Agent,
Repo: st.Repo,
Status: st.Status,
watchResult := WatchResult{
Workspace: workspaceName,
Agent: workspaceStatus.Agent,
Repo: workspaceStatus.Repo,
Status: workspaceStatus.Status,
}
failed = append(failed, result)
delete(remaining, ws)
failed = append(failed, watchResult)
delete(pendingWorkspaces, workspaceName)
progressCount++
if req != nil && progressToken != nil && req.Session != nil {
req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
if request != nil && progressToken != nil && request.Session != nil {
request.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
ProgressToken: progressToken,
Progress: progressCount,
Total: total,
Message: core.Sprintf("%s %s (%s)", st.Repo, st.Status, st.Agent),
Message: core.Sprintf("%s %s (%s)", workspaceStatus.Repo, workspaceStatus.Status, workspaceStatus.Agent),
})
}
}
@ -196,23 +194,23 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp
func (s *PrepSubsystem) findActiveWorkspaces() []string {
var active []string
for _, entry := range WorkspaceStatusPaths() {
wsDir := core.PathDir(entry)
result := ReadStatusResult(wsDir)
st, ok := workspaceStatusValue(result)
workspaceDir := core.PathDir(entry)
statusResult := ReadStatusResult(workspaceDir)
workspaceStatus, ok := workspaceStatusValue(statusResult)
if !ok {
continue
}
if st.Status == "running" || st.Status == "queued" {
active = append(active, WorkspaceName(wsDir))
if workspaceStatus.Status == "running" || workspaceStatus.Status == "queued" {
active = append(active, WorkspaceName(workspaceDir))
}
}
return active
}
// resolveWorkspaceDir converts a workspace name to full path.
func (s *PrepSubsystem) resolveWorkspaceDir(name string) string {
if core.PathIsAbs(name) {
return name
func (s *PrepSubsystem) resolveWorkspaceDir(workspaceName string) string {
if core.PathIsAbs(workspaceName) {
return workspaceName
}
return core.JoinPath(WorkspaceRoot(), name)
return core.JoinPath(WorkspaceRoot(), workspaceName)
}

View file

@ -83,8 +83,8 @@ var _ coremcp.Subsystem = (*Subsystem)(nil)
// Deprecated: prefer Register with core.WithService(monitor.Register).
//
// mon.SetCore(c)
func (m *Subsystem) SetCore(c *core.Core) {
m.ServiceRuntime = core.NewServiceRuntime(c, Options{})
func (m *Subsystem) SetCore(coreApp *core.Core) {
m.ServiceRuntime = core.NewServiceRuntime(coreApp, Options{})
}
func (m *Subsystem) handleAgentStarted(ev messages.AgentStarted) {
@ -160,7 +160,7 @@ func (m *Subsystem) RegisterTools(server *mcp.Server) {
// service.Start(ctx)
func (m *Subsystem) Start(ctx context.Context) {
monitorCtx, cancel := context.WithCancel(ctx)
loopCtx, cancel := context.WithCancel(ctx)
m.cancel = cancel
core.Info("monitor: started (interval=%s)", m.interval)
@ -168,7 +168,7 @@ func (m *Subsystem) Start(ctx context.Context) {
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.loop(monitorCtx)
m.loop(loopCtx)
}()
}
@ -221,8 +221,8 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
runtime = m.Core()
}
for _, path := range agentic.WorkspaceStatusPaths() {
wsDir := core.PathDir(path)
statusResult := agentic.ReadStatusResult(wsDir)
workspaceDir := core.PathDir(path)
statusResult := agentic.ReadStatusResult(workspaceDir)
if !statusResult.OK {
continue
}
@ -242,8 +242,8 @@ func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
return
}
func processAlive(c *core.Core, processID string, pid int) bool {
return agentic.ProcessAlive(c, processID, pid)
func processAlive(coreApp *core.Core, processID string, pid int) bool {
return agentic.ProcessAlive(coreApp, processID, pid)
}
func (m *Subsystem) loop(ctx context.Context) {
@ -273,30 +273,30 @@ func (m *Subsystem) loop(ctx context.Context) {
}
func (m *Subsystem) check(ctx context.Context) {
var messages []string
var statusMessages []string
if msg := m.checkCompletions(); msg != "" {
messages = append(messages, msg)
if statusMessage := m.checkCompletions(); statusMessage != "" {
statusMessages = append(statusMessages, statusMessage)
}
if msg := m.harvestCompleted(); msg != "" {
messages = append(messages, msg)
if statusMessage := m.harvestCompleted(); statusMessage != "" {
statusMessages = append(statusMessages, statusMessage)
}
if msg := m.checkInbox(); msg != "" {
messages = append(messages, msg)
if statusMessage := m.checkInbox(); statusMessage != "" {
statusMessages = append(statusMessages, statusMessage)
}
if msg := m.syncRepos(); msg != "" {
messages = append(messages, msg)
if statusMessage := m.syncRepos(); statusMessage != "" {
statusMessages = append(statusMessages, statusMessage)
}
if len(messages) == 0 {
if len(statusMessages) == 0 {
return
}
combined := core.Join("\n", messages...)
m.notify(ctx, combined)
combinedMessage := core.Join("\n", statusMessages...)
m.notify(ctx, combinedMessage)
if m.server != nil {
m.server.ResourceUpdated(ctx, &mcp.ResourceUpdatedNotificationParams{
@ -383,19 +383,19 @@ func (m *Subsystem) checkCompletions() string {
}
func (m *Subsystem) checkInbox() string {
apiKeyStr := monitorBrainKey()
if apiKeyStr == "" {
brainKey := monitorBrainKey()
if brainKey == "" {
return ""
}
apiURL := monitorAPIURL()
inboxURL := core.Concat(apiURL, "/v1/messages/inbox?agent=", url.QueryEscape(agentic.AgentName()))
httpResult := agentic.HTTPGet(context.Background(), inboxURL, core.Trim(apiKeyStr), "Bearer")
baseURL := monitorAPIURL()
inboxURL := core.Concat(baseURL, "/v1/messages/inbox?agent=", url.QueryEscape(agentic.AgentName()))
httpResult := agentic.HTTPGet(context.Background(), inboxURL, core.Trim(brainKey), "Bearer")
if !httpResult.OK {
return ""
}
var resp struct {
var inboxResponse struct {
Data []struct {
ID int `json:"id"`
Read bool `json:"read"`
@ -404,7 +404,7 @@ func (m *Subsystem) checkInbox() string {
Content string `json:"content"`
} `json:"data"`
}
if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &resp); !parseResult.OK {
if parseResult := core.JSONUnmarshalString(httpResult.Value.(string), &inboxResponse); !parseResult.OK {
m.debug("checkInbox: failed to decode response")
return ""
}
@ -417,15 +417,15 @@ func (m *Subsystem) checkInbox() string {
seeded := m.inboxSeeded
m.mu.Unlock()
type newMessage struct {
type inboxMessage struct {
ID int `json:"id"`
From string `json:"from"`
Subject string `json:"subject"`
Content string `json:"content"`
}
var newMessages []newMessage
var inboxMessages []inboxMessage
for _, message := range resp.Data {
for _, message := range inboxResponse.Data {
if message.ID > maxID {
maxID = message.ID
}
@ -433,7 +433,7 @@ func (m *Subsystem) checkInbox() string {
unread++
}
if message.ID > prevMaxID {
newMessages = append(newMessages, newMessage{
inboxMessages = append(inboxMessages, inboxMessage{
ID: message.ID,
From: message.From,
Subject: message.Subject,
@ -451,17 +451,17 @@ func (m *Subsystem) checkInbox() string {
return ""
}
if maxID <= prevMaxID || len(newMessages) == 0 {
if maxID <= prevMaxID || len(inboxMessages) == 0 {
return ""
}
if m.ServiceRuntime != nil {
if notifier, ok := core.ServiceFor[channelSender](m.Core(), "mcp"); ok {
for _, message := range newMessages {
for _, inboxMessage := range inboxMessages {
notifier.ChannelSend(context.Background(), "inbox.message", map[string]any{
"from": message.From,
"subject": message.Subject,
"content": message.Content,
"from": inboxMessage.From,
"subject": inboxMessage.Subject,
"content": inboxMessage.Content,
})
}
}

View file

@ -57,21 +57,21 @@ func New() *Service {
// Register is the service factory for core.WithService.
//
// core.New(core.WithService(runner.Register))
func Register(c *core.Core) core.Result {
func Register(coreApp *core.Core) core.Result {
service := New()
service.ServiceRuntime = core.NewServiceRuntime(c, Options{})
service.ServiceRuntime = core.NewServiceRuntime(coreApp, Options{})
// Load agents config
config := service.loadAgentsConfig()
c.Config().Set("agents.concurrency", config.Concurrency)
c.Config().Set("agents.rates", config.Rates)
c.Config().Set("agents.dispatch", config.Dispatch)
c.Config().Set("agents.config_path", core.JoinPath(CoreRoot(), "agents.yaml"))
coreApp.Config().Set("agents.concurrency", config.Concurrency)
coreApp.Config().Set("agents.rates", config.Rates)
coreApp.Config().Set("agents.dispatch", config.Dispatch)
coreApp.Config().Set("agents.config_path", core.JoinPath(CoreRoot(), "agents.yaml"))
codexTotal := 0
if limit, ok := config.Concurrency["codex"]; ok {
codexTotal = limit.Total
}
c.Config().Set("agents.codex_limit_debug", codexTotal)
coreApp.Config().Set("agents.codex_limit_debug", codexTotal)
return core.Result{Value: service, OK: true}
}
@ -84,21 +84,21 @@ func Register(c *core.Core) core.Result {
// ))
// c.Action("runner.status").Run(ctx, core.NewOptions())
func (s *Service) OnStartup(ctx context.Context) core.Result {
c := s.Core()
coreApp := s.Core()
// Actions — the runner's capability map
c.Action("runner.dispatch", s.actionDispatch).Description = "Dispatch a subagent (checks frozen + concurrency)"
c.Action("runner.status", s.actionStatus).Description = "Query workspace status"
c.Action("runner.start", s.actionStart).Description = "Unfreeze dispatch queue"
c.Action("runner.stop", s.actionStop).Description = "Freeze dispatch queue (graceful)"
c.Action("runner.kill", s.actionKill).Description = "Kill all running agents (hard stop)"
c.Action("runner.poke", s.actionPoke).Description = "Drain next queued task"
coreApp.Action("runner.dispatch", s.actionDispatch).Description = "Dispatch a subagent (checks frozen + concurrency)"
coreApp.Action("runner.status", s.actionStatus).Description = "Query workspace status"
coreApp.Action("runner.start", s.actionStart).Description = "Unfreeze dispatch queue"
coreApp.Action("runner.stop", s.actionStop).Description = "Freeze dispatch queue (graceful)"
coreApp.Action("runner.kill", s.actionKill).Description = "Kill all running agents (hard stop)"
coreApp.Action("runner.poke", s.actionPoke).Description = "Drain next queued task"
// Hydrate workspace registry from disk
s.hydrateWorkspaces()
// QUERY handler — workspace state queries
c.RegisterQuery(s.handleWorkspaceQuery)
coreApp.RegisterQuery(s.handleWorkspaceQuery)
// Start the background queue runner
s.startRunner()
@ -123,17 +123,17 @@ func (s *Service) OnShutdown(_ context.Context) core.Result {
// service.HandleIPCEvents(c, messages.AgentCompleted{
// Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-5", Status: "completed",
// })
func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
func (s *Service) HandleIPCEvents(coreApp *core.Core, msg core.Message) core.Result {
switch ev := msg.(type) {
case messages.AgentStarted:
base := baseAgent(ev.Agent)
running := s.countRunningByAgent(base)
baseAgentName := baseAgent(ev.Agent)
runningCount := s.countRunningByAgent(baseAgentName)
var limit int
configurationResult := c.Config().Get("agents.concurrency")
if configurationResult.OK {
if concurrency, ok := configurationResult.Value.(map[string]ConcurrencyLimit); ok {
if cl, has := concurrency[base]; has {
limit = cl.Total
concurrencyResult := coreApp.Config().Get("agents.concurrency")
if concurrencyResult.OK {
if concurrency, ok := concurrencyResult.Value.(map[string]ConcurrencyLimit); ok {
if concurrencyLimit, has := concurrency[baseAgentName]; has {
limit = concurrencyLimit.Total
}
}
}
@ -142,10 +142,10 @@ func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
Repo: ev.Repo,
Agent: ev.Agent,
Workspace: ev.Workspace,
Running: running,
Running: runningCount,
Limit: limit,
}
if notifier, ok := core.ServiceFor[channelSender](c, "mcp"); ok {
if notifier, ok := core.ServiceFor[channelSender](coreApp, "mcp"); ok {
notifier.ChannelSend(context.Background(), "agent.status", notification)
}
@ -166,14 +166,14 @@ func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
}
})
}
cBase := baseAgent(ev.Agent)
cRunning := s.countRunningByAgent(cBase)
var cLimit int
completionResult := c.Config().Get("agents.concurrency")
completedBaseAgentName := baseAgent(ev.Agent)
runningCount := s.countRunningByAgent(completedBaseAgentName)
var limit int
completionResult := coreApp.Config().Get("agents.concurrency")
if completionResult.OK {
if concurrency, ok := completionResult.Value.(map[string]ConcurrencyLimit); ok {
if cl, has := concurrency[cBase]; has {
cLimit = cl.Total
if concurrencyLimit, has := concurrency[completedBaseAgentName]; has {
limit = concurrencyLimit.Total
}
}
}
@ -182,10 +182,10 @@ func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
Repo: ev.Repo,
Agent: ev.Agent,
Workspace: ev.Workspace,
Running: cRunning,
Limit: cLimit,
Running: runningCount,
Limit: limit,
}
if notifier, ok := core.ServiceFor[channelSender](c, "mcp"); ok {
if notifier, ok := core.ServiceFor[channelSender](coreApp, "mcp"); ok {
notifier.ChannelSend(context.Background(), "agent.status", notification)
}
s.Poke()
@ -300,8 +300,8 @@ func (s *Service) actionDispatch(_ context.Context, options core.Options) core.R
}
// Reserve the slot immediately — before returning to agentic.
name := core.Concat("pending/", repo)
s.workspaces.Set(name, &WorkspaceStatus{
workspaceName := core.Concat("pending/", repo)
s.workspaces.Set(workspaceName, &WorkspaceStatus{
Status: "running",
Agent: agent,
Repo: repo,
@ -404,8 +404,8 @@ func (s *Service) hydrateWorkspaces() {
s.workspaces = core.NewRegistry[*WorkspaceStatus]()
}
for _, path := range agentic.WorkspaceStatusPaths() {
wsDir := core.PathDir(path)
statusResult := ReadStatusResult(wsDir)
workspaceDir := core.PathDir(path)
statusResult := ReadStatusResult(workspaceDir)
if !statusResult.OK {
continue
}
@ -417,7 +417,7 @@ func (s *Service) hydrateWorkspaces() {
if workspaceStatus.Status == "running" {
workspaceStatus.Status = "queued"
}
s.workspaces.Set(agentic.WorkspaceName(wsDir), workspaceStatus)
s.workspaces.Set(agentic.WorkspaceName(workspaceDir), workspaceStatus)
}
}

View file

@ -139,16 +139,16 @@ func GenerateTestConfig(projType ProjectType) core.Result {
return renderConfig("Test configuration", sections)
}
func renderConfig(comment string, sections []configSection) core.Result {
func renderConfig(headerComment string, sections []configSection) core.Result {
builder := core.NewBuilder()
if comment != "" {
if headerComment != "" {
builder.WriteString("# ")
builder.WriteString(comment)
builder.WriteString(headerComment)
builder.WriteString("\n\n")
}
for idx, section := range sections {
for sectionIndex, section := range sections {
builder.WriteString(section.Key)
builder.WriteString(":\n")
@ -169,7 +169,7 @@ func renderConfig(comment string, sections []configSection) core.Result {
builder.WriteString("\n")
}
if idx < len(sections)-1 {
if sectionIndex < len(sections)-1 {
builder.WriteString("\n")
}
}
@ -205,12 +205,12 @@ func parseGitRemote(remote string) string {
// HTTPS/HTTP URL — extract path after host
if core.Contains(remote, "://") {
parts := core.SplitN(remote, "://", 2)
if len(parts) == 2 {
rest := parts[1]
if idx := core.Split(rest, "/"); len(idx) > 1 {
schemeParts := core.SplitN(remote, "://", 2)
if len(schemeParts) == 2 {
rest := schemeParts[1]
if pathSegments := core.Split(rest, "/"); len(pathSegments) > 1 {
// Skip host, take path
pathStart := len(idx[0]) + 1
pathStart := len(pathSegments[0]) + 1
if pathStart < len(rest) {
return trimRemotePath(rest[pathStart:])
}
@ -218,9 +218,9 @@ func parseGitRemote(remote string) string {
}
}
parts := core.SplitN(remote, ":", 2)
if len(parts) == 2 && core.Contains(parts[0], "@") {
return trimRemotePath(parts[1])
pathParts := core.SplitN(remote, ":", 2)
if len(pathParts) == 2 && core.Contains(pathParts[0], "@") {
return trimRemotePath(pathParts[1])
}
if core.Contains(remote, "/") {

View file

@ -40,9 +40,9 @@ func Detect(path string) ProjectType {
{"composer.json", TypePHP},
{"package.json", TypeNode},
}
for _, c := range checks {
if fs.IsFile(core.JoinPath(base, c.file)) {
return c.projType
for _, candidate := range checks {
if fs.IsFile(core.JoinPath(base, candidate.file)) {
return candidate.projType
}
}
return TypeUnknown
@ -53,8 +53,8 @@ func Detect(path string) ProjectType {
// types := setup.DetectAll("./repo")
func DetectAll(path string) []ProjectType {
base := absolutePath(path)
var types []ProjectType
all := []struct {
var projectTypes []ProjectType
checks := []struct {
file string
projType ProjectType
}{
@ -63,12 +63,12 @@ func DetectAll(path string) []ProjectType {
{"package.json", TypeNode},
{"wails.json", TypeWails},
}
for _, c := range all {
if fs.IsFile(core.JoinPath(base, c.file)) {
types = append(types, c.projType)
for _, candidate := range checks {
if fs.IsFile(core.JoinPath(base, candidate.file)) {
projectTypes = append(projectTypes, candidate.projType)
}
}
return types
return projectTypes
}
func absolutePath(path string) string {