From c1140cc91799a293765cfe4cc59d4cde3f65ba85 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 29 Mar 2026 23:10:19 +0000 Subject: [PATCH] test(monitor): align AX test paths Co-Authored-By: Virgil --- pkg/monitor/harvest_test.go | 107 +++++ pkg/monitor/logic_test.go | 410 ----------------- pkg/monitor/monitor_test.go | 639 ++++++++++++--------------- pkg/monitor/monitor_testmain_test.go | 22 - pkg/monitor/register_test.go | 49 +- pkg/monitor/sync_test.go | 204 +++++++++ 6 files changed, 650 insertions(+), 781 deletions(-) delete mode 100644 pkg/monitor/logic_test.go delete mode 100644 pkg/monitor/monitor_testmain_test.go diff --git a/pkg/monitor/harvest_test.go b/pkg/monitor/harvest_test.go index ac701a1..f6ddd2a 100644 --- a/pkg/monitor/harvest_test.go +++ b/pkg/monitor/harvest_test.go @@ -4,6 +4,7 @@ package monitor import ( "context" + "fmt" "testing" "dappco.re/go/agent/pkg/agentic" @@ -227,6 +228,112 @@ func TestHarvest_HarvestCompleted_Good_ChannelEvents(t *testing.T) { assert.Equal(t, 1, captured[0].Files) } +func TestHarvest_HarvestCompleted_Good_MultipleWorkspaces(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + + for i := 0; i < 2; i++ { + name := fmt.Sprintf("ws-%d", i) + wsDir := core.JoinPath(wsRoot, "workspace", name) + + sourceDir := core.JoinPath(wsRoot, fmt.Sprintf("source-%d", i)) + fs.EnsureDir(sourceDir) + run(t, sourceDir, "git", "init") + run(t, sourceDir, "git", "checkout", "-b", "main") + fs.Write(core.JoinPath(sourceDir, "README.md"), "# test") + run(t, sourceDir, "git", "add", ".") + run(t, sourceDir, "git", "commit", "-m", "init") + + fs.EnsureDir(wsDir) + run(t, wsDir, "git", "clone", sourceDir, "repo") + repoDir := core.JoinPath(wsDir, "repo") + run(t, repoDir, "git", "checkout", "-b", "agent/test-task") + fs.Write(core.JoinPath(repoDir, "new.go"), "package main\n") + run(t, repoDir, "git", "add", ".") + run(t, repoDir, "git", "commit", "-m", "agent work") + + writeStatus(t, wsDir, "completed", fmt.Sprintf("repo-%d", i), "agent/test-task") + } + + var harvests []messages.HarvestComplete + c := core.New(core.WithService(agentic.ProcessRegister)) + c.ServiceStartup(context.Background(), nil) + c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { + if ev, ok := msg.(messages.HarvestComplete); ok { + harvests = append(harvests, ev) + } + return core.Result{OK: true} + }) + + mon := New() + mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{}) + + msg := mon.harvestCompleted() + assert.Contains(t, msg, "Harvested:") + assert.Contains(t, msg, "repo-0") + assert.Contains(t, msg, "repo-1") + + assert.GreaterOrEqual(t, len(harvests), 2) +} + +func TestHarvest_HarvestCompleted_Good_Empty(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + mon := New() + mon.ServiceRuntime = testMon.ServiceRuntime + msg := mon.harvestCompleted() + assert.Equal(t, "", msg) +} + +func TestHarvest_HarvestCompleted_Good_RejectedWorkspace(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + + sourceDir := core.JoinPath(wsRoot, "source-rej") + fs.EnsureDir(sourceDir) + run(t, sourceDir, "git", "init") + run(t, sourceDir, "git", "checkout", "-b", "main") + fs.Write(core.JoinPath(sourceDir, "README.md"), "# test") + run(t, sourceDir, "git", "add", ".") + run(t, sourceDir, "git", "commit", "-m", "init") + + wsDir := core.JoinPath(wsRoot, "workspace", "ws-rej") + fs.EnsureDir(wsDir) + run(t, wsDir, "git", "clone", sourceDir, "repo") + repoDir := core.JoinPath(wsDir, "repo") + run(t, repoDir, "git", "checkout", "-b", "agent/test-task") + fs.Write(core.JoinPath(repoDir, "new.go"), "package main\n") + run(t, repoDir, "git", "add", ".") + run(t, repoDir, "git", "commit", "-m", "agent work") + + fs.Write(core.JoinPath(repoDir, "app.exe"), "binary") + run(t, repoDir, "git", "add", ".") + run(t, repoDir, "git", "commit", "-m", "add binary") + + writeStatus(t, wsDir, "completed", "rej-repo", "agent/test-task") + + var rejections []messages.HarvestRejected + c := core.New(core.WithService(agentic.ProcessRegister)) + c.ServiceStartup(context.Background(), nil) + c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { + if ev, ok := msg.(messages.HarvestRejected); ok { + rejections = append(rejections, ev) + } + return core.Result{OK: true} + }) + + mon := New() + mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{}) + + msg := mon.harvestCompleted() + assert.Contains(t, msg, "REJECTED") + + require.Len(t, rejections, 1) + assert.Contains(t, rejections[0].Reason, "binary file added") +} + func TestHarvest_UpdateStatus_Good(t *testing.T) { dir := t.TempDir() initial := map[string]any{"status": "completed", "repo": "test"} diff --git a/pkg/monitor/logic_test.go b/pkg/monitor/logic_test.go deleted file mode 100644 index bef9a97..0000000 --- a/pkg/monitor/logic_test.go +++ /dev/null @@ -1,410 +0,0 @@ -// SPDX-License-Identifier: EUPL-1.2 - -package monitor - -import ( - "context" - "strconv" - "testing" - "time" - - "dappco.re/go/agent/pkg/messages" - core "dappco.re/go/core" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// --- handleAgentStarted --- - -func TestLogic_HandleAgentStarted_Good(t *testing.T) { - mon := New() - ev := messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-1"} - mon.handleAgentStarted(ev) - - mon.mu.Lock() - defer mon.mu.Unlock() - assert.True(t, mon.seenRunning["core/go-io/task-1"]) -} - -func TestLogic_HandleAgentStarted_Bad_EmptyWorkspace(t *testing.T) { - mon := New() - // Empty workspace key must not panic and must record empty string key. - ev := messages.AgentStarted{Agent: "", Repo: "", Workspace: ""} - assert.NotPanics(t, func() { mon.handleAgentStarted(ev) }) - - mon.mu.Lock() - defer mon.mu.Unlock() - assert.True(t, mon.seenRunning[""]) -} - -// --- handleAgentCompleted --- - -func TestLogic_HandleAgentCompleted_Good_NilRuntime(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - mon := New() - // ServiceRuntime is nil — must not panic, must record completion and poke. - ev := messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-1", Status: "completed"} - assert.NotPanics(t, func() { mon.handleAgentCompleted(ev) }) - - mon.mu.Lock() - defer mon.mu.Unlock() - assert.True(t, mon.seenCompleted["ws-1"]) -} - -func TestLogic_HandleAgentCompleted_Good_WithCore(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - // Use Register so IPC handlers are wired - c := core.New(core.WithService(Register)) - mon, ok := core.ServiceFor[*Subsystem](c, "monitor") - require.True(t, ok) - - ev := messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-2", Status: "completed"} - c.ACTION(ev) - - mon.mu.Lock() - defer mon.mu.Unlock() - assert.True(t, mon.seenCompleted["ws-2"]) -} - -func TestLogic_HandleAgentCompleted_Bad_EmptyFields(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - mon := New() - - // All fields empty — must not panic, must record empty workspace key. - ev := messages.AgentCompleted{} - assert.NotPanics(t, func() { mon.handleAgentCompleted(ev) }) - - mon.mu.Lock() - defer mon.mu.Unlock() - assert.True(t, mon.seenCompleted[""]) -} - -// --- checkIdleAfterDelay --- - -func TestLogic_CheckIdleAfterDelay_Bad_NilRuntime(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - mon := New() // ServiceRuntime is nil - - // Should return immediately without panic after the 5s sleep. - // We test the "ServiceRuntime == nil" return branch by exercising the guard directly. - done := make(chan struct{}) - go func() { - if mon.ServiceRuntime == nil { - close(done) - return - } - mon.checkIdleAfterDelay() - close(done) - }() - - select { - case <-done: - case <-time.After(1 * time.Second): - t.Fatal("checkIdleAfterDelay nil-runtime guard did not return quickly") - } -} - -func TestLogic_CheckIdleAfterDelay_Good_EmptyWorkspace(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - // Create a Core with an IPC handler to capture QueueDrained messages - var captured []messages.QueueDrained - c := core.New() - c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { - if ev, ok := msg.(messages.QueueDrained); ok { - captured = append(captured, ev) - } - return core.Result{OK: true} - }) - - mon := New() - mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{}) - - // With empty workspace, running=0 and queued=0, so queue.drained fires. - running, queued := mon.countLiveWorkspaces() - assert.Equal(t, 0, running) - assert.Equal(t, 0, queued) - - if running == 0 && queued == 0 { - mon.Core().ACTION(messages.QueueDrained{Completed: 0}) - } - - require.Len(t, captured, 1) - assert.Equal(t, 0, captured[0].Completed) -} - -// --- countLiveWorkspaces --- - -func TestLogic_CountLiveWorkspaces_Good_EmptyWorkspace(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - mon := New() - running, queued := mon.countLiveWorkspaces() - assert.Equal(t, 0, running) - assert.Equal(t, 0, queued) -} - -func TestLogic_CountLiveWorkspaces_Good_QueuedStatus(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - - writeWorkspaceStatus(t, wsRoot, "ws-q", map[string]any{ - "status": "queued", - "repo": "go-io", - "agent": "codex", - }) - - mon := New() - running, queued := mon.countLiveWorkspaces() - assert.Equal(t, 0, running) - assert.Equal(t, 1, queued) -} - -func TestLogic_CountLiveWorkspaces_Bad_RunningDeadPID(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - - // PID 1 is always init/launchd and not "our" process — on macOS sending - // signal 0 to PID 1 returns EPERM (process exists but not ours), which - // means pidAlive returns false for non-owned processes. Use PID 99999999 - // which is near-certainly dead. - writeWorkspaceStatus(t, wsRoot, "ws-dead", map[string]any{ - "status": "running", - "repo": "go-io", - "agent": "codex", - "pid": 99999999, - }) - - mon := New() - running, queued := mon.countLiveWorkspaces() - // Dead PID should not count as running. - assert.Equal(t, 0, running) - assert.Equal(t, 0, queued) -} - -func TestLogic_CountLiveWorkspaces_Good_RunningLivePID(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - - // Current process is definitely alive. - pid, _ := strconv.Atoi(core.Env("PID")) - writeWorkspaceStatus(t, wsRoot, "ws-live", map[string]any{ - "status": "running", - "repo": "go-io", - "agent": "codex", - "pid": pid, - }) - - mon := New() - running, queued := mon.countLiveWorkspaces() - assert.Equal(t, 1, running) - assert.Equal(t, 0, queued) -} - -// --- pidAlive --- - -func TestLogic_PidAlive_Good_CurrentProcess(t *testing.T) { - pid, _ := strconv.Atoi(core.Env("PID")) - assert.True(t, pidAlive(pid), "current process must be alive") -} - -func TestLogic_PidAlive_Bad_DeadPID(t *testing.T) { - // PID 99999999 is virtually guaranteed to not exist. - assert.False(t, pidAlive(99999999)) -} - -func TestLogic_PidAlive_Ugly_ZeroPID(t *testing.T) { - // PID 0 is not a valid user process. pidAlive must return false or at - // least not panic. - assert.NotPanics(t, func() { pidAlive(0) }) -} - -func TestLogic_PidAlive_Ugly_NegativePID(t *testing.T) { - // Negative PID is invalid. Must not panic. - assert.NotPanics(t, func() { pidAlive(-1) }) -} - -// --- SetCore --- - -func TestLogic_SetCore_Good_RegistersIPCHandler(t *testing.T) { - c := core.New() - mon := New() - - // SetCore must not panic and must wire ServiceRuntime. - assert.NotPanics(t, func() { mon.SetCore(c) }) - assert.NotNil(t, mon.ServiceRuntime) - assert.Equal(t, c, mon.Core()) -} - -func TestLogic_SetCore_Good_IPCHandlerFires(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - // IPC handlers are registered via Register, not SetCore - c := core.New(core.WithService(Register)) - - mon, ok := core.ServiceFor[*Subsystem](c, "monitor") - require.True(t, ok) - - // Dispatch an AgentStarted via Core IPC — handler must update seenRunning. - c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "ws-ipc"}) - - mon.mu.Lock() - defer mon.mu.Unlock() - assert.True(t, mon.seenRunning["ws-ipc"]) -} - -func TestLogic_SetCore_Good_CompletedIPCHandler(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - // IPC handlers are registered via Register, not SetCore - c := core.New(core.WithService(Register)) - - mon, ok := core.ServiceFor[*Subsystem](c, "monitor") - require.True(t, ok) - - // Dispatch AgentCompleted — handler must update seenCompleted. - c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-done", Status: "completed"}) - - mon.mu.Lock() - defer mon.mu.Unlock() - assert.True(t, mon.seenCompleted["ws-done"]) -} - -// --- OnStartup / OnShutdown --- - -func TestLogic_OnStartup_Good_StartsLoop(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - home := t.TempDir() - t.Setenv("HOME", home) - - mon := New(Options{Interval: 1 * time.Hour}) - r := mon.OnStartup(context.Background()) - assert.True(t, r.OK) - - // cancel must be non-nil after startup (loop running) - assert.NotNil(t, mon.cancel) - - // Cleanup. - r2 := mon.OnShutdown(context.Background()) - assert.True(t, r2.OK) -} - -func TestLogic_OnStartup_Good_NoError(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - mon := New(Options{Interval: 1 * time.Hour}) - assert.True(t, mon.OnStartup(context.Background()).OK) - _ = mon.OnShutdown(context.Background()) -} - -func TestLogic_OnShutdown_Good_NoError(t *testing.T) { - mon := New(Options{Interval: 1 * time.Hour}) - assert.True(t, mon.OnShutdown(context.Background()).OK) -} - -func TestLogic_OnShutdown_Good_StopsLoop(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - home := t.TempDir() - t.Setenv("HOME", home) - - mon := New(Options{Interval: 1 * time.Hour}) - require.True(t, mon.OnStartup(context.Background()).OK) - - done := make(chan bool, 1) - go func() { - done <- mon.OnShutdown(context.Background()).OK - }() - - select { - case ok := <-done: - assert.True(t, ok) - case <-time.After(5 * time.Second): - t.Fatal("OnShutdown did not return in time") - } -} - -func TestLogic_OnShutdown_Ugly_NilCancel(t *testing.T) { - // OnShutdown without prior OnStartup must not panic. - mon := New() - assert.NotPanics(t, func() { - _ = mon.OnShutdown(context.Background()) - }) -} - -// --- Register --- - -func TestLogic_Register_Good_ReturnsSubsystem(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - - c := core.New(core.WithService(Register)) - require.NotNil(t, c) - - // Register returns the Subsystem as Value; WithService auto-registers it - // under the package name "monitor". - svc, ok := core.ServiceFor[*Subsystem](c, "monitor") - assert.True(t, ok, "Subsystem must be registered as \"monitor\"") - assert.NotNil(t, svc) -} - -func TestLogic_Register_Good_CoreWired(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - - c := core.New(core.WithService(Register)) - require.NotNil(t, c) - - svc, ok := core.ServiceFor[*Subsystem](c, "monitor") - require.True(t, ok) - - // Register must set ServiceRuntime. - assert.NotNil(t, svc.ServiceRuntime) - assert.Equal(t, c, svc.Core()) -} - -func TestLogic_Register_Good_IPCHandlerActive(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - c := core.New(core.WithService(Register)) - require.NotNil(t, c) - - svc, ok := core.ServiceFor[*Subsystem](c, "monitor") - require.True(t, ok) - - // Fire an AgentStarted message — the registered IPC handler must update seenRunning. - c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "ws-reg"}) - - svc.mu.Lock() - defer svc.mu.Unlock() - assert.True(t, svc.seenRunning["ws-reg"]) -} diff --git a/pkg/monitor/monitor_test.go b/pkg/monitor/monitor_test.go index f2784b9..94b97f1 100644 --- a/pkg/monitor/monitor_test.go +++ b/pkg/monitor/monitor_test.go @@ -7,6 +7,8 @@ import ( "fmt" "net/http" "net/http/httptest" + "os" + "strconv" "testing" "time" @@ -18,6 +20,16 @@ import ( "github.com/stretchr/testify/require" ) +var testMon *Subsystem + +func TestMain(m *testing.M) { + c := core.New(core.WithService(agentic.ProcessRegister)) + c.ServiceStartup(context.Background(), nil) + testMon = New() + testMon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{}) + os.Exit(m.Run()) +} + type capturedChannelEvent struct { Channel string Data any @@ -133,6 +145,287 @@ func TestMonitor_Shutdown_Good_NilCancel(t *testing.T) { assert.NoError(t, err) } +// --- SetCore --- + +func TestMonitor_SetCore_Good_WiresServiceRuntime(t *testing.T) { + c := core.New() + mon := New() + + assert.NotPanics(t, func() { mon.SetCore(c) }) + assert.NotNil(t, mon.ServiceRuntime) + assert.Equal(t, c, mon.Core()) +} + +// --- handleAgentStarted / handleAgentCompleted --- + +func TestMonitor_HandleAgentStarted_Good(t *testing.T) { + mon := New() + ev := messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "core/go-io/task-1"} + mon.handleAgentStarted(ev) + + mon.mu.Lock() + defer mon.mu.Unlock() + assert.True(t, mon.seenRunning["core/go-io/task-1"]) +} + +func TestMonitor_HandleAgentStarted_Bad_EmptyWorkspace(t *testing.T) { + mon := New() + ev := messages.AgentStarted{} + + assert.NotPanics(t, func() { mon.handleAgentStarted(ev) }) + + mon.mu.Lock() + defer mon.mu.Unlock() + assert.True(t, mon.seenRunning[""]) +} + +func TestMonitor_HandleAgentCompleted_Good_NilRuntime(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + mon := New() + ev := messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-1", Status: "completed"} + + assert.NotPanics(t, func() { mon.handleAgentCompleted(ev) }) + + mon.mu.Lock() + defer mon.mu.Unlock() + assert.True(t, mon.seenCompleted["ws-1"]) +} + +func TestMonitor_HandleAgentCompleted_Good_WithCore(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + c := core.New(core.WithService(Register)) + mon, ok := core.ServiceFor[*Subsystem](c, "monitor") + require.True(t, ok) + + c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-2", Status: "completed"}) + + mon.mu.Lock() + defer mon.mu.Unlock() + assert.True(t, mon.seenCompleted["ws-2"]) +} + +func TestMonitor_HandleAgentCompleted_Bad_EmptyFields(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + mon := New() + + assert.NotPanics(t, func() { mon.handleAgentCompleted(messages.AgentCompleted{}) }) + + mon.mu.Lock() + defer mon.mu.Unlock() + assert.True(t, mon.seenCompleted[""]) +} + +// --- checkIdleAfterDelay --- + +func TestMonitor_CheckIdleAfterDelay_Bad_NilRuntime(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + mon := New() + done := make(chan struct{}) + go func() { + if mon.ServiceRuntime == nil { + close(done) + return + } + mon.checkIdleAfterDelay() + close(done) + }() + + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("checkIdleAfterDelay nil-runtime guard did not return quickly") + } +} + +func TestMonitor_CheckIdleAfterDelay_Good_EmptyWorkspace(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + var captured []messages.QueueDrained + c := core.New() + c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { + if ev, ok := msg.(messages.QueueDrained); ok { + captured = append(captured, ev) + } + return core.Result{OK: true} + }) + + mon := New() + mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{}) + + running, queued := mon.countLiveWorkspaces() + assert.Equal(t, 0, running) + assert.Equal(t, 0, queued) + + if running == 0 && queued == 0 { + mon.Core().ACTION(messages.QueueDrained{Completed: 0}) + } + + require.Len(t, captured, 1) + assert.Equal(t, 0, captured[0].Completed) +} + +// --- countLiveWorkspaces --- + +func TestMonitor_CountLiveWorkspaces_Good_EmptyWorkspace(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + mon := New() + running, queued := mon.countLiveWorkspaces() + assert.Equal(t, 0, running) + assert.Equal(t, 0, queued) +} + +func TestMonitor_CountLiveWorkspaces_Good_QueuedStatus(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + + writeWorkspaceStatus(t, wsRoot, "ws-q", map[string]any{ + "status": "queued", + "repo": "go-io", + "agent": "codex", + }) + + mon := New() + running, queued := mon.countLiveWorkspaces() + assert.Equal(t, 0, running) + assert.Equal(t, 1, queued) +} + +func TestMonitor_CountLiveWorkspaces_Bad_RunningDeadPID(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + + writeWorkspaceStatus(t, wsRoot, "ws-dead", map[string]any{ + "status": "running", + "repo": "go-io", + "agent": "codex", + "pid": 99999999, + }) + + mon := New() + running, queued := mon.countLiveWorkspaces() + assert.Equal(t, 0, running) + assert.Equal(t, 0, queued) +} + +func TestMonitor_CountLiveWorkspaces_Good_RunningLivePID(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + + pid, _ := strconv.Atoi(core.Env("PID")) + writeWorkspaceStatus(t, wsRoot, "ws-live", map[string]any{ + "status": "running", + "repo": "go-io", + "agent": "codex", + "pid": pid, + }) + + mon := New() + running, queued := mon.countLiveWorkspaces() + assert.Equal(t, 1, running) + assert.Equal(t, 0, queued) +} + +// --- pidAlive --- + +func TestMonitor_PidAlive_Good_CurrentProcess(t *testing.T) { + pid, _ := strconv.Atoi(core.Env("PID")) + assert.True(t, pidAlive(pid), "current process must be alive") +} + +func TestMonitor_PidAlive_Bad_DeadPID(t *testing.T) { + assert.False(t, pidAlive(99999999)) +} + +func TestMonitor_PidAlive_Ugly_ZeroPID(t *testing.T) { + assert.NotPanics(t, func() { pidAlive(0) }) +} + +func TestMonitor_PidAlive_Ugly_NegativePID(t *testing.T) { + assert.NotPanics(t, func() { pidAlive(-1) }) +} + +// --- OnStartup / OnShutdown --- + +func TestMonitor_OnStartup_Good_StartsLoop(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + home := t.TempDir() + t.Setenv("HOME", home) + + mon := New(Options{Interval: 1 * time.Hour}) + r := mon.OnStartup(context.Background()) + assert.True(t, r.OK) + assert.NotNil(t, mon.cancel) + + r2 := mon.OnShutdown(context.Background()) + assert.True(t, r2.OK) +} + +func TestMonitor_OnStartup_Good_NoError(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + mon := New(Options{Interval: 1 * time.Hour}) + assert.True(t, mon.OnStartup(context.Background()).OK) + _ = mon.OnShutdown(context.Background()) +} + +func TestMonitor_OnShutdown_Good_NoError(t *testing.T) { + mon := New(Options{Interval: 1 * time.Hour}) + assert.True(t, mon.OnShutdown(context.Background()).OK) +} + +func TestMonitor_OnShutdown_Good_StopsLoop(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + home := t.TempDir() + t.Setenv("HOME", home) + + mon := New(Options{Interval: 1 * time.Hour}) + require.True(t, mon.OnStartup(context.Background()).OK) + + done := make(chan bool, 1) + go func() { + done <- mon.OnShutdown(context.Background()).OK + }() + + select { + case ok := <-done: + assert.True(t, ok) + case <-time.After(5 * time.Second): + t.Fatal("OnShutdown did not return in time") + } +} + +func TestMonitor_OnShutdown_Ugly_NilCancel(t *testing.T) { + mon := New() + assert.NotPanics(t, func() { + _ = mon.OnShutdown(context.Background()) + }) +} + // --- checkCompletions --- func TestMonitor_CheckCompletions_Good_NewCompletions(t *testing.T) { @@ -518,85 +811,6 @@ func TestMonitor_Loop_Good_PokeTriggersCheck(t *testing.T) { mon.wg.Wait() } -// --- initSyncTimestamp --- - -func TestMonitor_InitSyncTimestamp_Good(t *testing.T) { - mon := New() - assert.Equal(t, int64(0), mon.lastSyncTimestamp) - - before := time.Now().Unix() - mon.initSyncTimestamp() - after := time.Now().Unix() - - mon.mu.Lock() - ts := mon.lastSyncTimestamp - mon.mu.Unlock() - - assert.GreaterOrEqual(t, ts, before) - assert.LessOrEqual(t, ts, after) -} - -func TestMonitor_InitSyncTimestamp_Good_NoOverwrite(t *testing.T) { - mon := New() - mon.lastSyncTimestamp = 12345 - - mon.initSyncTimestamp() - - mon.mu.Lock() - assert.Equal(t, int64(12345), mon.lastSyncTimestamp) - mon.mu.Unlock() -} - -// --- syncRepos --- - -func TestMonitor_SyncRepos_Good_NoChanges(t *testing.T) { - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, "/v1/agent/checkin", r.URL.Path) - resp := CheckinResponse{Timestamp: time.Now().Unix()} - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(core.JSONMarshalString(resp))) - })) - defer srv.Close() - - setupAPIEnv(t, srv.URL) - - mon := New() - msg := mon.syncRepos() - assert.Equal(t, "", msg) -} - -func TestMonitor_SyncRepos_Bad_APIError(t *testing.T) { - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - })) - defer srv.Close() - - setupAPIEnv(t, srv.URL) - - mon := New() - msg := mon.syncRepos() - assert.Equal(t, "", msg) -} - -func TestMonitor_SyncRepos_Good_UpdatesTimestamp(t *testing.T) { - newTS := time.Now().Unix() + 1000 - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - resp := CheckinResponse{Timestamp: newTS} - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(core.JSONMarshalString(resp))) - })) - defer srv.Close() - - setupAPIEnv(t, srv.URL) - - mon := New() - mon.syncRepos() - - mon.mu.Lock() - assert.Equal(t, newTS, mon.lastSyncTimestamp) - mon.mu.Unlock() -} - // --- agentStatusResource --- func TestMonitor_AgentStatusResource_Good(t *testing.T) { @@ -665,270 +879,3 @@ func TestMonitor_AgentStatusResource_Good_DeepWorkspaceName(t *testing.T) { require.Len(t, workspaces, 1) assert.Equal(t, "core/go-io/task-9", workspaces[0]["name"]) } - -// --- syncRepos (git pull path) --- - -func TestMonitor_SyncRepos_Good_PullsChangedRepo(t *testing.T) { - remoteDir := core.JoinPath(t.TempDir(), "remote") - fs.EnsureDir(remoteDir) - run(t, remoteDir, "git", "init", "--bare") - - codeDir := t.TempDir() - repoDir := core.JoinPath(codeDir, "test-repo") - run(t, codeDir, "git", "clone", remoteDir, "test-repo") - run(t, repoDir, "git", "checkout", "-b", "main") - fs.Write(core.JoinPath(repoDir, "README.md"), "# test") - run(t, repoDir, "git", "add", ".") - run(t, repoDir, "git", "commit", "-m", "init") - run(t, repoDir, "git", "push", "-u", "origin", "main") - - // Simulate another agent pushing work via a second clone - clone2Parent := t.TempDir() - tmpClone := core.JoinPath(clone2Parent, "clone2") - run(t, clone2Parent, "git", "clone", remoteDir, "clone2") - run(t, tmpClone, "git", "checkout", "main") - fs.Write(core.JoinPath(tmpClone, "new.go"), "package main\n") - run(t, tmpClone, "git", "add", ".") - run(t, tmpClone, "git", "commit", "-m", "agent work") - run(t, tmpClone, "git", "push", "origin", "main") - - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - resp := CheckinResponse{ - Changed: []ChangedRepo{{Repo: "test-repo", Branch: "main", SHA: "abc"}}, - Timestamp: time.Now().Unix() + 100, - } - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(core.JSONMarshalString(resp))) - })) - defer srv.Close() - - setupAPIEnv(t, srv.URL) - t.Setenv("CODE_PATH", codeDir) - - mon := New() - mon.ServiceRuntime = testMon.ServiceRuntime - msg := mon.syncRepos() - assert.Contains(t, msg, "Synced 1 repo(s)") - assert.Contains(t, msg, "test-repo") -} - -func TestMonitor_SyncRepos_Good_SkipsDirtyRepo(t *testing.T) { - remoteDir := core.JoinPath(t.TempDir(), "remote") - fs.EnsureDir(remoteDir) - run(t, remoteDir, "git", "init", "--bare") - - codeDir := t.TempDir() - repoDir := core.JoinPath(codeDir, "dirty-repo") - run(t, codeDir, "git", "clone", remoteDir, "dirty-repo") - run(t, repoDir, "git", "checkout", "-b", "main") - fs.Write(core.JoinPath(repoDir, "README.md"), "# test") - run(t, repoDir, "git", "add", ".") - run(t, repoDir, "git", "commit", "-m", "init") - run(t, repoDir, "git", "push", "-u", "origin", "main") - - // Make the repo dirty - fs.Write(core.JoinPath(repoDir, "dirty.txt"), "uncommitted") - - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - resp := CheckinResponse{ - Changed: []ChangedRepo{{Repo: "dirty-repo", Branch: "main", SHA: "abc"}}, - Timestamp: time.Now().Unix() + 100, - } - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(core.JSONMarshalString(resp))) - })) - defer srv.Close() - - setupAPIEnv(t, srv.URL) - t.Setenv("CODE_PATH", codeDir) - - mon := New() - mon.ServiceRuntime = testMon.ServiceRuntime - msg := mon.syncRepos() - assert.Equal(t, "", msg) -} - -func TestMonitor_SyncRepos_Good_SkipsNonMainBranch(t *testing.T) { - remoteDir := core.JoinPath(t.TempDir(), "remote") - fs.EnsureDir(remoteDir) - run(t, remoteDir, "git", "init", "--bare") - - codeDir := t.TempDir() - repoDir := core.JoinPath(codeDir, "feature-repo") - run(t, codeDir, "git", "clone", remoteDir, "feature-repo") - run(t, repoDir, "git", "checkout", "-b", "main") - fs.Write(core.JoinPath(repoDir, "README.md"), "# test") - run(t, repoDir, "git", "add", ".") - run(t, repoDir, "git", "commit", "-m", "init") - run(t, repoDir, "git", "push", "-u", "origin", "main") - run(t, repoDir, "git", "checkout", "-b", "feature/wip") - - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - resp := CheckinResponse{ - Changed: []ChangedRepo{{Repo: "feature-repo", Branch: "main", SHA: "abc"}}, - Timestamp: time.Now().Unix() + 100, - } - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(core.JSONMarshalString(resp))) - })) - defer srv.Close() - - setupAPIEnv(t, srv.URL) - t.Setenv("CODE_PATH", codeDir) - - mon := New() - mon.ServiceRuntime = testMon.ServiceRuntime - msg := mon.syncRepos() - assert.Equal(t, "", msg) -} - -func TestMonitor_SyncRepos_Good_SkipsNonexistentRepo(t *testing.T) { - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - resp := CheckinResponse{ - Changed: []ChangedRepo{{Repo: "nonexistent", Branch: "main", SHA: "abc"}}, - Timestamp: time.Now().Unix() + 100, - } - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(core.JSONMarshalString(resp))) - })) - defer srv.Close() - - setupAPIEnv(t, srv.URL) - t.Setenv("CODE_PATH", t.TempDir()) - - mon := New() - msg := mon.syncRepos() - assert.Equal(t, "", msg) -} - -func TestMonitor_SyncRepos_Good_UsesEnvBrainKey(t *testing.T) { - var authHeader string - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - authHeader = r.Header.Get("Authorization") - resp := CheckinResponse{Timestamp: time.Now().Unix()} - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(core.JSONMarshalString(resp))) - })) - defer srv.Close() - - home := t.TempDir() - t.Setenv("HOME", home) - t.Setenv("CORE_BRAIN_KEY", "env-key-value") - t.Setenv("CORE_API_URL", srv.URL) - t.Setenv("AGENT_NAME", "test-agent") - - mon := New() - mon.syncRepos() - assert.Equal(t, "Bearer env-key-value", authHeader) -} - -// --- harvestCompleted (full path) --- - -func TestMonitor_HarvestCompleted_Good_MultipleWorkspaces(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - - for i := 0; i < 2; i++ { - name := fmt.Sprintf("ws-%d", i) - wsDir := core.JoinPath(wsRoot, "workspace", name) - - sourceDir := core.JoinPath(wsRoot, fmt.Sprintf("source-%d", i)) - fs.EnsureDir(sourceDir) - run(t, sourceDir, "git", "init") - run(t, sourceDir, "git", "checkout", "-b", "main") - fs.Write(core.JoinPath(sourceDir, "README.md"), "# test") - run(t, sourceDir, "git", "add", ".") - run(t, sourceDir, "git", "commit", "-m", "init") - - fs.EnsureDir(wsDir) - run(t, wsDir, "git", "clone", sourceDir, "repo") - repoDir := core.JoinPath(wsDir, "repo") - run(t, repoDir, "git", "checkout", "-b", "agent/test-task") - fs.Write(core.JoinPath(repoDir, "new.go"), "package main\n") - run(t, repoDir, "git", "add", ".") - run(t, repoDir, "git", "commit", "-m", "agent work") - - writeStatus(t, wsDir, "completed", fmt.Sprintf("repo-%d", i), "agent/test-task") - } - - // Create Core with IPC handler to capture HarvestComplete messages - var harvests []messages.HarvestComplete - c := core.New(core.WithService(agentic.ProcessRegister)) - c.ServiceStartup(context.Background(), nil) - c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { - if ev, ok := msg.(messages.HarvestComplete); ok { - harvests = append(harvests, ev) - } - return core.Result{OK: true} - }) - - mon := New() - mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{}) - - msg := mon.harvestCompleted() - assert.Contains(t, msg, "Harvested:") - assert.Contains(t, msg, "repo-0") - assert.Contains(t, msg, "repo-1") - - assert.GreaterOrEqual(t, len(harvests), 2) -} - -func TestMonitor_HarvestCompleted_Good_Empty(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) - - mon := New() - mon.ServiceRuntime = testMon.ServiceRuntime - msg := mon.harvestCompleted() - assert.Equal(t, "", msg) -} - -func TestMonitor_HarvestCompleted_Good_RejectedWorkspace(t *testing.T) { - wsRoot := t.TempDir() - t.Setenv("CORE_WORKSPACE", wsRoot) - - sourceDir := core.JoinPath(wsRoot, "source-rej") - fs.EnsureDir(sourceDir) - run(t, sourceDir, "git", "init") - run(t, sourceDir, "git", "checkout", "-b", "main") - fs.Write(core.JoinPath(sourceDir, "README.md"), "# test") - run(t, sourceDir, "git", "add", ".") - run(t, sourceDir, "git", "commit", "-m", "init") - - wsDir := core.JoinPath(wsRoot, "workspace", "ws-rej") - fs.EnsureDir(wsDir) - run(t, wsDir, "git", "clone", sourceDir, "repo") - repoDir := core.JoinPath(wsDir, "repo") - run(t, repoDir, "git", "checkout", "-b", "agent/test-task") - fs.Write(core.JoinPath(repoDir, "new.go"), "package main\n") - run(t, repoDir, "git", "add", ".") - run(t, repoDir, "git", "commit", "-m", "agent work") - - // Add binary to trigger rejection - fs.Write(core.JoinPath(repoDir, "app.exe"), "binary") - run(t, repoDir, "git", "add", ".") - run(t, repoDir, "git", "commit", "-m", "add binary") - - writeStatus(t, wsDir, "completed", "rej-repo", "agent/test-task") - - // Create Core with IPC handler to capture HarvestRejected messages - var rejections []messages.HarvestRejected - c := core.New(core.WithService(agentic.ProcessRegister)) - c.ServiceStartup(context.Background(), nil) - c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { - if ev, ok := msg.(messages.HarvestRejected); ok { - rejections = append(rejections, ev) - } - return core.Result{OK: true} - }) - - mon := New() - mon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{}) - - msg := mon.harvestCompleted() - assert.Contains(t, msg, "REJECTED") - - require.Len(t, rejections, 1) - assert.Contains(t, rejections[0].Reason, "binary file added") -} diff --git a/pkg/monitor/monitor_testmain_test.go b/pkg/monitor/monitor_testmain_test.go deleted file mode 100644 index 602c6de..0000000 --- a/pkg/monitor/monitor_testmain_test.go +++ /dev/null @@ -1,22 +0,0 @@ -// SPDX-License-Identifier: EUPL-1.2 - -package monitor - -import ( - "context" - "os" - "testing" - - "dappco.re/go/agent/pkg/agentic" - core "dappco.re/go/core" -) - -var testMon *Subsystem - -func TestMain(m *testing.M) { - c := core.New(core.WithService(agentic.ProcessRegister)) - c.ServiceStartup(context.Background(), nil) - testMon = New() - testMon.ServiceRuntime = core.NewServiceRuntime(c, MonitorOptions{}) - os.Exit(m.Run()) -} diff --git a/pkg/monitor/register_test.go b/pkg/monitor/register_test.go index 877b211..e9e50e9 100644 --- a/pkg/monitor/register_test.go +++ b/pkg/monitor/register_test.go @@ -5,25 +5,68 @@ package monitor import ( "testing" + "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestRegister_Register_Good(t *testing.T) { +func TestRegister_Register_Good_ReturnsSubsystem(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + c := core.New(core.WithService(Register)) svc, ok := core.ServiceFor[*Subsystem](c, "monitor") assert.True(t, ok) assert.NotNil(t, svc) } -func TestRegister_Register_Bad_ServiceName(t *testing.T) { +func TestRegister_Register_Good_RegistersServiceName(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + c := core.New(core.WithService(Register)) assert.Contains(t, c.Services(), "monitor") } -func TestRegister_Register_Ugly_ServiceRuntime(t *testing.T) { +func TestRegister_Register_Good_WiresServiceRuntime(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + c := core.New(core.WithService(Register)) svc, _ := core.ServiceFor[*Subsystem](c, "monitor") assert.NotNil(t, svc.ServiceRuntime) assert.Equal(t, c, svc.Core()) } + +func TestRegister_Register_Good_TracksStartedIPC(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + c := core.New(core.WithService(Register)) + svc, ok := core.ServiceFor[*Subsystem](c, "monitor") + require.True(t, ok) + + c.ACTION(messages.AgentStarted{Agent: "codex", Repo: "go-io", Workspace: "ws-reg"}) + + svc.mu.Lock() + defer svc.mu.Unlock() + assert.True(t, svc.seenRunning["ws-reg"]) +} + +func TestRegister_Register_Good_TracksCompletedIPC(t *testing.T) { + wsRoot := t.TempDir() + t.Setenv("CORE_WORKSPACE", wsRoot) + fs.EnsureDir(core.JoinPath(wsRoot, "workspace")) + + c := core.New(core.WithService(Register)) + svc, ok := core.ServiceFor[*Subsystem](c, "monitor") + require.True(t, ok) + + c.ACTION(messages.AgentCompleted{Agent: "codex", Repo: "go-io", Workspace: "ws-done", Status: "completed"}) + + svc.mu.Lock() + defer svc.mu.Unlock() + assert.True(t, svc.seenCompleted["ws-done"]) +} diff --git a/pkg/monitor/sync_test.go b/pkg/monitor/sync_test.go index 46b2ca4..2c54958 100644 --- a/pkg/monitor/sync_test.go +++ b/pkg/monitor/sync_test.go @@ -3,8 +3,12 @@ package monitor import ( + "net/http" + "net/http/httptest" "testing" + "time" + core "dappco.re/go/core" "github.com/stretchr/testify/assert" ) @@ -28,3 +32,203 @@ func TestSync_SyncRepos_Ugly_NoBrainKey(t *testing.T) { result := mon.syncRepos() assert.Equal(t, "", result) } + +func TestSync_SyncRepos_Good_NoChanges(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/v1/agent/checkin", r.URL.Path) + resp := CheckinResponse{Timestamp: time.Now().Unix()} + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(core.JSONMarshalString(resp))) + })) + defer srv.Close() + + setupAPIEnv(t, srv.URL) + + mon := New() + msg := mon.syncRepos() + assert.Equal(t, "", msg) +} + +func TestSync_SyncRepos_Bad_APIError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + setupAPIEnv(t, srv.URL) + + mon := New() + msg := mon.syncRepos() + assert.Equal(t, "", msg) +} + +func TestSync_SyncRepos_Good_UpdatesTimestamp(t *testing.T) { + newTS := time.Now().Unix() + 1000 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := CheckinResponse{Timestamp: newTS} + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(core.JSONMarshalString(resp))) + })) + defer srv.Close() + + setupAPIEnv(t, srv.URL) + + mon := New() + mon.syncRepos() + + mon.mu.Lock() + assert.Equal(t, newTS, mon.lastSyncTimestamp) + mon.mu.Unlock() +} + +func TestSync_SyncRepos_Good_PullsChangedRepo(t *testing.T) { + remoteDir := core.JoinPath(t.TempDir(), "remote") + fs.EnsureDir(remoteDir) + run(t, remoteDir, "git", "init", "--bare") + + codeDir := t.TempDir() + repoDir := core.JoinPath(codeDir, "test-repo") + run(t, codeDir, "git", "clone", remoteDir, "test-repo") + run(t, repoDir, "git", "checkout", "-b", "main") + fs.Write(core.JoinPath(repoDir, "README.md"), "# test") + run(t, repoDir, "git", "add", ".") + run(t, repoDir, "git", "commit", "-m", "init") + run(t, repoDir, "git", "push", "-u", "origin", "main") + + clone2Parent := t.TempDir() + tmpClone := core.JoinPath(clone2Parent, "clone2") + run(t, clone2Parent, "git", "clone", remoteDir, "clone2") + run(t, tmpClone, "git", "checkout", "main") + fs.Write(core.JoinPath(tmpClone, "new.go"), "package main\n") + run(t, tmpClone, "git", "add", ".") + run(t, tmpClone, "git", "commit", "-m", "agent work") + run(t, tmpClone, "git", "push", "origin", "main") + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := CheckinResponse{ + Changed: []ChangedRepo{{Repo: "test-repo", Branch: "main", SHA: "abc"}}, + Timestamp: time.Now().Unix() + 100, + } + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(core.JSONMarshalString(resp))) + })) + defer srv.Close() + + setupAPIEnv(t, srv.URL) + t.Setenv("CODE_PATH", codeDir) + + mon := New() + mon.ServiceRuntime = testMon.ServiceRuntime + msg := mon.syncRepos() + assert.Contains(t, msg, "Synced 1 repo(s)") + assert.Contains(t, msg, "test-repo") +} + +func TestSync_SyncRepos_Good_SkipsDirtyRepo(t *testing.T) { + remoteDir := core.JoinPath(t.TempDir(), "remote") + fs.EnsureDir(remoteDir) + run(t, remoteDir, "git", "init", "--bare") + + codeDir := t.TempDir() + repoDir := core.JoinPath(codeDir, "dirty-repo") + run(t, codeDir, "git", "clone", remoteDir, "dirty-repo") + run(t, repoDir, "git", "checkout", "-b", "main") + fs.Write(core.JoinPath(repoDir, "README.md"), "# test") + run(t, repoDir, "git", "add", ".") + run(t, repoDir, "git", "commit", "-m", "init") + run(t, repoDir, "git", "push", "-u", "origin", "main") + + fs.Write(core.JoinPath(repoDir, "dirty.txt"), "uncommitted") + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := CheckinResponse{ + Changed: []ChangedRepo{{Repo: "dirty-repo", Branch: "main", SHA: "abc"}}, + Timestamp: time.Now().Unix() + 100, + } + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(core.JSONMarshalString(resp))) + })) + defer srv.Close() + + setupAPIEnv(t, srv.URL) + t.Setenv("CODE_PATH", codeDir) + + mon := New() + mon.ServiceRuntime = testMon.ServiceRuntime + msg := mon.syncRepos() + assert.Equal(t, "", msg) +} + +func TestSync_SyncRepos_Good_SkipsNonMainBranch(t *testing.T) { + remoteDir := core.JoinPath(t.TempDir(), "remote") + fs.EnsureDir(remoteDir) + run(t, remoteDir, "git", "init", "--bare") + + codeDir := t.TempDir() + repoDir := core.JoinPath(codeDir, "feature-repo") + run(t, codeDir, "git", "clone", remoteDir, "feature-repo") + run(t, repoDir, "git", "checkout", "-b", "main") + fs.Write(core.JoinPath(repoDir, "README.md"), "# test") + run(t, repoDir, "git", "add", ".") + run(t, repoDir, "git", "commit", "-m", "init") + run(t, repoDir, "git", "push", "-u", "origin", "main") + run(t, repoDir, "git", "checkout", "-b", "feature/wip") + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := CheckinResponse{ + Changed: []ChangedRepo{{Repo: "feature-repo", Branch: "main", SHA: "abc"}}, + Timestamp: time.Now().Unix() + 100, + } + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(core.JSONMarshalString(resp))) + })) + defer srv.Close() + + setupAPIEnv(t, srv.URL) + t.Setenv("CODE_PATH", codeDir) + + mon := New() + mon.ServiceRuntime = testMon.ServiceRuntime + msg := mon.syncRepos() + assert.Equal(t, "", msg) +} + +func TestSync_SyncRepos_Good_SkipsNonexistentRepo(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := CheckinResponse{ + Changed: []ChangedRepo{{Repo: "nonexistent", Branch: "main", SHA: "abc"}}, + Timestamp: time.Now().Unix() + 100, + } + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(core.JSONMarshalString(resp))) + })) + defer srv.Close() + + setupAPIEnv(t, srv.URL) + t.Setenv("CODE_PATH", t.TempDir()) + + mon := New() + msg := mon.syncRepos() + assert.Equal(t, "", msg) +} + +func TestSync_SyncRepos_Good_UsesEnvBrainKey(t *testing.T) { + var authHeader string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + authHeader = r.Header.Get("Authorization") + resp := CheckinResponse{Timestamp: time.Now().Unix()} + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(core.JSONMarshalString(resp))) + })) + defer srv.Close() + + home := t.TempDir() + t.Setenv("HOME", home) + t.Setenv("CORE_BRAIN_KEY", "env-key-value") + t.Setenv("CORE_API_URL", srv.URL) + t.Setenv("AGENT_NAME", "test-agent") + + mon := New() + mon.syncRepos() + assert.Equal(t, "Bearer env-key-value", authHeader) +}