From a6b2bacb5b8bb485a52d979d6d53bf1e639811d7 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Sat, 21 Feb 2026 11:55:03 -0800 Subject: [PATCH] Prevent replayed runtime events from forcing active status (#12420) Fixes #11852 Resume replay was applying transient runtime events (`TurnStarted`, `StreamError`) as if they were live, which could leave the TUI stuck in a stale `Working` / `Reconnecting...` state after resuming an interrupted reconnect. This change makes replay transcript-oriented for these events by: - skipping retry-status restoration for replayed non-stream events - ignoring replayed `TurnStarted` for task-running state - ignoring replayed `StreamError` for reconnect/status UI Also adds TUI regression tests and snapshot coverage for the interrupted reconnect replay case. --- codex-rs/tui/src/chatwidget.rs | 36 +++- ...ayed_interrupted_reconnect_footer_row.snap | 6 + codex-rs/tui/src/chatwidget/tests.rs | 154 ++++++++++++++++++ 3 files changed, 189 insertions(+), 7 deletions(-) create mode 100644 codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__replayed_interrupted_reconnect_footer_row.snap diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 238a60730..9bfddd18e 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -832,6 +832,12 @@ fn remap_placeholders_for_message(message: UserMessage, next_label: &mut usize) } } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum ReplayKind { + ResumeInitialMessages, + ThreadSnapshot, +} + impl ChatWidget { /// Synchronize the bottom-pane "task running" indicator with the current lifecycles. /// @@ -4002,13 +4008,13 @@ impl ChatWidget { continue; } // `id: None` indicates a synthetic/fake id coming from replay. - self.dispatch_event_msg(None, msg, true); + self.dispatch_event_msg(None, msg, Some(ReplayKind::ResumeInitialMessages)); } } pub(crate) fn handle_codex_event(&mut self, event: Event) { let Event { id, msg } = event; - self.dispatch_event_msg(Some(id), msg, false); + self.dispatch_event_msg(Some(id), msg, None); } pub(crate) fn handle_codex_event_replay(&mut self, event: Event) { @@ -4016,7 +4022,7 @@ impl ChatWidget { if matches!(msg, EventMsg::ShutdownComplete) { return; } - self.dispatch_event_msg(None, msg, true); + self.dispatch_event_msg(None, msg, Some(ReplayKind::ThreadSnapshot)); } /// Dispatch a protocol `EventMsg` to the appropriate handler. @@ -4024,9 +4030,17 @@ impl ChatWidget { /// `id` is `Some` for live events and `None` for replayed events from /// `replay_initial_messages()`. Callers should treat `None` as a "fake" id /// that must not be used to correlate follow-up actions. - fn dispatch_event_msg(&mut self, id: Option, msg: EventMsg, from_replay: bool) { + fn dispatch_event_msg( + &mut self, + id: Option, + msg: EventMsg, + replay_kind: Option, + ) { + let from_replay = replay_kind.is_some(); + let is_resume_initial_replay = + matches!(replay_kind, Some(ReplayKind::ResumeInitialMessages)); let is_stream_error = matches!(&msg, EventMsg::StreamError(_)); - if !is_stream_error { + if !is_resume_initial_replay && !is_stream_error { self.restore_retry_status_header_if_present(); } @@ -4061,7 +4075,11 @@ impl ChatWidget { self.on_agent_reasoning_final(); } EventMsg::AgentReasoningSectionBreak(_) => self.on_reasoning_section_break(), - EventMsg::TurnStarted(_) => self.on_task_started(), + EventMsg::TurnStarted(_) => { + if !is_resume_initial_replay { + self.on_task_started(); + } + } EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message, .. }) => self.on_task_complete(last_agent_message, from_replay), @@ -4151,7 +4169,11 @@ impl ChatWidget { message, additional_details, .. - }) => self.on_stream_error(message, additional_details), + }) => { + if !is_resume_initial_replay { + self.on_stream_error(message, additional_details); + } + } EventMsg::UserMessage(ev) => { if from_replay { self.on_user_message_event(ev); diff --git a/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__replayed_interrupted_reconnect_footer_row.snap b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__replayed_interrupted_reconnect_footer_row.snap new file mode 100644 index 000000000..eb3183f57 --- /dev/null +++ b/codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__replayed_interrupted_reconnect_footer_row.snap @@ -0,0 +1,6 @@ +--- +source: tui/src/chatwidget/tests.rs +assertion_line: 7060 +expression: header +--- +› Ask Codex to do anything diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index b9f5ddf81..e634d3ba3 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -7228,6 +7228,160 @@ async fn stream_error_updates_status_indicator() { assert_eq!(status.details(), Some(details)); } +#[tokio::test] +async fn replayed_turn_started_does_not_mark_task_running() { + let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await; + + chat.replay_initial_messages(vec![EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + })]); + + assert!(!chat.bottom_pane.is_task_running()); + assert!(chat.bottom_pane.status_widget().is_none()); +} + +#[tokio::test] +async fn thread_snapshot_replayed_turn_started_marks_task_running() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await; + + chat.handle_codex_event_replay(Event { + id: "turn-1".into(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + }), + }); + + drain_insert_history(&mut rx); + assert!(chat.bottom_pane.is_task_running()); + let status = chat + .bottom_pane + .status_widget() + .expect("status indicator should be visible"); + assert_eq!(status.header(), "Working"); +} + +#[tokio::test] +async fn replayed_stream_error_does_not_set_retry_status_or_status_indicator() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await; + chat.set_status_header("Idle".to_string()); + + chat.replay_initial_messages(vec![EventMsg::StreamError(StreamErrorEvent { + message: "Reconnecting... 2/5".to_string(), + codex_error_info: Some(CodexErrorInfo::Other), + additional_details: Some("Idle timeout waiting for SSE".to_string()), + })]); + + let cells = drain_insert_history(&mut rx); + assert!( + cells.is_empty(), + "expected no history cell for replayed StreamError event" + ); + assert_eq!(chat.current_status_header, "Idle"); + assert!(chat.retry_status_header.is_none()); + assert!(chat.bottom_pane.status_widget().is_none()); +} + +#[tokio::test] +async fn thread_snapshot_replayed_stream_recovery_restores_previous_status_header() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await; + + chat.handle_codex_event_replay(Event { + id: "task".into(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + }), + }); + drain_insert_history(&mut rx); + + chat.handle_codex_event_replay(Event { + id: "retry".into(), + msg: EventMsg::StreamError(StreamErrorEvent { + message: "Reconnecting... 1/5".to_string(), + codex_error_info: Some(CodexErrorInfo::Other), + additional_details: None, + }), + }); + drain_insert_history(&mut rx); + + chat.handle_codex_event_replay(Event { + id: "delta".into(), + msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { + delta: "hello".to_string(), + }), + }); + + let status = chat + .bottom_pane + .status_widget() + .expect("status indicator should be visible"); + assert_eq!(status.header(), "Working"); + assert_eq!(status.details(), None); + assert!(chat.retry_status_header.is_none()); +} + +#[tokio::test] +async fn resume_replay_interrupted_reconnect_does_not_leave_stale_working_state() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await; + chat.set_status_header("Idle".to_string()); + + chat.replay_initial_messages(vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + }), + EventMsg::StreamError(StreamErrorEvent { + message: "Reconnecting... 1/5".to_string(), + codex_error_info: Some(CodexErrorInfo::Other), + additional_details: None, + }), + EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { + delta: "hello".to_string(), + }), + ]); + + let cells = drain_insert_history(&mut rx); + assert!( + cells.is_empty(), + "expected no history cells for replayed interrupted reconnect sequence" + ); + assert!(!chat.bottom_pane.is_task_running()); + assert!(chat.bottom_pane.status_widget().is_none()); + assert_eq!(chat.current_status_header, "Idle"); + assert!(chat.retry_status_header.is_none()); +} + +#[tokio::test] +async fn replayed_interrupted_reconnect_footer_row_snapshot() { + let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await; + + chat.replay_initial_messages(vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + }), + EventMsg::StreamError(StreamErrorEvent { + message: "Reconnecting... 2/5".to_string(), + codex_error_info: Some(CodexErrorInfo::Other), + additional_details: Some("Idle timeout waiting for SSE".to_string()), + }), + ]); + + let header = render_bottom_first_row(&chat, 80); + assert!( + !header.contains("Reconnecting") && !header.contains("Working"), + "expected replayed interrupted reconnect to avoid active status row, got {header:?}" + ); + assert_snapshot!("replayed_interrupted_reconnect_footer_row", header); +} + #[tokio::test] async fn stream_error_restores_hidden_status_indicator() { let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;