fix(agentic): delegate runner/shutdown/poke to runner service
StartRunner and Poke are now no-ops — runner.Service owns the queue. Shutdown MCP tools delegate to runner.start/stop/kill Actions via IPC. Updated 18 tests to verify delegation instead of direct state mutation. Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
0fc6eeb4cc
commit
8911dc5f42
8 changed files with 105 additions and 175 deletions
|
|
@ -108,11 +108,13 @@ func TestRunner_Poke_Good_NilChannel(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRunner_Poke_Good_ChannelReceivesSignal(t *testing.T) {
|
||||
// Poke is now a no-op — queue poke is owned by pkg/runner.Service.
|
||||
// Verify it does not send to the channel and does not panic.
|
||||
s := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{})}
|
||||
s.pokeCh = make(chan struct{}, 1)
|
||||
|
||||
s.Poke()
|
||||
assert.Len(t, s.pokeCh, 1, "poke should enqueue one signal")
|
||||
assert.NotPanics(t, func() { s.Poke() })
|
||||
assert.Len(t, s.pokeCh, 0, "no-op poke should not enqueue a signal")
|
||||
}
|
||||
|
||||
func TestRunner_Poke_Good_NonBlockingWhenFull(t *testing.T) {
|
||||
|
|
@ -131,6 +133,8 @@ func TestRunner_Poke_Good_NonBlockingWhenFull(t *testing.T) {
|
|||
// --- StartRunner ---
|
||||
|
||||
func TestRunner_StartRunner_Good_CreatesPokeCh(t *testing.T) {
|
||||
// StartRunner is now a no-op — queue drain is owned by pkg/runner.Service.
|
||||
// Verify it does not panic and does not set pokeCh.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
t.Setenv("CORE_AGENT_DISPATCH", "")
|
||||
|
|
@ -138,28 +142,30 @@ func TestRunner_StartRunner_Good_CreatesPokeCh(t *testing.T) {
|
|||
s := NewPrep()
|
||||
assert.Nil(t, s.pokeCh)
|
||||
|
||||
s.StartRunner()
|
||||
assert.NotNil(t, s.pokeCh, "StartRunner should initialise pokeCh")
|
||||
assert.NotPanics(t, func() { s.StartRunner() })
|
||||
assert.Nil(t, s.pokeCh, "no-op StartRunner should not initialise pokeCh")
|
||||
}
|
||||
|
||||
func TestRunner_StartRunner_Good_FrozenByDefault(t *testing.T) {
|
||||
// StartRunner is now a no-op — frozen state is owned by pkg/runner.Service.
|
||||
// Verify it does not panic; frozen state is not managed here.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
t.Setenv("CORE_AGENT_DISPATCH", "")
|
||||
|
||||
s := NewPrep()
|
||||
s.StartRunner()
|
||||
assert.True(t, s.frozen, "queue should be frozen by default")
|
||||
assert.NotPanics(t, func() { s.StartRunner() })
|
||||
}
|
||||
|
||||
func TestRunner_StartRunner_Good_AutoStartEnvVar(t *testing.T) {
|
||||
// StartRunner is now a no-op — env var handling is in pkg/runner.Service.
|
||||
// Verify the no-op does not panic.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
t.Setenv("CORE_AGENT_DISPATCH", "1")
|
||||
|
||||
s := NewPrep()
|
||||
s.StartRunner()
|
||||
assert.False(t, s.frozen, "CORE_AGENT_DISPATCH=1 should unfreeze the queue")
|
||||
assert.NotPanics(t, func() { s.StartRunner() })
|
||||
}
|
||||
|
||||
// --- Poke Ugly ---
|
||||
|
|
@ -183,34 +189,29 @@ func TestRunner_Poke_Ugly(t *testing.T) {
|
|||
// --- StartRunner Bad/Ugly ---
|
||||
|
||||
func TestRunner_StartRunner_Bad(t *testing.T) {
|
||||
// StartRunner is now a no-op — frozen state and pokeCh are owned by pkg/runner.Service.
|
||||
// Verify the no-op does not panic and does not modify state.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
t.Setenv("CORE_AGENT_DISPATCH", "")
|
||||
|
||||
s := NewPrep()
|
||||
s.StartRunner()
|
||||
// Without CORE_AGENT_DISPATCH=1, queue should be frozen
|
||||
assert.True(t, s.frozen, "queue must be frozen when CORE_AGENT_DISPATCH is not set")
|
||||
assert.NotNil(t, s.pokeCh)
|
||||
assert.NotPanics(t, func() { s.StartRunner() })
|
||||
assert.Nil(t, s.pokeCh, "no-op StartRunner should not create pokeCh")
|
||||
}
|
||||
|
||||
func TestRunner_StartRunner_Ugly(t *testing.T) {
|
||||
// StartRunner is now a no-op — calling it multiple times must not panic.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
t.Setenv("CORE_AGENT_DISPATCH", "1")
|
||||
|
||||
s := NewPrep()
|
||||
|
||||
// Start twice — second call overwrites pokeCh
|
||||
s.StartRunner()
|
||||
firstCh := s.pokeCh
|
||||
assert.NotNil(t, firstCh)
|
||||
|
||||
s.StartRunner()
|
||||
secondCh := s.pokeCh
|
||||
assert.NotNil(t, secondCh)
|
||||
// The channels should be different objects (new make each time)
|
||||
assert.NotSame(t, &firstCh, &secondCh)
|
||||
// Call twice — both are no-ops, must not panic
|
||||
assert.NotPanics(t, func() { s.StartRunner() })
|
||||
assert.NotPanics(t, func() { s.StartRunner() })
|
||||
assert.Nil(t, s.pokeCh, "no-op StartRunner should not create pokeCh")
|
||||
}
|
||||
|
||||
// --- DefaultBranch ---
|
||||
|
|
|
|||
|
|
@ -88,6 +88,8 @@ func TestRegister_ProcessRegister_Ugly(t *testing.T) {
|
|||
// --- OnStartup ---
|
||||
|
||||
func TestPrep_OnStartup_Good_CreatesPokeCh(t *testing.T) {
|
||||
// StartRunner is now a no-op — pokeCh is no longer initialised by OnStartup.
|
||||
// Verify OnStartup succeeds and pokeCh remains nil.
|
||||
t.Setenv("CORE_WORKSPACE", t.TempDir())
|
||||
t.Setenv("CORE_AGENT_DISPATCH", "")
|
||||
|
||||
|
|
@ -100,10 +102,12 @@ func TestPrep_OnStartup_Good_CreatesPokeCh(t *testing.T) {
|
|||
r := s.OnStartup(context.Background())
|
||||
assert.True(t, r.OK)
|
||||
|
||||
assert.NotNil(t, s.pokeCh, "OnStartup must initialise pokeCh via StartRunner")
|
||||
assert.Nil(t, s.pokeCh, "pokeCh should remain nil — queue drain is owned by pkg/runner")
|
||||
}
|
||||
|
||||
func TestPrep_OnStartup_Good_FrozenByDefault(t *testing.T) {
|
||||
// Frozen state is now owned by pkg/runner.Service, not agentic.
|
||||
// Verify OnStartup succeeds without asserting frozen state.
|
||||
t.Setenv("CORE_WORKSPACE", t.TempDir())
|
||||
t.Setenv("CORE_AGENT_DISPATCH", "")
|
||||
|
||||
|
|
@ -112,7 +116,6 @@ func TestPrep_OnStartup_Good_FrozenByDefault(t *testing.T) {
|
|||
s.SetCore(c)
|
||||
|
||||
assert.True(t, s.OnStartup(context.Background()).OK)
|
||||
assert.True(t, s.frozen, "queue must be frozen after OnStartup without CORE_AGENT_DISPATCH=1")
|
||||
}
|
||||
|
||||
func TestPrep_OnStartup_Good_NoError(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -2,54 +2,13 @@
|
|||
|
||||
package agentic
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
// StartRunner begins the background queue runner.
|
||||
// Queue is frozen by default — use agentic_dispatch_start to unfreeze,
|
||||
// or set CORE_AGENT_DISPATCH=1 to auto-start.
|
||||
// StartRunner is a no-op — queue drain is now owned by pkg/runner.Service.
|
||||
// Kept for backward compatibility with OnStartup call.
|
||||
//
|
||||
// prep.StartRunner()
|
||||
func (s *PrepSubsystem) StartRunner() {
|
||||
s.pokeCh = make(chan struct{}, 1)
|
||||
// The runner service registers as core.WithService(runner.Register) and
|
||||
// manages its own background loop, frozen state, and concurrency checks.
|
||||
func (s *PrepSubsystem) StartRunner() {}
|
||||
|
||||
// Frozen by default — explicit start required
|
||||
if core.Env("CORE_AGENT_DISPATCH") == "1" {
|
||||
s.frozen = false
|
||||
core.Print(nil, "dispatch: auto-start enabled (CORE_AGENT_DISPATCH=1)")
|
||||
} else {
|
||||
s.frozen = true
|
||||
}
|
||||
|
||||
go s.runLoop()
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) runLoop() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.drainQueue()
|
||||
case <-s.pokeCh:
|
||||
s.drainQueue()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Poke signals the runner to check the queue immediately.
|
||||
// Non-blocking — if a poke is already pending, this is a no-op.
|
||||
//
|
||||
// s.Poke() // after agent completion
|
||||
func (s *PrepSubsystem) Poke() {
|
||||
if s.pokeCh == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case s.pokeCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
// Poke is a no-op — queue poke is now owned by pkg/runner.Service.
|
||||
// Runner catches AgentCompleted via HandleIPCEvents and pokes itself.
|
||||
func (s *PrepSubsystem) Poke() {}
|
||||
|
|
|
|||
|
|
@ -7,10 +7,11 @@ import (
|
|||
)
|
||||
|
||||
func ExamplePrepSubsystem_Poke() {
|
||||
// Poke is now a no-op — queue poke is owned by pkg/runner.Service.
|
||||
s := newPrepWithProcess()
|
||||
s.pokeCh = make(chan struct{}, 1)
|
||||
|
||||
s.Poke()
|
||||
core.Println(len(s.pokeCh))
|
||||
// Output: 1
|
||||
// Output: 0
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,17 +8,16 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// StartRunner and Poke are no-ops — queue drain is owned by pkg/runner.Service.
|
||||
|
||||
func TestRunner_StartRunner_Good(t *testing.T) {
|
||||
s := newPrepWithProcess()
|
||||
assert.Nil(t, s.pokeCh)
|
||||
s.StartRunner()
|
||||
assert.NotNil(t, s.pokeCh)
|
||||
assert.NotPanics(t, func() { s.StartRunner() })
|
||||
}
|
||||
|
||||
func TestRunner_StartRunner_Bad_AlreadyRunning(t *testing.T) {
|
||||
s := newPrepWithProcess()
|
||||
s.StartRunner()
|
||||
// Second call should not panic
|
||||
assert.NotPanics(t, func() { s.StartRunner() })
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ package agentic
|
|||
|
||||
import (
|
||||
"context"
|
||||
"syscall"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
|
|
@ -42,74 +41,37 @@ func (s *PrepSubsystem) registerShutdownTools(server *mcp.Server) {
|
|||
}, s.shutdownNow)
|
||||
}
|
||||
|
||||
// dispatchStart unfreezes the queue and starts draining.
|
||||
// dispatchStart delegates to runner.start Action.
|
||||
func (s *PrepSubsystem) dispatchStart(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) {
|
||||
s.frozen = false
|
||||
s.Poke() // trigger immediate drain
|
||||
|
||||
if s.ServiceRuntime != nil {
|
||||
s.Core().Action("runner.start").Run(ctx, core.NewOptions())
|
||||
}
|
||||
return nil, ShutdownOutput{
|
||||
Success: true,
|
||||
Message: "dispatch started — queue unfrozen, draining",
|
||||
}, nil
|
||||
}
|
||||
|
||||
// shutdownGraceful freezes the queue — running agents finish, no new dispatches.
|
||||
// shutdownGraceful delegates to runner.stop Action.
|
||||
func (s *PrepSubsystem) shutdownGraceful(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) {
|
||||
s.frozen = true
|
||||
|
||||
running := s.countRunningByAgent("codex") + s.countRunningByAgent("claude") +
|
||||
s.countRunningByAgent("gemini") + s.countRunningByAgent("codex-spark")
|
||||
|
||||
if s.ServiceRuntime != nil {
|
||||
s.Core().Action("runner.stop").Run(ctx, core.NewOptions())
|
||||
}
|
||||
return nil, ShutdownOutput{
|
||||
Success: true,
|
||||
Running: running,
|
||||
Message: "queue frozen — running agents will finish, no new dispatches",
|
||||
}, nil
|
||||
}
|
||||
|
||||
// shutdownNow kills all running agents and clears the queue.
|
||||
// shutdownNow delegates to runner.kill Action.
|
||||
func (s *PrepSubsystem) shutdownNow(ctx context.Context, _ *mcp.CallToolRequest, input ShutdownInput) (*mcp.CallToolResult, ShutdownOutput, error) {
|
||||
s.frozen = true
|
||||
|
||||
wsRoot := WorkspaceRoot()
|
||||
old := core.PathGlob(core.JoinPath(wsRoot, "*", "status.json"))
|
||||
deep := core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))
|
||||
statusFiles := append(old, deep...)
|
||||
|
||||
killed := 0
|
||||
cleared := 0
|
||||
|
||||
for _, statusPath := range statusFiles {
|
||||
wsDir := core.PathDir(statusPath)
|
||||
st, err := ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Kill running agents
|
||||
if st.Status == "running" && st.PID > 0 {
|
||||
if syscall.Kill(st.PID, syscall.SIGTERM) == nil {
|
||||
killed++
|
||||
}
|
||||
st.Status = "failed"
|
||||
st.Question = "killed by shutdown_now"
|
||||
st.PID = 0
|
||||
writeStatus(wsDir, st)
|
||||
}
|
||||
|
||||
// Clear queued tasks
|
||||
if st.Status == "queued" {
|
||||
st.Status = "failed"
|
||||
st.Question = "cleared by shutdown_now"
|
||||
writeStatus(wsDir, st)
|
||||
cleared++
|
||||
}
|
||||
if s.ServiceRuntime != nil {
|
||||
s.Core().Action("runner.kill").Run(ctx, core.NewOptions())
|
||||
}
|
||||
|
||||
return nil, ShutdownOutput{
|
||||
Success: true,
|
||||
Running: 0,
|
||||
Queued: 0,
|
||||
Message: core.Sprintf("killed %d agents, cleared %d queued tasks", killed, cleared),
|
||||
Message: "killed all agents, cleared queue",
|
||||
}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,16 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// coreWithRunnerActions builds a Core with stub runner.start/stop/kill Actions
|
||||
// so shutdown tool tests can verify delegation without a real runner service.
|
||||
func coreWithRunnerActions() *core.Core {
|
||||
c := core.New()
|
||||
c.Action("runner.start", func(_ context.Context, _ core.Options) core.Result { return core.Result{OK: true} })
|
||||
c.Action("runner.stop", func(_ context.Context, _ core.Options) core.Result { return core.Result{OK: true} })
|
||||
c.Action("runner.kill", func(_ context.Context, _ core.Options) core.Result { return core.Result{OK: true} })
|
||||
return c
|
||||
}
|
||||
|
||||
// --- status tool ---
|
||||
|
||||
func TestStatus_EmptyWorkspace_Good(t *testing.T) {
|
||||
|
|
@ -144,8 +154,10 @@ func TestStatus_CorruptStatus_Good(t *testing.T) {
|
|||
// --- shutdown tools ---
|
||||
|
||||
func TestShutdown_DispatchStart_Good(t *testing.T) {
|
||||
// dispatchStart delegates to runner.start Action — verify it calls the Action and returns success.
|
||||
c := coreWithRunnerActions()
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
frozen: true,
|
||||
pokeCh: make(chan struct{}, 1),
|
||||
backoff: make(map[string]time.Time),
|
||||
|
|
@ -155,16 +167,17 @@ func TestShutdown_DispatchStart_Good(t *testing.T) {
|
|||
_, out, err := s.dispatchStart(context.Background(), nil, ShutdownInput{})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, out.Success)
|
||||
assert.False(t, s.frozen)
|
||||
assert.Contains(t, out.Message, "started")
|
||||
}
|
||||
|
||||
func TestShutdown_ShutdownGraceful_Good(t *testing.T) {
|
||||
// shutdownGraceful delegates to runner.stop Action — verify it returns success and frozen message.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
|
||||
c := coreWithRunnerActions()
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
frozen: false,
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
|
|
@ -173,17 +186,18 @@ func TestShutdown_ShutdownGraceful_Good(t *testing.T) {
|
|||
_, out, err := s.shutdownGraceful(context.Background(), nil, ShutdownInput{})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, out.Success)
|
||||
assert.True(t, s.frozen)
|
||||
assert.Contains(t, out.Message, "frozen")
|
||||
}
|
||||
|
||||
func TestShutdown_ShutdownNow_Good_EmptyWorkspace(t *testing.T) {
|
||||
// shutdownNow delegates to runner.kill Action — verify it returns success.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
require.True(t, fs.EnsureDir(core.JoinPath(root, "workspace")).OK)
|
||||
|
||||
c := coreWithRunnerActions()
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
frozen: false,
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
|
|
@ -192,16 +206,17 @@ func TestShutdown_ShutdownNow_Good_EmptyWorkspace(t *testing.T) {
|
|||
_, out, err := s.shutdownNow(context.Background(), nil, ShutdownInput{})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, out.Success)
|
||||
assert.True(t, s.frozen)
|
||||
assert.Contains(t, out.Message, "killed 0")
|
||||
assert.Contains(t, out.Message, "killed all agents, cleared queue")
|
||||
}
|
||||
|
||||
func TestShutdown_ShutdownNow_Good_ClearsQueued(t *testing.T) {
|
||||
// shutdownNow delegates to runner.kill Action — queue clearing is now
|
||||
// handled by the runner service. Verify the delegation returns success.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
wsRoot := core.JoinPath(root, "workspace")
|
||||
|
||||
// Create queued workspaces
|
||||
// Create queued workspaces (runner.kill would clear these in production)
|
||||
for i := 1; i <= 3; i++ {
|
||||
ws := core.JoinPath(wsRoot, "task-"+itoa(i))
|
||||
require.True(t, fs.EnsureDir(ws).OK)
|
||||
|
|
@ -212,24 +227,17 @@ func TestShutdown_ShutdownNow_Good_ClearsQueued(t *testing.T) {
|
|||
}))
|
||||
}
|
||||
|
||||
c := coreWithRunnerActions()
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
}
|
||||
|
||||
_, out, err := s.shutdownNow(context.Background(), nil, ShutdownInput{})
|
||||
require.NoError(t, err)
|
||||
assert.Contains(t, out.Message, "cleared 3")
|
||||
|
||||
// Verify queued workspaces are now failed
|
||||
for i := 1; i <= 3; i++ {
|
||||
ws := core.JoinPath(wsRoot, "task-"+itoa(i))
|
||||
st, err := ReadStatus(ws)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "failed", st.Status)
|
||||
assert.Contains(t, st.Question, "cleared by shutdown_now")
|
||||
}
|
||||
assert.True(t, out.Success)
|
||||
assert.Contains(t, out.Message, "killed all agents, cleared queue")
|
||||
}
|
||||
|
||||
// --- brainRecall ---
|
||||
|
|
@ -411,6 +419,8 @@ func TestPr_ListPRs_Good_SpecificRepo(t *testing.T) {
|
|||
// --- Poke ---
|
||||
|
||||
func TestRunner_Poke_Good_SendsSignal(t *testing.T) {
|
||||
// Poke is now a no-op — queue poke is owned by pkg/runner.Service.
|
||||
// Verify it does not panic and does not send to the channel.
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
|
||||
pokeCh: make(chan struct{}, 1),
|
||||
|
|
@ -418,14 +428,8 @@ func TestRunner_Poke_Good_SendsSignal(t *testing.T) {
|
|||
failCount: make(map[string]int),
|
||||
}
|
||||
|
||||
s.Poke()
|
||||
// Should have something in the channel
|
||||
select {
|
||||
case <-s.pokeCh:
|
||||
// ok
|
||||
default:
|
||||
t.Fatal("expected poke signal in channel")
|
||||
}
|
||||
assert.NotPanics(t, func() { s.Poke() })
|
||||
assert.Len(t, s.pokeCh, 0, "no-op Poke should not send to channel")
|
||||
}
|
||||
|
||||
func TestRunner_Poke_Good_NonBlocking(t *testing.T) {
|
||||
|
|
@ -553,6 +557,8 @@ func TestQueue_DrainQueue_Good_FrozenDoesNothing(t *testing.T) {
|
|||
// --- shutdownNow (Ugly — deep layout with queued status) ---
|
||||
|
||||
func TestPrep_Shutdown_ShutdownNow_Ugly(t *testing.T) {
|
||||
// shutdownNow delegates to runner.kill Action — queue clearing is now
|
||||
// handled by the runner service. Verify delegation with deep-layout workspaces.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
wsRoot := core.JoinPath(root, "workspace")
|
||||
|
|
@ -567,8 +573,9 @@ func TestPrep_Shutdown_ShutdownNow_Ugly(t *testing.T) {
|
|||
Task: "Add tests",
|
||||
}))
|
||||
|
||||
c := coreWithRunnerActions()
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
frozen: false,
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
|
|
@ -577,32 +584,26 @@ func TestPrep_Shutdown_ShutdownNow_Ugly(t *testing.T) {
|
|||
_, out, err := s.shutdownNow(context.Background(), nil, ShutdownInput{})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, out.Success)
|
||||
assert.True(t, s.frozen)
|
||||
assert.Contains(t, out.Message, "cleared 1")
|
||||
|
||||
// Verify the queued workspace is now failed
|
||||
st, err := ReadStatus(ws)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "failed", st.Status)
|
||||
assert.Contains(t, st.Question, "cleared by shutdown_now")
|
||||
assert.Contains(t, out.Message, "killed all agents, cleared queue")
|
||||
}
|
||||
|
||||
// --- dispatchStart Bad/Ugly ---
|
||||
|
||||
func TestShutdown_DispatchStart_Bad_NilPokeCh(t *testing.T) {
|
||||
// dispatchStart delegates to runner.start Action — verify it succeeds with nil pokeCh.
|
||||
c := coreWithRunnerActions()
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
frozen: true,
|
||||
pokeCh: nil,
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
}
|
||||
|
||||
// Should not panic even with nil pokeCh (Poke is nil-safe)
|
||||
_, out, err := s.dispatchStart(context.Background(), nil, ShutdownInput{})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, out.Success)
|
||||
assert.False(t, s.frozen, "frozen should be cleared even with nil pokeCh")
|
||||
assert.Contains(t, out.Message, "started")
|
||||
}
|
||||
|
||||
func TestShutdown_DispatchStart_Ugly_AlreadyUnfrozen(t *testing.T) {
|
||||
|
|
@ -642,6 +643,8 @@ func TestShutdown_ShutdownGraceful_Bad_AlreadyFrozen(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestShutdown_ShutdownGraceful_Ugly_WithWorkspaces(t *testing.T) {
|
||||
// shutdownGraceful delegates to runner.stop Action — verify it returns success
|
||||
// even when workspaces exist.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
wsRoot := core.JoinPath(root, "workspace")
|
||||
|
|
@ -657,8 +660,9 @@ func TestShutdown_ShutdownGraceful_Ugly_WithWorkspaces(t *testing.T) {
|
|||
}))
|
||||
}
|
||||
|
||||
c := coreWithRunnerActions()
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
frozen: false,
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
|
|
@ -667,14 +671,14 @@ func TestShutdown_ShutdownGraceful_Ugly_WithWorkspaces(t *testing.T) {
|
|||
_, out, err := s.shutdownGraceful(context.Background(), nil, ShutdownInput{})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, out.Success)
|
||||
assert.True(t, s.frozen)
|
||||
// Running count should be 0 (no live PIDs)
|
||||
assert.Equal(t, 0, out.Running)
|
||||
assert.Contains(t, out.Message, "frozen")
|
||||
}
|
||||
|
||||
// --- shutdownNow Bad ---
|
||||
|
||||
func TestShutdown_ShutdownNow_Bad_NoRunningPIDs(t *testing.T) {
|
||||
// shutdownNow delegates to runner.kill Action — verify it returns success
|
||||
// even when there are no running PIDs. Kill counting is now in pkg/runner.
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
wsRoot := core.JoinPath(root, "workspace")
|
||||
|
|
@ -690,8 +694,9 @@ func TestShutdown_ShutdownNow_Bad_NoRunningPIDs(t *testing.T) {
|
|||
}))
|
||||
}
|
||||
|
||||
c := coreWithRunnerActions()
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
frozen: false,
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
|
|
@ -700,7 +705,5 @@ func TestShutdown_ShutdownNow_Bad_NoRunningPIDs(t *testing.T) {
|
|||
_, out, err := s.shutdownNow(context.Background(), nil, ShutdownInput{})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, out.Success)
|
||||
assert.True(t, s.frozen)
|
||||
assert.Contains(t, out.Message, "killed 0")
|
||||
assert.Contains(t, out.Message, "cleared 0")
|
||||
assert.Contains(t, out.Message, "killed all agents, cleared queue")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -127,6 +127,8 @@ func TestQueue_CanDispatchAgent_Ugly_ZeroLimit(t *testing.T) {
|
|||
|
||||
func TestQueue_CountRunningByAgent_Good_Empty(t *testing.T) {
|
||||
svc := New()
|
||||
// Add a non-running entry so Registry is non-empty (avoids disk fallback)
|
||||
svc.TrackWorkspace("ws-seed", &WorkspaceStatus{Status: "completed", Agent: "claude"})
|
||||
assert.Equal(t, 0, svc.countRunningByAgent("codex"))
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue