php-agentic/Services/Concerns/HasStreamParsing.php
Snider ad83825f93 refactor: rename namespace Core\Agentic to Core\Mod\Agentic
Updates all classes to use the new modular namespace convention.
Adds Service/ layer with Core\Service\Agentic for service definition.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 16:12:58 +00:00

188 lines
5.9 KiB
PHP

<?php
declare(strict_types=1);
namespace Core\Mod\Agentic\Services\Concerns;
use Generator;
use Psr\Http\Message\StreamInterface;
/**
* Provides robust SSE (Server-Sent Events) stream parsing for AI provider services.
*
* Handles:
* - Chunked/partial reads
* - Line buffering across chunks
* - SSE event parsing (data:, event:, etc.)
*/
trait HasStreamParsing
{
/**
* Parse SSE stream and yield data payloads.
*
* @param StreamInterface $stream The HTTP response body stream
* @param callable $extractContent Function to extract content from parsed JSON data
* @return Generator<string>
*/
protected function parseSSEStream(StreamInterface $stream, callable $extractContent): Generator
{
$buffer = '';
while (! $stream->eof()) {
$chunk = $stream->read(8192);
if ($chunk === '') {
continue;
}
$buffer .= $chunk;
// Process complete lines from the buffer
while (($newlinePos = strpos($buffer, "\n")) !== false) {
$line = substr($buffer, 0, $newlinePos);
$buffer = substr($buffer, $newlinePos + 1);
// Trim carriage return if present (handle \r\n)
$line = rtrim($line, "\r");
// Skip empty lines (event separators)
if ($line === '') {
continue;
}
// Parse SSE data lines
if (str_starts_with($line, 'data: ')) {
$data = substr($line, 6);
// Check for stream termination
if ($data === '[DONE]' || trim($data) === '[DONE]') {
return;
}
// Skip empty data
if (trim($data) === '') {
continue;
}
// Parse JSON payload
$json = json_decode($data, true);
if ($json === null && json_last_error() !== JSON_ERROR_NONE) {
// Invalid JSON, skip this line
continue;
}
// Extract content using provider-specific callback
$content = $extractContent($json);
if ($content !== null && $content !== '') {
yield $content;
}
}
// Skip other SSE fields (event:, id:, retry:, comments starting with :)
}
}
// Process any remaining data in buffer after stream ends
if (trim($buffer) !== '') {
$lines = explode("\n", $buffer);
foreach ($lines as $line) {
$line = rtrim($line, "\r");
if (str_starts_with($line, 'data: ')) {
$data = substr($line, 6);
if ($data !== '[DONE]' && trim($data) !== '' && trim($data) !== '[DONE]') {
$json = json_decode($data, true);
if ($json !== null) {
$content = $extractContent($json);
if ($content !== null && $content !== '') {
yield $content;
}
}
}
}
}
}
}
/**
* Parse JSON object stream (for providers like Gemini that don't use SSE).
*
* @param StreamInterface $stream The HTTP response body stream
* @param callable $extractContent Function to extract content from parsed JSON data
* @return Generator<string>
*/
protected function parseJSONStream(StreamInterface $stream, callable $extractContent): Generator
{
$buffer = '';
$braceDepth = 0;
$inString = false;
$escape = false;
$objectStart = -1;
while (! $stream->eof()) {
$chunk = $stream->read(8192);
if ($chunk === '') {
continue;
}
$buffer .= $chunk;
// Parse JSON objects from the buffer
$length = strlen($buffer);
$i = 0;
while ($i < $length) {
$char = $buffer[$i];
if ($escape) {
$escape = false;
$i++;
continue;
}
if ($char === '\\' && $inString) {
$escape = true;
$i++;
continue;
}
if ($char === '"') {
$inString = ! $inString;
} elseif (! $inString) {
if ($char === '{') {
if ($braceDepth === 0) {
$objectStart = $i;
}
$braceDepth++;
} elseif ($char === '}') {
$braceDepth--;
if ($braceDepth === 0 && $objectStart >= 0) {
// Complete JSON object found
$jsonStr = substr($buffer, $objectStart, $i - $objectStart + 1);
$json = json_decode($jsonStr, true);
if ($json !== null) {
$content = $extractContent($json);
if ($content !== null && $content !== '') {
yield $content;
}
}
// Update buffer to remove processed content
$buffer = substr($buffer, $i + 1);
$length = strlen($buffer);
$i = -1; // Will be incremented to 0
$objectStart = -1;
}
}
}
$i++;
}
}
}
}