feat: adding stream parser (#12666)

Add a stream parser to extract citations (and others) from a stream.
This support cases where markers are split in differen tokens.

Codex never manage to make this code work so everything was done
manually. Please review correctly and do not touch this part of the code
without a very clear understanding of it
This commit is contained in:
jif-oai 2026-02-25 13:27:58 +00:00 committed by GitHub
parent 5a9a5b51b2
commit 5441130e0a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 2070 additions and 371 deletions

8
codex-rs/Cargo.lock generated
View file

@ -1724,6 +1724,7 @@ dependencies = [
"codex-utils-home-dir",
"codex-utils-pty",
"codex-utils-readiness",
"codex-utils-stream-parser",
"codex-utils-string",
"codex-windows-sandbox",
"core-foundation 0.9.4",
@ -2547,6 +2548,13 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "codex-utils-stream-parser"
version = "0.0.0"
dependencies = [
"pretty_assertions",
]
[[package]]
name = "codex-utils-string"
version = "0.0.0"

View file

@ -58,6 +58,7 @@ members = [
"utils/approval-presets",
"utils/oss",
"utils/fuzzy-match",
"utils/stream-parser",
"codex-client",
"codex-api",
"state",
@ -137,6 +138,7 @@ codex-utils-rustls-provider = { path = "utils/rustls-provider" }
codex-utils-sandbox-summary = { path = "utils/sandbox-summary" }
codex-utils-sleep-inhibitor = { path = "utils/sleep-inhibitor" }
codex-utils-string = { path = "utils/string" }
codex-utils-stream-parser = { path = "utils/stream-parser" }
codex-windows-sandbox = { path = "windows-sandbox-rs" }
core_test_support = { path = "core/tests/common" }
mcp_test_support = { path = "mcp-server/tests/common" }

View file

@ -50,6 +50,7 @@ codex-utils-pty = { workspace = true }
codex-utils-readiness = { workspace = true }
codex-secrets = { workspace = true }
codex-utils-string = { workspace = true }
codex-utils-stream-parser = { workspace = true }
codex-windows-sandbox = { package = "codex-windows-sandbox", path = "../windows-sandbox-rs" }
csv = { workspace = true }
dirs = { workspace = true }

View file

@ -42,6 +42,7 @@ use crate::stream_events_utils::HandleOutputCtx;
use crate::stream_events_utils::handle_non_tool_response_item;
use crate::stream_events_utils::handle_output_item_done;
use crate::stream_events_utils::last_assistant_message_from_item;
use crate::stream_events_utils::raw_assistant_output_text_from_item;
use crate::terminal;
use crate::truncate::TruncationPolicy;
use crate::turn_metadata::TurnMetadataState;
@ -92,6 +93,11 @@ use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::skill_approval::SkillApprovalResponse;
use codex_rmcp_client::ElicitationResponse;
use codex_rmcp_client::OAuthCredentialsStoreMode;
use codex_utils_stream_parser::AssistantTextChunk;
use codex_utils_stream_parser::AssistantTextStreamParser;
use codex_utils_stream_parser::ProposedPlanSegment;
use codex_utils_stream_parser::extract_proposed_plan_text;
use codex_utils_stream_parser::strip_citations;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
@ -174,9 +180,6 @@ use crate::mentions::collect_explicit_app_ids;
use crate::mentions::collect_tool_mentions_from_messages;
use crate::network_policy_decision::execpolicy_network_rule_amendment;
use crate::project_doc::get_user_instructions;
use crate::proposed_plan_parser::ProposedPlanParser;
use crate::proposed_plan_parser::ProposedPlanSegment;
use crate::proposed_plan_parser::extract_proposed_plan_text;
use crate::protocol::AgentMessageContentDeltaEvent;
use crate::protocol::AgentReasoningSectionBreakEvent;
use crate::protocol::ApplyPatchApprovalRequestEvent;
@ -5618,39 +5621,9 @@ struct ProposedPlanItemState {
completed: bool,
}
/// Per-item plan parsers so we can buffer text while detecting `<proposed_plan>`
/// tags without ever mixing buffered lines across item ids.
struct PlanParsers {
assistant: HashMap<String, ProposedPlanParser>,
}
impl PlanParsers {
fn new() -> Self {
Self {
assistant: HashMap::new(),
}
}
fn assistant_parser_mut(&mut self, item_id: &str) -> &mut ProposedPlanParser {
self.assistant
.entry(item_id.to_string())
.or_insert_with(ProposedPlanParser::new)
}
fn take_assistant_parser(&mut self, item_id: &str) -> Option<ProposedPlanParser> {
self.assistant.remove(item_id)
}
fn drain_assistant_parsers(&mut self) -> Vec<(String, ProposedPlanParser)> {
self.assistant.drain().collect()
}
}
/// Aggregated state used only while streaming a plan-mode response.
/// Includes per-item parsers, deferred agent message bookkeeping, and the plan item lifecycle.
struct PlanModeStreamState {
/// Per-item parsers for assistant streams in plan mode.
plan_parsers: PlanParsers,
/// Agent message items started by the model but deferred until we see non-plan text.
pending_agent_message_items: HashMap<String, TurnItem>,
/// Agent message items whose start notification has been emitted.
@ -5664,7 +5637,6 @@ struct PlanModeStreamState {
impl PlanModeStreamState {
fn new(turn_id: &str) -> Self {
Self {
plan_parsers: PlanParsers::new(),
pending_agent_message_items: HashMap::new(),
started_agent_message_items: HashSet::new(),
leading_whitespace_by_item: HashMap::new(),
@ -5673,6 +5645,56 @@ impl PlanModeStreamState {
}
}
#[derive(Debug, Default)]
struct AssistantMessageStreamParsers {
plan_mode: bool,
parsers_by_item: HashMap<String, AssistantTextStreamParser>,
}
type ParsedAssistantTextDelta = AssistantTextChunk;
impl AssistantMessageStreamParsers {
fn new(plan_mode: bool) -> Self {
Self {
plan_mode,
parsers_by_item: HashMap::new(),
}
}
fn parser_mut(&mut self, item_id: &str) -> &mut AssistantTextStreamParser {
let plan_mode = self.plan_mode;
self.parsers_by_item
.entry(item_id.to_string())
.or_insert_with(|| AssistantTextStreamParser::new(plan_mode))
}
fn seed_item_text(&mut self, item_id: &str, text: &str) -> ParsedAssistantTextDelta {
if text.is_empty() {
return ParsedAssistantTextDelta::default();
}
self.parser_mut(item_id).push_str(text)
}
fn parse_delta(&mut self, item_id: &str, delta: &str) -> ParsedAssistantTextDelta {
self.parser_mut(item_id).push_str(delta)
}
fn finish_item(&mut self, item_id: &str) -> ParsedAssistantTextDelta {
let Some(mut parser) = self.parsers_by_item.remove(item_id) else {
return ParsedAssistantTextDelta::default();
};
parser.finish()
}
fn drain_finished(&mut self) -> Vec<(String, ParsedAssistantTextDelta)> {
let parsers_by_item = std::mem::take(&mut self.parsers_by_item);
parsers_by_item
.into_iter()
.map(|(item_id, mut parser)| (item_id, parser.finish()))
.collect()
}
}
impl ProposedPlanItemState {
fn new(turn_id: &str) -> Self {
Self {
@ -5875,35 +5897,68 @@ async fn handle_plan_segments(
}
}
/// Flush any buffered proposed-plan segments when a specific assistant message ends.
async fn flush_proposed_plan_segments_for_item(
async fn emit_streamed_assistant_text_delta(
sess: &Session,
turn_context: &TurnContext,
state: &mut PlanModeStreamState,
plan_mode_state: Option<&mut PlanModeStreamState>,
item_id: &str,
parsed: ParsedAssistantTextDelta,
) {
let Some(mut parser) = state.plan_parsers.take_assistant_parser(item_id) else {
return;
};
let segments = parser.finish();
if segments.is_empty() {
if parsed.is_empty() {
return;
}
handle_plan_segments(sess, turn_context, state, item_id, segments).await;
if !parsed.citations.is_empty() {
// Citation extraction is intentionally local for now; we strip citations from display text
// but do not yet surface them in protocol events.
let _citations = parsed.citations;
}
if let Some(state) = plan_mode_state {
if !parsed.plan_segments.is_empty() {
handle_plan_segments(sess, turn_context, state, item_id, parsed.plan_segments).await;
}
return;
}
if parsed.visible_text.is_empty() {
return;
}
let event = AgentMessageContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id: item_id.to_string(),
delta: parsed.visible_text,
};
sess.send_event(turn_context, EventMsg::AgentMessageContentDelta(event))
.await;
}
/// Flush any remaining assistant plan parsers when the response completes.
async fn flush_proposed_plan_segments_all(
/// Flush buffered assistant text parser state when an assistant message item ends.
async fn flush_assistant_text_segments_for_item(
sess: &Session,
turn_context: &TurnContext,
state: &mut PlanModeStreamState,
plan_mode_state: Option<&mut PlanModeStreamState>,
parsers: &mut AssistantMessageStreamParsers,
item_id: &str,
) {
for (item_id, mut parser) in state.plan_parsers.drain_assistant_parsers() {
let segments = parser.finish();
if segments.is_empty() {
continue;
}
handle_plan_segments(sess, turn_context, state, &item_id, segments).await;
let parsed = parsers.finish_item(item_id);
emit_streamed_assistant_text_delta(sess, turn_context, plan_mode_state, item_id, parsed).await;
}
/// Flush any remaining buffered assistant text parser state at response completion.
async fn flush_assistant_text_segments_all(
sess: &Session,
turn_context: &TurnContext,
mut plan_mode_state: Option<&mut PlanModeStreamState>,
parsers: &mut AssistantMessageStreamParsers,
) {
for (item_id, parsed) in parsers.drain_finished() {
emit_streamed_assistant_text_delta(
sess,
turn_context,
plan_mode_state.as_deref_mut(),
&item_id,
parsed,
)
.await;
}
}
@ -5924,6 +5979,7 @@ async fn maybe_complete_plan_item_from_message(
}
}
if let Some(plan_text) = extract_proposed_plan_text(&text) {
let (plan_text, _citations) = strip_citations(&plan_text);
if !state.plan_item_state.started {
state.plan_item_state.start(sess, turn_context).await;
}
@ -6112,6 +6168,7 @@ async fn try_run_sampling_request(
let mut active_item: Option<TurnItem> = None;
let mut should_emit_turn_diff = false;
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode);
let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id));
let receiving_span = trace_span!("receiving_stream");
let outcome: CodexResult<SamplingRequestResult> = loop {
@ -6151,20 +6208,21 @@ async fn try_run_sampling_request(
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let previously_active_item = active_item.take();
if let Some(state) = plan_mode_state.as_mut() {
if let Some(previous) = previously_active_item.as_ref() {
let item_id = previous.id();
if matches!(previous, TurnItem::AgentMessage(_)) {
flush_proposed_plan_segments_for_item(
&sess,
&turn_context,
state,
&item_id,
)
.await;
}
}
if handle_assistant_item_done_in_plan_mode(
if let Some(previous) = previously_active_item.as_ref()
&& matches!(previous, TurnItem::AgentMessage(_))
{
let item_id = previous.id();
flush_assistant_text_segments_for_item(
&sess,
&turn_context,
plan_mode_state.as_mut(),
&mut assistant_message_stream_parsers,
&item_id,
)
.await;
}
if let Some(state) = plan_mode_state.as_mut()
&& handle_assistant_item_done_in_plan_mode(
&sess,
&turn_context,
&item,
@ -6173,9 +6231,8 @@ async fn try_run_sampling_request(
&mut last_agent_message,
)
.await
{
continue;
}
{
continue;
}
let mut ctx = HandleOutputCtx {
@ -6198,6 +6255,28 @@ async fn try_run_sampling_request(
}
ResponseEvent::OutputItemAdded(item) => {
if let Some(turn_item) = handle_non_tool_response_item(&item, plan_mode).await {
let mut turn_item = turn_item;
let mut seeded_parsed: Option<ParsedAssistantTextDelta> = None;
let mut seeded_item_id: Option<String> = None;
if matches!(turn_item, TurnItem::AgentMessage(_))
&& let Some(raw_text) = raw_assistant_output_text_from_item(&item)
{
let item_id = turn_item.id();
let mut seeded =
assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text);
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
agent_message.content =
vec![codex_protocol::items::AgentMessageContent::Text {
text: if plan_mode {
String::new()
} else {
std::mem::take(&mut seeded.visible_text)
},
}];
}
seeded_parsed = plan_mode.then_some(seeded);
seeded_item_id = Some(item_id);
}
if let Some(state) = plan_mode_state.as_mut()
&& matches!(turn_item, TurnItem::AgentMessage(_))
{
@ -6208,6 +6287,20 @@ async fn try_run_sampling_request(
} else {
sess.emit_turn_item_started(&turn_context, &turn_item).await;
}
if let (Some(state), Some(item_id), Some(parsed)) = (
plan_mode_state.as_mut(),
seeded_item_id.as_deref(),
seeded_parsed,
) {
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
Some(state),
item_id,
parsed,
)
.await;
}
active_item = Some(turn_item);
}
}
@ -6237,9 +6330,13 @@ async fn try_run_sampling_request(
token_usage,
can_append: _,
} => {
if let Some(state) = plan_mode_state.as_mut() {
flush_proposed_plan_segments_all(&sess, &turn_context, state).await;
}
flush_assistant_text_segments_all(
&sess,
&turn_context,
plan_mode_state.as_mut(),
&mut assistant_message_stream_parsers,
)
.await;
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
.await;
should_emit_turn_diff = true;
@ -6256,14 +6353,16 @@ async fn try_run_sampling_request(
// UI will show a selection popup from the final ReviewOutput.
if let Some(active) = active_item.as_ref() {
let item_id = active.id();
if let Some(state) = plan_mode_state.as_mut()
&& matches!(active, TurnItem::AgentMessage(_))
{
let segments = state
.plan_parsers
.assistant_parser_mut(&item_id)
.parse(&delta);
handle_plan_segments(&sess, &turn_context, state, &item_id, segments).await;
if matches!(active, TurnItem::AgentMessage(_)) {
let parsed = assistant_message_stream_parsers.parse_delta(&item_id, &delta);
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
plan_mode_state.as_mut(),
&item_id,
parsed,
)
.await;
} else {
let event = AgentMessageContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
@ -6329,6 +6428,14 @@ async fn try_run_sampling_request(
}
};
flush_assistant_text_segments_all(
&sess,
&turn_context,
plan_mode_state.as_mut(),
&mut assistant_message_stream_parsers,
)
.await;
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
if should_emit_turn_diff {
@ -6346,23 +6453,10 @@ async fn try_run_sampling_request(
}
pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
responses.iter().rev().find_map(|item| {
if let ResponseItem::Message { role, content, .. } = item {
if role == "assistant" {
content.iter().rev().find_map(|ci| {
if let ContentItem::OutputText { text } = ci {
Some(text.clone())
} else {
None
}
})
} else {
None
}
} else {
None
}
})
responses
.iter()
.rev()
.find_map(|item| last_assistant_message_from_item(item, false))
}
use crate::memories::prompts::build_memory_tool_developer_instructions;
@ -6486,6 +6580,68 @@ mod tests {
}
}
#[test]
fn assistant_message_stream_parsers_can_be_seeded_from_output_item_added_text() {
let mut parsers = AssistantMessageStreamParsers::new(false);
let item_id = "msg-1";
let seeded = parsers.seed_item_text(item_id, "hello <oai-mem-citation>doc");
let parsed = parsers.parse_delta(item_id, "1</oai-mem-citation> world");
let tail = parsers.finish_item(item_id);
assert_eq!(seeded.visible_text, "hello ");
assert_eq!(seeded.citations, Vec::<String>::new());
assert_eq!(parsed.visible_text, " world");
assert_eq!(parsed.citations, vec!["doc1".to_string()]);
assert_eq!(tail.visible_text, "");
assert_eq!(tail.citations, Vec::<String>::new());
}
#[test]
fn assistant_message_stream_parsers_seed_buffered_prefix_stays_out_of_finish_tail() {
let mut parsers = AssistantMessageStreamParsers::new(false);
let item_id = "msg-1";
let seeded = parsers.seed_item_text(item_id, "hello <oai-mem-");
let parsed = parsers.parse_delta(item_id, "citation>doc</oai-mem-citation> world");
let tail = parsers.finish_item(item_id);
assert_eq!(seeded.visible_text, "hello ");
assert_eq!(seeded.citations, Vec::<String>::new());
assert_eq!(parsed.visible_text, " world");
assert_eq!(parsed.citations, vec!["doc".to_string()]);
assert_eq!(tail.visible_text, "");
assert_eq!(tail.citations, Vec::<String>::new());
}
#[test]
fn assistant_message_stream_parsers_seed_plan_parser_across_added_and_delta_boundaries() {
let mut parsers = AssistantMessageStreamParsers::new(true);
let item_id = "msg-1";
let seeded = parsers.seed_item_text(item_id, "Intro\n<proposed");
let parsed = parsers.parse_delta(item_id, "_plan>\n- step\n</proposed_plan>\nOutro");
let tail = parsers.finish_item(item_id);
assert_eq!(seeded.visible_text, "Intro\n");
assert_eq!(
seeded.plan_segments,
vec![ProposedPlanSegment::Normal("Intro\n".to_string())]
);
assert_eq!(parsed.visible_text, "Outro");
assert_eq!(
parsed.plan_segments,
vec![
ProposedPlanSegment::ProposedPlanStart,
ProposedPlanSegment::ProposedPlanDelta("- step\n".to_string()),
ProposedPlanSegment::ProposedPlanEnd,
ProposedPlanSegment::Normal("Outro".to_string()),
]
);
assert_eq!(tail.visible_text, "");
assert!(tail.plan_segments.is_empty());
}
fn make_mcp_tool(
server_name: &str,
tool_name: &str,

View file

@ -56,13 +56,11 @@ mod message_history;
mod model_provider_info;
pub mod path_utils;
pub mod personality_migration;
mod proposed_plan_parser;
mod sandbox_tags;
pub mod sandboxing;
mod session_prefix;
mod shell_detect;
mod stream_events_utils;
mod tagged_block_parser;
pub mod test_support;
mod text_encoding;
pub mod token_data;

View file

@ -1,185 +0,0 @@
use crate::tagged_block_parser::TagSpec;
use crate::tagged_block_parser::TaggedLineParser;
use crate::tagged_block_parser::TaggedLineSegment;
const OPEN_TAG: &str = "<proposed_plan>";
const CLOSE_TAG: &str = "</proposed_plan>";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PlanTag {
ProposedPlan,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ProposedPlanSegment {
Normal(String),
ProposedPlanStart,
ProposedPlanDelta(String),
ProposedPlanEnd,
}
/// Parser for `<proposed_plan>` blocks emitted in plan mode.
///
/// This is a thin wrapper around the generic line-based tag parser. It maps
/// tag-aware segments into plan-specific segments for downstream consumers.
#[derive(Debug)]
pub(crate) struct ProposedPlanParser {
parser: TaggedLineParser<PlanTag>,
}
impl ProposedPlanParser {
pub(crate) fn new() -> Self {
Self {
parser: TaggedLineParser::new(vec![TagSpec {
open: OPEN_TAG,
close: CLOSE_TAG,
tag: PlanTag::ProposedPlan,
}]),
}
}
pub(crate) fn parse(&mut self, delta: &str) -> Vec<ProposedPlanSegment> {
self.parser
.parse(delta)
.into_iter()
.map(map_plan_segment)
.collect()
}
pub(crate) fn finish(&mut self) -> Vec<ProposedPlanSegment> {
self.parser
.finish()
.into_iter()
.map(map_plan_segment)
.collect()
}
}
fn map_plan_segment(segment: TaggedLineSegment<PlanTag>) -> ProposedPlanSegment {
match segment {
TaggedLineSegment::Normal(text) => ProposedPlanSegment::Normal(text),
TaggedLineSegment::TagStart(PlanTag::ProposedPlan) => {
ProposedPlanSegment::ProposedPlanStart
}
TaggedLineSegment::TagDelta(PlanTag::ProposedPlan, text) => {
ProposedPlanSegment::ProposedPlanDelta(text)
}
TaggedLineSegment::TagEnd(PlanTag::ProposedPlan) => ProposedPlanSegment::ProposedPlanEnd,
}
}
pub(crate) fn strip_proposed_plan_blocks(text: &str) -> String {
let mut parser = ProposedPlanParser::new();
let mut out = String::new();
for segment in parser.parse(text).into_iter().chain(parser.finish()) {
if let ProposedPlanSegment::Normal(delta) = segment {
out.push_str(&delta);
}
}
out
}
pub(crate) fn extract_proposed_plan_text(text: &str) -> Option<String> {
let mut parser = ProposedPlanParser::new();
let mut plan_text = String::new();
let mut saw_plan_block = false;
for segment in parser.parse(text).into_iter().chain(parser.finish()) {
match segment {
ProposedPlanSegment::ProposedPlanStart => {
saw_plan_block = true;
plan_text.clear();
}
ProposedPlanSegment::ProposedPlanDelta(delta) => {
plan_text.push_str(&delta);
}
ProposedPlanSegment::ProposedPlanEnd | ProposedPlanSegment::Normal(_) => {}
}
}
saw_plan_block.then_some(plan_text)
}
#[cfg(test)]
mod tests {
use super::ProposedPlanParser;
use super::ProposedPlanSegment;
use super::strip_proposed_plan_blocks;
use pretty_assertions::assert_eq;
#[test]
fn streams_proposed_plan_segments() {
let mut parser = ProposedPlanParser::new();
let mut segments = Vec::new();
for chunk in [
"Intro text\n<prop",
"osed_plan>\n- step 1\n",
"</proposed_plan>\nOutro",
] {
segments.extend(parser.parse(chunk));
}
segments.extend(parser.finish());
assert_eq!(
segments,
vec![
ProposedPlanSegment::Normal("Intro text\n".to_string()),
ProposedPlanSegment::ProposedPlanStart,
ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()),
ProposedPlanSegment::ProposedPlanEnd,
ProposedPlanSegment::Normal("Outro".to_string()),
]
);
}
#[test]
fn preserves_non_tag_lines() {
let mut parser = ProposedPlanParser::new();
let mut segments = parser.parse(" <proposed_plan> extra\n");
segments.extend(parser.finish());
assert_eq!(
segments,
vec![ProposedPlanSegment::Normal(
" <proposed_plan> extra\n".to_string()
)]
);
}
#[test]
fn closes_unterminated_plan_block_on_finish() {
let mut parser = ProposedPlanParser::new();
let mut segments = parser.parse("<proposed_plan>\n- step 1\n");
segments.extend(parser.finish());
assert_eq!(
segments,
vec![
ProposedPlanSegment::ProposedPlanStart,
ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()),
ProposedPlanSegment::ProposedPlanEnd,
]
);
}
#[test]
fn closes_tag_line_without_trailing_newline() {
let mut parser = ProposedPlanParser::new();
let mut segments = parser.parse("<proposed_plan>\n- step 1\n</proposed_plan>");
segments.extend(parser.finish());
assert_eq!(
segments,
vec![
ProposedPlanSegment::ProposedPlanStart,
ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()),
ProposedPlanSegment::ProposedPlanEnd,
]
);
}
#[test]
fn strips_proposed_plan_blocks_from_text() {
let text = "before\n<proposed_plan>\n- step\n</proposed_plan>\nafter";
assert_eq!(strip_proposed_plan_blocks(text), "before\nafter");
}
}

View file

@ -3,6 +3,7 @@ use std::sync::Arc;
use codex_protocol::config_types::ModeKind;
use codex_protocol::items::TurnItem;
use codex_utils_stream_parser::strip_citations;
use tokio_util::sync::CancellationToken;
use crate::codex::Session;
@ -11,17 +12,42 @@ use crate::error::CodexErr;
use crate::error::Result;
use crate::function_tool::FunctionCallError;
use crate::parse_turn_item;
use crate::proposed_plan_parser::strip_proposed_plan_blocks;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::router::ToolRouter;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_utils_stream_parser::strip_proposed_plan_blocks;
use futures::Future;
use tracing::debug;
use tracing::instrument;
fn strip_hidden_assistant_markup(text: &str, plan_mode: bool) -> String {
let (without_citations, _citations) = strip_citations(text);
if plan_mode {
strip_proposed_plan_blocks(&without_citations)
} else {
without_citations
}
}
pub(crate) fn raw_assistant_output_text_from_item(item: &ResponseItem) -> Option<String> {
if let ResponseItem::Message { role, content, .. } = item
&& role == "assistant"
{
let combined = content
.iter()
.filter_map(|ci| match ci {
codex_protocol::models::ContentItem::OutputText { text } => Some(text.as_str()),
_ => None,
})
.collect::<String>();
return Some(combined);
}
None
}
/// Handle a completed output item from the model stream, recording it and
/// queuing any tool execution futures. This records items immediately so
/// history and rollout stay in sync even if the turn is later cancelled.
@ -169,7 +195,7 @@ pub(crate) async fn handle_non_tool_response_item(
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => {
let mut turn_item = parse_turn_item(item)?;
if plan_mode && let TurnItem::AgentMessage(agent_message) = &mut turn_item {
if let TurnItem::AgentMessage(agent_message) = &mut turn_item {
let combined = agent_message
.content
.iter()
@ -177,7 +203,7 @@ pub(crate) async fn handle_non_tool_response_item(
codex_protocol::items::AgentMessageContent::Text { text } => text.as_str(),
})
.collect::<String>();
let stripped = strip_proposed_plan_blocks(&combined);
let stripped = strip_hidden_assistant_markup(&combined, plan_mode);
agent_message.content =
vec![codex_protocol::items::AgentMessageContent::Text { text: stripped }];
}
@ -195,25 +221,15 @@ pub(crate) fn last_assistant_message_from_item(
item: &ResponseItem,
plan_mode: bool,
) -> Option<String> {
if let ResponseItem::Message { role, content, .. } = item
&& role == "assistant"
{
let combined = content
.iter()
.filter_map(|ci| match ci {
codex_protocol::models::ContentItem::OutputText { text } => Some(text.as_str()),
_ => None,
})
.collect::<String>();
if let Some(combined) = raw_assistant_output_text_from_item(item) {
if combined.is_empty() {
return None;
}
return if plan_mode {
let stripped = strip_proposed_plan_blocks(&combined);
(!stripped.trim().is_empty()).then_some(stripped)
} else {
Some(combined)
};
let stripped = strip_hidden_assistant_markup(&combined, plan_mode);
if stripped.trim().is_empty() {
return None;
}
return Some(stripped);
}
None
}
@ -248,3 +264,72 @@ pub(crate) fn response_input_to_response_item(input: &ResponseInputItem) -> Opti
_ => None,
}
}
#[cfg(test)]
mod tests {
use super::handle_non_tool_response_item;
use super::last_assistant_message_from_item;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
fn assistant_output_text(text: &str) -> ResponseItem {
ResponseItem::Message {
id: Some("msg-1".to_string()),
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: Some(true),
phase: None,
}
}
#[tokio::test]
async fn handle_non_tool_response_item_strips_citations_from_assistant_message() {
let item = assistant_output_text("hello<oai-mem-citation>doc1</oai-mem-citation> world");
let turn_item = handle_non_tool_response_item(&item, false)
.await
.expect("assistant message should parse");
let TurnItem::AgentMessage(agent_message) = turn_item else {
panic!("expected agent message");
};
let text = agent_message
.content
.iter()
.map(|entry| match entry {
codex_protocol::items::AgentMessageContent::Text { text } => text.as_str(),
})
.collect::<String>();
assert_eq!(text, "hello world");
}
#[test]
fn last_assistant_message_from_item_strips_citations_and_plan_blocks() {
let item = assistant_output_text(
"before<oai-mem-citation>doc1</oai-mem-citation>\n<proposed_plan>\n- x\n</proposed_plan>\nafter",
);
let message = last_assistant_message_from_item(&item, true)
.expect("assistant text should remain after stripping");
assert_eq!(message, "before\nafter");
}
#[test]
fn last_assistant_message_from_item_returns_none_for_citation_only_message() {
let item = assistant_output_text("<oai-mem-citation>doc1</oai-mem-citation>");
assert_eq!(last_assistant_message_from_item(&item, false), None);
}
#[test]
fn last_assistant_message_from_item_returns_none_for_plan_only_hidden_message() {
let item = assistant_output_text("<proposed_plan>\n- x\n</proposed_plan>");
assert_eq!(last_assistant_message_from_item(&item, true), None);
}
}

View file

@ -52,6 +52,47 @@ When to update memory:
- When user explicitly asks to remember/update memory, revise memory_summary.md
and/or MEMORY.md.
Memory citation requirements:
- If ANY relevant memory files were used: append exactly one
`<oai-mem-citation>` block as the VERY LAST content of the final reply.
Normal responses should include the answer first, then append the
`<oai-mem-citation>` block at the end.
- Use this exact structure for programmatic parsing:
```
<oai-mem-citation>
<citation_entries>
MEMORY.md:234-236|note=[responsesapi citation extraction code pointer]
rollout_summaries/2026-02-17T21-23-02-LN3m-weekly_memory_report_pivot_from_git_history.md:10-12|note=[weekly report format]
</citation_entries>
<rollout_ids>
019c6e27-e55b-73d1-87d8-4e01f1f75043
019c7714-3b77-74d1-9866-e1f484aae2ab
</rollout_ids>
</oai-mem-citation>
```
- `citation_entries` is for rendering:
- one citation entry per line
- format: `<file>:<line_start>-<line_end>|note=[<how memory was used>]`
- use file paths relative to the memory base path (for example, `MEMORY.md`,
`rollout_summaries/...`, `skills/...`)
- only cite files actually used under the memory base path (do not cite
workspace files as memory citations)
- if you used `MEMORY.md` and then a rollout summary/skill file, cite both
- list entries in order of importance (most important first)
- `note` should be short, single-line, and use simple characters only (avoid
unusual symbols, no newlines)
- `rollout_ids` is for us to track what previous rollouts you find useful:
- include one rollout id per line
- rollout ids should look like UUIDs (for example,
`019c6e27-e55b-73d1-87d8-4e01f1f75043`)
- include unique ids only; do not repeat ids
- an empty `<rollout_ids>` section is allowed if no rollout ids are available
- you can find rollout ids in rollout summary files and MEMORY.md
- do not include file paths or notes in this section
- Never include memory citations inside pull-request messages.
- Never cite blank lines; double-check ranges.
========= MEMORY_SUMMARY BEGINS =========
{{ memory_summary }}
========= MEMORY_SUMMARY ENDS =========

View file

@ -505,6 +505,314 @@ async fn plan_mode_strips_plan_from_agent_messages() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn plan_mode_streaming_citations_are_stripped_across_added_deltas_and_done()
-> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let TestCodex {
codex,
session_configured,
..
} = test_codex().build(&server).await?;
let added_text = "Intro <oai-mem-";
let deltas = [
"citation>outer-doc</oai-mem-citation>\n<proposed",
"_plan>\n- Step 1<oai-mem-",
"citation>plan-doc</oai-mem-citation>\n- Step 2\n</proposed_plan>\nOu",
"tro",
];
let full_message = format!("{added_text}{}", deltas.concat());
let mut events = vec![
ev_response_created("resp-1"),
ev_message_item_added("msg-1", added_text),
];
for delta in deltas {
events.push(ev_output_text_delta(delta));
}
events.push(ev_assistant_message("msg-1", &full_message));
events.push(ev_completed("resp-1"));
mount_sse_once(&server, sse(events)).await;
let collaboration_mode = CollaborationMode {
mode: ModeKind::Plan,
settings: Settings {
model: session_configured.model.clone(),
reasoning_effort: None,
developer_instructions: None,
},
};
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "please plan with citations".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: std::env::current_dir()?,
approval_policy: codex_protocol::protocol::AskForApproval::Never,
sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
model: session_configured.model.clone(),
effort: None,
summary: codex_protocol::config_types::ReasoningSummary::Auto,
collaboration_mode: Some(collaboration_mode),
personality: None,
})
.await?;
let mut agent_started = None;
let mut agent_started_idx = None;
let mut agent_completed = None;
let mut agent_completed_idx = None;
let mut plan_started = None;
let mut plan_started_idx = None;
let mut plan_completed = None;
let mut plan_completed_idx = None;
let mut agent_deltas = Vec::new();
let mut plan_deltas = Vec::new();
let mut first_agent_delta_idx = None;
let mut last_agent_delta_idx = None;
let mut first_plan_delta_idx = None;
let mut last_plan_delta_idx = None;
let mut idx = 0usize;
let turn_complete_idx = loop {
let ev = wait_for_event(&codex, |_| true).await;
match ev {
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::AgentMessage(item),
..
}) => {
agent_started_idx = Some(idx);
agent_started = Some(item);
}
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::Plan(item),
..
}) => {
plan_started_idx = Some(idx);
plan_started = Some(item);
}
EventMsg::AgentMessageContentDelta(event) => {
if first_agent_delta_idx.is_none() {
first_agent_delta_idx = Some(idx);
}
last_agent_delta_idx = Some(idx);
agent_deltas.push(event.delta);
}
EventMsg::PlanDelta(event) => {
if first_plan_delta_idx.is_none() {
first_plan_delta_idx = Some(idx);
}
last_plan_delta_idx = Some(idx);
plan_deltas.push(event.delta);
}
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::AgentMessage(item),
..
}) => {
agent_completed_idx = Some(idx);
agent_completed = Some(item);
}
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::Plan(item),
..
}) => {
plan_completed_idx = Some(idx);
plan_completed = Some(item);
}
EventMsg::TurnComplete(_) => {
break idx;
}
_ => {}
}
idx += 1;
};
let agent_started = agent_started.expect("agent item start should be emitted");
let agent_completed = agent_completed.expect("agent item completion should be emitted");
let plan_started = plan_started.expect("plan item start should be emitted");
let plan_completed = plan_completed.expect("plan item completion should be emitted");
assert_eq!(agent_started.id, agent_completed.id);
assert_eq!(plan_started.id, plan_completed.id);
assert_eq!(plan_started.text, "");
let agent_started_text: String = agent_started
.content
.iter()
.map(|entry| match entry {
AgentMessageContent::Text { text } => text.as_str(),
})
.collect();
let agent_completed_text: String = agent_completed
.content
.iter()
.map(|entry| match entry {
AgentMessageContent::Text { text } => text.as_str(),
})
.collect();
let agent_delta_text = agent_deltas.concat();
let plan_delta_text = plan_deltas.concat();
assert_eq!(agent_started_text, "");
assert_eq!(agent_delta_text, "Intro \nOutro");
assert_eq!(agent_completed_text, "Intro \nOutro");
assert_eq!(plan_delta_text, "- Step 1\n- Step 2\n");
assert_eq!(plan_completed.text, "- Step 1\n- Step 2\n");
for text in [
agent_started_text.as_str(),
agent_delta_text.as_str(),
agent_completed_text.as_str(),
plan_delta_text.as_str(),
plan_completed.text.as_str(),
] {
assert!(!text.contains("<oai-mem-citation>"));
assert!(!text.contains("</oai-mem-citation>"));
}
let agent_started_idx = agent_started_idx.expect("agent start index");
let agent_completed_idx = agent_completed_idx.expect("agent completion index");
let plan_started_idx = plan_started_idx.expect("plan start index");
let plan_completed_idx = plan_completed_idx.expect("plan completion index");
let first_agent_delta_idx = first_agent_delta_idx.expect("agent delta index");
let last_agent_delta_idx = last_agent_delta_idx.expect("agent delta index");
let first_plan_delta_idx = first_plan_delta_idx.expect("plan delta index");
let last_plan_delta_idx = last_plan_delta_idx.expect("plan delta index");
assert!(agent_started_idx < first_agent_delta_idx);
assert!(plan_started_idx < first_plan_delta_idx);
assert!(last_agent_delta_idx < agent_completed_idx);
assert!(last_plan_delta_idx < plan_completed_idx);
assert!(agent_completed_idx < turn_complete_idx);
assert!(plan_completed_idx < turn_complete_idx);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn plan_mode_streaming_proposed_plan_tag_split_across_added_and_delta_is_parsed()
-> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let TestCodex {
codex,
session_configured,
..
} = test_codex().build(&server).await?;
let added_text = "Intro\n<proposed";
let deltas = ["_plan>\n- Step 1\n</proposed_plan>\nOutro"];
let full_message = format!("{added_text}{}", deltas.concat());
let mut events = vec![
ev_response_created("resp-1"),
ev_message_item_added("msg-1", added_text),
];
for delta in deltas {
events.push(ev_output_text_delta(delta));
}
events.push(ev_assistant_message("msg-1", &full_message));
events.push(ev_completed("resp-1"));
mount_sse_once(&server, sse(events)).await;
let collaboration_mode = CollaborationMode {
mode: ModeKind::Plan,
settings: Settings {
model: session_configured.model.clone(),
reasoning_effort: None,
developer_instructions: None,
},
};
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "please plan".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: std::env::current_dir()?,
approval_policy: codex_protocol::protocol::AskForApproval::Never,
sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess,
model: session_configured.model.clone(),
effort: None,
summary: codex_protocol::config_types::ReasoningSummary::Auto,
collaboration_mode: Some(collaboration_mode),
personality: None,
})
.await?;
let mut agent_started = None;
let mut agent_completed = None;
let mut plan_started = None;
let mut plan_completed = None;
let mut agent_deltas = Vec::new();
let mut plan_deltas = Vec::new();
loop {
let ev = wait_for_event(&codex, |_| true).await;
match ev {
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::AgentMessage(item),
..
}) => agent_started = Some(item),
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::Plan(item),
..
}) => plan_started = Some(item),
EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta),
EventMsg::PlanDelta(event) => plan_deltas.push(event.delta),
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::AgentMessage(item),
..
}) => agent_completed = Some(item),
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::Plan(item),
..
}) => plan_completed = Some(item),
EventMsg::TurnComplete(_) => break,
_ => {}
}
}
let agent_started = agent_started.expect("agent item start should be emitted");
let agent_completed = agent_completed.expect("agent item completion should be emitted");
let plan_started = plan_started.expect("plan item start should be emitted");
let plan_completed = plan_completed.expect("plan item completion should be emitted");
let agent_started_text: String = agent_started
.content
.iter()
.map(|entry| match entry {
AgentMessageContent::Text { text } => text.as_str(),
})
.collect();
let agent_completed_text: String = agent_completed
.content
.iter()
.map(|entry| match entry {
AgentMessageContent::Text { text } => text.as_str(),
})
.collect();
assert_eq!(agent_started_text, "");
assert_eq!(agent_deltas.concat(), "Intro\nOutro");
assert_eq!(agent_completed_text, "Intro\nOutro");
assert_eq!(plan_started.text, "");
assert_eq!(plan_deltas.concat(), "- Step 1\n");
assert_eq!(plan_completed.text, "- Step 1\n");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));

View file

@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "stream-parser",
crate_name = "codex_utils_stream_parser",
)

View file

@ -0,0 +1,11 @@
[package]
name = "codex-utils-stream-parser"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dev-dependencies]
pretty_assertions = { workspace = true }

View file

@ -0,0 +1,97 @@
# codex-utils-stream-parser
Small, dependency-free utilities for parsing streamed text incrementally.
**Disclaimer**: This code is pretty complex and Codex did not manage to write it so before updating the code, make
sure to deeply understand it and don't blindly trust Codex on it. Feel free to update the documentation as you
modify the code
## What it provides
- `StreamTextParser`: trait for incremental parsers that consume string chunks
- `InlineHiddenTagParser<T>`: generic parser that hides inline tags and extracts their contents
- `CitationStreamParser`: convenience wrapper for `<oai-mem-citation>...</oai-mem-citation>`
- `strip_citations(...)`: one-shot helper for non-streamed strings
- `Utf8StreamParser<P>`: adapter for raw `&[u8]` streams that may split UTF-8 code points
## Why this exists
Some model outputs arrive as a stream and may contain hidden markup (for example
`<oai-mem-citation>...</oai-mem-citation>`) split across chunk boundaries. Parsing each chunk
independently is incorrect because tags can be split (`<oai-mem-` + `citation>`).
This crate keeps parser state across chunks, returns visible text safe to render
immediately, and extracts hidden payloads separately.
## Example: citation streaming
```rust
use codex_utils_stream_parser::CitationStreamParser;
use codex_utils_stream_parser::StreamTextParser;
let mut parser = CitationStreamParser::new();
let first = parser.push_str("Hello <oai-mem-");
assert_eq!(first.visible_text, "Hello ");
assert!(first.extracted.is_empty());
let second = parser.push_str("citation>doc A</oai-mem-citation> world");
assert_eq!(second.visible_text, " world");
assert_eq!(second.extracted, vec!["doc A".to_string()]);
let tail = parser.finish();
assert!(tail.visible_text.is_empty());
assert!(tail.extracted.is_empty());
```
## Example: raw byte streaming with split UTF-8 code points
```rust
use codex_utils_stream_parser::CitationStreamParser;
use codex_utils_stream_parser::Utf8StreamParser;
# fn demo() -> Result<(), codex_utils_stream_parser::Utf8StreamParserError> {
let mut parser = Utf8StreamParser::new(CitationStreamParser::new());
// "é" split across chunks: 0xC3 + 0xA9
let first = parser.push_bytes(&[b'H', 0xC3])?;
assert_eq!(first.visible_text, "H");
let second = parser.push_bytes(&[0xA9, b'!'])?;
assert_eq!(second.visible_text, "é!");
let tail = parser.finish()?;
assert!(tail.visible_text.is_empty());
# Ok(())
# }
```
## Example: custom hidden tags
```rust
use codex_utils_stream_parser::InlineHiddenTagParser;
use codex_utils_stream_parser::InlineTagSpec;
use codex_utils_stream_parser::StreamTextParser;
#[derive(Clone, Debug, PartialEq, Eq)]
enum Tag {
Secret,
}
let mut parser = InlineHiddenTagParser::new(vec![InlineTagSpec {
tag: Tag::Secret,
open: "<secret>",
close: "</secret>",
}]);
let out = parser.push_str("a<secret>x</secret>b");
assert_eq!(out.visible_text, "ab");
assert_eq!(out.extracted.len(), 1);
assert_eq!(out.extracted[0].content, "x");
```
## Known limitations
- Tags are matched literally and case-sensitively
- No nested tag support
- A stream can return empty objects.

View file

@ -0,0 +1,130 @@
use crate::CitationStreamParser;
use crate::ProposedPlanParser;
use crate::ProposedPlanSegment;
use crate::StreamTextChunk;
use crate::StreamTextParser;
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AssistantTextChunk {
pub visible_text: String,
pub citations: Vec<String>,
pub plan_segments: Vec<ProposedPlanSegment>,
}
impl AssistantTextChunk {
pub fn is_empty(&self) -> bool {
self.visible_text.is_empty() && self.citations.is_empty() && self.plan_segments.is_empty()
}
}
/// Parses assistant text streaming markup in one pass:
/// - strips `<oai-mem-citation>` tags and extracts citation payloads
/// - in plan mode, also strips `<proposed_plan>` blocks and emits plan segments
#[derive(Debug, Default)]
pub struct AssistantTextStreamParser {
plan_mode: bool,
citations: CitationStreamParser,
plan: ProposedPlanParser,
}
impl AssistantTextStreamParser {
pub fn new(plan_mode: bool) -> Self {
Self {
plan_mode,
..Self::default()
}
}
pub fn push_str(&mut self, chunk: &str) -> AssistantTextChunk {
let citation_chunk = self.citations.push_str(chunk);
let mut out = self.parse_visible_text(citation_chunk.visible_text);
out.citations = citation_chunk.extracted;
out
}
pub fn finish(&mut self) -> AssistantTextChunk {
let citation_chunk = self.citations.finish();
let mut out = self.parse_visible_text(citation_chunk.visible_text);
if self.plan_mode {
let mut tail = self.plan.finish();
if !tail.is_empty() {
out.visible_text.push_str(&tail.visible_text);
out.plan_segments.append(&mut tail.extracted);
}
}
out.citations = citation_chunk.extracted;
out
}
fn parse_visible_text(&mut self, visible_text: String) -> AssistantTextChunk {
if !self.plan_mode {
return AssistantTextChunk {
visible_text,
..AssistantTextChunk::default()
};
}
let plan_chunk: StreamTextChunk<ProposedPlanSegment> = self.plan.push_str(&visible_text);
AssistantTextChunk {
visible_text: plan_chunk.visible_text,
plan_segments: plan_chunk.extracted,
..AssistantTextChunk::default()
}
}
}
#[cfg(test)]
mod tests {
use super::AssistantTextStreamParser;
use crate::ProposedPlanSegment;
use pretty_assertions::assert_eq;
#[test]
fn parses_citations_across_seed_and_delta_boundaries() {
let mut parser = AssistantTextStreamParser::new(false);
let seeded = parser.push_str("hello <oai-mem-citation>doc");
let parsed = parser.push_str("1</oai-mem-citation> world");
let tail = parser.finish();
assert_eq!(seeded.visible_text, "hello ");
assert_eq!(seeded.citations, Vec::<String>::new());
assert_eq!(parsed.visible_text, " world");
assert_eq!(parsed.citations, vec!["doc1".to_string()]);
assert_eq!(tail.visible_text, "");
assert_eq!(tail.citations, Vec::<String>::new());
}
#[test]
fn parses_plan_segments_after_citation_stripping() {
let mut parser = AssistantTextStreamParser::new(true);
let seeded = parser.push_str("Intro\n<proposed");
let parsed = parser.push_str("_plan>\n- step <oai-mem-citation>doc</oai-mem-citation>\n");
let tail = parser.push_str("</proposed_plan>\nOutro");
let finish = parser.finish();
assert_eq!(seeded.visible_text, "Intro\n");
assert_eq!(
seeded.plan_segments,
vec![ProposedPlanSegment::Normal("Intro\n".to_string())]
);
assert_eq!(parsed.visible_text, "");
assert_eq!(parsed.citations, vec!["doc".to_string()]);
assert_eq!(
parsed.plan_segments,
vec![
ProposedPlanSegment::ProposedPlanStart,
ProposedPlanSegment::ProposedPlanDelta("- step \n".to_string()),
]
);
assert_eq!(tail.visible_text, "Outro");
assert_eq!(
tail.plan_segments,
vec![
ProposedPlanSegment::ProposedPlanEnd,
ProposedPlanSegment::Normal("Outro".to_string()),
]
);
assert!(finish.is_empty());
}
}

View file

@ -0,0 +1,179 @@
use crate::InlineHiddenTagParser;
use crate::InlineTagSpec;
use crate::StreamTextChunk;
use crate::StreamTextParser;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CitationTag {
Citation,
}
const CITATION_OPEN: &str = "<oai-mem-citation>";
const CITATION_CLOSE: &str = "</oai-mem-citation>";
/// Stream parser for `<oai-mem-citation>...</oai-mem-citation>` tags.
///
/// This is a thin convenience wrapper around [`InlineHiddenTagParser`]. It returns citation bodies
/// as plain strings and omits the citation tags from visible text.
///
/// Matching is literal and non-nested. If EOF is reached before a closing
/// `</oai-mem-citation>`, the parser auto-closes the tag and returns the buffered body as an
/// extracted citation.
#[derive(Debug)]
pub struct CitationStreamParser {
inner: InlineHiddenTagParser<CitationTag>,
}
impl CitationStreamParser {
pub fn new() -> Self {
Self {
inner: InlineHiddenTagParser::new(vec![InlineTagSpec {
tag: CitationTag::Citation,
open: CITATION_OPEN,
close: CITATION_CLOSE,
}]),
}
}
}
impl Default for CitationStreamParser {
fn default() -> Self {
Self::new()
}
}
impl StreamTextParser for CitationStreamParser {
type Extracted = String;
fn push_str(&mut self, chunk: &str) -> StreamTextChunk<Self::Extracted> {
let inner = self.inner.push_str(chunk);
StreamTextChunk {
visible_text: inner.visible_text,
extracted: inner.extracted.into_iter().map(|tag| tag.content).collect(),
}
}
fn finish(&mut self) -> StreamTextChunk<Self::Extracted> {
let inner = self.inner.finish();
StreamTextChunk {
visible_text: inner.visible_text,
extracted: inner.extracted.into_iter().map(|tag| tag.content).collect(),
}
}
}
/// Strip citation tags from a complete string and return `(visible_text, citations)`.
///
/// This uses [`CitationStreamParser`] internally, so it inherits the same semantics:
/// literal, non-nested matching and auto-closing unterminated citations at EOF.
pub fn strip_citations(text: &str) -> (String, Vec<String>) {
let mut parser = CitationStreamParser::new();
let mut out = parser.push_str(text);
let tail = parser.finish();
out.visible_text.push_str(&tail.visible_text);
out.extracted.extend(tail.extracted);
(out.visible_text, out.extracted)
}
#[cfg(test)]
mod tests {
use super::CitationStreamParser;
use super::strip_citations;
use crate::StreamTextChunk;
use crate::StreamTextParser;
use pretty_assertions::assert_eq;
fn collect_chunks<P>(parser: &mut P, chunks: &[&str]) -> StreamTextChunk<P::Extracted>
where
P: StreamTextParser,
{
let mut all = StreamTextChunk::default();
for chunk in chunks {
let next = parser.push_str(chunk);
all.visible_text.push_str(&next.visible_text);
all.extracted.extend(next.extracted);
}
let tail = parser.finish();
all.visible_text.push_str(&tail.visible_text);
all.extracted.extend(tail.extracted);
all
}
#[test]
fn citation_parser_streams_across_chunk_boundaries() {
let mut parser = CitationStreamParser::new();
let out = collect_chunks(
&mut parser,
&[
"Hello <oai-mem-",
"citation>source A</oai-mem-",
"citation> world",
],
);
assert_eq!(out.visible_text, "Hello world");
assert_eq!(out.extracted, vec!["source A".to_string()]);
}
#[test]
fn citation_parser_buffers_partial_open_tag_prefix() {
let mut parser = CitationStreamParser::new();
let first = parser.push_str("abc <oai-mem-");
assert_eq!(first.visible_text, "abc ");
assert_eq!(first.extracted, Vec::<String>::new());
let second = parser.push_str("citation>x</oai-mem-citation>z");
let tail = parser.finish();
assert_eq!(second.visible_text, "z");
assert_eq!(second.extracted, vec!["x".to_string()]);
assert!(tail.is_empty());
}
#[test]
fn citation_parser_auto_closes_unterminated_tag_on_finish() {
let mut parser = CitationStreamParser::new();
let out = collect_chunks(&mut parser, &["x<oai-mem-citation>source"]);
assert_eq!(out.visible_text, "x");
assert_eq!(out.extracted, vec!["source".to_string()]);
}
#[test]
fn citation_parser_preserves_partial_open_tag_at_eof_if_not_a_full_tag() {
let mut parser = CitationStreamParser::new();
let out = collect_chunks(&mut parser, &["hello <oai-mem-"]);
assert_eq!(out.visible_text, "hello <oai-mem-");
assert_eq!(out.extracted, Vec::<String>::new());
}
#[test]
fn strip_citations_collects_all_citations() {
let (visible, citations) = strip_citations(
"a<oai-mem-citation>one</oai-mem-citation>b<oai-mem-citation>two</oai-mem-citation>c",
);
assert_eq!(visible, "abc");
assert_eq!(citations, vec!["one".to_string(), "two".to_string()]);
}
#[test]
fn strip_citations_auto_closes_unterminated_citation_at_eof() {
let (visible, citations) = strip_citations("x<oai-mem-citation>y");
assert_eq!(visible, "x");
assert_eq!(citations, vec!["y".to_string()]);
}
#[test]
fn citation_parser_does_not_support_nested_tags() {
let (visible, citations) = strip_citations(
"a<oai-mem-citation>x<oai-mem-citation>y</oai-mem-citation>z</oai-mem-citation>b",
);
assert_eq!(visible, "az</oai-mem-citation>b");
assert_eq!(citations, vec!["x<oai-mem-citation>y".to_string()]);
}
}

View file

@ -0,0 +1,323 @@
use crate::StreamTextChunk;
use crate::StreamTextParser;
/// One hidden inline tag extracted by [`InlineHiddenTagParser`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExtractedInlineTag<T> {
pub tag: T,
pub content: String,
}
/// Literal tag specification used by [`InlineHiddenTagParser`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InlineTagSpec<T> {
pub tag: T,
pub open: &'static str,
pub close: &'static str,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct ActiveTag<T> {
tag: T,
close: &'static str,
content: String,
}
/// Generic streaming parser that hides configured inline tags and extracts their contents.
///
/// Example:
/// - input: `hello <oai-mem-citation>doc A</oai-mem-citation> world`
/// - visible output: `hello world`
/// - extracted: `["doc A"]`
///
/// Matching is literal and non-nested. If EOF is reached while a tag is still open, the parser
/// auto-closes it and returns the buffered content as extracted data.
#[derive(Debug)]
pub struct InlineHiddenTagParser<T>
where
T: Clone + Eq,
{
specs: Vec<InlineTagSpec<T>>,
pending: String,
active: Option<ActiveTag<T>>,
}
impl<T> InlineHiddenTagParser<T>
where
T: Clone + Eq,
{
/// Create a parser for one or more hidden inline tags.
pub fn new(specs: Vec<InlineTagSpec<T>>) -> Self {
assert!(
!specs.is_empty(),
"InlineHiddenTagParser requires at least one tag spec"
);
for spec in &specs {
assert!(
!spec.open.is_empty(),
"InlineHiddenTagParser requires non-empty open delimiters"
);
assert!(
!spec.close.is_empty(),
"InlineHiddenTagParser requires non-empty close delimiters"
);
}
Self {
specs,
pending: String::new(),
active: None,
}
}
fn find_next_open(&self) -> Option<(usize, usize)> {
self.specs
.iter()
.enumerate()
.filter_map(|(idx, spec)| {
self.pending
.find(spec.open)
.map(|pos| (pos, spec.open.len(), idx))
})
.min_by(|(pos_a, len_a, idx_a), (pos_b, len_b, idx_b)| {
pos_a
.cmp(pos_b)
.then_with(|| len_b.cmp(len_a))
.then_with(|| idx_a.cmp(idx_b))
})
.map(|(pos, _len, idx)| (pos, idx))
}
fn max_open_prefix_suffix_len(&self) -> usize {
self.specs
.iter()
.map(|spec| longest_suffix_prefix_len(&self.pending, spec.open))
.max()
.map_or(0, std::convert::identity)
}
fn push_visible_prefix(out: &mut StreamTextChunk<ExtractedInlineTag<T>>, pending: &str) {
if !pending.is_empty() {
out.visible_text.push_str(pending);
}
}
fn drain_visible_to_suffix_match(
&mut self,
out: &mut StreamTextChunk<ExtractedInlineTag<T>>,
keep_suffix_len: usize,
) {
let take = self.pending.len().saturating_sub(keep_suffix_len);
if take == 0 {
return;
}
Self::push_visible_prefix(out, &self.pending[..take]);
self.pending.drain(..take);
}
}
impl<T> StreamTextParser for InlineHiddenTagParser<T>
where
T: Clone + Eq,
{
type Extracted = ExtractedInlineTag<T>;
fn push_str(&mut self, chunk: &str) -> StreamTextChunk<Self::Extracted> {
self.pending.push_str(chunk);
let mut out = StreamTextChunk::default();
loop {
if let Some(close) = self.active.as_ref().map(|active| active.close) {
if let Some(close_idx) = self.pending.find(close) {
let Some(mut active) = self.active.take() else {
continue;
};
active.content.push_str(&self.pending[..close_idx]);
out.extracted.push(ExtractedInlineTag {
tag: active.tag,
content: active.content,
});
let close_len = close.len();
self.pending.drain(..close_idx + close_len);
continue;
}
let keep = longest_suffix_prefix_len(&self.pending, close);
let take = self.pending.len().saturating_sub(keep);
if take > 0 {
if let Some(active) = self.active.as_mut() {
active.content.push_str(&self.pending[..take]);
}
self.pending.drain(..take);
}
break;
}
if let Some((open_idx, spec_idx)) = self.find_next_open() {
Self::push_visible_prefix(&mut out, &self.pending[..open_idx]);
let spec = &self.specs[spec_idx];
let open_len = spec.open.len();
self.pending.drain(..open_idx + open_len);
self.active = Some(ActiveTag {
tag: spec.tag.clone(),
close: spec.close,
content: String::new(),
});
continue;
}
let keep = self.max_open_prefix_suffix_len();
self.drain_visible_to_suffix_match(&mut out, keep);
break;
}
out
}
fn finish(&mut self) -> StreamTextChunk<Self::Extracted> {
let mut out = StreamTextChunk::default();
if let Some(mut active) = self.active.take() {
if !self.pending.is_empty() {
active.content.push_str(&self.pending);
self.pending.clear();
}
out.extracted.push(ExtractedInlineTag {
tag: active.tag,
content: active.content,
});
return out;
}
if !self.pending.is_empty() {
out.visible_text.push_str(&self.pending);
self.pending.clear();
}
out
}
}
fn longest_suffix_prefix_len(s: &str, needle: &str) -> usize {
let max = s.len().min(needle.len().saturating_sub(1));
for k in (1..=max).rev() {
if needle.is_char_boundary(k) && s.ends_with(&needle[..k]) {
return k;
}
}
0
}
#[cfg(test)]
mod tests {
use super::InlineHiddenTagParser;
use super::InlineTagSpec;
use crate::StreamTextChunk;
use crate::StreamTextParser;
use pretty_assertions::assert_eq;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Tag {
A,
B,
}
fn collect_chunks<P>(parser: &mut P, chunks: &[&str]) -> StreamTextChunk<P::Extracted>
where
P: StreamTextParser,
{
let mut all = StreamTextChunk::default();
for chunk in chunks {
let next = parser.push_str(chunk);
all.visible_text.push_str(&next.visible_text);
all.extracted.extend(next.extracted);
}
let tail = parser.finish();
all.visible_text.push_str(&tail.visible_text);
all.extracted.extend(tail.extracted);
all
}
#[test]
fn generic_inline_parser_supports_multiple_tag_types() {
let mut parser = InlineHiddenTagParser::new(vec![
InlineTagSpec {
tag: Tag::A,
open: "<a>",
close: "</a>",
},
InlineTagSpec {
tag: Tag::B,
open: "<b>",
close: "</b>",
},
]);
let out = collect_chunks(&mut parser, &["1<a>x</a>2<b>y</b>3"]);
assert_eq!(out.visible_text, "123");
assert_eq!(out.extracted.len(), 2);
assert_eq!(out.extracted[0].tag, Tag::A);
assert_eq!(out.extracted[0].content, "x");
assert_eq!(out.extracted[1].tag, Tag::B);
assert_eq!(out.extracted[1].content, "y");
}
#[test]
fn generic_inline_parser_supports_non_ascii_tag_delimiters() {
let mut parser = InlineHiddenTagParser::new(vec![InlineTagSpec {
tag: Tag::A,
open: "<é>",
close: "</é>",
}]);
let out = collect_chunks(&mut parser, &["a<", "é>中</", "é>b"]);
assert_eq!(out.visible_text, "ab");
assert_eq!(out.extracted.len(), 1);
assert_eq!(out.extracted[0].tag, Tag::A);
assert_eq!(out.extracted[0].content, "");
}
#[test]
fn generic_inline_parser_prefers_longest_opener_at_same_offset() {
let mut parser = InlineHiddenTagParser::new(vec![
InlineTagSpec {
tag: Tag::A,
open: "<a>",
close: "</a>",
},
InlineTagSpec {
tag: Tag::B,
open: "<ab>",
close: "</ab>",
},
]);
let out = collect_chunks(&mut parser, &["x<ab>y</ab>z"]);
assert_eq!(out.visible_text, "xz");
assert_eq!(out.extracted.len(), 1);
assert_eq!(out.extracted[0].tag, Tag::B);
assert_eq!(out.extracted[0].content, "y");
}
#[test]
#[should_panic(expected = "non-empty open delimiters")]
fn generic_inline_parser_rejects_empty_open_delimiter() {
let _ = InlineHiddenTagParser::new(vec![InlineTagSpec {
tag: Tag::A,
open: "",
close: "</a>",
}]);
}
#[test]
#[should_panic(expected = "non-empty close delimiters")]
fn generic_inline_parser_rejects_empty_close_delimiter() {
let _ = InlineHiddenTagParser::new(vec![InlineTagSpec {
tag: Tag::A,
open: "<a>",
close: "",
}]);
}
}

View file

@ -0,0 +1,23 @@
mod assistant_text;
mod citation;
mod inline_hidden_tag;
mod proposed_plan;
mod stream_text;
mod tagged_line_parser;
mod utf8_stream;
pub use assistant_text::AssistantTextChunk;
pub use assistant_text::AssistantTextStreamParser;
pub use citation::CitationStreamParser;
pub use citation::strip_citations;
pub use inline_hidden_tag::ExtractedInlineTag;
pub use inline_hidden_tag::InlineHiddenTagParser;
pub use inline_hidden_tag::InlineTagSpec;
pub use proposed_plan::ProposedPlanParser;
pub use proposed_plan::ProposedPlanSegment;
pub use proposed_plan::extract_proposed_plan_text;
pub use proposed_plan::strip_proposed_plan_blocks;
pub use stream_text::StreamTextChunk;
pub use stream_text::StreamTextParser;
pub use utf8_stream::Utf8StreamParser;
pub use utf8_stream::Utf8StreamParserError;

View file

@ -0,0 +1,212 @@
use crate::StreamTextChunk;
use crate::StreamTextParser;
use crate::tagged_line_parser::TagSpec;
use crate::tagged_line_parser::TaggedLineParser;
use crate::tagged_line_parser::TaggedLineSegment;
const OPEN_TAG: &str = "<proposed_plan>";
const CLOSE_TAG: &str = "</proposed_plan>";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PlanTag {
ProposedPlan,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProposedPlanSegment {
Normal(String),
ProposedPlanStart,
ProposedPlanDelta(String),
ProposedPlanEnd,
}
/// Parser for `<proposed_plan>` blocks emitted in plan mode.
///
/// Implements [`StreamTextParser`] so callers can consume:
/// - `visible_text`: normal assistant text with plan blocks removed
/// - `extracted`: ordered plan segments (includes `Normal(...)` segments for ordering fidelity)
#[derive(Debug)]
pub struct ProposedPlanParser {
parser: TaggedLineParser<PlanTag>,
}
impl ProposedPlanParser {
pub fn new() -> Self {
Self {
parser: TaggedLineParser::new(vec![TagSpec {
open: OPEN_TAG,
close: CLOSE_TAG,
tag: PlanTag::ProposedPlan,
}]),
}
}
}
impl Default for ProposedPlanParser {
fn default() -> Self {
Self::new()
}
}
impl StreamTextParser for ProposedPlanParser {
type Extracted = ProposedPlanSegment;
fn push_str(&mut self, chunk: &str) -> StreamTextChunk<Self::Extracted> {
map_segments(self.parser.parse(chunk))
}
fn finish(&mut self) -> StreamTextChunk<Self::Extracted> {
map_segments(self.parser.finish())
}
}
fn map_segments(segments: Vec<TaggedLineSegment<PlanTag>>) -> StreamTextChunk<ProposedPlanSegment> {
let mut out = StreamTextChunk::default();
for segment in segments {
let mapped = match segment {
TaggedLineSegment::Normal(text) => ProposedPlanSegment::Normal(text),
TaggedLineSegment::TagStart(PlanTag::ProposedPlan) => {
ProposedPlanSegment::ProposedPlanStart
}
TaggedLineSegment::TagDelta(PlanTag::ProposedPlan, text) => {
ProposedPlanSegment::ProposedPlanDelta(text)
}
TaggedLineSegment::TagEnd(PlanTag::ProposedPlan) => {
ProposedPlanSegment::ProposedPlanEnd
}
};
if let ProposedPlanSegment::Normal(text) = &mapped {
out.visible_text.push_str(text);
}
out.extracted.push(mapped);
}
out
}
pub fn strip_proposed_plan_blocks(text: &str) -> String {
let mut parser = ProposedPlanParser::new();
let mut out = parser.push_str(text).visible_text;
out.push_str(&parser.finish().visible_text);
out
}
pub fn extract_proposed_plan_text(text: &str) -> Option<String> {
let mut parser = ProposedPlanParser::new();
let mut plan_text = String::new();
let mut saw_plan_block = false;
for segment in parser
.push_str(text)
.extracted
.into_iter()
.chain(parser.finish().extracted)
{
match segment {
ProposedPlanSegment::ProposedPlanStart => {
saw_plan_block = true;
plan_text.clear();
}
ProposedPlanSegment::ProposedPlanDelta(delta) => {
plan_text.push_str(&delta);
}
ProposedPlanSegment::ProposedPlanEnd | ProposedPlanSegment::Normal(_) => {}
}
}
saw_plan_block.then_some(plan_text)
}
#[cfg(test)]
mod tests {
use super::ProposedPlanParser;
use super::ProposedPlanSegment;
use super::extract_proposed_plan_text;
use super::strip_proposed_plan_blocks;
use crate::StreamTextChunk;
use crate::StreamTextParser;
use pretty_assertions::assert_eq;
fn collect_chunks<P>(parser: &mut P, chunks: &[&str]) -> StreamTextChunk<P::Extracted>
where
P: StreamTextParser,
{
let mut all = StreamTextChunk::default();
for chunk in chunks {
let next = parser.push_str(chunk);
all.visible_text.push_str(&next.visible_text);
all.extracted.extend(next.extracted);
}
let tail = parser.finish();
all.visible_text.push_str(&tail.visible_text);
all.extracted.extend(tail.extracted);
all
}
#[test]
fn streams_proposed_plan_segments_and_visible_text() {
let mut parser = ProposedPlanParser::new();
let out = collect_chunks(
&mut parser,
&[
"Intro text\n<prop",
"osed_plan>\n- step 1\n",
"</proposed_plan>\nOutro",
],
);
assert_eq!(out.visible_text, "Intro text\nOutro");
assert_eq!(
out.extracted,
vec![
ProposedPlanSegment::Normal("Intro text\n".to_string()),
ProposedPlanSegment::ProposedPlanStart,
ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()),
ProposedPlanSegment::ProposedPlanEnd,
ProposedPlanSegment::Normal("Outro".to_string()),
]
);
}
#[test]
fn preserves_non_tag_lines() {
let mut parser = ProposedPlanParser::new();
let out = collect_chunks(&mut parser, &[" <proposed_plan> extra\n"]);
assert_eq!(out.visible_text, " <proposed_plan> extra\n");
assert_eq!(
out.extracted,
vec![ProposedPlanSegment::Normal(
" <proposed_plan> extra\n".to_string()
)]
);
}
#[test]
fn closes_unterminated_plan_block_on_finish() {
let mut parser = ProposedPlanParser::new();
let out = collect_chunks(&mut parser, &["<proposed_plan>\n- step 1\n"]);
assert_eq!(out.visible_text, "");
assert_eq!(
out.extracted,
vec![
ProposedPlanSegment::ProposedPlanStart,
ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()),
ProposedPlanSegment::ProposedPlanEnd,
]
);
}
#[test]
fn strips_proposed_plan_blocks_from_text() {
let text = "before\n<proposed_plan>\n- step\n</proposed_plan>\nafter";
assert_eq!(strip_proposed_plan_blocks(text), "before\nafter");
}
#[test]
fn extracts_proposed_plan_text() {
let text = "before\n<proposed_plan>\n- step\n</proposed_plan>\nafter";
assert_eq!(
extract_proposed_plan_text(text),
Some("- step\n".to_string())
);
}
}

View file

@ -0,0 +1,36 @@
/// Incremental parser result for one pushed chunk (or final flush).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamTextChunk<T> {
/// Text safe to render immediately.
pub visible_text: String,
/// Hidden payloads extracted from the chunk.
pub extracted: Vec<T>,
}
impl<T> Default for StreamTextChunk<T> {
fn default() -> Self {
Self {
visible_text: String::new(),
extracted: Vec::new(),
}
}
}
impl<T> StreamTextChunk<T> {
/// Returns true when no visible text or extracted payloads were produced.
pub fn is_empty(&self) -> bool {
self.visible_text.is_empty() && self.extracted.is_empty()
}
}
/// Trait for parsers that consume streamed text and emit visible text plus extracted payloads.
pub trait StreamTextParser {
/// Payload extracted by this parser (for example a citation body).
type Extracted;
/// Feed a new text chunk.
fn push_str(&mut self, chunk: &str) -> StreamTextChunk<Self::Extracted>;
/// Flush any buffered state at end-of-stream (or end-of-item).
fn finish(&mut self) -> StreamTextChunk<Self::Extracted>;
}

View file

@ -1,9 +1,7 @@
//! Line-based tag block parsing for streamed text.
//!
//! The parser buffers each line until it can disprove that the line is a tag,
//! which is required for tags that must appear alone on a line. For example,
//! Proposed Plan output uses `<proposed_plan>` and `</proposed_plan>` tags
//! on their own lines so clients can stream plan content separately.
//! which is required for tags that must appear alone on a line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct TagSpec<T> {
@ -21,17 +19,6 @@ pub(crate) enum TaggedLineSegment<T> {
}
/// Stateful line parser that splits input into normal text vs tag blocks.
///
/// How it works:
/// - While reading a line, we buffer characters until the line either finishes
/// (`\n`) or stops matching any tag prefix (after `trim_start`).
/// - If it stops matching a tag prefix, the buffered line is immediately
/// emitted as text and we continue in "plain text" mode until the next
/// newline.
/// - When a full line is available, we compare it to the open/close tags; tag
/// lines emit TagStart/TagEnd, otherwise the line is emitted as text.
/// - `finish()` flushes any buffered line and auto-closes an unterminated tag,
/// which keeps streaming resilient to missing closing tags.
#[derive(Debug, Default)]
pub(crate) struct TaggedLineParser<T>
where
@ -56,7 +43,6 @@ where
}
}
/// Parse a streamed delta into line-aware segments.
pub(crate) fn parse(&mut self, delta: &str) -> Vec<TaggedLineSegment<T>> {
let mut segments = Vec::new();
let mut run = String::new();
@ -75,7 +61,6 @@ where
if slug.is_empty() || self.is_tag_prefix(slug) {
continue;
}
// This line cannot be a tag line, so flush it immediately.
let buffered = std::mem::take(&mut self.line_buffer);
self.detect_tag = false;
self.push_text(buffered, &mut segments);
@ -96,7 +81,6 @@ where
segments
}
/// Flush any buffered text and close an unterminated tag block.
pub(crate) fn finish(&mut self) -> Vec<TaggedLineSegment<T>> {
let mut segments = Vec::new();
if !self.line_buffer.is_empty() {
@ -115,7 +99,6 @@ where
push_segment(&mut segments, TaggedLineSegment::TagEnd(tag));
self.active_tag = None;
} else {
// The buffered line never proved to be a tag line.
self.push_text(buffered, &mut segments);
}
}
@ -210,12 +193,8 @@ where
}
segments.push(TaggedLineSegment::TagDelta(tag, delta));
}
TaggedLineSegment::TagStart(tag) => {
segments.push(TaggedLineSegment::TagStart(tag));
}
TaggedLineSegment::TagEnd(tag) => {
segments.push(TaggedLineSegment::TagEnd(tag));
}
TaggedLineSegment::TagStart(tag) => segments.push(TaggedLineSegment::TagStart(tag)),
TaggedLineSegment::TagEnd(tag) => segments.push(TaggedLineSegment::TagEnd(tag)),
}
}
@ -267,48 +246,4 @@ mod tests {
vec![TaggedLineSegment::Normal("<tag> extra\n".to_string())]
);
}
#[test]
fn closes_unterminated_tag_on_finish() {
let mut parser = parser();
let mut segments = parser.parse("<tag>\nline\n");
segments.extend(parser.finish());
assert_eq!(
segments,
vec![
TaggedLineSegment::TagStart(Tag::Block),
TaggedLineSegment::TagDelta(Tag::Block, "line\n".to_string()),
TaggedLineSegment::TagEnd(Tag::Block),
]
);
}
#[test]
fn accepts_tags_with_trailing_whitespace() {
let mut parser = parser();
let mut segments = parser.parse("<tag> \nline\n</tag> \n");
segments.extend(parser.finish());
assert_eq!(
segments,
vec![
TaggedLineSegment::TagStart(Tag::Block),
TaggedLineSegment::TagDelta(Tag::Block, "line\n".to_string()),
TaggedLineSegment::TagEnd(Tag::Block),
]
);
}
#[test]
fn passes_through_plain_text() {
let mut parser = parser();
let mut segments = parser.parse("plain text\n");
segments.extend(parser.finish());
assert_eq!(
segments,
vec![TaggedLineSegment::Normal("plain text\n".to_string())]
);
}
}

View file

@ -0,0 +1,333 @@
use std::error::Error;
use std::fmt;
use crate::StreamTextChunk;
use crate::StreamTextParser;
/// Error returned by [`Utf8StreamParser`] when streamed bytes are not valid UTF-8.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Utf8StreamParserError {
/// The provided bytes contain an invalid UTF-8 sequence.
InvalidUtf8 {
/// Byte offset in the parser's buffered bytes where decoding failed.
valid_up_to: usize,
/// Length in bytes of the invalid sequence.
error_len: usize,
},
/// EOF was reached with a buffered partial UTF-8 code point.
IncompleteUtf8AtEof,
}
impl fmt::Display for Utf8StreamParserError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidUtf8 {
valid_up_to,
error_len,
} => write!(
f,
"invalid UTF-8 in streamed bytes at offset {valid_up_to} (error length {error_len})"
),
Self::IncompleteUtf8AtEof => {
write!(f, "incomplete UTF-8 code point at end of stream")
}
}
}
}
impl Error for Utf8StreamParserError {}
/// Wraps a [`StreamTextParser`] and accepts raw bytes, buffering partial UTF-8 code points.
///
/// This is useful when upstream data arrives as `&[u8]` and a code point may be split across
/// chunk boundaries (for example `0xC3` followed by `0xA9` for `é`).
#[derive(Debug)]
pub struct Utf8StreamParser<P> {
inner: P,
pending_utf8: Vec<u8>,
}
impl<P> Utf8StreamParser<P>
where
P: StreamTextParser,
{
pub fn new(inner: P) -> Self {
Self {
inner,
pending_utf8: Vec::new(),
}
}
/// Feed a raw byte chunk.
///
/// If the chunk contains invalid UTF-8, this returns an error and rolls back the entire
/// pushed chunk so callers can decide how to recover without the inner parser seeing a partial
/// prefix from that chunk.
pub fn push_bytes(
&mut self,
chunk: &[u8],
) -> Result<StreamTextChunk<P::Extracted>, Utf8StreamParserError> {
let old_len = self.pending_utf8.len();
self.pending_utf8.extend_from_slice(chunk);
match std::str::from_utf8(&self.pending_utf8) {
Ok(text) => {
let out = self.inner.push_str(text);
self.pending_utf8.clear();
Ok(out)
}
Err(err) => {
if let Some(error_len) = err.error_len() {
self.pending_utf8.truncate(old_len);
return Err(Utf8StreamParserError::InvalidUtf8 {
valid_up_to: err.valid_up_to(),
error_len,
});
}
let valid_up_to = err.valid_up_to();
if valid_up_to == 0 {
return Ok(StreamTextChunk::default());
}
let text = match std::str::from_utf8(&self.pending_utf8[..valid_up_to]) {
Ok(text) => text,
Err(prefix_err) => {
self.pending_utf8.truncate(old_len);
let error_len = prefix_err.error_len().unwrap_or(0);
return Err(Utf8StreamParserError::InvalidUtf8 {
valid_up_to: prefix_err.valid_up_to(),
error_len,
});
}
};
let out = self.inner.push_str(text);
self.pending_utf8.drain(..valid_up_to);
Ok(out)
}
}
}
pub fn finish(&mut self) -> Result<StreamTextChunk<P::Extracted>, Utf8StreamParserError> {
if !self.pending_utf8.is_empty() {
match std::str::from_utf8(&self.pending_utf8) {
Ok(_) => {}
Err(err) => {
if let Some(error_len) = err.error_len() {
return Err(Utf8StreamParserError::InvalidUtf8 {
valid_up_to: err.valid_up_to(),
error_len,
});
}
return Err(Utf8StreamParserError::IncompleteUtf8AtEof);
}
}
}
let mut out = if self.pending_utf8.is_empty() {
StreamTextChunk::default()
} else {
let text = match std::str::from_utf8(&self.pending_utf8) {
Ok(text) => text,
Err(err) => {
let error_len = err.error_len().unwrap_or(0);
return Err(Utf8StreamParserError::InvalidUtf8 {
valid_up_to: err.valid_up_to(),
error_len,
});
}
};
let out = self.inner.push_str(text);
self.pending_utf8.clear();
out
};
let mut tail = self.inner.finish();
out.visible_text.push_str(&tail.visible_text);
out.extracted.append(&mut tail.extracted);
Ok(out)
}
/// Return the wrapped parser if no undecoded UTF-8 bytes are buffered.
///
/// Use [`Self::finish`] first if you want to flush buffered text into the wrapped parser.
pub fn into_inner(self) -> Result<P, Utf8StreamParserError> {
if self.pending_utf8.is_empty() {
return Ok(self.inner);
}
match std::str::from_utf8(&self.pending_utf8) {
Ok(_) => Ok(self.inner),
Err(err) => {
if let Some(error_len) = err.error_len() {
return Err(Utf8StreamParserError::InvalidUtf8 {
valid_up_to: err.valid_up_to(),
error_len,
});
}
Err(Utf8StreamParserError::IncompleteUtf8AtEof)
}
}
}
/// Return the wrapped parser without validating or flushing buffered undecoded bytes.
///
/// This may drop a partial UTF-8 code point that was buffered across chunk boundaries.
pub fn into_inner_lossy(self) -> P {
self.inner
}
}
#[cfg(test)]
mod tests {
use super::Utf8StreamParser;
use super::Utf8StreamParserError;
use crate::CitationStreamParser;
use crate::StreamTextChunk;
use crate::StreamTextParser;
use pretty_assertions::assert_eq;
fn collect_bytes(
parser: &mut Utf8StreamParser<CitationStreamParser>,
chunks: &[&[u8]],
) -> Result<StreamTextChunk<String>, Utf8StreamParserError> {
let mut all = StreamTextChunk::default();
for chunk in chunks {
let next = parser.push_bytes(chunk)?;
all.visible_text.push_str(&next.visible_text);
all.extracted.extend(next.extracted);
}
let tail = parser.finish()?;
all.visible_text.push_str(&tail.visible_text);
all.extracted.extend(tail.extracted);
Ok(all)
}
#[test]
fn utf8_stream_parser_handles_split_code_points_across_chunks() {
let chunks: [&[u8]; 3] = [
b"A\xC3",
b"\xA9<oai-mem-citation>\xE4",
b"\xB8\xAD</oai-mem-citation>Z",
];
let mut parser = Utf8StreamParser::new(CitationStreamParser::new());
let out = match collect_bytes(&mut parser, &chunks) {
Ok(out) => out,
Err(err) => panic!("valid UTF-8 stream should parse: {err}"),
};
assert_eq!(out.visible_text, "AéZ");
assert_eq!(out.extracted, vec!["".to_string()]);
}
#[test]
fn utf8_stream_parser_rolls_back_on_invalid_utf8_chunk() {
let mut parser = Utf8StreamParser::new(CitationStreamParser::new());
let first = match parser.push_bytes(&[0xC3]) {
Ok(out) => out,
Err(err) => panic!("leading byte may be buffered until next chunk: {err}"),
};
assert!(first.is_empty());
let err = match parser.push_bytes(&[0x28]) {
Ok(out) => panic!("invalid continuation byte should error, got output: {out:?}"),
Err(err) => err,
};
assert_eq!(
err,
Utf8StreamParserError::InvalidUtf8 {
valid_up_to: 0,
error_len: 1,
}
);
let second = match parser.push_bytes(&[0xA9, b'x']) {
Ok(out) => out,
Err(err) => panic!("state should still allow a valid continuation: {err}"),
};
let tail = match parser.finish() {
Ok(out) => out,
Err(err) => panic!("stream should finish: {err}"),
};
assert_eq!(second.visible_text, "éx");
assert!(second.extracted.is_empty());
assert!(tail.is_empty());
}
#[test]
fn utf8_stream_parser_rolls_back_entire_chunk_when_invalid_byte_follows_valid_prefix() {
let mut parser = Utf8StreamParser::new(CitationStreamParser::new());
let err = match parser.push_bytes(b"ok\xFF") {
Ok(out) => panic!("invalid byte should error, got output: {out:?}"),
Err(err) => err,
};
assert_eq!(
err,
Utf8StreamParserError::InvalidUtf8 {
valid_up_to: 2,
error_len: 1,
}
);
let next = match parser.push_bytes(b"!") {
Ok(out) => out,
Err(err) => panic!("parser should recover after rollback: {err}"),
};
assert_eq!(next.visible_text, "!");
assert!(next.extracted.is_empty());
}
#[test]
fn utf8_stream_parser_errors_on_incomplete_code_point_at_eof() {
let mut parser = Utf8StreamParser::new(CitationStreamParser::new());
let out = match parser.push_bytes(&[0xE2, 0x82]) {
Ok(out) => out,
Err(err) => panic!("partial code point should be buffered: {err}"),
};
assert!(out.is_empty());
let err = match parser.finish() {
Ok(out) => panic!("unfinished code point should error, got output: {out:?}"),
Err(err) => err,
};
assert_eq!(err, Utf8StreamParserError::IncompleteUtf8AtEof);
}
#[test]
fn utf8_stream_parser_into_inner_errors_when_partial_code_point_is_buffered() {
let mut parser = Utf8StreamParser::new(CitationStreamParser::new());
let out = match parser.push_bytes(&[0xC3]) {
Ok(out) => out,
Err(err) => panic!("partial code point should be buffered: {err}"),
};
assert!(out.is_empty());
let err = match parser.into_inner() {
Ok(_) => panic!("buffered partial code point should be rejected"),
Err(err) => err,
};
assert_eq!(err, Utf8StreamParserError::IncompleteUtf8AtEof);
}
#[test]
fn utf8_stream_parser_into_inner_lossy_drops_buffered_partial_code_point() {
let mut parser = Utf8StreamParser::new(CitationStreamParser::new());
let out = match parser.push_bytes(&[0xC3]) {
Ok(out) => out,
Err(err) => panic!("partial code point should be buffered: {err}"),
};
assert!(out.is_empty());
let mut inner = parser.into_inner_lossy();
let tail = inner.finish();
assert!(tail.is_empty());
}
}