diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 6ac4201d1..bd7fd8e28 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -525,6 +525,8 @@ server_notification_definitions! { TurnPlanUpdated => "turn/plan/updated" (v2::TurnPlanUpdatedNotification), ItemStarted => "item/started" (v2::ItemStartedNotification), ItemCompleted => "item/completed" (v2::ItemCompletedNotification), + /// This event is internal-only. Used by Codex Cloud. + RawResponseItemCompleted => "rawResponseItem/completed" (v2::RawResponseItemCompletedNotification), AgentMessageDelta => "item/agentMessage/delta" (v2::AgentMessageDeltaNotification), CommandExecutionOutputDelta => "item/commandExecution/outputDelta" (v2::CommandExecutionOutputDeltaNotification), TerminalInteraction => "item/commandExecution/terminalInteraction" (v2::TerminalInteractionNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index b69ec0632..2acea04e9 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -860,6 +860,12 @@ pub struct ThreadStartParams { pub config: Option>, pub base_instructions: Option, pub developer_instructions: Option, + /// If true, opt into emitting raw response items on the event stream. + /// + /// This is for internal use only (e.g. Codex Cloud). + /// (TODO): Figure out a better way to categorize internal / experimental events & protocols. + #[serde(default)] + pub experimental_raw_events: bool, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -1581,6 +1587,15 @@ pub struct ItemCompletedNotification { pub turn_id: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct RawResponseItemCompletedNotification { + pub thread_id: String, + pub turn_id: String, + pub item: ResponseItem, +} + // Item-specific progress notifications #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index b0161cd9f..dec9d8c08 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -31,6 +31,7 @@ use codex_app_server_protocol::McpToolCallResult; use codex_app_server_protocol::McpToolCallStatus; use codex_app_server_protocol::PatchApplyStatus; use codex_app_server_protocol::PatchChangeKind as V2PatchChangeKind; +use codex_app_server_protocol::RawResponseItemCompletedNotification; use codex_app_server_protocol::ReasoningSummaryPartAddedNotification; use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; use codex_app_server_protocol::ReasoningTextDeltaNotification; @@ -451,6 +452,16 @@ pub(crate) async fn apply_bespoke_event_handling( .send_server_notification(ServerNotification::ItemCompleted(completed)) .await; } + EventMsg::RawResponseItem(raw_response_item_event) => { + maybe_emit_raw_response_item_completed( + api_version, + conversation_id, + &event_turn_id, + raw_response_item_event.item, + outgoing.as_ref(), + ) + .await; + } EventMsg::PatchApplyBegin(patch_begin_event) => { // Until we migrate the core to be aware of a first class FileChangeItem // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. @@ -820,6 +831,27 @@ async fn complete_command_execution_item( .await; } +async fn maybe_emit_raw_response_item_completed( + api_version: ApiVersion, + conversation_id: ConversationId, + turn_id: &str, + item: codex_protocol::models::ResponseItem, + outgoing: &OutgoingMessageSender, +) { + let ApiVersion::V2 = api_version else { + return; + }; + + let notification = RawResponseItemCompletedNotification { + thread_id: conversation_id.to_string(), + turn_id: turn_id.to_string(), + item, + }; + outgoing + .send_server_notification(ServerNotification::RawResponseItemCompleted(notification)) + .await; +} + async fn find_and_remove_turn_summary( conversation_id: ConversationId, turn_summary_store: &TurnSummaryStore, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 276e357d2..0bcac39a5 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1373,9 +1373,13 @@ impl CodexMessageProcessor { }; // Auto-attach a conversation listener when starting a thread. - // Use the same behavior as the v1 API with experimental_raw_events=false. + // Use the same behavior as the v1 API, with opt-in support for raw item events. if let Err(err) = self - .attach_conversation_listener(conversation_id, false, ApiVersion::V2) + .attach_conversation_listener( + conversation_id, + params.experimental_raw_events, + ApiVersion::V2, + ) .await { tracing::warn!(