feat(php-agent): stream fleet events continuously

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-01 21:45:39 +00:00
parent 1832728b05
commit 036c09c235
2 changed files with 90 additions and 10 deletions

View file

@ -158,23 +158,38 @@ class FleetController extends Controller
{
$validated = $request->validate([
'agent_id' => 'required|string|max:255',
'limit' => 'nullable|integer|min:1',
'poll_interval_ms' => 'nullable|integer|min:100|max:5000',
]);
$workspaceId = (int) $request->attributes->get('workspace_id');
$agentId = $validated['agent_id'];
$limit = $validated['limit'] ?? 0;
$pollIntervalMs = $validated['poll_interval_ms'] ?? 1000;
return response()->stream(function () use ($workspaceId, $agentId): void {
echo "event: ready\n";
echo 'data: '.json_encode(['agent_id' => $agentId])."\n\n";
return response()->stream(function () use ($workspaceId, $agentId, $limit, $pollIntervalMs): void {
$emitted = 0;
$fleetTask = GetNextTask::run($workspaceId, $agentId, []);
if ($fleetTask instanceof FleetTask) {
echo "event: task.assigned\n";
echo 'data: '.json_encode($this->formatTask($fleetTask))."\n\n";
ignore_user_abort(true);
set_time_limit(0);
$this->streamFleetEvent('ready', ['agent_id' => $agentId]);
while (! connection_aborted()) {
$fleetTask = GetNextTask::run($workspaceId, $agentId, []);
if ($fleetTask instanceof FleetTask) {
$this->streamFleetEvent('task.assigned', $this->formatTask($fleetTask));
$emitted++;
if ($limit > 0 && $emitted >= $limit) {
break;
}
continue;
}
usleep($pollIntervalMs * 1000);
}
@ob_flush();
flush();
}, 200, [
'Content-Type' => 'text/event-stream',
'Cache-Control' => 'no-cache',
@ -183,6 +198,18 @@ class FleetController extends Controller
]);
}
/**
* @param array<string, mixed> $data
*/
private function streamFleetEvent(string $event, array $data): void
{
echo "event: {$event}\n";
echo 'data: '.json_encode($data)."\n\n";
@ob_flush();
flush();
}
public function stats(Request $request): JsonResponse
{
$stats = GetFleetStats::run((int) $request->attributes->get('workspace_id'));

View file

@ -0,0 +1,53 @@
<?php
declare(strict_types=1);
use Core\Mod\Agentic\Controllers\Api\FleetController;
use Core\Mod\Agentic\Models\FleetNode;
use Core\Mod\Agentic\Models\FleetTask;
use Core\Tenant\Models\Workspace;
use Illuminate\Http\Request;
it('streams assigned fleet tasks as SSE events', function () {
$workspace = Workspace::factory()->create();
$node = FleetNode::create([
'workspace_id' => $workspace->id,
'agent_id' => 'charon',
'platform' => 'linux',
'status' => FleetNode::STATUS_ONLINE,
]);
$task = FleetTask::create([
'workspace_id' => $workspace->id,
'fleet_node_id' => $node->id,
'repo' => 'core/app',
'task' => 'Fix the failing tests',
'status' => FleetTask::STATUS_ASSIGNED,
]);
$request = Request::create('/v1/fleet/events', 'GET', [
'agent_id' => 'charon',
'limit' => 1,
'poll_interval_ms' => 100,
]);
$request->attributes->set('workspace_id', $workspace->id);
$response = app(FleetController::class)->events($request);
ob_start();
$response->sendContent();
$output = ob_get_clean();
expect($output)->toContain("event: ready")
->and($output)->toContain('"agent_id":"charon"')
->and($output)->toContain("event: task.assigned")
->and($output)->toContain('"repo":"core/app"')
->and($output)->toContain('"task":"Fix the failing tests"');
$task->refresh();
$node->refresh();
expect($task->status)->toBe(FleetTask::STATUS_IN_PROGRESS)
->and($node->status)->toBe(FleetNode::STATUS_BUSY)
->and($node->current_task_id)->toBe($task->id);
});