diff --git a/php/Controllers/Api/FleetController.php b/php/Controllers/Api/FleetController.php index d17ce25..334c522 100644 --- a/php/Controllers/Api/FleetController.php +++ b/php/Controllers/Api/FleetController.php @@ -158,23 +158,38 @@ class FleetController extends Controller { $validated = $request->validate([ 'agent_id' => 'required|string|max:255', + 'limit' => 'nullable|integer|min:1', + 'poll_interval_ms' => 'nullable|integer|min:100|max:5000', ]); $workspaceId = (int) $request->attributes->get('workspace_id'); $agentId = $validated['agent_id']; + $limit = $validated['limit'] ?? 0; + $pollIntervalMs = $validated['poll_interval_ms'] ?? 1000; - return response()->stream(function () use ($workspaceId, $agentId): void { - echo "event: ready\n"; - echo 'data: '.json_encode(['agent_id' => $agentId])."\n\n"; + return response()->stream(function () use ($workspaceId, $agentId, $limit, $pollIntervalMs): void { + $emitted = 0; - $fleetTask = GetNextTask::run($workspaceId, $agentId, []); - if ($fleetTask instanceof FleetTask) { - echo "event: task.assigned\n"; - echo 'data: '.json_encode($this->formatTask($fleetTask))."\n\n"; + ignore_user_abort(true); + set_time_limit(0); + + $this->streamFleetEvent('ready', ['agent_id' => $agentId]); + + while (! connection_aborted()) { + $fleetTask = GetNextTask::run($workspaceId, $agentId, []); + if ($fleetTask instanceof FleetTask) { + $this->streamFleetEvent('task.assigned', $this->formatTask($fleetTask)); + $emitted++; + + if ($limit > 0 && $emitted >= $limit) { + break; + } + + continue; + } + + usleep($pollIntervalMs * 1000); } - - @ob_flush(); - flush(); }, 200, [ 'Content-Type' => 'text/event-stream', 'Cache-Control' => 'no-cache', @@ -183,6 +198,18 @@ class FleetController extends Controller ]); } + /** + * @param array $data + */ + private function streamFleetEvent(string $event, array $data): void + { + echo "event: {$event}\n"; + echo 'data: '.json_encode($data)."\n\n"; + + @ob_flush(); + flush(); + } + public function stats(Request $request): JsonResponse { $stats = GetFleetStats::run((int) $request->attributes->get('workspace_id')); diff --git a/php/tests/Feature/FleetControllerTest.php b/php/tests/Feature/FleetControllerTest.php new file mode 100644 index 0000000..a17ca79 --- /dev/null +++ b/php/tests/Feature/FleetControllerTest.php @@ -0,0 +1,53 @@ +create(); + $node = FleetNode::create([ + 'workspace_id' => $workspace->id, + 'agent_id' => 'charon', + 'platform' => 'linux', + 'status' => FleetNode::STATUS_ONLINE, + ]); + + $task = FleetTask::create([ + 'workspace_id' => $workspace->id, + 'fleet_node_id' => $node->id, + 'repo' => 'core/app', + 'task' => 'Fix the failing tests', + 'status' => FleetTask::STATUS_ASSIGNED, + ]); + + $request = Request::create('/v1/fleet/events', 'GET', [ + 'agent_id' => 'charon', + 'limit' => 1, + 'poll_interval_ms' => 100, + ]); + $request->attributes->set('workspace_id', $workspace->id); + + $response = app(FleetController::class)->events($request); + + ob_start(); + $response->sendContent(); + $output = ob_get_clean(); + + expect($output)->toContain("event: ready") + ->and($output)->toContain('"agent_id":"charon"') + ->and($output)->toContain("event: task.assigned") + ->and($output)->toContain('"repo":"core/app"') + ->and($output)->toContain('"task":"Fix the failing tests"'); + + $task->refresh(); + $node->refresh(); + + expect($task->status)->toBe(FleetTask::STATUS_IN_PROGRESS) + ->and($node->status)->toBe(FleetNode::STATUS_BUSY) + ->and($node->current_task_id)->toBe($task->id); +});