fix: prevent repeating interrupted turns (#9043)

## What
Record a model-visible `<turn_aborted>` marker in history when a turn is
interrupted, and treat it as a session prefix.

## Why
When a turn is interrupted, Codex emits `TurnAborted` but previously did
not persist anything model-visible in the conversation history. On the
next user turn, the model can’t tell the previous work was aborted and
may resume/repeat earlier actions (including duplicated side effects
like re-opening PRs).

Fixes: https://github.com/openai/codex/issues/9042

## How
On `TurnAbortReason::Interrupted`, append a hidden user message
containing a `<turn_aborted>…</turn_aborted>` marker and flush.
Treat `<turn_aborted>` like `<environment_context>` for session-prefix
filtering.
Add a regression test to ensure follow-up turns don’t repeat side
effects from an aborted turn.

## Testing
`just fmt`
`just fix -p codex-core`
`cargo test -p codex-core -- --test-threads=1`
`cargo test --all-features -- --test-threads=1`

---------

Co-authored-by: Skylar Graika <sgraika127@gmail.com>
Co-authored-by: jif-oai <jif@openai.com>
Co-authored-by: Eric Traut <etraut@openai.com>
This commit is contained in:
Skylar Graika 2026-01-20 13:07:28 -08:00 committed by GitHub
parent 79c5bf9835
commit b236f1c95d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 239 additions and 40 deletions

View file

@ -4340,6 +4340,8 @@ mod tests {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
// Interrupts persist a model-visible `<turn_aborted>` marker into history, but there is no
// separate client-visible event for that marker (only `EventMsg::TurnAborted`).
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for event")
@ -4348,6 +4350,7 @@ mod tests {
EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason),
other => panic!("unexpected event: {other:?}"),
}
// No extra events should be emitted after an abort.
assert!(rx.try_recv().is_err());
}
@ -4370,11 +4373,17 @@ mod tests {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
let evt = rx.recv().await.expect("event");
// Even if tasks handle cancellation gracefully, interrupts still result in `TurnAborted`
// being the only client-visible signal.
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for event")
.expect("event");
match evt.msg {
EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason),
other => panic!("unexpected event: {other:?}"),
}
// No extra events should be emitted after an abort.
assert!(rx.try_recv().is_err());
}
@ -4390,42 +4399,67 @@ mod tests {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
// Drain events until we observe ExitedReviewMode; earlier
// RawResponseItem entries (e.g., environment context) may arrive first.
loop {
let evt = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
// Aborting a review task should exit review mode before surfacing the abort to the client.
// We scan for these events (rather than relying on fixed ordering) since unrelated events
// may interleave.
let mut exited_review_mode_idx = None;
let mut turn_aborted_idx = None;
let mut idx = 0usize;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(3);
while tokio::time::Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let evt = tokio::time::timeout(remaining, rx.recv())
.await
.expect("timeout waiting for first event")
.expect("first event");
.expect("timeout waiting for event")
.expect("event");
let event_idx = idx;
idx = idx.saturating_add(1);
match evt.msg {
EventMsg::ExitedReviewMode(ev) => {
assert!(ev.review_output.is_none());
exited_review_mode_idx = Some(event_idx);
}
EventMsg::TurnAborted(ev) => {
assert_eq!(TurnAbortReason::Interrupted, ev.reason);
turn_aborted_idx = Some(event_idx);
break;
}
// Ignore any non-critical events before exit.
_ => continue,
}
}
loop {
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for next event")
.expect("event");
match evt.msg {
EventMsg::RawResponseItem(_) => continue,
EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) => continue,
EventMsg::AgentMessage(_) => continue,
EventMsg::TurnAborted(e) => {
assert_eq!(TurnAbortReason::Interrupted, e.reason);
break;
}
other => panic!("unexpected second event: {other:?}"),
_ => {}
}
}
assert!(
exited_review_mode_idx.is_some(),
"expected ExitedReviewMode after abort"
);
assert!(
turn_aborted_idx.is_some(),
"expected TurnAborted after abort"
);
assert!(
exited_review_mode_idx.unwrap() < turn_aborted_idx.unwrap(),
"expected ExitedReviewMode before TurnAborted"
);
// TODO(jif) investigate what is this?
let history = sess.clone_history().await;
let _ = history.raw_items();
// The `<turn_aborted>` marker is silent in the event stream, so verify it is still
// recorded in history for the model.
assert!(
history.raw_items().iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
if role != "user" {
return false;
}
content.iter().any(|content_item| {
let ContentItem::InputText { text } = content_item else {
return false;
};
text.contains(crate::session_prefix::TURN_ABORTED_OPEN_TAG)
})
}),
"expected a model-visible turn aborted marker in history after interrupt"
);
}
#[tokio::test]

View file

@ -15,6 +15,7 @@ use crate::protocol::EventMsg;
use crate::protocol::TurnContextItem;
use crate::protocol::TurnStartedEvent;
use crate::protocol::WarningEvent;
use crate::session_prefix::TURN_ABORTED_OPEN_TAG;
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
@ -223,11 +224,31 @@ pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec<String> {
Some(user.message())
}
}
_ => None,
_ => collect_turn_aborted_marker(item),
})
.collect()
}
fn collect_turn_aborted_marker(item: &ResponseItem) -> Option<String> {
let ResponseItem::Message { role, content, .. } = item else {
return None;
};
if role != "user" {
return None;
}
let text = content_items_to_text(content)?;
if text
.trim_start()
.to_ascii_lowercase()
.starts_with(TURN_ABORTED_OPEN_TAG)
{
Some(text)
} else {
None
}
}
pub(crate) fn is_summary_message(message: &str) -> bool {
message.starts_with(format!("{SUMMARY_PREFIX}\n").as_str())
}
@ -337,6 +358,7 @@ async fn drain_to_completed(
mod tests {
use super::*;
use crate::session_prefix::TURN_ABORTED_OPEN_TAG;
use pretty_assertions::assert_eq;
#[test]
@ -489,4 +511,41 @@ mod tests {
};
assert_eq!(summary, summary_text);
}
#[test]
fn build_compacted_history_preserves_turn_aborted_markers() {
let marker = format!(
"{TURN_ABORTED_OPEN_TAG}\n <turn_id>turn-1</turn_id>\n <reason>interrupted</reason>\n</turn_aborted>"
);
let items = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: marker.clone(),
}],
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "real user message".to_string(),
}],
},
];
let user_messages = collect_user_messages(&items);
let history = build_compacted_history(Vec::new(), &user_messages, "SUMMARY");
let found_marker = history.iter().any(|item| match item {
ResponseItem::Message { role, content, .. } if role == "user" => {
content_items_to_text(content).is_some_and(|text| text == marker)
}
_ => false,
});
assert!(
found_marker,
"expected compacted history to retain <turn_aborted> marker"
);
}
}

View file

@ -2,6 +2,7 @@ use crate::codex::TurnContext;
use crate::context_manager::normalize;
use crate::instructions::SkillInstructions;
use crate::instructions::UserInstructions;
use crate::session_prefix::is_session_prefix;
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::approx_tokens_from_byte_count;
@ -328,12 +329,6 @@ fn estimate_reasoning_length(encoded_len: usize) -> usize {
.saturating_sub(650)
}
fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with("<environment_context>")
}
pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool {
let ResponseItem::Message { role, content, .. } = item else {
return false;

View file

@ -19,14 +19,9 @@ use uuid::Uuid;
use crate::instructions::SkillInstructions;
use crate::instructions::UserInstructions;
use crate::session_prefix::is_session_prefix;
use crate::user_shell_command::is_user_shell_command_text;
fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with("<environment_context>")
}
fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
if UserInstructions::is_user_instructions(message)
|| SkillInstructions::is_skill_instructions(message)

View file

@ -46,6 +46,7 @@ pub mod parse_command;
pub mod path_utils;
pub mod powershell;
pub mod sandboxing;
mod session_prefix;
mod stream_events_utils;
mod text_encoding;
pub mod token_data;

View file

@ -0,0 +1,15 @@
/// Helpers for identifying model-visible "session prefix" messages.
///
/// A session prefix is a user-role message that carries configuration or state needed by
/// follow-up turns (e.g. `<environment_context>`, `<turn_aborted>`). These items are persisted in
/// history so the model can see them, but they are not user intent and must not create user-turn
/// boundaries.
pub(crate) const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
pub(crate) const TURN_ABORTED_OPEN_TAG: &str = "<turn_aborted>";
/// Returns true if `text` starts with a session prefix marker (case-insensitive).
pub(crate) fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG) || lowered.starts_with(TURN_ABORTED_OPEN_TAG)
}

View file

@ -24,9 +24,13 @@ use crate::protocol::EventMsg;
use crate::protocol::TurnAbortReason;
use crate::protocol::TurnAbortedEvent;
use crate::protocol::TurnCompleteEvent;
use crate::session_prefix::TURN_ABORTED_OPEN_TAG;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
pub(crate) use compact::CompactTask;
@ -37,6 +41,7 @@ pub(crate) use undo::UndoTask;
pub(crate) use user_shell::UserShellCommandTask;
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn. Do not continue or repeat work from that turn unless the user explicitly asks. If any tools/commands were aborted, they may have partially executed; verify current state before retrying.";
/// Thin wrapper that exposes the parts of [`Session`] task runners need.
#[derive(Clone)]
@ -242,6 +247,25 @@ impl Session {
.abort(session_ctx, Arc::clone(&task.turn_context))
.await;
if reason == TurnAbortReason::Interrupted {
let marker = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!(
"{TURN_ABORTED_OPEN_TAG}\n <turn_id>{sub_id}</turn_id>\n <reason>interrupted</reason>\n <guidance>{TURN_ABORTED_INTERRUPTED_GUIDANCE}</guidance>\n</turn_aborted>"
),
}],
};
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
.await;
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])
.await;
// Ensure the marker is durably visible before emitting TurnAborted: some clients
// synchronously re-read the rollout on receipt of the abort event.
self.flush_rollout().await;
}
let event = EventMsg::TurnAborted(TurnAbortedEvent { reason });
self.send_event(task.turn_context.as_ref(), event).await;
}

View file

@ -163,3 +163,79 @@ async fn interrupt_tool_records_history_entries() {
"expected at least one tenth of a second of elapsed time, got {secs}"
);
}
/// After an interrupt we persist a model-visible `<turn_aborted>` marker in the conversation
/// history. This test asserts that the marker is included in the next `/responses` request.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_persists_turn_aborted_marker_in_next_request() {
let command = "sleep 60";
let call_id = "call-turn-aborted-marker";
let args = json!({
"command": command,
"timeout_ms": 60_000
})
.to_string();
let first_body = sse(vec![
ev_response_created("resp-marker"),
ev_function_call(call_id, "shell_command", &args),
ev_completed("resp-marker"),
]);
let follow_up_body = sse(vec![
ev_response_created("resp-followup"),
ev_completed("resp-followup"),
]);
let server = start_mock_server().await;
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
let fixture = test_codex()
.with_model("gpt-5.1")
.build(&server)
.await
.unwrap();
let codex = Arc::clone(&fixture.codex);
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "start interrupt marker".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "follow up".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = response_mock.requests();
assert_eq!(requests.len(), 2, "expected two calls to the responses API");
let follow_up_request = &requests[1];
let user_texts = follow_up_request.message_input_texts("user");
assert!(
user_texts
.iter()
.any(|text| text.contains("<turn_aborted>")),
"expected <turn_aborted> marker in follow-up request"
);
}