Compare commits
3 commits
d305663573
...
b5873a8f31
| Author | SHA1 | Date | |
|---|---|---|---|
| b5873a8f31 | |||
|
|
53acf4000d | ||
|
|
04e3d492e9 |
15 changed files with 203 additions and 83 deletions
|
|
@ -13,7 +13,7 @@ import (
|
|||
// autoCreatePR pushes the agent's branch and creates a PR on Forge
|
||||
// if the agent made any commits beyond the initial clone.
|
||||
func (s *PrepSubsystem) autoCreatePR(wsDir string) {
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st.Branch == "" || st.Repo == "" {
|
||||
return
|
||||
}
|
||||
|
|
@ -44,7 +44,7 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) {
|
|||
pushCmd.Dir = repoDir
|
||||
if pushErr := pushCmd.Run(); pushErr != nil {
|
||||
// Push failed — update status with error but don't block
|
||||
if st2, err := readStatus(wsDir); err == nil {
|
||||
if st2, err := ReadStatus(wsDir); err == nil {
|
||||
st2.Question = core.Sprintf("PR push failed: %v", pushErr)
|
||||
writeStatus(wsDir, st2)
|
||||
}
|
||||
|
|
@ -60,7 +60,7 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) {
|
|||
|
||||
prURL, _, err := s.forgeCreatePR(ctx, org, st.Repo, st.Branch, base, title, body)
|
||||
if err != nil {
|
||||
if st2, err := readStatus(wsDir); err == nil {
|
||||
if st2, err := ReadStatus(wsDir); err == nil {
|
||||
st2.Question = core.Sprintf("PR creation failed: %v", err)
|
||||
writeStatus(wsDir, st2)
|
||||
}
|
||||
|
|
@ -68,7 +68,7 @@ func (s *PrepSubsystem) autoCreatePR(wsDir string) {
|
|||
}
|
||||
|
||||
// Update status with PR URL
|
||||
if st2, err := readStatus(wsDir); err == nil {
|
||||
if st2, err := ReadStatus(wsDir); err == nil {
|
||||
st2.PRURL = prURL
|
||||
writeStatus(wsDir, st2)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -156,7 +156,7 @@ func containerCommand(agentType, command string, args []string, repoDir, metaDir
|
|||
"-v", metaDir + ":/workspace/.meta",
|
||||
"-w", "/workspace",
|
||||
// Auth: agent configs only — NO SSH keys, git push runs on host
|
||||
"-v", core.JoinPath(home, ".codex") + ":/root/.codex:ro",
|
||||
"-v", core.JoinPath(home, ".codex") + ":/home/dev/.codex:ro",
|
||||
// API keys — passed by name, Docker resolves from host env
|
||||
"-e", "OPENAI_API_KEY",
|
||||
"-e", "ANTHROPIC_API_KEY",
|
||||
|
|
@ -175,14 +175,14 @@ func containerCommand(agentType, command string, args []string, repoDir, metaDir
|
|||
// Mount Claude config if dispatching claude agent
|
||||
if command == "claude" {
|
||||
dockerArgs = append(dockerArgs,
|
||||
"-v", core.JoinPath(home, ".claude")+":/root/.claude:ro",
|
||||
"-v", core.JoinPath(home, ".claude")+":/home/dev/.claude:ro",
|
||||
)
|
||||
}
|
||||
|
||||
// Mount Gemini config if dispatching gemini agent
|
||||
if command == "gemini" {
|
||||
dockerArgs = append(dockerArgs,
|
||||
"-v", core.JoinPath(home, ".gemini")+":/root/.gemini:ro",
|
||||
"-v", core.JoinPath(home, ".gemini")+":/home/dev/.gemini:ro",
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -228,7 +228,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
|
|||
|
||||
// Notify monitor directly — no filesystem polling
|
||||
if s.onComplete != nil {
|
||||
st, _ := readStatus(wsDir)
|
||||
st, _ := ReadStatus(wsDir)
|
||||
repo := ""
|
||||
if st != nil {
|
||||
repo = st.Repo
|
||||
|
|
@ -238,7 +238,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
|
|||
emitStartEvent(agent, core.PathBase(wsDir)) // audit log
|
||||
|
||||
// Start Forge stopwatch on the issue (time tracking)
|
||||
if st, _ := readStatus(wsDir); st != nil && st.Issue > 0 {
|
||||
if st, _ := ReadStatus(wsDir); st != nil && st.Issue > 0 {
|
||||
org := st.Org
|
||||
if org == "" {
|
||||
org = "core"
|
||||
|
|
@ -281,7 +281,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
|
|||
}
|
||||
}
|
||||
|
||||
if st, stErr := readStatus(wsDir); stErr == nil {
|
||||
if st, stErr := ReadStatus(wsDir); stErr == nil {
|
||||
st.Status = finalStatus
|
||||
st.PID = 0
|
||||
st.Question = question
|
||||
|
|
@ -293,7 +293,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
|
|||
// Rate-limit detection: if agent failed fast (<60s), track consecutive failures
|
||||
pool := baseAgent(agent)
|
||||
if finalStatus == "failed" {
|
||||
if st, _ := readStatus(wsDir); st != nil {
|
||||
if st, _ := ReadStatus(wsDir); st != nil {
|
||||
elapsed := time.Since(st.StartedAt)
|
||||
if elapsed < 60*time.Second {
|
||||
s.failCount[pool]++
|
||||
|
|
@ -310,7 +310,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
|
|||
}
|
||||
|
||||
// Stop Forge stopwatch on the issue (time tracking)
|
||||
if st, _ := readStatus(wsDir); st != nil && st.Issue > 0 {
|
||||
if st, _ := ReadStatus(wsDir); st != nil && st.Issue > 0 {
|
||||
org := st.Org
|
||||
if org == "" {
|
||||
org = "core"
|
||||
|
|
@ -320,7 +320,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
|
|||
|
||||
// Push notification directly — no filesystem polling
|
||||
if s.onComplete != nil {
|
||||
stNow, _ := readStatus(wsDir)
|
||||
stNow, _ := ReadStatus(wsDir)
|
||||
repoName := ""
|
||||
if stNow != nil {
|
||||
repoName = stNow.Repo
|
||||
|
|
@ -333,7 +333,7 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
|
|||
if !s.runQA(wsDir) {
|
||||
finalStatus = "failed"
|
||||
question = "QA check failed — build or tests did not pass"
|
||||
if st, stErr := readStatus(wsDir); stErr == nil {
|
||||
if st, stErr := ReadStatus(wsDir); stErr == nil {
|
||||
st.Status = finalStatus
|
||||
st.Question = question
|
||||
writeStatus(wsDir, st)
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ func (s *PrepSubsystem) DispatchSync(ctx context.Context, input DispatchSyncInpu
|
|||
case <-ticker.C:
|
||||
if pid > 0 && syscall.Kill(pid, 0) != nil {
|
||||
// Process exited — read final status
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
return DispatchSyncResult{Error: "can't read final status"}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import (
|
|||
// ingestFindings reads the agent output log and creates issues via the API
|
||||
// for scan/audit results. Only runs for conventions and security templates.
|
||||
func (s *PrepSubsystem) ingestFindings(wsDir string) {
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st.Status != "completed" {
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
|
|||
}
|
||||
|
||||
// Read workspace status for repo, branch, issue context
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
return nil, CreatePROutput{}, core.E("createPR", "no status.json", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,14 +32,49 @@ type RateConfig struct {
|
|||
BurstDelay int `yaml:"burst_delay"` // Delay during burst window
|
||||
}
|
||||
|
||||
// ConcurrencyLimit is the concurrency cap for one agent pool. In YAML it can
// be written as a bare integer or as a mapping that carries "total" plus one
// entry per model.
//
//	claude: 1    → Total=1, Models=nil
//	codex:       → Total=2, Models={"gpt-5.4": 1, "gpt-5.3-codex-spark": 1}
//	  total: 2
//	  gpt-5.4: 1
//	  gpt-5.3-codex-spark: 1
type ConcurrencyLimit struct {
	// Total is the pool-wide cap.
	Total int
	// Models holds the per-model caps, keyed by model name.
	Models map[string]int
}
|
||||
|
||||
// UnmarshalYAML handles both int and map forms.
|
||||
func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error {
|
||||
// Try int first
|
||||
var n int
|
||||
if err := value.Decode(&n); err == nil {
|
||||
c.Total = n
|
||||
return nil
|
||||
}
|
||||
// Try map
|
||||
var m map[string]int
|
||||
if err := value.Decode(&m); err != nil {
|
||||
return err
|
||||
}
|
||||
c.Total = m["total"]
|
||||
c.Models = make(map[string]int)
|
||||
for k, v := range m {
|
||||
if k != "total" {
|
||||
c.Models[k] = v
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AgentsConfig is the root of config/agents.yaml.
|
||||
//
|
||||
// cfg := agentic.AgentsConfig{Version: 1, Dispatch: agentic.DispatchConfig{DefaultAgent: "claude"}}
|
||||
type AgentsConfig struct {
|
||||
Version int `yaml:"version"`
|
||||
Dispatch DispatchConfig `yaml:"dispatch"`
|
||||
Concurrency map[string]int `yaml:"concurrency"`
|
||||
Rates map[string]RateConfig `yaml:"rates"`
|
||||
Version int `yaml:"version"`
|
||||
Dispatch DispatchConfig `yaml:"dispatch"`
|
||||
Concurrency map[string]ConcurrencyLimit `yaml:"concurrency"`
|
||||
Rates map[string]RateConfig `yaml:"rates"`
|
||||
}
|
||||
|
||||
// loadAgentsConfig reads config/agents.yaml from the code path.
|
||||
|
|
@ -66,9 +101,9 @@ func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
|
|||
DefaultAgent: "claude",
|
||||
DefaultTemplate: "coding",
|
||||
},
|
||||
Concurrency: map[string]int{
|
||||
"claude": 1,
|
||||
"gemini": 3,
|
||||
Concurrency: map[string]ConcurrencyLimit{
|
||||
"claude": {Total: 1},
|
||||
"gemini": {Total: 3},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -126,7 +161,7 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
|
|||
|
||||
count := 0
|
||||
for _, statusPath := range paths {
|
||||
st, err := readStatus(core.PathDir(statusPath))
|
||||
st, err := ReadStatus(core.PathDir(statusPath))
|
||||
if err != nil || st.Status != "running" {
|
||||
continue
|
||||
}
|
||||
|
|
@ -142,6 +177,28 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
|
|||
return count
|
||||
}
|
||||
|
||||
// countRunningByModel counts running workspaces for a specific agent:model string.
|
||||
func (s *PrepSubsystem) countRunningByModel(agent string) int {
|
||||
wsRoot := WorkspaceRoot()
|
||||
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
|
||||
|
||||
count := 0
|
||||
for _, statusPath := range append(old, deep...) {
|
||||
st, err := ReadStatus(core.PathDir(statusPath))
|
||||
if err != nil || st.Status != "running" {
|
||||
continue
|
||||
}
|
||||
if st.Agent != agent {
|
||||
continue
|
||||
}
|
||||
if st.PID > 0 && syscall.Kill(st.PID, 0) == nil {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// baseAgent strips the model variant (gemini:flash → gemini).
|
||||
func baseAgent(agent string) string {
|
||||
// codex:gpt-5.3-codex-spark → codex-spark (separate pool)
|
||||
|
|
@ -151,15 +208,48 @@ func baseAgent(agent string) string {
|
|||
return core.SplitN(agent, ":", 2)[0]
|
||||
}
|
||||
|
||||
// canDispatchAgent checks if we're under the concurrency limit for a specific agent type.
|
||||
// canDispatchAgent checks both pool-level and per-model concurrency limits.
|
||||
//
|
||||
// codex: {total: 2, gpt-5.4: 1} → max 2 codex total, max 1 codex:gpt-5.4
|
||||
func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
|
||||
cfg := s.loadAgentsConfig()
|
||||
base := baseAgent(agent)
|
||||
limit, ok := cfg.Concurrency[base]
|
||||
if !ok || limit <= 0 {
|
||||
if !ok || limit.Total <= 0 {
|
||||
return true
|
||||
}
|
||||
return s.countRunningByAgent(base) < limit
|
||||
|
||||
// Check pool total
|
||||
if s.countRunningByAgent(base) >= limit.Total {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check per-model limit if configured
|
||||
if limit.Models != nil {
|
||||
model := modelVariant(agent)
|
||||
if model != "" {
|
||||
if modelLimit, has := limit.Models[model]; has && modelLimit > 0 {
|
||||
if s.countRunningByModel(agent) >= modelLimit {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// modelVariant extracts the model name from an agent string.
|
||||
//
|
||||
// codex:gpt-5.4 → gpt-5.4
|
||||
// codex:gpt-5.3-codex-spark → gpt-5.3-codex-spark
|
||||
// claude → ""
|
||||
func modelVariant(agent string) string {
|
||||
parts := core.SplitN(agent, ":", 2)
|
||||
if len(parts) < 2 {
|
||||
return ""
|
||||
}
|
||||
return parts[1]
|
||||
}
|
||||
|
||||
// drainQueue fills all available concurrency slots from queued workspaces.
|
||||
|
|
@ -188,7 +278,7 @@ func (s *PrepSubsystem) drainOne() bool {
|
|||
|
||||
for _, statusPath := range statusFiles {
|
||||
wsDir := core.PathDir(statusPath)
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st.Status != "queued" {
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,8 +26,8 @@ func TestDispatchConfig_Good_Defaults(t *testing.T) {
|
|||
cfg := s.loadAgentsConfig()
|
||||
assert.Equal(t, "claude", cfg.Dispatch.DefaultAgent)
|
||||
assert.Equal(t, "coding", cfg.Dispatch.DefaultTemplate)
|
||||
assert.Equal(t, 1, cfg.Concurrency["claude"])
|
||||
assert.Equal(t, 3, cfg.Concurrency["gemini"])
|
||||
assert.Equal(t, 1, cfg.Concurrency["claude"].Total)
|
||||
assert.Equal(t, 3, cfg.Concurrency["gemini"].Total)
|
||||
}
|
||||
|
||||
func TestCanDispatchAgent_Good_NoConfig(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
|
|||
}
|
||||
|
||||
// Read current status
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
return nil, ResumeOutput{}, core.E("resume", "no status.json in workspace", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ func (s *PrepSubsystem) shutdownNow(ctx context.Context, _ *mcp.CallToolRequest,
|
|||
|
||||
for _, statusPath := range statusFiles {
|
||||
wsDir := core.PathDir(statusPath)
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ import (
|
|||
|
||||
// WorkspaceStatus represents the current state of an agent workspace.
|
||||
//
|
||||
// st, err := readStatus(wsDir)
|
||||
// st, err := ReadStatus(wsDir)
|
||||
// if err == nil && st.Status == "completed" { autoCreatePR(wsDir) }
|
||||
type WorkspaceStatus struct {
|
||||
Status string `json:"status"` // running, completed, blocked, failed
|
||||
|
|
@ -58,10 +58,13 @@ func writeStatus(wsDir string, status *WorkspaceStatus) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func readStatus(wsDir string) (*WorkspaceStatus, error) {
|
||||
// ReadStatus parses the status.json in a workspace directory.
|
||||
//
|
||||
// st, err := agentic.ReadStatus("/path/to/workspace")
|
||||
func ReadStatus(wsDir string) (*WorkspaceStatus, error) {
|
||||
r := fs.Read(core.JoinPath(wsDir, "status.json"))
|
||||
if !r.OK {
|
||||
return nil, core.E("readStatus", "status not found", nil)
|
||||
return nil, core.E("ReadStatus", "status not found", nil)
|
||||
}
|
||||
var s WorkspaceStatus
|
||||
if err := json.Unmarshal([]byte(r.Value.(string)), &s); err != nil {
|
||||
|
|
@ -125,7 +128,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
|
|||
wsDir := core.PathDir(statusPath)
|
||||
name := wsDir[len(wsRoot)+1:]
|
||||
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
out.Total++
|
||||
out.Failed++
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ func TestReadStatus_Good(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.True(t, fs.Write(filepath.Join(dir, "status.json"), string(data)).OK)
|
||||
|
||||
read, err := readStatus(dir)
|
||||
read, err := ReadStatus(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, "completed", read.Status)
|
||||
|
|
@ -91,7 +91,7 @@ func TestReadStatus_Good(t *testing.T) {
|
|||
|
||||
func TestReadStatus_Bad_NoFile(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
_, err := readStatus(dir)
|
||||
_, err := ReadStatus(dir)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
|
|
@ -99,7 +99,7 @@ func TestReadStatus_Bad_InvalidJSON(t *testing.T) {
|
|||
dir := t.TempDir()
|
||||
require.True(t, fs.Write(filepath.Join(dir, "status.json"), "not json{").OK)
|
||||
|
||||
_, err := readStatus(dir)
|
||||
_, err := ReadStatus(dir)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
|
|
@ -117,7 +117,7 @@ func TestReadStatus_Good_BlockedWithQuestion(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.True(t, fs.Write(filepath.Join(dir, "status.json"), string(data)).OK)
|
||||
|
||||
read, err := readStatus(dir)
|
||||
read, err := ReadStatus(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, "blocked", read.Status)
|
||||
|
|
@ -143,7 +143,7 @@ func TestWriteReadStatus_Good_Roundtrip(t *testing.T) {
|
|||
err := writeStatus(dir, original)
|
||||
require.NoError(t, err)
|
||||
|
||||
read, err := readStatus(dir)
|
||||
read, err := ReadStatus(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, original.Status, read.Status)
|
||||
|
|
@ -168,7 +168,7 @@ func TestWriteStatus_Good_OverwriteExisting(t *testing.T) {
|
|||
err = writeStatus(dir, second)
|
||||
require.NoError(t, err)
|
||||
|
||||
read, err := readStatus(dir)
|
||||
read, err := ReadStatus(dir)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "completed", read.Status)
|
||||
}
|
||||
|
|
@ -177,6 +177,6 @@ func TestReadStatus_Ugly_EmptyFile(t *testing.T) {
|
|||
dir := t.TempDir()
|
||||
require.True(t, fs.Write(filepath.Join(dir, "status.json"), "").OK)
|
||||
|
||||
_, err := readStatus(dir)
|
||||
_, err := ReadStatus(dir)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import (
|
|||
//
|
||||
// agentic_dispatch repo=go-crypt template=verify persona=engineering/engineering-security-engineer
|
||||
func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) {
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil || st.PRURL == "" || st.Repo == "" {
|
||||
return
|
||||
}
|
||||
|
|
@ -40,7 +40,7 @@ func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) {
|
|||
|
||||
// markMerged is a helper to avoid repeating the status update.
|
||||
markMerged := func() {
|
||||
if st2, err := readStatus(wsDir); err == nil {
|
||||
if st2, err := ReadStatus(wsDir); err == nil {
|
||||
st2.Status = "merged"
|
||||
writeStatus(wsDir, st2)
|
||||
}
|
||||
|
|
@ -66,7 +66,7 @@ func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) {
|
|||
// Both attempts failed — flag for human review
|
||||
s.flagForReview(org, st.Repo, prNum, result)
|
||||
|
||||
if st2, err := readStatus(wsDir); err == nil {
|
||||
if st2, err := ReadStatus(wsDir); err == nil {
|
||||
st2.Question = "Flagged for review — auto-merge failed after retry"
|
||||
writeStatus(wsDir, st2)
|
||||
}
|
||||
|
|
@ -129,7 +129,7 @@ func (s *PrepSubsystem) rebaseBranch(repoDir, branch string) bool {
|
|||
}
|
||||
|
||||
// Force-push the rebased branch to Forge (origin is local clone)
|
||||
st, _ := readStatus(core.PathDir(repoDir))
|
||||
st, _ := ReadStatus(core.PathDir(repoDir))
|
||||
org := "core"
|
||||
repo := ""
|
||||
if st != nil {
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp
|
|||
|
||||
for ws := range remaining {
|
||||
wsDir := s.resolveWorkspaceDir(ws)
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
|
@ -196,7 +196,7 @@ func (s *PrepSubsystem) findActiveWorkspaces() []string {
|
|||
var active []string
|
||||
for _, entry := range entries {
|
||||
wsDir := core.PathDir(entry)
|
||||
st, err := readStatus(wsDir)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"dappco.re/go/agent/pkg/agentic"
|
||||
|
|
@ -231,7 +232,7 @@ func (m *Subsystem) AgentStarted(agent, repo, workspace string) {
|
|||
}
|
||||
|
||||
// AgentCompleted is called when an agent finishes.
|
||||
// Only sends notifications for failures. Sends "queue.drained" when all work is done.
|
||||
// Emits agent.completed for every finish, then checks if the queue is empty.
|
||||
//
|
||||
// mon.AgentCompleted("codex", "go-io", "core/go-io/task-5", "completed")
|
||||
func (m *Subsystem) AgentCompleted(agent, repo, workspace, status string) {
|
||||
|
|
@ -240,53 +241,68 @@ func (m *Subsystem) AgentCompleted(agent, repo, workspace, status string) {
|
|||
m.mu.Unlock()
|
||||
|
||||
if m.notifier != nil {
|
||||
// Only notify on failures — those need attention
|
||||
if status == "failed" || status == "blocked" {
|
||||
m.notifier.ChannelSend(context.Background(), "agent.failed", map[string]any{
|
||||
"repo": repo,
|
||||
"agent": agent,
|
||||
"status": status,
|
||||
})
|
||||
}
|
||||
m.notifier.ChannelSend(context.Background(), "agent.completed", map[string]any{
|
||||
"repo": repo,
|
||||
"agent": agent,
|
||||
"workspace": workspace,
|
||||
"status": status,
|
||||
})
|
||||
}
|
||||
|
||||
// Check if queue is drained (0 running + 0 queued)
|
||||
m.Poke()
|
||||
go m.checkIdleAfterDelay()
|
||||
}
|
||||
|
||||
// checkIdleAfterDelay waits briefly then checks if the fleet is idle.
|
||||
// Sends a single "queue.drained" notification when all work stops.
|
||||
// checkIdleAfterDelay waits briefly then checks if the fleet is genuinely idle.
|
||||
// Only emits queue.drained when there are truly zero running or queued agents,
|
||||
// verified by checking PIDs are alive, not just trusting status files.
|
||||
func (m *Subsystem) checkIdleAfterDelay() {
|
||||
time.Sleep(5 * time.Second) // wait for runner to fill slots
|
||||
time.Sleep(5 * time.Second) // wait for queue drain to fill slots
|
||||
if m.notifier == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Quick count — scan for running/queued
|
||||
running := 0
|
||||
queued := 0
|
||||
running, queued := m.countLiveWorkspaces()
|
||||
if running == 0 && queued == 0 {
|
||||
m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{
|
||||
"running": running,
|
||||
"queued": queued,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// countLiveWorkspaces counts workspaces that are genuinely active.
|
||||
// For "running" status, verifies the PID is still alive.
|
||||
func (m *Subsystem) countLiveWorkspaces() (running, queued int) {
|
||||
wsRoot := agentic.WorkspaceRoot()
|
||||
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
|
||||
for _, path := range append(old, deep...) {
|
||||
r := fs.Read(path)
|
||||
if !r.OK {
|
||||
wsDir := core.PathDir(path)
|
||||
st, err := agentic.ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
s := r.Value.(string)
|
||||
if core.Contains(s, `"status":"running"`) {
|
||||
running++
|
||||
} else if core.Contains(s, `"status":"queued"`) {
|
||||
switch st.Status {
|
||||
case "running":
|
||||
if st.PID > 0 && pidAlive(st.PID) {
|
||||
running++
|
||||
}
|
||||
case "queued":
|
||||
queued++
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if running == 0 && queued == 0 {
|
||||
m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{
|
||||
"message": "all work complete",
|
||||
})
|
||||
// pidAlive reports whether a process with the given PID currently exists.
//
// It probes with signal 0, which performs the existence and permission checks
// without delivering anything. A nil result means the process exists. EPERM
// also means it exists, but is owned by another user, so it is still alive.
// Any other error (typically ESRCH) means there is no such process.
//
// Zero and negative PIDs always return false: kill(2) treats them as
// process-group or broadcast targets, never as a single process.
func pidAlive(pid int) bool {
	if pid <= 0 {
		return false
	}
	err := syscall.Kill(pid, 0)
	return err == nil || err == syscall.EPERM
}
|
||||
|
||||
func (m *Subsystem) loop(ctx context.Context) {
|
||||
|
|
@ -430,11 +446,20 @@ func (m *Subsystem) checkCompletions() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// Only notify on queue drain (0 running + 0 queued) — individual completions are noise
|
||||
if m.notifier != nil && running == 0 && queued == 0 {
|
||||
// Emit agent.completed for each newly finished task
|
||||
if m.notifier != nil {
|
||||
for _, desc := range newlyCompleted {
|
||||
m.notifier.ChannelSend(context.Background(), "agent.completed", map[string]any{
|
||||
"description": desc,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Only emit queue.drained when genuinely empty — verified by live PID check
|
||||
liveRunning, liveQueued := m.countLiveWorkspaces()
|
||||
if m.notifier != nil && liveRunning == 0 && liveQueued == 0 {
|
||||
m.notifier.ChannelSend(context.Background(), "queue.drained", map[string]any{
|
||||
"completed": len(newlyCompleted),
|
||||
"message": "all work complete",
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -146,10 +146,12 @@ func TestCheckCompletions_Good_NewCompletions(t *testing.T) {
|
|||
assert.Contains(t, msg, "2 agent(s) completed")
|
||||
|
||||
events := notifier.Events()
|
||||
require.Len(t, events, 1)
|
||||
assert.Equal(t, "agent.complete", events[0].channel)
|
||||
eventData := events[0].data.(map[string]any)
|
||||
assert.Equal(t, 2, eventData["count"])
|
||||
require.Len(t, events, 3) // 2 agent.completed + 1 queue.drained
|
||||
assert.Equal(t, "agent.completed", events[0].channel)
|
||||
assert.Equal(t, "agent.completed", events[1].channel)
|
||||
assert.Equal(t, "queue.drained", events[2].channel)
|
||||
drainData := events[2].data.(map[string]any)
|
||||
assert.Equal(t, 2, drainData["completed"])
|
||||
}
|
||||
|
||||
func TestCheckCompletions_Good_MixedStatuses(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue