diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 716559e94..d897350bf 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1921,6 +1921,10 @@ impl CodexMessageProcessor { } } + pub(crate) async fn clear_all_thread_listeners(&self) { + self.thread_state_manager.clear_all_listeners().await; + } + pub(crate) async fn shutdown_threads(&self) { let report = self .thread_manager diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 3a9286a5f..286210e6e 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -484,9 +484,10 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { } processor.clear_runtime_references(); + processor.connection_closed(IN_PROCESS_CONNECTION_ID).await; + processor.clear_all_thread_listeners().await; processor.drain_background_tasks().await; processor.shutdown_threads().await; - processor.connection_closed(IN_PROCESS_CONNECTION_ID).await; }); let mut pending_request_responses = HashMap::>::new(); diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index e16e2e693..5eda1edeb 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -456,6 +456,12 @@ impl MessageProcessor { self.codex_message_processor.drain_background_tasks().await; } + pub(crate) async fn clear_all_thread_listeners(&self) { + self.codex_message_processor + .clear_all_thread_listeners() + .await; + } + pub(crate) async fn shutdown_threads(&self) { self.codex_message_processor.shutdown_threads().await; } diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 5176fe133..a875a9149 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -196,6 +196,29 @@ impl ThreadStateManager { } } + pub(crate) async fn clear_all_listeners(&self) { + let thread_states = { + let state = self.state.lock().await; + state + .threads + .iter() + .map(|(thread_id, thread_entry)| (*thread_id, thread_entry.state.clone())) + .collect::>() + }; + + for (thread_id, thread_state) in thread_states { + let mut thread_state = thread_state.lock().await; + tracing::debug!( + thread_id = %thread_id, + listener_generation = thread_state.listener_generation, + had_listener = thread_state.cancel_tx.is_some(), + had_active_turn = thread_state.active_turn_snapshot().is_some(), + "clearing thread listener during app-server shutdown" + ); + thread_state.clear_listener(); + } + } + pub(crate) async fn unsubscribe_connection_from_thread( &self, thread_id: ThreadId,