2026-03-16 11:10:33 +00:00
|
|
|
// SPDX-License-Identifier: EUPL-1.2
|
|
|
|
|
|
|
|
|
|
package agentic
|
|
|
|
|
|
|
|
|
|
import (
|
refactor: migrate core/agent to Core primitives — reference implementation
Phase 1: go-io/go-log → core.Fs{}, core.E(), core.Error/Info/Warn
Phase 2: strings/fmt → core.Contains, core.Sprintf, core.Split etc
Phase 3: embed.FS → core.Mount/core.Embed, core.Extract
Phase 4: cmd/main.go → core.Command(), c.Cli().Run(), no cli package
All packages migrated:
- pkg/lib (Codex): core.Mount, core.Extract, Result returns, AX comments
- pkg/setup (Codex): core.Fs, core.E, fixed missing lib helpers
- pkg/brain (Codex): Core primitives, AX comments
- pkg/monitor (Codex): Core string/logging primitives
- pkg/agentic (Codex): 20 files, Core primitives throughout
- cmd/main.go: pure Core CLI, no fmt/log/filepath/strings/cli
Remaining stdlib: path/filepath (Core doesn't wrap OS paths),
fmt.Sscanf/strings.Map (no Core equivalent).
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-22 06:13:41 +00:00
|
|
|
"strconv"
|
2026-03-16 11:10:33 +00:00
|
|
|
"time"
|
|
|
|
|
|
refactor: migrate core/agent to Core primitives — reference implementation
Phase 1: go-io/go-log → core.Fs{}, core.E(), core.Error/Info/Warn
Phase 2: strings/fmt → core.Contains, core.Sprintf, core.Split etc
Phase 3: embed.FS → core.Mount/core.Embed, core.Extract
Phase 4: cmd/main.go → core.Command(), c.Cli().Run(), no cli package
All packages migrated:
- pkg/lib (Codex): core.Mount, core.Extract, Result returns, AX comments
- pkg/setup (Codex): core.Fs, core.E, fixed missing lib helpers
- pkg/brain (Codex): Core primitives, AX comments
- pkg/monitor (Codex): Core string/logging primitives
- pkg/agentic (Codex): 20 files, Core primitives throughout
- cmd/main.go: pure Core CLI, no fmt/log/filepath/strings/cli
Remaining stdlib: path/filepath (Core doesn't wrap OS paths),
fmt.Sscanf/strings.Map (no Core equivalent).
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-22 06:13:41 +00:00
|
|
|
core "dappco.re/go/core"
|
2026-03-16 11:10:33 +00:00
|
|
|
"gopkg.in/yaml.v3"
|
|
|
|
|
)
|
|
|
|
|
|
2026-04-14 11:45:09 +01:00
|
|
|
// DispatchConfig mirrors the `dispatch:` section of agents.yaml.
//
//	config := agentic.DispatchConfig{DefaultAgent: "claude", DefaultTemplate: "coding", Runtime: "auto", Image: "core-dev"}
type DispatchConfig struct {
	// DefaultAgent is read from `default_agent`; the built-in fallback
	// configuration uses "claude".
	DefaultAgent string `yaml:"default_agent"`

	// DefaultTemplate is read from `default_template`; the built-in fallback
	// configuration uses "coding".
	DefaultTemplate string `yaml:"default_template"`

	// WorkspaceRoot is read from `workspace_root` and is handed to the
	// workspace-root override whenever a configuration file is loaded.
	WorkspaceRoot string `yaml:"workspace_root"`

	// Runtime picks the container runtime: auto | apple | docker | podman.
	// With auto the preference order is Apple Container -> Docker -> Podman.
	// Apple Containers (macOS 26+) give hardware VM isolation and sub-second
	// startup, Docker is the cross-platform fallback, and Podman is the
	// rootless choice for Linux hosts without Docker.
	Runtime string `yaml:"runtime"`

	// Image is the default container image for non-native agent dispatch,
	// e.g. the go-build LinuxKit images "core-dev", "core-ml", "core-minimal".
	Image string `yaml:"image"`

	// GPU turns on GPU passthrough: Metal on Apple Containers (when
	// available), NVIDIA on Docker. The default is false.
	GPU bool `yaml:"gpu"`
}
|
|
|
|
|
|
2026-03-31 05:20:50 +00:00
|
|
|
// RateConfig describes the pacing rules for one agent pool (an entry under
// `rates:` in agents.yaml).
//
//	rate := agentic.RateConfig{ResetUTC: "06:00", DailyLimit: 200, MinDelay: 15, SustainedDelay: 120, BurstWindow: 2, BurstDelay: 15}
type RateConfig struct {
	// ResetUTC is the daily reset time as "HH:MM" in UTC.
	ResetUTC string `yaml:"reset_utc"`

	// DailyLimit is the number of dispatches allowed per day; values <= 0
	// disable the daily limit check.
	DailyLimit int `yaml:"daily_limit"`

	// MinDelay is the lower bound, in seconds, for the delay between dispatches.
	MinDelay int `yaml:"min_delay"`

	// SustainedDelay is the normal delay between dispatches, in seconds.
	// A value of 0 disables pacing for the pool.
	SustainedDelay int `yaml:"sustained_delay"`

	// BurstWindow is the number of hours before the next reset during which
	// BurstDelay replaces SustainedDelay; 0 disables the burst window.
	BurstWindow int `yaml:"burst_window"`

	// BurstDelay is the delay between dispatches inside the burst window, in seconds.
	BurstDelay int `yaml:"burst_delay"`
}
|
|
|
|
|
|
2026-03-30 22:30:05 +00:00
|
|
|
// ConcurrencyLimit caps how many agents of one pool may run at once.
//
// In YAML it is either a bare number or a flat mapping whose `total` key is
// the pool-wide cap and whose other keys are per-model caps:
//
//	claude: 1            -> Total=1, Models=nil
//	codex:               -> Total=2, Models={"gpt-5.4": 1, "gpt-5.3-codex-spark": 1}
//	  total: 2
//	  gpt-5.4: 1
//	  gpt-5.3-codex-spark: 1
type ConcurrencyLimit struct {
	// Total is the pool-wide cap.
	Total int
	// Models maps a model variant name to its own cap.
	Models map[string]int
}
|
|
|
|
|
|
2026-03-30 22:04:36 +00:00
|
|
|
// var limit ConcurrencyLimit
|
|
|
|
|
// _ = yaml.Unmarshal([]byte("total: 2\ngpt-5.4: 1\n"), &limit)
|
2026-03-24 13:05:41 +00:00
|
|
|
func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error {
|
|
|
|
|
var n int
|
|
|
|
|
if err := value.Decode(&n); err == nil {
|
|
|
|
|
c.Total = n
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
var m map[string]int
|
|
|
|
|
if err := value.Decode(&m); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
c.Total = m["total"]
|
|
|
|
|
c.Models = make(map[string]int)
|
|
|
|
|
for k, v := range m {
|
|
|
|
|
if k != "total" {
|
|
|
|
|
c.Models[k] = v
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-14 11:45:09 +01:00
|
|
|
// AgentIdentity is one entry of the agents.yaml `agents:` block: a named
// identity (e.g. cladius, charon, codex) that can dispatch work.
//
//	identity := agentic.AgentIdentity{Host: "local", Runner: "claude", Active: true, Roles: []string{"dispatch", "review"}}
type AgentIdentity struct {
	// Host is "local", "cloud", "remote", or an explicit IP/hostname.
	//
	//	identity := agentic.AgentIdentity{Host: "local"}
	Host string `yaml:"host"`

	// Runner is the runtime backing this identity ("claude", "openai", "gemini").
	//
	//	identity := agentic.AgentIdentity{Runner: "claude"}
	Runner string `yaml:"runner"`

	// Active reports whether this identity takes part in dispatch.
	//
	//	identity := agentic.AgentIdentity{Active: true}
	Active bool `yaml:"active"`

	// Roles lists the workflows this identity can handle:
	// dispatch, worker, review, qa, plan.
	//
	//	identity := agentic.AgentIdentity{Roles: []string{"dispatch", "review", "plan"}}
	Roles []string `yaml:"roles"`
}
|
|
|
|
|
|
2026-03-30 22:30:05 +00:00
|
|
|
// config := agentic.AgentsConfig{Version: 1, Dispatch: agentic.DispatchConfig{DefaultAgent: "claude"}}
|
2026-03-16 11:10:33 +00:00
|
|
|
type AgentsConfig struct {
|
2026-03-29 21:19:37 +00:00
|
|
|
Version int `yaml:"version"`
|
|
|
|
|
Dispatch DispatchConfig `yaml:"dispatch"`
|
|
|
|
|
Concurrency map[string]ConcurrencyLimit `yaml:"concurrency"`
|
|
|
|
|
Rates map[string]RateConfig `yaml:"rates"`
|
2026-04-14 11:45:09 +01:00
|
|
|
// Agents declares named identities (cladius, charon, codex, clotho)
|
|
|
|
|
// keyed by name. Each identity carries host/runner/roles metadata used
|
|
|
|
|
// by message routing, brain attribution, and session ownership.
|
|
|
|
|
Agents map[string]AgentIdentity `yaml:"agents"`
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
|
|
|
|
|
2026-03-30 22:04:36 +00:00
|
|
|
// config := s.loadAgentsConfig()
|
2026-03-16 11:10:33 +00:00
|
|
|
func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
|
|
|
|
|
paths := []string{
|
refactor: migrate core/agent to Core primitives — reference implementation
Phase 1: go-io/go-log → core.Fs{}, core.E(), core.Error/Info/Warn
Phase 2: strings/fmt → core.Contains, core.Sprintf, core.Split etc
Phase 3: embed.FS → core.Mount/core.Embed, core.Extract
Phase 4: cmd/main.go → core.Command(), c.Cli().Run(), no cli package
All packages migrated:
- pkg/lib (Codex): core.Mount, core.Extract, Result returns, AX comments
- pkg/setup (Codex): core.Fs, core.E, fixed missing lib helpers
- pkg/brain (Codex): Core primitives, AX comments
- pkg/monitor (Codex): Core string/logging primitives
- pkg/agentic (Codex): 20 files, Core primitives throughout
- cmd/main.go: pure Core CLI, no fmt/log/filepath/strings/cli
Remaining stdlib: path/filepath (Core doesn't wrap OS paths),
fmt.Sscanf/strings.Map (no Core equivalent).
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-22 06:13:41 +00:00
|
|
|
core.JoinPath(CoreRoot(), "agents.yaml"),
|
|
|
|
|
core.JoinPath(s.codePath, "core", "agent", "config", "agents.yaml"),
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, path := range paths {
|
2026-03-30 21:11:06 +00:00
|
|
|
readResult := fs.Read(path)
|
|
|
|
|
if !readResult.OK {
|
2026-03-16 11:10:33 +00:00
|
|
|
continue
|
|
|
|
|
}
|
2026-03-30 20:53:36 +00:00
|
|
|
var config AgentsConfig
|
2026-03-30 21:11:06 +00:00
|
|
|
if err := yaml.Unmarshal([]byte(readResult.Value.(string)), &config); err != nil {
|
2026-03-16 11:10:33 +00:00
|
|
|
continue
|
|
|
|
|
}
|
2026-04-02 04:28:37 +00:00
|
|
|
setWorkspaceRootOverride(config.Dispatch.WorkspaceRoot)
|
2026-03-30 20:53:36 +00:00
|
|
|
return &config
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-02 04:28:37 +00:00
|
|
|
setWorkspaceRootOverride("")
|
2026-03-16 11:10:33 +00:00
|
|
|
return &AgentsConfig{
|
|
|
|
|
Dispatch: DispatchConfig{
|
|
|
|
|
DefaultAgent: "claude",
|
|
|
|
|
DefaultTemplate: "coding",
|
|
|
|
|
},
|
2026-03-24 13:05:41 +00:00
|
|
|
Concurrency: map[string]ConcurrencyLimit{
|
|
|
|
|
"claude": {Total: 1},
|
|
|
|
|
"gemini": {Total: 3},
|
2026-03-16 11:10:33 +00:00
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 22:04:36 +00:00
|
|
|
// delay := s.delayForAgent("codex:gpt-5.4")
|
2026-03-16 11:10:33 +00:00
|
|
|
func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
|
2026-03-26 06:38:02 +00:00
|
|
|
var rates map[string]RateConfig
|
|
|
|
|
if s.ServiceRuntime != nil {
|
|
|
|
|
rates, _ = s.Core().Config().Get("agents.rates").Value.(map[string]RateConfig)
|
|
|
|
|
}
|
|
|
|
|
if rates == nil {
|
2026-03-30 20:53:36 +00:00
|
|
|
config := s.loadAgentsConfig()
|
|
|
|
|
rates = config.Rates
|
2026-03-26 06:38:02 +00:00
|
|
|
}
|
refactor: migrate core/agent to Core primitives — reference implementation
Phase 1: go-io/go-log → core.Fs{}, core.E(), core.Error/Info/Warn
Phase 2: strings/fmt → core.Contains, core.Sprintf, core.Split etc
Phase 3: embed.FS → core.Mount/core.Embed, core.Extract
Phase 4: cmd/main.go → core.Command(), c.Cli().Run(), no cli package
All packages migrated:
- pkg/lib (Codex): core.Mount, core.Extract, Result returns, AX comments
- pkg/setup (Codex): core.Fs, core.E, fixed missing lib helpers
- pkg/brain (Codex): Core primitives, AX comments
- pkg/monitor (Codex): Core string/logging primitives
- pkg/agentic (Codex): 20 files, Core primitives throughout
- cmd/main.go: pure Core CLI, no fmt/log/filepath/strings/cli
Remaining stdlib: path/filepath (Core doesn't wrap OS paths),
fmt.Sscanf/strings.Map (no Core equivalent).
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-22 06:13:41 +00:00
|
|
|
base := baseAgent(agent)
|
2026-03-26 06:38:02 +00:00
|
|
|
rate, ok := rates[base]
|
2026-03-16 11:10:33 +00:00
|
|
|
if !ok || rate.SustainedDelay == 0 {
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
resetHour, resetMin := 6, 0
|
refactor: migrate core/agent to Core primitives — reference implementation
Phase 1: go-io/go-log → core.Fs{}, core.E(), core.Error/Info/Warn
Phase 2: strings/fmt → core.Contains, core.Sprintf, core.Split etc
Phase 3: embed.FS → core.Mount/core.Embed, core.Extract
Phase 4: cmd/main.go → core.Command(), c.Cli().Run(), no cli package
All packages migrated:
- pkg/lib (Codex): core.Mount, core.Extract, Result returns, AX comments
- pkg/setup (Codex): core.Fs, core.E, fixed missing lib helpers
- pkg/brain (Codex): Core primitives, AX comments
- pkg/monitor (Codex): Core string/logging primitives
- pkg/agentic (Codex): 20 files, Core primitives throughout
- cmd/main.go: pure Core CLI, no fmt/log/filepath/strings/cli
Remaining stdlib: path/filepath (Core doesn't wrap OS paths),
fmt.Sscanf/strings.Map (no Core equivalent).
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-22 06:13:41 +00:00
|
|
|
parts := core.Split(rate.ResetUTC, ":")
|
|
|
|
|
if len(parts) >= 2 {
|
|
|
|
|
if hour, err := strconv.Atoi(core.Trim(parts[0])); err == nil {
|
|
|
|
|
resetHour = hour
|
|
|
|
|
}
|
|
|
|
|
if min, err := strconv.Atoi(core.Trim(parts[1])); err == nil {
|
|
|
|
|
resetMin = min
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-03-16 11:10:33 +00:00
|
|
|
|
|
|
|
|
now := time.Now().UTC()
|
|
|
|
|
resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC)
|
|
|
|
|
if now.Before(resetToday) {
|
|
|
|
|
resetToday = resetToday.AddDate(0, 0, -1)
|
|
|
|
|
}
|
|
|
|
|
nextReset := resetToday.AddDate(0, 0, 1)
|
|
|
|
|
hoursUntilReset := nextReset.Sub(now).Hours()
|
|
|
|
|
|
2026-04-01 19:14:54 +00:00
|
|
|
delay := time.Duration(rate.SustainedDelay) * time.Second
|
2026-03-16 11:10:33 +00:00
|
|
|
if rate.BurstWindow > 0 && hoursUntilReset <= float64(rate.BurstWindow) {
|
2026-04-01 19:14:54 +00:00
|
|
|
delay = time.Duration(rate.BurstDelay) * time.Second
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-01 19:14:54 +00:00
|
|
|
minDelay := time.Duration(rate.MinDelay) * time.Second
|
|
|
|
|
if minDelay > delay {
|
|
|
|
|
delay = minDelay
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return delay
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
|
|
|
|
|
2026-03-30 22:54:19 +00:00
|
|
|
// n := s.countRunningByAgent("codex")
|
2026-03-16 11:10:33 +00:00
|
|
|
func (s *PrepSubsystem) countRunningByAgent(agent string) int {
|
2026-03-30 16:01:32 +00:00
|
|
|
var runtime *core.Core
|
|
|
|
|
if s.ServiceRuntime != nil {
|
|
|
|
|
runtime = s.Core()
|
|
|
|
|
}
|
feat(runner): extract dispatch runner into independent Core service
Moves concurrency, queue drain, workspace lifecycle, and frozen state
from agentic/prep into pkg/runner/ — a standalone Core service that
communicates via IPC Actions only.
- runner.Register wires Actions: dispatch, status, start, stop, kill, poke
- runner.HandleIPCEvents catches AgentCompleted → ChannelPush + queue poke
- Agentic dispatch asks runner for permission via c.Action("runner.dispatch")
- Dispatch mutex moved to struct-level sync.Mutex (fixes core.Lock init race)
- Registry-based concurrency counting replaces disk scanning
- TrackWorkspace called on both queued and running status writes
- SpawnQueued message added for runner→agentic spawn requests
- ChannelPush message in core/mcp enables any service to push channel events
- 51 new tests covering runner service, queue, and config parsing
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-26 11:00:47 +00:00
|
|
|
if s.workspaces != nil && s.workspaces.Len() > 0 {
|
|
|
|
|
count := 0
|
2026-03-30 21:11:06 +00:00
|
|
|
s.workspaces.Each(func(_ string, workspaceStatus *WorkspaceStatus) {
|
|
|
|
|
if workspaceStatus.Status == "running" && baseAgent(workspaceStatus.Agent) == agent && ProcessAlive(runtime, workspaceStatus.ProcessID, workspaceStatus.PID) {
|
2026-03-30 00:28:11 +00:00
|
|
|
count++
|
feat(runner): extract dispatch runner into independent Core service
Moves concurrency, queue drain, workspace lifecycle, and frozen state
from agentic/prep into pkg/runner/ — a standalone Core service that
communicates via IPC Actions only.
- runner.Register wires Actions: dispatch, status, start, stop, kill, poke
- runner.HandleIPCEvents catches AgentCompleted → ChannelPush + queue poke
- Agentic dispatch asks runner for permission via c.Action("runner.dispatch")
- Dispatch mutex moved to struct-level sync.Mutex (fixes core.Lock init race)
- Registry-based concurrency counting replaces disk scanning
- TrackWorkspace called on both queued and running status writes
- SpawnQueued message added for runner→agentic spawn requests
- ChannelPush message in core/mcp enables any service to push channel events
- 51 new tests covering runner service, queue, and config parsing
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-26 11:00:47 +00:00
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
return count
|
|
|
|
|
}
|
2026-03-16 11:10:33 +00:00
|
|
|
|
2026-03-30 16:01:32 +00:00
|
|
|
return s.countRunningByAgentDisk(runtime, agent)
|
feat(runner): extract dispatch runner into independent Core service
Moves concurrency, queue drain, workspace lifecycle, and frozen state
from agentic/prep into pkg/runner/ — a standalone Core service that
communicates via IPC Actions only.
- runner.Register wires Actions: dispatch, status, start, stop, kill, poke
- runner.HandleIPCEvents catches AgentCompleted → ChannelPush + queue poke
- Agentic dispatch asks runner for permission via c.Action("runner.dispatch")
- Dispatch mutex moved to struct-level sync.Mutex (fixes core.Lock init race)
- Registry-based concurrency counting replaces disk scanning
- TrackWorkspace called on both queued and running status writes
- SpawnQueued message added for runner→agentic spawn requests
- ChannelPush message in core/mcp enables any service to push channel events
- 51 new tests covering runner service, queue, and config parsing
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-26 11:00:47 +00:00
|
|
|
}
|
|
|
|
|
|
2026-03-30 16:01:32 +00:00
|
|
|
func (s *PrepSubsystem) countRunningByAgentDisk(runtime *core.Core, agent string) int {
|
2026-03-16 11:10:33 +00:00
|
|
|
count := 0
|
2026-03-29 21:19:37 +00:00
|
|
|
for _, statusPath := range WorkspaceStatusPaths() {
|
2026-03-30 19:40:02 +00:00
|
|
|
result := ReadStatusResult(core.PathDir(statusPath))
|
2026-03-30 21:11:06 +00:00
|
|
|
workspaceStatus, ok := workspaceStatusValue(result)
|
|
|
|
|
if !ok || workspaceStatus.Status != "running" {
|
2026-03-16 11:10:33 +00:00
|
|
|
continue
|
|
|
|
|
}
|
2026-03-30 21:11:06 +00:00
|
|
|
if baseAgent(workspaceStatus.Agent) != agent {
|
2026-03-16 11:10:33 +00:00
|
|
|
continue
|
|
|
|
|
}
|
2026-03-30 21:11:06 +00:00
|
|
|
if ProcessAlive(runtime, workspaceStatus.ProcessID, workspaceStatus.PID) {
|
2026-03-17 19:35:15 +00:00
|
|
|
count++
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return count
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 22:54:19 +00:00
|
|
|
// n := s.countRunningByModel("codex:gpt-5.4")
|
2026-03-24 13:05:41 +00:00
|
|
|
func (s *PrepSubsystem) countRunningByModel(agent string) int {
|
2026-03-30 16:01:32 +00:00
|
|
|
var runtime *core.Core
|
|
|
|
|
if s.ServiceRuntime != nil {
|
|
|
|
|
runtime = s.Core()
|
|
|
|
|
}
|
feat(runner): extract dispatch runner into independent Core service
Moves concurrency, queue drain, workspace lifecycle, and frozen state
from agentic/prep into pkg/runner/ — a standalone Core service that
communicates via IPC Actions only.
- runner.Register wires Actions: dispatch, status, start, stop, kill, poke
- runner.HandleIPCEvents catches AgentCompleted → ChannelPush + queue poke
- Agentic dispatch asks runner for permission via c.Action("runner.dispatch")
- Dispatch mutex moved to struct-level sync.Mutex (fixes core.Lock init race)
- Registry-based concurrency counting replaces disk scanning
- TrackWorkspace called on both queued and running status writes
- SpawnQueued message added for runner→agentic spawn requests
- ChannelPush message in core/mcp enables any service to push channel events
- 51 new tests covering runner service, queue, and config parsing
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-26 11:00:47 +00:00
|
|
|
if s.workspaces != nil && s.workspaces.Len() > 0 {
|
|
|
|
|
count := 0
|
2026-03-30 21:11:06 +00:00
|
|
|
s.workspaces.Each(func(_ string, workspaceStatus *WorkspaceStatus) {
|
|
|
|
|
if workspaceStatus.Status == "running" && workspaceStatus.Agent == agent && ProcessAlive(runtime, workspaceStatus.ProcessID, workspaceStatus.PID) {
|
2026-03-30 00:28:11 +00:00
|
|
|
count++
|
feat(runner): extract dispatch runner into independent Core service
Moves concurrency, queue drain, workspace lifecycle, and frozen state
from agentic/prep into pkg/runner/ — a standalone Core service that
communicates via IPC Actions only.
- runner.Register wires Actions: dispatch, status, start, stop, kill, poke
- runner.HandleIPCEvents catches AgentCompleted → ChannelPush + queue poke
- Agentic dispatch asks runner for permission via c.Action("runner.dispatch")
- Dispatch mutex moved to struct-level sync.Mutex (fixes core.Lock init race)
- Registry-based concurrency counting replaces disk scanning
- TrackWorkspace called on both queued and running status writes
- SpawnQueued message added for runner→agentic spawn requests
- ChannelPush message in core/mcp enables any service to push channel events
- 51 new tests covering runner service, queue, and config parsing
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-26 11:00:47 +00:00
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
return count
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 16:01:32 +00:00
|
|
|
return s.countRunningByModelDisk(runtime, agent)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *PrepSubsystem) countRunningByModelDisk(runtime *core.Core, agent string) int {
|
2026-03-24 13:05:41 +00:00
|
|
|
count := 0
|
2026-03-29 21:19:37 +00:00
|
|
|
for _, statusPath := range WorkspaceStatusPaths() {
|
2026-03-30 19:40:02 +00:00
|
|
|
result := ReadStatusResult(core.PathDir(statusPath))
|
2026-03-30 21:11:06 +00:00
|
|
|
workspaceStatus, ok := workspaceStatusValue(result)
|
|
|
|
|
if !ok || workspaceStatus.Status != "running" {
|
2026-03-24 13:05:41 +00:00
|
|
|
continue
|
|
|
|
|
}
|
2026-03-30 21:11:06 +00:00
|
|
|
if workspaceStatus.Agent != agent {
|
2026-03-24 13:05:41 +00:00
|
|
|
continue
|
|
|
|
|
}
|
2026-03-30 21:11:06 +00:00
|
|
|
if ProcessAlive(runtime, workspaceStatus.ProcessID, workspaceStatus.PID) {
|
2026-03-24 13:05:41 +00:00
|
|
|
count++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return count
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 22:04:36 +00:00
|
|
|
// base := baseAgent("gemini:flash") // "gemini"
|
2026-03-16 11:10:33 +00:00
|
|
|
func baseAgent(agent string) string {
|
refactor: migrate core/agent to Core primitives — reference implementation
Phase 1: go-io/go-log → core.Fs{}, core.E(), core.Error/Info/Warn
Phase 2: strings/fmt → core.Contains, core.Sprintf, core.Split etc
Phase 3: embed.FS → core.Mount/core.Embed, core.Extract
Phase 4: cmd/main.go → core.Command(), c.Cli().Run(), no cli package
All packages migrated:
- pkg/lib (Codex): core.Mount, core.Extract, Result returns, AX comments
- pkg/setup (Codex): core.Fs, core.E, fixed missing lib helpers
- pkg/brain (Codex): Core primitives, AX comments
- pkg/monitor (Codex): Core string/logging primitives
- pkg/agentic (Codex): 20 files, Core primitives throughout
- cmd/main.go: pure Core CLI, no fmt/log/filepath/strings/cli
Remaining stdlib: path/filepath (Core doesn't wrap OS paths),
fmt.Sscanf/strings.Map (no Core equivalent).
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-22 06:13:41 +00:00
|
|
|
return core.SplitN(agent, ":", 2)[0]
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
|
|
|
|
|
2026-03-30 22:30:05 +00:00
|
|
|
// codex: {total: 2, models: {gpt-5.4: 1}} → max 2 codex total, max 1 gpt-5.4
|
2026-03-16 11:10:33 +00:00
|
|
|
func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
|
2026-03-24 16:44:19 +00:00
|
|
|
var concurrency map[string]ConcurrencyLimit
|
feat(v0.8.0): full AX migration — ServiceRuntime, Actions, quality gates, transport
go-process:
- Register factory, Result lifecycle, 5 named Action handlers
- Start/Run/StartWithOptions/RunWithOptions all return core.Result
- core.ID() replaces fmt.Sprintf, core.As replaces errors.As
core/agent:
- PrepSubsystem + monitor.Subsystem + setup.Service embed ServiceRuntime[T]
- 22 named Actions + agent.completion Task pipeline in OnStartup
- ChannelNotifier removed — all IPC via c.ACTION(messages.X{})
- proc.go: all methods via s.Core().Process(), returns core.Result
- status.go: WriteAtomic + JSONMarshalString
- paths.go: Fs.NewUnrestricted() replaces unsafe.Pointer
- transport.go: ONE net/http file — HTTPGet/HTTPPost/HTTPDo/MCP transport
- All disallowed imports eliminated from source files (13 quality gates)
- String concat eliminated — core.Concat() throughout
- 1:1 _test.go + _example_test.go for every source file
- Reference docs synced from core/go v0.8.0
- RFC-025 updated with net/http, net/url, io/fs quality gates
- lib.go: io/fs eliminated via Data.ListNames, Array[T].Deduplicate
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-26 01:27:46 +00:00
|
|
|
if s.ServiceRuntime != nil {
|
2026-03-30 21:11:06 +00:00
|
|
|
configurationResult := s.Core().Config().Get("agents.concurrency")
|
|
|
|
|
if configurationResult.OK {
|
|
|
|
|
concurrency, _ = configurationResult.Value.(map[string]ConcurrencyLimit)
|
2026-03-26 11:30:38 +00:00
|
|
|
}
|
2026-03-24 16:44:19 +00:00
|
|
|
}
|
|
|
|
|
if concurrency == nil {
|
2026-03-30 20:53:36 +00:00
|
|
|
config := s.loadAgentsConfig()
|
|
|
|
|
concurrency = config.Concurrency
|
2026-03-24 16:44:19 +00:00
|
|
|
}
|
|
|
|
|
|
2026-03-16 11:10:33 +00:00
|
|
|
base := baseAgent(agent)
|
2026-03-24 16:44:19 +00:00
|
|
|
limit, ok := concurrency[base]
|
2026-03-24 13:05:41 +00:00
|
|
|
if !ok || limit.Total <= 0 {
|
2026-04-01 19:14:54 +00:00
|
|
|
if blocked, until := s.dailyRateLimitBackoff(agent); blocked {
|
|
|
|
|
if s.backoff == nil {
|
|
|
|
|
s.backoff = make(map[string]time.Time)
|
|
|
|
|
}
|
|
|
|
|
s.backoff[baseAgent(agent)] = until
|
|
|
|
|
s.persistRuntimeState()
|
|
|
|
|
return false
|
|
|
|
|
}
|
2026-03-16 11:10:33 +00:00
|
|
|
return true
|
|
|
|
|
}
|
2026-03-24 13:05:41 +00:00
|
|
|
|
|
|
|
|
if s.countRunningByAgent(base) >= limit.Total {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if limit.Models != nil {
|
|
|
|
|
model := modelVariant(agent)
|
|
|
|
|
if model != "" {
|
|
|
|
|
if modelLimit, has := limit.Models[model]; has && modelLimit > 0 {
|
|
|
|
|
if s.countRunningByModel(agent) >= modelLimit {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-01 19:14:54 +00:00
|
|
|
if blocked, until := s.dailyRateLimitBackoff(agent); blocked {
|
|
|
|
|
if s.backoff == nil {
|
|
|
|
|
s.backoff = make(map[string]time.Time)
|
|
|
|
|
}
|
|
|
|
|
s.backoff[base] = until
|
|
|
|
|
s.persistRuntimeState()
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-24 13:05:41 +00:00
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-01 19:14:54 +00:00
|
|
|
func (s *PrepSubsystem) dailyRateLimitBackoff(agent string) (bool, time.Time) {
|
|
|
|
|
rates := s.loadAgentsConfig().Rates
|
|
|
|
|
rate, ok := rates[baseAgent(agent)]
|
|
|
|
|
if !ok || rate.DailyLimit <= 0 {
|
|
|
|
|
return false, time.Time{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if s.dailyDispatchCount(agent) < rate.DailyLimit {
|
|
|
|
|
return false, time.Time{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
resetHour, resetMin := 6, 0
|
|
|
|
|
parts := core.Split(rate.ResetUTC, ":")
|
|
|
|
|
if len(parts) >= 2 {
|
|
|
|
|
if hour, err := strconv.Atoi(core.Trim(parts[0])); err == nil {
|
|
|
|
|
resetHour = hour
|
|
|
|
|
}
|
|
|
|
|
if min, err := strconv.Atoi(core.Trim(parts[1])); err == nil {
|
|
|
|
|
resetMin = min
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
now := time.Now().UTC()
|
|
|
|
|
resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC)
|
|
|
|
|
if now.Before(resetToday) {
|
|
|
|
|
resetToday = resetToday.AddDate(0, 0, -1)
|
|
|
|
|
}
|
|
|
|
|
nextReset := resetToday.AddDate(0, 0, 1)
|
|
|
|
|
if nextReset.Before(now) {
|
|
|
|
|
nextReset = now
|
|
|
|
|
}
|
|
|
|
|
return true, nextReset
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *PrepSubsystem) dailyDispatchCount(agent string) int {
|
|
|
|
|
eventsPath := core.JoinPath(WorkspaceRoot(), "events.jsonl")
|
|
|
|
|
result := fs.Read(eventsPath)
|
|
|
|
|
if !result.OK {
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
targetDay := time.Now().UTC().Format("2006-01-02")
|
|
|
|
|
base := baseAgent(agent)
|
|
|
|
|
count := 0
|
|
|
|
|
|
|
|
|
|
for _, line := range core.Split(result.Value.(string), "\n") {
|
|
|
|
|
line = core.Trim(line)
|
|
|
|
|
if line == "" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var event CompletionEvent
|
|
|
|
|
if parseResult := core.JSONUnmarshalString(line, &event); !parseResult.OK {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if event.Type != "agent_started" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if baseAgent(event.Agent) != base {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
timestamp, err := time.Parse(time.RFC3339, event.Timestamp)
|
|
|
|
|
if err != nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if timestamp.UTC().Format("2006-01-02") != targetDay {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
count++
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return count
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 22:54:19 +00:00
|
|
|
// model := modelVariant("codex:gpt-5.4")
|
2026-03-31 05:20:50 +00:00
|
|
|
// core.Println(model) // "gpt-5.4"
|
2026-03-24 13:05:41 +00:00
|
|
|
func modelVariant(agent string) string {
|
|
|
|
|
parts := core.SplitN(agent, ":", 2)
|
|
|
|
|
if len(parts) < 2 {
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
return parts[1]
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
|
|
|
|
|
2026-03-30 22:04:36 +00:00
|
|
|
// s.drainQueue()
|
2026-03-16 11:10:33 +00:00
|
|
|
func (s *PrepSubsystem) drainQueue() {
|
2026-03-23 16:08:08 +00:00
|
|
|
if s.frozen {
|
|
|
|
|
return
|
|
|
|
|
}
|
feat(v0.8.0): full AX migration — ServiceRuntime, Actions, quality gates, transport
go-process:
- Register factory, Result lifecycle, 5 named Action handlers
- Start/Run/StartWithOptions/RunWithOptions all return core.Result
- core.ID() replaces fmt.Sprintf, core.As replaces errors.As
core/agent:
- PrepSubsystem + monitor.Subsystem + setup.Service embed ServiceRuntime[T]
- 22 named Actions + agent.completion Task pipeline in OnStartup
- ChannelNotifier removed — all IPC via c.ACTION(messages.X{})
- proc.go: all methods via s.Core().Process(), returns core.Result
- status.go: WriteAtomic + JSONMarshalString
- paths.go: Fs.NewUnrestricted() replaces unsafe.Pointer
- transport.go: ONE net/http file — HTTPGet/HTTPPost/HTTPDo/MCP transport
- All disallowed imports eliminated from source files (13 quality gates)
- String concat eliminated — core.Concat() throughout
- 1:1 _test.go + _example_test.go for every source file
- Reference docs synced from core/go v0.8.0
- RFC-025 updated with net/http, net/url, io/fs quality gates
- lib.go: io/fs eliminated via Data.ListNames, Array[T].Deduplicate
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-26 01:27:46 +00:00
|
|
|
if s.ServiceRuntime != nil {
|
|
|
|
|
s.Core().Lock("drain").Mutex.Lock()
|
|
|
|
|
defer s.Core().Lock("drain").Mutex.Unlock()
|
refactor: AX compliance sweep — replace banned stdlib imports with core primitives
Replaced fmt, strings, sort, os, io, sync, encoding/json, path/filepath,
errors, log, reflect with core.Sprintf, core.E, core.Contains, core.Trim,
core.Split, core.Join, core.JoinPath, slices.Sort, c.Fs(), c.Lock(),
core.JSONMarshal, core.ReadAll and other CoreGO v0.8.0 primitives.
Framework boundary exceptions preserved where stdlib types are required
by external interfaces (Gin, net/http, CGo, Wails, bubbletea).
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-13 09:32:00 +01:00
|
|
|
} else if s.drainCh != nil {
|
|
|
|
|
s.drainCh <- struct{}{}
|
|
|
|
|
defer func() { <-s.drainCh }()
|
2026-03-24 16:44:19 +00:00
|
|
|
}
|
2026-03-21 17:10:43 +00:00
|
|
|
|
2026-03-23 12:53:33 +00:00
|
|
|
for s.drainOne() {
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
2026-03-23 12:53:33 +00:00
|
|
|
}
|
2026-03-16 11:10:33 +00:00
|
|
|
|
2026-03-30 22:04:36 +00:00
|
|
|
// spawned := s.drainOne()
|
2026-03-23 12:53:33 +00:00
|
|
|
func (s *PrepSubsystem) drainOne() bool {
|
2026-03-29 21:19:37 +00:00
|
|
|
for _, statusPath := range WorkspaceStatusPaths() {
|
2026-03-30 21:22:54 +00:00
|
|
|
workspaceDir := core.PathDir(statusPath)
|
|
|
|
|
result := ReadStatusResult(workspaceDir)
|
2026-03-30 21:11:06 +00:00
|
|
|
workspaceStatus, ok := workspaceStatusValue(result)
|
|
|
|
|
if !ok || workspaceStatus.Status != "queued" {
|
2026-03-16 11:10:33 +00:00
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 21:11:06 +00:00
|
|
|
if !s.canDispatchAgent(workspaceStatus.Agent) {
|
2026-03-16 11:10:33 +00:00
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 21:11:06 +00:00
|
|
|
pool := baseAgent(workspaceStatus.Agent)
|
2026-03-23 16:08:08 +00:00
|
|
|
if until, ok := s.backoff[pool]; ok && time.Now().Before(until) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 21:11:06 +00:00
|
|
|
delay := s.delayForAgent(workspaceStatus.Agent)
|
2026-03-16 11:10:33 +00:00
|
|
|
if delay > 0 {
|
|
|
|
|
time.Sleep(delay)
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 21:11:06 +00:00
|
|
|
if !s.canDispatchAgent(workspaceStatus.Agent) {
|
2026-03-16 11:10:33 +00:00
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 21:11:06 +00:00
|
|
|
prompt := core.Concat("TASK: ", workspaceStatus.Task, "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done.")
|
2026-03-16 11:10:33 +00:00
|
|
|
|
2026-03-30 21:22:54 +00:00
|
|
|
pid, processID, _, err := s.spawnAgent(workspaceStatus.Agent, prompt, workspaceDir)
|
2026-03-16 11:10:33 +00:00
|
|
|
if err != nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-30 21:11:06 +00:00
|
|
|
workspaceStatus.Status = "running"
|
|
|
|
|
workspaceStatus.PID = pid
|
|
|
|
|
workspaceStatus.ProcessID = processID
|
|
|
|
|
workspaceStatus.Runs++
|
2026-03-30 21:22:54 +00:00
|
|
|
writeStatusResult(workspaceDir, workspaceStatus)
|
|
|
|
|
s.TrackWorkspace(WorkspaceName(workspaceDir), workspaceStatus)
|
2026-03-16 11:10:33 +00:00
|
|
|
|
2026-03-23 12:53:33 +00:00
|
|
|
return true
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|
2026-03-23 12:53:33 +00:00
|
|
|
|
|
|
|
|
return false
|
2026-03-16 11:10:33 +00:00
|
|
|
}
|