agent/pkg/agentic/queue.go
Snider c6490c175a refactor: migrate imports to dappco.re paths + bump mcp to v0.4.0
Update all go-* imports from forge.lthn.ai to dappco.re/go/core/*.
Bump mcp to v0.4.0 (Options{} struct API).
Versions: core v0.5.0, io v0.2.0, log v0.1.0, process v0.3.0,
ws v0.3.0, ai v0.2.0, webview v0.2.0, i18n v0.2.0.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-22 01:27:48 +00:00

211 lines
5.4 KiB
Go

// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"fmt"
"os"
"path/filepath"
"strings"
"syscall"
"time"
coreio "dappco.re/go/core/io"
"gopkg.in/yaml.v3"
)
// DispatchConfig controls agent dispatch behaviour.
type DispatchConfig struct {
DefaultAgent string `yaml:"default_agent"`
DefaultTemplate string `yaml:"default_template"`
WorkspaceRoot string `yaml:"workspace_root"`
}
// RateConfig controls pacing between task dispatches.
type RateConfig struct {
ResetUTC string `yaml:"reset_utc"` // Daily quota reset time (UTC), e.g. "06:00"
DailyLimit int `yaml:"daily_limit"` // Max requests per day (0 = unknown)
MinDelay int `yaml:"min_delay"` // Minimum seconds between task starts
SustainedDelay int `yaml:"sustained_delay"` // Delay when pacing for full-day use
BurstWindow int `yaml:"burst_window"` // Hours before reset where burst kicks in
BurstDelay int `yaml:"burst_delay"` // Delay during burst window
}
// AgentsConfig is the root of config/agents.yaml.
type AgentsConfig struct {
Version int `yaml:"version"`
Dispatch DispatchConfig `yaml:"dispatch"`
Concurrency map[string]int `yaml:"concurrency"`
Rates map[string]RateConfig `yaml:"rates"`
}
// loadAgentsConfig reads config/agents.yaml from the code path.
func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
paths := []string{
filepath.Join(CoreRoot(), "agents.yaml"),
filepath.Join(s.codePath, "core", "agent", "config", "agents.yaml"),
}
for _, path := range paths {
data, err := coreio.Local.Read(path)
if err != nil {
continue
}
var cfg AgentsConfig
if err := yaml.Unmarshal([]byte(data), &cfg); err != nil {
continue
}
return &cfg
}
return &AgentsConfig{
Dispatch: DispatchConfig{
DefaultAgent: "claude",
DefaultTemplate: "coding",
},
Concurrency: map[string]int{
"claude": 1,
"gemini": 3,
},
}
}
// delayForAgent calculates how long to wait before spawning the next task
// for a given agent type, based on rate config and time of day.
func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
cfg := s.loadAgentsConfig()
// Strip variant suffix (claude:opus → claude) for config lookup
base := agent
if idx := strings.Index(agent, ":"); idx >= 0 {
base = agent[:idx]
}
rate, ok := cfg.Rates[base]
if !ok || rate.SustainedDelay == 0 {
return 0
}
// Parse reset time
resetHour, resetMin := 6, 0
fmt.Sscanf(rate.ResetUTC, "%d:%d", &resetHour, &resetMin)
now := time.Now().UTC()
resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC)
if now.Before(resetToday) {
// Reset hasn't happened yet today — reset was yesterday
resetToday = resetToday.AddDate(0, 0, -1)
}
nextReset := resetToday.AddDate(0, 0, 1)
hoursUntilReset := nextReset.Sub(now).Hours()
// Burst mode: if within burst window of reset, use burst delay
if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) {
return time.Duration(rate.BurstDelay) * time.Second
}
// Sustained mode
return time.Duration(rate.SustainedDelay) * time.Second
}
// countRunningByAgent counts running workspaces for a specific agent type.
func (s *PrepSubsystem) countRunningByAgent(agent string) int {
wsRoot := WorkspaceRoot()
entries, err := os.ReadDir(wsRoot)
if err != nil {
return 0
}
count := 0
for _, entry := range entries {
if !entry.IsDir() {
continue
}
st, err := readStatus(filepath.Join(wsRoot, entry.Name()))
if err != nil || st.Status != "running" {
continue
}
if baseAgent(st.Agent) != agent {
continue
}
if st.PID > 0 && syscall.Kill(st.PID, 0) == nil {
count++
}
}
return count
}
// baseAgent strips the model variant (gemini:flash → gemini).
func baseAgent(agent string) string {
return strings.SplitN(agent, ":", 2)[0]
}
// canDispatchAgent checks if we're under the concurrency limit for a specific agent type.
func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
cfg := s.loadAgentsConfig()
base := baseAgent(agent)
limit, ok := cfg.Concurrency[base]
if !ok || limit <= 0 {
return true
}
return s.countRunningByAgent(base) < limit
}
// drainQueue finds the oldest queued workspace and spawns it if a slot is available.
// Applies rate-based delay between spawns. Serialised via drainMu to prevent
// concurrent drainers from exceeding concurrency limits.
func (s *PrepSubsystem) drainQueue() {
s.drainMu.Lock()
defer s.drainMu.Unlock()
wsRoot := WorkspaceRoot()
entries, err := os.ReadDir(wsRoot)
if err != nil {
return
}
for _, entry := range entries {
if !entry.IsDir() {
continue
}
wsDir := filepath.Join(wsRoot, entry.Name())
st, err := readStatus(wsDir)
if err != nil || st.Status != "queued" {
continue
}
if !s.canDispatchAgent(st.Agent) {
continue
}
// Apply rate delay before spawning
delay := s.delayForAgent(st.Agent)
if delay > 0 {
time.Sleep(delay)
}
// Re-check concurrency after delay (another task may have started)
if !s.canDispatchAgent(st.Agent) {
continue
}
srcDir := filepath.Join(wsDir, "src")
prompt := "Read PROMPT.md for instructions. All context files (CLAUDE.md, TODO.md, CONTEXT.md, CONSUMERS.md, RECENT.md) are in the current directory. Work in this directory."
pid, _, err := s.spawnAgent(st.Agent, prompt, wsDir, srcDir)
if err != nil {
continue
}
st.Status = "running"
st.PID = pid
st.Runs++
writeStatus(wsDir, st)
return
}
}