Currently we emit `thread/status/changed` with `Idle` status right before sending `thread/started` event (which also has `Idle` status in it). It feels that there is no point in that as client has no way to know prior state of the thread as it didn't exist yet, so silence these kinds of notifications.
804 lines
24 KiB
Rust
804 lines
24 KiB
Rust
#[cfg(test)]
|
|
use crate::outgoing_message::OutgoingEnvelope;
|
|
#[cfg(test)]
|
|
use crate::outgoing_message::OutgoingMessage;
|
|
use crate::outgoing_message::OutgoingMessageSender;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::Thread;
|
|
use codex_app_server_protocol::ThreadActiveFlag;
|
|
use codex_app_server_protocol::ThreadStatus;
|
|
use codex_app_server_protocol::ThreadStatusChangedNotification;
|
|
use std::collections::HashMap;
|
|
#[cfg(test)]
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tokio::sync::Mutex;
|
|
#[cfg(test)]
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::watch;
|
|
|
|
#[derive(Clone)]
|
|
pub(crate) struct ThreadWatchManager {
|
|
state: Arc<Mutex<ThreadWatchState>>,
|
|
outgoing: Option<Arc<OutgoingMessageSender>>,
|
|
running_turn_count_tx: watch::Sender<usize>,
|
|
}
|
|
|
|
pub(crate) struct ThreadWatchActiveGuard {
|
|
manager: ThreadWatchManager,
|
|
thread_id: String,
|
|
guard_type: ThreadWatchActiveGuardType,
|
|
handle: tokio::runtime::Handle,
|
|
}
|
|
|
|
impl ThreadWatchActiveGuard {
|
|
fn new(
|
|
manager: ThreadWatchManager,
|
|
thread_id: String,
|
|
guard_type: ThreadWatchActiveGuardType,
|
|
) -> Self {
|
|
Self {
|
|
manager,
|
|
thread_id,
|
|
guard_type,
|
|
handle: tokio::runtime::Handle::current(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for ThreadWatchActiveGuard {
|
|
fn drop(&mut self) {
|
|
let manager = self.manager.clone();
|
|
let thread_id = self.thread_id.clone();
|
|
let guard_type = self.guard_type;
|
|
self.handle.spawn(async move {
|
|
manager
|
|
.note_active_guard_released(thread_id, guard_type)
|
|
.await;
|
|
});
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Copy)]
|
|
enum ThreadWatchActiveGuardType {
|
|
Permission,
|
|
UserInput,
|
|
}
|
|
|
|
impl Default for ThreadWatchManager {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl ThreadWatchManager {
|
|
pub(crate) fn new() -> Self {
|
|
let (running_turn_count_tx, _running_turn_count_rx) = watch::channel(0);
|
|
Self {
|
|
state: Arc::new(Mutex::new(ThreadWatchState::default())),
|
|
outgoing: None,
|
|
running_turn_count_tx,
|
|
}
|
|
}
|
|
|
|
pub(crate) fn new_with_outgoing(outgoing: Arc<OutgoingMessageSender>) -> Self {
|
|
let (running_turn_count_tx, _running_turn_count_rx) = watch::channel(0);
|
|
Self {
|
|
state: Arc::new(Mutex::new(ThreadWatchState::default())),
|
|
outgoing: Some(outgoing),
|
|
running_turn_count_tx,
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn upsert_thread(&self, thread: Thread) {
|
|
self.mutate_and_publish(move |state| state.upsert_thread(thread.id, true))
|
|
.await;
|
|
}
|
|
|
|
pub(crate) async fn upsert_thread_silently(&self, thread: Thread) {
|
|
self.mutate_and_publish(move |state| state.upsert_thread(thread.id, false))
|
|
.await;
|
|
}
|
|
|
|
pub(crate) async fn remove_thread(&self, thread_id: &str) {
|
|
let thread_id = thread_id.to_string();
|
|
self.mutate_and_publish(move |state| state.remove_thread(&thread_id))
|
|
.await;
|
|
}
|
|
|
|
pub(crate) async fn loaded_status_for_thread(&self, thread_id: &str) -> ThreadStatus {
|
|
self.state.lock().await.loaded_status_for_thread(thread_id)
|
|
}
|
|
|
|
pub(crate) async fn loaded_statuses_for_threads(
|
|
&self,
|
|
thread_ids: Vec<String>,
|
|
) -> HashMap<String, ThreadStatus> {
|
|
let state = self.state.lock().await;
|
|
thread_ids
|
|
.into_iter()
|
|
.map(|thread_id| {
|
|
let status = state.loaded_status_for_thread(&thread_id);
|
|
(thread_id, status)
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
#[cfg(test)]
|
|
pub(crate) async fn running_turn_count(&self) -> usize {
|
|
self.state
|
|
.lock()
|
|
.await
|
|
.runtime_by_thread_id
|
|
.values()
|
|
.filter(|runtime| runtime.running)
|
|
.count()
|
|
}
|
|
|
|
pub(crate) fn subscribe_running_turn_count(&self) -> watch::Receiver<usize> {
|
|
self.running_turn_count_tx.subscribe()
|
|
}
|
|
|
|
pub(crate) async fn note_turn_started(&self, thread_id: &str) {
|
|
self.update_runtime_for_thread(thread_id, |runtime| {
|
|
runtime.is_loaded = true;
|
|
runtime.running = true;
|
|
runtime.has_system_error = false;
|
|
})
|
|
.await;
|
|
}
|
|
|
|
pub(crate) async fn note_turn_completed(&self, thread_id: &str, _failed: bool) {
|
|
self.clear_active_state(thread_id).await;
|
|
}
|
|
|
|
pub(crate) async fn note_turn_interrupted(&self, thread_id: &str) {
|
|
self.clear_active_state(thread_id).await;
|
|
}
|
|
|
|
pub(crate) async fn note_thread_shutdown(&self, thread_id: &str) {
|
|
self.update_runtime_for_thread(thread_id, |runtime| {
|
|
runtime.running = false;
|
|
runtime.pending_permission_requests = 0;
|
|
runtime.pending_user_input_requests = 0;
|
|
runtime.is_loaded = false;
|
|
})
|
|
.await;
|
|
}
|
|
|
|
pub(crate) async fn note_system_error(&self, thread_id: &str) {
|
|
self.update_runtime_for_thread(thread_id, |runtime| {
|
|
runtime.running = false;
|
|
runtime.pending_permission_requests = 0;
|
|
runtime.pending_user_input_requests = 0;
|
|
runtime.has_system_error = true;
|
|
})
|
|
.await;
|
|
}
|
|
|
|
async fn clear_active_state(&self, thread_id: &str) {
|
|
self.update_runtime_for_thread(thread_id, move |runtime| {
|
|
runtime.running = false;
|
|
runtime.pending_permission_requests = 0;
|
|
runtime.pending_user_input_requests = 0;
|
|
})
|
|
.await;
|
|
}
|
|
|
|
pub(crate) async fn note_permission_requested(
|
|
&self,
|
|
thread_id: &str,
|
|
) -> ThreadWatchActiveGuard {
|
|
self.note_pending_request(thread_id, ThreadWatchActiveGuardType::Permission)
|
|
.await
|
|
}
|
|
|
|
pub(crate) async fn note_user_input_requested(
|
|
&self,
|
|
thread_id: &str,
|
|
) -> ThreadWatchActiveGuard {
|
|
self.note_pending_request(thread_id, ThreadWatchActiveGuardType::UserInput)
|
|
.await
|
|
}
|
|
|
|
async fn note_pending_request(
|
|
&self,
|
|
thread_id: &str,
|
|
guard_type: ThreadWatchActiveGuardType,
|
|
) -> ThreadWatchActiveGuard {
|
|
self.update_runtime_for_thread(thread_id, move |runtime| {
|
|
runtime.is_loaded = true;
|
|
let counter = Self::pending_counter(runtime, guard_type);
|
|
*counter = counter.saturating_add(1);
|
|
})
|
|
.await;
|
|
ThreadWatchActiveGuard::new(self.clone(), thread_id.to_string(), guard_type)
|
|
}
|
|
|
|
async fn mutate_and_publish<F>(&self, mutate: F)
|
|
where
|
|
F: FnOnce(&mut ThreadWatchState) -> Option<ThreadStatusChangedNotification>,
|
|
{
|
|
let (notification, running_turn_count) = {
|
|
let mut state = self.state.lock().await;
|
|
let notification = mutate(&mut state);
|
|
let running_turn_count = state
|
|
.runtime_by_thread_id
|
|
.values()
|
|
.filter(|runtime| runtime.running)
|
|
.count();
|
|
(notification, running_turn_count)
|
|
};
|
|
let _ = self.running_turn_count_tx.send(running_turn_count);
|
|
|
|
if let Some(notification) = notification
|
|
&& let Some(outgoing) = &self.outgoing
|
|
{
|
|
outgoing
|
|
.send_server_notification(ServerNotification::ThreadStatusChanged(notification))
|
|
.await;
|
|
}
|
|
}
|
|
|
|
async fn note_active_guard_released(
|
|
&self,
|
|
thread_id: String,
|
|
guard_type: ThreadWatchActiveGuardType,
|
|
) {
|
|
self.update_runtime_for_thread(&thread_id, move |runtime| {
|
|
let counter = Self::pending_counter(runtime, guard_type);
|
|
*counter = counter.saturating_sub(1);
|
|
})
|
|
.await;
|
|
}
|
|
|
|
async fn update_runtime_for_thread<F>(&self, thread_id: &str, update: F)
|
|
where
|
|
F: FnOnce(&mut RuntimeFacts),
|
|
{
|
|
let thread_id = thread_id.to_string();
|
|
self.mutate_and_publish(move |state| state.update_runtime(&thread_id, update))
|
|
.await;
|
|
}
|
|
|
|
fn pending_counter(
|
|
runtime: &mut RuntimeFacts,
|
|
guard_type: ThreadWatchActiveGuardType,
|
|
) -> &mut u32 {
|
|
match guard_type {
|
|
ThreadWatchActiveGuardType::Permission => &mut runtime.pending_permission_requests,
|
|
ThreadWatchActiveGuardType::UserInput => &mut runtime.pending_user_input_requests,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) fn resolve_thread_status(
|
|
status: ThreadStatus,
|
|
has_in_progress_turn: bool,
|
|
) -> ThreadStatus {
|
|
// Running-turn events can arrive before the watch runtime state is observed by
|
|
// the listener loop. In that window we prefer to reflect a real active turn as
|
|
// `Active` instead of `Idle`/`NotLoaded`.
|
|
if has_in_progress_turn && matches!(status, ThreadStatus::Idle | ThreadStatus::NotLoaded) {
|
|
return ThreadStatus::Active {
|
|
active_flags: Vec::new(),
|
|
};
|
|
}
|
|
|
|
status
|
|
}
|
|
|
|
#[derive(Default)]
|
|
struct ThreadWatchState {
|
|
runtime_by_thread_id: HashMap<String, RuntimeFacts>,
|
|
}
|
|
|
|
impl ThreadWatchState {
|
|
fn upsert_thread(
|
|
&mut self,
|
|
thread_id: String,
|
|
emit_notification: bool,
|
|
) -> Option<ThreadStatusChangedNotification> {
|
|
let previous_status = self.status_for(&thread_id);
|
|
let runtime = self
|
|
.runtime_by_thread_id
|
|
.entry(thread_id.clone())
|
|
.or_default();
|
|
runtime.is_loaded = true;
|
|
if emit_notification {
|
|
self.status_changed_notification(thread_id, previous_status)
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
fn remove_thread(&mut self, thread_id: &str) -> Option<ThreadStatusChangedNotification> {
|
|
let previous_status = self.status_for(thread_id);
|
|
self.runtime_by_thread_id.remove(thread_id);
|
|
if previous_status.is_some() && previous_status != Some(ThreadStatus::NotLoaded) {
|
|
Some(ThreadStatusChangedNotification {
|
|
thread_id: thread_id.to_string(),
|
|
status: ThreadStatus::NotLoaded,
|
|
})
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
fn update_runtime<F>(
|
|
&mut self,
|
|
thread_id: &str,
|
|
mutate: F,
|
|
) -> Option<ThreadStatusChangedNotification>
|
|
where
|
|
F: FnOnce(&mut RuntimeFacts),
|
|
{
|
|
let previous_status = self.status_for(thread_id);
|
|
let runtime = self
|
|
.runtime_by_thread_id
|
|
.entry(thread_id.to_string())
|
|
.or_default();
|
|
runtime.is_loaded = true;
|
|
mutate(runtime);
|
|
self.status_changed_notification(thread_id.to_string(), previous_status)
|
|
}
|
|
|
|
fn status_for(&self, thread_id: &str) -> Option<ThreadStatus> {
|
|
self.runtime_by_thread_id
|
|
.get(thread_id)
|
|
.map(loaded_thread_status)
|
|
}
|
|
|
|
fn loaded_status_for_thread(&self, thread_id: &str) -> ThreadStatus {
|
|
self.status_for(thread_id)
|
|
.unwrap_or(ThreadStatus::NotLoaded)
|
|
}
|
|
|
|
fn status_changed_notification(
|
|
&self,
|
|
thread_id: String,
|
|
previous_status: Option<ThreadStatus>,
|
|
) -> Option<ThreadStatusChangedNotification> {
|
|
let status = self.status_for(&thread_id)?;
|
|
|
|
if previous_status.as_ref() == Some(&status) {
|
|
return None;
|
|
}
|
|
|
|
Some(ThreadStatusChangedNotification { thread_id, status })
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Default)]
|
|
struct RuntimeFacts {
|
|
is_loaded: bool,
|
|
running: bool,
|
|
pending_permission_requests: u32,
|
|
pending_user_input_requests: u32,
|
|
has_system_error: bool,
|
|
}
|
|
|
|
fn loaded_thread_status(runtime: &RuntimeFacts) -> ThreadStatus {
|
|
if !runtime.is_loaded {
|
|
return ThreadStatus::NotLoaded;
|
|
}
|
|
|
|
let mut active_flags = Vec::new();
|
|
if runtime.pending_permission_requests > 0 {
|
|
active_flags.push(ThreadActiveFlag::WaitingOnApproval);
|
|
}
|
|
if runtime.pending_user_input_requests > 0 {
|
|
active_flags.push(ThreadActiveFlag::WaitingOnUserInput);
|
|
}
|
|
|
|
if runtime.running || !active_flags.is_empty() {
|
|
return ThreadStatus::Active { active_flags };
|
|
}
|
|
|
|
if runtime.has_system_error {
|
|
return ThreadStatus::SystemError;
|
|
}
|
|
|
|
ThreadStatus::Idle
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use pretty_assertions::assert_eq;
|
|
use tokio::time::Duration;
|
|
use tokio::time::timeout;
|
|
|
|
const INTERACTIVE_THREAD_ID: &str = "00000000-0000-0000-0000-000000000001";
|
|
const NON_INTERACTIVE_THREAD_ID: &str = "00000000-0000-0000-0000-000000000002";
|
|
|
|
#[tokio::test]
|
|
async fn loaded_status_defaults_to_not_loaded_for_untracked_threads() {
|
|
let manager = ThreadWatchManager::new();
|
|
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread("00000000-0000-0000-0000-000000000003")
|
|
.await,
|
|
ThreadStatus::NotLoaded,
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn tracks_non_interactive_thread_status() {
|
|
let manager = ThreadWatchManager::new();
|
|
manager
|
|
.upsert_thread(test_thread(
|
|
NON_INTERACTIVE_THREAD_ID,
|
|
codex_app_server_protocol::SessionSource::AppServer,
|
|
))
|
|
.await;
|
|
|
|
manager.note_turn_started(NON_INTERACTIVE_THREAD_ID).await;
|
|
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread(NON_INTERACTIVE_THREAD_ID)
|
|
.await,
|
|
ThreadStatus::Active {
|
|
active_flags: vec![],
|
|
},
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn status_updates_track_single_thread() {
|
|
let manager = ThreadWatchManager::new();
|
|
manager
|
|
.upsert_thread(test_thread(
|
|
INTERACTIVE_THREAD_ID,
|
|
codex_app_server_protocol::SessionSource::Cli,
|
|
))
|
|
.await;
|
|
|
|
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
|
|
.await,
|
|
ThreadStatus::Active {
|
|
active_flags: vec![],
|
|
},
|
|
);
|
|
|
|
let permission_guard = manager
|
|
.note_permission_requested(INTERACTIVE_THREAD_ID)
|
|
.await;
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
|
|
.await,
|
|
ThreadStatus::Active {
|
|
active_flags: vec![ThreadActiveFlag::WaitingOnApproval],
|
|
},
|
|
);
|
|
|
|
let user_input_guard = manager
|
|
.note_user_input_requested(INTERACTIVE_THREAD_ID)
|
|
.await;
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
|
|
.await,
|
|
ThreadStatus::Active {
|
|
active_flags: vec![
|
|
ThreadActiveFlag::WaitingOnApproval,
|
|
ThreadActiveFlag::WaitingOnUserInput,
|
|
],
|
|
},
|
|
);
|
|
|
|
drop(permission_guard);
|
|
wait_for_status(
|
|
&manager,
|
|
INTERACTIVE_THREAD_ID,
|
|
ThreadStatus::Active {
|
|
active_flags: vec![ThreadActiveFlag::WaitingOnUserInput],
|
|
},
|
|
)
|
|
.await;
|
|
|
|
drop(user_input_guard);
|
|
wait_for_status(
|
|
&manager,
|
|
INTERACTIVE_THREAD_ID,
|
|
ThreadStatus::Active {
|
|
active_flags: vec![],
|
|
},
|
|
)
|
|
.await;
|
|
|
|
manager
|
|
.note_turn_completed(INTERACTIVE_THREAD_ID, false)
|
|
.await;
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
|
|
.await,
|
|
ThreadStatus::Idle,
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn resolves_in_progress_turn_to_active_status() {
|
|
let status = resolve_thread_status(ThreadStatus::Idle, true);
|
|
assert_eq!(
|
|
status,
|
|
ThreadStatus::Active {
|
|
active_flags: Vec::new(),
|
|
}
|
|
);
|
|
|
|
let status = resolve_thread_status(ThreadStatus::NotLoaded, true);
|
|
assert_eq!(
|
|
status,
|
|
ThreadStatus::Active {
|
|
active_flags: Vec::new(),
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn keeps_status_when_no_in_progress_turn() {
|
|
assert_eq!(
|
|
resolve_thread_status(ThreadStatus::Idle, false),
|
|
ThreadStatus::Idle
|
|
);
|
|
assert_eq!(
|
|
resolve_thread_status(ThreadStatus::SystemError, false),
|
|
ThreadStatus::SystemError
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn system_error_sets_idle_flag_until_next_turn() {
|
|
let manager = ThreadWatchManager::new();
|
|
manager
|
|
.upsert_thread(test_thread(
|
|
INTERACTIVE_THREAD_ID,
|
|
codex_app_server_protocol::SessionSource::Cli,
|
|
))
|
|
.await;
|
|
|
|
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
|
|
manager.note_system_error(INTERACTIVE_THREAD_ID).await;
|
|
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
|
|
.await,
|
|
ThreadStatus::SystemError,
|
|
);
|
|
|
|
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
|
|
.await,
|
|
ThreadStatus::Active {
|
|
active_flags: vec![],
|
|
},
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn shutdown_marks_thread_not_loaded() {
|
|
let manager = ThreadWatchManager::new();
|
|
manager
|
|
.upsert_thread(test_thread(
|
|
INTERACTIVE_THREAD_ID,
|
|
codex_app_server_protocol::SessionSource::Cli,
|
|
))
|
|
.await;
|
|
|
|
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
|
|
manager.note_thread_shutdown(INTERACTIVE_THREAD_ID).await;
|
|
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
|
|
.await,
|
|
ThreadStatus::NotLoaded,
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn loaded_statuses_default_to_not_loaded_for_untracked_threads() {
|
|
let manager = ThreadWatchManager::new();
|
|
manager
|
|
.upsert_thread(test_thread(
|
|
INTERACTIVE_THREAD_ID,
|
|
codex_app_server_protocol::SessionSource::Cli,
|
|
))
|
|
.await;
|
|
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
|
|
|
|
let statuses = manager
|
|
.loaded_statuses_for_threads(vec![
|
|
INTERACTIVE_THREAD_ID.to_string(),
|
|
NON_INTERACTIVE_THREAD_ID.to_string(),
|
|
])
|
|
.await;
|
|
|
|
assert_eq!(
|
|
statuses.get(INTERACTIVE_THREAD_ID),
|
|
Some(&ThreadStatus::Active {
|
|
active_flags: vec![],
|
|
}),
|
|
);
|
|
assert_eq!(
|
|
statuses.get(NON_INTERACTIVE_THREAD_ID),
|
|
Some(&ThreadStatus::NotLoaded),
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn has_running_turns_tracks_runtime_running_flag_only() {
|
|
let manager = ThreadWatchManager::new();
|
|
manager
|
|
.upsert_thread(test_thread(
|
|
INTERACTIVE_THREAD_ID,
|
|
codex_app_server_protocol::SessionSource::Cli,
|
|
))
|
|
.await;
|
|
|
|
assert_eq!(manager.running_turn_count().await, 0);
|
|
|
|
let _permission_guard = manager
|
|
.note_permission_requested(INTERACTIVE_THREAD_ID)
|
|
.await;
|
|
assert_eq!(manager.running_turn_count().await, 0);
|
|
|
|
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
|
|
assert_eq!(manager.running_turn_count().await, 1);
|
|
|
|
manager
|
|
.note_turn_completed(INTERACTIVE_THREAD_ID, false)
|
|
.await;
|
|
assert_eq!(manager.running_turn_count().await, 0);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn status_change_emits_notification() {
|
|
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
|
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
|
outgoing_tx,
|
|
)));
|
|
|
|
manager
|
|
.upsert_thread(test_thread(
|
|
INTERACTIVE_THREAD_ID,
|
|
codex_app_server_protocol::SessionSource::Cli,
|
|
))
|
|
.await;
|
|
assert_eq!(
|
|
recv_status_changed_notification(&mut outgoing_rx).await,
|
|
ThreadStatusChangedNotification {
|
|
thread_id: INTERACTIVE_THREAD_ID.to_string(),
|
|
status: ThreadStatus::Idle,
|
|
},
|
|
);
|
|
|
|
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
|
|
assert_eq!(
|
|
recv_status_changed_notification(&mut outgoing_rx).await,
|
|
ThreadStatusChangedNotification {
|
|
thread_id: INTERACTIVE_THREAD_ID.to_string(),
|
|
status: ThreadStatus::Active {
|
|
active_flags: vec![],
|
|
},
|
|
},
|
|
);
|
|
|
|
manager.remove_thread(INTERACTIVE_THREAD_ID).await;
|
|
assert_eq!(
|
|
recv_status_changed_notification(&mut outgoing_rx).await,
|
|
ThreadStatusChangedNotification {
|
|
thread_id: INTERACTIVE_THREAD_ID.to_string(),
|
|
status: ThreadStatus::NotLoaded,
|
|
},
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn silent_upsert_skips_initial_notification() {
|
|
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
|
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
|
outgoing_tx,
|
|
)));
|
|
|
|
manager
|
|
.upsert_thread_silently(test_thread(
|
|
INTERACTIVE_THREAD_ID,
|
|
codex_app_server_protocol::SessionSource::Cli,
|
|
))
|
|
.await;
|
|
|
|
assert_eq!(
|
|
manager
|
|
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
|
|
.await,
|
|
ThreadStatus::Idle,
|
|
);
|
|
assert!(
|
|
timeout(Duration::from_millis(100), outgoing_rx.recv())
|
|
.await
|
|
.is_err(),
|
|
"silent upsert should not emit thread/status/changed"
|
|
);
|
|
|
|
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
|
|
assert_eq!(
|
|
recv_status_changed_notification(&mut outgoing_rx).await,
|
|
ThreadStatusChangedNotification {
|
|
thread_id: INTERACTIVE_THREAD_ID.to_string(),
|
|
status: ThreadStatus::Active {
|
|
active_flags: vec![],
|
|
},
|
|
},
|
|
);
|
|
}
|
|
|
|
async fn wait_for_status(
|
|
manager: &ThreadWatchManager,
|
|
thread_id: &str,
|
|
expected_status: ThreadStatus,
|
|
) {
|
|
timeout(Duration::from_secs(1), async {
|
|
loop {
|
|
let status = manager.loaded_status_for_thread(thread_id).await;
|
|
if status == expected_status {
|
|
break;
|
|
}
|
|
tokio::task::yield_now().await;
|
|
}
|
|
})
|
|
.await
|
|
.expect("timed out waiting for status");
|
|
}
|
|
|
|
async fn recv_status_changed_notification(
|
|
outgoing_rx: &mut mpsc::Receiver<OutgoingEnvelope>,
|
|
) -> ThreadStatusChangedNotification {
|
|
let envelope = timeout(Duration::from_secs(1), outgoing_rx.recv())
|
|
.await
|
|
.expect("timed out waiting for outgoing notification")
|
|
.expect("outgoing channel closed unexpectedly");
|
|
let OutgoingEnvelope::Broadcast { message } = envelope else {
|
|
panic!("expected broadcast notification");
|
|
};
|
|
let OutgoingMessage::AppServerNotification(ServerNotification::ThreadStatusChanged(
|
|
notification,
|
|
)) = message
|
|
else {
|
|
panic!("expected thread/status/changed notification");
|
|
};
|
|
notification
|
|
}
|
|
|
|
fn test_thread(thread_id: &str, source: codex_app_server_protocol::SessionSource) -> Thread {
|
|
Thread {
|
|
id: thread_id.to_string(),
|
|
preview: String::new(),
|
|
ephemeral: false,
|
|
model_provider: "mock-provider".to_string(),
|
|
created_at: 0,
|
|
updated_at: 0,
|
|
status: ThreadStatus::NotLoaded,
|
|
path: None,
|
|
cwd: PathBuf::from("/tmp"),
|
|
cli_version: "test".to_string(),
|
|
agent_nickname: None,
|
|
agent_role: None,
|
|
source,
|
|
git_info: None,
|
|
name: None,
|
|
turns: Vec::new(),
|
|
}
|
|
}
|
|
}
|