From 0dc242a67229c99ac1de63dbdd5adc1d17481575 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 9 Mar 2026 10:27:19 -0700 Subject: [PATCH] Order websocket initialize after handshake (#13943) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changed - `app-server` now sends initialize notifications to the specific websocket connection before that connection is marked outbound-ready. - `message_processor` now exposes the forwarding hook needed to target that initialize delivery path. ## Why this fixes the flake - This was a real websocket ordering bug. - The old code allowed “connection is ready for outbound broadcasts” to become true before the initialize notification had been routed to the intended client. - On CI this showed up as a race where tests would occasionally miss or misorder initialize delivery depending on scheduler timing. - Sending initialize to the exact connection first, then exposing it to the general outbound path, removes that race instead of hiding it with timing slack. ## Scope - Production logic change. --- codex-rs/app-server/src/lib.rs | 11 +++- codex-rs/app-server/src/message_processor.rs | 58 +++++++++++++++----- 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 26f90eb61..8ad14b8e1 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -715,7 +715,6 @@ pub async fn run_main_with_transport( request, transport, &mut connection_state.session, - &connection_state.outbound_initialized, ) .await; if let Ok(mut opted_out_notification_methods) = connection_state @@ -738,7 +737,15 @@ pub async fn run_main_with_transport( std::sync::atomic::Ordering::Release, ); if !was_initialized && connection_state.session.initialized { - processor.send_initialize_notifications().await; + processor + .send_initialize_notifications_to_connection( + connection_id, + ) + .await; + processor.connection_initialized(connection_id).await; + connection_state + .outbound_initialized + .store(true, std::sync::atomic::Ordering::Release); } } JSONRPCMessage::Response(response) => { diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 76ca255c9..27a02257c 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -239,7 +239,6 @@ impl MessageProcessor { request: JSONRPCRequest, transport: AppServerTransport, session: &mut ConnectionSessionState, - outbound_initialized: &AtomicBool, ) { let request_span = crate::app_server_tracing::request_span(&request, transport, connection_id, session); @@ -280,14 +279,12 @@ impl MessageProcessor { } }; - self.handle_client_request( - connection_id, - request_id, - codex_request, - session, - outbound_initialized, - ) - .await; + // Websocket callers finalize outbound readiness in lib.rs after mirroring + // session state into outbound state and sending initialize notifications to + // this specific connection. Passing `None` avoids marking the connection + // ready too early from inside the shared request handler. + self.handle_client_request(connection_id, request_id, codex_request, session, None) + .await; } .instrument(request_span) .await; @@ -316,12 +313,15 @@ impl MessageProcessor { request_id = ?request_id.request_id, "app-server typed request" ); + // In-process clients do not have the websocket transport loop that performs + // post-initialize bookkeeping, so they still finalize outbound readiness in + // the shared request handler. self.handle_client_request( connection_id, request_id, request, session, - outbound_initialized, + Some(outbound_initialized), ) .await; } @@ -346,6 +346,26 @@ impl MessageProcessor { self.codex_message_processor.thread_created_receiver() } + pub(crate) async fn send_initialize_notifications_to_connection( + &self, + connection_id: ConnectionId, + ) { + for notification in self.config_warnings.iter().cloned() { + self.outgoing + .send_server_notification_to_connections( + &[connection_id], + ServerNotification::ConfigWarning(notification), + ) + .await; + } + } + + pub(crate) async fn connection_initialized(&self, connection_id: ConnectionId) { + self.codex_message_processor + .connection_initialized(connection_id) + .await; + } + pub(crate) async fn send_initialize_notifications(&self) { for notification in self.config_warnings.iter().cloned() { self.outgoing @@ -394,7 +414,10 @@ impl MessageProcessor { request_id: ConnectionRequestId, codex_request: ClientRequest, session: &mut ConnectionSessionState, - outbound_initialized: &AtomicBool, + // `Some(...)` means the caller wants initialize to immediately mark the + // connection outbound-ready. Websocket JSON-RPC calls pass `None` so + // lib.rs can deliver connection-scoped initialize notifications first. + outbound_initialized: Option<&AtomicBool>, ) { match codex_request { // Handle Initialize internally so CodexMessageProcessor does not have to concern @@ -472,10 +495,15 @@ impl MessageProcessor { self.outgoing.send_response(request_id, response).await; session.initialized = true; - outbound_initialized.store(true, Ordering::Release); - self.codex_message_processor - .connection_initialized(connection_id) - .await; + if let Some(outbound_initialized) = outbound_initialized { + // In-process clients can complete readiness immediately here. The + // websocket path defers this until lib.rs finishes transport-layer + // initialize handling for the specific connection. + outbound_initialized.store(true, Ordering::Release); + self.codex_message_processor + .connection_initialized(connection_id) + .await; + } return; } _ => {