Order websocket initialize after handshake (#13943)
## What changed - `app-server` now sends initialize notifications to the specific websocket connection before that connection is marked outbound-ready. - `message_processor` now exposes the forwarding hook needed to target that initialize delivery path. ## Why this fixes the flake - This was a real websocket ordering bug. - The old code allowed “connection is ready for outbound broadcasts” to become true before the initialize notification had been routed to the intended client. - On CI this showed up as a race where tests would occasionally miss or misorder initialize delivery depending on scheduler timing. - Sending initialize to the exact connection first, then exposing it to the general outbound path, removes that race instead of hiding it with timing slack. ## Scope - Production logic change.
This commit is contained in:
parent
6b68d1ef66
commit
0dc242a672
2 changed files with 52 additions and 17 deletions
|
|
@ -715,7 +715,6 @@ pub async fn run_main_with_transport(
|
|||
request,
|
||||
transport,
|
||||
&mut connection_state.session,
|
||||
&connection_state.outbound_initialized,
|
||||
)
|
||||
.await;
|
||||
if let Ok(mut opted_out_notification_methods) = connection_state
|
||||
|
|
@ -738,7 +737,15 @@ pub async fn run_main_with_transport(
|
|||
std::sync::atomic::Ordering::Release,
|
||||
);
|
||||
if !was_initialized && connection_state.session.initialized {
|
||||
processor.send_initialize_notifications().await;
|
||||
processor
|
||||
.send_initialize_notifications_to_connection(
|
||||
connection_id,
|
||||
)
|
||||
.await;
|
||||
processor.connection_initialized(connection_id).await;
|
||||
connection_state
|
||||
.outbound_initialized
|
||||
.store(true, std::sync::atomic::Ordering::Release);
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Response(response) => {
|
||||
|
|
|
|||
|
|
@ -239,7 +239,6 @@ impl MessageProcessor {
|
|||
request: JSONRPCRequest,
|
||||
transport: AppServerTransport,
|
||||
session: &mut ConnectionSessionState,
|
||||
outbound_initialized: &AtomicBool,
|
||||
) {
|
||||
let request_span =
|
||||
crate::app_server_tracing::request_span(&request, transport, connection_id, session);
|
||||
|
|
@ -280,14 +279,12 @@ impl MessageProcessor {
|
|||
}
|
||||
};
|
||||
|
||||
self.handle_client_request(
|
||||
connection_id,
|
||||
request_id,
|
||||
codex_request,
|
||||
session,
|
||||
outbound_initialized,
|
||||
)
|
||||
.await;
|
||||
// Websocket callers finalize outbound readiness in lib.rs after mirroring
|
||||
// session state into outbound state and sending initialize notifications to
|
||||
// this specific connection. Passing `None` avoids marking the connection
|
||||
// ready too early from inside the shared request handler.
|
||||
self.handle_client_request(connection_id, request_id, codex_request, session, None)
|
||||
.await;
|
||||
}
|
||||
.instrument(request_span)
|
||||
.await;
|
||||
|
|
@ -316,12 +313,15 @@ impl MessageProcessor {
|
|||
request_id = ?request_id.request_id,
|
||||
"app-server typed request"
|
||||
);
|
||||
// In-process clients do not have the websocket transport loop that performs
|
||||
// post-initialize bookkeeping, so they still finalize outbound readiness in
|
||||
// the shared request handler.
|
||||
self.handle_client_request(
|
||||
connection_id,
|
||||
request_id,
|
||||
request,
|
||||
session,
|
||||
outbound_initialized,
|
||||
Some(outbound_initialized),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
|
@ -346,6 +346,26 @@ impl MessageProcessor {
|
|||
self.codex_message_processor.thread_created_receiver()
|
||||
}
|
||||
|
||||
pub(crate) async fn send_initialize_notifications_to_connection(
|
||||
&self,
|
||||
connection_id: ConnectionId,
|
||||
) {
|
||||
for notification in self.config_warnings.iter().cloned() {
|
||||
self.outgoing
|
||||
.send_server_notification_to_connections(
|
||||
&[connection_id],
|
||||
ServerNotification::ConfigWarning(notification),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn connection_initialized(&self, connection_id: ConnectionId) {
|
||||
self.codex_message_processor
|
||||
.connection_initialized(connection_id)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn send_initialize_notifications(&self) {
|
||||
for notification in self.config_warnings.iter().cloned() {
|
||||
self.outgoing
|
||||
|
|
@ -394,7 +414,10 @@ impl MessageProcessor {
|
|||
request_id: ConnectionRequestId,
|
||||
codex_request: ClientRequest,
|
||||
session: &mut ConnectionSessionState,
|
||||
outbound_initialized: &AtomicBool,
|
||||
// `Some(...)` means the caller wants initialize to immediately mark the
|
||||
// connection outbound-ready. Websocket JSON-RPC calls pass `None` so
|
||||
// lib.rs can deliver connection-scoped initialize notifications first.
|
||||
outbound_initialized: Option<&AtomicBool>,
|
||||
) {
|
||||
match codex_request {
|
||||
// Handle Initialize internally so CodexMessageProcessor does not have to concern
|
||||
|
|
@ -472,10 +495,15 @@ impl MessageProcessor {
|
|||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
session.initialized = true;
|
||||
outbound_initialized.store(true, Ordering::Release);
|
||||
self.codex_message_processor
|
||||
.connection_initialized(connection_id)
|
||||
.await;
|
||||
if let Some(outbound_initialized) = outbound_initialized {
|
||||
// In-process clients can complete readiness immediately here. The
|
||||
// websocket path defers this until lib.rs finishes transport-layer
|
||||
// initialize handling for the specific connection.
|
||||
outbound_initialized.store(true, Ordering::Release);
|
||||
self.codex_message_processor
|
||||
.connection_initialized(connection_id)
|
||||
.await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
_ => {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue