diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 936ff2d85..160db04ce 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -104,6 +104,10 @@ use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::unbounded_channel; use toml::Value as TomlValue; +mod pending_interactive_replay; + +use self::pending_interactive_replay::PendingInteractiveReplayState; + const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue."; const THREAD_EVENT_CHANNEL_CAPACITY: usize = 32768; /// Baseline cadence for periodic stream commit animation ticks. @@ -252,6 +256,7 @@ struct ThreadEventStore { session_configured: Option, buffer: VecDeque, user_message_ids: HashSet, + pending_interactive_replay: PendingInteractiveReplayState, capacity: usize, active: bool, } @@ -262,6 +267,7 @@ impl ThreadEventStore { session_configured: None, buffer: VecDeque::new(), user_message_ids: HashSet::new(), + pending_interactive_replay: PendingInteractiveReplayState::default(), capacity, active: false, } @@ -274,6 +280,7 @@ impl ThreadEventStore { } fn push_event(&mut self, event: Event) { + self.pending_interactive_replay.note_event(&event); match &event.msg { EventMsg::SessionConfigured(_) => { self.session_configured = Some(event); @@ -308,19 +315,38 @@ impl ThreadEventStore { self.buffer.push_back(event); if self.buffer.len() > self.capacity && let Some(removed) = self.buffer.pop_front() - && matches!(removed.msg, EventMsg::UserMessage(_)) - && !removed.id.is_empty() { - self.user_message_ids.remove(&removed.id); + self.pending_interactive_replay.note_evicted_event(&removed); + if matches!(removed.msg, EventMsg::UserMessage(_)) && !removed.id.is_empty() { + self.user_message_ids.remove(&removed.id); + } } } fn snapshot(&self) -> ThreadEventSnapshot { ThreadEventSnapshot { session_configured: self.session_configured.clone(), - events: self.buffer.iter().cloned().collect(), + // Thread switches replay buffered events into a rebuilt ChatWidget. Only replay + // interactive prompts that are still pending, or answered approvals/input will reappear. + events: self + .buffer + .iter() + .filter(|event| { + self.pending_interactive_replay + .should_replay_snapshot_event(event) + }) + .cloned() + .collect(), } } + + fn note_outbound_op(&mut self, op: &Op) { + self.pending_interactive_replay.note_outbound_op(op); + } + + fn op_can_change_pending_replay_state(op: &Op) -> bool { + PendingInteractiveReplayState::op_can_change_state(op) + } } #[derive(Debug)] @@ -808,6 +834,20 @@ impl App { self.active_thread_rx = None; } + async fn note_active_thread_outbound_op(&mut self, op: &Op) { + if !ThreadEventStore::op_can_change_pending_replay_state(op) { + return; + } + let Some(thread_id) = self.active_thread_id else { + return; + }; + let Some(channel) = self.thread_event_channels.get(&thread_id) else { + return; + }; + let mut store = channel.store.lock().await; + store.note_outbound_op(op); + } + async fn enqueue_thread_event(&mut self, thread_id: ThreadId, event: Event) -> Result<()> { let (sender, store) = { let channel = self.ensure_thread_channel(thread_id); @@ -1816,7 +1856,12 @@ impl App { return Ok(AppRunControl::Exit(ExitReason::Fatal(message))); } AppEvent::CodexOp(op) => { - self.chat_widget.submit_op(op); + let replay_state_op = + ThreadEventStore::op_can_change_pending_replay_state(&op).then(|| op.clone()); + let submitted = self.chat_widget.submit_op(op); + if submitted && let Some(op) = replay_state_op.as_ref() { + self.note_active_thread_outbound_op(op).await; + } } AppEvent::DiffResult(text) => { // Clear the in-progress state in the bottom pane diff --git a/codex-rs/tui/src/app/pending_interactive_replay.rs b/codex-rs/tui/src/app/pending_interactive_replay.rs new file mode 100644 index 000000000..1cfb12f5f --- /dev/null +++ b/codex-rs/tui/src/app/pending_interactive_replay.rs @@ -0,0 +1,579 @@ +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::Op; +use std::collections::HashMap; +use std::collections::HashSet; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct ElicitationRequestKey { + server_name: String, + request_id: codex_protocol::mcp::RequestId, +} + +impl ElicitationRequestKey { + fn new(server_name: String, request_id: codex_protocol::mcp::RequestId) -> Self { + Self { + server_name, + request_id, + } + } +} + +#[derive(Debug, Default)] +// Tracks which interactive prompts are still unresolved in the thread-event buffer. +// +// Thread snapshots are replayed when switching threads/agents. Most events should replay +// verbatim, but interactive prompts (approvals, request_user_input, MCP elicitations) must +// only replay if they are still pending. This state is updated from: +// - inbound events (`note_event`) +// - outbound ops that resolve a prompt (`note_outbound_op`) +// - buffer eviction (`note_evicted_event`) +// +// We keep both fast lookup sets (for snapshot filtering by call_id/request key) and +// turn-indexed queues/vectors so `TurnComplete`/`TurnAborted` can clear stale prompts tied +// to a turn. `request_user_input` removal is FIFO because the overlay answers queued prompts +// in FIFO order for a shared `turn_id`. +pub(super) struct PendingInteractiveReplayState { + exec_approval_call_ids: HashSet, + exec_approval_call_ids_by_turn_id: HashMap>, + patch_approval_call_ids: HashSet, + patch_approval_call_ids_by_turn_id: HashMap>, + elicitation_requests: HashSet, + request_user_input_call_ids: HashSet, + request_user_input_call_ids_by_turn_id: HashMap>, +} + +impl PendingInteractiveReplayState { + pub(super) fn op_can_change_state(op: &Op) -> bool { + matches!( + op, + Op::ExecApproval { .. } + | Op::PatchApproval { .. } + | Op::ResolveElicitation { .. } + | Op::UserInputAnswer { .. } + | Op::Shutdown + ) + } + + pub(super) fn note_outbound_op(&mut self, op: &Op) { + match op { + Op::ExecApproval { id, turn_id, .. } => { + self.exec_approval_call_ids.remove(id); + if let Some(turn_id) = turn_id { + Self::remove_call_id_from_turn_map_entry( + &mut self.exec_approval_call_ids_by_turn_id, + turn_id, + id, + ); + } + } + Op::PatchApproval { id, .. } => { + self.patch_approval_call_ids.remove(id); + Self::remove_call_id_from_turn_map( + &mut self.patch_approval_call_ids_by_turn_id, + id, + ); + } + Op::ResolveElicitation { + server_name, + request_id, + .. + } => { + self.elicitation_requests + .remove(&ElicitationRequestKey::new( + server_name.clone(), + request_id.clone(), + )); + } + // `Op::UserInputAnswer` identifies the turn, not the prompt call_id. The UI + // answers queued prompts for the same turn in FIFO order, so remove the oldest + // queued call_id for that turn. + Op::UserInputAnswer { id, .. } => { + let mut remove_turn_entry = false; + if let Some(call_ids) = self.request_user_input_call_ids_by_turn_id.get_mut(id) { + if !call_ids.is_empty() { + let call_id = call_ids.remove(0); + self.request_user_input_call_ids.remove(&call_id); + } + if call_ids.is_empty() { + remove_turn_entry = true; + } + } + if remove_turn_entry { + self.request_user_input_call_ids_by_turn_id.remove(id); + } + } + Op::Shutdown => self.clear(), + _ => {} + } + } + + pub(super) fn note_event(&mut self, event: &Event) { + match &event.msg { + EventMsg::ExecApprovalRequest(ev) => { + self.exec_approval_call_ids.insert(ev.call_id.clone()); + self.exec_approval_call_ids_by_turn_id + .entry(ev.turn_id.clone()) + .or_default() + .push(ev.call_id.clone()); + } + EventMsg::ExecCommandBegin(ev) => { + self.exec_approval_call_ids.remove(&ev.call_id); + Self::remove_call_id_from_turn_map( + &mut self.exec_approval_call_ids_by_turn_id, + &ev.call_id, + ); + } + EventMsg::ApplyPatchApprovalRequest(ev) => { + self.patch_approval_call_ids.insert(ev.call_id.clone()); + self.patch_approval_call_ids_by_turn_id + .entry(ev.turn_id.clone()) + .or_default() + .push(ev.call_id.clone()); + } + EventMsg::PatchApplyBegin(ev) => { + self.patch_approval_call_ids.remove(&ev.call_id); + Self::remove_call_id_from_turn_map( + &mut self.patch_approval_call_ids_by_turn_id, + &ev.call_id, + ); + } + EventMsg::ElicitationRequest(ev) => { + self.elicitation_requests.insert(ElicitationRequestKey::new( + ev.server_name.clone(), + ev.id.clone(), + )); + } + EventMsg::RequestUserInput(ev) => { + self.request_user_input_call_ids.insert(ev.call_id.clone()); + self.request_user_input_call_ids_by_turn_id + .entry(ev.turn_id.clone()) + .or_default() + .push(ev.call_id.clone()); + } + // A turn ending (normally or aborted/replaced) invalidates any unresolved + // turn-scoped approvals and request_user_input prompts from that turn. + EventMsg::TurnComplete(ev) => { + self.clear_exec_approval_turn(&ev.turn_id); + self.clear_patch_approval_turn(&ev.turn_id); + self.clear_request_user_input_turn(&ev.turn_id); + } + EventMsg::TurnAborted(ev) => { + if let Some(turn_id) = &ev.turn_id { + self.clear_exec_approval_turn(turn_id); + self.clear_patch_approval_turn(turn_id); + self.clear_request_user_input_turn(turn_id); + } + } + EventMsg::ShutdownComplete => self.clear(), + _ => {} + } + } + + pub(super) fn note_evicted_event(&mut self, event: &Event) { + match &event.msg { + EventMsg::ExecApprovalRequest(ev) => { + self.exec_approval_call_ids.remove(&ev.call_id); + Self::remove_call_id_from_turn_map_entry( + &mut self.exec_approval_call_ids_by_turn_id, + &ev.turn_id, + &ev.call_id, + ); + } + EventMsg::ApplyPatchApprovalRequest(ev) => { + self.patch_approval_call_ids.remove(&ev.call_id); + Self::remove_call_id_from_turn_map_entry( + &mut self.patch_approval_call_ids_by_turn_id, + &ev.turn_id, + &ev.call_id, + ); + } + EventMsg::ElicitationRequest(ev) => { + self.elicitation_requests + .remove(&ElicitationRequestKey::new( + ev.server_name.clone(), + ev.id.clone(), + )); + } + EventMsg::RequestUserInput(ev) => { + self.request_user_input_call_ids.remove(&ev.call_id); + let mut remove_turn_entry = false; + if let Some(call_ids) = self + .request_user_input_call_ids_by_turn_id + .get_mut(&ev.turn_id) + { + call_ids.retain(|call_id| call_id != &ev.call_id); + if call_ids.is_empty() { + remove_turn_entry = true; + } + } + if remove_turn_entry { + self.request_user_input_call_ids_by_turn_id + .remove(&ev.turn_id); + } + } + _ => {} + } + } + + pub(super) fn should_replay_snapshot_event(&self, event: &Event) -> bool { + match &event.msg { + EventMsg::ExecApprovalRequest(ev) => self.exec_approval_call_ids.contains(&ev.call_id), + EventMsg::ApplyPatchApprovalRequest(ev) => { + self.patch_approval_call_ids.contains(&ev.call_id) + } + EventMsg::ElicitationRequest(ev) => { + self.elicitation_requests + .contains(&ElicitationRequestKey::new( + ev.server_name.clone(), + ev.id.clone(), + )) + } + EventMsg::RequestUserInput(ev) => { + self.request_user_input_call_ids.contains(&ev.call_id) + } + _ => true, + } + } + + fn clear_request_user_input_turn(&mut self, turn_id: &str) { + if let Some(call_ids) = self.request_user_input_call_ids_by_turn_id.remove(turn_id) { + for call_id in call_ids { + self.request_user_input_call_ids.remove(&call_id); + } + } + } + + fn clear_exec_approval_turn(&mut self, turn_id: &str) { + if let Some(call_ids) = self.exec_approval_call_ids_by_turn_id.remove(turn_id) { + for call_id in call_ids { + self.exec_approval_call_ids.remove(&call_id); + } + } + } + + fn clear_patch_approval_turn(&mut self, turn_id: &str) { + if let Some(call_ids) = self.patch_approval_call_ids_by_turn_id.remove(turn_id) { + for call_id in call_ids { + self.patch_approval_call_ids.remove(&call_id); + } + } + } + + fn remove_call_id_from_turn_map( + call_ids_by_turn_id: &mut HashMap>, + call_id: &str, + ) { + call_ids_by_turn_id.retain(|_, call_ids| { + call_ids.retain(|queued_call_id| queued_call_id != call_id); + !call_ids.is_empty() + }); + } + + fn remove_call_id_from_turn_map_entry( + call_ids_by_turn_id: &mut HashMap>, + turn_id: &str, + call_id: &str, + ) { + let mut remove_turn_entry = false; + if let Some(call_ids) = call_ids_by_turn_id.get_mut(turn_id) { + call_ids.retain(|queued_call_id| queued_call_id != call_id); + if call_ids.is_empty() { + remove_turn_entry = true; + } + } + if remove_turn_entry { + call_ids_by_turn_id.remove(turn_id); + } + } + + fn clear(&mut self) { + self.exec_approval_call_ids.clear(); + self.exec_approval_call_ids_by_turn_id.clear(); + self.patch_approval_call_ids.clear(); + self.patch_approval_call_ids_by_turn_id.clear(); + self.elicitation_requests.clear(); + self.request_user_input_call_ids.clear(); + self.request_user_input_call_ids_by_turn_id.clear(); + } +} + +#[cfg(test)] +mod tests { + use super::super::ThreadEventStore; + use codex_protocol::protocol::Event; + use codex_protocol::protocol::EventMsg; + use codex_protocol::protocol::Op; + use codex_protocol::protocol::TurnAbortReason; + use pretty_assertions::assert_eq; + use std::collections::HashMap; + use std::path::PathBuf; + + #[test] + fn thread_event_snapshot_keeps_pending_request_user_input() { + let mut store = ThreadEventStore::new(8); + let request = Event { + id: "ev-1".to_string(), + msg: EventMsg::RequestUserInput( + codex_protocol::request_user_input::RequestUserInputEvent { + call_id: "call-1".to_string(), + turn_id: "turn-1".to_string(), + questions: Vec::new(), + }, + ), + }; + + store.push_event(request); + + let snapshot = store.snapshot(); + assert_eq!(snapshot.events.len(), 1); + assert!(matches!( + snapshot.events.first().map(|event| &event.msg), + Some(EventMsg::RequestUserInput(_)) + )); + } + + #[test] + fn thread_event_snapshot_drops_resolved_request_user_input_after_user_answer() { + let mut store = ThreadEventStore::new(8); + store.push_event(Event { + id: "ev-1".to_string(), + msg: EventMsg::RequestUserInput( + codex_protocol::request_user_input::RequestUserInputEvent { + call_id: "call-1".to_string(), + turn_id: "turn-1".to_string(), + questions: Vec::new(), + }, + ), + }); + + store.note_outbound_op(&Op::UserInputAnswer { + id: "turn-1".to_string(), + response: codex_protocol::request_user_input::RequestUserInputResponse { + answers: HashMap::new(), + }, + }); + + let snapshot = store.snapshot(); + assert!( + snapshot.events.is_empty(), + "resolved request_user_input prompt should not replay on thread switch" + ); + } + + #[test] + fn thread_event_snapshot_drops_resolved_exec_approval_after_outbound_approval_call_id() { + let mut store = ThreadEventStore::new(8); + store.push_event(Event { + id: "ev-1".to_string(), + msg: EventMsg::ExecApprovalRequest( + codex_protocol::protocol::ExecApprovalRequestEvent { + call_id: "call-1".to_string(), + approval_id: Some("approval-1".to_string()), + turn_id: "turn-1".to_string(), + command: vec!["echo".to_string(), "hi".to_string()], + cwd: PathBuf::from("/tmp"), + reason: None, + network_approval_context: None, + proposed_execpolicy_amendment: None, + proposed_network_policy_amendments: None, + parsed_cmd: Vec::new(), + }, + ), + }); + + store.note_outbound_op(&Op::ExecApproval { + id: "call-1".to_string(), + turn_id: Some("turn-1".to_string()), + decision: codex_protocol::protocol::ReviewDecision::Approved, + }); + + let snapshot = store.snapshot(); + assert!( + snapshot.events.is_empty(), + "resolved exec approval prompt should not replay on thread switch" + ); + } + + #[test] + fn thread_event_snapshot_drops_answered_request_user_input_for_multi_prompt_turn() { + let mut store = ThreadEventStore::new(8); + store.push_event(Event { + id: "ev-1".to_string(), + msg: EventMsg::RequestUserInput( + codex_protocol::request_user_input::RequestUserInputEvent { + call_id: "call-1".to_string(), + turn_id: "turn-1".to_string(), + questions: Vec::new(), + }, + ), + }); + + store.note_outbound_op(&Op::UserInputAnswer { + id: "turn-1".to_string(), + response: codex_protocol::request_user_input::RequestUserInputResponse { + answers: HashMap::new(), + }, + }); + + store.push_event(Event { + id: "ev-2".to_string(), + msg: EventMsg::RequestUserInput( + codex_protocol::request_user_input::RequestUserInputEvent { + call_id: "call-2".to_string(), + turn_id: "turn-1".to_string(), + questions: Vec::new(), + }, + ), + }); + + let snapshot = store.snapshot(); + assert_eq!(snapshot.events.len(), 1); + assert!(matches!( + snapshot.events.first().map(|event| &event.msg), + Some(EventMsg::RequestUserInput(ev)) if ev.call_id == "call-2" + )); + } + + #[test] + fn thread_event_snapshot_keeps_newer_request_user_input_pending_when_same_turn_has_queue() { + let mut store = ThreadEventStore::new(8); + store.push_event(Event { + id: "ev-1".to_string(), + msg: EventMsg::RequestUserInput( + codex_protocol::request_user_input::RequestUserInputEvent { + call_id: "call-1".to_string(), + turn_id: "turn-1".to_string(), + questions: Vec::new(), + }, + ), + }); + store.push_event(Event { + id: "ev-2".to_string(), + msg: EventMsg::RequestUserInput( + codex_protocol::request_user_input::RequestUserInputEvent { + call_id: "call-2".to_string(), + turn_id: "turn-1".to_string(), + questions: Vec::new(), + }, + ), + }); + + store.note_outbound_op(&Op::UserInputAnswer { + id: "turn-1".to_string(), + response: codex_protocol::request_user_input::RequestUserInputResponse { + answers: HashMap::new(), + }, + }); + + let snapshot = store.snapshot(); + assert_eq!(snapshot.events.len(), 1); + assert!(matches!( + snapshot.events.first().map(|event| &event.msg), + Some(EventMsg::RequestUserInput(ev)) if ev.call_id == "call-2" + )); + } + + #[test] + fn thread_event_snapshot_drops_resolved_patch_approval_after_outbound_approval() { + let mut store = ThreadEventStore::new(8); + store.push_event(Event { + id: "ev-1".to_string(), + msg: EventMsg::ApplyPatchApprovalRequest( + codex_protocol::protocol::ApplyPatchApprovalRequestEvent { + call_id: "call-1".to_string(), + turn_id: "turn-1".to_string(), + changes: HashMap::new(), + reason: None, + grant_root: None, + }, + ), + }); + + store.note_outbound_op(&Op::PatchApproval { + id: "call-1".to_string(), + decision: codex_protocol::protocol::ReviewDecision::Approved, + }); + + let snapshot = store.snapshot(); + assert!( + snapshot.events.is_empty(), + "resolved patch approval prompt should not replay on thread switch" + ); + } + + #[test] + fn thread_event_snapshot_drops_pending_approvals_when_turn_aborts() { + let mut store = ThreadEventStore::new(8); + store.push_event(Event { + id: "ev-1".to_string(), + msg: EventMsg::ExecApprovalRequest( + codex_protocol::protocol::ExecApprovalRequestEvent { + call_id: "exec-call-1".to_string(), + approval_id: Some("approval-1".to_string()), + turn_id: "turn-1".to_string(), + command: vec!["echo".to_string(), "hi".to_string()], + cwd: PathBuf::from("/tmp"), + reason: None, + network_approval_context: None, + proposed_execpolicy_amendment: None, + proposed_network_policy_amendments: None, + parsed_cmd: Vec::new(), + }, + ), + }); + store.push_event(Event { + id: "ev-2".to_string(), + msg: EventMsg::ApplyPatchApprovalRequest( + codex_protocol::protocol::ApplyPatchApprovalRequestEvent { + call_id: "patch-call-1".to_string(), + turn_id: "turn-1".to_string(), + changes: HashMap::new(), + reason: None, + grant_root: None, + }, + ), + }); + store.push_event(Event { + id: "ev-3".to_string(), + msg: EventMsg::TurnAborted(codex_protocol::protocol::TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), + reason: TurnAbortReason::Replaced, + }), + }); + + let snapshot = store.snapshot(); + assert!(snapshot.events.iter().all(|event| { + !matches!( + &event.msg, + EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) + ) + })); + } + + #[test] + fn thread_event_snapshot_drops_resolved_elicitation_after_outbound_resolution() { + let mut store = ThreadEventStore::new(8); + let request_id = codex_protocol::mcp::RequestId::String("request-1".to_string()); + store.push_event(Event { + id: "ev-1".to_string(), + msg: EventMsg::ElicitationRequest(codex_protocol::approvals::ElicitationRequestEvent { + server_name: "server-1".to_string(), + id: request_id.clone(), + message: "Please confirm".to_string(), + }), + }); + + store.note_outbound_op(&Op::ResolveElicitation { + server_name: "server-1".to_string(), + request_id, + decision: codex_protocol::approvals::ElicitationAction::Accept, + }); + + let snapshot = store.snapshot(); + assert!( + snapshot.events.is_empty(), + "resolved elicitation prompt should not replay on thread switch" + ); + } +} diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index fee60fb1a..ab6b5af46 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -7186,7 +7186,7 @@ impl ChatWidget { self.bottom_pane.clear_esc_backtrack_hint(); } /// Forward an `Op` directly to codex. - pub(crate) fn submit_op(&mut self, op: Op) { + pub(crate) fn submit_op(&mut self, op: Op) -> bool { // Record outbound operation for session replay fidelity. crate::session_log::log_outbound_op(&op); if matches!(&op, Op::Review { .. }) && !self.bottom_pane.is_task_running() { @@ -7194,7 +7194,9 @@ impl ChatWidget { } if let Err(e) = self.codex_op_tx.send(op) { tracing::error!("failed to submit op: {e}"); + return false; } + true } fn on_list_mcp_tools(&mut self, ev: McpListToolsResponseEvent) {