feat(app-server): support archived threads in thread/list (#9571)

This commit is contained in:
Owen Lin 2026-01-22 12:22:36 -08:00 committed by GitHub
parent 80240b3b67
commit 733cb68496
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 433 additions and 31 deletions

View file

@ -1205,6 +1205,9 @@ pub struct ThreadListParams {
/// Optional provider filter; when set, only sessions recorded under these
/// providers are returned. When present but empty, includes all providers.
pub model_providers: Option<Vec<String>>,
/// Optional archived filter; when set to true, only archived threads are returned.
/// If false or null, only non-archived threads are returned.
pub archived: Option<bool>,
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, JsonSchema, TS)]

View file

@ -148,6 +148,7 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c
- `limit` — server defaults to a reasonable page size if unset.
- `sortKey` — `created_at` (default) or `updated_at`.
- `modelProviders` — restrict results to specific providers; unset, null, or an empty array will include all providers.
- `archived` — when `true`, list archived threads only. When `false` or `null`, list non-archived threads (default).
Example:
@ -202,7 +203,7 @@ Use `thread/archive` to move the persisted rollout (stored as a JSONL file on di
{ "id": 21, "result": {} }
```
An archived thread will not appear in future calls to `thread/list`.
An archived thread will not appear in `thread/list` unless `archived` is set to `true`.
### Example: Start a turn (send user input)

View file

@ -1626,6 +1626,7 @@ impl CodexMessageProcessor {
limit,
sort_key,
model_providers,
archived,
} = params;
let requested_page_size = limit
@ -1637,7 +1638,13 @@ impl CodexMessageProcessor {
ThreadSortKey::UpdatedAt => CoreThreadSortKey::UpdatedAt,
};
let (summaries, next_cursor) = match self
.list_threads_common(requested_page_size, cursor, model_providers, core_sort_key)
.list_threads_common(
requested_page_size,
cursor,
model_providers,
core_sort_key,
archived.unwrap_or(false),
)
.await
{
Ok(r) => r,
@ -2280,6 +2287,7 @@ impl CodexMessageProcessor {
cursor,
model_providers,
CoreThreadSortKey::UpdatedAt,
false,
)
.await
{
@ -2299,6 +2307,7 @@ impl CodexMessageProcessor {
cursor: Option<String>,
model_providers: Option<Vec<String>>,
sort_key: CoreThreadSortKey,
archived: bool,
) -> Result<(Vec<ConversationSummary>, Option<String>), JSONRPCErrorError> {
let mut cursor_obj: Option<RolloutCursor> = match cursor.as_ref() {
Some(cursor_str) => {
@ -2329,21 +2338,39 @@ impl CodexMessageProcessor {
while remaining > 0 {
let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
let page = RolloutRecorder::list_threads(
&self.config.codex_home,
page_size,
cursor_obj.as_ref(),
sort_key,
INTERACTIVE_SESSION_SOURCES,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list threads: {err}"),
data: None,
})?;
let page = if archived {
RolloutRecorder::list_archived_threads(
&self.config.codex_home,
page_size,
cursor_obj.as_ref(),
sort_key,
INTERACTIVE_SESSION_SOURCES,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list threads: {err}"),
data: None,
})?
} else {
RolloutRecorder::list_threads(
&self.config.codex_home,
page_size,
cursor_obj.as_ref(),
sort_key,
INTERACTIVE_SESSION_SOURCES,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list threads: {err}"),
data: None,
})?
};
let mut filtered = page
.items

View file

@ -12,9 +12,11 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_protocol::protocol::GitInfo as CoreGitInfo;
use pretty_assertions::assert_eq;
use std::cmp::Reverse;
use std::fs;
use std::fs::FileTimes;
use std::fs::OpenOptions;
use std::path::Path;
@ -36,8 +38,9 @@ async fn list_threads(
cursor: Option<String>,
limit: Option<u32>,
providers: Option<Vec<String>>,
archived: Option<bool>,
) -> Result<ThreadListResponse> {
list_threads_with_sort(mcp, cursor, limit, providers, None).await
list_threads_with_sort(mcp, cursor, limit, providers, None, archived).await
}
async fn list_threads_with_sort(
@ -46,6 +49,7 @@ async fn list_threads_with_sort(
limit: Option<u32>,
providers: Option<Vec<String>>,
sort_key: Option<ThreadSortKey>,
archived: Option<bool>,
) -> Result<ThreadListResponse> {
let request_id = mcp
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
@ -53,6 +57,7 @@ async fn list_threads_with_sort(
limit,
sort_key,
model_providers: providers,
archived,
})
.await?;
let resp: JSONRPCResponse = timeout(
@ -125,6 +130,7 @@ async fn thread_list_basic_empty() -> Result<()> {
None,
Some(10),
Some(vec!["mock_provider".to_string()]),
None,
)
.await?;
assert!(data.is_empty());
@ -187,6 +193,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
None,
Some(2),
Some(vec!["mock_provider".to_string()]),
None,
)
.await?;
assert_eq!(data1.len(), 2);
@ -211,6 +218,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
Some(cursor1),
Some(2),
Some(vec!["mock_provider".to_string()]),
None,
)
.await?;
assert!(data2.len() <= 2);
@ -260,6 +268,7 @@ async fn thread_list_respects_provider_filter() -> Result<()> {
None,
Some(10),
Some(vec!["other_provider".to_string()]),
None,
)
.await?;
assert_eq!(data.len(), 1);
@ -309,6 +318,7 @@ async fn thread_list_fetches_until_limit_or_exhausted() -> Result<()> {
None,
Some(8),
Some(vec!["target_provider".to_string()]),
None,
)
.await?;
assert_eq!(
@ -353,6 +363,7 @@ async fn thread_list_enforces_max_limit() -> Result<()> {
None,
Some(200),
Some(vec!["mock_provider".to_string()]),
None,
)
.await?;
assert_eq!(
@ -398,6 +409,7 @@ async fn thread_list_stops_when_not_enough_filtered_results_exist() -> Result<()
None,
Some(10),
Some(vec!["target_provider".to_string()]),
None,
)
.await?;
assert_eq!(
@ -444,6 +456,7 @@ async fn thread_list_includes_git_info() -> Result<()> {
None,
Some(10),
Some(vec!["mock_provider".to_string()]),
None,
)
.await?;
let thread = data
@ -502,6 +515,7 @@ async fn thread_list_default_sorts_by_created_at() -> Result<()> {
Some(10),
Some(vec!["mock_provider".to_string()]),
None,
None,
)
.await?;
@ -562,6 +576,7 @@ async fn thread_list_sort_updated_at_orders_by_mtime() -> Result<()> {
Some(10),
Some(vec!["mock_provider".to_string()]),
Some(ThreadSortKey::UpdatedAt),
None,
)
.await?;
@ -625,6 +640,7 @@ async fn thread_list_updated_at_paginates_with_cursor() -> Result<()> {
Some(2),
Some(vec!["mock_provider".to_string()]),
Some(ThreadSortKey::UpdatedAt),
None,
)
.await?;
let ids_page1: Vec<_> = page1.iter().map(|thread| thread.id.as_str()).collect();
@ -640,6 +656,7 @@ async fn thread_list_updated_at_paginates_with_cursor() -> Result<()> {
Some(2),
Some(vec!["mock_provider".to_string()]),
Some(ThreadSortKey::UpdatedAt),
None,
)
.await?;
let ids_page2: Vec<_> = page2.iter().map(|thread| thread.id.as_str()).collect();
@ -678,6 +695,7 @@ async fn thread_list_created_at_tie_breaks_by_uuid() -> Result<()> {
None,
Some(10),
Some(vec!["mock_provider".to_string()]),
None,
)
.await?;
@ -730,6 +748,7 @@ async fn thread_list_updated_at_tie_breaks_by_uuid() -> Result<()> {
Some(10),
Some(vec!["mock_provider".to_string()]),
Some(ThreadSortKey::UpdatedAt),
None,
)
.await?;
@ -769,6 +788,7 @@ async fn thread_list_updated_at_uses_mtime() -> Result<()> {
Some(10),
Some(vec!["mock_provider".to_string()]),
Some(ThreadSortKey::UpdatedAt),
None,
)
.await?;
@ -786,6 +806,65 @@ async fn thread_list_updated_at_uses_mtime() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_list_archived_filter() -> Result<()> {
    // Verifies that `thread/list` hides archived threads by default and
    // returns only archived threads when `archived: true` is passed.
    let codex_home = TempDir::new()?;
    create_minimal_config(codex_home.path())?;
    // A rollout that remains in the regular sessions tree.
    let active_id = create_fake_rollout(
        codex_home.path(),
        "2025-03-01T10-00-00",
        "2025-03-01T10:00:00Z",
        "Active",
        Some("mock_provider"),
        None,
    )?;
    // A rollout that is moved into the archived directory below.
    let archived_id = create_fake_rollout(
        codex_home.path(),
        "2025-03-01T09-00-00",
        "2025-03-01T09:00:00Z",
        "Archived",
        Some("mock_provider"),
        None,
    )?;
    // Simulate archiving: move the rollout file into the flat
    // ARCHIVED_SESSIONS_SUBDIR directory, keeping its original file name.
    let archived_dir = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR);
    fs::create_dir_all(&archived_dir)?;
    let archived_source = rollout_path(codex_home.path(), "2025-03-01T09-00-00", &archived_id);
    let archived_dest = archived_dir.join(
        archived_source
            .file_name()
            .expect("archived rollout should have a file name"),
    );
    fs::rename(&archived_source, &archived_dest)?;
    let mut mcp = init_mcp(codex_home.path()).await?;
    // Default listing (archived = None) must return only the active thread.
    let ThreadListResponse { data, .. } = list_threads(
        &mut mcp,
        None,
        Some(10),
        Some(vec!["mock_provider".to_string()]),
        None,
    )
    .await?;
    assert_eq!(data.len(), 1);
    assert_eq!(data[0].id, active_id);
    // archived = Some(true) must return only the archived thread.
    let ThreadListResponse { data, .. } = list_threads(
        &mut mcp,
        None,
        Some(10),
        Some(vec!["mock_provider".to_string()]),
        Some(true),
    )
    .await?;
    assert_eq!(data.len(), 1);
    assert_eq!(data[0].id, archived_id);
    Ok(())
}
#[tokio::test]
async fn thread_list_invalid_cursor_returns_error() -> Result<()> {
let codex_home = TempDir::new()?;
@ -799,6 +878,7 @@ async fn thread_list_invalid_cursor_returns_error() -> Result<()> {
limit: Some(2),
sort_key: None,
model_providers: Some(vec!["mock_provider".to_string()]),
archived: None,
})
.await?;
let error: JSONRPCError = timeout(

View file

@ -79,6 +79,19 @@ pub enum ThreadSortKey {
UpdatedAt,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// On-disk directory layout to scan when listing threads.
pub(crate) enum ThreadListLayout {
    /// Rollout files nested under date subdirectories (the regular sessions tree).
    NestedByDate,
    /// Rollout files sitting directly in the root directory (used for archived sessions).
    Flat,
}
/// Filtering and layout options shared by the thread-listing entry points.
pub(crate) struct ThreadListConfig<'a> {
    // Session sources a rollout must match to be included in results.
    pub(crate) allowed_sources: &'a [SessionSource],
    // Optional provider filter; `None` disables the provider matcher entirely.
    pub(crate) model_providers: Option<&'a [String]>,
    // Fallback provider name handed to the provider matcher for rollouts
    // that did not record a provider.
    pub(crate) default_provider: &'a str,
    // Whether to scan the nested-by-date tree or a flat directory.
    pub(crate) layout: ThreadListLayout,
}
/// Pagination cursor identifying a file by timestamp and UUID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cursor {
@ -259,9 +272,29 @@ pub(crate) async fn get_threads(
model_providers: Option<&[String]>,
default_provider: &str,
) -> io::Result<ThreadsPage> {
let mut root = codex_home.to_path_buf();
root.push(SESSIONS_SUBDIR);
let root = codex_home.join(SESSIONS_SUBDIR);
get_threads_in_root(
root,
page_size,
cursor,
sort_key,
ThreadListConfig {
allowed_sources,
model_providers,
default_provider,
layout: ThreadListLayout::NestedByDate,
},
)
.await
}
pub(crate) async fn get_threads_in_root(
root: PathBuf,
page_size: usize,
cursor: Option<&Cursor>,
sort_key: ThreadSortKey,
config: ThreadListConfig<'_>,
) -> io::Result<ThreadsPage> {
if !root.exists() {
return Ok(ThreadsPage {
items: Vec::new(),
@ -273,18 +306,34 @@ pub(crate) async fn get_threads(
let anchor = cursor.cloned();
let provider_matcher =
model_providers.and_then(|filters| ProviderMatcher::new(filters, default_provider));
let provider_matcher = config
.model_providers
.and_then(|filters| ProviderMatcher::new(filters, config.default_provider));
let result = traverse_directories_for_paths(
root.clone(),
page_size,
anchor,
sort_key,
allowed_sources,
provider_matcher.as_ref(),
)
.await?;
let result = match config.layout {
ThreadListLayout::NestedByDate => {
traverse_directories_for_paths(
root.clone(),
page_size,
anchor,
sort_key,
config.allowed_sources,
provider_matcher.as_ref(),
)
.await?
}
ThreadListLayout::Flat => {
traverse_flat_paths(
root.clone(),
page_size,
anchor,
sort_key,
config.allowed_sources,
provider_matcher.as_ref(),
)
.await?
}
};
Ok(result)
}
@ -324,6 +373,26 @@ async fn traverse_directories_for_paths(
}
}
/// Dispatch a flat-layout directory scan to the traversal matching `sort_key`.
///
/// Both traversals share the same pagination semantics; only the ordering of
/// candidate files differs (filename timestamp vs. file modification time).
async fn traverse_flat_paths(
    root: PathBuf,
    page_size: usize,
    anchor: Option<Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    provider_matcher: Option<&ProviderMatcher<'_>>,
) -> io::Result<ThreadsPage> {
    if matches!(sort_key, ThreadSortKey::CreatedAt) {
        return traverse_flat_paths_created(
            root,
            page_size,
            anchor,
            allowed_sources,
            provider_matcher,
        )
        .await;
    }
    traverse_flat_paths_updated(root, page_size, anchor, allowed_sources, provider_matcher).await
}
/// Walk the rollout directory tree in reverse chronological order and
/// collect items until the page fills or the scan cap is hit.
///
@ -437,6 +506,116 @@ async fn traverse_directories_for_paths_updated(
})
}
/// Scan rollout files directly under `root` (flat layout) in created-at order
/// (filename timestamp desc, then UUID desc) and fill one page of items.
///
/// `anchor` resumes pagination: entries at or before the anchor position are
/// skipped. Files that fail the source/provider filters still count toward
/// the scan cap but produce no item.
async fn traverse_flat_paths_created(
    root: PathBuf,
    page_size: usize,
    anchor: Option<Cursor>,
    allowed_sources: &[SessionSource],
    provider_matcher: Option<&ProviderMatcher<'_>>,
) -> io::Result<ThreadsPage> {
    let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
    let mut scanned_files = 0usize;
    let mut anchor_state = AnchorState::new(anchor);
    let mut more_matches_available = false;
    // Files come back pre-sorted newest-first by (timestamp, UUID).
    let files = collect_flat_rollout_files(&root, &mut scanned_files).await?;
    // Idiom fix: iterate the Vec directly instead of calling `.into_iter()`.
    for (ts, id, path) in files {
        if anchor_state.should_skip(ts, id) {
            continue;
        }
        // Page already full: remember that at least one more candidate exists
        // so a next-page cursor gets emitted below.
        if items.len() == page_size {
            more_matches_available = true;
            break;
        }
        // Best-effort mtime as the updated-at fallback; errors degrade to None.
        let updated_at = file_modified_time(&path)
            .await
            .unwrap_or(None)
            .and_then(format_rfc3339);
        if let Some(item) =
            build_thread_item(path, allowed_sources, provider_matcher, updated_at).await
        {
            items.push(item);
        }
    }
    let reached_scan_cap = scanned_files >= MAX_SCAN_FILES;
    // Hitting the scan cap means unscanned matches may remain past this page.
    if reached_scan_cap && !items.is_empty() {
        more_matches_available = true;
    }
    let next = if more_matches_available {
        build_next_cursor(&items, ThreadSortKey::CreatedAt)
    } else {
        None
    };
    Ok(ThreadsPage {
        items,
        next_cursor: next,
        num_scanned_files: scanned_files,
        reached_scan_cap,
    })
}
/// Scan rollout files directly under `root` (flat layout) ordered by file
/// modification time (desc, then UUID desc) and fill one page of items.
///
/// Candidates with no readable mtime sort as if modified at the Unix epoch,
/// i.e. last. `anchor` resumes pagination past a previous page's final item.
async fn traverse_flat_paths_updated(
    root: PathBuf,
    page_size: usize,
    anchor: Option<Cursor>,
    allowed_sources: &[SessionSource],
    provider_matcher: Option<&ProviderMatcher<'_>>,
) -> io::Result<ThreadsPage> {
    let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
    let mut scanned_files = 0usize;
    let mut anchor_state = AnchorState::new(anchor);
    let mut more_matches_available = false;
    // Idiom fix: bind mutably in one step instead of an immutable binding
    // followed by `let mut candidates = candidates;`.
    let mut candidates = collect_flat_files_by_updated_at(&root, &mut scanned_files).await?;
    candidates.sort_by_key(|candidate| {
        let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH);
        (Reverse(ts), Reverse(candidate.id))
    });
    for candidate in candidates {
        let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH);
        if anchor_state.should_skip(ts, candidate.id) {
            continue;
        }
        // Page already full: remember that at least one more candidate exists
        // so a next-page cursor gets emitted below.
        if items.len() == page_size {
            more_matches_available = true;
            break;
        }
        // Reuse the mtime captured during the scan as the updated-at fallback.
        let updated_at_fallback = candidate.updated_at.and_then(format_rfc3339);
        if let Some(item) = build_thread_item(
            candidate.path,
            allowed_sources,
            provider_matcher,
            updated_at_fallback,
        )
        .await
        {
            items.push(item);
        }
    }
    let reached_scan_cap = scanned_files >= MAX_SCAN_FILES;
    // Hitting the scan cap means unscanned matches may remain past this page.
    if reached_scan_cap && !items.is_empty() {
        more_matches_available = true;
    }
    let next = if more_matches_available {
        build_next_cursor(&items, ThreadSortKey::UpdatedAt)
    } else {
        None
    };
    Ok(ThreadsPage {
        items,
        next_cursor: next,
        num_scanned_files: scanned_files,
        reached_scan_cap,
    })
}
/// Pagination cursor token format: "<ts>|<uuid>" where `ts` uses
/// YYYY-MM-DDThh-mm-ss (UTC, second precision).
/// The cursor orders files by the requested sort key (timestamp desc, then UUID desc).
@ -558,6 +737,44 @@ where
Ok(collected)
}
/// Collect `rollout-*.jsonl` files sitting directly in `root` (flat layout),
/// parsing the timestamp and UUID out of each file name.
///
/// Only files matching the rollout naming scheme count toward `scanned_files`;
/// scanning stops once the shared `MAX_SCAN_FILES` budget is exhausted (the
/// budget may already be partially consumed by the caller). Returns entries
/// sorted newest-first by (timestamp, UUID).
async fn collect_flat_rollout_files(
    root: &Path,
    scanned_files: &mut usize,
) -> io::Result<Vec<(OffsetDateTime, Uuid, PathBuf)>> {
    let mut dir = tokio::fs::read_dir(root).await?;
    let mut collected = Vec::new();
    while let Some(entry) = dir.next_entry().await? {
        // Stop once the shared scan budget is exhausted.
        if *scanned_files >= MAX_SCAN_FILES {
            break;
        }
        // Only regular files can be rollouts; unreadable file types are skipped.
        if !entry
            .file_type()
            .await
            .map(|ft| ft.is_file())
            .unwrap_or(false)
        {
            continue;
        }
        let file_name = entry.file_name();
        let Some(name_str) = file_name.to_str() else {
            continue;
        };
        if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
            continue;
        }
        let Some((ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
            continue;
        };
        *scanned_files += 1;
        // NOTE: the loop-top `>=` check guarantees `*scanned_files < MAX_SCAN_FILES`
        // before the increment, so a post-increment `> MAX_SCAN_FILES` check can
        // never fire; the original dead branch has been removed.
        collected.push((ts, id, entry.path()));
    }
    collected.sort_by_key(|(ts, sid, _path)| (Reverse(*ts), Reverse(*sid)));
    Ok(collected)
}
async fn collect_rollout_day_files(
day_path: &Path,
) -> io::Result<Vec<(OffsetDateTime, Uuid, PathBuf)>> {
@ -610,6 +827,49 @@ async fn collect_files_by_updated_at(
Ok(candidates)
}
/// Collect `rollout-*.jsonl` files sitting directly in `root` (flat layout)
/// as [`ThreadCandidate`]s carrying each file's modification time.
///
/// Only files matching the rollout naming scheme count toward `scanned_files`;
/// scanning stops once the shared `MAX_SCAN_FILES` budget is exhausted (the
/// budget may already be partially consumed by the caller). Results are
/// unsorted; the caller orders them.
async fn collect_flat_files_by_updated_at(
    root: &Path,
    scanned_files: &mut usize,
) -> io::Result<Vec<ThreadCandidate>> {
    let mut candidates = Vec::new();
    let mut dir = tokio::fs::read_dir(root).await?;
    while let Some(entry) = dir.next_entry().await? {
        // Stop once the shared scan budget is exhausted.
        if *scanned_files >= MAX_SCAN_FILES {
            break;
        }
        // Only regular files can be rollouts; unreadable file types are skipped.
        if !entry
            .file_type()
            .await
            .map(|ft| ft.is_file())
            .unwrap_or(false)
        {
            continue;
        }
        let file_name = entry.file_name();
        let Some(name_str) = file_name.to_str() else {
            continue;
        };
        if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
            continue;
        }
        let Some((_ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
            continue;
        };
        *scanned_files += 1;
        // NOTE: the loop-top `>=` check guarantees `*scanned_files < MAX_SCAN_FILES`
        // before the increment, so a post-increment `> MAX_SCAN_FILES` check can
        // never fire; the original dead branch has been removed.
        // Build the PathBuf once instead of calling `entry.path()` twice.
        let path = entry.path();
        // Best-effort mtime; errors degrade to None (sorted last by the caller).
        let updated_at = file_modified_time(&path).await.unwrap_or(None);
        candidates.push(ThreadCandidate {
            path,
            id,
            updated_at,
        });
    }
    Ok(candidates)
}
async fn walk_rollout_files(
root: &Path,
scanned_files: &mut usize,

View file

@ -19,11 +19,15 @@ use tokio::sync::oneshot;
use tracing::info;
use tracing::warn;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use super::list::Cursor;
use super::list::ThreadListConfig;
use super::list::ThreadListLayout;
use super::list::ThreadSortKey;
use super::list::ThreadsPage;
use super::list::get_threads;
use super::list::get_threads_in_root;
use super::policy::is_persisted_response_item;
use crate::config::Config;
use crate::default_client::originator;
@ -119,6 +123,32 @@ impl RolloutRecorder {
.await
}
/// List archived threads (rollout files) under the archived sessions directory.
///
/// Archived rollouts live directly under `ARCHIVED_SESSIONS_SUBDIR` with no
/// date subdirectories, so the scan uses the flat layout.
pub async fn list_archived_threads(
    codex_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    model_providers: Option<&[String]>,
    default_provider: &str,
) -> std::io::Result<ThreadsPage> {
    let config = ThreadListConfig {
        allowed_sources,
        model_providers,
        default_provider,
        layout: ThreadListLayout::Flat,
    };
    get_threads_in_root(
        codex_home.join(ARCHIVED_SESSIONS_SUBDIR),
        page_size,
        cursor,
        sort_key,
        config,
    )
    .await
}
/// Find the newest recorded thread path, optionally filtering to a matching cwd.
#[allow(clippy::too_many_arguments)]
pub async fn find_latest_thread_path(

View file

@ -173,6 +173,7 @@ impl AppServerClient {
limit: None,
sort_key: None,
model_providers: None,
archived: None,
},
};
self.send(&request)?;