tui: queue non-pending rollback trims in app-event order (#11373)

## Summary

This PR fixes TUI transcript-sync behavior for
`EventMsg::ThreadRolledBack` and makes rollback application order
deterministic.

Previously, rollback handling depended on `pending_rollback`:

- if `pending_rollback` was set (local backtrack), TUI trimmed correctly
- otherwise, replayed/external rollbacks were either ignored or could be
applied at the wrong time relative to queued transcript inserts

This change keeps the local backtrack path intact and routes non-pending
rollbacks through the app event queue so rollback trims are applied in
FIFO order with transcript cell inserts.

## What changed

- Added/used `trim_transcript_cells_drop_last_n_user_turns(...)` for
rollback-by-`num_turns` semantics.
- Renamed rollback app event:
- `AppEvent::ApplyReplayedThreadRollback` ->
`AppEvent::ApplyThreadRollback`
- Replay path (`ChatWidget`) now emits `ApplyThreadRollback`.
- Live non-pending rollback path (`App::handle_backtrack_event`) now
emits `ApplyThreadRollback` instead of trimming immediately.
- App-level event handler applies `ApplyThreadRollback` after queued
`InsertHistoryCell` events and schedules redraw only when a trim
occurred.
- When a trim occurs with an overlay open, TUI now syncs transcript
overlay committed cells, clamps backtrack preview selection, and clears
stale `deferred_history_lines` so closed overlays do not re-append
rolled-back lines.
- Clarified inline comments around the `pending_rollback` branch so
future readers can reason about why there are two paths.

## Why queueing matters

During resume/replay, transcript cells are populated via queued
`InsertHistoryCell` app events. If a rollback is applied immediately
outside that queue, it can run against an incomplete transcript and
under-trim. Queueing non-pending rollbacks ensures consistent ordering
and correct final transcript state.

## Behavior by rollback source

- `pending_rollback = Some(...)` (local backtrack requested by this
TUI):
  - use `finish_pending_backtrack()` and the stored selection boundary
- `pending_rollback = None` (replay/external/non-local rollback):
- enqueue `AppEvent::ApplyThreadRollback { num_turns }` and trim in
app-event order

## Tests

Added/updated tests covering ordering and semantics:

-
`app_backtrack::tests::trim_drop_last_n_user_turns_applies_rollback_semantics`
- `app_backtrack::tests::trim_drop_last_n_user_turns_allows_overflow`
- `app::tests::replayed_initial_messages_apply_rollback_in_queue_order`
-
`app::tests::live_rollback_during_replay_is_applied_in_app_event_order`
-
`app::tests::queued_rollback_syncs_overlay_and_clears_deferred_history`
- `chatwidget::tests::replayed_thread_rollback_emits_ordered_app_event`

Validation run:

- `just fmt`
- `cargo test -p codex-tui`
This commit is contained in:
Charley Cunningham 2026-02-10 18:53:43 -08:00 committed by GitHub
parent c68999ee6d
commit 8b46c0ce00
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 434 additions and 12 deletions

View file

@ -1537,6 +1537,11 @@ impl App {
}
}
}
AppEvent::ApplyThreadRollback { num_turns } => {
if self.apply_non_pending_thread_rollback(num_turns) {
tui.frame_requester().schedule_frame();
}
}
AppEvent::StartCommitAnimation => {
if self
.commit_anim_running
@ -2324,7 +2329,6 @@ impl App {
}
fn handle_codex_event_replay(&mut self, event: Event) {
self.handle_backtrack_event(&event.msg);
self.chat_widget.handle_codex_event_replay(event);
}
@ -2632,6 +2636,8 @@ mod tests {
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::SessionSource;
use codex_core::protocol::ThreadRolledBackEvent;
use codex_core::protocol::UserMessageEvent;
use codex_otel::OtelManager;
use codex_protocol::ThreadId;
use codex_protocol::user_input::TextElement;
@ -3137,6 +3143,211 @@ mod tests {
assert_eq!(rollback_turns, Some(1));
}
#[tokio::test]
async fn replayed_initial_messages_apply_rollback_in_queue_order() {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
let session_id = ThreadId::new();
app.handle_codex_event_replay(Event {
id: String::new(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
cwd: PathBuf::from("/home/user/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: Some(vec![
EventMsg::UserMessage(UserMessageEvent {
message: "first prompt".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
}),
EventMsg::UserMessage(UserMessageEvent {
message: "second prompt".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
}),
EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 1 }),
EventMsg::UserMessage(UserMessageEvent {
message: "third prompt".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
}),
]),
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
});
let mut saw_rollback = false;
while let Ok(event) = app_event_rx.try_recv() {
match event {
AppEvent::InsertHistoryCell(cell) => {
let cell: Arc<dyn HistoryCell> = cell.into();
app.transcript_cells.push(cell);
}
AppEvent::ApplyThreadRollback { num_turns } => {
saw_rollback = true;
crate::app_backtrack::trim_transcript_cells_drop_last_n_user_turns(
&mut app.transcript_cells,
num_turns,
);
}
_ => {}
}
}
assert!(saw_rollback);
let user_messages: Vec<String> = app
.transcript_cells
.iter()
.filter_map(|cell| {
cell.as_any()
.downcast_ref::<UserHistoryCell>()
.map(|cell| cell.message.clone())
})
.collect();
assert_eq!(
user_messages,
vec!["first prompt".to_string(), "third prompt".to_string()]
);
}
#[tokio::test]
async fn live_rollback_during_replay_is_applied_in_app_event_order() {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
let session_id = ThreadId::new();
app.handle_codex_event_replay(Event {
id: String::new(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
cwd: PathBuf::from("/home/user/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: Some(vec![
EventMsg::UserMessage(UserMessageEvent {
message: "first prompt".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
}),
EventMsg::UserMessage(UserMessageEvent {
message: "second prompt".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
}),
]),
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
});
// Simulate a live rollback arriving before queued replay inserts are drained.
app.handle_codex_event_now(Event {
id: "live-rollback".to_string(),
msg: EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 1 }),
});
let mut saw_rollback = false;
while let Ok(event) = app_event_rx.try_recv() {
match event {
AppEvent::InsertHistoryCell(cell) => {
let cell: Arc<dyn HistoryCell> = cell.into();
app.transcript_cells.push(cell);
}
AppEvent::ApplyThreadRollback { num_turns } => {
saw_rollback = true;
crate::app_backtrack::trim_transcript_cells_drop_last_n_user_turns(
&mut app.transcript_cells,
num_turns,
);
}
_ => {}
}
}
assert!(saw_rollback);
let user_messages: Vec<String> = app
.transcript_cells
.iter()
.filter_map(|cell| {
cell.as_any()
.downcast_ref::<UserHistoryCell>()
.map(|cell| cell.message.clone())
})
.collect();
assert_eq!(user_messages, vec!["first prompt".to_string()]);
}
#[tokio::test]
async fn queued_rollback_syncs_overlay_and_clears_deferred_history() {
let mut app = make_test_app().await;
app.transcript_cells = vec![
Arc::new(UserHistoryCell {
message: "first".to_string(),
text_elements: Vec::new(),
local_image_paths: Vec::new(),
}) as Arc<dyn HistoryCell>,
Arc::new(AgentMessageCell::new(
vec![Line::from("after first")],
false,
)) as Arc<dyn HistoryCell>,
Arc::new(UserHistoryCell {
message: "second".to_string(),
text_elements: Vec::new(),
local_image_paths: Vec::new(),
}) as Arc<dyn HistoryCell>,
Arc::new(AgentMessageCell::new(
vec![Line::from("after second")],
false,
)) as Arc<dyn HistoryCell>,
];
app.overlay = Some(Overlay::new_transcript(app.transcript_cells.clone()));
app.deferred_history_lines = vec![Line::from("stale buffered line")];
app.backtrack.overlay_preview_active = true;
app.backtrack.nth_user_message = 1;
let changed = app.apply_non_pending_thread_rollback(1);
assert!(changed);
assert!(app.backtrack_render_pending);
assert!(app.deferred_history_lines.is_empty());
assert_eq!(app.backtrack.nth_user_message, 0);
let user_messages: Vec<String> = app
.transcript_cells
.iter()
.filter_map(|cell| {
cell.as_any()
.downcast_ref::<UserHistoryCell>()
.map(|cell| cell.message.clone())
})
.collect();
assert_eq!(user_messages, vec!["first".to_string()]);
let overlay_cell_count = match app.overlay.as_ref() {
Some(Overlay::Transcript(t)) => t.committed_cell_count(),
_ => panic!("expected transcript overlay"),
};
assert_eq!(overlay_cell_count, app.transcript_cells.len());
}
#[tokio::test]
async fn new_session_requests_shutdown_for_previous_conversation() {
let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await;

View file

@ -12,8 +12,8 @@
//! - The first `Esc` in the main view "primes" the feature and captures a base thread id.
//! - A subsequent `Esc` opens the transcript overlay (`Ctrl+T`) and highlights a user message.
//! - `Enter` requests a rollback from core and records a `pending_rollback` guard.
//! - Only after receiving `EventMsg::ThreadRolledBack` do we trim local transcript state and
//! schedule a one-time scrollback refresh.
//! - On `EventMsg::ThreadRolledBack`, we either finish an in-flight backtrack request or queue a
//! rollback trim so it runs in event order with transcript inserts.
//!
//! The transcript overlay (`Ctrl+T`) renders committed transcript cells plus a render-only live
//! tail derived from the current in-flight `ChatWidget.active_cell`.
@ -28,6 +28,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use crate::app::App;
use crate::app_event::AppEvent;
use crate::history_cell::SessionInfoCell;
use crate::history_cell::UserHistoryCell;
use crate::pager_overlay::Overlay;
@ -453,7 +454,24 @@ impl App {
pub(crate) fn handle_backtrack_event(&mut self, event: &EventMsg) {
match event {
EventMsg::ThreadRolledBack(_) => self.finish_pending_backtrack(),
EventMsg::ThreadRolledBack(rollback) => {
// `pending_rollback` is set only after this UI sends `Op::ThreadRollback`
// from the backtrack flow. In that case, finish immediately using the
// stored selection (nth user message) so local trim matches the exact
// backtrack target.
//
// When it is `None`, rollback came from replay or another source. We
// queue an AppEvent so rollback trim runs in FIFO order with
// `InsertHistoryCell` events, avoiding races with in-flight transcript
// inserts.
if self.backtrack.pending_rollback.is_some() {
self.finish_pending_backtrack();
} else {
self.app_event_tx.send(AppEvent::ApplyThreadRollback {
num_turns: rollback.num_turns,
});
}
}
EventMsg::Error(ErrorEvent {
codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed),
..
@ -465,6 +483,19 @@ impl App {
}
}
/// Apply rollback semantics for `ThreadRolledBack` events where this TUI does not have an
/// in-flight backtrack request (`pending_rollback` is `None`).
///
/// Returns `true` when local transcript state changed.
pub(crate) fn apply_non_pending_thread_rollback(&mut self, num_turns: u32) -> bool {
if !trim_transcript_cells_drop_last_n_user_turns(&mut self.transcript_cells, num_turns) {
return false;
}
self.sync_overlay_after_transcript_trim();
self.backtrack_render_pending = true;
true
}
/// Finish a pending rollback by applying the local trim and scheduling a scrollback refresh.
///
/// We ignore events that do not correspond to the currently active thread to avoid applying
@ -477,8 +508,13 @@ impl App {
// Ignore rollbacks targeting a prior thread.
return;
}
self.trim_transcript_for_backtrack(pending.selection.nth_user_message);
self.backtrack_render_pending = true;
if trim_transcript_cells_to_nth_user(
&mut self.transcript_cells,
pending.selection.nth_user_message,
) {
self.sync_overlay_after_transcript_trim();
self.backtrack_render_pending = true;
}
}
fn backtrack_selection(&self, nth_user_message: usize) -> Option<BacktrackSelection> {
@ -508,23 +544,72 @@ impl App {
})
}
/// Trim `transcript_cells` to preserve only content before the selected user message.
fn trim_transcript_for_backtrack(&mut self, nth_user_message: usize) {
trim_transcript_cells_to_nth_user(&mut self.transcript_cells, nth_user_message);
/// Keep transcript-related UI state aligned after `transcript_cells` was trimmed.
///
/// This does three things:
/// 1. If transcript overlay is open, replace its committed cells so removed turns disappear.
/// 2. If backtrack preview is active, clamp/recompute the highlighted user selection.
/// 3. Drop deferred transcript lines buffered while overlay was open to avoid flushing lines
/// for cells that were just removed by the trim.
fn sync_overlay_after_transcript_trim(&mut self) {
if let Some(Overlay::Transcript(t)) = &mut self.overlay {
t.replace_cells(self.transcript_cells.clone());
}
if self.backtrack.overlay_preview_active {
let total_users = user_count(&self.transcript_cells);
let next_selection = if total_users == 0 {
usize::MAX
} else {
self.backtrack
.nth_user_message
.min(total_users.saturating_sub(1))
};
self.apply_backtrack_selection_internal(next_selection);
}
// While overlay is open, we buffer rendered history lines and flush them on close.
// If rollback trimmed cells meanwhile, those buffered lines can reference removed turns.
self.deferred_history_lines.clear();
}
}
fn trim_transcript_cells_to_nth_user(
transcript_cells: &mut Vec<Arc<dyn crate::history_cell::HistoryCell>>,
nth_user_message: usize,
) {
) -> bool {
if nth_user_message == usize::MAX {
return;
return false;
}
if let Some(cut_idx) = nth_user_position(transcript_cells, nth_user_message) {
let original_len = transcript_cells.len();
transcript_cells.truncate(cut_idx);
return transcript_cells.len() != original_len;
}
false
}
pub(crate) fn trim_transcript_cells_drop_last_n_user_turns(
transcript_cells: &mut Vec<Arc<dyn crate::history_cell::HistoryCell>>,
num_turns: u32,
) -> bool {
if num_turns == 0 {
return false;
}
let user_positions: Vec<usize> = user_positions_iter(transcript_cells).collect();
let Some(&first_user_idx) = user_positions.first() else {
return false;
};
let turns_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
let cut_idx = if turns_from_end >= user_positions.len() {
first_user_idx
} else {
user_positions[user_positions.len() - turns_from_end]
};
let original_len = transcript_cells.len();
transcript_cells.truncate(cut_idx);
transcript_cells.len() != original_len
}
pub(crate) fn user_count(cells: &[Arc<dyn crate::history_cell::HistoryCell>]) -> usize {
@ -666,4 +751,69 @@ mod tests {
.collect();
assert_eq!(between_text, " between");
}
#[test]
fn trim_drop_last_n_user_turns_applies_rollback_semantics() {
let mut cells: Vec<Arc<dyn HistoryCell>> = vec![
Arc::new(UserHistoryCell {
message: "first".to_string(),
text_elements: Vec::new(),
local_image_paths: Vec::new(),
}) as Arc<dyn HistoryCell>,
Arc::new(AgentMessageCell::new(
vec![Line::from("after first")],
false,
)) as Arc<dyn HistoryCell>,
Arc::new(UserHistoryCell {
message: "second".to_string(),
text_elements: Vec::new(),
local_image_paths: Vec::new(),
}) as Arc<dyn HistoryCell>,
Arc::new(AgentMessageCell::new(
vec![Line::from("after second")],
false,
)) as Arc<dyn HistoryCell>,
];
let changed = trim_transcript_cells_drop_last_n_user_turns(&mut cells, 1);
assert!(changed);
assert_eq!(cells.len(), 2);
let first_user = cells[0]
.as_any()
.downcast_ref::<UserHistoryCell>()
.expect("first user");
assert_eq!(first_user.message, "first");
}
#[test]
fn trim_drop_last_n_user_turns_allows_overflow() {
let mut cells: Vec<Arc<dyn HistoryCell>> = vec![
Arc::new(AgentMessageCell::new(vec![Line::from("intro")], true))
as Arc<dyn HistoryCell>,
Arc::new(UserHistoryCell {
message: "first".to_string(),
text_elements: Vec::new(),
local_image_paths: Vec::new(),
}) as Arc<dyn HistoryCell>,
Arc::new(AgentMessageCell::new(vec![Line::from("after")], false))
as Arc<dyn HistoryCell>,
];
let changed = trim_transcript_cells_drop_last_n_user_turns(&mut cells, u32::MAX);
assert!(changed);
assert_eq!(cells.len(), 1);
let intro = cells[0]
.as_any()
.downcast_ref::<AgentMessageCell>()
.expect("intro agent");
let intro_lines = intro.display_lines(u16::MAX);
let intro_text: String = intro_lines[0]
.spans
.iter()
.map(|span| span.content.as_ref())
.collect();
assert_eq!(intro_text, "• intro");
}
}

View file

@ -126,6 +126,15 @@ pub(crate) enum AppEvent {
InsertHistoryCell(Box<dyn HistoryCell>),
/// Apply rollback semantics to local transcript cells.
///
/// This is emitted when rollback was not initiated by the current
/// backtrack flow so trimming occurs in AppEvent queue order relative to
/// inserted history cells.
ApplyThreadRollback {
num_turns: u32,
},
StartCommitAnimation,
StopCommitAnimation,
CommitTick,

View file

@ -4041,7 +4041,13 @@ impl ChatWidget {
EventMsg::CollabCloseEnd(ev) => self.on_collab_event(collab::close_end(ev)),
EventMsg::CollabResumeBegin(ev) => self.on_collab_event(collab::resume_begin(ev)),
EventMsg::CollabResumeEnd(ev) => self.on_collab_event(collab::resume_end(ev)),
EventMsg::ThreadRolledBack(_) => {}
EventMsg::ThreadRolledBack(rollback) => {
if from_replay {
self.app_event_tx.send(AppEvent::ApplyThreadRollback {
num_turns: rollback.num_turns,
});
}
}
EventMsg::RawResponseItem(_)
| EventMsg::ItemStarted(_)
| EventMsg::AgentMessageContentDelta(_)

View file

@ -54,6 +54,7 @@ use codex_core::protocol::ReviewTarget;
use codex_core::protocol::SessionSource;
use codex_core::protocol::StreamErrorEvent;
use codex_core::protocol::TerminalInteractionEvent;
use codex_core::protocol::ThreadRolledBackEvent;
use codex_core::protocol::TokenCountEvent;
use codex_core::protocol::TokenUsage;
use codex_core::protocol::TokenUsageInfo;
@ -1691,6 +1692,26 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com
);
}
#[tokio::test]
async fn replayed_thread_rollback_emits_ordered_app_event() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(Some("gpt-5")).await;
chat.replay_initial_messages(vec![EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 2,
})]);
let mut saw = false;
while let Ok(event) = rx.try_recv() {
if let AppEvent::ApplyThreadRollback { num_turns } = event {
saw = true;
assert_eq!(num_turns, 2);
break;
}
}
assert!(saw, "expected replay rollback app event");
}
#[tokio::test]
async fn plan_implementation_popup_skips_when_messages_queued() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5")).await;

View file

@ -539,6 +539,26 @@ impl TranscriptOverlay {
}
}
/// Replace committed transcript cells while keeping any cached in-progress output that is
/// currently shown at the end of the overlay.
///
/// This is used when existing history is trimmed (for example after rollback) so the
/// transcript overlay immediately reflects the same committed cells as the main transcript.
pub(crate) fn replace_cells(&mut self, cells: Vec<Arc<dyn HistoryCell>>) {
let follow_bottom = self.view.is_scrolled_to_bottom();
self.cells = cells;
if self
.highlight_cell
.is_some_and(|idx| idx >= self.cells.len())
{
self.highlight_cell = None;
}
self.rebuild_renderables();
if follow_bottom {
self.view.scroll_offset = usize::MAX;
}
}
/// Sync the active-cell live tail with the current width and cell state.
///
/// Recomputes the tail only when the cache key changes, preserving scroll
@ -680,6 +700,11 @@ impl TranscriptOverlay {
pub(crate) fn is_done(&self) -> bool {
self.is_done
}
#[cfg(test)]
pub(crate) fn committed_cell_count(&self) -> usize {
self.cells.len()
}
}
pub(crate) struct StaticOverlay {