app-server: Emit thread/name/updated event globally (#13674)
This commit is contained in:
parent
3449e00bc9
commit
51fcdc760d
6 changed files with 232 additions and 3 deletions
|
|
@ -130,7 +130,7 @@ Example with notification opt-out:
|
|||
- `thread/status/changed` — notification emitted when a loaded thread’s status changes (`threadId` + new `status`).
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
|
||||
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server shuts down and unloads the thread, then emits `thread/closed`.
|
||||
- `thread/name/set` — set or update a thread’s user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
|
||||
- `thread/name/set` — set or update a thread’s user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
|
||||
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
|
||||
- `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications.
|
||||
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
|
||||
|
|
|
|||
|
|
@ -1590,7 +1590,9 @@ pub(crate) async fn apply_bespoke_event_handling(
|
|||
thread_name: thread_name_event.thread_name,
|
||||
};
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ThreadNameUpdated(notification))
|
||||
.send_global_server_notification(ServerNotification::ThreadNameUpdated(
|
||||
notification,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,6 +101,10 @@ impl ThreadScopedOutgoingMessageSender {
|
|||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn send_global_server_notification(&self, notification: ServerNotification) {
|
||||
self.outgoing.send_server_notification(notification).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn abort_pending_server_requests(&self) {
|
||||
self.outgoing
|
||||
.cancel_requests_for_thread(
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ use codex_app_server_protocol::ClientInfo;
|
|||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
|
|
@ -202,6 +203,56 @@ pub(super) async fn read_response_for_id(
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn read_notification_for_method(
|
||||
stream: &mut WsClient,
|
||||
method: &str,
|
||||
) -> Result<JSONRPCNotification> {
|
||||
loop {
|
||||
let message = read_jsonrpc_message(stream).await?;
|
||||
if let JSONRPCMessage::Notification(notification) = message
|
||||
&& notification.method == method
|
||||
{
|
||||
return Ok(notification);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn read_response_and_notification_for_method(
|
||||
stream: &mut WsClient,
|
||||
id: i64,
|
||||
method: &str,
|
||||
) -> Result<(JSONRPCResponse, JSONRPCNotification)> {
|
||||
let target_id = RequestId::Integer(id);
|
||||
let mut response = None;
|
||||
let mut notification = None;
|
||||
|
||||
while response.is_none() || notification.is_none() {
|
||||
let message = read_jsonrpc_message(stream).await?;
|
||||
match message {
|
||||
JSONRPCMessage::Response(candidate) if candidate.id == target_id => {
|
||||
response = Some(candidate);
|
||||
}
|
||||
JSONRPCMessage::Notification(candidate) if candidate.method == method => {
|
||||
if notification.replace(candidate).is_some() {
|
||||
bail!(
|
||||
"received duplicate notification for method `{method}` before completing paired read"
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let Some(response) = response else {
|
||||
bail!("response must be set before returning");
|
||||
};
|
||||
let Some(notification) = notification else {
|
||||
bail!("notification must be set before returning");
|
||||
};
|
||||
|
||||
Ok((response, notification))
|
||||
}
|
||||
|
||||
async fn read_error_for_id(stream: &mut WsClient, id: i64) -> Result<JSONRPCError> {
|
||||
let target_id = RequestId::Integer(id);
|
||||
loop {
|
||||
|
|
@ -237,7 +288,7 @@ async fn read_jsonrpc_message(stream: &mut WsClient) -> Result<JSONRPCMessage> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn assert_no_message(stream: &mut WsClient, wait_for: Duration) -> Result<()> {
|
||||
pub(super) async fn assert_no_message(stream: &mut WsClient, wait_for: Duration) -> Result<()> {
|
||||
match timeout(wait_for, stream.next()).await {
|
||||
Ok(Some(Ok(frame))) => bail!("unexpected frame while waiting for silence: {frame:?}"),
|
||||
Ok(Some(Err(err))) => bail!("unexpected websocket read error: {err}"),
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ mod thread_fork;
|
|||
mod thread_list;
|
||||
mod thread_loaded_list;
|
||||
mod thread_metadata_update;
|
||||
mod thread_name_websocket;
|
||||
mod thread_read;
|
||||
mod thread_resume;
|
||||
mod thread_rollback;
|
||||
|
|
|
|||
171
codex-rs/app-server/tests/suite/v2/thread_name_websocket.rs
Normal file
171
codex-rs/app-server/tests/suite/v2/thread_name_websocket.rs
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
use super::connection_handling_websocket::DEFAULT_READ_TIMEOUT;
|
||||
use super::connection_handling_websocket::WsClient;
|
||||
use super::connection_handling_websocket::assert_no_message;
|
||||
use super::connection_handling_websocket::connect_websocket;
|
||||
use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_notification_for_method;
|
||||
use super::connection_handling_websocket::read_response_and_notification_for_method;
|
||||
use super::connection_handling_websocket::read_response_for_id;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::ThreadNameUpdatedNotification;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadSetNameParams;
|
||||
use codex_app_server_protocol::ThreadSetNameResponse;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_name_updated_broadcasts_for_loaded_threads() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-00-00")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
|
||||
let result = async {
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
let mut ws2 = connect_websocket(bind_addr).await?;
|
||||
initialize_both_clients(&mut ws1, &mut ws2).await?;
|
||||
|
||||
send_request(
|
||||
&mut ws1,
|
||||
"thread/resume",
|
||||
10,
|
||||
Some(serde_json::to_value(ThreadResumeParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
..Default::default()
|
||||
})?),
|
||||
)
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = read_response_for_id(&mut ws1, 10).await?;
|
||||
let resume: ThreadResumeResponse = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
assert_eq!(resume.thread.id, conversation_id);
|
||||
|
||||
let renamed = "Loaded rename";
|
||||
send_request(
|
||||
&mut ws1,
|
||||
"thread/name/set",
|
||||
11,
|
||||
Some(serde_json::to_value(ThreadSetNameParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
name: renamed.to_string(),
|
||||
})?),
|
||||
)
|
||||
.await?;
|
||||
let (rename_resp, ws1_notification) =
|
||||
read_response_and_notification_for_method(&mut ws1, 11, "thread/name/updated").await?;
|
||||
let _: ThreadSetNameResponse = to_response::<ThreadSetNameResponse>(rename_resp)?;
|
||||
assert_thread_name_updated(ws1_notification, &conversation_id, renamed)?;
|
||||
|
||||
let ws2_notification =
|
||||
read_notification_for_method(&mut ws2, "thread/name/updated").await?;
|
||||
assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?;
|
||||
|
||||
assert_no_message(&mut ws1, Duration::from_millis(250)).await?;
|
||||
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
|
||||
process
|
||||
.kill()
|
||||
.await
|
||||
.context("failed to stop websocket app-server process")?;
|
||||
result
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_name_updated_broadcasts_for_not_loaded_threads() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-05-00")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
|
||||
let result = async {
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
let mut ws2 = connect_websocket(bind_addr).await?;
|
||||
initialize_both_clients(&mut ws1, &mut ws2).await?;
|
||||
|
||||
let renamed = "Stored rename";
|
||||
send_request(
|
||||
&mut ws1,
|
||||
"thread/name/set",
|
||||
20,
|
||||
Some(serde_json::to_value(ThreadSetNameParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
name: renamed.to_string(),
|
||||
})?),
|
||||
)
|
||||
.await?;
|
||||
let (rename_resp, ws1_notification) =
|
||||
read_response_and_notification_for_method(&mut ws1, 20, "thread/name/updated").await?;
|
||||
let _: ThreadSetNameResponse = to_response::<ThreadSetNameResponse>(rename_resp)?;
|
||||
assert_thread_name_updated(ws1_notification, &conversation_id, renamed)?;
|
||||
|
||||
let ws2_notification =
|
||||
read_notification_for_method(&mut ws2, "thread/name/updated").await?;
|
||||
assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?;
|
||||
|
||||
assert_no_message(&mut ws1, Duration::from_millis(250)).await?;
|
||||
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
|
||||
process
|
||||
.kill()
|
||||
.await
|
||||
.context("failed to stop websocket app-server process")?;
|
||||
result
|
||||
}
|
||||
|
||||
async fn initialize_both_clients(ws1: &mut WsClient, ws2: &mut WsClient) -> Result<()> {
|
||||
send_initialize_request(ws1, 1, "ws_client_one").await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, read_response_for_id(ws1, 1)).await??;
|
||||
|
||||
send_initialize_request(ws2, 2, "ws_client_two").await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, read_response_for_id(ws2, 2)).await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_rollout(codex_home: &std::path::Path, filename_ts: &str) -> Result<String> {
|
||||
create_fake_rollout_with_text_elements(
|
||||
codex_home,
|
||||
filename_ts,
|
||||
"2025-01-05T12:00:00Z",
|
||||
"Saved user message",
|
||||
Vec::new(),
|
||||
Some("mock_provider"),
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
fn assert_thread_name_updated(
|
||||
notification: JSONRPCNotification,
|
||||
thread_id: &str,
|
||||
thread_name: &str,
|
||||
) -> Result<()> {
|
||||
let notification: ThreadNameUpdatedNotification =
|
||||
serde_json::from_value(notification.params.context("thread/name/updated params")?)?;
|
||||
assert_eq!(notification.thread_id, thread_id);
|
||||
assert_eq!(notification.thread_name.as_deref(), Some(thread_name));
|
||||
Ok(())
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue