From e357fc723d144cbf8a79cc3f5e59b396ef2791bd Mon Sep 17 00:00:00 2001 From: Celia Chen Date: Tue, 11 Nov 2025 14:43:24 -0800 Subject: [PATCH] [app-server] add item started/completed events for turn items (#6517) This one should be quite straightforward, as it's just a translation of TurnItem events we already emit to ThreadItem that app-server exposes to customers. To test, cp my change to owen/app_server_test_client and do the following: ``` cargo build -p codex-cli RUST_LOG=codex_app_server=info CODEX_BIN=target/debug/codex cargo run -p codex-app-server-test-client -- send-message-v2 "hello" ``` example event before (still kept there for backward compatibility): ``` { < "method": "codex/event/item_completed", < "params": { < "conversationId": "019a74cc-fad9-7ab3-83a3-f42827b7b074", < "id": "0", < "msg": { < "item": { < "Reasoning": { < "id": "rs_03d183492e07e20a016913a936eb8c81a1a7671a103fee8afc", < "raw_content": [], < "summary_text": [ < "Hey! What would you like to work on? I can explore the repo, run specific tests, or implement a change. Let's keep it short and straightforward. There's no need for a lengthy introduction or elaborate planning, just a friendly greeting and an open offer to help. I want to make sure the user feels welcomed and understood right from the start. It's all about keeping the tone friendly and concise!" < ] < } < }, < "thread_id": "019a74cc-fad9-7ab3-83a3-f42827b7b074", < "turn_id": "0", < "type": "item_completed" < } < } < } ``` after (v2): ``` < { < "method": "item/completed", < "params": { < "item": { < "id": "rs_03d183492e07e20a016913a936eb8c81a1a7671a103fee8afc", < "text": "Hey! What would you like to work on? I can explore the repo, run specific tests, or implement a change. Let's keep it short and straightforward. There's no need for a lengthy introduction or elaborate planning, just a friendly greeting and an open offer to help. I want to make sure the user feels welcomed and understood right from the start. It's all about keeping the tone friendly and concise!", < "type": "reasoning" < } < } < } ``` --- .../app-server-protocol/src/protocol/v2.rs | 146 ++++++++++++++++++ .../app-server/src/codex_message_processor.rs | 16 ++ 2 files changed, 162 insertions(+) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 2afdb4575..88f0f7832 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -6,6 +6,8 @@ use codex_protocol::ConversationId; use codex_protocol::account::PlanType; use codex_protocol::config_types::ReasoningEffort; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent; +use codex_protocol::items::TurnItem as CoreTurnItem; use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot; use codex_protocol::protocol::RateLimitWindow as CoreRateLimitWindow; use codex_protocol::user_input::UserInput as CoreUserInput; @@ -457,6 +459,17 @@ impl UserInput { } } +impl From for UserInput { + fn from(value: CoreUserInput) -> Self { + match value { + CoreUserInput::Text { text } => UserInput::Text { text }, + CoreUserInput::Image { image_url } => UserInput::Image { url: image_url }, + CoreUserInput::LocalImage { path } => UserInput::LocalImage { path }, + _ => unreachable!("unsupported user input variant"), + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(tag = "type", rename_all = "camelCase")] #[ts(tag = "type")] @@ -514,6 +527,42 @@ pub enum ThreadItem { }, } +impl From for ThreadItem { + fn from(value: CoreTurnItem) -> Self { + match value { + CoreTurnItem::UserMessage(user) => ThreadItem::UserMessage { + id: user.id, + content: user.content.into_iter().map(UserInput::from).collect(), + }, + CoreTurnItem::AgentMessage(agent) => { + let text = agent + .content + .into_iter() + .map(|entry| match entry { + CoreAgentMessageContent::Text { text } => text, + }) + .collect::(); + ThreadItem::AgentMessage { id: agent.id, text } + } + CoreTurnItem::Reasoning(reasoning) => { + let text = if !reasoning.summary_text.is_empty() { + reasoning.summary_text.join("\n") + } else { + reasoning.raw_content.join("\n") + }; + ThreadItem::Reasoning { + id: reasoning.id, + text, + } + } + CoreTurnItem::WebSearch(search) => ThreadItem::WebSearch { + id: search.id, + query: search.query, + }, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -708,3 +757,100 @@ pub struct AccountLoginCompletedNotification { pub success: bool, pub error: Option, } + +#[cfg(test)] +mod tests { + use super::*; + use codex_protocol::items::AgentMessageContent; + use codex_protocol::items::AgentMessageItem; + use codex_protocol::items::ReasoningItem; + use codex_protocol::items::TurnItem; + use codex_protocol::items::UserMessageItem; + use codex_protocol::items::WebSearchItem; + use codex_protocol::user_input::UserInput as CoreUserInput; + use pretty_assertions::assert_eq; + use std::path::PathBuf; + + #[test] + fn core_turn_item_into_thread_item_converts_supported_variants() { + let user_item = TurnItem::UserMessage(UserMessageItem { + id: "user-1".to_string(), + content: vec![ + CoreUserInput::Text { + text: "hello".to_string(), + }, + CoreUserInput::Image { + image_url: "https://example.com/image.png".to_string(), + }, + CoreUserInput::LocalImage { + path: PathBuf::from("local/image.png"), + }, + ], + }); + + assert_eq!( + ThreadItem::from(user_item), + ThreadItem::UserMessage { + id: "user-1".to_string(), + content: vec![ + UserInput::Text { + text: "hello".to_string(), + }, + UserInput::Image { + url: "https://example.com/image.png".to_string(), + }, + UserInput::LocalImage { + path: PathBuf::from("local/image.png"), + }, + ], + } + ); + + let agent_item = TurnItem::AgentMessage(AgentMessageItem { + id: "agent-1".to_string(), + content: vec![ + AgentMessageContent::Text { + text: "Hello ".to_string(), + }, + AgentMessageContent::Text { + text: "world".to_string(), + }, + ], + }); + + assert_eq!( + ThreadItem::from(agent_item), + ThreadItem::AgentMessage { + id: "agent-1".to_string(), + text: "Hello world".to_string(), + } + ); + + let reasoning_item = TurnItem::Reasoning(ReasoningItem { + id: "reasoning-1".to_string(), + summary_text: vec!["line one".to_string(), "line two".to_string()], + raw_content: vec![], + }); + + assert_eq!( + ThreadItem::from(reasoning_item), + ThreadItem::Reasoning { + id: "reasoning-1".to_string(), + text: "line one\nline two".to_string(), + } + ); + + let search_item = TurnItem::WebSearch(WebSearchItem { + id: "search-1".to_string(), + query: "docs".to_string(), + }); + + assert_eq!( + ThreadItem::from(search_item), + ThreadItem::WebSearch { + id: "search-1".to_string(), + query: "docs".to_string(), + } + ); + } +} diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 8d97f82c5..e465117da 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -46,6 +46,8 @@ use codex_app_server_protocol::GitDiffToRemoteResponse; use codex_app_server_protocol::InputItem as WireInputItem; use codex_app_server_protocol::InterruptConversationParams; use codex_app_server_protocol::InterruptConversationResponse; +use codex_app_server_protocol::ItemCompletedNotification; +use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::ListConversationsParams; use codex_app_server_protocol::ListConversationsResponse; @@ -2609,6 +2611,20 @@ async fn apply_bespoke_event_handling( .await; } } + EventMsg::ItemStarted(item_started_event) => { + let item: ThreadItem = item_started_event.item.clone().into(); + let notification = ItemStartedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemStarted(notification)) + .await; + } + EventMsg::ItemCompleted(item_completed_event) => { + let item: ThreadItem = item_completed_event.item.clone().into(); + let notification = ItemCompletedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemCompleted(notification)) + .await; + } // If this is a TurnAborted, reply to any pending interrupt requests. EventMsg::TurnAborted(turn_aborted_event) => { let pending = {