From fb0aaf94de7db12e22a293f5b0dd26aa6c8d6a8e Mon Sep 17 00:00:00 2001 From: Max Johnson <162359438+maxj-oai@users.noreply.github.com> Date: Fri, 13 Feb 2026 15:09:58 -0800 Subject: [PATCH] codex-rs: fix thread resume rejoin semantics (#11756) ## Summary - always rejoin an in-memory running thread on `thread/resume`, even when overrides are present - reject `thread/resume` when `history` is provided for a running thread - reject `thread/resume` when `path` mismatches the running thread rollout path - warn (but do not fail) on override mismatches for running threads - add more `thread_resume` integration tests and fixes; including restart-based resume-with-overrides coverage ## Validation - `just fmt` - `cargo test -p codex-app-server --test all thread_resume` - manual test with app-server-test-client https://github.com/openai/codex/pull/11755 - manual test both stdio and websocket in app --- codex-rs/app-server-test-client/README.md | 30 + .../app-server/src/codex_message_processor.rs | 498 ++++++++++++---- .../tests/suite/v2/thread_resume.rs | 531 ++++++++++++++++-- 3 files changed, 917 insertions(+), 142 deletions(-) diff --git a/codex-rs/app-server-test-client/README.md b/codex-rs/app-server-test-client/README.md index 5ef6e2344..ab11f1235 100644 --- a/codex-rs/app-server-test-client/README.md +++ b/codex-rs/app-server-test-client/README.md @@ -17,3 +17,33 @@ cargo run -p codex-app-server-test-client -- \ # 3) Call app-server (defaults to ws://127.0.0.1:4222) cargo run -p codex-app-server-test-client -- model-list ``` + +## Testing Thread Rejoin Behavior + +Build and start an app server using commands above. The app-server log is written to `/tmp/codex-app-server-test-client/app-server.log` + +### 1) Get a thread id + +Create at least one thread, then list threads: + +```bash +cargo run -p codex-app-server-test-client -- send-message-v2 "seed thread for rejoin test" +cargo run -p codex-app-server-test-client -- thread-list --limit 5 +``` + +Copy a thread id from the `thread-list` output. + +### 2) Rejoin while a turn is in progress (two terminals) + +Terminal A: + +```bash +cargo run --bin codex-app-server-test-client -- \ + resume-message-v2 "respond with thorough docs on the rust core" +``` + +Terminal B (while Terminal A is still streaming): + +```bash +cargo run --bin codex-app-server-test-client -- thread-resume +``` diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 198892d0a..156a2e313 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2689,6 +2689,13 @@ impl CodexMessageProcessor { } async fn thread_resume(&mut self, request_id: ConnectionRequestId, params: ThreadResumeParams) { + if self + .resume_running_thread(request_id.clone(), ¶ms) + .await + { + return; + } + let ThreadResumeParams { thread_id, history, @@ -2706,77 +2713,21 @@ impl CodexMessageProcessor { } = params; let thread_history = if let Some(history) = history { - if history.is_empty() { - self.send_invalid_request_error( - request_id, - "history must not be empty".to_string(), - ) - .await; + let Some(thread_history) = self + .resume_thread_from_history(request_id.clone(), history.as_slice()) + .await + else { return; - } - InitialHistory::Forked(history.into_iter().map(RolloutItem::ResponseItem).collect()) - } else if let Some(path) = path { - match RolloutRecorder::get_rollout_history(&path).await { - Ok(initial_history) => initial_history, - Err(err) => { - self.send_invalid_request_error( - request_id, - format!("failed to load rollout `{}`: {err}", path.display()), - ) - .await; - return; - } - } + }; + thread_history } else { - let existing_thread_id = match ThreadId::from_string(&thread_id) { - Ok(id) => id, - Err(err) => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("invalid thread id: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } + let Some(thread_history) = self + .resume_thread_from_rollout(request_id.clone(), &thread_id, path.as_ref()) + .await + else { + return; }; - - let path = match find_thread_path_by_id_str( - &self.config.codex_home, - &existing_thread_id.to_string(), - ) - .await - { - Ok(Some(p)) => p, - Ok(None) => { - self.send_invalid_request_error( - request_id, - format!("no rollout found for thread id {existing_thread_id}"), - ) - .await; - return; - } - Err(err) => { - self.send_invalid_request_error( - request_id, - format!("failed to locate thread id {existing_thread_id}: {err}"), - ) - .await; - return; - } - }; - - match RolloutRecorder::get_rollout_history(&path).await { - Ok(initial_history) => initial_history, - Err(err) => { - self.send_invalid_request_error( - request_id, - format!("failed to load rollout `{}`: {err}", path.display()), - ) - .await; - return; - } - } + thread_history }; let history_cwd = thread_history.session_cwd(); @@ -2857,41 +2808,17 @@ impl CodexMessageProcessor { ); } - let mut thread = match read_summary_from_rollout( - rollout_path.as_path(), - fallback_model_provider.as_str(), - ) - .await - { - Ok(summary) => summary_to_thread(summary), - Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_id}: {err}", - rollout_path.display() - ), - ) - .await; - return; - } + let Some(thread) = self + .load_thread_from_rollout_or_send_internal( + request_id.clone(), + thread_id, + rollout_path.as_path(), + fallback_model_provider.as_str(), + ) + .await + else { + return; }; - match read_rollout_items_from_rollout(rollout_path.as_path()).await { - Ok(items) => { - thread.turns = build_turns_from_rollout_items(&items); - } - Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_id}: {err}", - rollout_path.display() - ), - ) - .await; - return; - } - } let response = ThreadResumeResponse { thread, @@ -2916,6 +2843,278 @@ impl CodexMessageProcessor { } } + async fn resume_running_thread( + &mut self, + request_id: ConnectionRequestId, + params: &ThreadResumeParams, + ) -> bool { + if let Ok(existing_thread_id) = ThreadId::from_string(¶ms.thread_id) + && let Ok(existing_thread) = self.thread_manager.get_thread(existing_thread_id).await + { + if params.history.is_some() { + self.send_invalid_request_error( + request_id, + format!( + "cannot resume thread {existing_thread_id} with history while it is already running" + ), + ) + .await; + return true; + } + + let rollout_path = if let Some(path) = existing_thread.rollout_path() { + if path.exists() { + path + } else { + match find_thread_path_by_id_str( + &self.config.codex_home, + &existing_thread_id.to_string(), + ) + .await + { + Ok(Some(path)) => path, + Ok(None) => { + self.send_invalid_request_error( + request_id, + format!("no rollout found for thread id {existing_thread_id}"), + ) + .await; + return true; + } + Err(err) => { + self.send_invalid_request_error( + request_id, + format!("failed to locate thread id {existing_thread_id}: {err}"), + ) + .await; + return true; + } + } + } + } else { + match find_thread_path_by_id_str( + &self.config.codex_home, + &existing_thread_id.to_string(), + ) + .await + { + Ok(Some(path)) => path, + Ok(None) => { + self.send_invalid_request_error( + request_id, + format!("no rollout found for thread id {existing_thread_id}"), + ) + .await; + return true; + } + Err(err) => { + self.send_invalid_request_error( + request_id, + format!("failed to locate thread id {existing_thread_id}: {err}"), + ) + .await; + return true; + } + } + }; + + if let Some(requested_path) = params.path.as_ref() + && requested_path != &rollout_path + { + self.send_invalid_request_error( + request_id, + format!( + "cannot resume running thread {existing_thread_id} with mismatched path: requested `{}`, active `{}`", + requested_path.display(), + rollout_path.display() + ), + ) + .await; + return true; + } + + if let Err(err) = self + .ensure_conversation_listener( + existing_thread_id, + request_id.connection_id, + false, + ApiVersion::V2, + ) + .await + { + tracing::warn!( + "failed to attach listener for thread {}: {}", + existing_thread_id, + err.message + ); + } + + let config_snapshot = existing_thread.config_snapshot().await; + let mismatch_details = collect_resume_override_mismatches(params, &config_snapshot); + if !mismatch_details.is_empty() { + tracing::warn!( + "thread/resume overrides ignored for running thread {}: {}", + existing_thread_id, + mismatch_details.join("; ") + ); + } + + let Some(thread) = self + .load_thread_from_rollout_or_send_internal( + request_id.clone(), + existing_thread_id, + rollout_path.as_path(), + config_snapshot.model_provider_id.as_str(), + ) + .await + else { + return true; + }; + + let ThreadConfigSnapshot { + model, + model_provider_id, + approval_policy, + sandbox_policy, + cwd, + reasoning_effort, + .. + } = config_snapshot; + let response = ThreadResumeResponse { + thread, + model, + model_provider: model_provider_id, + cwd, + approval_policy: approval_policy.into(), + sandbox: sandbox_policy.into(), + reasoning_effort, + }; + self.outgoing.send_response(request_id, response).await; + return true; + } + false + } + + async fn resume_thread_from_history( + &self, + request_id: ConnectionRequestId, + history: &[ResponseItem], + ) -> Option { + if history.is_empty() { + self.send_invalid_request_error(request_id, "history must not be empty".to_string()) + .await; + return None; + } + Some(InitialHistory::Forked( + history + .iter() + .cloned() + .map(RolloutItem::ResponseItem) + .collect(), + )) + } + + async fn resume_thread_from_rollout( + &self, + request_id: ConnectionRequestId, + thread_id: &str, + path: Option<&PathBuf>, + ) -> Option { + let rollout_path = if let Some(path) = path { + path.clone() + } else { + let existing_thread_id = match ThreadId::from_string(thread_id) { + Ok(id) => id, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid thread id: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return None; + } + }; + + match find_thread_path_by_id_str( + &self.config.codex_home, + &existing_thread_id.to_string(), + ) + .await + { + Ok(Some(path)) => path, + Ok(None) => { + self.send_invalid_request_error( + request_id, + format!("no rollout found for thread id {existing_thread_id}"), + ) + .await; + return None; + } + Err(err) => { + self.send_invalid_request_error( + request_id, + format!("failed to locate thread id {existing_thread_id}: {err}"), + ) + .await; + return None; + } + } + }; + + match RolloutRecorder::get_rollout_history(&rollout_path).await { + Ok(initial_history) => Some(initial_history), + Err(err) => { + self.send_invalid_request_error( + request_id, + format!("failed to load rollout `{}`: {err}", rollout_path.display()), + ) + .await; + None + } + } + } + + async fn load_thread_from_rollout_or_send_internal( + &self, + request_id: ConnectionRequestId, + thread_id: ThreadId, + rollout_path: &Path, + fallback_provider: &str, + ) -> Option { + let mut thread = match read_summary_from_rollout(rollout_path, fallback_provider).await { + Ok(summary) => summary_to_thread(summary), + Err(err) => { + self.send_internal_error( + request_id, + format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ), + ) + .await; + return None; + } + }; + match read_rollout_items_from_rollout(rollout_path).await { + Ok(items) => { + thread.turns = build_turns_from_rollout_items(&items); + Some(thread) + } + Err(err) => { + self.send_internal_error( + request_id, + format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ), + ) + .await; + None + } + } + } + async fn thread_fork(&mut self, request_id: ConnectionRequestId, params: ThreadForkParams) { let ThreadForkParams { thread_id, @@ -5808,6 +6007,101 @@ impl CodexMessageProcessor { } } +fn collect_resume_override_mismatches( + request: &ThreadResumeParams, + config_snapshot: &ThreadConfigSnapshot, +) -> Vec { + let mut mismatch_details = Vec::new(); + + if let Some(requested_model) = request.model.as_deref() + && requested_model != config_snapshot.model + { + mismatch_details.push(format!( + "model requested={requested_model} active={}", + config_snapshot.model + )); + } + if let Some(requested_provider) = request.model_provider.as_deref() + && requested_provider != config_snapshot.model_provider_id + { + mismatch_details.push(format!( + "model_provider requested={requested_provider} active={}", + config_snapshot.model_provider_id + )); + } + if let Some(requested_cwd) = request.cwd.as_deref() { + let requested_cwd_path = std::path::PathBuf::from(requested_cwd); + if requested_cwd_path != config_snapshot.cwd { + mismatch_details.push(format!( + "cwd requested={} active={}", + requested_cwd_path.display(), + config_snapshot.cwd.display() + )); + } + } + if let Some(requested_approval) = request.approval_policy.as_ref() { + let active_approval: AskForApproval = config_snapshot.approval_policy.into(); + if requested_approval != &active_approval { + mismatch_details.push(format!( + "approval_policy requested={requested_approval:?} active={active_approval:?}" + )); + } + } + if let Some(requested_sandbox) = request.sandbox.as_ref() { + let sandbox_matches = matches!( + (requested_sandbox, &config_snapshot.sandbox_policy), + ( + SandboxMode::ReadOnly, + codex_protocol::protocol::SandboxPolicy::ReadOnly { .. } + ) | ( + SandboxMode::WorkspaceWrite, + codex_protocol::protocol::SandboxPolicy::WorkspaceWrite { .. } + ) | ( + SandboxMode::DangerFullAccess, + codex_protocol::protocol::SandboxPolicy::DangerFullAccess + ) | ( + SandboxMode::DangerFullAccess, + codex_protocol::protocol::SandboxPolicy::ExternalSandbox { .. } + ) + ); + if !sandbox_matches { + mismatch_details.push(format!( + "sandbox requested={requested_sandbox:?} active={:?}", + config_snapshot.sandbox_policy + )); + } + } + if let Some(requested_personality) = request.personality.as_ref() + && config_snapshot.personality.as_ref() != Some(requested_personality) + { + mismatch_details.push(format!( + "personality requested={requested_personality:?} active={:?}", + config_snapshot.personality + )); + } + + if request.config.is_some() { + mismatch_details + .push("config overrides were provided and ignored while running".to_string()); + } + if request.base_instructions.is_some() { + mismatch_details + .push("baseInstructions override was provided and ignored while running".to_string()); + } + if request.developer_instructions.is_some() { + mismatch_details.push( + "developerInstructions override was provided and ignored while running".to_string(), + ); + } + if request.persist_extended_history { + mismatch_details.push( + "persistExtendedHistory override was provided and ignored while running".to_string(), + ); + } + + mismatch_details +} + fn skills_to_info( skills: &[codex_core::skills::SkillMetadata], disabled_paths: &std::collections::HashSet, diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index a92d24ed0..e001d4a7f 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -209,18 +209,412 @@ async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() - Ok(()) } +#[tokio::test] +async fn thread_resume_keeps_in_flight_turn_streaming() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut primary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; + + let start_id = primary + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.1-codex-max".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let seed_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "seed history".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + primary.clear_message_buffer(); + + let mut secondary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??; + + let turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "respond with docs".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/started"), + ) + .await??; + + let resume_id = secondary + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id, + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + secondary.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + Ok(()) +} + +#[tokio::test] +async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { + let server = responses::start_mock_server().await; + let first_body = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]); + let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]); + let _first_response_mock = responses::mount_sse_once(&server, first_body).await; + let _second_response_mock = responses::mount_sse_once(&server, second_body).await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut primary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; + + let start_id = primary + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.1-codex-max".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let seed_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "seed history".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + primary.clear_message_buffer(); + + let running_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "keep running".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/started"), + ) + .await??; + + let resume_id = primary + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id, + history: Some(vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "history override".to_string(), + }], + end_turn: None, + phase: None, + }]), + ..Default::default() + }) + .await?; + let resume_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_error_message(RequestId::Integer(resume_id)), + ) + .await??; + assert!( + resume_err.error.message.contains("cannot resume thread") + && resume_err.error.message.contains("with history") + && resume_err.error.message.contains("running"), + "unexpected resume error: {}", + resume_err.error.message + ); + + Ok(()) +} + +#[tokio::test] +async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Result<()> { + let server = responses::start_mock_server().await; + let first_body = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]); + let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]); + let _first_response_mock = responses::mount_sse_once(&server, first_body).await; + let _second_response_mock = responses::mount_sse_once(&server, second_body).await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut primary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; + + let start_id = primary + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.1-codex-max".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let seed_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "seed history".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + primary.clear_message_buffer(); + + let running_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "keep running".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/started"), + ) + .await??; + + let resume_id = primary + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id, + path: Some(PathBuf::from("/tmp/does-not-match-running-rollout.jsonl")), + ..Default::default() + }) + .await?; + let resume_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_error_message(RequestId::Integer(resume_id)), + ) + .await??; + assert!( + resume_err.error.message.contains("mismatched path"), + "unexpected resume error: {}", + resume_err.error.message + ); + + Ok(()) +} + +#[tokio::test] +async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> Result<()> { + let server = responses::start_mock_server().await; + let first_body = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]); + let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]); + let _response_mock = + responses::mount_sse_sequence(&server, vec![first_body, second_body]).await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut primary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; + + let start_id = primary + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.1-codex-max".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let seed_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "seed history".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + primary.clear_message_buffer(); + + let running_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "keep running".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/started"), + ) + .await??; + + let resume_id = primary + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + model: Some("not-the-running-model".to_string()), + cwd: Some("/tmp".to_string()), + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { model, .. } = to_response::(resume_resp)?; + assert_eq!(model, "gpt-5.1-codex-max"); + + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + Ok(()) +} + #[tokio::test] async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; - let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?; + create_config_toml(codex_home.path(), &server.uri())?; - let mut mcp = McpProcess::new(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + let RestartedThreadFixture { + mut mcp, + thread_id, + rollout_file_path, + } = start_materialized_thread_and_restart(codex_home.path(), "materialize").await?; + let expected_updated_at_rfc3339 = "2025-01-07T00:00:00Z"; + set_rollout_mtime(rollout_file_path.as_path(), expected_updated_at_rfc3339)?; + let before_modified = std::fs::metadata(&rollout_file_path)?.modified()?; + let expected_updated_at = chrono::DateTime::parse_from_rfc3339(expected_updated_at_rfc3339)? + .with_timezone(&Utc) + .timestamp(); let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { - thread_id: rollout.conversation_id.clone(), + thread_id, model: Some("mock-model".to_string()), ..Default::default() }) @@ -230,16 +624,19 @@ async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Re mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; - let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; + let ThreadResumeResponse { + thread: resumed_thread, + .. + } = to_response::(resume_resp)?; - assert_eq!(thread.updated_at, rollout.expected_updated_at); + assert_eq!(resumed_thread.updated_at, expected_updated_at); - let after_resume_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?; - assert_eq!(after_resume_modified, rollout.before_modified); + let after_resume_modified = std::fs::metadata(&rollout_file_path)?.modified()?; + assert_eq!(after_resume_modified, before_modified); let turn_id = mcp .send_turn_start_request(TurnStartParams { - thread_id: rollout.conversation_id, + thread_id: resumed_thread.id, input: vec![UserInput::Text { text: "Hello".to_string(), text_elements: Vec::new(), @@ -258,8 +655,8 @@ async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Re ) .await??; - let after_turn_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?; - assert!(after_turn_modified > rollout.before_modified); + let after_turn_modified = std::fs::metadata(&rollout_file_path)?.modified()?; + assert!(after_turn_modified > before_modified); Ok(()) } @@ -374,22 +771,9 @@ async fn thread_resume_supports_history_and_overrides() -> Result<()> { let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; - let mut mcp = McpProcess::new(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; - - // Start a thread. - let start_id = mcp - .send_thread_start_request(ThreadStartParams { - model: Some("gpt-5.1-codex-max".to_string()), - ..Default::default() - }) - .await?; - let start_resp: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(start_id)), - ) - .await??; - let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + let RestartedThreadFixture { + mut mcp, thread_id, .. + } = start_materialized_thread_and_restart(codex_home.path(), "seed history").await?; let history_text = "Hello from history"; let history = vec![ResponseItem::Message { @@ -405,7 +789,7 @@ async fn thread_resume_supports_history_and_overrides() -> Result<()> { // Resume with explicit history and override the model. let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { - thread_id: thread.id, + thread_id, history: Some(history), model: Some("mock-model".to_string()), model_provider: Some("mock_provider".to_string()), @@ -429,6 +813,70 @@ async fn thread_resume_supports_history_and_overrides() -> Result<()> { Ok(()) } +struct RestartedThreadFixture { + mcp: McpProcess, + thread_id: String, + rollout_file_path: PathBuf, +} + +async fn start_materialized_thread_and_restart( + codex_home: &Path, + seed_text: &str, +) -> Result { + let mut first_mcp = McpProcess::new(codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, first_mcp.initialize()).await??; + + let start_id = first_mcp + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.1-codex-max".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + first_mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let materialize_turn_id = first_mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: seed_text.to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + first_mcp.read_stream_until_response_message(RequestId::Integer(materialize_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + first_mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let thread_id = thread.id; + let rollout_file_path = thread + .path + .ok_or_else(|| anyhow::anyhow!("thread path missing from thread/start response"))?; + + drop(first_mcp); + + let mut second_mcp = McpProcess::new(codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, second_mcp.initialize()).await??; + + Ok(RestartedThreadFixture { + mcp: second_mcp, + thread_id, + rollout_file_path, + }) +} + #[tokio::test] async fn thread_resume_accepts_personality_override() -> Result<()> { skip_if_no_network!(Ok(())); @@ -449,10 +897,10 @@ async fn thread_resume_accepts_personality_override() -> Result<()> { let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; - let mut mcp = McpProcess::new(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + let mut primary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; - let start_id = mcp + let start_id = primary .send_thread_start_request(ThreadStartParams { model: Some("gpt-5.2-codex".to_string()), ..Default::default() @@ -460,12 +908,12 @@ async fn thread_resume_accepts_personality_override() -> Result<()> { .await?; let start_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + primary.read_stream_until_response_message(RequestId::Integer(start_id)), ) .await??; let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; - let materialize_id = mcp + let materialize_id = primary .send_turn_start_request(TurnStartParams { thread_id: thread.id.clone(), input: vec![UserInput::Text { @@ -477,16 +925,19 @@ async fn thread_resume_accepts_personality_override() -> Result<()> { .await?; timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(materialize_id)), + primary.read_stream_until_response_message(RequestId::Integer(materialize_id)), ) .await??; timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("turn/completed"), + primary.read_stream_until_notification_message("turn/completed"), ) .await??; - let resume_id = mcp + let mut secondary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??; + + let resume_id = secondary .send_thread_resume_request(ThreadResumeParams { thread_id: thread.id, model: Some("gpt-5.2-codex".to_string()), @@ -496,12 +947,12 @@ async fn thread_resume_accepts_personality_override() -> Result<()> { .await?; let resume_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + secondary.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; let resume: ThreadResumeResponse = to_response::(resume_resp)?; - let turn_id = mcp + let turn_id = secondary .send_turn_start_request(TurnStartParams { thread_id: resume.thread.id, input: vec![UserInput::Text { @@ -513,13 +964,13 @@ async fn thread_resume_accepts_personality_override() -> Result<()> { .await?; timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + secondary.read_stream_until_response_message(RequestId::Integer(turn_id)), ) .await??; timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("turn/completed"), + secondary.read_stream_until_notification_message("turn/completed"), ) .await??;