Prevent replayed runtime events from forcing active status (#12420)

Fixes #11852

Resume replay was applying transient runtime events (`TurnStarted`,
`StreamError`) as if they were live, which could leave the TUI stuck in
a stale `Working` / `Reconnecting...` state after resuming an
interrupted reconnect.

This change makes replay transcript-oriented for these events by:
- skipping retry-status restoration for replayed non-stream events
- ignoring replayed `TurnStarted` for task-running state
- ignoring replayed `StreamError` for reconnect/status UI

Also adds TUI regression tests and snapshot coverage for the interrupted
reconnect replay case.
This commit is contained in:
Eric Traut 2026-02-21 11:55:03 -08:00 committed by GitHub
parent 5a635f3427
commit a6b2bacb5b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 189 additions and 7 deletions

View file

@ -832,6 +832,12 @@ fn remap_placeholders_for_message(message: UserMessage, next_label: &mut usize)
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ReplayKind {
ResumeInitialMessages,
ThreadSnapshot,
}
impl ChatWidget {
/// Synchronize the bottom-pane "task running" indicator with the current lifecycles.
///
@ -4002,13 +4008,13 @@ impl ChatWidget {
continue;
}
// `id: None` indicates a synthetic/fake id coming from replay.
self.dispatch_event_msg(None, msg, true);
self.dispatch_event_msg(None, msg, Some(ReplayKind::ResumeInitialMessages));
}
}
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
self.dispatch_event_msg(Some(id), msg, false);
self.dispatch_event_msg(Some(id), msg, None);
}
pub(crate) fn handle_codex_event_replay(&mut self, event: Event) {
@ -4016,7 +4022,7 @@ impl ChatWidget {
if matches!(msg, EventMsg::ShutdownComplete) {
return;
}
self.dispatch_event_msg(None, msg, true);
self.dispatch_event_msg(None, msg, Some(ReplayKind::ThreadSnapshot));
}
/// Dispatch a protocol `EventMsg` to the appropriate handler.
@ -4024,9 +4030,17 @@ impl ChatWidget {
/// `id` is `Some` for live events and `None` for replayed events from
/// `replay_initial_messages()`. Callers should treat `None` as a "fake" id
/// that must not be used to correlate follow-up actions.
fn dispatch_event_msg(&mut self, id: Option<String>, msg: EventMsg, from_replay: bool) {
fn dispatch_event_msg(
&mut self,
id: Option<String>,
msg: EventMsg,
replay_kind: Option<ReplayKind>,
) {
let from_replay = replay_kind.is_some();
let is_resume_initial_replay =
matches!(replay_kind, Some(ReplayKind::ResumeInitialMessages));
let is_stream_error = matches!(&msg, EventMsg::StreamError(_));
if !is_stream_error {
if !is_resume_initial_replay && !is_stream_error {
self.restore_retry_status_header_if_present();
}
@ -4061,7 +4075,11 @@ impl ChatWidget {
self.on_agent_reasoning_final();
}
EventMsg::AgentReasoningSectionBreak(_) => self.on_reasoning_section_break(),
EventMsg::TurnStarted(_) => self.on_task_started(),
EventMsg::TurnStarted(_) => {
if !is_resume_initial_replay {
self.on_task_started();
}
}
EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message, ..
}) => self.on_task_complete(last_agent_message, from_replay),
@ -4151,7 +4169,11 @@ impl ChatWidget {
message,
additional_details,
..
}) => self.on_stream_error(message, additional_details),
}) => {
if !is_resume_initial_replay {
self.on_stream_error(message, additional_details);
}
}
EventMsg::UserMessage(ev) => {
if from_replay {
self.on_user_message_event(ev);

View file

@ -0,0 +1,6 @@
---
source: tui/src/chatwidget/tests.rs
assertion_line: 7060
expression: header
---
Ask Codex to do anything

View file

@ -7228,6 +7228,160 @@ async fn stream_error_updates_status_indicator() {
assert_eq!(status.details(), Some(details));
}
#[tokio::test]
async fn replayed_turn_started_does_not_mark_task_running() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.replay_initial_messages(vec![EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
})]);
assert!(!chat.bottom_pane.is_task_running());
assert!(chat.bottom_pane.status_widget().is_none());
}
#[tokio::test]
async fn thread_snapshot_replayed_turn_started_marks_task_running() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event_replay(Event {
id: "turn-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
}),
});
drain_insert_history(&mut rx);
assert!(chat.bottom_pane.is_task_running());
let status = chat
.bottom_pane
.status_widget()
.expect("status indicator should be visible");
assert_eq!(status.header(), "Working");
}
#[tokio::test]
async fn replayed_stream_error_does_not_set_retry_status_or_status_indicator() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.set_status_header("Idle".to_string());
chat.replay_initial_messages(vec![EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 2/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
additional_details: Some("Idle timeout waiting for SSE".to_string()),
})]);
let cells = drain_insert_history(&mut rx);
assert!(
cells.is_empty(),
"expected no history cell for replayed StreamError event"
);
assert_eq!(chat.current_status_header, "Idle");
assert!(chat.retry_status_header.is_none());
assert!(chat.bottom_pane.status_widget().is_none());
}
#[tokio::test]
async fn thread_snapshot_replayed_stream_recovery_restores_previous_status_header() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event_replay(Event {
id: "task".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
}),
});
drain_insert_history(&mut rx);
chat.handle_codex_event_replay(Event {
id: "retry".into(),
msg: EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 1/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
additional_details: None,
}),
});
drain_insert_history(&mut rx);
chat.handle_codex_event_replay(Event {
id: "delta".into(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "hello".to_string(),
}),
});
let status = chat
.bottom_pane
.status_widget()
.expect("status indicator should be visible");
assert_eq!(status.header(), "Working");
assert_eq!(status.details(), None);
assert!(chat.retry_status_header.is_none());
}
#[tokio::test]
async fn resume_replay_interrupted_reconnect_does_not_leave_stale_working_state() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.set_status_header("Idle".to_string());
chat.replay_initial_messages(vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
}),
EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 1/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
additional_details: None,
}),
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "hello".to_string(),
}),
]);
let cells = drain_insert_history(&mut rx);
assert!(
cells.is_empty(),
"expected no history cells for replayed interrupted reconnect sequence"
);
assert!(!chat.bottom_pane.is_task_running());
assert!(chat.bottom_pane.status_widget().is_none());
assert_eq!(chat.current_status_header, "Idle");
assert!(chat.retry_status_header.is_none());
}
#[tokio::test]
async fn replayed_interrupted_reconnect_footer_row_snapshot() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.replay_initial_messages(vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
}),
EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 2/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
additional_details: Some("Idle timeout waiting for SSE".to_string()),
}),
]);
let header = render_bottom_first_row(&chat, 80);
assert!(
!header.contains("Reconnecting") && !header.contains("Working"),
"expected replayed interrupted reconnect to avoid active status row, got {header:?}"
);
assert_snapshot!("replayed_interrupted_reconnect_footer_row", header);
}
#[tokio::test]
async fn stream_error_restores_hidden_status_indicator() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;