From 8b46c0ce00f0343670936545ea56609126ff5593 Mon Sep 17 00:00:00 2001 From: Charley Cunningham Date: Tue, 10 Feb 2026 18:53:43 -0800 Subject: [PATCH] tui: queue non-pending rollback trims in app-event order (#11373) ## Summary This PR fixes TUI transcript-sync behavior for `EventMsg::ThreadRolledBack` and makes rollback application order deterministic. Previously, rollback handling depended on `pending_rollback`: - if `pending_rollback` was set (local backtrack), TUI trimmed correctly - otherwise, replayed/external rollbacks were either ignored or could be applied at the wrong time relative to queued transcript inserts This change keeps the local backtrack path intact and routes non-pending rollbacks through the app event queue so rollback trims are applied in FIFO order with transcript cell inserts. ## What changed - Added/used `trim_transcript_cells_drop_last_n_user_turns(...)` for rollback-by-`num_turns` semantics. - Renamed rollback app event: - `AppEvent::ApplyReplayedThreadRollback` -> `AppEvent::ApplyThreadRollback` - Replay path (`ChatWidget`) now emits `ApplyThreadRollback`. - Live non-pending rollback path (`App::handle_backtrack_event`) now emits `ApplyThreadRollback` instead of trimming immediately. - App-level event handler applies `ApplyThreadRollback` after queued `InsertHistoryCell` events and schedules redraw only when a trim occurred. - When a trim occurs with an overlay open, TUI now syncs transcript overlay committed cells, clamps backtrack preview selection, and clears stale `deferred_history_lines` so closed overlays do not re-append rolled-back lines. - Clarified inline comments around the `pending_rollback` branch so future readers can reason about why there are two paths. ## Why queueing matters During resume/replay, transcript cells are populated via queued `InsertHistoryCell` app events. If a rollback is applied immediately outside that queue, it can run against an incomplete transcript and under-trim. Queueing non-pending rollbacks ensures consistent ordering and correct final transcript state. ## Behavior by rollback source - `pending_rollback = Some(...)` (local backtrack requested by this TUI): - use `finish_pending_backtrack()` and the stored selection boundary - `pending_rollback = None` (replay/external/non-local rollback): - enqueue `AppEvent::ApplyThreadRollback { num_turns }` and trim in app-event order ## Tests Added/updated tests covering ordering and semantics: - `app_backtrack::tests::trim_drop_last_n_user_turns_applies_rollback_semantics` - `app_backtrack::tests::trim_drop_last_n_user_turns_allows_overflow` - `app::tests::replayed_initial_messages_apply_rollback_in_queue_order` - `app::tests::live_rollback_during_replay_is_applied_in_app_event_order` - `app::tests::queued_rollback_syncs_overlay_and_clears_deferred_history` - `chatwidget::tests::replayed_thread_rollback_emits_ordered_app_event` Validation run: - `just fmt` - `cargo test -p codex-tui` --- codex-rs/tui/src/app.rs | 213 ++++++++++++++++++++++++++- codex-rs/tui/src/app_backtrack.rs | 170 +++++++++++++++++++-- codex-rs/tui/src/app_event.rs | 9 ++ codex-rs/tui/src/chatwidget.rs | 8 +- codex-rs/tui/src/chatwidget/tests.rs | 21 +++ codex-rs/tui/src/pager_overlay.rs | 25 ++++ 6 files changed, 434 insertions(+), 12 deletions(-) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 1ef3475e4..d30b747eb 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -1537,6 +1537,11 @@ impl App { } } } + AppEvent::ApplyThreadRollback { num_turns } => { + if self.apply_non_pending_thread_rollback(num_turns) { + tui.frame_requester().schedule_frame(); + } + } AppEvent::StartCommitAnimation => { if self .commit_anim_running @@ -2324,7 +2329,6 @@ impl App { } fn handle_codex_event_replay(&mut self, event: Event) { - self.handle_backtrack_event(&event.msg); self.chat_widget.handle_codex_event_replay(event); } @@ -2632,6 +2636,8 @@ mod tests { use codex_core::protocol::SandboxPolicy; use codex_core::protocol::SessionConfiguredEvent; use codex_core::protocol::SessionSource; + use codex_core::protocol::ThreadRolledBackEvent; + use codex_core::protocol::UserMessageEvent; use codex_otel::OtelManager; use codex_protocol::ThreadId; use codex_protocol::user_input::TextElement; @@ -3137,6 +3143,211 @@ mod tests { assert_eq!(rollback_turns, Some(1)); } + #[tokio::test] + async fn replayed_initial_messages_apply_rollback_in_queue_order() { + let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; + + let session_id = ThreadId::new(); + app.handle_codex_event_replay(Event { + id: String::new(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::ReadOnly, + cwd: PathBuf::from("/home/user/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: Some(vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "first prompt".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "second prompt".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }), + EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 1 }), + EventMsg::UserMessage(UserMessageEvent { + message: "third prompt".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }), + ]), + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }); + + let mut saw_rollback = false; + while let Ok(event) = app_event_rx.try_recv() { + match event { + AppEvent::InsertHistoryCell(cell) => { + let cell: Arc = cell.into(); + app.transcript_cells.push(cell); + } + AppEvent::ApplyThreadRollback { num_turns } => { + saw_rollback = true; + crate::app_backtrack::trim_transcript_cells_drop_last_n_user_turns( + &mut app.transcript_cells, + num_turns, + ); + } + _ => {} + } + } + + assert!(saw_rollback); + let user_messages: Vec = app + .transcript_cells + .iter() + .filter_map(|cell| { + cell.as_any() + .downcast_ref::() + .map(|cell| cell.message.clone()) + }) + .collect(); + assert_eq!( + user_messages, + vec!["first prompt".to_string(), "third prompt".to_string()] + ); + } + + #[tokio::test] + async fn live_rollback_during_replay_is_applied_in_app_event_order() { + let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; + + let session_id = ThreadId::new(); + app.handle_codex_event_replay(Event { + id: String::new(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::ReadOnly, + cwd: PathBuf::from("/home/user/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: Some(vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "first prompt".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "second prompt".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }), + ]), + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }); + + // Simulate a live rollback arriving before queued replay inserts are drained. + app.handle_codex_event_now(Event { + id: "live-rollback".to_string(), + msg: EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 1 }), + }); + + let mut saw_rollback = false; + while let Ok(event) = app_event_rx.try_recv() { + match event { + AppEvent::InsertHistoryCell(cell) => { + let cell: Arc = cell.into(); + app.transcript_cells.push(cell); + } + AppEvent::ApplyThreadRollback { num_turns } => { + saw_rollback = true; + crate::app_backtrack::trim_transcript_cells_drop_last_n_user_turns( + &mut app.transcript_cells, + num_turns, + ); + } + _ => {} + } + } + + assert!(saw_rollback); + let user_messages: Vec = app + .transcript_cells + .iter() + .filter_map(|cell| { + cell.as_any() + .downcast_ref::() + .map(|cell| cell.message.clone()) + }) + .collect(); + assert_eq!(user_messages, vec!["first prompt".to_string()]); + } + + #[tokio::test] + async fn queued_rollback_syncs_overlay_and_clears_deferred_history() { + let mut app = make_test_app().await; + app.transcript_cells = vec![ + Arc::new(UserHistoryCell { + message: "first".to_string(), + text_elements: Vec::new(), + local_image_paths: Vec::new(), + }) as Arc, + Arc::new(AgentMessageCell::new( + vec![Line::from("after first")], + false, + )) as Arc, + Arc::new(UserHistoryCell { + message: "second".to_string(), + text_elements: Vec::new(), + local_image_paths: Vec::new(), + }) as Arc, + Arc::new(AgentMessageCell::new( + vec![Line::from("after second")], + false, + )) as Arc, + ]; + app.overlay = Some(Overlay::new_transcript(app.transcript_cells.clone())); + app.deferred_history_lines = vec![Line::from("stale buffered line")]; + app.backtrack.overlay_preview_active = true; + app.backtrack.nth_user_message = 1; + + let changed = app.apply_non_pending_thread_rollback(1); + + assert!(changed); + assert!(app.backtrack_render_pending); + assert!(app.deferred_history_lines.is_empty()); + assert_eq!(app.backtrack.nth_user_message, 0); + let user_messages: Vec = app + .transcript_cells + .iter() + .filter_map(|cell| { + cell.as_any() + .downcast_ref::() + .map(|cell| cell.message.clone()) + }) + .collect(); + assert_eq!(user_messages, vec!["first".to_string()]); + let overlay_cell_count = match app.overlay.as_ref() { + Some(Overlay::Transcript(t)) => t.committed_cell_count(), + _ => panic!("expected transcript overlay"), + }; + assert_eq!(overlay_cell_count, app.transcript_cells.len()); + } + #[tokio::test] async fn new_session_requests_shutdown_for_previous_conversation() { let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await; diff --git a/codex-rs/tui/src/app_backtrack.rs b/codex-rs/tui/src/app_backtrack.rs index 256355dd0..22b437ec5 100644 --- a/codex-rs/tui/src/app_backtrack.rs +++ b/codex-rs/tui/src/app_backtrack.rs @@ -12,8 +12,8 @@ //! - The first `Esc` in the main view "primes" the feature and captures a base thread id. //! - A subsequent `Esc` opens the transcript overlay (`Ctrl+T`) and highlights a user message. //! - `Enter` requests a rollback from core and records a `pending_rollback` guard. -//! - Only after receiving `EventMsg::ThreadRolledBack` do we trim local transcript state and -//! schedule a one-time scrollback refresh. +//! - On `EventMsg::ThreadRolledBack`, we either finish an in-flight backtrack request or queue a +//! rollback trim so it runs in event order with transcript inserts. //! //! The transcript overlay (`Ctrl+T`) renders committed transcript cells plus a render-only live //! tail derived from the current in-flight `ChatWidget.active_cell`. @@ -28,6 +28,7 @@ use std::path::PathBuf; use std::sync::Arc; use crate::app::App; +use crate::app_event::AppEvent; use crate::history_cell::SessionInfoCell; use crate::history_cell::UserHistoryCell; use crate::pager_overlay::Overlay; @@ -453,7 +454,24 @@ impl App { pub(crate) fn handle_backtrack_event(&mut self, event: &EventMsg) { match event { - EventMsg::ThreadRolledBack(_) => self.finish_pending_backtrack(), + EventMsg::ThreadRolledBack(rollback) => { + // `pending_rollback` is set only after this UI sends `Op::ThreadRollback` + // from the backtrack flow. In that case, finish immediately using the + // stored selection (nth user message) so local trim matches the exact + // backtrack target. + // + // When it is `None`, rollback came from replay or another source. We + // queue an AppEvent so rollback trim runs in FIFO order with + // `InsertHistoryCell` events, avoiding races with in-flight transcript + // inserts. + if self.backtrack.pending_rollback.is_some() { + self.finish_pending_backtrack(); + } else { + self.app_event_tx.send(AppEvent::ApplyThreadRollback { + num_turns: rollback.num_turns, + }); + } + } EventMsg::Error(ErrorEvent { codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed), .. @@ -465,6 +483,19 @@ impl App { } } + /// Apply rollback semantics for `ThreadRolledBack` events where this TUI does not have an + /// in-flight backtrack request (`pending_rollback` is `None`). + /// + /// Returns `true` when local transcript state changed. + pub(crate) fn apply_non_pending_thread_rollback(&mut self, num_turns: u32) -> bool { + if !trim_transcript_cells_drop_last_n_user_turns(&mut self.transcript_cells, num_turns) { + return false; + } + self.sync_overlay_after_transcript_trim(); + self.backtrack_render_pending = true; + true + } + /// Finish a pending rollback by applying the local trim and scheduling a scrollback refresh. /// /// We ignore events that do not correspond to the currently active thread to avoid applying @@ -477,8 +508,13 @@ impl App { // Ignore rollbacks targeting a prior thread. return; } - self.trim_transcript_for_backtrack(pending.selection.nth_user_message); - self.backtrack_render_pending = true; + if trim_transcript_cells_to_nth_user( + &mut self.transcript_cells, + pending.selection.nth_user_message, + ) { + self.sync_overlay_after_transcript_trim(); + self.backtrack_render_pending = true; + } } fn backtrack_selection(&self, nth_user_message: usize) -> Option { @@ -508,23 +544,72 @@ impl App { }) } - /// Trim `transcript_cells` to preserve only content before the selected user message. - fn trim_transcript_for_backtrack(&mut self, nth_user_message: usize) { - trim_transcript_cells_to_nth_user(&mut self.transcript_cells, nth_user_message); + /// Keep transcript-related UI state aligned after `transcript_cells` was trimmed. + /// + /// This does three things: + /// 1. If transcript overlay is open, replace its committed cells so removed turns disappear. + /// 2. If backtrack preview is active, clamp/recompute the highlighted user selection. + /// 3. Drop deferred transcript lines buffered while overlay was open to avoid flushing lines + /// for cells that were just removed by the trim. + fn sync_overlay_after_transcript_trim(&mut self) { + if let Some(Overlay::Transcript(t)) = &mut self.overlay { + t.replace_cells(self.transcript_cells.clone()); + } + if self.backtrack.overlay_preview_active { + let total_users = user_count(&self.transcript_cells); + let next_selection = if total_users == 0 { + usize::MAX + } else { + self.backtrack + .nth_user_message + .min(total_users.saturating_sub(1)) + }; + self.apply_backtrack_selection_internal(next_selection); + } + // While overlay is open, we buffer rendered history lines and flush them on close. + // If rollback trimmed cells meanwhile, those buffered lines can reference removed turns. + self.deferred_history_lines.clear(); } } fn trim_transcript_cells_to_nth_user( transcript_cells: &mut Vec>, nth_user_message: usize, -) { +) -> bool { if nth_user_message == usize::MAX { - return; + return false; } if let Some(cut_idx) = nth_user_position(transcript_cells, nth_user_message) { + let original_len = transcript_cells.len(); transcript_cells.truncate(cut_idx); + return transcript_cells.len() != original_len; } + false +} + +pub(crate) fn trim_transcript_cells_drop_last_n_user_turns( + transcript_cells: &mut Vec>, + num_turns: u32, +) -> bool { + if num_turns == 0 { + return false; + } + + let user_positions: Vec = user_positions_iter(transcript_cells).collect(); + let Some(&first_user_idx) = user_positions.first() else { + return false; + }; + + let turns_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX); + let cut_idx = if turns_from_end >= user_positions.len() { + first_user_idx + } else { + user_positions[user_positions.len() - turns_from_end] + }; + let original_len = transcript_cells.len(); + transcript_cells.truncate(cut_idx); + transcript_cells.len() != original_len } pub(crate) fn user_count(cells: &[Arc]) -> usize { @@ -666,4 +751,69 @@ mod tests { .collect(); assert_eq!(between_text, " between"); } + + #[test] + fn trim_drop_last_n_user_turns_applies_rollback_semantics() { + let mut cells: Vec> = vec![ + Arc::new(UserHistoryCell { + message: "first".to_string(), + text_elements: Vec::new(), + local_image_paths: Vec::new(), + }) as Arc, + Arc::new(AgentMessageCell::new( + vec![Line::from("after first")], + false, + )) as Arc, + Arc::new(UserHistoryCell { + message: "second".to_string(), + text_elements: Vec::new(), + local_image_paths: Vec::new(), + }) as Arc, + Arc::new(AgentMessageCell::new( + vec![Line::from("after second")], + false, + )) as Arc, + ]; + + let changed = trim_transcript_cells_drop_last_n_user_turns(&mut cells, 1); + + assert!(changed); + assert_eq!(cells.len(), 2); + let first_user = cells[0] + .as_any() + .downcast_ref::() + .expect("first user"); + assert_eq!(first_user.message, "first"); + } + + #[test] + fn trim_drop_last_n_user_turns_allows_overflow() { + let mut cells: Vec> = vec![ + Arc::new(AgentMessageCell::new(vec![Line::from("intro")], true)) + as Arc, + Arc::new(UserHistoryCell { + message: "first".to_string(), + text_elements: Vec::new(), + local_image_paths: Vec::new(), + }) as Arc, + Arc::new(AgentMessageCell::new(vec![Line::from("after")], false)) + as Arc, + ]; + + let changed = trim_transcript_cells_drop_last_n_user_turns(&mut cells, u32::MAX); + + assert!(changed); + assert_eq!(cells.len(), 1); + let intro = cells[0] + .as_any() + .downcast_ref::() + .expect("intro agent"); + let intro_lines = intro.display_lines(u16::MAX); + let intro_text: String = intro_lines[0] + .spans + .iter() + .map(|span| span.content.as_ref()) + .collect(); + assert_eq!(intro_text, "• intro"); + } } diff --git a/codex-rs/tui/src/app_event.rs b/codex-rs/tui/src/app_event.rs index 0f456f457..0b85fafbb 100644 --- a/codex-rs/tui/src/app_event.rs +++ b/codex-rs/tui/src/app_event.rs @@ -126,6 +126,15 @@ pub(crate) enum AppEvent { InsertHistoryCell(Box), + /// Apply rollback semantics to local transcript cells. + /// + /// This is emitted when rollback was not initiated by the current + /// backtrack flow so trimming occurs in AppEvent queue order relative to + /// inserted history cells. + ApplyThreadRollback { + num_turns: u32, + }, + StartCommitAnimation, StopCommitAnimation, CommitTick, diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index e507a39a4..9920e8d1b 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -4041,7 +4041,13 @@ impl ChatWidget { EventMsg::CollabCloseEnd(ev) => self.on_collab_event(collab::close_end(ev)), EventMsg::CollabResumeBegin(ev) => self.on_collab_event(collab::resume_begin(ev)), EventMsg::CollabResumeEnd(ev) => self.on_collab_event(collab::resume_end(ev)), - EventMsg::ThreadRolledBack(_) => {} + EventMsg::ThreadRolledBack(rollback) => { + if from_replay { + self.app_event_tx.send(AppEvent::ApplyThreadRollback { + num_turns: rollback.num_turns, + }); + } + } EventMsg::RawResponseItem(_) | EventMsg::ItemStarted(_) | EventMsg::AgentMessageContentDelta(_) diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 1f1af74a1..280db452f 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -54,6 +54,7 @@ use codex_core::protocol::ReviewTarget; use codex_core::protocol::SessionSource; use codex_core::protocol::StreamErrorEvent; use codex_core::protocol::TerminalInteractionEvent; +use codex_core::protocol::ThreadRolledBackEvent; use codex_core::protocol::TokenCountEvent; use codex_core::protocol::TokenUsage; use codex_core::protocol::TokenUsageInfo; @@ -1691,6 +1692,26 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com ); } +#[tokio::test] +async fn replayed_thread_rollback_emits_ordered_app_event() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(Some("gpt-5")).await; + + chat.replay_initial_messages(vec![EventMsg::ThreadRolledBack(ThreadRolledBackEvent { + num_turns: 2, + })]); + + let mut saw = false; + while let Ok(event) = rx.try_recv() { + if let AppEvent::ApplyThreadRollback { num_turns } = event { + saw = true; + assert_eq!(num_turns, 2); + break; + } + } + + assert!(saw, "expected replay rollback app event"); +} + #[tokio::test] async fn plan_implementation_popup_skips_when_messages_queued() { let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5")).await; diff --git a/codex-rs/tui/src/pager_overlay.rs b/codex-rs/tui/src/pager_overlay.rs index 58f15fa7f..f04310cd8 100644 --- a/codex-rs/tui/src/pager_overlay.rs +++ b/codex-rs/tui/src/pager_overlay.rs @@ -539,6 +539,26 @@ impl TranscriptOverlay { } } + /// Replace committed transcript cells while keeping any cached in-progress output that is + /// currently shown at the end of the overlay. + /// + /// This is used when existing history is trimmed (for example after rollback) so the + /// transcript overlay immediately reflects the same committed cells as the main transcript. + pub(crate) fn replace_cells(&mut self, cells: Vec>) { + let follow_bottom = self.view.is_scrolled_to_bottom(); + self.cells = cells; + if self + .highlight_cell + .is_some_and(|idx| idx >= self.cells.len()) + { + self.highlight_cell = None; + } + self.rebuild_renderables(); + if follow_bottom { + self.view.scroll_offset = usize::MAX; + } + } + /// Sync the active-cell live tail with the current width and cell state. /// /// Recomputes the tail only when the cache key changes, preserving scroll @@ -680,6 +700,11 @@ impl TranscriptOverlay { pub(crate) fn is_done(&self) -> bool { self.is_done } + + #[cfg(test)] + pub(crate) fn committed_cell_count(&self) -> usize { + self.cells.len() + } } pub(crate) struct StaticOverlay {