fix: replay after /agent (#12663)

Filter the events after a `/agent` replay to prevent replaying decision
events.
This commit is contained in:
jif-oai 2026-02-24 12:08:38 +00:00 committed by GitHub
parent 3fe365ad8a
commit 0679e70bfc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 632 additions and 6 deletions

View file

@ -104,6 +104,10 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::unbounded_channel;
use toml::Value as TomlValue;
mod pending_interactive_replay;
use self::pending_interactive_replay::PendingInteractiveReplayState;
const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue.";
const THREAD_EVENT_CHANNEL_CAPACITY: usize = 32768;
/// Baseline cadence for periodic stream commit animation ticks.
@ -252,6 +256,7 @@ struct ThreadEventStore {
session_configured: Option<Event>,
buffer: VecDeque<Event>,
user_message_ids: HashSet<String>,
pending_interactive_replay: PendingInteractiveReplayState,
capacity: usize,
active: bool,
}
@ -262,6 +267,7 @@ impl ThreadEventStore {
session_configured: None,
buffer: VecDeque::new(),
user_message_ids: HashSet::new(),
pending_interactive_replay: PendingInteractiveReplayState::default(),
capacity,
active: false,
}
@ -274,6 +280,7 @@ impl ThreadEventStore {
}
fn push_event(&mut self, event: Event) {
self.pending_interactive_replay.note_event(&event);
match &event.msg {
EventMsg::SessionConfigured(_) => {
self.session_configured = Some(event);
@ -308,19 +315,38 @@ impl ThreadEventStore {
self.buffer.push_back(event);
if self.buffer.len() > self.capacity
&& let Some(removed) = self.buffer.pop_front()
&& matches!(removed.msg, EventMsg::UserMessage(_))
&& !removed.id.is_empty()
{
self.user_message_ids.remove(&removed.id);
self.pending_interactive_replay.note_evicted_event(&removed);
if matches!(removed.msg, EventMsg::UserMessage(_)) && !removed.id.is_empty() {
self.user_message_ids.remove(&removed.id);
}
}
}
fn snapshot(&self) -> ThreadEventSnapshot {
ThreadEventSnapshot {
session_configured: self.session_configured.clone(),
events: self.buffer.iter().cloned().collect(),
// Thread switches replay buffered events into a rebuilt ChatWidget. Only replay
// interactive prompts that are still pending, or answered approvals/input will reappear.
events: self
.buffer
.iter()
.filter(|event| {
self.pending_interactive_replay
.should_replay_snapshot_event(event)
})
.cloned()
.collect(),
}
}
/// Forwards an outbound `op` to the pending-interactive-replay tracker so it can update
/// which prompts `snapshot` still treats as pending.
fn note_outbound_op(&mut self, op: &Op) {
self.pending_interactive_replay.note_outbound_op(op);
}
/// Returns the result of `PendingInteractiveReplayState::op_can_change_state` for `op`.
///
/// Pure delegation; it needs no store instance, so callers can check an op before
/// locking a store.
fn op_can_change_pending_replay_state(op: &Op) -> bool {
PendingInteractiveReplayState::op_can_change_state(op)
}
}
#[derive(Debug)]
@ -808,6 +834,20 @@ impl App {
self.active_thread_rx = None;
}
async fn note_active_thread_outbound_op(&mut self, op: &Op) {
if !ThreadEventStore::op_can_change_pending_replay_state(op) {
return;
}
let Some(thread_id) = self.active_thread_id else {
return;
};
let Some(channel) = self.thread_event_channels.get(&thread_id) else {
return;
};
let mut store = channel.store.lock().await;
store.note_outbound_op(op);
}
async fn enqueue_thread_event(&mut self, thread_id: ThreadId, event: Event) -> Result<()> {
let (sender, store) = {
let channel = self.ensure_thread_channel(thread_id);
@ -1816,7 +1856,12 @@ impl App {
return Ok(AppRunControl::Exit(ExitReason::Fatal(message)));
}
AppEvent::CodexOp(op) => {
self.chat_widget.submit_op(op);
let replay_state_op =
ThreadEventStore::op_can_change_pending_replay_state(&op).then(|| op.clone());
let submitted = self.chat_widget.submit_op(op);
if submitted && let Some(op) = replay_state_op.as_ref() {
self.note_active_thread_outbound_op(op).await;
}
}
AppEvent::DiffResult(text) => {
// Clear the in-progress state in the bottom pane

View file

@ -0,0 +1,579 @@
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use std::collections::HashMap;
use std::collections::HashSet;
/// Hashable identity of one elicitation request: the server name together with the
/// request id. Used as the element type of the pending-elicitation set.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ElicitationRequestKey {
/// Name of the server associated with the request.
server_name: String,
/// Request id as carried by the elicitation event / resolution op.
request_id: codex_protocol::mcp::RequestId,
}
impl ElicitationRequestKey {
/// Builds a key from already-owned parts; callers clone out of the event or op.
fn new(server_name: String, request_id: codex_protocol::mcp::RequestId) -> Self {
Self {
server_name,
request_id,
}
}
}
/// Tracks which interactive prompts are still unresolved in the thread-event buffer.
///
/// Thread snapshots are replayed when switching threads/agents. Most events should replay
/// verbatim, but interactive prompts (approvals, request_user_input, MCP elicitations) must
/// only replay if they are still pending. This state is updated from:
/// - inbound events (`note_event`)
/// - outbound ops that resolve a prompt (`note_outbound_op`)
/// - buffer eviction (`note_evicted_event`)
///
/// Turn-scoped prompt kinds keep both a fast lookup set (for snapshot filtering by call_id)
/// and a turn-indexed list so `TurnComplete`/`TurnAborted` can clear stale prompts tied to a
/// turn. `request_user_input` removal is FIFO because the overlay answers queued prompts in
/// FIFO order for a shared `turn_id`.
#[derive(Debug, Default)]
pub(super) struct PendingInteractiveReplayState {
    /// call_ids of exec approval requests that are still pending.
    exec_approval_call_ids: HashSet<String>,
    /// Pending exec approval call_ids grouped by the turn that issued them.
    exec_approval_call_ids_by_turn_id: HashMap<String, Vec<String>>,
    /// call_ids of patch approval requests that are still pending.
    patch_approval_call_ids: HashSet<String>,
    /// Pending patch approval call_ids grouped by the turn that issued them.
    patch_approval_call_ids_by_turn_id: HashMap<String, Vec<String>>,
    /// Pending elicitation requests; these are not grouped by turn.
    elicitation_requests: HashSet<ElicitationRequestKey>,
    /// call_ids of request_user_input prompts that are still pending.
    request_user_input_call_ids: HashSet<String>,
    /// Pending request_user_input call_ids per turn, oldest first.
    request_user_input_call_ids_by_turn_id: HashMap<String, Vec<String>>,
}

impl PendingInteractiveReplayState {
    /// Reports whether `op` is one of the outbound ops that `note_outbound_op` reacts to.
    ///
    /// Lets callers skip cloning the op or locking a store for ops that cannot matter.
    pub(super) fn op_can_change_state(op: &Op) -> bool {
        matches!(
            op,
            Op::ExecApproval { .. }
                | Op::PatchApproval { .. }
                | Op::ResolveElicitation { .. }
                | Op::UserInputAnswer { .. }
                | Op::Shutdown
        )
    }

    /// Marks the prompt resolved by an outbound `op` as no longer pending.
    ///
    /// Ops other than the ones listed in `op_can_change_state` are ignored.
    pub(super) fn note_outbound_op(&mut self, op: &Op) {
        match op {
            Op::ExecApproval { id, turn_id, .. } => {
                self.exec_approval_call_ids.remove(id);
                match turn_id {
                    Some(turn_id) => Self::remove_call_id_from_turn_map_entry(
                        &mut self.exec_approval_call_ids_by_turn_id,
                        turn_id,
                        id,
                    ),
                    // Without a turn id we cannot target a single entry. Scan every turn
                    // (as `PatchApproval` does) so no stale call_id is left behind: a stale
                    // entry would make a later turn end clear a re-requested prompt that
                    // reuses the same call_id.
                    None => Self::remove_call_id_from_turn_map(
                        &mut self.exec_approval_call_ids_by_turn_id,
                        id,
                    ),
                }
            }
            Op::PatchApproval { id, .. } => {
                self.patch_approval_call_ids.remove(id);
                Self::remove_call_id_from_turn_map(
                    &mut self.patch_approval_call_ids_by_turn_id,
                    id,
                );
            }
            Op::ResolveElicitation {
                server_name,
                request_id,
                ..
            } => {
                self.elicitation_requests
                    .remove(&ElicitationRequestKey::new(
                        server_name.clone(),
                        request_id.clone(),
                    ));
            }
            // `Op::UserInputAnswer` identifies the turn, not the prompt call_id. The UI
            // answers queued prompts for the same turn in FIFO order, so remove the oldest
            // queued call_id for that turn.
            Op::UserInputAnswer { id, .. } => {
                if let Some(call_id) = Self::pop_oldest_call_id_for_turn(
                    &mut self.request_user_input_call_ids_by_turn_id,
                    id,
                ) {
                    self.request_user_input_call_ids.remove(&call_id);
                }
            }
            Op::Shutdown => self.clear(),
            _ => {}
        }
    }

    /// Updates the pending state from an inbound `event`.
    ///
    /// Request events register a pending prompt; `ExecCommandBegin`/`PatchApplyBegin`
    /// resolve the approval with the same call_id; turn end clears that turn's prompts;
    /// `ShutdownComplete` clears everything. All other events are ignored.
    pub(super) fn note_event(&mut self, event: &Event) {
        match &event.msg {
            EventMsg::ExecApprovalRequest(ev) => {
                self.exec_approval_call_ids.insert(ev.call_id.clone());
                self.exec_approval_call_ids_by_turn_id
                    .entry(ev.turn_id.clone())
                    .or_default()
                    .push(ev.call_id.clone());
            }
            EventMsg::ExecCommandBegin(ev) => {
                self.exec_approval_call_ids.remove(&ev.call_id);
                Self::remove_call_id_from_turn_map(
                    &mut self.exec_approval_call_ids_by_turn_id,
                    &ev.call_id,
                );
            }
            EventMsg::ApplyPatchApprovalRequest(ev) => {
                self.patch_approval_call_ids.insert(ev.call_id.clone());
                self.patch_approval_call_ids_by_turn_id
                    .entry(ev.turn_id.clone())
                    .or_default()
                    .push(ev.call_id.clone());
            }
            EventMsg::PatchApplyBegin(ev) => {
                self.patch_approval_call_ids.remove(&ev.call_id);
                Self::remove_call_id_from_turn_map(
                    &mut self.patch_approval_call_ids_by_turn_id,
                    &ev.call_id,
                );
            }
            EventMsg::ElicitationRequest(ev) => {
                self.elicitation_requests.insert(ElicitationRequestKey::new(
                    ev.server_name.clone(),
                    ev.id.clone(),
                ));
            }
            EventMsg::RequestUserInput(ev) => {
                self.request_user_input_call_ids.insert(ev.call_id.clone());
                self.request_user_input_call_ids_by_turn_id
                    .entry(ev.turn_id.clone())
                    .or_default()
                    .push(ev.call_id.clone());
            }
            // A turn ending (normally or aborted/replaced) invalidates any unresolved
            // turn-scoped approvals and request_user_input prompts from that turn.
            EventMsg::TurnComplete(ev) => self.clear_turn_scoped_prompts(&ev.turn_id),
            EventMsg::TurnAborted(ev) => {
                // An abort without a turn id gives nothing to scope the cleanup to.
                if let Some(turn_id) = &ev.turn_id {
                    self.clear_turn_scoped_prompts(turn_id);
                }
            }
            EventMsg::ShutdownComplete => self.clear(),
            _ => {}
        }
    }

    /// Forgets the prompt carried by an `event` that was dropped from the buffer.
    ///
    /// Only the four request event kinds carry tracked state; anything else is ignored.
    pub(super) fn note_evicted_event(&mut self, event: &Event) {
        match &event.msg {
            EventMsg::ExecApprovalRequest(ev) => {
                self.exec_approval_call_ids.remove(&ev.call_id);
                Self::remove_call_id_from_turn_map_entry(
                    &mut self.exec_approval_call_ids_by_turn_id,
                    &ev.turn_id,
                    &ev.call_id,
                );
            }
            EventMsg::ApplyPatchApprovalRequest(ev) => {
                self.patch_approval_call_ids.remove(&ev.call_id);
                Self::remove_call_id_from_turn_map_entry(
                    &mut self.patch_approval_call_ids_by_turn_id,
                    &ev.turn_id,
                    &ev.call_id,
                );
            }
            EventMsg::ElicitationRequest(ev) => {
                self.elicitation_requests
                    .remove(&ElicitationRequestKey::new(
                        ev.server_name.clone(),
                        ev.id.clone(),
                    ));
            }
            EventMsg::RequestUserInput(ev) => {
                self.request_user_input_call_ids.remove(&ev.call_id);
                Self::remove_call_id_from_turn_map_entry(
                    &mut self.request_user_input_call_ids_by_turn_id,
                    &ev.turn_id,
                    &ev.call_id,
                );
            }
            _ => {}
        }
    }

    /// Decides whether `event` belongs in a replayed snapshot.
    ///
    /// Interactive prompt requests replay only while still pending; every other event
    /// always replays.
    pub(super) fn should_replay_snapshot_event(&self, event: &Event) -> bool {
        match &event.msg {
            EventMsg::ExecApprovalRequest(ev) => self.exec_approval_call_ids.contains(&ev.call_id),
            EventMsg::ApplyPatchApprovalRequest(ev) => {
                self.patch_approval_call_ids.contains(&ev.call_id)
            }
            EventMsg::ElicitationRequest(ev) => {
                self.elicitation_requests
                    .contains(&ElicitationRequestKey::new(
                        ev.server_name.clone(),
                        ev.id.clone(),
                    ))
            }
            EventMsg::RequestUserInput(ev) => {
                self.request_user_input_call_ids.contains(&ev.call_id)
            }
            _ => true,
        }
    }

    /// Drops every turn-scoped prompt (exec approval, patch approval, request_user_input)
    /// recorded for `turn_id`. Elicitations are not turn-scoped and are left alone.
    fn clear_turn_scoped_prompts(&mut self, turn_id: &str) {
        Self::clear_turn(
            &mut self.exec_approval_call_ids,
            &mut self.exec_approval_call_ids_by_turn_id,
            turn_id,
        );
        Self::clear_turn(
            &mut self.patch_approval_call_ids,
            &mut self.patch_approval_call_ids_by_turn_id,
            turn_id,
        );
        Self::clear_turn(
            &mut self.request_user_input_call_ids,
            &mut self.request_user_input_call_ids_by_turn_id,
            turn_id,
        );
    }

    /// Removes the `turn_id` entry from `call_ids_by_turn_id` and un-marks each of its
    /// call_ids in `pending`.
    fn clear_turn(
        pending: &mut HashSet<String>,
        call_ids_by_turn_id: &mut HashMap<String, Vec<String>>,
        turn_id: &str,
    ) {
        for call_id in call_ids_by_turn_id.remove(turn_id).unwrap_or_default() {
            pending.remove(&call_id);
        }
    }

    /// Removes and returns the oldest call_id queued for `turn_id`, dropping the turn's
    /// entry once its queue is empty. Returns `None` when nothing is queued for that turn.
    fn pop_oldest_call_id_for_turn(
        call_ids_by_turn_id: &mut HashMap<String, Vec<String>>,
        turn_id: &str,
    ) -> Option<String> {
        let call_ids = call_ids_by_turn_id.get_mut(turn_id)?;
        let oldest = if call_ids.is_empty() {
            None
        } else {
            Some(call_ids.remove(0))
        };
        if call_ids.is_empty() {
            call_ids_by_turn_id.remove(turn_id);
        }
        oldest
    }

    /// Removes `call_id` from every turn's list, dropping turns left with no call_ids.
    ///
    /// Used when the resolving event/op does not say which turn the prompt belonged to.
    fn remove_call_id_from_turn_map(
        call_ids_by_turn_id: &mut HashMap<String, Vec<String>>,
        call_id: &str,
    ) {
        call_ids_by_turn_id.retain(|_, call_ids| {
            call_ids.retain(|queued_call_id| queued_call_id != call_id);
            !call_ids.is_empty()
        });
    }

    /// Removes `call_id` from the list stored for `turn_id` only, dropping that entry if
    /// it becomes empty. A missing `turn_id` is a no-op.
    fn remove_call_id_from_turn_map_entry(
        call_ids_by_turn_id: &mut HashMap<String, Vec<String>>,
        turn_id: &str,
        call_id: &str,
    ) {
        let now_empty = call_ids_by_turn_id
            .get_mut(turn_id)
            .is_some_and(|call_ids| {
                call_ids.retain(|queued_call_id| queued_call_id != call_id);
                call_ids.is_empty()
            });
        if now_empty {
            call_ids_by_turn_id.remove(turn_id);
        }
    }

    /// Forgets every pending prompt of every kind.
    fn clear(&mut self) {
        self.exec_approval_call_ids.clear();
        self.exec_approval_call_ids_by_turn_id.clear();
        self.patch_approval_call_ids.clear();
        self.patch_approval_call_ids_by_turn_id.clear();
        self.elicitation_requests.clear();
        self.request_user_input_call_ids.clear();
        self.request_user_input_call_ids_by_turn_id.clear();
    }
}
#[cfg(test)]
mod tests {
    //! Snapshot-filtering tests: each test feeds events/ops into a `ThreadEventStore` and
    //! checks which interactive prompts survive in `snapshot()`.

    use super::super::ThreadEventStore;
    use codex_protocol::protocol::Event;
    use codex_protocol::protocol::EventMsg;
    use codex_protocol::protocol::Op;
    use codex_protocol::protocol::TurnAbortReason;
    use pretty_assertions::assert_eq;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Builds a `RequestUserInput` event with no questions.
    fn request_user_input(event_id: &str, call_id: &str, turn_id: &str) -> Event {
        Event {
            id: event_id.to_string(),
            msg: EventMsg::RequestUserInput(
                codex_protocol::request_user_input::RequestUserInputEvent {
                    call_id: call_id.to_string(),
                    turn_id: turn_id.to_string(),
                    questions: Vec::new(),
                },
            ),
        }
    }

    /// Builds the outbound answer op for `turn_id` with an empty answer map.
    fn user_input_answer(turn_id: &str) -> Op {
        Op::UserInputAnswer {
            id: turn_id.to_string(),
            response: codex_protocol::request_user_input::RequestUserInputResponse {
                answers: HashMap::new(),
            },
        }
    }

    /// Builds an `ExecApprovalRequest` event for `echo hi` in `/tmp`.
    fn exec_approval_request(event_id: &str, call_id: &str, turn_id: &str) -> Event {
        Event {
            id: event_id.to_string(),
            msg: EventMsg::ExecApprovalRequest(
                codex_protocol::protocol::ExecApprovalRequestEvent {
                    call_id: call_id.to_string(),
                    approval_id: Some("approval-1".to_string()),
                    turn_id: turn_id.to_string(),
                    command: vec!["echo".to_string(), "hi".to_string()],
                    cwd: PathBuf::from("/tmp"),
                    reason: None,
                    network_approval_context: None,
                    proposed_execpolicy_amendment: None,
                    proposed_network_policy_amendments: None,
                    parsed_cmd: Vec::new(),
                },
            ),
        }
    }

    /// Builds an `ApplyPatchApprovalRequest` event with no changes.
    fn patch_approval_request(event_id: &str, call_id: &str, turn_id: &str) -> Event {
        Event {
            id: event_id.to_string(),
            msg: EventMsg::ApplyPatchApprovalRequest(
                codex_protocol::protocol::ApplyPatchApprovalRequestEvent {
                    call_id: call_id.to_string(),
                    turn_id: turn_id.to_string(),
                    changes: HashMap::new(),
                    reason: None,
                    grant_root: None,
                },
            ),
        }
    }

    #[test]
    fn thread_event_snapshot_keeps_pending_request_user_input() {
        let mut store = ThreadEventStore::new(8);
        store.push_event(request_user_input("ev-1", "call-1", "turn-1"));

        let snapshot = store.snapshot();
        assert_eq!(snapshot.events.len(), 1);
        assert!(matches!(
            snapshot.events.first().map(|event| &event.msg),
            Some(EventMsg::RequestUserInput(_))
        ));
    }

    #[test]
    fn thread_event_snapshot_drops_resolved_request_user_input_after_user_answer() {
        let mut store = ThreadEventStore::new(8);
        store.push_event(request_user_input("ev-1", "call-1", "turn-1"));
        store.note_outbound_op(&user_input_answer("turn-1"));

        let snapshot = store.snapshot();
        assert!(
            snapshot.events.is_empty(),
            "resolved request_user_input prompt should not replay on thread switch"
        );
    }

    #[test]
    fn thread_event_snapshot_drops_resolved_exec_approval_after_outbound_approval_call_id() {
        let mut store = ThreadEventStore::new(8);
        store.push_event(exec_approval_request("ev-1", "call-1", "turn-1"));
        store.note_outbound_op(&Op::ExecApproval {
            id: "call-1".to_string(),
            turn_id: Some("turn-1".to_string()),
            decision: codex_protocol::protocol::ReviewDecision::Approved,
        });

        let snapshot = store.snapshot();
        assert!(
            snapshot.events.is_empty(),
            "resolved exec approval prompt should not replay on thread switch"
        );
    }

    #[test]
    fn thread_event_snapshot_drops_answered_request_user_input_for_multi_prompt_turn() {
        let mut store = ThreadEventStore::new(8);
        // First prompt is answered before the second one for the same turn arrives.
        store.push_event(request_user_input("ev-1", "call-1", "turn-1"));
        store.note_outbound_op(&user_input_answer("turn-1"));
        store.push_event(request_user_input("ev-2", "call-2", "turn-1"));

        let snapshot = store.snapshot();
        assert_eq!(snapshot.events.len(), 1);
        assert!(matches!(
            snapshot.events.first().map(|event| &event.msg),
            Some(EventMsg::RequestUserInput(ev)) if ev.call_id == "call-2"
        ));
    }

    #[test]
    fn thread_event_snapshot_keeps_newer_request_user_input_pending_when_same_turn_has_queue() {
        let mut store = ThreadEventStore::new(8);
        // Both prompts are queued first; a single answer must resolve only the oldest.
        store.push_event(request_user_input("ev-1", "call-1", "turn-1"));
        store.push_event(request_user_input("ev-2", "call-2", "turn-1"));
        store.note_outbound_op(&user_input_answer("turn-1"));

        let snapshot = store.snapshot();
        assert_eq!(snapshot.events.len(), 1);
        assert!(matches!(
            snapshot.events.first().map(|event| &event.msg),
            Some(EventMsg::RequestUserInput(ev)) if ev.call_id == "call-2"
        ));
    }

    #[test]
    fn thread_event_snapshot_drops_resolved_patch_approval_after_outbound_approval() {
        let mut store = ThreadEventStore::new(8);
        store.push_event(patch_approval_request("ev-1", "call-1", "turn-1"));
        store.note_outbound_op(&Op::PatchApproval {
            id: "call-1".to_string(),
            decision: codex_protocol::protocol::ReviewDecision::Approved,
        });

        let snapshot = store.snapshot();
        assert!(
            snapshot.events.is_empty(),
            "resolved patch approval prompt should not replay on thread switch"
        );
    }

    #[test]
    fn thread_event_snapshot_drops_pending_approvals_when_turn_aborts() {
        let mut store = ThreadEventStore::new(8);
        store.push_event(exec_approval_request("ev-1", "exec-call-1", "turn-1"));
        store.push_event(patch_approval_request("ev-2", "patch-call-1", "turn-1"));
        store.push_event(Event {
            id: "ev-3".to_string(),
            msg: EventMsg::TurnAborted(codex_protocol::protocol::TurnAbortedEvent {
                turn_id: Some("turn-1".to_string()),
                reason: TurnAbortReason::Replaced,
            }),
        });

        let snapshot = store.snapshot();
        assert!(snapshot.events.iter().all(|event| {
            !matches!(
                &event.msg,
                EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_)
            )
        }));
    }

    #[test]
    fn thread_event_snapshot_drops_resolved_elicitation_after_outbound_resolution() {
        let mut store = ThreadEventStore::new(8);
        let request_id = codex_protocol::mcp::RequestId::String("request-1".to_string());
        store.push_event(Event {
            id: "ev-1".to_string(),
            msg: EventMsg::ElicitationRequest(codex_protocol::approvals::ElicitationRequestEvent {
                server_name: "server-1".to_string(),
                id: request_id.clone(),
                message: "Please confirm".to_string(),
            }),
        });
        store.note_outbound_op(&Op::ResolveElicitation {
            server_name: "server-1".to_string(),
            request_id,
            decision: codex_protocol::approvals::ElicitationAction::Accept,
        });

        let snapshot = store.snapshot();
        assert!(
            snapshot.events.is_empty(),
            "resolved elicitation prompt should not replay on thread switch"
        );
    }
}

View file

@ -7186,7 +7186,7 @@ impl ChatWidget {
self.bottom_pane.clear_esc_backtrack_hint();
}
/// Forward an `Op` directly to codex.
pub(crate) fn submit_op(&mut self, op: Op) {
pub(crate) fn submit_op(&mut self, op: Op) -> bool {
// Record outbound operation for session replay fidelity.
crate::session_log::log_outbound_op(&op);
if matches!(&op, Op::Review { .. }) && !self.bottom_pane.is_task_running() {
@ -7194,7 +7194,9 @@ impl ChatWidget {
}
if let Err(e) = self.codex_op_tx.send(op) {
tracing::error!("failed to submit op: {e}");
return false;
}
true
}
fn on_list_mcp_tools(&mut self, ev: McpListToolsResponseEvent) {