feat(agent/fleet): core login CODE + fleet connect/poll/heartbeat (#539)

Per RFC §9 Fleet Mode: device pairing + SSE-with-poll-fallback +
heartbeat + status reporting now wired.

Lands:
* pkg/agentic/fleet_login.go — `core login CODE` POSTs /v1/device/pair
  with the 6-digit code; writes {agent_api_key, agent_id, expires_at}
  to ~/.core/agent.key (mode 0600). Errors clean (no panic) on invalid
  code / network fail.
* pkg/agentic/fleet_connect.go — Connect(ctx) opens SSE to
  /v1/fleet/events with Bearer auth; reconnect backoff 1s→2s→4s→8s→
  16s→30s. PollFallback via /v1/fleet/task/next every 30s when SSE
  keeps failing. Heartbeat goroutine POSTs /v1/fleet/heartbeat every
  60s with {agent_id, status}. Persists last-known fleet snapshot to
  ~/.core/fleet.status.json so fleet/status survives restart.
* pkg/agentic/fleet_mode.go — `core fleet` top-level + `fleet/nodes`
  (lists registered nodes) + `fleet/status` (connection state, last
  heartbeat, last task). All exit cleanly on API-unreachable.
* commands.go — registerFleetCommands wired into registerCommands.
* AX-10 tests + CLI Taskfiles for login + nodes (unreachable-API
  asserted clean-exit, no panic).

Sandbox blocked from go test by go.work + private-module-graph
(pre-existing); gofmt clean.

Co-authored-by: Codex <noreply@openai.com>
Closes tasks.lthn.sh/view.php?id=539
This commit is contained in:
Snider 2026-04-26 00:13:18 +01:00
parent c6415aa53a
commit 7a9dbadb57
9 changed files with 1346 additions and 2 deletions

View file

@ -96,6 +96,7 @@ func (s *PrepSubsystem) registerCommands(ctx context.Context) {
s.registerSprintCommands()
s.registerStateCommands()
s.registerCoreCommands()
s.registerFleetCommands()
s.registerPipelineCommands()
s.registerLanguageCommands()
s.registerSetupCommands()

View file

@ -0,0 +1,629 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"bufio"
"context"
"net/http"
"sync"
"time"
core "dappco.re/go/core"
)
var fleetBackoffSchedule = []time.Duration{
time.Second,
2 * time.Second,
4 * time.Second,
8 * time.Second,
16 * time.Second,
30 * time.Second,
}
var fleetPollInterval = 30 * time.Second
var fleetHeartbeatInterval = 60 * time.Second
var fleetPollingFailureThreshold = 3
var fleetSleep = func(ctx context.Context, delay time.Duration) bool {
if delay <= 0 {
select {
case <-ctx.Done():
return false
default:
return true
}
}
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}
type fleetClientConfig struct {
APIURL string
AgentID string
AgentAPIKey string
Status string
Capabilities []string
}
type fleetRuntimeSnapshot struct {
APIURL string
AgentID string
State string
Transport string
LastError string
LastHeartbeatAt string
LastConnectedAt string
LastEventAt string
LastTaskReceived string
LastTask FleetTask
}
var fleetRuntimeState = struct {
mu sync.RWMutex
snapshot fleetRuntimeSnapshot
}{
snapshot: fleetRuntimeSnapshot{State: "offline"},
}
// result := subsystem.Connect(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
func (s *PrepSubsystem) Connect(ctx context.Context, options core.Options) core.Result {
config := fleetClientConfigFromOptions(s, options)
if validation := validateFleetClientConfig("agentic.fleet.connect", config, true); !validation.OK {
return validation
}
fleetRememberBase(config)
fleetRememberState("connecting", "sse", "")
heartbeatContext, cancelHeartbeat := context.WithCancel(ctx)
defer cancelHeartbeat()
if fleetHeartbeatInterval > 0 {
go s.runFleetHeartbeat(heartbeatContext, config)
}
var pollingCancel context.CancelFunc
var pollingDone chan struct{}
consecutiveFailures := 0
for ctx.Err() == nil {
if pollingDone != nil {
select {
case <-pollingDone:
pollingDone = nil
pollingCancel = nil
default:
}
}
result := s.connectFleetEventStream(ctx, config)
if result.OK {
consecutiveFailures = 0
if pollingCancel != nil {
pollingCancel()
if pollingDone != nil {
<-pollingDone
}
pollingCancel = nil
pollingDone = nil
}
continue
}
if ctx.Err() != nil {
break
}
consecutiveFailures++
err := commandResultError("agentic.fleet.connect", result)
fleetRememberState("disconnected", "sse", err.Error())
if consecutiveFailures >= fleetPollingFailureThreshold && pollingCancel == nil {
pollingContext, cancelPolling := context.WithCancel(ctx)
pollingCancel = cancelPolling
pollingDone = make(chan struct{})
go func() {
defer close(pollingDone)
_ = s.runFleetPollFallback(pollingContext, config)
}()
}
if !fleetSleep(ctx, fleetBackoffDelay(consecutiveFailures)) {
break
}
}
if pollingCancel != nil {
pollingCancel()
if pollingDone != nil {
<-pollingDone
}
}
fleetRememberState("offline", fleetRuntimeSnapshotValue().Transport, "")
return core.Result{OK: true}
}
// result := subsystem.PollFallback(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
func (s *PrepSubsystem) PollFallback(ctx context.Context, options core.Options) core.Result {
config := fleetClientConfigFromOptions(s, options)
if validation := validateFleetClientConfig("agentic.fleet.poll", config, true); !validation.OK {
return validation
}
return s.runFleetPollFallback(ctx, config)
}
// result := subsystem.Heartbeat(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
func (s *PrepSubsystem) Heartbeat(ctx context.Context, options core.Options) core.Result {
config := fleetClientConfigFromOptions(s, options)
if validation := validateFleetClientConfig("agentic.fleet.heartbeat", config, true); !validation.OK {
return validation
}
return s.runFleetHeartbeat(ctx, config)
}
func fleetClientConfigFromOptions(s *PrepSubsystem, options core.Options) fleetClientConfig {
status := optionStringValue(options, "status")
if status == "" {
status = "online"
}
token := optionStringValue(options, "agent_api_key", "agent-api-key", "token")
if token == "" {
token = fleetAgentAPIKey(s)
}
agentID := optionStringValue(options, "agent_id", "agent-id", "_arg")
if agentID == "" {
agentID = AgentName()
}
return fleetClientConfig{
APIURL: fleetAPIURLFromOptions(s, options),
AgentID: agentID,
AgentAPIKey: token,
Status: status,
Capabilities: optionStringSliceValue(options, "capabilities"),
}
}
func validateFleetClientConfig(action string, config fleetClientConfig, requireToken bool) core.Result {
if core.Trim(config.APIURL) == "" {
return core.Result{Value: core.E(action, "api url is required", nil), OK: false}
}
if core.Trim(config.AgentID) == "" {
return core.Result{Value: core.E(action, "agent_id is required", nil), OK: false}
}
if requireToken && core.Trim(config.AgentAPIKey) == "" {
return core.Result{Value: core.E(action, core.Concat("no fleet api key configured at ", fleetAgentKeyPath()), nil), OK: false}
}
return core.Result{OK: true}
}
func fleetAPIURLFromOptions(s *PrepSubsystem, options core.Options) string {
if apiURL := optionStringValue(options, "api", "api_url", "api-url"); apiURL != "" {
return core.TrimSuffix(apiURL, "/")
}
if envURL := core.Env("CORE_API_URL"); envURL != "" {
return core.TrimSuffix(envURL, "/")
}
if s != nil && s.brainURL != "" {
return core.TrimSuffix(s.brainURL, "/")
}
return "https://api.lthn.sh"
}
func fleetAgentKeyPath() string {
return core.JoinPath(HomeDir(), ".core", "agent.key")
}
func fleetStatusSnapshotPath() string {
return core.JoinPath(HomeDir(), ".core", "fleet.status.json")
}
func fleetAgentAPIKey(s *PrepSubsystem) string {
if value := core.Env("CORE_AGENT_API_KEY"); value != "" {
return core.Trim(value)
}
if value := core.Env("CORE_BRAIN_KEY"); value != "" {
return core.Trim(value)
}
if readResult := fs.Read(fleetAgentKeyPath()); readResult.OK {
return core.Trim(readResult.Value.(string))
}
if readResult := fs.Read(core.JoinPath(HomeDir(), ".claude", "agent-api.key")); readResult.OK {
return core.Trim(readResult.Value.(string))
}
if readResult := fs.Read(core.JoinPath(HomeDir(), ".claude", "brain.key")); readResult.OK {
return core.Trim(readResult.Value.(string))
}
if s != nil {
return core.Trim(s.brainKey)
}
return ""
}
func (s *PrepSubsystem) fleetJSONRequest(ctx context.Context, action string, config fleetClientConfig, method, path string, body any) core.Result {
bodyString := ""
if body != nil {
bodyString = core.JSONMarshalString(body)
}
requestResult := HTTPDo(ctx, method, fleetURL(config.APIURL, path), bodyString, config.AgentAPIKey, "Bearer")
if !requestResult.OK {
return core.Result{Value: platformResultError(action, requestResult), OK: false}
}
rawBody := core.Trim(stringValue(requestResult.Value))
if rawBody == "" {
return core.Result{Value: map[string]any{}, OK: true}
}
var payload map[string]any
parseResult := core.JSONUnmarshalString(rawBody, &payload)
if !parseResult.OK {
err, _ := parseResult.Value.(error)
return core.Result{Value: core.E(action, "failed to parse fleet response", err), OK: false}
}
return core.Result{Value: payload, OK: true}
}
func (s *PrepSubsystem) fleetEventRequest(ctx context.Context, action string, config fleetClientConfig) core.Result {
path := appendQueryParam("/v1/fleet/events", "agent_id", config.AgentID)
path = appendQuerySlice(path, "capabilities[]", config.Capabilities)
request, err := http.NewRequestWithContext(ctx, http.MethodGet, fleetURL(config.APIURL, path), nil)
if err != nil {
return core.Result{Value: core.E(action, "create request", err), OK: false}
}
request.Header.Set("Accept", "text/event-stream, application/json")
if config.AgentAPIKey != "" {
request.Header.Set("Authorization", core.Concat("Bearer ", config.AgentAPIKey))
}
response, err := defaultClient.Do(request)
if err != nil {
return core.Result{Value: core.E(action, "request failed", err), OK: false}
}
if response.StatusCode >= 400 {
defer response.Body.Close()
readResult := core.ReadAll(response.Body)
if !readResult.OK {
return core.Result{Value: core.E(action, core.Sprintf("HTTP %d", response.StatusCode), nil), OK: false}
}
return core.Result{
Value: platformResultError(action, core.Result{Value: readResult.Value, OK: false}),
OK: false,
}
}
return core.Result{Value: response, OK: true}
}
func (s *PrepSubsystem) connectFleetEventStream(ctx context.Context, config fleetClientConfig) core.Result {
requestResult := s.fleetEventRequest(ctx, "agentic.fleet.connect", config)
if !requestResult.OK {
return requestResult
}
response, ok := requestResult.Value.(*http.Response)
if !ok || response == nil {
return core.Result{Value: core.E("agentic.fleet.connect", "invalid event stream response", nil), OK: false}
}
defer response.Body.Close()
fleetRememberBase(config)
fleetRememberState("connected", "sse", "")
fleetRememberConnected()
scanner := bufio.NewScanner(response.Body)
scanner.Buffer(make([]byte, 0, 4096), 1024*1024)
eventCount := 0
rawLines := make([]string, 0, 4)
flushEvent := func() {
if len(rawLines) == 0 {
return
}
eventBody := core.Join("\n", rawLines...)
rawLines = rawLines[:0]
payload := s.eventPayloadValue(eventBody)
output, err := parseFleetEventOutput(payload)
if err != nil {
return
}
eventCount++
fleetRememberState("connected", "sse", "")
fleetRememberEvent(output.Event)
}
for scanner.Scan() {
line := core.Trim(scanner.Text())
if line == "" {
flushEvent()
continue
}
rawLines = append(rawLines, line)
}
flushEvent()
if err := scanner.Err(); err != nil {
if eventCount > 0 {
return core.Result{Value: eventCount, OK: true}
}
return core.Result{Value: core.E("agentic.fleet.connect", "read event stream", err), OK: false}
}
if ctx.Err() != nil {
return core.Result{OK: true}
}
if eventCount > 0 {
return core.Result{Value: eventCount, OK: true}
}
return core.Result{Value: core.E("agentic.fleet.connect", "event stream closed before any events arrived", nil), OK: false}
}
func (s *PrepSubsystem) runFleetPollFallback(ctx context.Context, config fleetClientConfig) core.Result {
fleetRememberBase(config)
fleetRememberState("polling", "poll", "")
for ctx.Err() == nil {
result := s.pollFleetNextTask(ctx, config)
if result.OK {
task, _ := result.Value.(*FleetTask)
if task != nil {
return core.Result{Value: task, OK: true}
}
} else {
err := commandResultError("agentic.fleet.poll", result)
fleetRememberState("polling", "poll", err.Error())
}
if !fleetSleep(ctx, fleetPollInterval) {
break
}
}
return core.Result{OK: true}
}
func (s *PrepSubsystem) pollFleetNextTask(ctx context.Context, config fleetClientConfig) core.Result {
path := appendQueryParam("/v1/fleet/task/next", "agent_id", config.AgentID)
path = appendQuerySlice(path, "capabilities[]", config.Capabilities)
result := s.fleetJSONRequest(ctx, "agentic.fleet.poll", config, http.MethodGet, path, nil)
if !result.OK {
return result
}
payload, ok := result.Value.(map[string]any)
if !ok {
return core.Result{Value: core.E("agentic.fleet.poll", "invalid fleet polling payload", nil), OK: false}
}
taskValues := payloadResourceMap(payload, "task")
if len(taskValues) == 0 {
return core.Result{OK: true}
}
task := parseFleetTask(taskValues)
fleetRememberTask(task)
return core.Result{Value: &task, OK: true}
}
func (s *PrepSubsystem) runFleetHeartbeat(ctx context.Context, config fleetClientConfig) core.Result {
if fleetHeartbeatInterval <= 0 {
return core.Result{OK: true}
}
fleetRememberBase(config)
for ctx.Err() == nil {
result := s.postFleetHeartbeat(ctx, config)
if !result.OK {
err := commandResultError("agentic.fleet.heartbeat", result)
fleetRememberState(fleetRuntimeSnapshotValue().State, fleetRuntimeSnapshotValue().Transport, err.Error())
}
if !fleetSleep(ctx, fleetHeartbeatInterval) {
break
}
}
return core.Result{OK: true}
}
func (s *PrepSubsystem) postFleetHeartbeat(ctx context.Context, config fleetClientConfig) core.Result {
result := s.fleetJSONRequest(ctx, "agentic.fleet.heartbeat", config, http.MethodPost, "/v1/fleet/heartbeat", map[string]any{
"agent_id": config.AgentID,
"status": config.Status,
})
if !result.OK {
return result
}
fleetRememberHeartbeat()
return result
}
func fleetURL(apiURL, path string) string {
return core.Concat(core.TrimSuffix(apiURL, "/"), path)
}
func fleetBackoffDelay(failures int) time.Duration {
if len(fleetBackoffSchedule) == 0 {
return 30 * time.Second
}
index := failures - 1
if index < 0 {
index = 0
}
if index >= len(fleetBackoffSchedule) {
index = len(fleetBackoffSchedule) - 1
}
return fleetBackoffSchedule[index]
}
func fleetRuntimeSnapshotValue() fleetRuntimeSnapshot {
fleetRuntimeState.mu.RLock()
snapshot := fleetRuntimeState.snapshot
fleetRuntimeState.mu.RUnlock()
if !fleetSnapshotEmpty(snapshot) {
return snapshot
}
persisted := loadFleetRuntimeSnapshot()
if persisted.State == "" {
persisted.State = "offline"
}
return persisted
}
func fleetRememberBase(config fleetClientConfig) {
fleetRuntimeState.mu.Lock()
if config.APIURL != "" {
fleetRuntimeState.snapshot.APIURL = config.APIURL
}
if config.AgentID != "" {
fleetRuntimeState.snapshot.AgentID = config.AgentID
}
snapshot := fleetRuntimeState.snapshot
fleetRuntimeState.mu.Unlock()
persistFleetRuntimeSnapshot(snapshot)
}
func fleetRememberState(state, transport, lastError string) {
fleetRuntimeState.mu.Lock()
if state != "" {
fleetRuntimeState.snapshot.State = state
}
if transport != "" {
fleetRuntimeState.snapshot.Transport = transport
}
fleetRuntimeState.snapshot.LastError = core.Trim(lastError)
snapshot := fleetRuntimeState.snapshot
fleetRuntimeState.mu.Unlock()
persistFleetRuntimeSnapshot(snapshot)
}
func fleetRememberConnected() {
fleetRuntimeState.mu.Lock()
fleetRuntimeState.snapshot.LastConnectedAt = time.Now().Format(time.RFC3339)
snapshot := fleetRuntimeState.snapshot
fleetRuntimeState.mu.Unlock()
persistFleetRuntimeSnapshot(snapshot)
}
func fleetRememberHeartbeat() {
fleetRuntimeState.mu.Lock()
fleetRuntimeState.snapshot.LastHeartbeatAt = time.Now().Format(time.RFC3339)
fleetRuntimeState.snapshot.LastError = ""
snapshot := fleetRuntimeState.snapshot
fleetRuntimeState.mu.Unlock()
persistFleetRuntimeSnapshot(snapshot)
}
func fleetRememberEvent(event FleetEvent) {
fleetRuntimeState.mu.Lock()
fleetRuntimeState.snapshot.LastEventAt = time.Now().Format(time.RFC3339)
fleetRuntimeState.snapshot.LastError = ""
task := fleetTaskFromEvent(event)
if task.ID > 0 || task.Repo != "" || task.Task != "" {
fleetRuntimeState.snapshot.LastTask = task
fleetRuntimeState.snapshot.LastTaskReceived = fleetRuntimeState.snapshot.LastEventAt
}
snapshot := fleetRuntimeState.snapshot
fleetRuntimeState.mu.Unlock()
persistFleetRuntimeSnapshot(snapshot)
}
func fleetRememberTask(task FleetTask) {
fleetRuntimeState.mu.Lock()
fleetRuntimeState.snapshot.LastTask = task
fleetRuntimeState.snapshot.LastTaskReceived = time.Now().Format(time.RFC3339)
fleetRuntimeState.snapshot.LastError = ""
snapshot := fleetRuntimeState.snapshot
fleetRuntimeState.mu.Unlock()
persistFleetRuntimeSnapshot(snapshot)
}
func fleetTaskFromEvent(event FleetEvent) FleetTask {
payload := event.Payload
return FleetTask{
ID: event.TaskID,
Repo: event.Repo,
Branch: event.Branch,
Status: event.Status,
Task: stringValue(payload["task"]),
Template: stringValue(payload["template"]),
AgentModel: stringValue(payload["agent_model"]),
}
}
func resetFleetRuntimeState() {
fleetRuntimeState.mu.Lock()
fleetRuntimeState.snapshot = fleetRuntimeSnapshot{State: "offline"}
fleetRuntimeState.mu.Unlock()
_ = fs.Delete(fleetStatusSnapshotPath())
}
func fleetSnapshotEmpty(snapshot fleetRuntimeSnapshot) bool {
return snapshot.APIURL == "" &&
snapshot.AgentID == "" &&
snapshot.Transport == "" &&
snapshot.LastError == "" &&
snapshot.LastHeartbeatAt == "" &&
snapshot.LastConnectedAt == "" &&
snapshot.LastEventAt == "" &&
snapshot.LastTaskReceived == "" &&
snapshot.LastTask.ID == 0 &&
snapshot.LastTask.Repo == "" &&
snapshot.LastTask.Task == "" &&
(snapshot.State == "" || snapshot.State == "offline")
}
func persistFleetRuntimeSnapshot(snapshot fleetRuntimeSnapshot) {
if ensureResult := fs.EnsureDir(core.PathDir(fleetStatusSnapshotPath())); !ensureResult.OK {
return
}
_ = fs.WriteMode(fleetStatusSnapshotPath(), core.JSONMarshalString(snapshot), 0644)
}
func loadFleetRuntimeSnapshot() fleetRuntimeSnapshot {
readResult := fs.Read(fleetStatusSnapshotPath())
if !readResult.OK {
return fleetRuntimeSnapshot{State: "offline"}
}
var snapshot fleetRuntimeSnapshot
parseResult := core.JSONUnmarshalString(readResult.Value.(string), &snapshot)
if !parseResult.OK {
return fleetRuntimeSnapshot{State: "offline"}
}
return snapshot
}

View file

@ -0,0 +1,151 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestConnect_BackoffOnFailure(t *testing.T) {
t.Setenv("CORE_HOME", t.TempDir())
resetFleetRuntimeState()
originalSchedule := fleetBackoffSchedule
originalThreshold := fleetPollingFailureThreshold
originalHeartbeat := fleetHeartbeatInterval
originalSleep := fleetSleep
t.Cleanup(func() {
fleetBackoffSchedule = originalSchedule
fleetPollingFailureThreshold = originalThreshold
fleetHeartbeatInterval = originalHeartbeat
fleetSleep = originalSleep
resetFleetRuntimeState()
})
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/fleet/events", r.URL.Path)
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"error":"stream unavailable"}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
fleetHeartbeatInterval = 0
fleetPollingFailureThreshold = 99
fleetBackoffSchedule = []time.Duration{
time.Millisecond,
2 * time.Millisecond,
4 * time.Millisecond,
8 * time.Millisecond,
16 * time.Millisecond,
30 * time.Millisecond,
}
ctx, cancel := context.WithCancel(context.Background())
durations := []time.Duration{}
fleetSleep = func(_ context.Context, delay time.Duration) bool {
durations = append(durations, delay)
if len(durations) >= 6 {
cancel()
return false
}
return true
}
result := subsystem.Connect(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
require.True(t, result.OK)
assert.Equal(t, []time.Duration{
time.Millisecond,
2 * time.Millisecond,
4 * time.Millisecond,
8 * time.Millisecond,
16 * time.Millisecond,
30 * time.Millisecond,
}, durations)
}
func TestPollFallback_Good(t *testing.T) {
t.Setenv("CORE_HOME", t.TempDir())
resetFleetRuntimeState()
t.Cleanup(resetFleetRuntimeState)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/fleet/task/next", r.URL.Path)
require.Equal(t, "agent_id=charon", r.URL.RawQuery)
require.Equal(t, "Bearer secret-token", r.Header.Get("Authorization"))
_, _ = w.Write([]byte(`{"data":{"task":{"id":7,"repo":"core/go-io","branch":"dev","task":"Fix tests","status":"assigned"}}}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
result := subsystem.PollFallback(context.Background(), core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
require.True(t, result.OK)
task, ok := result.Value.(*FleetTask)
require.True(t, ok)
require.NotNil(t, task)
assert.Equal(t, 7, task.ID)
assert.Equal(t, "core/go-io", task.Repo)
snapshot := fleetRuntimeSnapshotValue()
assert.Equal(t, "polling", snapshot.State)
assert.Equal(t, 7, snapshot.LastTask.ID)
}
func TestHeartbeat_Good(t *testing.T) {
t.Setenv("CORE_HOME", t.TempDir())
resetFleetRuntimeState()
originalInterval := fleetHeartbeatInterval
originalSleep := fleetSleep
t.Cleanup(func() {
fleetHeartbeatInterval = originalInterval
fleetSleep = originalSleep
resetFleetRuntimeState()
})
requests := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/fleet/heartbeat", r.URL.Path)
require.Equal(t, http.MethodPost, r.Method)
require.Equal(t, "Bearer secret-token", r.Header.Get("Authorization"))
bodyResult := core.ReadAll(r.Body)
require.True(t, bodyResult.OK)
var payload map[string]any
parseResult := core.JSONUnmarshalString(bodyResult.Value.(string), &payload)
require.True(t, parseResult.OK)
assert.Equal(t, "charon", payload["agent_id"])
assert.Equal(t, "online", payload["status"])
requests++
_, _ = w.Write([]byte(`{"data":{"ok":true}}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
fleetHeartbeatInterval = time.Millisecond
ctx, cancel := context.WithCancel(context.Background())
fleetSleep = func(_ context.Context, delay time.Duration) bool {
if delay > 0 {
cancel()
return false
}
return true
}
result := subsystem.Heartbeat(ctx, core.NewOptions(core.Option{Key: "agent_id", Value: "charon"}))
require.True(t, result.OK)
assert.Equal(t, 1, requests)
assert.NotEmpty(t, fleetRuntimeSnapshotValue().LastHeartbeatAt)
}

136
pkg/agentic/fleet_login.go Normal file
View file

@ -0,0 +1,136 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"net/http"
core "dappco.re/go/core"
)
type fleetLoginOutput struct {
Success bool
AgentID string
AgentAPIKey string
ExpiresAt string
KeyPath string
}
// result := subsystem.cmdFleetLogin(core.NewOptions(core.Option{Key: "_arg", Value: "123456"}))
func (s *PrepSubsystem) cmdFleetLogin(options core.Options) core.Result {
code := core.Trim(optionStringValue(options, "code", "pairing_code", "pairing-code", "_arg"))
if !fleetPairingCodeValid(code) {
core.Print(nil, "usage: core-agent login <6-digit-code>")
return core.Result{Value: core.E("agentic.cmdFleetLogin", "pairing code must be 6 digits", nil), OK: false}
}
result := s.loginWithPairingCode(s.commandContext(), core.NewOptions(
core.Option{Key: "code", Value: code},
core.Option{Key: "api", Value: optionStringValue(options, "api", "api_url", "api-url")},
))
if !result.OK {
err := commandResultError("agentic.cmdFleetLogin", result)
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
output, ok := result.Value.(fleetLoginOutput)
if !ok {
err := core.E("agentic.cmdFleetLogin", "invalid fleet login output", nil)
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
core.Print(nil, "logged in")
if output.AgentID != "" {
core.Print(nil, "agent: %s", output.AgentID)
}
if output.ExpiresAt != "" {
core.Print(nil, "expires: %s", output.ExpiresAt)
}
core.Print(nil, "saved to: %s", output.KeyPath)
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) loginWithPairingCode(ctx context.Context, options core.Options) core.Result {
code := core.Trim(optionStringValue(options, "code", "pairing_code", "pairing-code", "_arg"))
if !fleetPairingCodeValid(code) {
return core.Result{Value: core.E("agentic.fleet.login", "pairing code must be 6 digits", nil), OK: false}
}
config := fleetClientConfig{
APIURL: fleetAPIURLFromOptions(s, options),
}
result := s.fleetJSONRequest(ctx, "agentic.fleet.login", config, http.MethodPost, "/v1/device/pair", map[string]any{
"code": code,
})
if !result.OK {
return result
}
payload, ok := result.Value.(map[string]any)
if !ok {
return core.Result{Value: core.E("agentic.fleet.login", "invalid fleet login payload", nil), OK: false}
}
output := parseFleetLoginOutput(payload)
if output.AgentAPIKey == "" {
return core.Result{Value: core.E("agentic.fleet.login", "device pairing response did not include an api key", nil), OK: false}
}
output.Success = true
output.KeyPath = fleetAgentKeyPath()
if ensureResult := fs.EnsureDir(core.PathDir(output.KeyPath)); !ensureResult.OK {
err, _ := ensureResult.Value.(error)
return core.Result{Value: core.E("agentic.fleet.login", "create fleet key directory", err), OK: false}
}
if writeResult := fs.WriteMode(output.KeyPath, output.AgentAPIKey, 0600); !writeResult.OK {
err, _ := writeResult.Value.(error)
return core.Result{Value: core.E("agentic.fleet.login", "write fleet api key", err), OK: false}
}
if s != nil {
s.brainKey = output.AgentAPIKey
}
fleetRememberBase(fleetClientConfig{APIURL: config.APIURL, AgentID: output.AgentID, AgentAPIKey: output.AgentAPIKey})
return core.Result{Value: output, OK: true}
}
func parseFleetLoginOutput(payload map[string]any) fleetLoginOutput {
values := payloadDataMap(payload)
if len(values) == 0 {
values = payload
}
key := stringValue(values["agent_api_key"])
if key == "" {
key = stringValue(values["key"])
}
if key == "" {
key = stringValue(anyMapValue(values["agent_api_key"])["key"])
}
return fleetLoginOutput{
AgentAPIKey: key,
AgentID: stringValue(values["agent_id"]),
ExpiresAt: stringValue(values["expires_at"]),
}
}
func fleetPairingCodeValid(code string) bool {
if len(code) != 6 {
return false
}
for _, character := range code {
if character < '0' || character > '9' {
return false
}
}
return true
}

View file

@ -0,0 +1,87 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
iofs "io/fs"
"net/http"
"net/http/httptest"
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestLogin_Good(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/device/pair", r.URL.Path)
require.Equal(t, http.MethodPost, r.Method)
require.Equal(t, "", r.Header.Get("Authorization"))
bodyResult := core.ReadAll(r.Body)
require.True(t, bodyResult.OK)
var payload map[string]any
parseResult := core.JSONUnmarshalString(bodyResult.Value.(string), &payload)
require.True(t, parseResult.OK)
assert.Equal(t, "123456", payload["code"])
_, _ = w.Write([]byte(`{"agent_api_key":"ak_live_test","agent_id":"charon","expires_at":"2027-01-01T00:00:00Z"}`))
}))
defer server.Close()
homeDir := t.TempDir()
t.Setenv("CORE_HOME", homeDir)
subsystem := testPrepWithPlatformServer(t, server, "")
output := captureStdout(t, func() {
result := subsystem.cmdFleetLogin(core.NewOptions(core.Option{Key: "_arg", Value: "123456"}))
require.True(t, result.OK)
})
assert.Contains(t, output, "logged in")
assert.Contains(t, output, "agent: charon")
assert.Contains(t, output, "saved to:")
keyPath := core.JoinPath(homeDir, ".core", "agent.key")
readResult := fs.Read(keyPath)
require.True(t, readResult.OK)
assert.Equal(t, "ak_live_test", core.Trim(readResult.Value.(string)))
statResult := fs.Stat(keyPath)
require.True(t, statResult.OK)
info := statResult.Value.(iofs.FileInfo)
assert.Equal(t, iofs.FileMode(0600), info.Mode().Perm())
}
func TestLogin_Bad(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
_, _ = w.Write([]byte(`{"error":"invalid pairing code"}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "")
output := captureStdout(t, func() {
result := subsystem.cmdFleetLogin(core.NewOptions(core.Option{Key: "_arg", Value: "123456"}))
require.False(t, result.OK)
})
assert.Contains(t, output, "error:")
assert.Contains(t, output, "invalid pairing code")
}
func TestLogin_Ugly(t *testing.T) {
homeDir := t.TempDir()
t.Setenv("CORE_HOME", homeDir)
subsystem := testPrepWithPlatformServer(t, nil, "")
output := captureStdout(t, func() {
result := subsystem.cmdFleetLogin(core.NewOptions(core.Option{Key: "_arg", Value: "12ab"}))
require.False(t, result.OK)
})
assert.Contains(t, output, "usage: core-agent login <6-digit-code>")
assert.False(t, fs.Exists(core.JoinPath(homeDir, ".core", "agent.key")))
}

183
pkg/agentic/fleet_mode.go Normal file
View file

@ -0,0 +1,183 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"net/http"
core "dappco.re/go/core"
)
func (s *PrepSubsystem) registerFleetCommands() {
c := s.Core()
c.Command("login", core.Command{Description: "Exchange a 6-digit pairing code for a fleet api key", Action: s.cmdFleetLogin})
c.Command("agentic:login", core.Command{Description: "Exchange a 6-digit pairing code for a fleet api key", Action: s.cmdFleetLogin})
c.Command("fleet", core.Command{Description: "Run or inspect fleet mode", Action: s.cmdFleet})
c.Command("agentic:fleet", core.Command{Description: "Run or inspect fleet mode", Action: s.cmdFleet})
c.Command("fleet/nodes", core.Command{Description: "List registered fleet nodes", Action: s.cmdFleetNodesCommand})
c.Command("agentic:fleet/nodes", core.Command{Description: "List registered fleet nodes", Action: s.cmdFleetNodesCommand})
c.Command("fleet/status", core.Command{Description: "Show current fleet connection status", Action: s.cmdFleetStatus})
c.Command("agentic:fleet/status", core.Command{Description: "Show current fleet connection status", Action: s.cmdFleetStatus})
}
func (s *PrepSubsystem) cmdFleet(options core.Options) core.Result {
action := optionStringValue(options, "action", "_arg")
switch action {
case "nodes":
return s.cmdFleetNodesCommand(options)
case "status":
return s.cmdFleetStatus(options)
case "", "help":
if optionBoolValue(options, "help") || optionStringValue(options, "agent_id", "agent-id") == "" {
printFleetUsage()
return core.Result{OK: true}
}
config := fleetClientConfigFromOptions(s, options)
core.Print(nil, "fleet mode")
core.Print(nil, "api: %s", config.APIURL)
core.Print(nil, "agent: %s", config.AgentID)
result := s.Connect(s.commandContext(), options)
if !result.OK {
err := commandResultError("agentic.cmdFleet", result)
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
return result
default:
printFleetUsage()
return core.Result{Value: core.E("agentic.cmdFleet", core.Concat("unknown fleet command: ", action), nil), OK: false}
}
}
func (s *PrepSubsystem) cmdFleetNodesCommand(options core.Options) core.Result {
result := s.listFleetNodes(s.commandContext(), options)
if !result.OK {
err := commandResultError("agentic.cmdFleetNodes", result)
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
output, ok := result.Value.(FleetNodesOutput)
if !ok {
err := core.E("agentic.cmdFleetNodes", "invalid fleet nodes output", nil)
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
if len(output.Nodes) == 0 {
core.Print(nil, "no fleet nodes")
return core.Result{Value: output, OK: true}
}
for _, node := range output.Nodes {
core.Print(nil, " %-10s %-8s %-10s %s", node.Status, node.Platform, node.AgentID, core.Join(",", node.Models...))
}
core.Print(nil, "")
core.Print(nil, "total: %d", output.Total)
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) cmdFleetStatus(options core.Options) core.Result {
snapshot := fleetRuntimeSnapshotValue()
config := fleetClientConfigFromOptions(s, options)
if snapshot.APIURL == "" {
snapshot.APIURL = config.APIURL
}
if snapshot.AgentID == "" {
snapshot.AgentID = config.AgentID
}
if snapshot.State == "" {
snapshot.State = "offline"
}
if snapshot.Transport == "" {
snapshot.Transport = "none"
}
core.Print(nil, "api: %s", snapshot.APIURL)
core.Print(nil, "agent: %s", snapshot.AgentID)
core.Print(nil, "state: %s", snapshot.State)
core.Print(nil, "transport: %s", snapshot.Transport)
if snapshot.LastConnectedAt != "" {
core.Print(nil, "last connected: %s", snapshot.LastConnectedAt)
}
if snapshot.LastHeartbeatAt != "" {
core.Print(nil, "last heartbeat: %s", snapshot.LastHeartbeatAt)
} else {
core.Print(nil, "last heartbeat: never")
}
if snapshot.LastEventAt != "" {
core.Print(nil, "last event: %s", snapshot.LastEventAt)
}
if fleetTaskSummary(snapshot.LastTask) != "" {
core.Print(nil, "last task: %s", fleetTaskSummary(snapshot.LastTask))
if snapshot.LastTaskReceived != "" {
core.Print(nil, "task received: %s", snapshot.LastTaskReceived)
}
} else {
core.Print(nil, "last task: none")
}
if snapshot.LastError != "" {
core.Print(nil, "last error: %s", snapshot.LastError)
}
return core.Result{Value: snapshot, OK: true}
}
func (s *PrepSubsystem) listFleetNodes(ctx context.Context, options core.Options) core.Result {
config := fleetClientConfigFromOptions(s, options)
path := "/v1/fleet/nodes"
path = appendQueryParam(path, "status", optionStringValue(options, "status"))
path = appendQueryParam(path, "platform", optionStringValue(options, "platform"))
result := s.fleetJSONRequest(ctx, "agentic.fleet.nodes", config, http.MethodGet, path, nil)
if !result.OK {
return result
}
payload, ok := result.Value.(map[string]any)
if !ok {
return core.Result{Value: core.E("agentic.fleet.nodes", "invalid fleet nodes payload", nil), OK: false}
}
return core.Result{Value: parseFleetNodesOutput(payload), OK: true}
}
func printFleetUsage() {
core.Print(nil, "usage: core-agent fleet --api=https://api.lthn.ai --agent-id=charon")
core.Print(nil, " core-agent fleet nodes [--status=online] [--platform=linux]")
core.Print(nil, " core-agent fleet status")
}
func fleetTaskSummary(task FleetTask) string {
if task.ID == 0 && task.Repo == "" && task.Task == "" {
return ""
}
summary := ""
if task.ID > 0 {
summary = core.Sprintf("#%d", task.ID)
}
if task.Repo != "" {
if summary != "" {
summary = core.Concat(summary, " ")
}
summary = core.Concat(summary, task.Repo)
}
if task.Status != "" {
if summary != "" {
summary = core.Concat(summary, " ")
}
summary = core.Concat(summary, task.Status)
}
if task.Task != "" {
if summary != "" {
summary = core.Concat(summary, " ")
}
summary = core.Concat(summary, task.Task)
}
return summary
}

View file

@ -0,0 +1,72 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"net/http"
"net/http/httptest"
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFleetMode_RegisterFleetCommands_Good(t *testing.T) {
t.Setenv("CORE_HOME", t.TempDir())
c := core.New(core.WithOption("name", "test"))
subsystem := &PrepSubsystem{ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{})}
subsystem.registerFleetCommands()
assert.Contains(t, c.Commands(), "login")
assert.Contains(t, c.Commands(), "fleet")
assert.Contains(t, c.Commands(), "fleet/nodes")
assert.Contains(t, c.Commands(), "fleet/status")
}
func TestFleetMode_CmdFleetNodes_Good(t *testing.T) {
t.Setenv("CORE_HOME", t.TempDir())
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(`{"data":[{"id":1,"agent_id":"charon","platform":"linux","models":["codex"],"status":"online"}],"total":1}`))
}))
defer server.Close()
subsystem := testPrepWithPlatformServer(t, server, "secret-token")
output := captureStdout(t, func() {
result := subsystem.cmdFleetNodesCommand(core.NewOptions())
require.True(t, result.OK)
})
assert.Contains(t, output, "charon")
assert.Contains(t, output, "total: 1")
}
func TestFleetMode_CmdFleetStatus_Good(t *testing.T) {
t.Setenv("CORE_HOME", t.TempDir())
resetFleetRuntimeState()
t.Cleanup(resetFleetRuntimeState)
fleetRememberBase(fleetClientConfig{APIURL: "https://api.lthn.ai", AgentID: "charon"})
fleetRememberState("connected", "sse", "")
fleetRememberTask(FleetTask{ID: 9, Repo: "core/go-io", Status: "assigned", Task: "Fix tests"})
subsystem := testPrepWithPlatformServer(t, nil, "secret-token")
output := captureStdout(t, func() {
result := subsystem.cmdFleetStatus(core.NewOptions())
require.True(t, result.OK)
})
assert.Contains(t, output, "state: connected")
assert.Contains(t, output, "last task: #9 core/go-io assigned Fix tests")
}
func TestFleetMode_ListFleetNodes_Bad_Unreachable(t *testing.T) {
t.Setenv("CORE_HOME", t.TempDir())
subsystem := testPrepWithPlatformServer(t, nil, "secret-token")
result := subsystem.listFleetNodes(context.Background(), core.NewOptions(
core.Option{Key: "api", Value: "http://127.0.0.1:1"},
))
assert.False(t, result.OK)
}

View file

@ -0,0 +1,77 @@
version: "3"
tasks:
test:
cmds:
- |
bash <<'EOF'
set -euo pipefail
source ../../_lib/run.sh
GOWORK=off go build -trimpath -ldflags="-s -w" -o bin/core-agent ../../../../cmd/core-agent
workspace="$(mktemp -d)"
export CORE_HOME="$workspace"
export DIR_HOME="$workspace"
server_src="$(mktemp /tmp/core-agent-fleet-login-XXXXXX.go)"
server_url="$(mktemp)"
cat >"$server_src" <<'GO'
package main
import (
"encoding/json"
"net"
"net/http"
"os"
)
func main() {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
handler := http.NewServeMux()
handler.HandleFunc("/v1/device/pair", func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"agent_api_key": "ak_live_cli",
"agent_id": "charon",
"expires_at": "2027-01-01T00:00:00Z",
})
})
if err := os.WriteFile(os.Args[1], []byte("http://"+listener.Addr().String()), 0644); err != nil {
panic(err)
}
if err := http.Serve(listener, handler); err != nil {
panic(err)
}
}
GO
GOWORK=off go run "$server_src" "$server_url" >/tmp/core-agent-fleet-login.log 2>&1 &
server_pid=$!
trap 'kill "$server_pid" >/dev/null 2>&1 || true; rm -f "$server_src" "$server_url"' EXIT
for _ in $(seq 1 50); do
if [[ -s "$server_url" ]]; then
break
fi
sleep 0.1
done
export CORE_API_URL="$(cat "$server_url")"
output="$(mktemp)"
run_capture_all 0 "$output" ./bin/core-agent login 123456
assert_contains "logged in" "$output"
assert_contains "saved to:" "$output"
key_path="$workspace/.core/agent.key"
test -f "$key_path"
grep -Fq "ak_live_cli" "$key_path"
EOF

View file

@ -8,10 +8,18 @@ tasks:
set -euo pipefail
source ../../_lib/run.sh
go build -trimpath -ldflags="-s -w" -o bin/core-agent ../../../../cmd/core-agent
GOWORK=off go build -trimpath -ldflags="-s -w" -o bin/core-agent ../../../../cmd/core-agent
export CORE_API_URL="http://127.0.0.1:1"
output="$(mktemp)"
# fleet/nodes calls the API — exit 1 with error is expected offline
# fleet/nodes must fail cleanly when the API is unreachable.
run_capture_all 1 "$output" ./bin/core-agent fleet/nodes
assert_contains "error:" "$output"
assert_contains "fleet" "$output"
if grep -Fq "panic" "$output"; then
printf 'unexpected panic in fleet/nodes output\n' >&2
cat "$output" >&2
exit 1
fi
EOF