From 733cb684963b793ce2ffe2de7c13b98a442dac83 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Thu, 22 Jan 2026 12:22:36 -0800 Subject: [PATCH] feat(app-server): support archived threads in thread/list (#9571) --- .../app-server-protocol/src/protocol/v2.rs | 3 + codex-rs/app-server/README.md | 3 +- .../app-server/src/codex_message_processor.rs | 59 +++- .../app-server/tests/suite/v2/thread_list.rs | 82 ++++- codex-rs/core/src/rollout/list.rs | 286 +++++++++++++++++- codex-rs/core/src/rollout/recorder.rs | 30 ++ codex-rs/debug-client/src/client.rs | 1 + 7 files changed, 433 insertions(+), 31 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 48e0708bd..66654e051 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1205,6 +1205,9 @@ pub struct ThreadListParams { /// Optional provider filter; when set, only sessions recorded under these /// providers are returned. When present but empty, includes all providers. pub model_providers: Option>, + /// Optional archived filter; when set to true, only archived threads are returned. + /// If false or null, only non-archived threads are returned. + pub archived: Option, } #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 8106200e3..aef925373 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -148,6 +148,7 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c - `limit` — server defaults to a reasonable page size if unset. - `sortKey` — `created_at` (default) or `updated_at`. - `modelProviders` — restrict results to specific providers; unset, null, or an empty array will include all providers. +- `archived` — when `true`, list archived threads only. When `false` or `null`, list non-archived threads (default). Example: @@ -202,7 +203,7 @@ Use `thread/archive` to move the persisted rollout (stored as a JSONL file on di { "id": 21, "result": {} } ``` -An archived thread will not appear in future calls to `thread/list`. +An archived thread will not appear in `thread/list` unless `archived` is set to `true`. ### Example: Start a turn (send user input) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index b9d09c71d..36611d157 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1626,6 +1626,7 @@ impl CodexMessageProcessor { limit, sort_key, model_providers, + archived, } = params; let requested_page_size = limit @@ -1637,7 +1638,13 @@ impl CodexMessageProcessor { ThreadSortKey::UpdatedAt => CoreThreadSortKey::UpdatedAt, }; let (summaries, next_cursor) = match self - .list_threads_common(requested_page_size, cursor, model_providers, core_sort_key) + .list_threads_common( + requested_page_size, + cursor, + model_providers, + core_sort_key, + archived.unwrap_or(false), + ) .await { Ok(r) => r, @@ -2280,6 +2287,7 @@ impl CodexMessageProcessor { cursor, model_providers, CoreThreadSortKey::UpdatedAt, + false, ) .await { @@ -2299,6 +2307,7 @@ impl CodexMessageProcessor { cursor: Option, model_providers: Option>, sort_key: CoreThreadSortKey, + archived: bool, ) -> Result<(Vec, Option), JSONRPCErrorError> { let mut cursor_obj: Option = match cursor.as_ref() { Some(cursor_str) => { @@ -2329,21 +2338,39 @@ impl CodexMessageProcessor { while remaining > 0 { let page_size = remaining.min(THREAD_LIST_MAX_LIMIT); - let page = RolloutRecorder::list_threads( - &self.config.codex_home, - page_size, - cursor_obj.as_ref(), - sort_key, - INTERACTIVE_SESSION_SOURCES, - model_provider_filter.as_deref(), - fallback_provider.as_str(), - ) - .await - .map_err(|err| JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to list threads: {err}"), - data: None, - })?; + let page = if archived { + RolloutRecorder::list_archived_threads( + &self.config.codex_home, + page_size, + cursor_obj.as_ref(), + sort_key, + INTERACTIVE_SESSION_SOURCES, + model_provider_filter.as_deref(), + fallback_provider.as_str(), + ) + .await + .map_err(|err| JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to list threads: {err}"), + data: None, + })? + } else { + RolloutRecorder::list_threads( + &self.config.codex_home, + page_size, + cursor_obj.as_ref(), + sort_key, + INTERACTIVE_SESSION_SOURCES, + model_provider_filter.as_deref(), + fallback_provider.as_str(), + ) + .await + .map_err(|err| JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to list threads: {err}"), + data: None, + })? + }; let mut filtered = page .items diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 7c3568c63..8eb253df6 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -12,9 +12,11 @@ use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SessionSource; use codex_app_server_protocol::ThreadListResponse; use codex_app_server_protocol::ThreadSortKey; +use codex_core::ARCHIVED_SESSIONS_SUBDIR; use codex_protocol::protocol::GitInfo as CoreGitInfo; use pretty_assertions::assert_eq; use std::cmp::Reverse; +use std::fs; use std::fs::FileTimes; use std::fs::OpenOptions; use std::path::Path; @@ -36,8 +38,9 @@ async fn list_threads( cursor: Option, limit: Option, providers: Option>, + archived: Option, ) -> Result { - list_threads_with_sort(mcp, cursor, limit, providers, None).await + list_threads_with_sort(mcp, cursor, limit, providers, None, archived).await } async fn list_threads_with_sort( @@ -46,6 +49,7 @@ async fn list_threads_with_sort( limit: Option, providers: Option>, sort_key: Option, + archived: Option, ) -> Result { let request_id = mcp .send_thread_list_request(codex_app_server_protocol::ThreadListParams { @@ -53,6 +57,7 @@ async fn list_threads_with_sort( limit, sort_key, model_providers: providers, + archived, }) .await?; let resp: JSONRPCResponse = timeout( @@ -125,6 +130,7 @@ async fn thread_list_basic_empty() -> Result<()> { None, Some(10), Some(vec!["mock_provider".to_string()]), + None, ) .await?; assert!(data.is_empty()); @@ -187,6 +193,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> { None, Some(2), Some(vec!["mock_provider".to_string()]), + None, ) .await?; assert_eq!(data1.len(), 2); @@ -211,6 +218,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> { Some(cursor1), Some(2), Some(vec!["mock_provider".to_string()]), + None, ) .await?; assert!(data2.len() <= 2); @@ -260,6 +268,7 @@ async fn thread_list_respects_provider_filter() -> Result<()> { None, Some(10), Some(vec!["other_provider".to_string()]), + None, ) .await?; assert_eq!(data.len(), 1); @@ -309,6 +318,7 @@ async fn thread_list_fetches_until_limit_or_exhausted() -> Result<()> { None, Some(8), Some(vec!["target_provider".to_string()]), + None, ) .await?; assert_eq!( @@ -353,6 +363,7 @@ async fn thread_list_enforces_max_limit() -> Result<()> { None, Some(200), Some(vec!["mock_provider".to_string()]), + None, ) .await?; assert_eq!( @@ -398,6 +409,7 @@ async fn thread_list_stops_when_not_enough_filtered_results_exist() -> Result<() None, Some(10), Some(vec!["target_provider".to_string()]), + None, ) .await?; assert_eq!( @@ -444,6 +456,7 @@ async fn thread_list_includes_git_info() -> Result<()> { None, Some(10), Some(vec!["mock_provider".to_string()]), + None, ) .await?; let thread = data @@ -502,6 +515,7 @@ async fn thread_list_default_sorts_by_created_at() -> Result<()> { Some(10), Some(vec!["mock_provider".to_string()]), None, + None, ) .await?; @@ -562,6 +576,7 @@ async fn thread_list_sort_updated_at_orders_by_mtime() -> Result<()> { Some(10), Some(vec!["mock_provider".to_string()]), Some(ThreadSortKey::UpdatedAt), + None, ) .await?; @@ -625,6 +640,7 @@ async fn thread_list_updated_at_paginates_with_cursor() -> Result<()> { Some(2), Some(vec!["mock_provider".to_string()]), Some(ThreadSortKey::UpdatedAt), + None, ) .await?; let ids_page1: Vec<_> = page1.iter().map(|thread| thread.id.as_str()).collect(); @@ -640,6 +656,7 @@ async fn thread_list_updated_at_paginates_with_cursor() -> Result<()> { Some(2), Some(vec!["mock_provider".to_string()]), Some(ThreadSortKey::UpdatedAt), + None, ) .await?; let ids_page2: Vec<_> = page2.iter().map(|thread| thread.id.as_str()).collect(); @@ -678,6 +695,7 @@ async fn thread_list_created_at_tie_breaks_by_uuid() -> Result<()> { None, Some(10), Some(vec!["mock_provider".to_string()]), + None, ) .await?; @@ -730,6 +748,7 @@ async fn thread_list_updated_at_tie_breaks_by_uuid() -> Result<()> { Some(10), Some(vec!["mock_provider".to_string()]), Some(ThreadSortKey::UpdatedAt), + None, ) .await?; @@ -769,6 +788,7 @@ async fn thread_list_updated_at_uses_mtime() -> Result<()> { Some(10), Some(vec!["mock_provider".to_string()]), Some(ThreadSortKey::UpdatedAt), + None, ) .await?; @@ -786,6 +806,65 @@ async fn thread_list_updated_at_uses_mtime() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_list_archived_filter() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + let active_id = create_fake_rollout( + codex_home.path(), + "2025-03-01T10-00-00", + "2025-03-01T10:00:00Z", + "Active", + Some("mock_provider"), + None, + )?; + let archived_id = create_fake_rollout( + codex_home.path(), + "2025-03-01T09-00-00", + "2025-03-01T09:00:00Z", + "Archived", + Some("mock_provider"), + None, + )?; + + let archived_dir = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR); + fs::create_dir_all(&archived_dir)?; + let archived_source = rollout_path(codex_home.path(), "2025-03-01T09-00-00", &archived_id); + let archived_dest = archived_dir.join( + archived_source + .file_name() + .expect("archived rollout should have a file name"), + ); + fs::rename(&archived_source, &archived_dest)?; + + let mut mcp = init_mcp(codex_home.path()).await?; + + let ThreadListResponse { data, .. } = list_threads( + &mut mcp, + None, + Some(10), + Some(vec!["mock_provider".to_string()]), + None, + ) + .await?; + assert_eq!(data.len(), 1); + assert_eq!(data[0].id, active_id); + + let ThreadListResponse { data, .. } = list_threads( + &mut mcp, + None, + Some(10), + Some(vec!["mock_provider".to_string()]), + Some(true), + ) + .await?; + assert_eq!(data.len(), 1); + assert_eq!(data[0].id, archived_id); + + Ok(()) +} + #[tokio::test] async fn thread_list_invalid_cursor_returns_error() -> Result<()> { let codex_home = TempDir::new()?; @@ -799,6 +878,7 @@ async fn thread_list_invalid_cursor_returns_error() -> Result<()> { limit: Some(2), sort_key: None, model_providers: Some(vec!["mock_provider".to_string()]), + archived: None, }) .await?; let error: JSONRPCError = timeout( diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index 01e7b1733..e571dccc6 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -79,6 +79,19 @@ pub enum ThreadSortKey { UpdatedAt, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ThreadListLayout { + NestedByDate, + Flat, +} + +pub(crate) struct ThreadListConfig<'a> { + pub(crate) allowed_sources: &'a [SessionSource], + pub(crate) model_providers: Option<&'a [String]>, + pub(crate) default_provider: &'a str, + pub(crate) layout: ThreadListLayout, +} + /// Pagination cursor identifying a file by timestamp and UUID. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Cursor { @@ -259,9 +272,29 @@ pub(crate) async fn get_threads( model_providers: Option<&[String]>, default_provider: &str, ) -> io::Result { - let mut root = codex_home.to_path_buf(); - root.push(SESSIONS_SUBDIR); + let root = codex_home.join(SESSIONS_SUBDIR); + get_threads_in_root( + root, + page_size, + cursor, + sort_key, + ThreadListConfig { + allowed_sources, + model_providers, + default_provider, + layout: ThreadListLayout::NestedByDate, + }, + ) + .await +} +pub(crate) async fn get_threads_in_root( + root: PathBuf, + page_size: usize, + cursor: Option<&Cursor>, + sort_key: ThreadSortKey, + config: ThreadListConfig<'_>, +) -> io::Result { if !root.exists() { return Ok(ThreadsPage { items: Vec::new(), @@ -273,18 +306,34 @@ pub(crate) async fn get_threads( let anchor = cursor.cloned(); - let provider_matcher = - model_providers.and_then(|filters| ProviderMatcher::new(filters, default_provider)); + let provider_matcher = config + .model_providers + .and_then(|filters| ProviderMatcher::new(filters, config.default_provider)); - let result = traverse_directories_for_paths( - root.clone(), - page_size, - anchor, - sort_key, - allowed_sources, - provider_matcher.as_ref(), - ) - .await?; + let result = match config.layout { + ThreadListLayout::NestedByDate => { + traverse_directories_for_paths( + root.clone(), + page_size, + anchor, + sort_key, + config.allowed_sources, + provider_matcher.as_ref(), + ) + .await? + } + ThreadListLayout::Flat => { + traverse_flat_paths( + root.clone(), + page_size, + anchor, + sort_key, + config.allowed_sources, + provider_matcher.as_ref(), + ) + .await? + } + }; Ok(result) } @@ -324,6 +373,26 @@ async fn traverse_directories_for_paths( } } +async fn traverse_flat_paths( + root: PathBuf, + page_size: usize, + anchor: Option, + sort_key: ThreadSortKey, + allowed_sources: &[SessionSource], + provider_matcher: Option<&ProviderMatcher<'_>>, +) -> io::Result { + match sort_key { + ThreadSortKey::CreatedAt => { + traverse_flat_paths_created(root, page_size, anchor, allowed_sources, provider_matcher) + .await + } + ThreadSortKey::UpdatedAt => { + traverse_flat_paths_updated(root, page_size, anchor, allowed_sources, provider_matcher) + .await + } + } +} + /// Walk the rollout directory tree in reverse chronological order and /// collect items until the page fills or the scan cap is hit. /// @@ -437,6 +506,116 @@ async fn traverse_directories_for_paths_updated( }) } +async fn traverse_flat_paths_created( + root: PathBuf, + page_size: usize, + anchor: Option, + allowed_sources: &[SessionSource], + provider_matcher: Option<&ProviderMatcher<'_>>, +) -> io::Result { + let mut items: Vec = Vec::with_capacity(page_size); + let mut scanned_files = 0usize; + let mut anchor_state = AnchorState::new(anchor); + let mut more_matches_available = false; + + let files = collect_flat_rollout_files(&root, &mut scanned_files).await?; + for (ts, id, path) in files.into_iter() { + if anchor_state.should_skip(ts, id) { + continue; + } + if items.len() == page_size { + more_matches_available = true; + break; + } + let updated_at = file_modified_time(&path) + .await + .unwrap_or(None) + .and_then(format_rfc3339); + if let Some(item) = + build_thread_item(path, allowed_sources, provider_matcher, updated_at).await + { + items.push(item); + } + } + + let reached_scan_cap = scanned_files >= MAX_SCAN_FILES; + if reached_scan_cap && !items.is_empty() { + more_matches_available = true; + } + + let next = if more_matches_available { + build_next_cursor(&items, ThreadSortKey::CreatedAt) + } else { + None + }; + Ok(ThreadsPage { + items, + next_cursor: next, + num_scanned_files: scanned_files, + reached_scan_cap, + }) +} + +async fn traverse_flat_paths_updated( + root: PathBuf, + page_size: usize, + anchor: Option, + allowed_sources: &[SessionSource], + provider_matcher: Option<&ProviderMatcher<'_>>, +) -> io::Result { + let mut items: Vec = Vec::with_capacity(page_size); + let mut scanned_files = 0usize; + let mut anchor_state = AnchorState::new(anchor); + let mut more_matches_available = false; + + let candidates = collect_flat_files_by_updated_at(&root, &mut scanned_files).await?; + let mut candidates = candidates; + candidates.sort_by_key(|candidate| { + let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH); + (Reverse(ts), Reverse(candidate.id)) + }); + + for candidate in candidates.into_iter() { + let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH); + if anchor_state.should_skip(ts, candidate.id) { + continue; + } + if items.len() == page_size { + more_matches_available = true; + break; + } + + let updated_at_fallback = candidate.updated_at.and_then(format_rfc3339); + if let Some(item) = build_thread_item( + candidate.path, + allowed_sources, + provider_matcher, + updated_at_fallback, + ) + .await + { + items.push(item); + } + } + + let reached_scan_cap = scanned_files >= MAX_SCAN_FILES; + if reached_scan_cap && !items.is_empty() { + more_matches_available = true; + } + + let next = if more_matches_available { + build_next_cursor(&items, ThreadSortKey::UpdatedAt) + } else { + None + }; + Ok(ThreadsPage { + items, + next_cursor: next, + num_scanned_files: scanned_files, + reached_scan_cap, + }) +} + /// Pagination cursor token format: "|" where `ts` uses /// YYYY-MM-DDThh-mm-ss (UTC, second precision). /// The cursor orders files by the requested sort key (timestamp desc, then UUID desc). @@ -558,6 +737,44 @@ where Ok(collected) } +async fn collect_flat_rollout_files( + root: &Path, + scanned_files: &mut usize, +) -> io::Result> { + let mut dir = tokio::fs::read_dir(root).await?; + let mut collected = Vec::new(); + while let Some(entry) = dir.next_entry().await? { + if *scanned_files >= MAX_SCAN_FILES { + break; + } + if !entry + .file_type() + .await + .map(|ft| ft.is_file()) + .unwrap_or(false) + { + continue; + } + let file_name = entry.file_name(); + let Some(name_str) = file_name.to_str() else { + continue; + }; + if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") { + continue; + } + let Some((ts, id)) = parse_timestamp_uuid_from_filename(name_str) else { + continue; + }; + *scanned_files += 1; + if *scanned_files > MAX_SCAN_FILES { + break; + } + collected.push((ts, id, entry.path())); + } + collected.sort_by_key(|(ts, sid, _path)| (Reverse(*ts), Reverse(*sid))); + Ok(collected) +} + async fn collect_rollout_day_files( day_path: &Path, ) -> io::Result> { @@ -610,6 +827,49 @@ async fn collect_files_by_updated_at( Ok(candidates) } +async fn collect_flat_files_by_updated_at( + root: &Path, + scanned_files: &mut usize, +) -> io::Result> { + let mut candidates = Vec::new(); + let mut dir = tokio::fs::read_dir(root).await?; + while let Some(entry) = dir.next_entry().await? { + if *scanned_files >= MAX_SCAN_FILES { + break; + } + if !entry + .file_type() + .await + .map(|ft| ft.is_file()) + .unwrap_or(false) + { + continue; + } + let file_name = entry.file_name(); + let Some(name_str) = file_name.to_str() else { + continue; + }; + if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") { + continue; + } + let Some((_ts, id)) = parse_timestamp_uuid_from_filename(name_str) else { + continue; + }; + *scanned_files += 1; + if *scanned_files > MAX_SCAN_FILES { + break; + } + let updated_at = file_modified_time(&entry.path()).await.unwrap_or(None); + candidates.push(ThreadCandidate { + path: entry.path(), + id, + updated_at, + }); + } + + Ok(candidates) +} + async fn walk_rollout_files( root: &Path, scanned_files: &mut usize, diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index c5bd14b3c..53425051c 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -19,11 +19,15 @@ use tokio::sync::oneshot; use tracing::info; use tracing::warn; +use super::ARCHIVED_SESSIONS_SUBDIR; use super::SESSIONS_SUBDIR; use super::list::Cursor; +use super::list::ThreadListConfig; +use super::list::ThreadListLayout; use super::list::ThreadSortKey; use super::list::ThreadsPage; use super::list::get_threads; +use super::list::get_threads_in_root; use super::policy::is_persisted_response_item; use crate::config::Config; use crate::default_client::originator; @@ -119,6 +123,32 @@ impl RolloutRecorder { .await } + /// List archived threads (rollout files) under the archived sessions directory. + pub async fn list_archived_threads( + codex_home: &Path, + page_size: usize, + cursor: Option<&Cursor>, + sort_key: ThreadSortKey, + allowed_sources: &[SessionSource], + model_providers: Option<&[String]>, + default_provider: &str, + ) -> std::io::Result { + let root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR); + get_threads_in_root( + root, + page_size, + cursor, + sort_key, + ThreadListConfig { + allowed_sources, + model_providers, + default_provider, + layout: ThreadListLayout::Flat, + }, + ) + .await + } + /// Find the newest recorded thread path, optionally filtering to a matching cwd. #[allow(clippy::too_many_arguments)] pub async fn find_latest_thread_path( diff --git a/codex-rs/debug-client/src/client.rs b/codex-rs/debug-client/src/client.rs index 673a2a8fb..c0a2746ee 100644 --- a/codex-rs/debug-client/src/client.rs +++ b/codex-rs/debug-client/src/client.rs @@ -173,6 +173,7 @@ impl AppServerClient { limit: None, sort_key: None, model_providers: None, + archived: None, }, }; self.send(&request)?;