diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 72dc7eaea..17a67fa4d 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -11,6 +11,7 @@ use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::OutgoingNotification; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; use crate::thread_status::ThreadWatchManager; +use crate::thread_status::resolve_thread_status; use chrono::DateTime; use chrono::SecondsFormat; use chrono::Utc; @@ -2053,10 +2054,12 @@ impl CodexMessageProcessor { .upsert_thread(thread.clone()) .await; - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let response = ThreadStartResponse { thread: thread.clone(), @@ -2372,10 +2375,12 @@ impl CodexMessageProcessor { match result { Ok(mut thread) => { - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); self.attach_thread_name(thread_id, &mut thread).await; let thread_id = thread.id.clone(); let response = ThreadUnarchiveResponse { thread }; @@ -2769,10 +2774,12 @@ impl CodexMessageProcessor { } } - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let response = ThreadReadResponse { thread }; self.outgoing.send_response(request_id, response).await; } @@ -2949,10 +2956,12 @@ impl CodexMessageProcessor { .upsert_thread(thread.clone()) .await; - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let response = ThreadResumeResponse { thread, @@ -3475,10 +3484,12 @@ impl CodexMessageProcessor { .upsert_thread(thread.clone()) .await; - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let response = ThreadForkResponse { thread: thread.clone(), @@ -5616,10 +5627,12 @@ impl CodexMessageProcessor { self.thread_watch_manager .upsert_thread(thread.clone()) .await; - thread.status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + thread.status = resolve_thread_status( + self.thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + false, + ); let notif = ThreadStartedNotification { thread }; self.outgoing .send_server_notification(ServerNotification::ThreadStarted(notif)) @@ -6283,6 +6296,9 @@ async fn handle_pending_thread_resume_request( let state = thread_state.lock().await; state.active_turn_snapshot() }; + let mut has_in_progress_turn = active_turn + .as_ref() + .is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress)); let request_id = pending.request_id; let connection_id = request_id.connection_id; @@ -6309,9 +6325,20 @@ async fn handle_pending_thread_resume_request( return; } }; - thread.status = thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; + + has_in_progress_turn = has_in_progress_turn + || thread + .turns + .iter() + .any(|turn| matches!(turn.status, TurnStatus::InProgress)); + + let status = resolve_thread_status( + thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await, + has_in_progress_turn, + ); + thread.status = status; match find_thread_name_by_id(codex_home, &conversation_id).await { Ok(thread_name) => thread.name = thread_name, diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index 48e3d9ab0..569363111 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -239,6 +239,22 @@ impl ThreadWatchManager { } } +pub(crate) fn resolve_thread_status( + status: ThreadStatus, + has_in_progress_turn: bool, +) -> ThreadStatus { + // Running-turn events can arrive before the watch runtime state is observed by + // the listener loop. In that window we prefer to reflect a real active turn as + // `Active` instead of `Idle`/`NotLoaded`. + if has_in_progress_turn && matches!(status, ThreadStatus::Idle | ThreadStatus::NotLoaded) { + return ThreadStatus::Active { + active_flags: Vec::new(), + }; + } + + status +} + #[derive(Default)] struct ThreadWatchState { runtime_by_thread_id: HashMap, @@ -459,6 +475,37 @@ mod tests { ); } + #[test] + fn resolves_in_progress_turn_to_active_status() { + let status = resolve_thread_status(ThreadStatus::Idle, true); + assert_eq!( + status, + ThreadStatus::Active { + active_flags: Vec::new(), + } + ); + + let status = resolve_thread_status(ThreadStatus::NotLoaded, true); + assert_eq!( + status, + ThreadStatus::Active { + active_flags: Vec::new(), + } + ); + } + + #[test] + fn keeps_status_when_no_in_progress_turn() { + assert_eq!( + resolve_thread_status(ThreadStatus::Idle, false), + ThreadStatus::Idle + ); + assert_eq!( + resolve_thread_status(ThreadStatus::SystemError, false), + ThreadStatus::SystemError + ); + } + #[tokio::test] async fn system_error_sets_idle_flag_until_next_turn() { let manager = ThreadWatchManager::new();