Fix resume picker when user event appears after head (#9512)
Fixes #9501 Contributing guide: https://github.com/openai/codex/blob/main/docs/contributing.md ## Summary The resume picker requires a session_meta line and at least one user_message event within the initial head scan. Some rollout files contain multiple session_meta entries before the first user_message, so the user event can fall outside the default head window and the session is omitted from the picker even though it is resumable by ID. This PR keeps the head summary bounded but extends scanning for a user_message once a session_meta has been observed. The summary still caps stored head entries, but we allow a small, bounded extra scan to find the first user event so valid sessions are not filtered out. ## Changes - Continue scanning past the head limit (bounded) when session_meta is present but no user_message has been seen yet. - Mark session_meta as seen even if the head summary buffer is already full. - Add a regression test with multiple session_meta lines before the first user_message. ## Why This Is Safe - The head summary remains bounded to avoid unbounded memory usage. - The extra scan is capped (USER_EVENT_SCAN_LIMIT) and only triggers after a session_meta is seen. - Behavior is unchanged for typical files where the user_message appears early. ## Testing - cargo test -p codex-core --lib test_list_threads_scans_past_head_for_user_event
This commit is contained in:
parent
45fe58159e
commit
e0ae219f36
2 changed files with 97 additions and 4 deletions
|
|
@ -72,6 +72,7 @@ struct HeadTailSummary {
|
|||
/// Hard cap to bound worst‑case work per request.
|
||||
const MAX_SCAN_FILES: usize = 10000;
|
||||
const HEAD_RECORD_LIMIT: usize = 10;
|
||||
const USER_EVENT_SCAN_LIMIT: usize = 200;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ThreadSortKey {
|
||||
|
|
@ -943,14 +944,20 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
|
|||
let reader = tokio::io::BufReader::new(file);
|
||||
let mut lines = reader.lines();
|
||||
let mut summary = HeadTailSummary::default();
|
||||
let mut lines_scanned = 0usize;
|
||||
|
||||
while summary.head.len() < head_limit {
|
||||
while lines_scanned < head_limit
|
||||
|| (summary.saw_session_meta
|
||||
&& !summary.saw_user_event
|
||||
&& lines_scanned < head_limit + USER_EVENT_SCAN_LIMIT)
|
||||
{
|
||||
let line_opt = lines.next_line().await?;
|
||||
let Some(line) = line_opt else { break };
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
lines_scanned += 1;
|
||||
|
||||
let parsed: Result<RolloutLine, _> = serde_json::from_str(trimmed);
|
||||
let Ok(rollout_line) = parsed else { continue };
|
||||
|
|
@ -963,9 +970,11 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
|
|||
.created_at
|
||||
.clone()
|
||||
.or_else(|| Some(rollout_line.timestamp.clone()));
|
||||
if let Ok(val) = serde_json::to_value(session_meta_line) {
|
||||
summary.saw_session_meta = true;
|
||||
if summary.head.len() < head_limit
|
||||
&& let Ok(val) = serde_json::to_value(session_meta_line)
|
||||
{
|
||||
summary.head.push(val);
|
||||
summary.saw_session_meta = true;
|
||||
}
|
||||
}
|
||||
RolloutItem::ResponseItem(item) => {
|
||||
|
|
@ -973,7 +982,9 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
|
|||
.created_at
|
||||
.clone()
|
||||
.or_else(|| Some(rollout_line.timestamp.clone()));
|
||||
if let Ok(val) = serde_json::to_value(item) {
|
||||
if summary.head.len() < head_limit
|
||||
&& let Ok(val) = serde_json::to_value(item)
|
||||
{
|
||||
summary.head.push(val);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -131,6 +131,63 @@ fn write_session_file_with_provider(
|
|||
Ok((dt, uuid))
|
||||
}
|
||||
|
||||
fn write_session_file_with_delayed_user_event(
|
||||
root: &Path,
|
||||
ts_str: &str,
|
||||
uuid: Uuid,
|
||||
meta_lines_before_user: usize,
|
||||
) -> std::io::Result<()> {
|
||||
let format: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
let dt = PrimitiveDateTime::parse(ts_str, format)
|
||||
.unwrap()
|
||||
.assume_utc();
|
||||
let dir = root
|
||||
.join("sessions")
|
||||
.join(format!("{:04}", dt.year()))
|
||||
.join(format!("{:02}", u8::from(dt.month())))
|
||||
.join(format!("{:02}", dt.day()));
|
||||
fs::create_dir_all(&dir)?;
|
||||
|
||||
let filename = format!("rollout-{ts_str}-{uuid}.jsonl");
|
||||
let file_path = dir.join(filename);
|
||||
let mut file = File::create(file_path)?;
|
||||
|
||||
for i in 0..meta_lines_before_user {
|
||||
let id = if i == 0 {
|
||||
uuid
|
||||
} else {
|
||||
Uuid::from_u128(100 + i as u128)
|
||||
};
|
||||
let payload = serde_json::json!({
|
||||
"id": id,
|
||||
"timestamp": ts_str,
|
||||
"cwd": ".",
|
||||
"originator": "test_originator",
|
||||
"cli_version": "test_version",
|
||||
"source": "vscode",
|
||||
"model_provider": "test-provider",
|
||||
});
|
||||
let meta = serde_json::json!({
|
||||
"timestamp": ts_str,
|
||||
"type": "session_meta",
|
||||
"payload": payload,
|
||||
});
|
||||
writeln!(file, "{meta}")?;
|
||||
}
|
||||
|
||||
let user_event = serde_json::json!({
|
||||
"timestamp": ts_str,
|
||||
"type": "event_msg",
|
||||
"payload": {"type": "user_message", "message": "Hello from user", "kind": "plain"}
|
||||
});
|
||||
writeln!(file, "{user_event}")?;
|
||||
|
||||
let times = FileTimes::new().set_modified(dt.into());
|
||||
file.set_times(times)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_session_file_with_meta_payload(
|
||||
root: &Path,
|
||||
ts_str: &str,
|
||||
|
|
@ -539,6 +596,31 @@ async fn test_pagination_cursor() {
|
|||
assert_eq!(page3, expected_page3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_threads_scans_past_head_for_user_event() {
|
||||
let temp = TempDir::new().unwrap();
|
||||
let home = temp.path();
|
||||
|
||||
let uuid = Uuid::from_u128(99);
|
||||
let ts = "2025-05-01T10-30-00";
|
||||
write_session_file_with_delayed_user_event(home, ts, uuid, 12).unwrap();
|
||||
|
||||
let provider_filter = provider_vec(&[TEST_PROVIDER]);
|
||||
let page = get_threads(
|
||||
home,
|
||||
10,
|
||||
None,
|
||||
ThreadSortKey::CreatedAt,
|
||||
INTERACTIVE_SESSION_SOURCES,
|
||||
Some(provider_filter.as_slice()),
|
||||
TEST_PROVIDER,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(page.items.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_thread_contents() {
|
||||
let temp = TempDir::new().unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue