diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 06acc6c97..45402ed66 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1724,6 +1724,7 @@ dependencies = [ "codex-utils-home-dir", "codex-utils-pty", "codex-utils-readiness", + "codex-utils-stream-parser", "codex-utils-string", "codex-windows-sandbox", "core-foundation 0.9.4", @@ -2547,6 +2548,13 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "codex-utils-stream-parser" +version = "0.0.0" +dependencies = [ + "pretty_assertions", +] + [[package]] name = "codex-utils-string" version = "0.0.0" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 51f541f24..e648572ec 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -58,6 +58,7 @@ members = [ "utils/approval-presets", "utils/oss", "utils/fuzzy-match", + "utils/stream-parser", "codex-client", "codex-api", "state", @@ -137,6 +138,7 @@ codex-utils-rustls-provider = { path = "utils/rustls-provider" } codex-utils-sandbox-summary = { path = "utils/sandbox-summary" } codex-utils-sleep-inhibitor = { path = "utils/sleep-inhibitor" } codex-utils-string = { path = "utils/string" } +codex-utils-stream-parser = { path = "utils/stream-parser" } codex-windows-sandbox = { path = "windows-sandbox-rs" } core_test_support = { path = "core/tests/common" } mcp_test_support = { path = "mcp-server/tests/common" } diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 9a0be02a9..c79e90588 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -50,6 +50,7 @@ codex-utils-pty = { workspace = true } codex-utils-readiness = { workspace = true } codex-secrets = { workspace = true } codex-utils-string = { workspace = true } +codex-utils-stream-parser = { workspace = true } codex-windows-sandbox = { package = "codex-windows-sandbox", path = "../windows-sandbox-rs" } csv = { workspace = true } dirs = { workspace = true } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 
fedb77c44..a58f2e1e0 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -42,6 +42,7 @@ use crate::stream_events_utils::HandleOutputCtx; use crate::stream_events_utils::handle_non_tool_response_item; use crate::stream_events_utils::handle_output_item_done; use crate::stream_events_utils::last_assistant_message_from_item; +use crate::stream_events_utils::raw_assistant_output_text_from_item; use crate::terminal; use crate::truncate::TruncationPolicy; use crate::turn_metadata::TurnMetadataState; @@ -92,6 +93,11 @@ use codex_protocol::request_user_input::RequestUserInputResponse; use codex_protocol::skill_approval::SkillApprovalResponse; use codex_rmcp_client::ElicitationResponse; use codex_rmcp_client::OAuthCredentialsStoreMode; +use codex_utils_stream_parser::AssistantTextChunk; +use codex_utils_stream_parser::AssistantTextStreamParser; +use codex_utils_stream_parser::ProposedPlanSegment; +use codex_utils_stream_parser::extract_proposed_plan_text; +use codex_utils_stream_parser::strip_citations; use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::FuturesOrdered; @@ -174,9 +180,6 @@ use crate::mentions::collect_explicit_app_ids; use crate::mentions::collect_tool_mentions_from_messages; use crate::network_policy_decision::execpolicy_network_rule_amendment; use crate::project_doc::get_user_instructions; -use crate::proposed_plan_parser::ProposedPlanParser; -use crate::proposed_plan_parser::ProposedPlanSegment; -use crate::proposed_plan_parser::extract_proposed_plan_text; use crate::protocol::AgentMessageContentDeltaEvent; use crate::protocol::AgentReasoningSectionBreakEvent; use crate::protocol::ApplyPatchApprovalRequestEvent; @@ -5618,39 +5621,9 @@ struct ProposedPlanItemState { completed: bool, } -/// Per-item plan parsers so we can buffer text while detecting `` -/// tags without ever mixing buffered lines across item ids. 
-struct PlanParsers { - assistant: HashMap, -} - -impl PlanParsers { - fn new() -> Self { - Self { - assistant: HashMap::new(), - } - } - - fn assistant_parser_mut(&mut self, item_id: &str) -> &mut ProposedPlanParser { - self.assistant - .entry(item_id.to_string()) - .or_insert_with(ProposedPlanParser::new) - } - - fn take_assistant_parser(&mut self, item_id: &str) -> Option { - self.assistant.remove(item_id) - } - - fn drain_assistant_parsers(&mut self) -> Vec<(String, ProposedPlanParser)> { - self.assistant.drain().collect() - } -} - /// Aggregated state used only while streaming a plan-mode response. /// Includes per-item parsers, deferred agent message bookkeeping, and the plan item lifecycle. struct PlanModeStreamState { - /// Per-item parsers for assistant streams in plan mode. - plan_parsers: PlanParsers, /// Agent message items started by the model but deferred until we see non-plan text. pending_agent_message_items: HashMap, /// Agent message items whose start notification has been emitted. 
@@ -5664,7 +5637,6 @@ struct PlanModeStreamState { impl PlanModeStreamState { fn new(turn_id: &str) -> Self { Self { - plan_parsers: PlanParsers::new(), pending_agent_message_items: HashMap::new(), started_agent_message_items: HashSet::new(), leading_whitespace_by_item: HashMap::new(), @@ -5673,6 +5645,56 @@ impl PlanModeStreamState { } } +#[derive(Debug, Default)] +struct AssistantMessageStreamParsers { + plan_mode: bool, + parsers_by_item: HashMap, +} + +type ParsedAssistantTextDelta = AssistantTextChunk; + +impl AssistantMessageStreamParsers { + fn new(plan_mode: bool) -> Self { + Self { + plan_mode, + parsers_by_item: HashMap::new(), + } + } + + fn parser_mut(&mut self, item_id: &str) -> &mut AssistantTextStreamParser { + let plan_mode = self.plan_mode; + self.parsers_by_item + .entry(item_id.to_string()) + .or_insert_with(|| AssistantTextStreamParser::new(plan_mode)) + } + + fn seed_item_text(&mut self, item_id: &str, text: &str) -> ParsedAssistantTextDelta { + if text.is_empty() { + return ParsedAssistantTextDelta::default(); + } + self.parser_mut(item_id).push_str(text) + } + + fn parse_delta(&mut self, item_id: &str, delta: &str) -> ParsedAssistantTextDelta { + self.parser_mut(item_id).push_str(delta) + } + + fn finish_item(&mut self, item_id: &str) -> ParsedAssistantTextDelta { + let Some(mut parser) = self.parsers_by_item.remove(item_id) else { + return ParsedAssistantTextDelta::default(); + }; + parser.finish() + } + + fn drain_finished(&mut self) -> Vec<(String, ParsedAssistantTextDelta)> { + let parsers_by_item = std::mem::take(&mut self.parsers_by_item); + parsers_by_item + .into_iter() + .map(|(item_id, mut parser)| (item_id, parser.finish())) + .collect() + } +} + impl ProposedPlanItemState { fn new(turn_id: &str) -> Self { Self { @@ -5875,35 +5897,68 @@ async fn handle_plan_segments( } } -/// Flush any buffered proposed-plan segments when a specific assistant message ends. 
-async fn flush_proposed_plan_segments_for_item( +async fn emit_streamed_assistant_text_delta( sess: &Session, turn_context: &TurnContext, - state: &mut PlanModeStreamState, + plan_mode_state: Option<&mut PlanModeStreamState>, item_id: &str, + parsed: ParsedAssistantTextDelta, ) { - let Some(mut parser) = state.plan_parsers.take_assistant_parser(item_id) else { - return; - }; - let segments = parser.finish(); - if segments.is_empty() { + if parsed.is_empty() { return; } - handle_plan_segments(sess, turn_context, state, item_id, segments).await; + if !parsed.citations.is_empty() { + // Citation extraction is intentionally local for now; we strip citations from display text + // but do not yet surface them in protocol events. + let _citations = parsed.citations; + } + if let Some(state) = plan_mode_state { + if !parsed.plan_segments.is_empty() { + handle_plan_segments(sess, turn_context, state, item_id, parsed.plan_segments).await; + } + return; + } + if parsed.visible_text.is_empty() { + return; + } + let event = AgentMessageContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: item_id.to_string(), + delta: parsed.visible_text, + }; + sess.send_event(turn_context, EventMsg::AgentMessageContentDelta(event)) + .await; } -/// Flush any remaining assistant plan parsers when the response completes. -async fn flush_proposed_plan_segments_all( +/// Flush buffered assistant text parser state when an assistant message item ends. 
+async fn flush_assistant_text_segments_for_item( sess: &Session, turn_context: &TurnContext, - state: &mut PlanModeStreamState, + plan_mode_state: Option<&mut PlanModeStreamState>, + parsers: &mut AssistantMessageStreamParsers, + item_id: &str, ) { - for (item_id, mut parser) in state.plan_parsers.drain_assistant_parsers() { - let segments = parser.finish(); - if segments.is_empty() { - continue; - } - handle_plan_segments(sess, turn_context, state, &item_id, segments).await; + let parsed = parsers.finish_item(item_id); + emit_streamed_assistant_text_delta(sess, turn_context, plan_mode_state, item_id, parsed).await; +} + +/// Flush any remaining buffered assistant text parser state at response completion. +async fn flush_assistant_text_segments_all( + sess: &Session, + turn_context: &TurnContext, + mut plan_mode_state: Option<&mut PlanModeStreamState>, + parsers: &mut AssistantMessageStreamParsers, +) { + for (item_id, parsed) in parsers.drain_finished() { + emit_streamed_assistant_text_delta( + sess, + turn_context, + plan_mode_state.as_deref_mut(), + &item_id, + parsed, + ) + .await; } } @@ -5924,6 +5979,7 @@ async fn maybe_complete_plan_item_from_message( } } if let Some(plan_text) = extract_proposed_plan_text(&text) { + let (plan_text, _citations) = strip_citations(&plan_text); if !state.plan_item_state.started { state.plan_item_state.start(sess, turn_context).await; } @@ -6112,6 +6168,7 @@ async fn try_run_sampling_request( let mut active_item: Option = None; let mut should_emit_turn_diff = false; let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan; + let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode); let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id)); let receiving_span = trace_span!("receiving_stream"); let outcome: CodexResult = loop { @@ -6151,20 +6208,21 @@ async fn try_run_sampling_request( ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) 
=> { let previously_active_item = active_item.take(); - if let Some(state) = plan_mode_state.as_mut() { - if let Some(previous) = previously_active_item.as_ref() { - let item_id = previous.id(); - if matches!(previous, TurnItem::AgentMessage(_)) { - flush_proposed_plan_segments_for_item( - &sess, - &turn_context, - state, - &item_id, - ) - .await; - } - } - if handle_assistant_item_done_in_plan_mode( + if let Some(previous) = previously_active_item.as_ref() + && matches!(previous, TurnItem::AgentMessage(_)) + { + let item_id = previous.id(); + flush_assistant_text_segments_for_item( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &mut assistant_message_stream_parsers, + &item_id, + ) + .await; + } + if let Some(state) = plan_mode_state.as_mut() + && handle_assistant_item_done_in_plan_mode( &sess, &turn_context, &item, @@ -6173,9 +6231,8 @@ async fn try_run_sampling_request( &mut last_agent_message, ) .await - { - continue; - } + { + continue; } let mut ctx = HandleOutputCtx { @@ -6198,6 +6255,28 @@ async fn try_run_sampling_request( } ResponseEvent::OutputItemAdded(item) => { if let Some(turn_item) = handle_non_tool_response_item(&item, plan_mode).await { + let mut turn_item = turn_item; + let mut seeded_parsed: Option = None; + let mut seeded_item_id: Option = None; + if matches!(turn_item, TurnItem::AgentMessage(_)) + && let Some(raw_text) = raw_assistant_output_text_from_item(&item) + { + let item_id = turn_item.id(); + let mut seeded = + assistant_message_stream_parsers.seed_item_text(&item_id, &raw_text); + if let TurnItem::AgentMessage(agent_message) = &mut turn_item { + agent_message.content = + vec![codex_protocol::items::AgentMessageContent::Text { + text: if plan_mode { + String::new() + } else { + std::mem::take(&mut seeded.visible_text) + }, + }]; + } + seeded_parsed = plan_mode.then_some(seeded); + seeded_item_id = Some(item_id); + } if let Some(state) = plan_mode_state.as_mut() && matches!(turn_item, TurnItem::AgentMessage(_)) { @@ -6208,6 
+6287,20 @@ async fn try_run_sampling_request( } else { sess.emit_turn_item_started(&turn_context, &turn_item).await; } + if let (Some(state), Some(item_id), Some(parsed)) = ( + plan_mode_state.as_mut(), + seeded_item_id.as_deref(), + seeded_parsed, + ) { + emit_streamed_assistant_text_delta( + &sess, + &turn_context, + Some(state), + item_id, + parsed, + ) + .await; + } active_item = Some(turn_item); } } @@ -6237,9 +6330,13 @@ async fn try_run_sampling_request( token_usage, can_append: _, } => { - if let Some(state) = plan_mode_state.as_mut() { - flush_proposed_plan_segments_all(&sess, &turn_context, state).await; - } + flush_assistant_text_segments_all( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &mut assistant_message_stream_parsers, + ) + .await; sess.update_token_usage_info(&turn_context, token_usage.as_ref()) .await; should_emit_turn_diff = true; @@ -6256,14 +6353,16 @@ async fn try_run_sampling_request( // UI will show a selection popup from the final ReviewOutput. if let Some(active) = active_item.as_ref() { let item_id = active.id(); - if let Some(state) = plan_mode_state.as_mut() - && matches!(active, TurnItem::AgentMessage(_)) - { - let segments = state - .plan_parsers - .assistant_parser_mut(&item_id) - .parse(&delta); - handle_plan_segments(&sess, &turn_context, state, &item_id, segments).await; + if matches!(active, TurnItem::AgentMessage(_)) { + let parsed = assistant_message_stream_parsers.parse_delta(&item_id, &delta); + emit_streamed_assistant_text_delta( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &item_id, + parsed, + ) + .await; } else { let event = AgentMessageContentDeltaEvent { thread_id: sess.conversation_id.to_string(), @@ -6329,6 +6428,14 @@ async fn try_run_sampling_request( } }; + flush_assistant_text_segments_all( + &sess, + &turn_context, + plan_mode_state.as_mut(), + &mut assistant_message_stream_parsers, + ) + .await; + drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?; if 
should_emit_turn_diff {
@@ -6346,23 +6453,10 @@ async fn try_run_sampling_request(
 }
 
 pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
-    responses.iter().rev().find_map(|item| {
-        if let ResponseItem::Message { role, content, .. } = item {
-            if role == "assistant" {
-                content.iter().rev().find_map(|ci| {
-                    if let ContentItem::OutputText { text } = ci {
-                        Some(text.clone())
-                    } else {
-                        None
-                    }
-                })
-            } else {
-                None
-            }
-        } else {
-            None
-        }
-    })
+    responses
+        .iter()
+        .rev()
+        .find_map(|item| last_assistant_message_from_item(item, false))
 }
 
 use crate::memories::prompts::build_memory_tool_developer_instructions;
@@ -6486,6 +6580,68 @@ mod tests {
         }
     }
 
+    #[test]
+    fn assistant_message_stream_parsers_can_be_seeded_from_output_item_added_text() {
+        let mut parsers = AssistantMessageStreamParsers::new(false);
+        let item_id = "msg-1";
+
+        let seeded = parsers.seed_item_text(item_id, "hello <citation>doc");
+        let parsed = parsers.parse_delta(item_id, "1</citation> world");
+        let tail = parsers.finish_item(item_id);
+
+        assert_eq!(seeded.visible_text, "hello ");
+        assert_eq!(seeded.citations, Vec::<String>::new());
+        assert_eq!(parsed.visible_text, " world");
+        assert_eq!(parsed.citations, vec!["doc1".to_string()]);
+        assert_eq!(tail.visible_text, "");
+        assert_eq!(tail.citations, Vec::<String>::new());
+    }
+
+    #[test]
+    fn assistant_message_stream_parsers_seed_buffered_prefix_stays_out_of_finish_tail() {
+        let mut parsers = AssistantMessageStreamParsers::new(false);
+        let item_id = "msg-1";
+
+        let seeded = parsers.seed_item_text(item_id, "hello <cit");
+        let parsed = parsers.parse_delta(item_id, "ation>doc</citation> world");
+        let tail = parsers.finish_item(item_id);
+
+        assert_eq!(seeded.visible_text, "hello ");
+        assert_eq!(seeded.citations, Vec::<String>::new());
+        assert_eq!(parsed.visible_text, " world");
+        assert_eq!(parsed.citations, vec!["doc".to_string()]);
+        assert_eq!(tail.visible_text, "");
+        assert_eq!(tail.citations, Vec::<String>::new());
+    }
+
+    #[test]
+    fn 
assistant_message_stream_parsers_seed_plan_parser_across_added_and_delta_boundaries() {
+        let mut parsers = AssistantMessageStreamParsers::new(true);
+        let item_id = "msg-1";
+
+        let seeded = parsers.seed_item_text(item_id, "Intro\n<proposed");
+        let parsed = parsers.parse_delta(item_id, "_plan>\n- step\n</proposed_plan>\nOutro");
+        let tail = parsers.finish_item(item_id);
+
+        assert_eq!(seeded.visible_text, "Intro\n");
+        assert_eq!(
+            seeded.plan_segments,
+            vec![ProposedPlanSegment::Normal("Intro\n".to_string())]
+        );
+        assert_eq!(parsed.visible_text, "Outro");
+        assert_eq!(
+            parsed.plan_segments,
+            vec![
+                ProposedPlanSegment::ProposedPlanStart,
+                ProposedPlanSegment::ProposedPlanDelta("- step\n".to_string()),
+                ProposedPlanSegment::ProposedPlanEnd,
+                ProposedPlanSegment::Normal("Outro".to_string()),
+            ]
+        );
+        assert_eq!(tail.visible_text, "");
+        assert!(tail.plan_segments.is_empty());
+    }
+
     fn make_mcp_tool(
         server_name: &str,
         tool_name: &str,
diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs
index fdac1accb..c34f33f39 100644
--- a/codex-rs/core/src/lib.rs
+++ b/codex-rs/core/src/lib.rs
@@ -56,13 +56,11 @@ mod message_history;
 mod model_provider_info;
 pub mod path_utils;
 pub mod personality_migration;
-mod proposed_plan_parser;
 mod sandbox_tags;
 pub mod sandboxing;
 mod session_prefix;
 mod shell_detect;
 mod stream_events_utils;
-mod tagged_block_parser;
 pub mod test_support;
 mod text_encoding;
 pub mod token_data;
diff --git a/codex-rs/core/src/proposed_plan_parser.rs b/codex-rs/core/src/proposed_plan_parser.rs
deleted file mode 100644
index 44be264f2..000000000
--- a/codex-rs/core/src/proposed_plan_parser.rs
+++ /dev/null
@@ -1,185 +0,0 @@
-use crate::tagged_block_parser::TagSpec;
-use crate::tagged_block_parser::TaggedLineParser;
-use crate::tagged_block_parser::TaggedLineSegment;
-
-const OPEN_TAG: &str = "<proposed_plan>";
-const CLOSE_TAG: &str = "</proposed_plan>";
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum PlanTag {
-    ProposedPlan,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub(crate) enum ProposedPlanSegment {
-    Normal(String),
ProposedPlanStart,
-    ProposedPlanDelta(String),
-    ProposedPlanEnd,
-}
-
-/// Parser for `<proposed_plan>` blocks emitted in plan mode.
-///
-/// This is a thin wrapper around the generic line-based tag parser. It maps
-/// tag-aware segments into plan-specific segments for downstream consumers.
-#[derive(Debug)]
-pub(crate) struct ProposedPlanParser {
-    parser: TaggedLineParser<PlanTag>,
-}
-
-impl ProposedPlanParser {
-    pub(crate) fn new() -> Self {
-        Self {
-            parser: TaggedLineParser::new(vec![TagSpec {
-                open: OPEN_TAG,
-                close: CLOSE_TAG,
-                tag: PlanTag::ProposedPlan,
-            }]),
-        }
-    }
-
-    pub(crate) fn parse(&mut self, delta: &str) -> Vec<ProposedPlanSegment> {
-        self.parser
-            .parse(delta)
-            .into_iter()
-            .map(map_plan_segment)
-            .collect()
-    }
-
-    pub(crate) fn finish(&mut self) -> Vec<ProposedPlanSegment> {
-        self.parser
-            .finish()
-            .into_iter()
-            .map(map_plan_segment)
-            .collect()
-    }
-}
-
-fn map_plan_segment(segment: TaggedLineSegment<PlanTag>) -> ProposedPlanSegment {
-    match segment {
-        TaggedLineSegment::Normal(text) => ProposedPlanSegment::Normal(text),
-        TaggedLineSegment::TagStart(PlanTag::ProposedPlan) => {
-            ProposedPlanSegment::ProposedPlanStart
-        }
-        TaggedLineSegment::TagDelta(PlanTag::ProposedPlan, text) => {
-            ProposedPlanSegment::ProposedPlanDelta(text)
-        }
-        TaggedLineSegment::TagEnd(PlanTag::ProposedPlan) => ProposedPlanSegment::ProposedPlanEnd,
-    }
-}
-
-pub(crate) fn strip_proposed_plan_blocks(text: &str) -> String {
-    let mut parser = ProposedPlanParser::new();
-    let mut out = String::new();
-    for segment in parser.parse(text).into_iter().chain(parser.finish()) {
-        if let ProposedPlanSegment::Normal(delta) = segment {
-            out.push_str(&delta);
-        }
-    }
-    out
-}
-
-pub(crate) fn extract_proposed_plan_text(text: &str) -> Option<String> {
-    let mut parser = ProposedPlanParser::new();
-    let mut plan_text = String::new();
-    let mut saw_plan_block = false;
-    for segment in parser.parse(text).into_iter().chain(parser.finish()) {
-        match segment {
-            ProposedPlanSegment::ProposedPlanStart => {
-                saw_plan_block = true;
plan_text.clear();
-            }
-            ProposedPlanSegment::ProposedPlanDelta(delta) => {
-                plan_text.push_str(&delta);
-            }
-            ProposedPlanSegment::ProposedPlanEnd | ProposedPlanSegment::Normal(_) => {}
-        }
-    }
-    saw_plan_block.then_some(plan_text)
-}
-
-#[cfg(test)]
-mod tests {
-    use super::ProposedPlanParser;
-    use super::ProposedPlanSegment;
-    use super::strip_proposed_plan_blocks;
-    use pretty_assertions::assert_eq;
-
-    #[test]
-    fn streams_proposed_plan_segments() {
-        let mut parser = ProposedPlanParser::new();
-        let mut segments = Vec::new();
-
-        for chunk in [
-            "Intro text\n<proposed_plan>\n- step 1\n",
-            "</proposed_plan>\nOutro",
-        ] {
-            segments.extend(parser.parse(chunk));
-        }
-        segments.extend(parser.finish());
-
-        assert_eq!(
-            segments,
-            vec![
-                ProposedPlanSegment::Normal("Intro text\n".to_string()),
-                ProposedPlanSegment::ProposedPlanStart,
-                ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()),
-                ProposedPlanSegment::ProposedPlanEnd,
-                ProposedPlanSegment::Normal("Outro".to_string()),
-            ]
-        );
-    }
-
-    #[test]
-    fn preserves_non_tag_lines() {
-        let mut parser = ProposedPlanParser::new();
-        let mut segments = parser.parse("<proposed_plan> extra\n");
-        segments.extend(parser.finish());
-
-        assert_eq!(
-            segments,
-            vec![ProposedPlanSegment::Normal(
-                "<proposed_plan> extra\n".to_string()
-            )]
-        );
-    }
-
-    #[test]
-    fn closes_unterminated_plan_block_on_finish() {
-        let mut parser = ProposedPlanParser::new();
-        let mut segments = parser.parse("<proposed_plan>\n- step 1\n");
-        segments.extend(parser.finish());
-
-        assert_eq!(
-            segments,
-            vec![
-                ProposedPlanSegment::ProposedPlanStart,
-                ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()),
-                ProposedPlanSegment::ProposedPlanEnd,
-            ]
-        );
-    }
-
-    #[test]
-    fn closes_tag_line_without_trailing_newline() {
-        let mut parser = ProposedPlanParser::new();
-        let mut segments = parser.parse("<proposed_plan>\n- step 1\n</proposed_plan>");
-        segments.extend(parser.finish());
-
-        assert_eq!(
-            segments,
-            vec![
-                ProposedPlanSegment::ProposedPlanStart,
-                ProposedPlanSegment::ProposedPlanDelta("- step 
1\n".to_string()), - ProposedPlanSegment::ProposedPlanEnd, - ] - ); - } - - #[test] - fn strips_proposed_plan_blocks_from_text() { - let text = "before\n\n- step\n\nafter"; - assert_eq!(strip_proposed_plan_blocks(text), "before\nafter"); - } -} diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index 394f4b932..753336554 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use codex_protocol::config_types::ModeKind; use codex_protocol::items::TurnItem; +use codex_utils_stream_parser::strip_citations; use tokio_util::sync::CancellationToken; use crate::codex::Session; @@ -11,17 +12,42 @@ use crate::error::CodexErr; use crate::error::Result; use crate::function_tool::FunctionCallError; use crate::parse_turn_item; -use crate::proposed_plan_parser::strip_proposed_plan_blocks; use crate::tools::parallel::ToolCallRuntime; use crate::tools::router::ToolRouter; use codex_protocol::models::FunctionCallOutputBody; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; +use codex_utils_stream_parser::strip_proposed_plan_blocks; use futures::Future; use tracing::debug; use tracing::instrument; +fn strip_hidden_assistant_markup(text: &str, plan_mode: bool) -> String { + let (without_citations, _citations) = strip_citations(text); + if plan_mode { + strip_proposed_plan_blocks(&without_citations) + } else { + without_citations + } +} + +pub(crate) fn raw_assistant_output_text_from_item(item: &ResponseItem) -> Option { + if let ResponseItem::Message { role, content, .. 
} = item + && role == "assistant" + { + let combined = content + .iter() + .filter_map(|ci| match ci { + codex_protocol::models::ContentItem::OutputText { text } => Some(text.as_str()), + _ => None, + }) + .collect::(); + return Some(combined); + } + None +} + /// Handle a completed output item from the model stream, recording it and /// queuing any tool execution futures. This records items immediately so /// history and rollout stay in sync even if the turn is later cancelled. @@ -169,7 +195,7 @@ pub(crate) async fn handle_non_tool_response_item( | ResponseItem::Reasoning { .. } | ResponseItem::WebSearchCall { .. } => { let mut turn_item = parse_turn_item(item)?; - if plan_mode && let TurnItem::AgentMessage(agent_message) = &mut turn_item { + if let TurnItem::AgentMessage(agent_message) = &mut turn_item { let combined = agent_message .content .iter() @@ -177,7 +203,7 @@ pub(crate) async fn handle_non_tool_response_item( codex_protocol::items::AgentMessageContent::Text { text } => text.as_str(), }) .collect::(); - let stripped = strip_proposed_plan_blocks(&combined); + let stripped = strip_hidden_assistant_markup(&combined, plan_mode); agent_message.content = vec![codex_protocol::items::AgentMessageContent::Text { text: stripped }]; } @@ -195,25 +221,15 @@ pub(crate) fn last_assistant_message_from_item( item: &ResponseItem, plan_mode: bool, ) -> Option { - if let ResponseItem::Message { role, content, .. 
} = item - && role == "assistant" - { - let combined = content - .iter() - .filter_map(|ci| match ci { - codex_protocol::models::ContentItem::OutputText { text } => Some(text.as_str()), - _ => None, - }) - .collect::(); + if let Some(combined) = raw_assistant_output_text_from_item(item) { if combined.is_empty() { return None; } - return if plan_mode { - let stripped = strip_proposed_plan_blocks(&combined); - (!stripped.trim().is_empty()).then_some(stripped) - } else { - Some(combined) - }; + let stripped = strip_hidden_assistant_markup(&combined, plan_mode); + if stripped.trim().is_empty() { + return None; + } + return Some(stripped); } None } @@ -248,3 +264,72 @@ pub(crate) fn response_input_to_response_item(input: &ResponseInputItem) -> Opti _ => None, } } + +#[cfg(test)] +mod tests { + use super::handle_non_tool_response_item; + use super::last_assistant_message_from_item; + use codex_protocol::items::TurnItem; + use codex_protocol::models::ContentItem; + use codex_protocol::models::ResponseItem; + use pretty_assertions::assert_eq; + + fn assistant_output_text(text: &str) -> ResponseItem { + ResponseItem::Message { + id: Some("msg-1".to_string()), + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: text.to_string(), + }], + end_turn: Some(true), + phase: None, + } + } + + #[tokio::test] + async fn handle_non_tool_response_item_strips_citations_from_assistant_message() { + let item = assistant_output_text("hellodoc1 world"); + + let turn_item = handle_non_tool_response_item(&item, false) + .await + .expect("assistant message should parse"); + + let TurnItem::AgentMessage(agent_message) = turn_item else { + panic!("expected agent message"); + }; + let text = agent_message + .content + .iter() + .map(|entry| match entry { + codex_protocol::items::AgentMessageContent::Text { text } => text.as_str(), + }) + .collect::(); + assert_eq!(text, "hello world"); + } + + #[test] + fn 
last_assistant_message_from_item_strips_citations_and_plan_blocks() { + let item = assistant_output_text( + "beforedoc1\n\n- x\n\nafter", + ); + + let message = last_assistant_message_from_item(&item, true) + .expect("assistant text should remain after stripping"); + + assert_eq!(message, "before\nafter"); + } + + #[test] + fn last_assistant_message_from_item_returns_none_for_citation_only_message() { + let item = assistant_output_text("doc1"); + + assert_eq!(last_assistant_message_from_item(&item, false), None); + } + + #[test] + fn last_assistant_message_from_item_returns_none_for_plan_only_hidden_message() { + let item = assistant_output_text("\n- x\n"); + + assert_eq!(last_assistant_message_from_item(&item, true), None); + } +} diff --git a/codex-rs/core/templates/memories/read_path.md b/codex-rs/core/templates/memories/read_path.md index 7fd38ca50..59159b2ec 100644 --- a/codex-rs/core/templates/memories/read_path.md +++ b/codex-rs/core/templates/memories/read_path.md @@ -52,6 +52,47 @@ When to update memory: - When user explicitly asks to remember/update memory, revise memory_summary.md and/or MEMORY.md. +Memory citation requirements: + +- If ANY relevant memory files were used: append exactly one +`` block as the VERY LAST content of the final reply. + Normal responses should include the answer first, then append the +`` block at the end. 
+- Use this exact structure for programmatic parsing:
+```
+<memory_citation>
+<citation_entries>
+MEMORY.md:234-236|note=[responsesapi citation extraction code pointer]
+rollout_summaries/2026-02-17T21-23-02-LN3m-weekly_memory_report_pivot_from_git_history.md:10-12|note=[weekly report format]
+</citation_entries>
+<rollout_ids>
+019c6e27-e55b-73d1-87d8-4e01f1f75043
+019c7714-3b77-74d1-9866-e1f484aae2ab
+</rollout_ids>
+</memory_citation>
+```
+- `citation_entries` is for rendering:
+  - one citation entry per line
+  - format: `<path>:<start_line>-<end_line>|note=[<note>]`
+  - use file paths relative to the memory base path (for example, `MEMORY.md`,
+    `rollout_summaries/...`, `skills/...`)
+  - only cite files actually used under the memory base path (do not cite
+    workspace files as memory citations)
+  - if you used `MEMORY.md` and then a rollout summary/skill file, cite both
+  - list entries in order of importance (most important first)
+  - `note` should be short, single-line, and use simple characters only (avoid
+    unusual symbols, no newlines)
+- `rollout_ids` is for us to track what previous rollouts you find useful:
+  - include one rollout id per line
+  - rollout ids should look like UUIDs (for example,
+    `019c6e27-e55b-73d1-87d8-4e01f1f75043`)
+  - include unique ids only; do not repeat ids
+  - an empty `<rollout_ids>` section is allowed if no rollout ids are available
+  - you can find rollout ids in rollout summary files and MEMORY.md
+  - do not include file paths or notes in this section
+- Never include memory citations inside pull-request messages.
+- Never cite blank lines; double-check ranges.
+ ========= MEMORY_SUMMARY BEGINS ========= {{ memory_summary }} ========= MEMORY_SUMMARY ENDS ========= diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index 844c55c6b..0d93278af 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -505,6 +505,314 @@ async fn plan_mode_strips_plan_from_agent_messages() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn plan_mode_streaming_citations_are_stripped_across_added_deltas_and_done() +-> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { + codex, + session_configured, + .. + } = test_codex().build(&server).await?; + + let added_text = "Intro outer-doc\n\n- Step 1plan-doc\n- Step 2\n\nOu", + "tro", + ]; + let full_message = format!("{added_text}{}", deltas.concat()); + + let mut events = vec![ + ev_response_created("resp-1"), + ev_message_item_added("msg-1", added_text), + ]; + for delta in deltas { + events.push(ev_output_text_delta(delta)); + } + events.push(ev_assistant_message("msg-1", &full_message)); + events.push(ev_completed("resp-1")); + mount_sse_once(&server, sse(events)).await; + + let collaboration_mode = CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: session_configured.model.clone(), + reasoning_effort: None, + developer_instructions: None, + }, + }; + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "please plan with citations".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: std::env::current_dir()?, + approval_policy: codex_protocol::protocol::AskForApproval::Never, + sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess, + model: session_configured.model.clone(), + effort: None, + summary: codex_protocol::config_types::ReasoningSummary::Auto, + collaboration_mode: Some(collaboration_mode), + personality: None, 
+ }) + .await?; + + let mut agent_started = None; + let mut agent_started_idx = None; + let mut agent_completed = None; + let mut agent_completed_idx = None; + let mut plan_started = None; + let mut plan_started_idx = None; + let mut plan_completed = None; + let mut plan_completed_idx = None; + let mut agent_deltas = Vec::new(); + let mut plan_deltas = Vec::new(); + let mut first_agent_delta_idx = None; + let mut last_agent_delta_idx = None; + let mut first_plan_delta_idx = None; + let mut last_plan_delta_idx = None; + let mut idx = 0usize; + + let turn_complete_idx = loop { + let ev = wait_for_event(&codex, |_| true).await; + match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => { + agent_started_idx = Some(idx); + agent_started = Some(item); + } + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::Plan(item), + .. + }) => { + plan_started_idx = Some(idx); + plan_started = Some(item); + } + EventMsg::AgentMessageContentDelta(event) => { + if first_agent_delta_idx.is_none() { + first_agent_delta_idx = Some(idx); + } + last_agent_delta_idx = Some(idx); + agent_deltas.push(event.delta); + } + EventMsg::PlanDelta(event) => { + if first_plan_delta_idx.is_none() { + first_plan_delta_idx = Some(idx); + } + last_plan_delta_idx = Some(idx); + plan_deltas.push(event.delta); + } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => { + agent_completed_idx = Some(idx); + agent_completed = Some(item); + } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Plan(item), + .. 
+ }) => { + plan_completed_idx = Some(idx); + plan_completed = Some(item); + } + EventMsg::TurnComplete(_) => { + break idx; + } + _ => {} + } + idx += 1; + }; + + let agent_started = agent_started.expect("agent item start should be emitted"); + let agent_completed = agent_completed.expect("agent item completion should be emitted"); + let plan_started = plan_started.expect("plan item start should be emitted"); + let plan_completed = plan_completed.expect("plan item completion should be emitted"); + + assert_eq!(agent_started.id, agent_completed.id); + assert_eq!(plan_started.id, plan_completed.id); + assert_eq!(plan_started.text, ""); + + let agent_started_text: String = agent_started + .content + .iter() + .map(|entry| match entry { + AgentMessageContent::Text { text } => text.as_str(), + }) + .collect(); + let agent_completed_text: String = agent_completed + .content + .iter() + .map(|entry| match entry { + AgentMessageContent::Text { text } => text.as_str(), + }) + .collect(); + let agent_delta_text = agent_deltas.concat(); + let plan_delta_text = plan_deltas.concat(); + + assert_eq!(agent_started_text, ""); + assert_eq!(agent_delta_text, "Intro \nOutro"); + assert_eq!(agent_completed_text, "Intro \nOutro"); + assert_eq!(plan_delta_text, "- Step 1\n- Step 2\n"); + assert_eq!(plan_completed.text, "- Step 1\n- Step 2\n"); + + for text in [ + agent_started_text.as_str(), + agent_delta_text.as_str(), + agent_completed_text.as_str(), + plan_delta_text.as_str(), + plan_completed.text.as_str(), + ] { + assert!(!text.contains("")); + assert!(!text.contains("")); + } + + let agent_started_idx = agent_started_idx.expect("agent start index"); + let agent_completed_idx = agent_completed_idx.expect("agent completion index"); + let plan_started_idx = plan_started_idx.expect("plan start index"); + let plan_completed_idx = plan_completed_idx.expect("plan completion index"); + let first_agent_delta_idx = first_agent_delta_idx.expect("agent delta index"); + let 
last_agent_delta_idx = last_agent_delta_idx.expect("agent delta index"); + let first_plan_delta_idx = first_plan_delta_idx.expect("plan delta index"); + let last_plan_delta_idx = last_plan_delta_idx.expect("plan delta index"); + assert!(agent_started_idx < first_agent_delta_idx); + assert!(plan_started_idx < first_plan_delta_idx); + assert!(last_agent_delta_idx < agent_completed_idx); + assert!(last_plan_delta_idx < plan_completed_idx); + assert!(agent_completed_idx < turn_complete_idx); + assert!(plan_completed_idx < turn_complete_idx); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn plan_mode_streaming_proposed_plan_tag_split_across_added_and_delta_is_parsed() +-> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { + codex, + session_configured, + .. + } = test_codex().build(&server).await?; + + let added_text = "Intro\n<proposed_pl"; + let deltas = ["an>\n- Step 1\n</proposed_plan>\nOutro"]; + let full_message = format!("{added_text}{}", deltas.concat()); + + let mut events = vec![ + ev_response_created("resp-1"), + ev_message_item_added("msg-1", added_text), + ]; + for delta in deltas { + events.push(ev_output_text_delta(delta)); + } + events.push(ev_assistant_message("msg-1", &full_message)); + events.push(ev_completed("resp-1")); + mount_sse_once(&server, sse(events)).await; + + let collaboration_mode = CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: session_configured.model.clone(), + reasoning_effort: None, + developer_instructions: None, + }, + }; + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "please plan".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: std::env::current_dir()?, + approval_policy: codex_protocol::protocol::AskForApproval::Never, + sandbox_policy: codex_protocol::protocol::SandboxPolicy::DangerFullAccess, + model: session_configured.model.clone(), + effort: None, + summary: 
codex_protocol::config_types::ReasoningSummary::Auto, + collaboration_mode: Some(collaboration_mode), + personality: None, + }) + .await?; + + let mut agent_started = None; + let mut agent_completed = None; + let mut plan_started = None; + let mut plan_completed = None; + let mut agent_deltas = Vec::new(); + let mut plan_deltas = Vec::new(); + + loop { + let ev = wait_for_event(&codex, |_| true).await; + match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => agent_started = Some(item), + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::Plan(item), + .. + }) => plan_started = Some(item), + EventMsg::AgentMessageContentDelta(event) => agent_deltas.push(event.delta), + EventMsg::PlanDelta(event) => plan_deltas.push(event.delta), + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => agent_completed = Some(item), + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Plan(item), + .. 
+ }) => plan_completed = Some(item), + EventMsg::TurnComplete(_) => break, + _ => {} + } + } + + let agent_started = agent_started.expect("agent item start should be emitted"); + let agent_completed = agent_completed.expect("agent item completion should be emitted"); + let plan_started = plan_started.expect("plan item start should be emitted"); + let plan_completed = plan_completed.expect("plan item completion should be emitted"); + + let agent_started_text: String = agent_started + .content + .iter() + .map(|entry| match entry { + AgentMessageContent::Text { text } => text.as_str(), + }) + .collect(); + let agent_completed_text: String = agent_completed + .content + .iter() + .map(|entry| match entry { + AgentMessageContent::Text { text } => text.as_str(), + }) + .collect(); + + assert_eq!(agent_started_text, ""); + assert_eq!(agent_deltas.concat(), "Intro\nOutro"); + assert_eq!(agent_completed_text, "Intro\nOutro"); + assert_eq!(plan_started.text, ""); + assert_eq!(plan_deltas.concat(), "- Step 1\n"); + assert_eq!(plan_completed.text, "- Step 1\n"); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/utils/stream-parser/BUILD.bazel b/codex-rs/utils/stream-parser/BUILD.bazel new file mode 100644 index 000000000..e94b3d740 --- /dev/null +++ b/codex-rs/utils/stream-parser/BUILD.bazel @@ -0,0 +1,6 @@ +load("//:defs.bzl", "codex_rust_crate") + +codex_rust_crate( + name = "stream-parser", + crate_name = "codex_utils_stream_parser", +) diff --git a/codex-rs/utils/stream-parser/Cargo.toml b/codex-rs/utils/stream-parser/Cargo.toml new file mode 100644 index 000000000..faba53e1f --- /dev/null +++ b/codex-rs/utils/stream-parser/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "codex-utils-stream-parser" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lints] +workspace = true + 
+[dev-dependencies] +pretty_assertions = { workspace = true } diff --git a/codex-rs/utils/stream-parser/README.md b/codex-rs/utils/stream-parser/README.md new file mode 100644 index 000000000..7163570ec --- /dev/null +++ b/codex-rs/utils/stream-parser/README.md @@ -0,0 +1,97 @@ +# codex-utils-stream-parser + +Small, dependency-free utilities for parsing streamed text incrementally. + +**Disclaimer**: This code is pretty complex and Codex did not manage to write it so before updating the code, make +sure to deeply understand it and don't blindly trust Codex on it. Feel free to update the documentation as you +modify the code + +## What it provides + +- `StreamTextParser`: trait for incremental parsers that consume string chunks +- `InlineHiddenTagParser`: generic parser that hides inline tags and extracts their contents +- `CitationStreamParser`: convenience wrapper for `...` +- `strip_citations(...)`: one-shot helper for non-streamed strings +- `Utf8StreamParser

`: adapter for raw `&[u8]` streams that may split UTF-8 code points + +## Why this exists + +Some model outputs arrive as a stream and may contain hidden markup (for example +`...`) split across chunk boundaries. Parsing each chunk +independently is incorrect because tags can be split (``). + +This crate keeps parser state across chunks, returns visible text safe to render +immediately, and extracts hidden payloads separately. + +## Example: citation streaming + +```rust +use codex_utils_stream_parser::CitationStreamParser; +use codex_utils_stream_parser::StreamTextParser; + +let mut parser = CitationStreamParser::new(); + +let first = parser.push_str("Hello doc A world"); +assert_eq!(second.visible_text, " world"); +assert_eq!(second.extracted, vec!["doc A".to_string()]); + +let tail = parser.finish(); +assert!(tail.visible_text.is_empty()); +assert!(tail.extracted.is_empty()); +``` + +## Example: raw byte streaming with split UTF-8 code points + +```rust +use codex_utils_stream_parser::CitationStreamParser; +use codex_utils_stream_parser::Utf8StreamParser; + +# fn demo() -> Result<(), codex_utils_stream_parser::Utf8StreamParserError> { +let mut parser = Utf8StreamParser::new(CitationStreamParser::new()); + +// "é" split across chunks: 0xC3 + 0xA9 +let first = parser.push_bytes(&[b'H', 0xC3])?; +assert_eq!(first.visible_text, "H"); + +let second = parser.push_bytes(&[0xA9, b'!'])?; +assert_eq!(second.visible_text, "é!"); + +let tail = parser.finish()?; +assert!(tail.visible_text.is_empty()); +# Ok(()) +# } +``` + +## Example: custom hidden tags + +```rust +use codex_utils_stream_parser::InlineHiddenTagParser; +use codex_utils_stream_parser::InlineTagSpec; +use codex_utils_stream_parser::StreamTextParser; + +#[derive(Clone, Debug, PartialEq, Eq)] +enum Tag { + Secret, +} + +let mut parser = InlineHiddenTagParser::new(vec![InlineTagSpec { + tag: Tag::Secret, + open: "", + close: "", +}]); + +let out = parser.push_str("axb"); +assert_eq!(out.visible_text, "ab"); 
+assert_eq!(out.extracted.len(), 1); +assert_eq!(out.extracted[0].content, "x"); +``` + +## Known limitations + +- Tags are matched literally and case-sensitively +- No nested tag support +- A stream can return empty objects. \ No newline at end of file diff --git a/codex-rs/utils/stream-parser/src/assistant_text.rs b/codex-rs/utils/stream-parser/src/assistant_text.rs new file mode 100644 index 000000000..931c30bd5 --- /dev/null +++ b/codex-rs/utils/stream-parser/src/assistant_text.rs @@ -0,0 +1,130 @@ +use crate::CitationStreamParser; +use crate::ProposedPlanParser; +use crate::ProposedPlanSegment; +use crate::StreamTextChunk; +use crate::StreamTextParser; + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct AssistantTextChunk { + pub visible_text: String, + pub citations: Vec, + pub plan_segments: Vec, +} + +impl AssistantTextChunk { + pub fn is_empty(&self) -> bool { + self.visible_text.is_empty() && self.citations.is_empty() && self.plan_segments.is_empty() + } +} + +/// Parses assistant text streaming markup in one pass: +/// - strips `` tags and extracts citation payloads +/// - in plan mode, also strips `` blocks and emits plan segments +#[derive(Debug, Default)] +pub struct AssistantTextStreamParser { + plan_mode: bool, + citations: CitationStreamParser, + plan: ProposedPlanParser, +} + +impl AssistantTextStreamParser { + pub fn new(plan_mode: bool) -> Self { + Self { + plan_mode, + ..Self::default() + } + } + + pub fn push_str(&mut self, chunk: &str) -> AssistantTextChunk { + let citation_chunk = self.citations.push_str(chunk); + let mut out = self.parse_visible_text(citation_chunk.visible_text); + out.citations = citation_chunk.extracted; + out + } + + pub fn finish(&mut self) -> AssistantTextChunk { + let citation_chunk = self.citations.finish(); + let mut out = self.parse_visible_text(citation_chunk.visible_text); + if self.plan_mode { + let mut tail = self.plan.finish(); + if !tail.is_empty() { + out.visible_text.push_str(&tail.visible_text); 
+ out.plan_segments.append(&mut tail.extracted); + } + } + out.citations = citation_chunk.extracted; + out + } + + fn parse_visible_text(&mut self, visible_text: String) -> AssistantTextChunk { + if !self.plan_mode { + return AssistantTextChunk { + visible_text, + ..AssistantTextChunk::default() + }; + } + let plan_chunk: StreamTextChunk<ProposedPlanSegment> = self.plan.push_str(&visible_text); + AssistantTextChunk { + visible_text: plan_chunk.visible_text, + plan_segments: plan_chunk.extracted, + ..AssistantTextChunk::default() + } + } +} + +#[cfg(test)] +mod tests { + use super::AssistantTextStreamParser; + use crate::ProposedPlanSegment; + use pretty_assertions::assert_eq; + + #[test] + fn parses_citations_across_seed_and_delta_boundaries() { + let mut parser = AssistantTextStreamParser::new(false); + + let seeded = parser.push_str("hello <citation>doc"); + let parsed = parser.push_str("1</citation> world"); + let tail = parser.finish(); + + assert_eq!(seeded.visible_text, "hello "); + assert_eq!(seeded.citations, Vec::<String>::new()); + assert_eq!(parsed.visible_text, " world"); + assert_eq!(parsed.citations, vec!["doc1".to_string()]); + assert_eq!(tail.visible_text, ""); + assert_eq!(tail.citations, Vec::<String>::new()); + } + + #[test] + fn parses_plan_segments_after_citation_stripping() { + let mut parser = AssistantTextStreamParser::new(true); + + let seeded = parser.push_str("Intro\n<proposed_pl"); + let parsed = parser.push_str("an>\n- step <citation>doc</citation>\n"); + let tail = parser.push_str("</proposed_plan>\nOutro"); + let finish = parser.finish(); + + assert_eq!(seeded.visible_text, "Intro\n"); + assert_eq!( + seeded.plan_segments, + vec![ProposedPlanSegment::Normal("Intro\n".to_string())] + ); + assert_eq!(parsed.visible_text, ""); + assert_eq!(parsed.citations, vec!["doc".to_string()]); + assert_eq!( + parsed.plan_segments, + vec![ + ProposedPlanSegment::ProposedPlanStart, + ProposedPlanSegment::ProposedPlanDelta("- step \n".to_string()), + ] + ); + assert_eq!(tail.visible_text, "Outro"); + assert_eq!( + tail.plan_segments, + vec![ + ProposedPlanSegment::ProposedPlanEnd, + 
ProposedPlanSegment::Normal("Outro".to_string()), + ] + ); + assert!(finish.is_empty()); + } +} diff --git a/codex-rs/utils/stream-parser/src/citation.rs b/codex-rs/utils/stream-parser/src/citation.rs new file mode 100644 index 000000000..d7be6dd5f --- /dev/null +++ b/codex-rs/utils/stream-parser/src/citation.rs @@ -0,0 +1,179 @@ +use crate::InlineHiddenTagParser; +use crate::InlineTagSpec; +use crate::StreamTextChunk; +use crate::StreamTextParser; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum CitationTag { + Citation, +} + +const CITATION_OPEN: &str = ""; +const CITATION_CLOSE: &str = ""; + +/// Stream parser for `...` tags. +/// +/// This is a thin convenience wrapper around [`InlineHiddenTagParser`]. It returns citation bodies +/// as plain strings and omits the citation tags from visible text. +/// +/// Matching is literal and non-nested. If EOF is reached before a closing +/// ``, the parser auto-closes the tag and returns the buffered body as an +/// extracted citation. +#[derive(Debug)] +pub struct CitationStreamParser { + inner: InlineHiddenTagParser, +} + +impl CitationStreamParser { + pub fn new() -> Self { + Self { + inner: InlineHiddenTagParser::new(vec![InlineTagSpec { + tag: CitationTag::Citation, + open: CITATION_OPEN, + close: CITATION_CLOSE, + }]), + } + } +} + +impl Default for CitationStreamParser { + fn default() -> Self { + Self::new() + } +} + +impl StreamTextParser for CitationStreamParser { + type Extracted = String; + + fn push_str(&mut self, chunk: &str) -> StreamTextChunk { + let inner = self.inner.push_str(chunk); + StreamTextChunk { + visible_text: inner.visible_text, + extracted: inner.extracted.into_iter().map(|tag| tag.content).collect(), + } + } + + fn finish(&mut self) -> StreamTextChunk { + let inner = self.inner.finish(); + StreamTextChunk { + visible_text: inner.visible_text, + extracted: inner.extracted.into_iter().map(|tag| tag.content).collect(), + } + } +} + +/// Strip citation tags from a complete string and return 
`(visible_text, citations)`. +/// +/// This uses [`CitationStreamParser`] internally, so it inherits the same semantics: +/// literal, non-nested matching and auto-closing unterminated citations at EOF. +pub fn strip_citations(text: &str) -> (String, Vec) { + let mut parser = CitationStreamParser::new(); + let mut out = parser.push_str(text); + let tail = parser.finish(); + out.visible_text.push_str(&tail.visible_text); + out.extracted.extend(tail.extracted); + (out.visible_text, out.extracted) +} + +#[cfg(test)] +mod tests { + use super::CitationStreamParser; + use super::strip_citations; + use crate::StreamTextChunk; + use crate::StreamTextParser; + use pretty_assertions::assert_eq; + + fn collect_chunks

(parser: &mut P, chunks: &[&str]) -> StreamTextChunk + where + P: StreamTextParser, + { + let mut all = StreamTextChunk::default(); + for chunk in chunks { + let next = parser.push_str(chunk); + all.visible_text.push_str(&next.visible_text); + all.extracted.extend(next.extracted); + } + let tail = parser.finish(); + all.visible_text.push_str(&tail.visible_text); + all.extracted.extend(tail.extracted); + all + } + + #[test] + fn citation_parser_streams_across_chunk_boundaries() { + let mut parser = CitationStreamParser::new(); + let out = collect_chunks( + &mut parser, + &[ + "Hello source A world", + ], + ); + + assert_eq!(out.visible_text, "Hello world"); + assert_eq!(out.extracted, vec!["source A".to_string()]); + } + + #[test] + fn citation_parser_buffers_partial_open_tag_prefix() { + let mut parser = CitationStreamParser::new(); + + let first = parser.push_str("abc ::new()); + + let second = parser.push_str("citation>xz"); + let tail = parser.finish(); + + assert_eq!(second.visible_text, "z"); + assert_eq!(second.extracted, vec!["x".to_string()]); + assert!(tail.is_empty()); + } + + #[test] + fn citation_parser_auto_closes_unterminated_tag_on_finish() { + let mut parser = CitationStreamParser::new(); + let out = collect_chunks(&mut parser, &["xsource"]); + + assert_eq!(out.visible_text, "x"); + assert_eq!(out.extracted, vec!["source".to_string()]); + } + + #[test] + fn citation_parser_preserves_partial_open_tag_at_eof_if_not_a_full_tag() { + let mut parser = CitationStreamParser::new(); + let out = collect_chunks(&mut parser, &["hello ::new()); + } + + #[test] + fn strip_citations_collects_all_citations() { + let (visible, citations) = strip_citations( + "aonebtwoc", + ); + + assert_eq!(visible, "abc"); + assert_eq!(citations, vec!["one".to_string(), "two".to_string()]); + } + + #[test] + fn strip_citations_auto_closes_unterminated_citation_at_eof() { + let (visible, citations) = strip_citations("xy"); + + assert_eq!(visible, "x"); + assert_eq!(citations, 
vec!["y".to_string()]); + } + + #[test] + fn citation_parser_does_not_support_nested_tags() { + let (visible, citations) = strip_citations( + "axyzb", + ); + + assert_eq!(visible, "azb"); + assert_eq!(citations, vec!["xy".to_string()]); + } +} diff --git a/codex-rs/utils/stream-parser/src/inline_hidden_tag.rs b/codex-rs/utils/stream-parser/src/inline_hidden_tag.rs new file mode 100644 index 000000000..0b7501cdb --- /dev/null +++ b/codex-rs/utils/stream-parser/src/inline_hidden_tag.rs @@ -0,0 +1,323 @@ +use crate::StreamTextChunk; +use crate::StreamTextParser; + +/// One hidden inline tag extracted by [`InlineHiddenTagParser`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ExtractedInlineTag { + pub tag: T, + pub content: String, +} + +/// Literal tag specification used by [`InlineHiddenTagParser`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct InlineTagSpec { + pub tag: T, + pub open: &'static str, + pub close: &'static str, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct ActiveTag { + tag: T, + close: &'static str, + content: String, +} + +/// Generic streaming parser that hides configured inline tags and extracts their contents. +/// +/// Example: +/// - input: `hello doc A world` +/// - visible output: `hello world` +/// - extracted: `["doc A"]` +/// +/// Matching is literal and non-nested. If EOF is reached while a tag is still open, the parser +/// auto-closes it and returns the buffered content as extracted data. +#[derive(Debug)] +pub struct InlineHiddenTagParser +where + T: Clone + Eq, +{ + specs: Vec>, + pending: String, + active: Option>, +} + +impl InlineHiddenTagParser +where + T: Clone + Eq, +{ + /// Create a parser for one or more hidden inline tags. 
+ pub fn new(specs: Vec>) -> Self { + assert!( + !specs.is_empty(), + "InlineHiddenTagParser requires at least one tag spec" + ); + for spec in &specs { + assert!( + !spec.open.is_empty(), + "InlineHiddenTagParser requires non-empty open delimiters" + ); + assert!( + !spec.close.is_empty(), + "InlineHiddenTagParser requires non-empty close delimiters" + ); + } + Self { + specs, + pending: String::new(), + active: None, + } + } + + fn find_next_open(&self) -> Option<(usize, usize)> { + self.specs + .iter() + .enumerate() + .filter_map(|(idx, spec)| { + self.pending + .find(spec.open) + .map(|pos| (pos, spec.open.len(), idx)) + }) + .min_by(|(pos_a, len_a, idx_a), (pos_b, len_b, idx_b)| { + pos_a + .cmp(pos_b) + .then_with(|| len_b.cmp(len_a)) + .then_with(|| idx_a.cmp(idx_b)) + }) + .map(|(pos, _len, idx)| (pos, idx)) + } + + fn max_open_prefix_suffix_len(&self) -> usize { + self.specs + .iter() + .map(|spec| longest_suffix_prefix_len(&self.pending, spec.open)) + .max() + .map_or(0, std::convert::identity) + } + + fn push_visible_prefix(out: &mut StreamTextChunk>, pending: &str) { + if !pending.is_empty() { + out.visible_text.push_str(pending); + } + } + + fn drain_visible_to_suffix_match( + &mut self, + out: &mut StreamTextChunk>, + keep_suffix_len: usize, + ) { + let take = self.pending.len().saturating_sub(keep_suffix_len); + if take == 0 { + return; + } + Self::push_visible_prefix(out, &self.pending[..take]); + self.pending.drain(..take); + } +} + +impl StreamTextParser for InlineHiddenTagParser +where + T: Clone + Eq, +{ + type Extracted = ExtractedInlineTag; + + fn push_str(&mut self, chunk: &str) -> StreamTextChunk { + self.pending.push_str(chunk); + let mut out = StreamTextChunk::default(); + + loop { + if let Some(close) = self.active.as_ref().map(|active| active.close) { + if let Some(close_idx) = self.pending.find(close) { + let Some(mut active) = self.active.take() else { + continue; + }; + active.content.push_str(&self.pending[..close_idx]); + 
out.extracted.push(ExtractedInlineTag { + tag: active.tag, + content: active.content, + }); + let close_len = close.len(); + self.pending.drain(..close_idx + close_len); + continue; + } + + let keep = longest_suffix_prefix_len(&self.pending, close); + let take = self.pending.len().saturating_sub(keep); + if take > 0 { + if let Some(active) = self.active.as_mut() { + active.content.push_str(&self.pending[..take]); + } + self.pending.drain(..take); + } + break; + } + + if let Some((open_idx, spec_idx)) = self.find_next_open() { + Self::push_visible_prefix(&mut out, &self.pending[..open_idx]); + let spec = &self.specs[spec_idx]; + let open_len = spec.open.len(); + self.pending.drain(..open_idx + open_len); + self.active = Some(ActiveTag { + tag: spec.tag.clone(), + close: spec.close, + content: String::new(), + }); + continue; + } + + let keep = self.max_open_prefix_suffix_len(); + self.drain_visible_to_suffix_match(&mut out, keep); + break; + } + + out + } + + fn finish(&mut self) -> StreamTextChunk { + let mut out = StreamTextChunk::default(); + + if let Some(mut active) = self.active.take() { + if !self.pending.is_empty() { + active.content.push_str(&self.pending); + self.pending.clear(); + } + out.extracted.push(ExtractedInlineTag { + tag: active.tag, + content: active.content, + }); + return out; + } + + if !self.pending.is_empty() { + out.visible_text.push_str(&self.pending); + self.pending.clear(); + } + + out + } +} + +fn longest_suffix_prefix_len(s: &str, needle: &str) -> usize { + let max = s.len().min(needle.len().saturating_sub(1)); + for k in (1..=max).rev() { + if needle.is_char_boundary(k) && s.ends_with(&needle[..k]) { + return k; + } + } + 0 +} + +#[cfg(test)] +mod tests { + use super::InlineHiddenTagParser; + use super::InlineTagSpec; + use crate::StreamTextChunk; + use crate::StreamTextParser; + use pretty_assertions::assert_eq; + + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + enum Tag { + A, + B, + } + + fn collect_chunks

(parser: &mut P, chunks: &[&str]) -> StreamTextChunk + where + P: StreamTextParser, + { + let mut all = StreamTextChunk::default(); + for chunk in chunks { + let next = parser.push_str(chunk); + all.visible_text.push_str(&next.visible_text); + all.extracted.extend(next.extracted); + } + let tail = parser.finish(); + all.visible_text.push_str(&tail.visible_text); + all.extracted.extend(tail.extracted); + all + } + + #[test] + fn generic_inline_parser_supports_multiple_tag_types() { + let mut parser = InlineHiddenTagParser::new(vec![ + InlineTagSpec { + tag: Tag::A, + open: "", + close: "", + }, + InlineTagSpec { + tag: Tag::B, + open: "", + close: "", + }, + ]); + + let out = collect_chunks(&mut parser, &["1x2y3"]); + + assert_eq!(out.visible_text, "123"); + assert_eq!(out.extracted.len(), 2); + assert_eq!(out.extracted[0].tag, Tag::A); + assert_eq!(out.extracted[0].content, "x"); + assert_eq!(out.extracted[1].tag, Tag::B); + assert_eq!(out.extracted[1].content, "y"); + } + + #[test] + fn generic_inline_parser_supports_non_ascii_tag_delimiters() { + let mut parser = InlineHiddenTagParser::new(vec![InlineTagSpec { + tag: Tag::A, + open: "<é>", + close: "", + }]); + + let out = collect_chunks(&mut parser, &["a<", "é>中b"]); + + assert_eq!(out.visible_text, "ab"); + assert_eq!(out.extracted.len(), 1); + assert_eq!(out.extracted[0].tag, Tag::A); + assert_eq!(out.extracted[0].content, "中"); + } + + #[test] + fn generic_inline_parser_prefers_longest_opener_at_same_offset() { + let mut parser = InlineHiddenTagParser::new(vec![ + InlineTagSpec { + tag: Tag::A, + open: "", + close: "", + }, + InlineTagSpec { + tag: Tag::B, + open: "", + close: "", + }, + ]); + + let out = collect_chunks(&mut parser, &["xyz"]); + + assert_eq!(out.visible_text, "xz"); + assert_eq!(out.extracted.len(), 1); + assert_eq!(out.extracted[0].tag, Tag::B); + assert_eq!(out.extracted[0].content, "y"); + } + + #[test] + #[should_panic(expected = "non-empty open delimiters")] + fn 
generic_inline_parser_rejects_empty_open_delimiter() { + let _ = InlineHiddenTagParser::new(vec![InlineTagSpec { + tag: Tag::A, + open: "", + close: "", + }]); + } + + #[test] + #[should_panic(expected = "non-empty close delimiters")] + fn generic_inline_parser_rejects_empty_close_delimiter() { + let _ = InlineHiddenTagParser::new(vec![InlineTagSpec { + tag: Tag::A, + open: "", + close: "", + }]); + } +} diff --git a/codex-rs/utils/stream-parser/src/lib.rs b/codex-rs/utils/stream-parser/src/lib.rs new file mode 100644 index 000000000..2cf91ed45 --- /dev/null +++ b/codex-rs/utils/stream-parser/src/lib.rs @@ -0,0 +1,23 @@ +mod assistant_text; +mod citation; +mod inline_hidden_tag; +mod proposed_plan; +mod stream_text; +mod tagged_line_parser; +mod utf8_stream; + +pub use assistant_text::AssistantTextChunk; +pub use assistant_text::AssistantTextStreamParser; +pub use citation::CitationStreamParser; +pub use citation::strip_citations; +pub use inline_hidden_tag::ExtractedInlineTag; +pub use inline_hidden_tag::InlineHiddenTagParser; +pub use inline_hidden_tag::InlineTagSpec; +pub use proposed_plan::ProposedPlanParser; +pub use proposed_plan::ProposedPlanSegment; +pub use proposed_plan::extract_proposed_plan_text; +pub use proposed_plan::strip_proposed_plan_blocks; +pub use stream_text::StreamTextChunk; +pub use stream_text::StreamTextParser; +pub use utf8_stream::Utf8StreamParser; +pub use utf8_stream::Utf8StreamParserError; diff --git a/codex-rs/utils/stream-parser/src/proposed_plan.rs b/codex-rs/utils/stream-parser/src/proposed_plan.rs new file mode 100644 index 000000000..cd3a2a352 --- /dev/null +++ b/codex-rs/utils/stream-parser/src/proposed_plan.rs @@ -0,0 +1,212 @@ +use crate::StreamTextChunk; +use crate::StreamTextParser; +use crate::tagged_line_parser::TagSpec; +use crate::tagged_line_parser::TaggedLineParser; +use crate::tagged_line_parser::TaggedLineSegment; + +const OPEN_TAG: &str = ""; +const CLOSE_TAG: &str = ""; + +#[derive(Debug, Clone, Copy, PartialEq, 
Eq)] +enum PlanTag { + ProposedPlan, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProposedPlanSegment { + Normal(String), + ProposedPlanStart, + ProposedPlanDelta(String), + ProposedPlanEnd, +} + +/// Parser for `` blocks emitted in plan mode. +/// +/// Implements [`StreamTextParser`] so callers can consume: +/// - `visible_text`: normal assistant text with plan blocks removed +/// - `extracted`: ordered plan segments (includes `Normal(...)` segments for ordering fidelity) +#[derive(Debug)] +pub struct ProposedPlanParser { + parser: TaggedLineParser, +} + +impl ProposedPlanParser { + pub fn new() -> Self { + Self { + parser: TaggedLineParser::new(vec![TagSpec { + open: OPEN_TAG, + close: CLOSE_TAG, + tag: PlanTag::ProposedPlan, + }]), + } + } +} + +impl Default for ProposedPlanParser { + fn default() -> Self { + Self::new() + } +} + +impl StreamTextParser for ProposedPlanParser { + type Extracted = ProposedPlanSegment; + + fn push_str(&mut self, chunk: &str) -> StreamTextChunk { + map_segments(self.parser.parse(chunk)) + } + + fn finish(&mut self) -> StreamTextChunk { + map_segments(self.parser.finish()) + } +} + +fn map_segments(segments: Vec>) -> StreamTextChunk { + let mut out = StreamTextChunk::default(); + for segment in segments { + let mapped = match segment { + TaggedLineSegment::Normal(text) => ProposedPlanSegment::Normal(text), + TaggedLineSegment::TagStart(PlanTag::ProposedPlan) => { + ProposedPlanSegment::ProposedPlanStart + } + TaggedLineSegment::TagDelta(PlanTag::ProposedPlan, text) => { + ProposedPlanSegment::ProposedPlanDelta(text) + } + TaggedLineSegment::TagEnd(PlanTag::ProposedPlan) => { + ProposedPlanSegment::ProposedPlanEnd + } + }; + if let ProposedPlanSegment::Normal(text) = &mapped { + out.visible_text.push_str(text); + } + out.extracted.push(mapped); + } + out +} + +pub fn strip_proposed_plan_blocks(text: &str) -> String { + let mut parser = ProposedPlanParser::new(); + let mut out = parser.push_str(text).visible_text; + 
out.push_str(&parser.finish().visible_text); + out +} + +pub fn extract_proposed_plan_text(text: &str) -> Option { + let mut parser = ProposedPlanParser::new(); + let mut plan_text = String::new(); + let mut saw_plan_block = false; + for segment in parser + .push_str(text) + .extracted + .into_iter() + .chain(parser.finish().extracted) + { + match segment { + ProposedPlanSegment::ProposedPlanStart => { + saw_plan_block = true; + plan_text.clear(); + } + ProposedPlanSegment::ProposedPlanDelta(delta) => { + plan_text.push_str(&delta); + } + ProposedPlanSegment::ProposedPlanEnd | ProposedPlanSegment::Normal(_) => {} + } + } + saw_plan_block.then_some(plan_text) +} + +#[cfg(test)] +mod tests { + use super::ProposedPlanParser; + use super::ProposedPlanSegment; + use super::extract_proposed_plan_text; + use super::strip_proposed_plan_blocks; + use crate::StreamTextChunk; + use crate::StreamTextParser; + use pretty_assertions::assert_eq; + + fn collect_chunks
<P>
(parser: &mut P, chunks: &[&str]) -> StreamTextChunk + where + P: StreamTextParser, + { + let mut all = StreamTextChunk::default(); + for chunk in chunks { + let next = parser.push_str(chunk); + all.visible_text.push_str(&next.visible_text); + all.extracted.extend(next.extracted); + } + let tail = parser.finish(); + all.visible_text.push_str(&tail.visible_text); + all.extracted.extend(tail.extracted); + all + } + + #[test] + fn streams_proposed_plan_segments_and_visible_text() { + let mut parser = ProposedPlanParser::new(); + let out = collect_chunks( + &mut parser, + &[ + "Intro text\n\n- step 1\n", + "\nOutro", + ], + ); + + assert_eq!(out.visible_text, "Intro text\nOutro"); + assert_eq!( + out.extracted, + vec![ + ProposedPlanSegment::Normal("Intro text\n".to_string()), + ProposedPlanSegment::ProposedPlanStart, + ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()), + ProposedPlanSegment::ProposedPlanEnd, + ProposedPlanSegment::Normal("Outro".to_string()), + ] + ); + } + + #[test] + fn preserves_non_tag_lines() { + let mut parser = ProposedPlanParser::new(); + let out = collect_chunks(&mut parser, &[" extra\n"]); + + assert_eq!(out.visible_text, " extra\n"); + assert_eq!( + out.extracted, + vec![ProposedPlanSegment::Normal( + " extra\n".to_string() + )] + ); + } + + #[test] + fn closes_unterminated_plan_block_on_finish() { + let mut parser = ProposedPlanParser::new(); + let out = collect_chunks(&mut parser, &["\n- step 1\n"]); + + assert_eq!(out.visible_text, ""); + assert_eq!( + out.extracted, + vec![ + ProposedPlanSegment::ProposedPlanStart, + ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()), + ProposedPlanSegment::ProposedPlanEnd, + ] + ); + } + + #[test] + fn strips_proposed_plan_blocks_from_text() { + let text = "before\n\n- step\n\nafter"; + assert_eq!(strip_proposed_plan_blocks(text), "before\nafter"); + } + + #[test] + fn extracts_proposed_plan_text() { + let text = "before\n\n- step\n\nafter"; + assert_eq!( + 
extract_proposed_plan_text(text), + Some("- step\n".to_string()) + ); + } +} diff --git a/codex-rs/utils/stream-parser/src/stream_text.rs b/codex-rs/utils/stream-parser/src/stream_text.rs new file mode 100644 index 000000000..2ba16ea3a --- /dev/null +++ b/codex-rs/utils/stream-parser/src/stream_text.rs @@ -0,0 +1,36 @@ +/// Incremental parser result for one pushed chunk (or final flush). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StreamTextChunk { + /// Text safe to render immediately. + pub visible_text: String, + /// Hidden payloads extracted from the chunk. + pub extracted: Vec, +} + +impl Default for StreamTextChunk { + fn default() -> Self { + Self { + visible_text: String::new(), + extracted: Vec::new(), + } + } +} + +impl StreamTextChunk { + /// Returns true when no visible text or extracted payloads were produced. + pub fn is_empty(&self) -> bool { + self.visible_text.is_empty() && self.extracted.is_empty() + } +} + +/// Trait for parsers that consume streamed text and emit visible text plus extracted payloads. +pub trait StreamTextParser { + /// Payload extracted by this parser (for example a citation body). + type Extracted; + + /// Feed a new text chunk. + fn push_str(&mut self, chunk: &str) -> StreamTextChunk; + + /// Flush any buffered state at end-of-stream (or end-of-item). + fn finish(&mut self) -> StreamTextChunk; +} diff --git a/codex-rs/core/src/tagged_block_parser.rs b/codex-rs/utils/stream-parser/src/tagged_line_parser.rs similarity index 74% rename from codex-rs/core/src/tagged_block_parser.rs rename to codex-rs/utils/stream-parser/src/tagged_line_parser.rs index 46ec012c3..dadc77ec3 100644 --- a/codex-rs/core/src/tagged_block_parser.rs +++ b/codex-rs/utils/stream-parser/src/tagged_line_parser.rs @@ -1,9 +1,7 @@ //! Line-based tag block parsing for streamed text. //! //! The parser buffers each line until it can disprove that the line is a tag, -//! which is required for tags that must appear alone on a line. For example, -//! 
Proposed Plan output uses `` and `` tags -//! on their own lines so clients can stream plan content separately. +//! which is required for tags that must appear alone on a line. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct TagSpec { @@ -21,17 +19,6 @@ pub(crate) enum TaggedLineSegment { } /// Stateful line parser that splits input into normal text vs tag blocks. -/// -/// How it works: -/// - While reading a line, we buffer characters until the line either finishes -/// (`\n`) or stops matching any tag prefix (after `trim_start`). -/// - If it stops matching a tag prefix, the buffered line is immediately -/// emitted as text and we continue in "plain text" mode until the next -/// newline. -/// - When a full line is available, we compare it to the open/close tags; tag -/// lines emit TagStart/TagEnd, otherwise the line is emitted as text. -/// - `finish()` flushes any buffered line and auto-closes an unterminated tag, -/// which keeps streaming resilient to missing closing tags. #[derive(Debug, Default)] pub(crate) struct TaggedLineParser where @@ -56,7 +43,6 @@ where } } - /// Parse a streamed delta into line-aware segments. pub(crate) fn parse(&mut self, delta: &str) -> Vec> { let mut segments = Vec::new(); let mut run = String::new(); @@ -75,7 +61,6 @@ where if slug.is_empty() || self.is_tag_prefix(slug) { continue; } - // This line cannot be a tag line, so flush it immediately. let buffered = std::mem::take(&mut self.line_buffer); self.detect_tag = false; self.push_text(buffered, &mut segments); @@ -96,7 +81,6 @@ where segments } - /// Flush any buffered text and close an unterminated tag block. pub(crate) fn finish(&mut self) -> Vec> { let mut segments = Vec::new(); if !self.line_buffer.is_empty() { @@ -115,7 +99,6 @@ where push_segment(&mut segments, TaggedLineSegment::TagEnd(tag)); self.active_tag = None; } else { - // The buffered line never proved to be a tag line. 
self.push_text(buffered, &mut segments); } } @@ -210,12 +193,8 @@ where } segments.push(TaggedLineSegment::TagDelta(tag, delta)); } - TaggedLineSegment::TagStart(tag) => { - segments.push(TaggedLineSegment::TagStart(tag)); - } - TaggedLineSegment::TagEnd(tag) => { - segments.push(TaggedLineSegment::TagEnd(tag)); - } + TaggedLineSegment::TagStart(tag) => segments.push(TaggedLineSegment::TagStart(tag)), + TaggedLineSegment::TagEnd(tag) => segments.push(TaggedLineSegment::TagEnd(tag)), } } @@ -267,48 +246,4 @@ mod tests { vec![TaggedLineSegment::Normal(" extra\n".to_string())] ); } - - #[test] - fn closes_unterminated_tag_on_finish() { - let mut parser = parser(); - let mut segments = parser.parse("\nline\n"); - segments.extend(parser.finish()); - - assert_eq!( - segments, - vec![ - TaggedLineSegment::TagStart(Tag::Block), - TaggedLineSegment::TagDelta(Tag::Block, "line\n".to_string()), - TaggedLineSegment::TagEnd(Tag::Block), - ] - ); - } - - #[test] - fn accepts_tags_with_trailing_whitespace() { - let mut parser = parser(); - let mut segments = parser.parse(" \nline\n \n"); - segments.extend(parser.finish()); - - assert_eq!( - segments, - vec![ - TaggedLineSegment::TagStart(Tag::Block), - TaggedLineSegment::TagDelta(Tag::Block, "line\n".to_string()), - TaggedLineSegment::TagEnd(Tag::Block), - ] - ); - } - - #[test] - fn passes_through_plain_text() { - let mut parser = parser(); - let mut segments = parser.parse("plain text\n"); - segments.extend(parser.finish()); - - assert_eq!( - segments, - vec![TaggedLineSegment::Normal("plain text\n".to_string())] - ); - } } diff --git a/codex-rs/utils/stream-parser/src/utf8_stream.rs b/codex-rs/utils/stream-parser/src/utf8_stream.rs new file mode 100644 index 000000000..f9cd31eec --- /dev/null +++ b/codex-rs/utils/stream-parser/src/utf8_stream.rs @@ -0,0 +1,333 @@ +use std::error::Error; +use std::fmt; + +use crate::StreamTextChunk; +use crate::StreamTextParser; + +/// Error returned by [`Utf8StreamParser`] when streamed bytes 
are not valid UTF-8. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Utf8StreamParserError { + /// The provided bytes contain an invalid UTF-8 sequence. + InvalidUtf8 { + /// Byte offset in the parser's buffered bytes where decoding failed. + valid_up_to: usize, + /// Length in bytes of the invalid sequence. + error_len: usize, + }, + /// EOF was reached with a buffered partial UTF-8 code point. + IncompleteUtf8AtEof, +} + +impl fmt::Display for Utf8StreamParserError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidUtf8 { + valid_up_to, + error_len, + } => write!( + f, + "invalid UTF-8 in streamed bytes at offset {valid_up_to} (error length {error_len})" + ), + Self::IncompleteUtf8AtEof => { + write!(f, "incomplete UTF-8 code point at end of stream") + } + } + } +} + +impl Error for Utf8StreamParserError {} + +/// Wraps a [`StreamTextParser`] and accepts raw bytes, buffering partial UTF-8 code points. +/// +/// This is useful when upstream data arrives as `&[u8]` and a code point may be split across +/// chunk boundaries (for example `0xC3` followed by `0xA9` for `é`). +#[derive(Debug)] +pub struct Utf8StreamParser
<P>
{ + inner: P, + pending_utf8: Vec<u8>, +} + +impl
<P>
Utf8StreamParser
<P>
+where + P: StreamTextParser, +{ + pub fn new(inner: P) -> Self { + Self { + inner, + pending_utf8: Vec::new(), + } + } + + /// Feed a raw byte chunk. + /// + /// If the chunk contains invalid UTF-8, this returns an error and rolls back the entire + /// pushed chunk so callers can decide how to recover without the inner parser seeing a partial + /// prefix from that chunk. + pub fn push_bytes( + &mut self, + chunk: &[u8], + ) -> Result, Utf8StreamParserError> { + let old_len = self.pending_utf8.len(); + self.pending_utf8.extend_from_slice(chunk); + + match std::str::from_utf8(&self.pending_utf8) { + Ok(text) => { + let out = self.inner.push_str(text); + self.pending_utf8.clear(); + Ok(out) + } + Err(err) => { + if let Some(error_len) = err.error_len() { + self.pending_utf8.truncate(old_len); + return Err(Utf8StreamParserError::InvalidUtf8 { + valid_up_to: err.valid_up_to(), + error_len, + }); + } + + let valid_up_to = err.valid_up_to(); + if valid_up_to == 0 { + return Ok(StreamTextChunk::default()); + } + + let text = match std::str::from_utf8(&self.pending_utf8[..valid_up_to]) { + Ok(text) => text, + Err(prefix_err) => { + self.pending_utf8.truncate(old_len); + let error_len = prefix_err.error_len().unwrap_or(0); + return Err(Utf8StreamParserError::InvalidUtf8 { + valid_up_to: prefix_err.valid_up_to(), + error_len, + }); + } + }; + let out = self.inner.push_str(text); + self.pending_utf8.drain(..valid_up_to); + Ok(out) + } + } + } + + pub fn finish(&mut self) -> Result, Utf8StreamParserError> { + if !self.pending_utf8.is_empty() { + match std::str::from_utf8(&self.pending_utf8) { + Ok(_) => {} + Err(err) => { + if let Some(error_len) = err.error_len() { + return Err(Utf8StreamParserError::InvalidUtf8 { + valid_up_to: err.valid_up_to(), + error_len, + }); + } + return Err(Utf8StreamParserError::IncompleteUtf8AtEof); + } + } + } + + let mut out = if self.pending_utf8.is_empty() { + StreamTextChunk::default() + } else { + let text = match 
std::str::from_utf8(&self.pending_utf8) { + Ok(text) => text, + Err(err) => { + let error_len = err.error_len().unwrap_or(0); + return Err(Utf8StreamParserError::InvalidUtf8 { + valid_up_to: err.valid_up_to(), + error_len, + }); + } + }; + let out = self.inner.push_str(text); + self.pending_utf8.clear(); + out + }; + + let mut tail = self.inner.finish(); + out.visible_text.push_str(&tail.visible_text); + out.extracted.append(&mut tail.extracted); + Ok(out) + } + + /// Return the wrapped parser if no undecoded UTF-8 bytes are buffered. + /// + /// Use [`Self::finish`] first if you want to flush buffered text into the wrapped parser. + pub fn into_inner(self) -> Result { + if self.pending_utf8.is_empty() { + return Ok(self.inner); + } + match std::str::from_utf8(&self.pending_utf8) { + Ok(_) => Ok(self.inner), + Err(err) => { + if let Some(error_len) = err.error_len() { + return Err(Utf8StreamParserError::InvalidUtf8 { + valid_up_to: err.valid_up_to(), + error_len, + }); + } + Err(Utf8StreamParserError::IncompleteUtf8AtEof) + } + } + } + + /// Return the wrapped parser without validating or flushing buffered undecoded bytes. + /// + /// This may drop a partial UTF-8 code point that was buffered across chunk boundaries. 
+ pub fn into_inner_lossy(self) -> P { + self.inner + } +} + +#[cfg(test)] +mod tests { + use super::Utf8StreamParser; + use super::Utf8StreamParserError; + use crate::CitationStreamParser; + use crate::StreamTextChunk; + use crate::StreamTextParser; + + use pretty_assertions::assert_eq; + + fn collect_bytes( + parser: &mut Utf8StreamParser, + chunks: &[&[u8]], + ) -> Result, Utf8StreamParserError> { + let mut all = StreamTextChunk::default(); + for chunk in chunks { + let next = parser.push_bytes(chunk)?; + all.visible_text.push_str(&next.visible_text); + all.extracted.extend(next.extracted); + } + let tail = parser.finish()?; + all.visible_text.push_str(&tail.visible_text); + all.extracted.extend(tail.extracted); + Ok(all) + } + + #[test] + fn utf8_stream_parser_handles_split_code_points_across_chunks() { + let chunks: [&[u8]; 3] = [ + b"A\xC3", + b"\xA9\xE4", + b"\xB8\xADZ", + ]; + + let mut parser = Utf8StreamParser::new(CitationStreamParser::new()); + let out = match collect_bytes(&mut parser, &chunks) { + Ok(out) => out, + Err(err) => panic!("valid UTF-8 stream should parse: {err}"), + }; + + assert_eq!(out.visible_text, "AéZ"); + assert_eq!(out.extracted, vec!["中".to_string()]); + } + + #[test] + fn utf8_stream_parser_rolls_back_on_invalid_utf8_chunk() { + let mut parser = Utf8StreamParser::new(CitationStreamParser::new()); + + let first = match parser.push_bytes(&[0xC3]) { + Ok(out) => out, + Err(err) => panic!("leading byte may be buffered until next chunk: {err}"), + }; + assert!(first.is_empty()); + + let err = match parser.push_bytes(&[0x28]) { + Ok(out) => panic!("invalid continuation byte should error, got output: {out:?}"), + Err(err) => err, + }; + assert_eq!( + err, + Utf8StreamParserError::InvalidUtf8 { + valid_up_to: 0, + error_len: 1, + } + ); + + let second = match parser.push_bytes(&[0xA9, b'x']) { + Ok(out) => out, + Err(err) => panic!("state should still allow a valid continuation: {err}"), + }; + let tail = match parser.finish() { + Ok(out) 
=> out, + Err(err) => panic!("stream should finish: {err}"), + }; + + assert_eq!(second.visible_text, "éx"); + assert!(second.extracted.is_empty()); + assert!(tail.is_empty()); + } + + #[test] + fn utf8_stream_parser_rolls_back_entire_chunk_when_invalid_byte_follows_valid_prefix() { + let mut parser = Utf8StreamParser::new(CitationStreamParser::new()); + + let err = match parser.push_bytes(b"ok\xFF") { + Ok(out) => panic!("invalid byte should error, got output: {out:?}"), + Err(err) => err, + }; + assert_eq!( + err, + Utf8StreamParserError::InvalidUtf8 { + valid_up_to: 2, + error_len: 1, + } + ); + + let next = match parser.push_bytes(b"!") { + Ok(out) => out, + Err(err) => panic!("parser should recover after rollback: {err}"), + }; + + assert_eq!(next.visible_text, "!"); + assert!(next.extracted.is_empty()); + } + + #[test] + fn utf8_stream_parser_errors_on_incomplete_code_point_at_eof() { + let mut parser = Utf8StreamParser::new(CitationStreamParser::new()); + + let out = match parser.push_bytes(&[0xE2, 0x82]) { + Ok(out) => out, + Err(err) => panic!("partial code point should be buffered: {err}"), + }; + assert!(out.is_empty()); + + let err = match parser.finish() { + Ok(out) => panic!("unfinished code point should error, got output: {out:?}"), + Err(err) => err, + }; + assert_eq!(err, Utf8StreamParserError::IncompleteUtf8AtEof); + } + + #[test] + fn utf8_stream_parser_into_inner_errors_when_partial_code_point_is_buffered() { + let mut parser = Utf8StreamParser::new(CitationStreamParser::new()); + + let out = match parser.push_bytes(&[0xC3]) { + Ok(out) => out, + Err(err) => panic!("partial code point should be buffered: {err}"), + }; + assert!(out.is_empty()); + + let err = match parser.into_inner() { + Ok(_) => panic!("buffered partial code point should be rejected"), + Err(err) => err, + }; + assert_eq!(err, Utf8StreamParserError::IncompleteUtf8AtEof); + } + + #[test] + fn utf8_stream_parser_into_inner_lossy_drops_buffered_partial_code_point() { + let mut 
parser = Utf8StreamParser::new(CitationStreamParser::new()); + + let out = match parser.push_bytes(&[0xC3]) { + Ok(out) => out, + Err(err) => panic!("partial code point should be buffered: {err}"), + }; + assert!(out.is_empty()); + + let mut inner = parser.into_inner_lossy(); + let tail = inner.finish(); + assert!(tail.is_empty()); + } +}