php-agentic/tests/Unit/Concerns/HasStreamParsingTest.php

466 lines
14 KiB
PHP
Raw Normal View History

<?php
declare(strict_types=1);
/**
* Tests for the HasStreamParsing trait.
*
* Exercises SSE (Server-Sent Events) and JSON object stream parsing
* including chunked reads, edge cases, and error handling.
*/
use Core\Mod\Agentic\Services\Concerns\HasStreamParsing;
use Psr\Http\Message\StreamInterface;
// ---------------------------------------------------------------------------
// Test helpers
// ---------------------------------------------------------------------------
/**
* Create a minimal in-memory PSR-7 stream from a string.
*
* Only eof() and read() are needed by the trait; all other
* StreamInterface methods are stubbed.
*/
function fakeStream(string $data, int $chunkSize = 8192): StreamInterface
{
2026-02-23 06:42:24 +00:00
return new class($data, $chunkSize) implements StreamInterface
{
private int $pos = 0;
public function __construct(
private readonly string $data,
private readonly int $chunkSize,
) {}
public function eof(): bool
{
return $this->pos >= strlen($this->data);
}
public function read($length): string
{
$effective = min($length, $this->chunkSize);
$chunk = substr($this->data, $this->pos, $effective);
$this->pos += strlen($chunk);
return $chunk;
}
// --- PSR-7 stubs (not exercised by the trait) ---
2026-02-23 06:42:24 +00:00
public function __toString(): string
{
return $this->data;
}
public function close(): void {}
2026-02-23 06:42:24 +00:00
public function detach()
{
return null;
}
2026-02-23 06:42:24 +00:00
public function getSize(): ?int
{
return strlen($this->data);
}
2026-02-23 06:42:24 +00:00
public function tell(): int
{
return $this->pos;
}
2026-02-23 06:42:24 +00:00
public function isSeekable(): bool
{
return false;
}
public function seek($offset, $whence = SEEK_SET): void {}
public function rewind(): void {}
2026-02-23 06:42:24 +00:00
public function isWritable(): bool
{
return false;
}
2026-02-23 06:42:24 +00:00
public function write($string): int
{
return 0;
}
2026-02-23 06:42:24 +00:00
public function isReadable(): bool
{
return true;
}
2026-02-23 06:42:24 +00:00
public function getContents(): string
{
return substr($this->data, $this->pos);
}
2026-02-23 06:42:24 +00:00
public function getMetadata($key = null)
{
return null;
}
};
}
/**
* Create a testable object that exposes the HasStreamParsing trait methods.
*/
function streamParsingService(): object
{
2026-02-23 06:42:24 +00:00
return new class
{
use HasStreamParsing;
public function sse(StreamInterface $stream, callable $extract): Generator
{
return $this->parseSSEStream($stream, $extract);
}
public function json(StreamInterface $stream, callable $extract): Generator
{
return $this->parseJSONStream($stream, $extract);
}
};
}
// ---------------------------------------------------------------------------
// parseSSEStream basic data extraction
// ---------------------------------------------------------------------------
describe('parseSSEStream basic parsing', function () {
it('yields content from a single data line', function () {
$raw = "data: {\"text\":\"hello\"}\n\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['hello']);
});
it('yields content from multiple data lines', function () {
$raw = "data: {\"text\":\"foo\"}\n";
$raw .= "data: {\"text\":\"bar\"}\n";
$raw .= "data: {\"text\":\"baz\"}\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['foo', 'bar', 'baz']);
});
it('handles Windows-style \\r\\n line endings', function () {
$raw = "data: {\"text\":\"crlf\"}\r\n\r\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['crlf']);
});
});
// ---------------------------------------------------------------------------
// parseSSEStream stream termination
// ---------------------------------------------------------------------------
describe('parseSSEStream stream termination', function () {
it('stops yielding when it encounters [DONE]', function () {
$raw = "data: {\"text\":\"before\"}\n";
$raw .= "data: [DONE]\n";
$raw .= "data: {\"text\":\"after\"}\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['before']);
});
it('stops when [DONE] has surrounding whitespace', function () {
$raw = "data: {\"text\":\"first\"}\n";
$raw .= "data: [DONE] \n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['first']);
});
it('yields nothing from an empty stream', function () {
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream(''), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBeEmpty();
});
});
// ---------------------------------------------------------------------------
// parseSSEStream skipped lines
// ---------------------------------------------------------------------------
describe('parseSSEStream skipped lines', function () {
it('skips blank/separator lines', function () {
$raw = "\n\ndata: {\"text\":\"ok\"}\n\n\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['ok']);
});
it('skips non-data SSE fields (event:, id:, retry:)', function () {
$raw = "event: message\n";
$raw .= "id: 42\n";
$raw .= "retry: 3000\n";
$raw .= "data: {\"text\":\"content\"}\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['content']);
});
it('skips SSE comment lines starting with colon', function () {
$raw = ": keep-alive\n";
$raw .= "data: {\"text\":\"real\"}\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['real']);
});
it('skips data lines with empty payload after trimming', function () {
$raw = "data: \n";
$raw .= "data: {\"text\":\"actual\"}\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['actual']);
});
});
// ---------------------------------------------------------------------------
// parseSSEStream error handling
// ---------------------------------------------------------------------------
describe('parseSSEStream error handling', function () {
it('skips lines with invalid JSON', function () {
$raw = "data: not-valid-json\n";
$raw .= "data: {\"text\":\"valid\"}\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['valid']);
});
it('skips lines where extractor returns null', function () {
$raw = "data: {\"other\":\"field\"}\n";
$raw .= "data: {\"text\":\"present\"}\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['present']);
});
it('skips lines where extractor returns empty string', function () {
$raw = "data: {\"text\":\"\"}\n";
$raw .= "data: {\"text\":\"hello\"}\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['hello']);
});
});
// ---------------------------------------------------------------------------
// parseSSEStream chunked / partial reads
// ---------------------------------------------------------------------------
describe('parseSSEStream chunked reads', function () {
it('handles a stream delivered in small chunks', function () {
$raw = "data: {\"text\":\"chunked\"}\n\n";
$service = streamParsingService();
// Force the stream to return 5 bytes at a time
$results = iterator_to_array(
$service->sse(fakeStream($raw, 5), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['chunked']);
});
it('processes remaining data buffered after stream EOF', function () {
// No trailing newline data stays in the buffer until EOF
2026-02-23 06:42:24 +00:00
$raw = 'data: {"text":"buffered"}';
$service = streamParsingService();
$results = iterator_to_array(
$service->sse(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['buffered']);
});
});
// ---------------------------------------------------------------------------
// parseJSONStream basic parsing
// ---------------------------------------------------------------------------
describe('parseJSONStream basic parsing', function () {
it('yields content from a single JSON object', function () {
$raw = '{"text":"hello"}';
$service = streamParsingService();
$results = iterator_to_array(
$service->json(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['hello']);
});
it('yields content from multiple consecutive JSON objects', function () {
$raw = '{"text":"first"}{"text":"second"}{"text":"third"}';
$service = streamParsingService();
$results = iterator_to_array(
$service->json(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['first', 'second', 'third']);
});
it('handles JSON objects separated by whitespace', function () {
$raw = " {\"text\":\"a\"}\n\n {\"text\":\"b\"}\n";
$service = streamParsingService();
$results = iterator_to_array(
$service->json(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['a', 'b']);
});
it('handles nested JSON objects correctly', function () {
$raw = '{"outer":{"inner":"value"},"text":"top"}';
$service = streamParsingService();
$results = iterator_to_array(
$service->json(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['top']);
});
it('handles escaped quotes inside strings', function () {
$raw = '{"text":"say \"hello\""}';
$service = streamParsingService();
$results = iterator_to_array(
$service->json(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['say "hello"']);
});
});
// ---------------------------------------------------------------------------
// parseJSONStream extractor filtering
// ---------------------------------------------------------------------------
describe('parseJSONStream extractor filtering', function () {
it('skips objects where extractor returns null', function () {
$raw = '{"other":"x"}{"text":"keep"}';
$service = streamParsingService();
$results = iterator_to_array(
$service->json(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['keep']);
});
it('skips objects where extractor returns empty string', function () {
$raw = '{"text":""}{"text":"non-empty"}';
$service = streamParsingService();
$results = iterator_to_array(
$service->json(fakeStream($raw), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['non-empty']);
});
it('yields nothing from an empty stream', function () {
$service = streamParsingService();
$results = iterator_to_array(
$service->json(fakeStream(''), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBeEmpty();
});
});
// ---------------------------------------------------------------------------
// parseJSONStream chunked reads
// ---------------------------------------------------------------------------
describe('parseJSONStream chunked reads', function () {
it('handles objects split across multiple chunks', function () {
$raw = '{"text":"split"}';
$service = streamParsingService();
// Force 3-byte chunks to ensure the object is assembled across reads
$results = iterator_to_array(
$service->json(fakeStream($raw, 3), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['split']);
});
it('handles multiple objects across chunks', function () {
$raw = '{"text":"a"}{"text":"b"}';
$service = streamParsingService();
$results = iterator_to_array(
$service->json(fakeStream($raw, 4), fn ($json) => $json['text'] ?? null)
);
expect($results)->toBe(['a', 'b']);
});
});