app-server: Silence thread status changes caused by thread being created (#13079)
Currently we emit `thread/status/changed` with `Idle` status right before sending `thread/started` event (which also has `Idle` status in it). It feels that there is no point in that as client has no way to know prior state of the thread as it didn't exist yet, so silence these kinds of notifications.
This commit is contained in:
parent
146b798129
commit
9022cdc563
6 changed files with 143 additions and 21 deletions
|
|
@ -120,9 +120,9 @@ Example with notification opt-out:
|
|||
|
||||
## API Overview
|
||||
|
||||
- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
|
||||
- `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread.
|
||||
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/loaded/list` — list the thread ids currently loaded in memory.
|
||||
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
|
|
@ -273,10 +273,11 @@ When `nextCursor` is `null`, you’ve reached the final page.
|
|||
|
||||
### Example: Track thread status changes
|
||||
|
||||
`thread/status/changed` is emitted whenever a loaded thread's status changes:
|
||||
`thread/status/changed` is emitted whenever a loaded thread's status changes after it has already been introduced to the client:
|
||||
|
||||
- Includes `threadId` and the new `status`.
|
||||
- Status can be `notLoaded`, `idle`, `systemError`, or `active` (with `activeFlags`; `active` implies running).
|
||||
- `thread/start`, `thread/fork`, and detached review threads do not emit a separate initial `thread/status/changed`; their `thread/started` notification already carries the current `thread.status`.
|
||||
|
||||
```json
|
||||
{ "method": "thread/status/changed", "params": {
|
||||
|
|
|
|||
|
|
@ -2198,7 +2198,7 @@ impl CodexMessageProcessor {
|
|||
|
||||
listener_task_context
|
||||
.thread_watch_manager
|
||||
.upsert_thread(thread.clone())
|
||||
.upsert_thread_silently(thread.clone())
|
||||
.await;
|
||||
|
||||
thread.status = resolve_thread_status(
|
||||
|
|
@ -3704,7 +3704,7 @@ impl CodexMessageProcessor {
|
|||
}
|
||||
|
||||
self.thread_watch_manager
|
||||
.upsert_thread(thread.clone())
|
||||
.upsert_thread_silently(thread.clone())
|
||||
.await;
|
||||
|
||||
thread.status = resolve_thread_status(
|
||||
|
|
@ -6295,7 +6295,7 @@ impl CodexMessageProcessor {
|
|||
Ok(summary) => {
|
||||
let mut thread = summary_to_thread(summary);
|
||||
self.thread_watch_manager
|
||||
.upsert_thread(thread.clone())
|
||||
.upsert_thread_silently(thread.clone())
|
||||
.await;
|
||||
thread.status = resolve_thread_status(
|
||||
self.thread_watch_manager
|
||||
|
|
|
|||
|
|
@ -91,7 +91,12 @@ impl ThreadWatchManager {
|
|||
}
|
||||
|
||||
pub(crate) async fn upsert_thread(&self, thread: Thread) {
|
||||
self.mutate_and_publish(move |state| state.upsert_thread(thread.id))
|
||||
self.mutate_and_publish(move |state| state.upsert_thread(thread.id, true))
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn upsert_thread_silently(&self, thread: Thread) {
|
||||
self.mutate_and_publish(move |state| state.upsert_thread(thread.id, false))
|
||||
.await;
|
||||
}
|
||||
|
||||
|
|
@ -289,14 +294,22 @@ struct ThreadWatchState {
|
|||
}
|
||||
|
||||
impl ThreadWatchState {
|
||||
fn upsert_thread(&mut self, thread_id: String) -> Option<ThreadStatusChangedNotification> {
|
||||
fn upsert_thread(
|
||||
&mut self,
|
||||
thread_id: String,
|
||||
emit_notification: bool,
|
||||
) -> Option<ThreadStatusChangedNotification> {
|
||||
let previous_status = self.status_for(&thread_id);
|
||||
let runtime = self
|
||||
.runtime_by_thread_id
|
||||
.entry(thread_id.clone())
|
||||
.or_default();
|
||||
runtime.is_loaded = true;
|
||||
self.status_changed_notification(thread_id, previous_status)
|
||||
if emit_notification {
|
||||
self.status_changed_notification(thread_id, previous_status)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_thread(&mut self, thread_id: &str) -> Option<ThreadStatusChangedNotification> {
|
||||
|
|
@ -692,6 +705,45 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn silent_upsert_skips_initial_notification() {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
)));
|
||||
|
||||
manager
|
||||
.upsert_thread_silently(test_thread(
|
||||
INTERACTIVE_THREAD_ID,
|
||||
codex_app_server_protocol::SessionSource::Cli,
|
||||
))
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
manager
|
||||
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
|
||||
.await,
|
||||
ThreadStatus::Idle,
|
||||
);
|
||||
assert!(
|
||||
timeout(Duration::from_millis(100), outgoing_rx.recv())
|
||||
.await
|
||||
.is_err(),
|
||||
"silent upsert should not emit thread/status/changed"
|
||||
);
|
||||
|
||||
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
|
||||
assert_eq!(
|
||||
recv_status_changed_notification(&mut outgoing_rx).await,
|
||||
ThreadStatusChangedNotification {
|
||||
thread_id: INTERACTIVE_THREAD_ID.to_string(),
|
||||
status: ThreadStatus::Active {
|
||||
active_flags: vec![],
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
async fn wait_for_status(
|
||||
manager: &ThreadWatchManager,
|
||||
thread_id: &str,
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ use app_test_support::to_response;
|
|||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ItemStartedNotification;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
|
|
@ -19,9 +20,12 @@ use codex_app_server_protocol::ServerRequest;
|
|||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::ThreadStatusChangedNotification;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
|
@ -301,6 +305,31 @@ async fn review_start_with_detached_delivery_returns_new_thread_id() -> Result<(
|
|||
"detached review should run on a different thread"
|
||||
);
|
||||
|
||||
let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT;
|
||||
let notification = loop {
|
||||
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
||||
let message = timeout(remaining, mcp.read_next_message()).await??;
|
||||
let JSONRPCMessage::Notification(notification) = message else {
|
||||
continue;
|
||||
};
|
||||
if notification.method == "thread/status/changed" {
|
||||
let status_changed: ThreadStatusChangedNotification =
|
||||
serde_json::from_value(notification.params.expect("params must be present"))?;
|
||||
if status_changed.thread_id == review_thread_id {
|
||||
anyhow::bail!(
|
||||
"detached review threads should be introduced without a preceding thread/status/changed"
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if notification.method == "thread/started" {
|
||||
break notification;
|
||||
}
|
||||
};
|
||||
let started: ThreadStartedNotification =
|
||||
serde_json::from_value(notification.params.expect("params must be present"))?;
|
||||
assert_eq!(started.thread.id, review_thread_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -389,6 +418,11 @@ async fn start_default_thread(mcp: &mut McpProcess) -> Result<String> {
|
|||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/started"),
|
||||
)
|
||||
.await??;
|
||||
Ok(thread.id)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use app_test_support::create_fake_rollout;
|
|||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
|
|
@ -15,6 +15,7 @@ use codex_app_server_protocol::ThreadStartParams;
|
|||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ThreadStatusChangedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
|
@ -124,11 +125,27 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
|
|||
}
|
||||
|
||||
// A corresponding thread/started notification should arrive.
|
||||
let notif: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/started"),
|
||||
)
|
||||
.await??;
|
||||
let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT;
|
||||
let notif = loop {
|
||||
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
||||
let message = timeout(remaining, mcp.read_next_message()).await??;
|
||||
let JSONRPCMessage::Notification(notif) = message else {
|
||||
continue;
|
||||
};
|
||||
if notif.method == "thread/status/changed" {
|
||||
let status_changed: ThreadStatusChangedNotification =
|
||||
serde_json::from_value(notif.params.expect("params must be present"))?;
|
||||
if status_changed.thread_id == thread.id {
|
||||
anyhow::bail!(
|
||||
"thread/fork should introduce the thread without a preceding thread/status/changed"
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if notif.method == "thread/started" {
|
||||
break notif;
|
||||
}
|
||||
};
|
||||
let started_params = notif.params.clone().expect("params must be present");
|
||||
let started_thread_json = started_params
|
||||
.get("thread")
|
||||
|
|
|
|||
|
|
@ -3,16 +3,18 @@ use app_test_support::McpProcess;
|
|||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ThreadStatusChangedNotification;
|
||||
use codex_core::config::set_project_trust_level;
|
||||
use codex_protocol::config_types::TrustLevel;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
|
|
@ -92,11 +94,27 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
|
|||
assert_eq!(thread.name, None);
|
||||
|
||||
// A corresponding thread/started notification should arrive.
|
||||
let notif: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/started"),
|
||||
)
|
||||
.await??;
|
||||
let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT;
|
||||
let notif = loop {
|
||||
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
||||
let message = timeout(remaining, mcp.read_next_message()).await??;
|
||||
let JSONRPCMessage::Notification(notif) = message else {
|
||||
continue;
|
||||
};
|
||||
if notif.method == "thread/status/changed" {
|
||||
let status_changed: ThreadStatusChangedNotification =
|
||||
serde_json::from_value(notif.params.expect("params must be present"))?;
|
||||
if status_changed.thread_id == thread.id {
|
||||
anyhow::bail!(
|
||||
"thread/start should introduce the thread without a preceding thread/status/changed"
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if notif.method == "thread/started" {
|
||||
break notif;
|
||||
}
|
||||
};
|
||||
let started_params = notif.params.clone().expect("params must be present");
|
||||
let started_thread_json = started_params
|
||||
.get("thread")
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue