diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 799bec403..6bdcc8258 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -130,7 +130,7 @@ Example with notification opt-out: - `thread/status/changed` — notification emitted when a loaded thread’s status changes (`threadId` + new `status`). - `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success and emits `thread/archived`. - `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server shuts down and unloads the thread, then emits `thread/closed`. -- `thread/name/set` — set or update a thread’s user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread. +- `thread/name/set` — set or update a thread’s user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread. - `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`. - `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications. - `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 33ae38c73..499ac9ffc 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1590,7 +1590,9 @@ pub(crate) async fn apply_bespoke_event_handling( thread_name: thread_name_event.thread_name, }; outgoing - .send_server_notification(ServerNotification::ThreadNameUpdated(notification)) + .send_global_server_notification(ServerNotification::ThreadNameUpdated( + notification, + )) .await; } } diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 368d5d21a..bc38bd36c 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -101,6 +101,10 @@ impl ThreadScopedOutgoingMessageSender { .await; } + pub(crate) async fn send_global_server_notification(&self, notification: ServerNotification) { + self.outgoing.send_server_notification(notification).await; + } + pub(crate) async fn abort_pending_server_requests(&self) { self.outgoing .cancel_requests_for_thread( diff --git a/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs b/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs index 979994538..26fffd8a1 100644 --- a/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs +++ b/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs @@ -6,6 +6,7 @@ use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; @@ -202,6 +203,56 @@ pub(super) async fn read_response_for_id( } } +pub(super) async fn read_notification_for_method( + stream: &mut WsClient, + method: &str, +) -> Result { + loop { + let message = read_jsonrpc_message(stream).await?; + if let JSONRPCMessage::Notification(notification) = message + && notification.method == method + { + return Ok(notification); + } + } +} + +pub(super) async fn read_response_and_notification_for_method( + stream: &mut WsClient, + id: i64, + method: &str, +) -> Result<(JSONRPCResponse, JSONRPCNotification)> { + let target_id = RequestId::Integer(id); + let mut response = None; + let mut notification = None; + + while response.is_none() || notification.is_none() { + let message = read_jsonrpc_message(stream).await?; + match message { + JSONRPCMessage::Response(candidate) if candidate.id == target_id => { + response = Some(candidate); + } + JSONRPCMessage::Notification(candidate) if candidate.method == method => { + if notification.replace(candidate).is_some() { + bail!( + "received duplicate notification for method `{method}` before completing paired read" + ); + } + } + _ => {} + } + } + + let Some(response) = response else { + bail!("response must be set before returning"); + }; + let Some(notification) = notification else { + bail!("notification must be set before returning"); + }; + + Ok((response, notification)) +} + async fn read_error_for_id(stream: &mut WsClient, id: i64) -> Result { let target_id = RequestId::Integer(id); loop { @@ -237,7 +288,7 @@ async fn read_jsonrpc_message(stream: &mut WsClient) -> Result { } } -async fn assert_no_message(stream: &mut WsClient, wait_for: Duration) -> Result<()> { +pub(super) async fn assert_no_message(stream: &mut WsClient, wait_for: Duration) -> Result<()> { match timeout(wait_for, stream.next()).await { Ok(Some(Ok(frame))) => bail!("unexpected frame while waiting for silence: {frame:?}"), Ok(Some(Err(err))) => bail!("unexpected websocket read error: {err}"), diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 283928d6a..f85849250 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -28,6 +28,7 @@ mod thread_fork; mod thread_list; mod thread_loaded_list; mod thread_metadata_update; +mod thread_name_websocket; mod thread_read; mod thread_resume; mod thread_rollback; diff --git a/codex-rs/app-server/tests/suite/v2/thread_name_websocket.rs b/codex-rs/app-server/tests/suite/v2/thread_name_websocket.rs new file mode 100644 index 000000000..f2c03cb94 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/thread_name_websocket.rs @@ -0,0 +1,171 @@ +use super::connection_handling_websocket::DEFAULT_READ_TIMEOUT; +use super::connection_handling_websocket::WsClient; +use super::connection_handling_websocket::assert_no_message; +use super::connection_handling_websocket::connect_websocket; +use super::connection_handling_websocket::create_config_toml; +use super::connection_handling_websocket::read_notification_for_method; +use super::connection_handling_websocket::read_response_and_notification_for_method; +use super::connection_handling_websocket::read_response_for_id; +use super::connection_handling_websocket::reserve_local_addr; +use super::connection_handling_websocket::send_initialize_request; +use super::connection_handling_websocket::send_request; +use super::connection_handling_websocket::spawn_websocket_server; +use anyhow::Context; +use anyhow::Result; +use app_test_support::create_fake_rollout_with_text_elements; +use app_test_support::create_mock_responses_server_repeating_assistant; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::ThreadNameUpdatedNotification; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadSetNameParams; +use codex_app_server_protocol::ThreadSetNameResponse; +use pretty_assertions::assert_eq; +use tempfile::TempDir; +use tokio::time::Duration; +use tokio::time::timeout; + +#[tokio::test] +async fn thread_name_updated_broadcasts_for_loaded_threads() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri(), "never")?; + let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-00-00")?; + + let bind_addr = reserve_local_addr()?; + let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?; + + let result = async { + let mut ws1 = connect_websocket(bind_addr).await?; + let mut ws2 = connect_websocket(bind_addr).await?; + initialize_both_clients(&mut ws1, &mut ws2).await?; + + send_request( + &mut ws1, + "thread/resume", + 10, + Some(serde_json::to_value(ThreadResumeParams { + thread_id: conversation_id.clone(), + ..Default::default() + })?), + ) + .await?; + let resume_resp: JSONRPCResponse = read_response_for_id(&mut ws1, 10).await?; + let resume: ThreadResumeResponse = to_response::(resume_resp)?; + assert_eq!(resume.thread.id, conversation_id); + + let renamed = "Loaded rename"; + send_request( + &mut ws1, + "thread/name/set", + 11, + Some(serde_json::to_value(ThreadSetNameParams { + thread_id: conversation_id.clone(), + name: renamed.to_string(), + })?), + ) + .await?; + let (rename_resp, ws1_notification) = + read_response_and_notification_for_method(&mut ws1, 11, "thread/name/updated").await?; + let _: ThreadSetNameResponse = to_response::(rename_resp)?; + assert_thread_name_updated(ws1_notification, &conversation_id, renamed)?; + + let ws2_notification = + read_notification_for_method(&mut ws2, "thread/name/updated").await?; + assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?; + + assert_no_message(&mut ws1, Duration::from_millis(250)).await?; + assert_no_message(&mut ws2, Duration::from_millis(250)).await?; + Ok(()) + } + .await; + + process + .kill() + .await + .context("failed to stop websocket app-server process")?; + result +} + +#[tokio::test] +async fn thread_name_updated_broadcasts_for_not_loaded_threads() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri(), "never")?; + let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-05-00")?; + + let bind_addr = reserve_local_addr()?; + let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?; + + let result = async { + let mut ws1 = connect_websocket(bind_addr).await?; + let mut ws2 = connect_websocket(bind_addr).await?; + initialize_both_clients(&mut ws1, &mut ws2).await?; + + let renamed = "Stored rename"; + send_request( + &mut ws1, + "thread/name/set", + 20, + Some(serde_json::to_value(ThreadSetNameParams { + thread_id: conversation_id.clone(), + name: renamed.to_string(), + })?), + ) + .await?; + let (rename_resp, ws1_notification) = + read_response_and_notification_for_method(&mut ws1, 20, "thread/name/updated").await?; + let _: ThreadSetNameResponse = to_response::(rename_resp)?; + assert_thread_name_updated(ws1_notification, &conversation_id, renamed)?; + + let ws2_notification = + read_notification_for_method(&mut ws2, "thread/name/updated").await?; + assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?; + + assert_no_message(&mut ws1, Duration::from_millis(250)).await?; + assert_no_message(&mut ws2, Duration::from_millis(250)).await?; + Ok(()) + } + .await; + + process + .kill() + .await + .context("failed to stop websocket app-server process")?; + result +} + +async fn initialize_both_clients(ws1: &mut WsClient, ws2: &mut WsClient) -> Result<()> { + send_initialize_request(ws1, 1, "ws_client_one").await?; + timeout(DEFAULT_READ_TIMEOUT, read_response_for_id(ws1, 1)).await??; + + send_initialize_request(ws2, 2, "ws_client_two").await?; + timeout(DEFAULT_READ_TIMEOUT, read_response_for_id(ws2, 2)).await??; + Ok(()) +} + +fn create_rollout(codex_home: &std::path::Path, filename_ts: &str) -> Result { + create_fake_rollout_with_text_elements( + codex_home, + filename_ts, + "2025-01-05T12:00:00Z", + "Saved user message", + Vec::new(), + Some("mock_provider"), + None, + ) +} + +fn assert_thread_name_updated( + notification: JSONRPCNotification, + thread_id: &str, + thread_name: &str, +) -> Result<()> { + let notification: ThreadNameUpdatedNotification = + serde_json::from_value(notification.params.context("thread/name/updated params")?)?; + assert_eq!(notification.thread_id, thread_id); + assert_eq!(notification.thread_name.as_deref(), Some(thread_name)); + Ok(()) +}