From a73efab8dd6ca8371dc637ae67d4de991d69fb2e Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Fri, 20 Feb 2026 20:36:04 -0800 Subject: [PATCH] fix: address flakiness in thread_resume_rejoins_running_thread_even_with_override_mismatch (#12381) ## Why `thread/resume` responses for already-running threads can be reported as `Idle` even while a turn is still in progress. This is caused by a timing window where the runtime watch state has not yet observed the running-thread transition, so API clients can receive stale status information at resume time. Possibly related: https://github.com/openai/codex/pull/11786 ## What - Add a shared status normalization helper, `resolve_thread_status`, in `codex-rs/app-server/src/thread_status.rs` that resolves `Idle`/`NotLoaded` to `Active { active_flags: [] }` when an in-progress turn is known. - Reuse this helper across thread response paths in `codex-rs/app-server/src/codex_message_processor.rs` (including `thread/start`, `thread/unarchive`, `thread/read`, `thread/resume`, `thread/fork`, and review/thread-started notification responses). - In `handle_pending_thread_resume_request`, use both the in-memory `active_turn_snapshot` and the resumed rollout turns to decide whether a turn is in progress before resolving thread status for the response. - Extend `thread_status` tests to validate the new status-resolution behavior directly. ## Verification - `cargo test -p codex-app-server suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch` --- .../app-server/src/codex_message_processor.rs | 81 ++++++++++++------- codex-rs/app-server/src/thread_status.rs | 47 +++++++++++ 2 files changed, 101 insertions(+), 27 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 72dc7eaea..17a67fa4d 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -11,6 +11,7 @@ use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::OutgoingNotification; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; use crate::thread_status::ThreadWatchManager; +use crate::thread_status::resolve_thread_status; use chrono::DateTime; use chrono::SecondsFormat; use chrono::Utc; @@ -2053,10 +2054,12 @@ impl CodexMessageProcessor { .upsert_thread(thread.clone()) .await; - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let response = ThreadStartResponse { thread: thread.clone(), @@ -2372,10 +2375,12 @@ impl CodexMessageProcessor { match result { Ok(mut thread) => { - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); self.attach_thread_name(thread_id, &mut thread).await; let thread_id = thread.id.clone(); let response = ThreadUnarchiveResponse { thread }; @@ -2769,10 +2774,12 @@ impl CodexMessageProcessor { } } - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let response = ThreadReadResponse { thread }; self.outgoing.send_response(request_id, response).await; } @@ -2949,10 +2956,12 @@ impl CodexMessageProcessor { .upsert_thread(thread.clone()) .await; - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let response = ThreadResumeResponse { thread, @@ -3475,10 +3484,12 @@ impl CodexMessageProcessor { .upsert_thread(thread.clone()) .await; - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let response = ThreadForkResponse { thread: thread.clone(), @@ -5616,10 +5627,12 @@ impl CodexMessageProcessor { self.thread_watch_manager .upsert_thread(thread.clone()) .await; - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let notif = ThreadStartedNotification { thread }; self.outgoing .send_server_notification(ServerNotification::ThreadStarted(notif)) @@ -6283,6 +6296,9 @@ async fn handle_pending_thread_resume_request( let state = thread_state.lock().await; state.active_turn_snapshot() }; + let mut has_in_progress_turn = active_turn + .as_ref() + .is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress)); let request_id = pending.request_id; let connection_id = request_id.connection_id; @@ -6309,9 +6325,20 @@ async fn handle_pending_thread_resume_request( return; } }; - thread.status = thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + + has_in_progress_turn = has_in_progress_turn + || thread + .turns + .iter() + .any(|turn| matches!(turn.status, TurnStatus::InProgress)); + + let status = resolve_thread_status( + thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + has_in_progress_turn, + ); + thread.status = status; match find_thread_name_by_id(codex_home, &conversation_id).await { Ok(thread_name) => thread.name = thread_name, diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index 48e3d9ab0..569363111 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -239,6 +239,22 @@ impl ThreadWatchManager { } } +pub(crate) fn resolve_thread_status( + status: ThreadStatus, + has_in_progress_turn: bool, +) -> ThreadStatus { + // Running-turn events can arrive before the watch runtime state is observed by + // the listener loop. In that window we prefer to reflect a real active turn as + // `Active` instead of `Idle`/`NotLoaded`. + if has_in_progress_turn && matches!(status, ThreadStatus::Idle | ThreadStatus::NotLoaded) { + return ThreadStatus::Active { + active_flags: Vec::new(), + }; + } + + status +} + #[derive(Default)] struct ThreadWatchState { runtime_by_thread_id: HashMap, @@ -459,6 +475,37 @@ mod tests { ); } + #[test] + fn resolves_in_progress_turn_to_active_status() { + let status = resolve_thread_status(ThreadStatus::Idle, true); + assert_eq!( + status, + ThreadStatus::Active { + active_flags: Vec::new(), + } + ); + + let status = resolve_thread_status(ThreadStatus::NotLoaded, true); + assert_eq!( + status, + ThreadStatus::Active { + active_flags: Vec::new(), + } + ); + } + + #[test] + fn keeps_status_when_no_in_progress_turn() { + assert_eq!( + resolve_thread_status(ThreadStatus::Idle, false), + ThreadStatus::Idle + ); + assert_eq!( + resolve_thread_status(ThreadStatus::SystemError, false), + ThreadStatus::SystemError + ); + } + #[tokio::test] async fn system_error_sets_idle_flag_until_next_turn() { let manager = ThreadWatchManager::new();