From 7ed3e3760d4faa8035b71dc17156a12c4636bd65 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Thu, 19 Feb 2026 13:51:18 -0800 Subject: [PATCH] tests(thread_resume): interrupt running turns in resume error-path tests (#12269) ## Why `thread_resume` tests can intentionally create an in-flight turn, assert a `thread/resume` error path, and return immediately. That leaves turn work active during teardown, which can surface as intermittent `LEAK` failures. Sample output that motivated this investigation (reported during test runs): ```text LEAK ... codex-app-server::all suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch ``` ## What Changed Updated only `codex-rs/app-server/tests/suite/v2/thread_resume.rs`: - `thread_resume_rejects_history_when_thread_is_running` - `thread_resume_rejects_mismatched_path_when_thread_is_running` Both tests now: 1. capture the running turn id from `TurnStartResponse` 2. assert the expected `thread/resume` error 3. call `turn/interrupt` for that running turn 4. wait for `codex/event/turn_aborted` before returning ## Why This Is The Correct Fix These tests are specifically validating resume behavior while a turn is active. They should also own cleanup of that active turn before exiting. Explicitly interrupting and waiting for the terminal abort notification removes teardown races and avoids relying on process-drop behavior to clean up in-flight work. 
## Repro / Verification Repro command used for investigation: ```bash cargo nextest run -p codex-app-server -j 2 --no-fail-fast --stress-count 50 --status-level leak --final-status-level fail -E 'test(suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch) | test(suite::v2::thread_resume::thread_resume_rejects_history_when_thread_is_running) | test(suite::v2::thread_resume::thread_resume_rejects_mismatched_path_when_thread_is_running) | test(suite::v2::thread_resume::thread_resume_keeps_in_flight_turn_streaming)' ``` Observed before this change: intermittent `LEAK` in `thread_resume_rejects_history_when_thread_is_running`. Also verified with: - `cargo test -p codex-app-server` --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/12269). * #12271 * __->__ #12269 --- .../tests/suite/v2/thread_resume.rs | 75 ++++++++++++++++--- 1 file changed, 65 insertions(+), 10 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 576f03156..4a57767ab 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -15,7 +15,10 @@ use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::TurnInterruptParams; +use codex_app_server_protocol::TurnInterruptResponse; use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput; use codex_protocol::config_types::Personality; @@ -358,9 +361,10 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { .await??; 
primary.clear_message_buffer(); - let running_turn_id = primary + let thread_id = thread.id.clone(); + let running_turn_request_id = primary .send_turn_start_request(TurnStartParams { - thread_id: thread.id.clone(), + thread_id: thread_id.clone(), input: vec![UserInput::Text { text: "keep running".to_string(), text_elements: Vec::new(), @@ -368,11 +372,13 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { ..Default::default() }) .await?; - timeout( + let running_turn_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, - primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)), + primary.read_stream_until_response_message(RequestId::Integer(running_turn_request_id)), ) .await??; + let TurnStartResponse { turn: running_turn } = + to_response::<TurnStartResponse>(running_turn_resp)?; timeout( DEFAULT_READ_TIMEOUT, primary.read_stream_until_notification_message("turn/started"), @@ -381,7 +387,7 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { let resume_id = primary .send_thread_resume_request(ThreadResumeParams { - thread_id: thread.id, + thread_id: thread_id.clone(), history: Some(vec![ResponseItem::Message { id: None, role: "user".to_string(), @@ -407,6 +413,29 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { resume_err.error.message ); + // This test intentionally keeps a turn running to exercise the resume error path. + // Keep this explicit interrupt + turn_aborted wait so teardown does not leave + // in-flight work behind (which can show up as LEAK in nextest). 
+ let interrupt_id = primary + .send_turn_interrupt_request(TurnInterruptParams { + thread_id, + turn_id: running_turn.id, + }) + .await?; + let interrupt_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)), + ) + .await??; + let _turn_interrupt_response: TurnInterruptResponse = + to_response::<TurnInterruptResponse>(interrupt_resp)?; + + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("codex/event/turn_aborted"), + ) + .await??; + Ok(()) } @@ -462,9 +491,10 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul .await??; primary.clear_message_buffer(); - let running_turn_id = primary + let thread_id = thread.id.clone(); + let running_turn_request_id = primary .send_turn_start_request(TurnStartParams { - thread_id: thread.id.clone(), + thread_id: thread_id.clone(), input: vec![UserInput::Text { text: "keep running".to_string(), text_elements: Vec::new(), @@ -472,11 +502,13 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul ..Default::default() }) .await?; - timeout( + let running_turn_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, - primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)), + primary.read_stream_until_response_message(RequestId::Integer(running_turn_request_id)), ) .await??; + let TurnStartResponse { turn: running_turn } = + to_response::<TurnStartResponse>(running_turn_resp)?; timeout( DEFAULT_READ_TIMEOUT, primary.read_stream_until_notification_message("turn/started"), @@ -485,7 +517,7 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul let resume_id = primary .send_thread_resume_request(ThreadResumeParams { - thread_id: thread.id, + thread_id: thread_id.clone(), path: Some(PathBuf::from("/tmp/does-not-match-running-rollout.jsonl")), ..Default::default() }) @@ -501,6 +533,29 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() 
-> Resul resume_err.error.message ); + // This test intentionally keeps a turn running to exercise the resume error path. + // Keep this explicit interrupt + turn_aborted wait so teardown does not leave + // in-flight work behind (which can show up as LEAK in nextest). + let interrupt_id = primary + .send_turn_interrupt_request(TurnInterruptParams { + thread_id, + turn_id: running_turn.id, + }) + .await?; + let interrupt_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)), + ) + .await??; + let _turn_interrupt_response: TurnInterruptResponse = + to_response::<TurnInterruptResponse>(interrupt_resp)?; + + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("codex/event/turn_aborted"), + ) + .await??; + Ok(()) }