From 0eb2e6f9ee66f4d67584822f5bec91835ef0d4fe Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 18 Nov 2025 16:34:13 +0000 Subject: [PATCH] nit: app server (#6830) --- .../app-server/src/codex_message_processor.rs | 14 ++++---- codex-rs/app-server/src/fuzzy_file_search.rs | 4 +++ codex-rs/app-server/src/lib.rs | 2 +- codex-rs/app-server/src/outgoing_message.rs | 32 +++++++++++++------ 4 files changed, 35 insertions(+), 17 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index c5ac6a14a..c5fa2a7fa 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -158,8 +158,8 @@ struct ActiveLogin { login_id: Uuid, } -impl ActiveLogin { - fn drop(&self) { +impl Drop for ActiveLogin { + fn drop(&mut self) { self.shutdown_handle.shutdown(); } } @@ -417,7 +417,7 @@ impl CodexMessageProcessor { { let mut guard = self.active_login.lock().await; if let Some(active) = guard.take() { - active.drop(); + drop(active); } } @@ -525,7 +525,7 @@ impl CodexMessageProcessor { { let mut guard = self.active_login.lock().await; if let Some(existing) = guard.take() { - existing.drop(); + drop(existing); } *guard = Some(ActiveLogin { shutdown_handle: shutdown_handle.clone(), @@ -615,7 +615,7 @@ impl CodexMessageProcessor { { let mut guard = self.active_login.lock().await; if let Some(existing) = guard.take() { - existing.drop(); + drop(existing); } *guard = Some(ActiveLogin { shutdown_handle: shutdown_handle.clone(), @@ -704,7 +704,7 @@ impl CodexMessageProcessor { let mut guard = self.active_login.lock().await; if guard.as_ref().map(|l| l.login_id) == Some(login_id) { if let Some(active) = guard.take() { - active.drop(); + drop(active); } Ok(()) } else { @@ -758,7 +758,7 @@ impl CodexMessageProcessor { { let mut guard = self.active_login.lock().await; if let Some(active) = guard.take() { - active.drop(); + drop(active); } } diff --git a/codex-rs/app-server/src/fuzzy_file_search.rs b/codex-rs/app-server/src/fuzzy_file_search.rs index fcb05852b..5c6d86e18 100644 --- a/codex-rs/app-server/src/fuzzy_file_search.rs +++ b/codex-rs/app-server/src/fuzzy_file_search.rs @@ -19,6 +19,10 @@ pub(crate) async fn run_fuzzy_file_search( roots: Vec, cancellation_flag: Arc, ) -> Vec { + if roots.is_empty() { + return Vec::new(); + } + #[expect(clippy::expect_used)] let limit_per_root = NonZero::new(LIMIT_PER_ROOT).expect("LIMIT_PER_ROOT should be a valid non-zero usize"); diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 4b65e66d2..9ad6f50b2 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -47,7 +47,7 @@ pub async fn run_main( ) -> IoResult<()> { // Set up channels. let (incoming_tx, mut incoming_rx) = mpsc::channel::(CHANNEL_CAPACITY); - let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); // Task: read from stdin, push to `incoming_tx`. let stdin_reader_handle = tokio::spawn({ diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index f0ee6cf93..40260c8b9 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -19,12 +19,12 @@ use crate::error_code::INTERNAL_ERROR_CODE; /// Sends messages to the client and manages request callbacks. pub(crate) struct OutgoingMessageSender { next_request_id: AtomicI64, - sender: mpsc::UnboundedSender, + sender: mpsc::Sender, request_id_to_callback: Mutex>>, } impl OutgoingMessageSender { - pub(crate) fn new(sender: mpsc::UnboundedSender) -> Self { + pub(crate) fn new(sender: mpsc::Sender) -> Self { Self { next_request_id: AtomicI64::new(0), sender, @@ -45,8 +45,12 @@ impl OutgoingMessageSender { } let outgoing_message = - OutgoingMessage::Request(request.request_with_id(outgoing_message_id)); - let _ = self.sender.send(outgoing_message); + OutgoingMessage::Request(request.request_with_id(outgoing_message_id.clone())); + if let Err(err) = self.sender.send(outgoing_message).await { + warn!("failed to send request {outgoing_message_id:?} to client: {err:?}"); + let mut request_id_to_callback = self.request_id_to_callback.lock().await; + request_id_to_callback.remove(&outgoing_message_id); + } rx_approve } @@ -72,7 +76,9 @@ impl OutgoingMessageSender { match serde_json::to_value(response) { Ok(result) => { let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result }); - let _ = self.sender.send(outgoing_message); + if let Err(err) = self.sender.send(outgoing_message).await { + warn!("failed to send response to client: {err:?}"); + } } Err(err) => { self.send_error( @@ -89,21 +95,29 @@ impl OutgoingMessageSender { } pub(crate) async fn send_server_notification(&self, notification: ServerNotification) { - let _ = self + if let Err(err) = self .sender - .send(OutgoingMessage::AppServerNotification(notification)); + .send(OutgoingMessage::AppServerNotification(notification)) + .await + { + warn!("failed to send server notification to client: {err:?}"); + } } /// All notifications should be migrated to [`ServerNotification`] and /// [`OutgoingMessage::Notification`] should be removed. pub(crate) async fn send_notification(&self, notification: OutgoingNotification) { let outgoing_message = OutgoingMessage::Notification(notification); - let _ = self.sender.send(outgoing_message); + if let Err(err) = self.sender.send(outgoing_message).await { + warn!("failed to send notification to client: {err:?}"); + } } pub(crate) async fn send_error(&self, id: RequestId, error: JSONRPCErrorError) { let outgoing_message = OutgoingMessage::Error(OutgoingError { id, error }); - let _ = self.sender.send(outgoing_message); + if let Err(err) = self.sender.send(outgoing_message).await { + warn!("failed to send error to client: {err:?}"); + } } }