test(monitor): align AX test paths
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
feaa4dec5e
commit
c1140cc917
6 changed files with 650 additions and 781 deletions
|
|
@ -4,6 +4,7 @@ package monitor
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"dappco.re/go/agent/pkg/agentic"
|
||||
|
|
@ -227,6 +228,112 @@ func TestHarvest_HarvestCompleted_Good_ChannelEvents(t *testing.T) {
|
|||
assert.Equal(t, 1, captured[0].Files)
|
||||
}
|
||||
|
||||
func TestHarvest_HarvestCompleted_Good_MultipleWorkspaces(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
name := fmt.Sprintf("ws-%d", i)
|
||||
wsDir := core.JoinPath(wsRoot, "workspace", name)
|
||||
|
||||
sourceDir := core.JoinPath(wsRoot, fmt.Sprintf("source-%d", i))
|
||||
fs.EnsureDir(sourceDir)
|
||||
run(t, sourceDir, "git", "init")
|
||||
run(t, sourceDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(sourceDir, "README.md"), "# test")
|
||||
run(t, sourceDir, "git", "add", ".")
|
||||
run(t, sourceDir, "git", "commit", "-m", "init")
|
||||
|
||||
fs.EnsureDir(wsDir)
|
||||
run(t, wsDir, "git", "clone", sourceDir, "repo")
|
||||
repoDir := core.JoinPath(wsDir, "repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "agent/test-task")
|
||||
fs.Write(core.JoinPath(repoDir, "new.go"), "package main\n")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "agent work")
|
||||
|
||||
writeStatus(t, wsDir, "completed", fmt.Sprintf("repo-%d", i), "agent/test-task")
|
||||
}
|
||||
|
||||
var harvests []messages.HarvestComplete
|
||||
c := core.New(core.WithService(agentic.ProcessRegister))
|
||||
c.ServiceStartup(context.Background(), nil)
|
||||
c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result {
|
||||
if ev, ok := msg.(messages.HarvestComplete); ok {
|
||||
harvests = append(harvests, ev)
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{})
|
||||
|
||||
msg := mon.harvestCompleted()
|
||||
assert.Contains(t, msg, "Harvested:")
|
||||
assert.Contains(t, msg, "repo-0")
|
||||
assert.Contains(t, msg, "repo-1")
|
||||
|
||||
assert.GreaterOrEqual(t, len(harvests), 2)
|
||||
}
|
||||
|
||||
func TestHarvest_HarvestCompleted_Good_Empty(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = testMon.ServiceRuntime
|
||||
msg := mon.harvestCompleted()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestHarvest_HarvestCompleted_Good_RejectedWorkspace(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
sourceDir := core.JoinPath(wsRoot, "source-rej")
|
||||
fs.EnsureDir(sourceDir)
|
||||
run(t, sourceDir, "git", "init")
|
||||
run(t, sourceDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(sourceDir, "README.md"), "# test")
|
||||
run(t, sourceDir, "git", "add", ".")
|
||||
run(t, sourceDir, "git", "commit", "-m", "init")
|
||||
|
||||
wsDir := core.JoinPath(wsRoot, "workspace", "ws-rej")
|
||||
fs.EnsureDir(wsDir)
|
||||
run(t, wsDir, "git", "clone", sourceDir, "repo")
|
||||
repoDir := core.JoinPath(wsDir, "repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "agent/test-task")
|
||||
fs.Write(core.JoinPath(repoDir, "new.go"), "package main\n")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "agent work")
|
||||
|
||||
fs.Write(core.JoinPath(repoDir, "app.exe"), "binary")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "add binary")
|
||||
|
||||
writeStatus(t, wsDir, "completed", "rej-repo", "agent/test-task")
|
||||
|
||||
var rejections []messages.HarvestRejected
|
||||
c := core.New(core.WithService(agentic.ProcessRegister))
|
||||
c.ServiceStartup(context.Background(), nil)
|
||||
c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result {
|
||||
if ev, ok := msg.(messages.HarvestRejected); ok {
|
||||
rejections = append(rejections, ev)
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{})
|
||||
|
||||
msg := mon.harvestCompleted()
|
||||
assert.Contains(t, msg, "REJECTED")
|
||||
|
||||
require.Len(t, rejections, 1)
|
||||
assert.Contains(t, rejections[0].Reason, "binary file added")
|
||||
}
|
||||
|
||||
func TestHarvest_UpdateStatus_Good(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
initial := map[string]any{"status": "completed", "repo": "test"}
|
||||
|
|
|
|||
|
|
@ -1,410 +0,0 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"dappco.re/go/agent/pkg/messages"
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// --- handleAgentStarted ---
|
||||
|
||||
func TestLogic_HandleAgentStarted_Good(t *testing.T) {
|
||||
mon := New()
|
||||
ev := messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-1"}
|
||||
mon.handleAgentStarted(ev)
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenRunning["core/go-io/task-1"])
|
||||
}
|
||||
|
||||
func TestLogic_HandleAgentStarted_Bad_EmptyWorkspace(t *testing.T) {
|
||||
mon := New()
|
||||
// Empty workspace key must not panic and must record empty string key.
|
||||
ev := messages.AgentStarted{Agent: "", Repo: "", Workspace: ""}
|
||||
assert.NotPanics(t, func() { mon.handleAgentStarted(ev) })
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenRunning[""])
|
||||
}
|
||||
|
||||
// --- handleAgentCompleted ---
|
||||
|
||||
func TestLogic_HandleAgentCompleted_Good_NilRuntime(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New()
|
||||
// ServiceRuntime is nil — must not panic, must record completion and poke.
|
||||
ev := messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-1", Status: "completed"}
|
||||
assert.NotPanics(t, func() { mon.handleAgentCompleted(ev) })
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenCompleted["ws-1"])
|
||||
}
|
||||
|
||||
func TestLogic_HandleAgentCompleted_Good_WithCore(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
// Use Register so IPC handlers are wired
|
||||
c := core.New(core.WithService(Register))
|
||||
mon, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
require.True(t, ok)
|
||||
|
||||
ev := messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-2", Status: "completed"}
|
||||
c.ACTION(ev)
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenCompleted["ws-2"])
|
||||
}
|
||||
|
||||
func TestLogic_HandleAgentCompleted_Bad_EmptyFields(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New()
|
||||
|
||||
// All fields empty — must not panic, must record empty workspace key.
|
||||
ev := messages.AgentCompleted{}
|
||||
assert.NotPanics(t, func() { mon.handleAgentCompleted(ev) })
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenCompleted[""])
|
||||
}
|
||||
|
||||
// --- checkIdleAfterDelay ---
|
||||
|
||||
func TestLogic_CheckIdleAfterDelay_Bad_NilRuntime(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New() // ServiceRuntime is nil
|
||||
|
||||
// Should return immediately without panic after the 5s sleep.
|
||||
// We test the "ServiceRuntime == nil" return branch by exercising the guard directly.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
if mon.ServiceRuntime == nil {
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
mon.checkIdleAfterDelay()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("checkIdleAfterDelay nil-runtime guard did not return quickly")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogic_CheckIdleAfterDelay_Good_EmptyWorkspace(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
// Create a Core with an IPC handler to capture QueueDrained messages
|
||||
var captured []messages.QueueDrained
|
||||
c := core.New()
|
||||
c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result {
|
||||
if ev, ok := msg.(messages.QueueDrained); ok {
|
||||
captured = append(captured, ev)
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{})
|
||||
|
||||
// With empty workspace, running=0 and queued=0, so queue.drained fires.
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
assert.Equal(t, 0, running)
|
||||
assert.Equal(t, 0, queued)
|
||||
|
||||
if running == 0 && queued == 0 {
|
||||
mon.Core().ACTION(messages.QueueDrained{Completed: 0})
|
||||
}
|
||||
|
||||
require.Len(t, captured, 1)
|
||||
assert.Equal(t, 0, captured[0].Completed)
|
||||
}
|
||||
|
||||
// --- countLiveWorkspaces ---
|
||||
|
||||
func TestLogic_CountLiveWorkspaces_Good_EmptyWorkspace(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New()
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
assert.Equal(t, 0, running)
|
||||
assert.Equal(t, 0, queued)
|
||||
}
|
||||
|
||||
func TestLogic_CountLiveWorkspaces_Good_QueuedStatus(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
writeWorkspaceStatus(t, wsRoot, "ws-q", map[string]any{
|
||||
"status": "queued",
|
||||
"repo": "go-io",
|
||||
"agent": "codex",
|
||||
})
|
||||
|
||||
mon := New()
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
assert.Equal(t, 0, running)
|
||||
assert.Equal(t, 1, queued)
|
||||
}
|
||||
|
||||
func TestLogic_CountLiveWorkspaces_Bad_RunningDeadPID(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
// PID 1 is always init/launchd and not "our" process — on macOS sending
|
||||
// signal 0 to PID 1 returns EPERM (process exists but not ours), which
|
||||
// means pidAlive returns false for non-owned processes. Use PID 99999999
|
||||
// which is near-certainly dead.
|
||||
writeWorkspaceStatus(t, wsRoot, "ws-dead", map[string]any{
|
||||
"status": "running",
|
||||
"repo": "go-io",
|
||||
"agent": "codex",
|
||||
"pid": 99999999,
|
||||
})
|
||||
|
||||
mon := New()
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
// Dead PID should not count as running.
|
||||
assert.Equal(t, 0, running)
|
||||
assert.Equal(t, 0, queued)
|
||||
}
|
||||
|
||||
func TestLogic_CountLiveWorkspaces_Good_RunningLivePID(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
// Current process is definitely alive.
|
||||
pid, _ := strconv.Atoi(core.Env("PID"))
|
||||
writeWorkspaceStatus(t, wsRoot, "ws-live", map[string]any{
|
||||
"status": "running",
|
||||
"repo": "go-io",
|
||||
"agent": "codex",
|
||||
"pid": pid,
|
||||
})
|
||||
|
||||
mon := New()
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
assert.Equal(t, 1, running)
|
||||
assert.Equal(t, 0, queued)
|
||||
}
|
||||
|
||||
// --- pidAlive ---
|
||||
|
||||
func TestLogic_PidAlive_Good_CurrentProcess(t *testing.T) {
|
||||
pid, _ := strconv.Atoi(core.Env("PID"))
|
||||
assert.True(t, pidAlive(pid), "current process must be alive")
|
||||
}
|
||||
|
||||
func TestLogic_PidAlive_Bad_DeadPID(t *testing.T) {
|
||||
// PID 99999999 is virtually guaranteed to not exist.
|
||||
assert.False(t, pidAlive(99999999))
|
||||
}
|
||||
|
||||
func TestLogic_PidAlive_Ugly_ZeroPID(t *testing.T) {
|
||||
// PID 0 is not a valid user process. pidAlive must return false or at
|
||||
// least not panic.
|
||||
assert.NotPanics(t, func() { pidAlive(0) })
|
||||
}
|
||||
|
||||
func TestLogic_PidAlive_Ugly_NegativePID(t *testing.T) {
|
||||
// Negative PID is invalid. Must not panic.
|
||||
assert.NotPanics(t, func() { pidAlive(-1) })
|
||||
}
|
||||
|
||||
// --- SetCore ---
|
||||
|
||||
func TestLogic_SetCore_Good_RegistersIPCHandler(t *testing.T) {
|
||||
c := core.New()
|
||||
mon := New()
|
||||
|
||||
// SetCore must not panic and must wire ServiceRuntime.
|
||||
assert.NotPanics(t, func() { mon.SetCore(c) })
|
||||
assert.NotNil(t, mon.ServiceRuntime)
|
||||
assert.Equal(t, c, mon.Core())
|
||||
}
|
||||
|
||||
func TestLogic_SetCore_Good_IPCHandlerFires(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
// IPC handlers are registered via Register, not SetCore
|
||||
c := core.New(core.WithService(Register))
|
||||
|
||||
mon, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
require.True(t, ok)
|
||||
|
||||
// Dispatch an AgentStarted via Core IPC — handler must update seenRunning.
|
||||
c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "ws-ipc"})
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenRunning["ws-ipc"])
|
||||
}
|
||||
|
||||
func TestLogic_SetCore_Good_CompletedIPCHandler(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
// IPC handlers are registered via Register, not SetCore
|
||||
c := core.New(core.WithService(Register))
|
||||
|
||||
mon, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
require.True(t, ok)
|
||||
|
||||
// Dispatch AgentCompleted — handler must update seenCompleted.
|
||||
c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-done", Status: "completed"})
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenCompleted["ws-done"])
|
||||
}
|
||||
|
||||
// --- OnStartup / OnShutdown ---
|
||||
|
||||
func TestLogic_OnStartup_Good_StartsLoop(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
home := t.TempDir()
|
||||
t.Setenv("HOME", home)
|
||||
|
||||
mon := New(Options{Interval: 1 * time.Hour})
|
||||
r := mon.OnStartup(context.Background())
|
||||
assert.True(t, r.OK)
|
||||
|
||||
// cancel must be non-nil after startup (loop running)
|
||||
assert.NotNil(t, mon.cancel)
|
||||
|
||||
// Cleanup.
|
||||
r2 := mon.OnShutdown(context.Background())
|
||||
assert.True(t, r2.OK)
|
||||
}
|
||||
|
||||
func TestLogic_OnStartup_Good_NoError(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New(Options{Interval: 1 * time.Hour})
|
||||
assert.True(t, mon.OnStartup(context.Background()).OK)
|
||||
_ = mon.OnShutdown(context.Background())
|
||||
}
|
||||
|
||||
func TestLogic_OnShutdown_Good_NoError(t *testing.T) {
|
||||
mon := New(Options{Interval: 1 * time.Hour})
|
||||
assert.True(t, mon.OnShutdown(context.Background()).OK)
|
||||
}
|
||||
|
||||
func TestLogic_OnShutdown_Good_StopsLoop(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
home := t.TempDir()
|
||||
t.Setenv("HOME", home)
|
||||
|
||||
mon := New(Options{Interval: 1 * time.Hour})
|
||||
require.True(t, mon.OnStartup(context.Background()).OK)
|
||||
|
||||
done := make(chan bool, 1)
|
||||
go func() {
|
||||
done <- mon.OnShutdown(context.Background()).OK
|
||||
}()
|
||||
|
||||
select {
|
||||
case ok := <-done:
|
||||
assert.True(t, ok)
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("OnShutdown did not return in time")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogic_OnShutdown_Ugly_NilCancel(t *testing.T) {
|
||||
// OnShutdown without prior OnStartup must not panic.
|
||||
mon := New()
|
||||
assert.NotPanics(t, func() {
|
||||
_ = mon.OnShutdown(context.Background())
|
||||
})
|
||||
}
|
||||
|
||||
// --- Register ---
|
||||
|
||||
func TestLogic_Register_Good_ReturnsSubsystem(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
c := core.New(core.WithService(Register))
|
||||
require.NotNil(t, c)
|
||||
|
||||
// Register returns the Subsystem as Value; WithService auto-registers it
|
||||
// under the package name "monitor".
|
||||
svc, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
assert.True(t, ok, "Subsystem must be registered as \"monitor\"")
|
||||
assert.NotNil(t, svc)
|
||||
}
|
||||
|
||||
func TestLogic_Register_Good_CoreWired(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
c := core.New(core.WithService(Register))
|
||||
require.NotNil(t, c)
|
||||
|
||||
svc, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
require.True(t, ok)
|
||||
|
||||
// Register must set ServiceRuntime.
|
||||
assert.NotNil(t, svc.ServiceRuntime)
|
||||
assert.Equal(t, c, svc.Core())
|
||||
}
|
||||
|
||||
func TestLogic_Register_Good_IPCHandlerActive(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
c := core.New(core.WithService(Register))
|
||||
require.NotNil(t, c)
|
||||
|
||||
svc, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
require.True(t, ok)
|
||||
|
||||
// Fire an AgentStarted message — the registered IPC handler must update seenRunning.
|
||||
c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "ws-reg"})
|
||||
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
assert.True(t, svc.seenRunning["ws-reg"])
|
||||
}
|
||||
|
|
@ -7,6 +7,8 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -18,6 +20,16 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var testMon *Subsystem
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
c := core.New(core.WithService(agentic.ProcessRegister))
|
||||
c.ServiceStartup(context.Background(), nil)
|
||||
testMon = New()
|
||||
testMon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{})
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
type capturedChannelEvent struct {
|
||||
Channel string
|
||||
Data any
|
||||
|
|
@ -133,6 +145,287 @@ func TestMonitor_Shutdown_Good_NilCancel(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// --- SetCore ---
|
||||
|
||||
func TestMonitor_SetCore_Good_WiresServiceRuntime(t *testing.T) {
|
||||
c := core.New()
|
||||
mon := New()
|
||||
|
||||
assert.NotPanics(t, func() { mon.SetCore(c) })
|
||||
assert.NotNil(t, mon.ServiceRuntime)
|
||||
assert.Equal(t, c, mon.Core())
|
||||
}
|
||||
|
||||
// --- handleAgentStarted / handleAgentCompleted ---
|
||||
|
||||
func TestMonitor_HandleAgentStarted_Good(t *testing.T) {
|
||||
mon := New()
|
||||
ev := messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-1"}
|
||||
mon.handleAgentStarted(ev)
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenRunning["core/go-io/task-1"])
|
||||
}
|
||||
|
||||
func TestMonitor_HandleAgentStarted_Bad_EmptyWorkspace(t *testing.T) {
|
||||
mon := New()
|
||||
ev := messages.AgentStarted{}
|
||||
|
||||
assert.NotPanics(t, func() { mon.handleAgentStarted(ev) })
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenRunning[""])
|
||||
}
|
||||
|
||||
func TestMonitor_HandleAgentCompleted_Good_NilRuntime(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New()
|
||||
ev := messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-1", Status: "completed"}
|
||||
|
||||
assert.NotPanics(t, func() { mon.handleAgentCompleted(ev) })
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenCompleted["ws-1"])
|
||||
}
|
||||
|
||||
func TestMonitor_HandleAgentCompleted_Good_WithCore(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
c := core.New(core.WithService(Register))
|
||||
mon, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
require.True(t, ok)
|
||||
|
||||
c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-2", Status: "completed"})
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenCompleted["ws-2"])
|
||||
}
|
||||
|
||||
func TestMonitor_HandleAgentCompleted_Bad_EmptyFields(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New()
|
||||
|
||||
assert.NotPanics(t, func() { mon.handleAgentCompleted(messages.AgentCompleted{}) })
|
||||
|
||||
mon.mu.Lock()
|
||||
defer mon.mu.Unlock()
|
||||
assert.True(t, mon.seenCompleted[""])
|
||||
}
|
||||
|
||||
// --- checkIdleAfterDelay ---
|
||||
|
||||
func TestMonitor_CheckIdleAfterDelay_Bad_NilRuntime(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New()
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
if mon.ServiceRuntime == nil {
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
mon.checkIdleAfterDelay()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("checkIdleAfterDelay nil-runtime guard did not return quickly")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitor_CheckIdleAfterDelay_Good_EmptyWorkspace(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
var captured []messages.QueueDrained
|
||||
c := core.New()
|
||||
c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result {
|
||||
if ev, ok := msg.(messages.QueueDrained); ok {
|
||||
captured = append(captured, ev)
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{})
|
||||
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
assert.Equal(t, 0, running)
|
||||
assert.Equal(t, 0, queued)
|
||||
|
||||
if running == 0 && queued == 0 {
|
||||
mon.Core().ACTION(messages.QueueDrained{Completed: 0})
|
||||
}
|
||||
|
||||
require.Len(t, captured, 1)
|
||||
assert.Equal(t, 0, captured[0].Completed)
|
||||
}
|
||||
|
||||
// --- countLiveWorkspaces ---
|
||||
|
||||
func TestMonitor_CountLiveWorkspaces_Good_EmptyWorkspace(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New()
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
assert.Equal(t, 0, running)
|
||||
assert.Equal(t, 0, queued)
|
||||
}
|
||||
|
||||
func TestMonitor_CountLiveWorkspaces_Good_QueuedStatus(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
writeWorkspaceStatus(t, wsRoot, "ws-q", map[string]any{
|
||||
"status": "queued",
|
||||
"repo": "go-io",
|
||||
"agent": "codex",
|
||||
})
|
||||
|
||||
mon := New()
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
assert.Equal(t, 0, running)
|
||||
assert.Equal(t, 1, queued)
|
||||
}
|
||||
|
||||
func TestMonitor_CountLiveWorkspaces_Bad_RunningDeadPID(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
writeWorkspaceStatus(t, wsRoot, "ws-dead", map[string]any{
|
||||
"status": "running",
|
||||
"repo": "go-io",
|
||||
"agent": "codex",
|
||||
"pid": 99999999,
|
||||
})
|
||||
|
||||
mon := New()
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
assert.Equal(t, 0, running)
|
||||
assert.Equal(t, 0, queued)
|
||||
}
|
||||
|
||||
func TestMonitor_CountLiveWorkspaces_Good_RunningLivePID(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
pid, _ := strconv.Atoi(core.Env("PID"))
|
||||
writeWorkspaceStatus(t, wsRoot, "ws-live", map[string]any{
|
||||
"status": "running",
|
||||
"repo": "go-io",
|
||||
"agent": "codex",
|
||||
"pid": pid,
|
||||
})
|
||||
|
||||
mon := New()
|
||||
running, queued := mon.countLiveWorkspaces()
|
||||
assert.Equal(t, 1, running)
|
||||
assert.Equal(t, 0, queued)
|
||||
}
|
||||
|
||||
// --- pidAlive ---
|
||||
|
||||
func TestMonitor_PidAlive_Good_CurrentProcess(t *testing.T) {
|
||||
pid, _ := strconv.Atoi(core.Env("PID"))
|
||||
assert.True(t, pidAlive(pid), "current process must be alive")
|
||||
}
|
||||
|
||||
func TestMonitor_PidAlive_Bad_DeadPID(t *testing.T) {
|
||||
assert.False(t, pidAlive(99999999))
|
||||
}
|
||||
|
||||
func TestMonitor_PidAlive_Ugly_ZeroPID(t *testing.T) {
|
||||
assert.NotPanics(t, func() { pidAlive(0) })
|
||||
}
|
||||
|
||||
func TestMonitor_PidAlive_Ugly_NegativePID(t *testing.T) {
|
||||
assert.NotPanics(t, func() { pidAlive(-1) })
|
||||
}
|
||||
|
||||
// --- OnStartup / OnShutdown ---
|
||||
|
||||
func TestMonitor_OnStartup_Good_StartsLoop(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
home := t.TempDir()
|
||||
t.Setenv("HOME", home)
|
||||
|
||||
mon := New(Options{Interval: 1 * time.Hour})
|
||||
r := mon.OnStartup(context.Background())
|
||||
assert.True(t, r.OK)
|
||||
assert.NotNil(t, mon.cancel)
|
||||
|
||||
r2 := mon.OnShutdown(context.Background())
|
||||
assert.True(t, r2.OK)
|
||||
}
|
||||
|
||||
func TestMonitor_OnStartup_Good_NoError(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New(Options{Interval: 1 * time.Hour})
|
||||
assert.True(t, mon.OnStartup(context.Background()).OK)
|
||||
_ = mon.OnShutdown(context.Background())
|
||||
}
|
||||
|
||||
func TestMonitor_OnShutdown_Good_NoError(t *testing.T) {
|
||||
mon := New(Options{Interval: 1 * time.Hour})
|
||||
assert.True(t, mon.OnShutdown(context.Background()).OK)
|
||||
}
|
||||
|
||||
func TestMonitor_OnShutdown_Good_StopsLoop(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
home := t.TempDir()
|
||||
t.Setenv("HOME", home)
|
||||
|
||||
mon := New(Options{Interval: 1 * time.Hour})
|
||||
require.True(t, mon.OnStartup(context.Background()).OK)
|
||||
|
||||
done := make(chan bool, 1)
|
||||
go func() {
|
||||
done <- mon.OnShutdown(context.Background()).OK
|
||||
}()
|
||||
|
||||
select {
|
||||
case ok := <-done:
|
||||
assert.True(t, ok)
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("OnShutdown did not return in time")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitor_OnShutdown_Ugly_NilCancel(t *testing.T) {
|
||||
mon := New()
|
||||
assert.NotPanics(t, func() {
|
||||
_ = mon.OnShutdown(context.Background())
|
||||
})
|
||||
}
|
||||
|
||||
// --- checkCompletions ---
|
||||
|
||||
func TestMonitor_CheckCompletions_Good_NewCompletions(t *testing.T) {
|
||||
|
|
@ -518,85 +811,6 @@ func TestMonitor_Loop_Good_PokeTriggersCheck(t *testing.T) {
|
|||
mon.wg.Wait()
|
||||
}
|
||||
|
||||
// --- initSyncTimestamp ---
|
||||
|
||||
func TestMonitor_InitSyncTimestamp_Good(t *testing.T) {
|
||||
mon := New()
|
||||
assert.Equal(t, int64(0), mon.lastSyncTimestamp)
|
||||
|
||||
before := time.Now().Unix()
|
||||
mon.initSyncTimestamp()
|
||||
after := time.Now().Unix()
|
||||
|
||||
mon.mu.Lock()
|
||||
ts := mon.lastSyncTimestamp
|
||||
mon.mu.Unlock()
|
||||
|
||||
assert.GreaterOrEqual(t, ts, before)
|
||||
assert.LessOrEqual(t, ts, after)
|
||||
}
|
||||
|
||||
func TestMonitor_InitSyncTimestamp_Good_NoOverwrite(t *testing.T) {
|
||||
mon := New()
|
||||
mon.lastSyncTimestamp = 12345
|
||||
|
||||
mon.initSyncTimestamp()
|
||||
|
||||
mon.mu.Lock()
|
||||
assert.Equal(t, int64(12345), mon.lastSyncTimestamp)
|
||||
mon.mu.Unlock()
|
||||
}
|
||||
|
||||
// --- syncRepos ---
|
||||
|
||||
func TestMonitor_SyncRepos_Good_NoChanges(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
assert.Equal(t, "/v1/agent/checkin", r.URL.Path)
|
||||
resp := CheckinResponse{Timestamp: time.Now().Unix()}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
|
||||
mon := New()
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestMonitor_SyncRepos_Bad_APIError(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
|
||||
mon := New()
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestMonitor_SyncRepos_Good_UpdatesTimestamp(t *testing.T) {
|
||||
newTS := time.Now().Unix() + 1000
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{Timestamp: newTS}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
|
||||
mon := New()
|
||||
mon.syncRepos()
|
||||
|
||||
mon.mu.Lock()
|
||||
assert.Equal(t, newTS, mon.lastSyncTimestamp)
|
||||
mon.mu.Unlock()
|
||||
}
|
||||
|
||||
// --- agentStatusResource ---
|
||||
|
||||
func TestMonitor_AgentStatusResource_Good(t *testing.T) {
|
||||
|
|
@ -665,270 +879,3 @@ func TestMonitor_AgentStatusResource_Good_DeepWorkspaceName(t *testing.T) {
|
|||
require.Len(t, workspaces, 1)
|
||||
assert.Equal(t, "core/go-io/task-9", workspaces[0]["name"])
|
||||
}
|
||||
|
||||
// --- syncRepos (git pull path) ---
|
||||
|
||||
func TestMonitor_SyncRepos_Good_PullsChangedRepo(t *testing.T) {
|
||||
remoteDir := core.JoinPath(t.TempDir(), "remote")
|
||||
fs.EnsureDir(remoteDir)
|
||||
run(t, remoteDir, "git", "init", "--bare")
|
||||
|
||||
codeDir := t.TempDir()
|
||||
repoDir := core.JoinPath(codeDir, "test-repo")
|
||||
run(t, codeDir, "git", "clone", remoteDir, "test-repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(repoDir, "README.md"), "# test")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "init")
|
||||
run(t, repoDir, "git", "push", "-u", "origin", "main")
|
||||
|
||||
// Simulate another agent pushing work via a second clone
|
||||
clone2Parent := t.TempDir()
|
||||
tmpClone := core.JoinPath(clone2Parent, "clone2")
|
||||
run(t, clone2Parent, "git", "clone", remoteDir, "clone2")
|
||||
run(t, tmpClone, "git", "checkout", "main")
|
||||
fs.Write(core.JoinPath(tmpClone, "new.go"), "package main\n")
|
||||
run(t, tmpClone, "git", "add", ".")
|
||||
run(t, tmpClone, "git", "commit", "-m", "agent work")
|
||||
run(t, tmpClone, "git", "push", "origin", "main")
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{
|
||||
Changed: []ChangedRepo{{Repo: "test-repo", Branch: "main", SHA: "abc"}},
|
||||
Timestamp: time.Now().Unix() + 100,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
t.Setenv("CODE_PATH", codeDir)
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = testMon.ServiceRuntime
|
||||
msg := mon.syncRepos()
|
||||
assert.Contains(t, msg, "Synced 1 repo(s)")
|
||||
assert.Contains(t, msg, "test-repo")
|
||||
}
|
||||
|
||||
func TestMonitor_SyncRepos_Good_SkipsDirtyRepo(t *testing.T) {
|
||||
remoteDir := core.JoinPath(t.TempDir(), "remote")
|
||||
fs.EnsureDir(remoteDir)
|
||||
run(t, remoteDir, "git", "init", "--bare")
|
||||
|
||||
codeDir := t.TempDir()
|
||||
repoDir := core.JoinPath(codeDir, "dirty-repo")
|
||||
run(t, codeDir, "git", "clone", remoteDir, "dirty-repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(repoDir, "README.md"), "# test")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "init")
|
||||
run(t, repoDir, "git", "push", "-u", "origin", "main")
|
||||
|
||||
// Make the repo dirty
|
||||
fs.Write(core.JoinPath(repoDir, "dirty.txt"), "uncommitted")
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{
|
||||
Changed: []ChangedRepo{{Repo: "dirty-repo", Branch: "main", SHA: "abc"}},
|
||||
Timestamp: time.Now().Unix() + 100,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
t.Setenv("CODE_PATH", codeDir)
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = testMon.ServiceRuntime
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestMonitor_SyncRepos_Good_SkipsNonMainBranch(t *testing.T) {
|
||||
remoteDir := core.JoinPath(t.TempDir(), "remote")
|
||||
fs.EnsureDir(remoteDir)
|
||||
run(t, remoteDir, "git", "init", "--bare")
|
||||
|
||||
codeDir := t.TempDir()
|
||||
repoDir := core.JoinPath(codeDir, "feature-repo")
|
||||
run(t, codeDir, "git", "clone", remoteDir, "feature-repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(repoDir, "README.md"), "# test")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "init")
|
||||
run(t, repoDir, "git", "push", "-u", "origin", "main")
|
||||
run(t, repoDir, "git", "checkout", "-b", "feature/wip")
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{
|
||||
Changed: []ChangedRepo{{Repo: "feature-repo", Branch: "main", SHA: "abc"}},
|
||||
Timestamp: time.Now().Unix() + 100,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
t.Setenv("CODE_PATH", codeDir)
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = testMon.ServiceRuntime
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestMonitor_SyncRepos_Good_SkipsNonexistentRepo(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{
|
||||
Changed: []ChangedRepo{{Repo: "nonexistent", Branch: "main", SHA: "abc"}},
|
||||
Timestamp: time.Now().Unix() + 100,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
t.Setenv("CODE_PATH", t.TempDir())
|
||||
|
||||
mon := New()
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestMonitor_SyncRepos_Good_UsesEnvBrainKey(t *testing.T) {
|
||||
var authHeader string
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
authHeader = r.Header.Get("Authorization")
|
||||
resp := CheckinResponse{Timestamp: time.Now().Unix()}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
home := t.TempDir()
|
||||
t.Setenv("HOME", home)
|
||||
t.Setenv("CORE_BRAIN_KEY", "env-key-value")
|
||||
t.Setenv("CORE_API_URL", srv.URL)
|
||||
t.Setenv("AGENT_NAME", "test-agent")
|
||||
|
||||
mon := New()
|
||||
mon.syncRepos()
|
||||
assert.Equal(t, "Bearer env-key-value", authHeader)
|
||||
}
|
||||
|
||||
// --- harvestCompleted (full path) ---
|
||||
|
||||
func TestMonitor_HarvestCompleted_Good_MultipleWorkspaces(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
name := fmt.Sprintf("ws-%d", i)
|
||||
wsDir := core.JoinPath(wsRoot, "workspace", name)
|
||||
|
||||
sourceDir := core.JoinPath(wsRoot, fmt.Sprintf("source-%d", i))
|
||||
fs.EnsureDir(sourceDir)
|
||||
run(t, sourceDir, "git", "init")
|
||||
run(t, sourceDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(sourceDir, "README.md"), "# test")
|
||||
run(t, sourceDir, "git", "add", ".")
|
||||
run(t, sourceDir, "git", "commit", "-m", "init")
|
||||
|
||||
fs.EnsureDir(wsDir)
|
||||
run(t, wsDir, "git", "clone", sourceDir, "repo")
|
||||
repoDir := core.JoinPath(wsDir, "repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "agent/test-task")
|
||||
fs.Write(core.JoinPath(repoDir, "new.go"), "package main\n")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "agent work")
|
||||
|
||||
writeStatus(t, wsDir, "completed", fmt.Sprintf("repo-%d", i), "agent/test-task")
|
||||
}
|
||||
|
||||
// Create Core with IPC handler to capture HarvestComplete messages
|
||||
var harvests []messages.HarvestComplete
|
||||
c := core.New(core.WithService(agentic.ProcessRegister))
|
||||
c.ServiceStartup(context.Background(), nil)
|
||||
c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result {
|
||||
if ev, ok := msg.(messages.HarvestComplete); ok {
|
||||
harvests = append(harvests, ev)
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{})
|
||||
|
||||
msg := mon.harvestCompleted()
|
||||
assert.Contains(t, msg, "Harvested:")
|
||||
assert.Contains(t, msg, "repo-0")
|
||||
assert.Contains(t, msg, "repo-1")
|
||||
|
||||
assert.GreaterOrEqual(t, len(harvests), 2)
|
||||
}
|
||||
|
||||
func TestMonitor_HarvestCompleted_Good_Empty(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = testMon.ServiceRuntime
|
||||
msg := mon.harvestCompleted()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestMonitor_HarvestCompleted_Good_RejectedWorkspace(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
sourceDir := core.JoinPath(wsRoot, "source-rej")
|
||||
fs.EnsureDir(sourceDir)
|
||||
run(t, sourceDir, "git", "init")
|
||||
run(t, sourceDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(sourceDir, "README.md"), "# test")
|
||||
run(t, sourceDir, "git", "add", ".")
|
||||
run(t, sourceDir, "git", "commit", "-m", "init")
|
||||
|
||||
wsDir := core.JoinPath(wsRoot, "workspace", "ws-rej")
|
||||
fs.EnsureDir(wsDir)
|
||||
run(t, wsDir, "git", "clone", sourceDir, "repo")
|
||||
repoDir := core.JoinPath(wsDir, "repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "agent/test-task")
|
||||
fs.Write(core.JoinPath(repoDir, "new.go"), "package main\n")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "agent work")
|
||||
|
||||
// Add binary to trigger rejection
|
||||
fs.Write(core.JoinPath(repoDir, "app.exe"), "binary")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "add binary")
|
||||
|
||||
writeStatus(t, wsDir, "completed", "rej-repo", "agent/test-task")
|
||||
|
||||
// Create Core with IPC handler to capture HarvestRejected messages
|
||||
var rejections []messages.HarvestRejected
|
||||
c := core.New(core.WithService(agentic.ProcessRegister))
|
||||
c.ServiceStartup(context.Background(), nil)
|
||||
c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result {
|
||||
if ev, ok := msg.(messages.HarvestRejected); ok {
|
||||
rejections = append(rejections, ev)
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{})
|
||||
|
||||
msg := mon.harvestCompleted()
|
||||
assert.Contains(t, msg, "REJECTED")
|
||||
|
||||
require.Len(t, rejections, 1)
|
||||
assert.Contains(t, rejections[0].Reason, "binary file added")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,22 +0,0 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"dappco.re/go/agent/pkg/agentic"
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
var testMon *Subsystem
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
c := core.New(core.WithService(agentic.ProcessRegister))
|
||||
c.ServiceStartup(context.Background(), nil)
|
||||
testMon = New()
|
||||
testMon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{})
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
|
@ -5,25 +5,68 @@ package monitor
|
|||
import (
|
||||
"testing"
|
||||
|
||||
"dappco.re/go/agent/pkg/messages"
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRegister_Register_Good(t *testing.T) {
|
||||
func TestRegister_Register_Good_ReturnsSubsystem(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
c := core.New(core.WithService(Register))
|
||||
svc, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, svc)
|
||||
}
|
||||
|
||||
func TestRegister_Register_Bad_ServiceName(t *testing.T) {
|
||||
func TestRegister_Register_Good_RegistersServiceName(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
c := core.New(core.WithService(Register))
|
||||
assert.Contains(t, c.Services(), "monitor")
|
||||
}
|
||||
|
||||
func TestRegister_Register_Ugly_ServiceRuntime(t *testing.T) {
|
||||
func TestRegister_Register_Good_WiresServiceRuntime(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
|
||||
c := core.New(core.WithService(Register))
|
||||
svc, _ := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
assert.NotNil(t, svc.ServiceRuntime)
|
||||
assert.Equal(t, c, svc.Core())
|
||||
}
|
||||
|
||||
func TestRegister_Register_Good_TracksStartedIPC(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
c := core.New(core.WithService(Register))
|
||||
svc, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
require.True(t, ok)
|
||||
|
||||
c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "ws-reg"})
|
||||
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
assert.True(t, svc.seenRunning["ws-reg"])
|
||||
}
|
||||
|
||||
func TestRegister_Register_Good_TracksCompletedIPC(t *testing.T) {
|
||||
wsRoot := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", wsRoot)
|
||||
fs.EnsureDir(core.JoinPath(wsRoot, "workspace"))
|
||||
|
||||
c := core.New(core.WithService(Register))
|
||||
svc, ok := core.ServiceFor[*Subsystem](c, "monitor")
|
||||
require.True(t, ok)
|
||||
|
||||
c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-done", Status: "completed"})
|
||||
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
assert.True(t, svc.seenCompleted["ws-done"])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,8 +3,12 @@
|
|||
package monitor
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
|
|
@ -28,3 +32,203 @@ func TestSync_SyncRepos_Ugly_NoBrainKey(t *testing.T) {
|
|||
result := mon.syncRepos()
|
||||
assert.Equal(t, "", result)
|
||||
}
|
||||
|
||||
func TestSync_SyncRepos_Good_NoChanges(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
assert.Equal(t, "/v1/agent/checkin", r.URL.Path)
|
||||
resp := CheckinResponse{Timestamp: time.Now().Unix()}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
|
||||
mon := New()
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestSync_SyncRepos_Bad_APIError(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
|
||||
mon := New()
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestSync_SyncRepos_Good_UpdatesTimestamp(t *testing.T) {
|
||||
newTS := time.Now().Unix() + 1000
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{Timestamp: newTS}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
|
||||
mon := New()
|
||||
mon.syncRepos()
|
||||
|
||||
mon.mu.Lock()
|
||||
assert.Equal(t, newTS, mon.lastSyncTimestamp)
|
||||
mon.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestSync_SyncRepos_Good_PullsChangedRepo(t *testing.T) {
|
||||
remoteDir := core.JoinPath(t.TempDir(), "remote")
|
||||
fs.EnsureDir(remoteDir)
|
||||
run(t, remoteDir, "git", "init", "--bare")
|
||||
|
||||
codeDir := t.TempDir()
|
||||
repoDir := core.JoinPath(codeDir, "test-repo")
|
||||
run(t, codeDir, "git", "clone", remoteDir, "test-repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(repoDir, "README.md"), "# test")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "init")
|
||||
run(t, repoDir, "git", "push", "-u", "origin", "main")
|
||||
|
||||
clone2Parent := t.TempDir()
|
||||
tmpClone := core.JoinPath(clone2Parent, "clone2")
|
||||
run(t, clone2Parent, "git", "clone", remoteDir, "clone2")
|
||||
run(t, tmpClone, "git", "checkout", "main")
|
||||
fs.Write(core.JoinPath(tmpClone, "new.go"), "package main\n")
|
||||
run(t, tmpClone, "git", "add", ".")
|
||||
run(t, tmpClone, "git", "commit", "-m", "agent work")
|
||||
run(t, tmpClone, "git", "push", "origin", "main")
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{
|
||||
Changed: []ChangedRepo{{Repo: "test-repo", Branch: "main", SHA: "abc"}},
|
||||
Timestamp: time.Now().Unix() + 100,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
t.Setenv("CODE_PATH", codeDir)
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = testMon.ServiceRuntime
|
||||
msg := mon.syncRepos()
|
||||
assert.Contains(t, msg, "Synced 1 repo(s)")
|
||||
assert.Contains(t, msg, "test-repo")
|
||||
}
|
||||
|
||||
func TestSync_SyncRepos_Good_SkipsDirtyRepo(t *testing.T) {
|
||||
remoteDir := core.JoinPath(t.TempDir(), "remote")
|
||||
fs.EnsureDir(remoteDir)
|
||||
run(t, remoteDir, "git", "init", "--bare")
|
||||
|
||||
codeDir := t.TempDir()
|
||||
repoDir := core.JoinPath(codeDir, "dirty-repo")
|
||||
run(t, codeDir, "git", "clone", remoteDir, "dirty-repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(repoDir, "README.md"), "# test")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "init")
|
||||
run(t, repoDir, "git", "push", "-u", "origin", "main")
|
||||
|
||||
fs.Write(core.JoinPath(repoDir, "dirty.txt"), "uncommitted")
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{
|
||||
Changed: []ChangedRepo{{Repo: "dirty-repo", Branch: "main", SHA: "abc"}},
|
||||
Timestamp: time.Now().Unix() + 100,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
t.Setenv("CODE_PATH", codeDir)
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = testMon.ServiceRuntime
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestSync_SyncRepos_Good_SkipsNonMainBranch(t *testing.T) {
|
||||
remoteDir := core.JoinPath(t.TempDir(), "remote")
|
||||
fs.EnsureDir(remoteDir)
|
||||
run(t, remoteDir, "git", "init", "--bare")
|
||||
|
||||
codeDir := t.TempDir()
|
||||
repoDir := core.JoinPath(codeDir, "feature-repo")
|
||||
run(t, codeDir, "git", "clone", remoteDir, "feature-repo")
|
||||
run(t, repoDir, "git", "checkout", "-b", "main")
|
||||
fs.Write(core.JoinPath(repoDir, "README.md"), "# test")
|
||||
run(t, repoDir, "git", "add", ".")
|
||||
run(t, repoDir, "git", "commit", "-m", "init")
|
||||
run(t, repoDir, "git", "push", "-u", "origin", "main")
|
||||
run(t, repoDir, "git", "checkout", "-b", "feature/wip")
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{
|
||||
Changed: []ChangedRepo{{Repo: "feature-repo", Branch: "main", SHA: "abc"}},
|
||||
Timestamp: time.Now().Unix() + 100,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
t.Setenv("CODE_PATH", codeDir)
|
||||
|
||||
mon := New()
|
||||
mon.ServiceRuntime = testMon.ServiceRuntime
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestSync_SyncRepos_Good_SkipsNonexistentRepo(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := CheckinResponse{
|
||||
Changed: []ChangedRepo{{Repo: "nonexistent", Branch: "main", SHA: "abc"}},
|
||||
Timestamp: time.Now().Unix() + 100,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
setupAPIEnv(t, srv.URL)
|
||||
t.Setenv("CODE_PATH", t.TempDir())
|
||||
|
||||
mon := New()
|
||||
msg := mon.syncRepos()
|
||||
assert.Equal(t, "", msg)
|
||||
}
|
||||
|
||||
func TestSync_SyncRepos_Good_UsesEnvBrainKey(t *testing.T) {
|
||||
var authHeader string
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
authHeader = r.Header.Get("Authorization")
|
||||
resp := CheckinResponse{Timestamp: time.Now().Unix()}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(core.JSONMarshalString(resp)))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
home := t.TempDir()
|
||||
t.Setenv("HOME", home)
|
||||
t.Setenv("CORE_BRAIN_KEY", "env-key-value")
|
||||
t.Setenv("CORE_API_URL", srv.URL)
|
||||
t.Setenv("AGENT_NAME", "test-agent")
|
||||
|
||||
mon := New()
|
||||
mon.syncRepos()
|
||||
assert.Equal(t, "Bearer env-key-value", authHeader)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue