From 641d5268fa581bbdff401e5e8e8cc346976b92fd Mon Sep 17 00:00:00 2001 From: Celia Chen Date: Tue, 10 Feb 2026 19:56:01 -0800 Subject: [PATCH] chore: persist turn_id in rollout session and make turn_id uuid based (#11246) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem: 1. turn id is constructed in-memory; 2. on resuming threads, turn_id might not be unique; 3. client cannot no the boundary of a turn from rollout files easily. This PR does three things: 1. persist `task_started` and `task_complete` events; 1. persist `turn_id` in rollout turn events; 5. generate turn_id as unique uuids instead of incrementing it in memory. This helps us resolve the issue of clients wanting to have unique turn ids for resuming a thread, and knowing the boundry of each turn in rollout files. example debug logs ``` 2026-02-11T00:32:10.746876Z DEBUG codex_app_server_protocol::protocol::thread_history: built turn from rollout items turn_index=8 turn=Turn { id: "019c4a07-d809-74c3-bc4b-fd9618487b4b", items: [UserMessage { id: "item-24", content: [Text { text: "hi", text_elements: [] }] }, AgentMessage { id: "item-25", text: "Hi. I’m in the workspace with your current changes loaded and ready. Send the next task and I’ll execute it end-to-end." }], status: Completed, error: None } 2026-02-11T00:32:10.746888Z DEBUG codex_app_server_protocol::protocol::thread_history: built turn from rollout items turn_index=9 turn=Turn { id: "019c4a18-1004-76c0-a0fb-a77610f6a9b8", items: [UserMessage { id: "item-26", content: [Text { text: "hello", text_elements: [] }] }, AgentMessage { id: "item-27", text: "Hello. Ready for the next change in `codex-rs`; I can continue from the current in-progress diff or start a new task." }], status: Completed, error: None } 2026-02-11T00:32:10.746899Z DEBUG codex_app_server_protocol::protocol::thread_history: built turn from rollout items turn_index=10 turn=Turn { id: "019c4a19-41f0-7db0-ad78-74f1503baeb8", items: [UserMessage { id: "item-28", content: [Text { text: "hello", text_elements: [] }] }, AgentMessage { id: "item-29", text: "Hello. Send the specific change you want in `codex-rs`, and I’ll implement it and run the required checks." }], status: Completed, error: None } ``` backward compatibility: if you try to resume an old session without task_started and task_complete event populated, the following happens: - If you resume and do nothing: those reconstructed historical IDs can differ next time you resume. - If you resume and send a new turn: the new turn gets a fresh UUID from live submission flow and is persisted, so that new turn’s ID is stable on later resumes. I think this behavior is fine, because we only care about deterministic turn id once a turn is triggered. --- .../schema/json/EventMsg.json | 28 ++ .../schema/json/ServerNotification.json | 14 + .../codex_app_server_protocol.schemas.json | 14 + .../json/v1/ForkConversationResponse.json | 14 + .../json/v1/ResumeConversationResponse.json | 14 + .../v1/SessionConfiguredNotification.json | 14 + .../schema/typescript/TurnAbortedEvent.ts | 2 +- .../schema/typescript/TurnCompleteEvent.ts | 2 +- .../schema/typescript/TurnStartedEvent.ts | 2 +- .../src/protocol/thread_history.rs | 304 ++++++++++++++---- codex-rs/app-server-test-client/src/lib.rs | 64 ++++ .../app-server/src/bespoke_event_handling.rs | 10 +- .../app-server/src/codex_message_processor.rs | 70 ++-- .../app-server/tests/suite/send_message.rs | 1 + codex-rs/core/src/agent/control.rs | 3 + codex-rs/core/src/codex.rs | 10 +- codex-rs/core/src/codex_delegate.rs | 5 +- codex-rs/core/src/compact.rs | 2 + codex-rs/core/src/compact_remote.rs | 1 + codex-rs/core/src/rollout/policy.rs | 6 +- codex-rs/core/src/tasks/mod.rs | 10 +- codex-rs/core/src/tasks/user_shell.rs | 1 + codex-rs/core/tests/suite/resume.rs | 14 + codex-rs/core/tests/suite/resume_warning.rs | 1 + .../src/event_processor_with_human_output.rs | 4 +- .../src/event_processor_with_jsonl_output.rs | 1 + .../tests/event_processor_with_json_output.rs | 5 + codex-rs/mcp-server/src/codex_tool_runner.rs | 4 +- codex-rs/protocol/src/protocol.rs | 23 ++ codex-rs/tui/src/chatwidget.rs | 6 +- codex-rs/tui/src/chatwidget/tests.rs | 35 ++ codex-rs/tui/src/lib.rs | 1 + 32 files changed, 558 insertions(+), 127 deletions(-) diff --git a/codex-rs/app-server-protocol/schema/json/EventMsg.json b/codex-rs/app-server-protocol/schema/json/EventMsg.json index 5991401b8..7151f0d6b 100644 --- a/codex-rs/app-server-protocol/schema/json/EventMsg.json +++ b/codex-rs/app-server-protocol/schema/json/EventMsg.json @@ -560,6 +560,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_started" @@ -569,6 +572,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskStartedEventMsg", @@ -583,6 +587,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_complete" @@ -592,6 +599,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskCompleteEventMsg", @@ -2129,6 +2137,12 @@ "reason": { "$ref": "#/definitions/TurnAbortReason" }, + "turn_id": { + "type": [ + "string", + "null" + ] + }, "type": { "enum": [ "turn_aborted" @@ -5303,6 +5317,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_started" @@ -5312,6 +5329,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskStartedEventMsg", @@ -5326,6 +5344,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_complete" @@ -5335,6 +5356,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskCompleteEventMsg", @@ -6872,6 +6894,12 @@ "reason": { "$ref": "#/definitions/TurnAbortReason" }, + "turn_id": { + "type": [ + "string", + "null" + ] + }, "type": { "enum": [ "turn_aborted" diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 69c6c2cb9..39fba5b5a 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -1204,6 +1204,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_started" @@ -1213,6 +1216,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskStartedEventMsg", @@ -1227,6 +1231,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_complete" @@ -1236,6 +1243,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskCompleteEventMsg", @@ -2773,6 +2781,12 @@ "reason": { "$ref": "#/definitions/TurnAbortReason" }, + "turn_id": { + "type": [ + "string", + "null" + ] + }, "type": { "enum": [ "turn_aborted" diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 88fb5ddbd..633c485e2 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -2573,6 +2573,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_started" @@ -2582,6 +2585,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskStartedEventMsg", @@ -2596,6 +2600,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_complete" @@ -2605,6 +2612,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskCompleteEventMsg", @@ -4142,6 +4150,12 @@ "reason": { "$ref": "#/definitions/TurnAbortReason" }, + "turn_id": { + "type": [ + "string", + "null" + ] + }, "type": { "enum": [ "turn_aborted" diff --git a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json index 90949d56f..027caee1c 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json @@ -560,6 +560,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_started" @@ -569,6 +572,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskStartedEventMsg", @@ -583,6 +587,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_complete" @@ -592,6 +599,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskCompleteEventMsg", @@ -2129,6 +2137,12 @@ "reason": { "$ref": "#/definitions/TurnAbortReason" }, + "turn_id": { + "type": [ + "string", + "null" + ] + }, "type": { "enum": [ "turn_aborted" diff --git a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json index b25d2dbed..e6f3b145a 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json @@ -560,6 +560,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_started" @@ -569,6 +572,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskStartedEventMsg", @@ -583,6 +587,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_complete" @@ -592,6 +599,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskCompleteEventMsg", @@ -2129,6 +2137,12 @@ "reason": { "$ref": "#/definitions/TurnAbortReason" }, + "turn_id": { + "type": [ + "string", + "null" + ] + }, "type": { "enum": [ "turn_aborted" diff --git a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json index fa9e144c4..1b717eefc 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json @@ -560,6 +560,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_started" @@ -569,6 +572,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskStartedEventMsg", @@ -583,6 +587,9 @@ "null" ] }, + "turn_id": { + "type": "string" + }, "type": { "enum": [ "task_complete" @@ -592,6 +599,7 @@ } }, "required": [ + "turn_id", "type" ], "title": "TaskCompleteEventMsg", @@ -2129,6 +2137,12 @@ "reason": { "$ref": "#/definitions/TurnAbortReason" }, + "turn_id": { + "type": [ + "string", + "null" + ] + }, "type": { "enum": [ "turn_aborted" diff --git a/codex-rs/app-server-protocol/schema/typescript/TurnAbortedEvent.ts b/codex-rs/app-server-protocol/schema/typescript/TurnAbortedEvent.ts index eb0bf24c1..0b4e9075b 100644 --- a/codex-rs/app-server-protocol/schema/typescript/TurnAbortedEvent.ts +++ b/codex-rs/app-server-protocol/schema/typescript/TurnAbortedEvent.ts @@ -3,4 +3,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { TurnAbortReason } from "./TurnAbortReason"; -export type TurnAbortedEvent = { reason: TurnAbortReason, }; +export type TurnAbortedEvent = { turn_id: string | null, reason: TurnAbortReason, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/TurnCompleteEvent.ts b/codex-rs/app-server-protocol/schema/typescript/TurnCompleteEvent.ts index ab271ba9e..6987d59f9 100644 --- a/codex-rs/app-server-protocol/schema/typescript/TurnCompleteEvent.ts +++ b/codex-rs/app-server-protocol/schema/typescript/TurnCompleteEvent.ts @@ -2,4 +2,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export type TurnCompleteEvent = { last_agent_message: string | null, }; +export type TurnCompleteEvent = { turn_id: string, last_agent_message: string | null, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/TurnStartedEvent.ts b/codex-rs/app-server-protocol/schema/typescript/TurnStartedEvent.ts index 91598aa78..14c0d7670 100644 --- a/codex-rs/app-server-protocol/schema/typescript/TurnStartedEvent.ts +++ b/codex-rs/app-server-protocol/schema/typescript/TurnStartedEvent.ts @@ -3,4 +3,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { ModeKind } from "./ModeKind"; -export type TurnStartedEvent = { model_context_window: bigint | null, collaboration_mode_kind: ModeKind, }; +export type TurnStartedEvent = { turn_id: string, model_context_window: bigint | null, collaboration_mode_kind: ModeKind, }; diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 39efe4765..231ee99b2 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -5,21 +5,25 @@ use crate::protocol::v2::TurnStatus; use crate::protocol::v2::UserInput; use codex_protocol::protocol::AgentReasoningEvent; use codex_protocol::protocol::AgentReasoningRawContentEvent; +use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ItemCompletedEvent; +use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortedEvent; +use codex_protocol::protocol::TurnCompleteEvent; +use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; +use uuid::Uuid; -/// Convert persisted [`EventMsg`] entries into a sequence of [`Turn`] values. +/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values. /// -/// The purpose of this is to convert the EventMsgs persisted in a rollout file -/// into a sequence of Turns and ThreadItems, which allows the client to render -/// the historical messages when resuming a thread. -pub fn build_turns_from_event_msgs(events: &[EventMsg]) -> Vec { +/// When available, this uses `TurnContext.turn_id` as the canonical turn id so +/// resumed/rebuilt thread history preserves the original turn identifiers. +pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec { let mut builder = ThreadHistoryBuilder::new(); - for event in events { - builder.handle_event(event); + for item in items { + builder.handle_rollout_item(item); } builder.finish() } @@ -27,7 +31,6 @@ pub fn build_turns_from_event_msgs(events: &[EventMsg]) -> Vec { struct ThreadHistoryBuilder { turns: Vec, current_turn: Option, - next_turn_index: i64, next_item_index: i64, } @@ -36,7 +39,6 @@ impl ThreadHistoryBuilder { Self { turns: Vec::new(), current_turn: None, - next_turn_index: 1, next_item_index: 1, } } @@ -63,13 +65,36 @@ impl ThreadHistoryBuilder { EventMsg::ThreadRolledBack(payload) => self.handle_thread_rollback(payload), EventMsg::UndoCompleted(_) => {} EventMsg::TurnAborted(payload) => self.handle_turn_aborted(payload), + EventMsg::TurnStarted(payload) => self.handle_turn_started(payload), + EventMsg::TurnComplete(payload) => self.handle_turn_complete(payload), _ => {} } } + fn handle_rollout_item(&mut self, item: &RolloutItem) { + match item { + RolloutItem::EventMsg(event) => self.handle_event(event), + RolloutItem::Compacted(payload) => self.handle_compacted(payload), + RolloutItem::TurnContext(_) + | RolloutItem::SessionMeta(_) + | RolloutItem::ResponseItem(_) => {} + } + } + fn handle_user_message(&mut self, payload: &UserMessageEvent) { - self.finish_current_turn(); - let mut turn = self.new_turn(); + // User messages should stay in explicitly opened turns. For backward + // compatibility with older streams that did not open turns explicitly, + // close any implicit/inactive turn and start a fresh one for this input. + if let Some(turn) = self.current_turn.as_ref() + && !turn.opened_explicitly + && !(turn.saw_compaction && turn.items.is_empty()) + { + self.finish_current_turn(); + } + let mut turn = self + .current_turn + .take() + .unwrap_or_else(|| self.new_turn(None)); let id = self.next_item_id(); let content = self.build_user_inputs(payload); turn.items.push(ThreadItem::UserMessage { id, content }); @@ -147,6 +172,30 @@ impl ThreadHistoryBuilder { turn.status = TurnStatus::Interrupted; } + fn handle_turn_started(&mut self, payload: &TurnStartedEvent) { + self.finish_current_turn(); + self.current_turn = Some( + self.new_turn(Some(payload.turn_id.clone())) + .opened_explicitly(), + ); + } + + fn handle_turn_complete(&mut self, _payload: &TurnCompleteEvent) { + if let Some(current_turn) = self.current_turn.as_mut() { + current_turn.status = TurnStatus::Completed; + self.finish_current_turn(); + } + } + + /// Marks the current turn as containing a persisted compaction marker. + /// + /// This keeps compaction-only legacy turns from being dropped by + /// `finish_current_turn` when they have no renderable items and were not + /// explicitly opened. + fn handle_compacted(&mut self, _payload: &CompactedItem) { + self.ensure_turn().saw_compaction = true; + } + fn handle_thread_rollback(&mut self, payload: &ThreadRolledBackEvent) { self.finish_current_turn(); @@ -157,34 +206,33 @@ impl ThreadHistoryBuilder { self.turns.truncate(self.turns.len().saturating_sub(n)); } - // Re-number subsequent synthetic ids so the pruned history is consistent. - self.next_turn_index = - i64::try_from(self.turns.len().saturating_add(1)).unwrap_or(i64::MAX); let item_count: usize = self.turns.iter().map(|t| t.items.len()).sum(); self.next_item_index = i64::try_from(item_count.saturating_add(1)).unwrap_or(i64::MAX); } fn finish_current_turn(&mut self) { if let Some(turn) = self.current_turn.take() { - if turn.items.is_empty() { + if turn.items.is_empty() && !turn.opened_explicitly && !turn.saw_compaction { return; } self.turns.push(turn.into()); } } - fn new_turn(&mut self) -> PendingTurn { + fn new_turn(&mut self, id: Option) -> PendingTurn { PendingTurn { - id: self.next_turn_id(), + id: id.unwrap_or_else(|| Uuid::now_v7().to_string()), items: Vec::new(), error: None, status: TurnStatus::Completed, + opened_explicitly: false, + saw_compaction: false, } } fn ensure_turn(&mut self) -> &mut PendingTurn { if self.current_turn.is_none() { - let turn = self.new_turn(); + let turn = self.new_turn(None); return self.current_turn.insert(turn); } @@ -195,12 +243,6 @@ impl ThreadHistoryBuilder { unreachable!("current turn must exist after initialization"); } - fn next_turn_id(&mut self) -> String { - let id = format!("turn-{}", self.next_turn_index); - self.next_turn_index += 1; - id - } - fn next_item_id(&mut self) -> String { let id = format!("item-{}", self.next_item_index); self.next_item_index += 1; @@ -237,6 +279,19 @@ struct PendingTurn { items: Vec, error: Option, status: TurnStatus, + /// True when this turn originated from an explicit `turn_started`/`turn_complete` + /// boundary, so we preserve it even if it has no renderable items. + opened_explicitly: bool, + /// True when this turn includes a persisted `RolloutItem::Compacted`, which + /// should keep the turn from being dropped even without normal items. + saw_compaction: bool, +} + +impl PendingTurn { + fn opened_explicitly(mut self) -> Self { + self.opened_explicitly = true; + self + } } impl From for Turn { @@ -256,11 +311,15 @@ mod tests { use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::AgentReasoningEvent; use codex_protocol::protocol::AgentReasoningRawContentEvent; + use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; + use codex_protocol::protocol::TurnCompleteEvent; + use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use pretty_assertions::assert_eq; + use uuid::Uuid; #[test] fn builds_multiple_turns_with_reasoning_items() { @@ -291,11 +350,15 @@ mod tests { }), ]; - let turns = build_turns_from_event_msgs(&events); + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 2); let first = &turns[0]; - assert_eq!(first.id, "turn-1"); + assert!(Uuid::parse_str(&first.id).is_ok()); assert_eq!(first.status, TurnStatus::Completed); assert_eq!(first.items.len(), 3); assert_eq!( @@ -330,7 +393,8 @@ mod tests { ); let second = &turns[1]; - assert_eq!(second.id, "turn-2"); + assert!(Uuid::parse_str(&second.id).is_ok()); + assert_ne!(first.id, second.id); assert_eq!(second.items.len(), 2); assert_eq!( second.items[0], @@ -374,7 +438,11 @@ mod tests { }), ]; - let turns = build_turns_from_event_msgs(&events); + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 1); let turn = &turns[0]; assert_eq!(turn.items.len(), 4); @@ -410,6 +478,7 @@ mod tests { message: "Working...".into(), }), EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some("turn-1".into()), reason: TurnAbortReason::Replaced, }), EventMsg::UserMessage(UserMessageEvent { @@ -423,7 +492,11 @@ mod tests { }), ]; - let turns = build_turns_from_event_msgs(&events); + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 2); let first_turn = &turns[0]; @@ -502,46 +575,49 @@ mod tests { }), ]; - let turns = build_turns_from_event_msgs(&events); - let expected = vec![ - Turn { - id: "turn-1".into(), - status: TurnStatus::Completed, - error: None, - items: vec![ - ThreadItem::UserMessage { - id: "item-1".into(), - content: vec![UserInput::Text { - text: "First".into(), - text_elements: Vec::new(), - }], - }, - ThreadItem::AgentMessage { - id: "item-2".into(), - text: "A1".into(), - }, - ], - }, - Turn { - id: "turn-2".into(), - status: TurnStatus::Completed, - error: None, - items: vec![ - ThreadItem::UserMessage { - id: "item-3".into(), - content: vec![UserInput::Text { - text: "Third".into(), - text_elements: Vec::new(), - }], - }, - ThreadItem::AgentMessage { - id: "item-4".into(), - text: "A3".into(), - }, - ], - }, - ]; - assert_eq!(turns, expected); + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 2); + assert!(Uuid::parse_str(&turns[0].id).is_ok()); + assert!(Uuid::parse_str(&turns[1].id).is_ok()); + assert_ne!(turns[0].id, turns[1].id); + assert_eq!(turns[0].status, TurnStatus::Completed); + assert_eq!(turns[1].status, TurnStatus::Completed); + assert_eq!( + turns[0].items, + vec![ + ThreadItem::UserMessage { + id: "item-1".into(), + content: vec![UserInput::Text { + text: "First".into(), + text_elements: Vec::new(), + }], + }, + ThreadItem::AgentMessage { + id: "item-2".into(), + text: "A1".into(), + }, + ] + ); + assert_eq!( + turns[1].items, + vec![ + ThreadItem::UserMessage { + id: "item-3".into(), + content: vec![UserInput::Text { + text: "Third".into(), + text_elements: Vec::new(), + }], + }, + ThreadItem::AgentMessage { + id: "item-4".into(), + text: "A3".into(), + }, + ] + ); } #[test] @@ -568,7 +644,95 @@ mod tests { EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 99 }), ]; - let turns = build_turns_from_event_msgs(&events); + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); assert_eq!(turns, Vec::::new()); } + + #[test] + fn uses_explicit_turn_boundaries_for_mid_turn_steering() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "Start".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "Steer".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].id, "turn-a"); + assert_eq!( + turns[0].items, + vec![ + ThreadItem::UserMessage { + id: "item-1".into(), + content: vec![UserInput::Text { + text: "Start".into(), + text_elements: Vec::new(), + }], + }, + ThreadItem::UserMessage { + id: "item-2".into(), + content: vec![UserInput::Text { + text: "Steer".into(), + text_elements: Vec::new(), + }], + }, + ] + ); + } + + #[test] + fn preserves_compaction_only_turn() { + let items = vec![ + RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-compact".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + })), + RolloutItem::Compacted(CompactedItem { + message: String::new(), + replacement_history: None, + }), + RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-compact".into(), + last_agent_message: None, + })), + ]; + + let turns = build_turns_from_rollout_items(&items); + assert_eq!( + turns, + vec![Turn { + id: "turn-compact".into(), + status: TurnStatus::Completed, + error: None, + items: Vec::new(), + }] + ); + } } diff --git a/codex-rs/app-server-test-client/src/lib.rs b/codex-rs/app-server-test-client/src/lib.rs index 415f45e4a..4c4152543 100644 --- a/codex-rs/app-server-test-client/src/lib.rs +++ b/codex-rs/app-server-test-client/src/lib.rs @@ -52,6 +52,8 @@ use codex_app_server_protocol::SendUserMessageParams; use codex_app_server_protocol::SendUserMessageResponse; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::TurnStartParams; @@ -112,6 +114,13 @@ enum CliCommand { /// User message to send to Codex. user_message: String, }, + /// Resume a V2 thread by id, then send a user message. + ResumeMessageV2 { + /// Existing thread id to resume. + thread_id: String, + /// User message to send to Codex. + user_message: String, + }, /// Start a V2 turn that elicits an ExecCommand approval. #[command(name = "trigger-cmd-approval")] TriggerCmdApproval { @@ -161,6 +170,16 @@ pub fn run() -> Result<()> { CliCommand::SendMessageV2 { user_message } => { send_message_v2(&codex_bin, &config_overrides, user_message, &dynamic_tools) } + CliCommand::ResumeMessageV2 { + thread_id, + user_message, + } => resume_message_v2( + &codex_bin, + &config_overrides, + thread_id, + user_message, + &dynamic_tools, + ), CliCommand::TriggerCmdApproval { user_message } => { trigger_cmd_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) } @@ -233,6 +252,41 @@ pub fn send_message_v2( ) } +fn resume_message_v2( + codex_bin: &Path, + config_overrides: &[String], + thread_id: String, + user_message: String, + dynamic_tools: &Option>, +) -> Result<()> { + ensure_dynamic_tools_unused(dynamic_tools, "resume-message-v2")?; + + let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let resume_response = client.thread_resume(ThreadResumeParams { + thread_id, + ..Default::default() + })?; + println!("< thread/resume response: {resume_response:?}"); + + let turn_response = client.turn_start(TurnStartParams { + thread_id: resume_response.thread.id.clone(), + input: vec![V2UserInput::Text { + text: user_message, + text_elements: Vec::new(), + }], + ..Default::default() + })?; + println!("< turn/start response: {turn_response:?}"); + + client.stream_turn(&resume_response.thread.id, &turn_response.turn.id)?; + + Ok(()) +} + fn trigger_cmd_approval( codex_bin: &Path, config_overrides: &[String], @@ -592,6 +646,16 @@ impl CodexClient { self.send_request(request, request_id, "thread/start") } + fn thread_resume(&mut self, params: ThreadResumeParams) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::ThreadResume { + request_id: request_id.clone(), + params, + }; + + self.send_request(request, request_id, "thread/resume") + } + fn turn_start(&mut self, params: TurnStartParams) -> Result { let request_id = self.request_id(); let request = ClientRequest::TurnStart { diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 73647b1c0..4f0c526f7 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -3,7 +3,7 @@ use crate::codex_message_processor::PendingInterrupts; use crate::codex_message_processor::PendingRollbacks; use crate::codex_message_processor::TurnSummary; use crate::codex_message_processor::TurnSummaryStore; -use crate::codex_message_processor::read_event_msgs_from_rollout; +use crate::codex_message_processor::read_rollout_items_from_rollout; use crate::codex_message_processor::read_summary_from_rollout; use crate::codex_message_processor::summary_to_thread; use crate::error_code::INTERNAL_ERROR_CODE; @@ -69,7 +69,7 @@ use codex_app_server_protocol::TurnInterruptResponse; use codex_app_server_protocol::TurnPlanStep; use codex_app_server_protocol::TurnPlanUpdatedNotification; use codex_app_server_protocol::TurnStatus; -use codex_app_server_protocol::build_turns_from_event_msgs; +use codex_app_server_protocol::build_turns_from_rollout_items; use codex_core::CodexThread; use codex_core::parse_command::shlex_join; use codex_core::protocol::ApplyPatchApprovalRequestEvent; @@ -1101,9 +1101,9 @@ pub(crate) async fn apply_bespoke_event_handling( { Ok(summary) => { let mut thread = summary_to_thread(summary); - match read_event_msgs_from_rollout(rollout_path.as_path()).await { - Ok(events) => { - thread.turns = build_turns_from_event_msgs(&events); + match read_rollout_items_from_rollout(rollout_path.as_path()).await { + Ok(items) => { + thread.turns = build_turns_from_rollout_items(&items); ThreadRollbackResponse { thread } } Err(err) => { diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index e8b921980..61507d79d 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -147,7 +147,7 @@ use codex_app_server_protocol::TurnSteerResponse; use codex_app_server_protocol::UserInfoResponse; use codex_app_server_protocol::UserInput as V2UserInput; use codex_app_server_protocol::UserSavedConfig; -use codex_app_server_protocol::build_turns_from_event_msgs; +use codex_app_server_protocol::build_turns_from_rollout_items; use codex_backend_client::Client as BackendClient; use codex_chatgpt::connectors; use codex_cloud_requirements::cloud_requirements_loader; @@ -2445,9 +2445,9 @@ impl CodexMessageProcessor { }; if include_turns && let Some(rollout_path) = rollout_path.as_ref() { - match read_event_msgs_from_rollout(rollout_path).await { - Ok(events) => { - thread.turns = build_turns_from_event_msgs(&events); + match read_rollout_items_from_rollout(rollout_path).await { + Ok(items) => { + thread.turns = build_turns_from_rollout_items(&items); } Err(err) if err.kind() == std::io::ErrorKind::NotFound => { self.send_invalid_request_error( @@ -2639,11 +2639,7 @@ impl CodexMessageProcessor { session_configured, .. }) => { - let SessionConfiguredEvent { - rollout_path, - initial_messages, - .. - } = session_configured; + let SessionConfiguredEvent { rollout_path, .. } = session_configured; let Some(rollout_path) = rollout_path else { self.send_internal_error( request_id, @@ -2683,9 +2679,22 @@ impl CodexMessageProcessor { return; } }; - thread.turns = initial_messages - .as_deref() - .map_or_else(Vec::new, build_turns_from_event_msgs); + match read_rollout_items_from_rollout(rollout_path.as_path()).await { + Ok(items) => { + thread.turns = build_turns_from_rollout_items(&items); + } + Err(err) => { + self.send_internal_error( + request_id, + format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ), + ) + .await; + return; + } + } let response = ThreadResumeResponse { thread, @@ -2847,11 +2856,7 @@ impl CodexMessageProcessor { } }; - let SessionConfiguredEvent { - rollout_path, - initial_messages, - .. - } = session_configured; + let SessionConfiguredEvent { rollout_path, .. } = session_configured; let Some(rollout_path) = rollout_path else { self.send_internal_error( request_id, @@ -2891,9 +2896,22 @@ impl CodexMessageProcessor { return; } }; - thread.turns = initial_messages - .as_deref() - .map_or_else(Vec::new, build_turns_from_event_msgs); + match read_rollout_items_from_rollout(rollout_path.as_path()).await { + Ok(items) => { + thread.turns = build_turns_from_rollout_items(&items); + } + Err(err) => { + self.send_internal_error( + request_id, + format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ), + ) + .await; + return; + } + } let response = ThreadForkResponse { thread: thread.clone(), @@ -5779,22 +5797,16 @@ pub(crate) async fn read_summary_from_rollout( }) } -pub(crate) async fn read_event_msgs_from_rollout( +pub(crate) async fn read_rollout_items_from_rollout( path: &Path, -) -> std::io::Result> { +) -> std::io::Result> { let items = match RolloutRecorder::get_rollout_history(path).await? { InitialHistory::New => Vec::new(), InitialHistory::Forked(items) => items, InitialHistory::Resumed(resumed) => resumed.history, }; - Ok(items - .into_iter() - .filter_map(|item| match item { - RolloutItem::EventMsg(event) => Some(event), - _ => None, - }) - .collect()) + Ok(items) } fn extract_conversation_summary( diff --git a/codex-rs/app-server/tests/suite/send_message.rs b/codex-rs/app-server/tests/suite/send_message.rs index ecb742aff..66032da20 100644 --- a/codex-rs/app-server/tests/suite/send_message.rs +++ b/codex-rs/app-server/tests/suite/send_message.rs @@ -560,6 +560,7 @@ fn append_rollout_turn_context(path: &Path, timestamp: &str, model: &str) -> std let line = RolloutLine { timestamp: timestamp.to_string(), item: RolloutItem::TurnContext(TurnContextItem { + turn_id: None, cwd: PathBuf::from("/"), approval_policy: AskForApproval::Never, sandbox_policy: SandboxPolicy::DangerFullAccess, diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 9334a0764..c2dc3b8f8 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -268,6 +268,7 @@ mod tests { #[tokio::test] async fn on_event_updates_status_from_task_started() { let status = agent_status_from_event(&EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, })); @@ -277,6 +278,7 @@ mod tests { #[tokio::test] async fn on_event_updates_status_from_task_complete() { let status = agent_status_from_event(&EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: Some("done".to_string()), })); let expected = AgentStatus::Completed(Some("done".to_string())); @@ -297,6 +299,7 @@ mod tests { #[tokio::test] async fn on_event_updates_status_from_turn_aborted() { let status = agent_status_from_event(&EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, })); diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7ee094b56..b64eedbe4 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -104,6 +104,7 @@ use tracing::instrument; use tracing::trace; use tracing::trace_span; use tracing::warn; +use uuid::Uuid; use crate::ModelProviderInfo; use crate::client::ModelClient; @@ -248,7 +249,6 @@ use codex_utils_readiness::ReadinessFlag; /// The high-level interface to the Codex system. /// It operates as a queue pair where you send submissions and receive events. pub struct Codex { - pub(crate) next_id: AtomicU64, pub(crate) tx_sub: Sender, pub(crate) rx_event: Receiver, // Last known status of the agent. @@ -429,7 +429,6 @@ impl Codex { submission_loop(Arc::clone(&session), config, rx_sub).instrument(session_loop_span), ); let codex = Codex { - next_id: AtomicU64::new(0), tx_sub, rx_event, agent_status: agent_status_rx, @@ -446,10 +445,7 @@ impl Codex { /// Submit the `op` wrapped in a `Submission` with a unique ID. pub async fn submit(&self, op: Op) -> CodexResult { - let id = self - .next_id - .fetch_add(1, std::sync::atomic::Ordering::SeqCst) - .to_string(); + let id = Uuid::now_v7().to_string(); let sub = Submission { id: id.clone(), op }; self.submit_with_id(sub).await?; Ok(id) @@ -3869,6 +3865,7 @@ pub(crate) async fn run_turn( let total_usage_tokens = sess.get_total_token_usage().await; let event = EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn_context.sub_id.clone(), model_context_window: turn_context.model_context_window(), collaboration_mode_kind: turn_context.collaboration_mode.mode, }); @@ -4822,6 +4819,7 @@ async fn try_run_sampling_request( ) -> CodexResult { let collaboration_mode = sess.current_collaboration_mode().await; let rollout_item = RolloutItem::TurnContext(TurnContextItem { + turn_id: Some(turn_context.sub_id.clone()), cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, sandbox_policy: turn_context.sandbox_policy.clone(), diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index b9af1dd5f..504b876bb 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::sync::Arc; -use std::sync::atomic::AtomicU64; use async_channel::Receiver; use async_channel::Sender; @@ -90,7 +89,6 @@ pub(crate) async fn run_codex_thread_interactive( }); Ok(Codex { - next_id: AtomicU64::new(0), tx_sub: tx_ops, rx_event: rx_sub, agent_status: codex.agent_status.clone(), @@ -166,7 +164,6 @@ pub(crate) async fn run_codex_thread_one_shot( drop(rx_closed); Ok(Codex { - next_id: AtomicU64::new(0), rx_event: rx_bridge, tx_sub: tx_closed, agent_status, @@ -470,7 +467,6 @@ mod tests { let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit); let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await; let codex = Arc::new(Codex { - next_id: AtomicU64::new(0), tx_sub, rx_event: rx_events, agent_status, @@ -482,6 +478,7 @@ mod tests { .send(Event { id: "full".to_string(), msg: EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }) diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index ed8abfc54..f7de93dfe 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -57,6 +57,7 @@ pub(crate) async fn run_compact_task( input: Vec, ) -> CodexResult<()> { let start_event = EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn_context.sub_id.clone(), model_context_window: turn_context.model_context_window(), collaboration_mode_kind: turn_context.collaboration_mode.mode, }); @@ -95,6 +96,7 @@ async fn run_compact_task_inner( // session config before this write occurs. let collaboration_mode = sess.current_collaboration_mode().await; let rollout_item = RolloutItem::TurnContext(TurnContextItem { + turn_id: Some(turn_context.sub_id.clone()), cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, sandbox_policy: turn_context.sandbox_policy.clone(), diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 6af43dddf..4fe07560b 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -34,6 +34,7 @@ pub(crate) async fn run_remote_compact_task( turn_context: Arc, ) -> CodexResult<()> { let start_event = EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn_context.sub_id.clone(), model_context_window: turn_context.model_context_window(), collaboration_mode_kind: turn_context.collaboration_mode.mode, }); diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 5043c9ddf..edc3b941c 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -47,7 +47,9 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::ExitedReviewMode(_) | EventMsg::ThreadRolledBack(_) | EventMsg::UndoCompleted(_) - | EventMsg::TurnAborted(_) => true, + | EventMsg::TurnAborted(_) + | EventMsg::TurnStarted(_) + | EventMsg::TurnComplete(_) => true, EventMsg::ItemCompleted(event) => { // Plan items are derived from streaming tags and are not part of the // raw ResponseItem history, so we persist their completion to replay @@ -56,8 +58,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { } EventMsg::Error(_) | EventMsg::Warning(_) - | EventMsg::TurnStarted(_) - | EventMsg::TurnComplete(_) | EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) | EventMsg::AgentReasoningRawContentDelta(_) diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index cbb6cd4b3..9a3a4756a 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -214,7 +214,10 @@ impl Session { self.record_conversation_items(turn_context.as_ref(), &pending_response_items) .await; } - let event = EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message }); + let event = EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: turn_context.sub_id.clone(), + last_agent_message, + }); self.send_event(turn_context.as_ref(), event).await; } @@ -290,7 +293,10 @@ impl Session { self.flush_rollout().await; } - let event = EventMsg::TurnAborted(TurnAbortedEvent { reason }); + let event = EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some(task.turn_context.sub_id.clone()), + reason, + }); self.send_event(task.turn_context.as_ref(), event).await; } } diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index b0c505b64..ef6c7180a 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -101,6 +101,7 @@ pub(crate) async fn execute_user_shell_command( // emitted TurnStarted, so emitting another TurnStarted here would create // duplicate turn lifecycle events and confuse clients. let event = EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn_context.sub_id.clone(), model_context_window: turn_context.model_context_window(), collaboration_mode_kind: turn_context.collaboration_mode.mode, }); diff --git a/codex-rs/core/tests/suite/resume.rs b/codex-rs/core/tests/suite/resume.rs index c0bdcd4fe..261ec5808 100644 --- a/codex-rs/core/tests/suite/resume.rs +++ b/codex-rs/core/tests/suite/resume.rs @@ -64,14 +64,21 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> { .expect("expected initial messages to be present for resumed session"); match initial_messages.as_slice() { [ + EventMsg::TurnStarted(started), EventMsg::UserMessage(first_user), EventMsg::TokenCount(_), EventMsg::AgentMessage(assistant_message), EventMsg::TokenCount(_), + EventMsg::TurnComplete(completed), ] => { assert_eq!(first_user.message, "Record some messages"); assert_eq!(first_user.text_elements, text_elements); assert_eq!(assistant_message.message, "Completed first turn"); + assert_eq!(completed.turn_id, started.turn_id); + assert_eq!( + completed.last_agent_message.as_deref(), + Some("Completed first turn") + ); } other => panic!("unexpected initial messages after resume: {other:#?}"), } @@ -123,17 +130,24 @@ async fn resume_includes_initial_messages_from_reasoning_events() -> Result<()> .expect("expected initial messages to be present for resumed session"); match initial_messages.as_slice() { [ + EventMsg::TurnStarted(started), EventMsg::UserMessage(first_user), EventMsg::TokenCount(_), EventMsg::AgentReasoning(reasoning), EventMsg::AgentReasoningRawContent(raw), EventMsg::AgentMessage(assistant_message), EventMsg::TokenCount(_), + EventMsg::TurnComplete(completed), ] => { assert_eq!(first_user.message, "Record reasoning messages"); assert_eq!(reasoning.text, "Summarized step"); assert_eq!(raw.text, "raw detail"); assert_eq!(assistant_message.message, "Completed reasoning turn"); + assert_eq!(completed.turn_id, started.turn_id); + assert_eq!( + completed.last_agent_message.as_deref(), + Some("Completed reasoning turn") + ); } other => panic!("unexpected initial messages after resume: {other:#?}"), } diff --git a/codex-rs/core/tests/suite/resume_warning.rs b/codex-rs/core/tests/suite/resume_warning.rs index 5e5fc6d74..a91013bed 100644 --- a/codex-rs/core/tests/suite/resume_warning.rs +++ b/codex-rs/core/tests/suite/resume_warning.rs @@ -22,6 +22,7 @@ fn resume_history( rollout_path: &std::path::Path, ) -> InitialHistory { let turn_ctx = TurnContextItem { + turn_id: None, cwd: config.cwd.clone(), approval_policy: config.approval_policy.value(), sandbox_policy: config.sandbox_policy.get().clone(), diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 6b154b179..f9edc3662 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -264,7 +264,9 @@ impl EventProcessor for EventProcessorWithHumanOutput { "auto-cancelling (not supported in exec mode)".style(self.dimmed) ); } - EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message }) => { + EventMsg::TurnComplete(TurnCompleteEvent { + last_agent_message, .. + }) => { let last_message = last_agent_message .as_deref() .or(self.last_proposed_plan.as_deref()); diff --git a/codex-rs/exec/src/event_processor_with_jsonl_output.rs b/codex-rs/exec/src/event_processor_with_jsonl_output.rs index 9675651ef..6cefcbd94 100644 --- a/codex-rs/exec/src/event_processor_with_jsonl_output.rs +++ b/codex-rs/exec/src/event_processor_with_jsonl_output.rs @@ -862,6 +862,7 @@ impl EventProcessor for EventProcessorWithJsonOutput { match msg { protocol::EventMsg::TurnComplete(protocol::TurnCompleteEvent { last_agent_message, + .. }) => { if let Some(output_file) = self.last_message_path.as_deref() { let last_message = last_agent_message diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index f349e4835..4e669d232 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -117,6 +117,7 @@ fn task_started_produces_turn_started_event() { let out = ep.collect_thread_events(&event( "t1", EventMsg::TurnStarted(codex_core::protocol::TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: Some(32_000), collaboration_mode_kind: ModeKind::Default, }), @@ -310,6 +311,7 @@ fn plan_update_emits_todo_list_started_updated_and_completed() { let complete = event( "p3", EventMsg::TurnComplete(codex_core::protocol::TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), ); @@ -677,6 +679,7 @@ fn plan_update_after_complete_starts_new_todo_list_with_new_id() { let complete = event( "t2", EventMsg::TurnComplete(codex_core::protocol::TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), ); @@ -829,6 +832,7 @@ fn error_followed_by_task_complete_produces_turn_failed() { let complete_event = event( "e2", EventMsg::TurnComplete(codex_core::protocol::TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), ); @@ -1276,6 +1280,7 @@ fn task_complete_produces_turn_completed_with_usage() { let complete_event = event( "e2", EventMsg::TurnComplete(codex_core::protocol::TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: Some("done".to_string()), }), ); diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 6f6d2ec78..6c5b3c25a 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -279,7 +279,9 @@ async fn run_codex_tool_session_inner( .await; continue; } - EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message }) => { + EventMsg::TurnComplete(TurnCompleteEvent { + last_agent_message, .. + }) => { let text = match last_agent_message { Some(msg) => msg, None => "".to_string(), diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 2368a39ba..a56f55ba9 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1152,11 +1152,13 @@ pub struct ContextCompactedEvent; #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct TurnCompleteEvent { + pub turn_id: String, pub last_agent_message: Option, } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct TurnStartedEvent { + pub turn_id: String, // TODO(aibrahim): make this not optional pub model_context_window: Option, #[serde(default)] @@ -1740,6 +1742,8 @@ impl From for ResponseItem { #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)] pub struct TurnContextItem { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub turn_id: Option, pub cwd: PathBuf, pub approval_policy: AskForApproval, pub sandbox_policy: SandboxPolicy, @@ -2402,6 +2406,7 @@ pub struct Chunk { #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct TurnAbortedEvent { + pub turn_id: Option, pub reason: TurnAbortReason, } @@ -2690,6 +2695,24 @@ mod tests { Ok(()) } + #[test] + fn turn_aborted_event_deserializes_without_turn_id() -> Result<()> { + let event: EventMsg = serde_json::from_value(json!({ + "type": "turn_aborted", + "reason": "interrupted", + }))?; + + match event { + EventMsg::TurnAborted(TurnAbortedEvent { turn_id, reason }) => { + assert_eq!(turn_id, None); + assert_eq!(reason, TurnAbortReason::Interrupted); + } + _ => panic!("expected turn_aborted event"), + } + + Ok(()) + } + /// Serialize Event to verify that its JSON representation has the expected /// amount of nesting. #[test] diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 9920e8d1b..18c4b9b16 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -3929,9 +3929,9 @@ impl ChatWidget { } EventMsg::AgentReasoningSectionBreak(_) => self.on_reasoning_section_break(), EventMsg::TurnStarted(_) => self.on_task_started(), - EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message }) => { - self.on_task_complete(last_agent_message, from_replay) - } + EventMsg::TurnComplete(TurnCompleteEvent { + last_agent_message, .. + }) => self.on_task_complete(last_agent_message, from_replay), EventMsg::TokenCount(ev) => { self.set_token_info(ev.info); self.on_rate_limit_snapshot(ev.rate_limits); diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 280db452f..6fdefb36c 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -589,6 +589,7 @@ async fn interrupted_turn_restores_queued_messages_with_images_and_elements() { chat.handle_codex_event(Event { id: "interrupt".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }); @@ -652,6 +653,7 @@ async fn interrupted_turn_restore_keeps_active_mode_for_resubmission() { chat.handle_codex_event(Event { id: "interrupt".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }); @@ -1627,6 +1629,7 @@ async fn plan_implementation_popup_skips_replayed_turn_complete() { chat.set_collaboration_mask(plan_mask); chat.replay_initial_messages(vec![EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: Some("Plan details".to_string()), })]); @@ -1651,6 +1654,7 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com chat.on_plan_item_completed("- Step 1\n- Step 2\n".to_string()); chat.replay_initial_messages(vec![EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: Some("Plan details".to_string()), })]); let replay_popup = render_bottom_popup(&chat, 80); @@ -1662,6 +1666,7 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com chat.handle_codex_event(Event { id: "live-turn-complete-1".to_string(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: Some("Plan details".to_string()), }), }); @@ -1682,6 +1687,7 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com chat.handle_codex_event(Event { id: "live-turn-complete-2".to_string(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: Some("Plan details".to_string()), }), }); @@ -2646,6 +2652,7 @@ async fn unified_exec_wait_after_final_agent_message_snapshot() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -2663,6 +2670,7 @@ async fn unified_exec_wait_after_final_agent_message_snapshot() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: Some("Final response.".into()), }), }); @@ -2681,6 +2689,7 @@ async fn unified_exec_wait_before_streamed_agent_message_snapshot() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -2703,6 +2712,7 @@ async fn unified_exec_wait_before_streamed_agent_message_snapshot() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), }); @@ -2755,6 +2765,7 @@ async fn unified_exec_waiting_multiple_empty_snapshots() { chat.handle_codex_event(Event { id: "turn-wait-1".into(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), }); @@ -2806,6 +2817,7 @@ async fn unified_exec_non_empty_then_empty_snapshots() { chat.handle_codex_event(Event { id: "turn-wait-3".into(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), }); @@ -3543,6 +3555,7 @@ async fn interrupt_exec_marks_failed_snapshot() { chat.handle_codex_event(Event { id: "call-int".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }); @@ -3568,6 +3581,7 @@ async fn interrupted_turn_error_message_snapshot() { chat.handle_codex_event(Event { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -3577,6 +3591,7 @@ async fn interrupted_turn_error_message_snapshot() { chat.handle_codex_event(Event { id: "task-1".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }); @@ -4640,6 +4655,7 @@ async fn interrupt_restores_queued_messages_into_composer() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }); @@ -4678,6 +4694,7 @@ async fn interrupt_prepends_queued_messages_before_existing_composer_text() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }); @@ -4706,6 +4723,7 @@ async fn interrupt_clears_unified_exec_processes() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }); @@ -4726,6 +4744,7 @@ async fn review_ended_keeps_unified_exec_processes() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::ReviewEnded, }), }); @@ -4758,6 +4777,7 @@ async fn interrupt_clears_unified_exec_wait_streak_snapshot() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -4769,6 +4789,7 @@ async fn interrupt_clears_unified_exec_wait_streak_snapshot() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }); @@ -4795,6 +4816,7 @@ async fn turn_complete_keeps_unified_exec_processes() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), }); @@ -4848,6 +4870,7 @@ async fn ui_snapshots_small_heights_task_running() { chat.handle_codex_event(Event { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -4880,6 +4903,7 @@ async fn status_widget_and_approval_modal_snapshot() { chat.handle_codex_event(Event { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -4933,6 +4957,7 @@ async fn status_widget_active_snapshot() { chat.handle_codex_event(Event { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -4983,6 +5008,7 @@ async fn mcp_startup_complete_does_not_clear_running_task() { chat.handle_codex_event(Event { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -5615,6 +5641,7 @@ async fn status_line_branch_refreshes_after_turn_complete() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), }); @@ -5632,6 +5659,7 @@ async fn status_line_branch_refreshes_after_interrupt() { chat.handle_codex_event(Event { id: "turn-1".into(), msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, }), }); @@ -5645,6 +5673,7 @@ async fn stream_recovery_restores_previous_status_header() { chat.handle_codex_event(Event { id: "task".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -5727,6 +5756,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() { chat.handle_codex_event(Event { id: "s1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -5752,6 +5782,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() { chat.handle_codex_event(Event { id: "s1".into(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), }); @@ -5922,6 +5953,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() { chat.handle_codex_event(Event { id: "t1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -5970,6 +6002,7 @@ async fn chatwidget_markdown_code_blocks_vt100_snapshot() { chat.handle_codex_event(Event { id: "t1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), @@ -6042,6 +6075,7 @@ printf 'fenced within fenced\n' chat.handle_codex_event(Event { id: "t1".into(), msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), last_agent_message: None, }), }); @@ -6060,6 +6094,7 @@ async fn chatwidget_tall() { chat.handle_codex_event(Event { id: "t1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), model_context_window: None, collaboration_mode_kind: ModeKind::Default, }), diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index f457881ed..ac82c1d7e 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -1004,6 +1004,7 @@ mod tests { .clone() .unwrap_or_else(|| "gpt-5.1".to_string()); TurnContextItem { + turn_id: None, cwd, approval_policy: config.approval_policy.value(), sandbox_policy: config.sandbox_policy.get().clone(),