tests(thread_resume): interrupt running turns in resume error-path tests (#12269)
## Why `thread_resume` tests can intentionally create an in-flight turn, assert a `thread/resume` error path, and return immediately. That leaves turn work active during teardown, which can surface as intermittent `LEAK` failures. Sample output that motivated this investigation (reported during test runs): ```text LEAK ... codex-app-server::all suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch ``` ## What Changed Updated only `codex-rs/app-server/tests/suite/v2/thread_resume.rs`: - `thread_resume_rejects_history_when_thread_is_running` - `thread_resume_rejects_mismatched_path_when_thread_is_running` Both tests now: 1. capture the running turn id from `TurnStartResponse` 2. assert the expected `thread/resume` error 3. call `turn/interrupt` for that running turn 4. wait for `codex/event/turn_aborted` before returning ## Why This Is The Correct Fix These tests are specifically validating resume behavior while a turn is active. They should also own cleanup of that active turn before exiting. Explicitly interrupting and waiting for the terminal abort notification removes teardown races and avoids relying on process-drop behavior to clean up in-flight work. ## Repro / Verification Repro command used for investigation: ```bash cargo nextest run -p codex-app-server -j 2 --no-fail-fast --stress-count 50 --status-level leak --final-status-level fail -E 'test(suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch) | test(suite::v2::thread_resume::thread_resume_rejects_history_when_thread_is_running) | test(suite::v2::thread_resume::thread_resume_rejects_mismatched_path_when_thread_is_running) | test(suite::v2::thread_resume::thread_resume_keeps_in_flight_turn_streaming)' ``` Observed before this change: intermittent `LEAK` in `thread_resume_rejects_history_when_thread_is_running`. Also verified with: - `cargo test -p codex-app-server` --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/12269). * #12271 * __->__ #12269
This commit is contained in:
parent
4edb1441a7
commit
7ed3e3760d
1 changed files with 65 additions and 10 deletions
|
|
@ -15,7 +15,10 @@ use codex_app_server_protocol::ThreadResumeResponse;
|
|||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::TurnInterruptParams;
|
||||
use codex_app_server_protocol::TurnInterruptResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_protocol::config_types::Personality;
|
||||
|
|
@ -358,9 +361,10 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
|
|||
.await??;
|
||||
primary.clear_message_buffer();
|
||||
|
||||
let running_turn_id = primary
|
||||
let thread_id = thread.id.clone();
|
||||
let running_turn_request_id = primary
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
thread_id: thread_id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "keep running".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
|
|
@ -368,11 +372,13 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
|
|||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
let running_turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
|
||||
primary.read_stream_until_response_message(RequestId::Integer(running_turn_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn: running_turn } =
|
||||
to_response::<TurnStartResponse>(running_turn_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("turn/started"),
|
||||
|
|
@ -381,7 +387,7 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
|
|||
|
||||
let resume_id = primary
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: thread.id,
|
||||
thread_id: thread_id.clone(),
|
||||
history: Some(vec![ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
|
|
@ -407,6 +413,29 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
|
|||
resume_err.error.message
|
||||
);
|
||||
|
||||
// This test intentionally keeps a turn running to exercise the resume error path.
|
||||
// Keep this explicit interrupt + turn_aborted wait so teardown does not leave
|
||||
// in-flight work behind (which can show up as LEAK in nextest).
|
||||
let interrupt_id = primary
|
||||
.send_turn_interrupt_request(TurnInterruptParams {
|
||||
thread_id,
|
||||
turn_id: running_turn.id,
|
||||
})
|
||||
.await?;
|
||||
let interrupt_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)),
|
||||
)
|
||||
.await??;
|
||||
let _turn_interrupt_response: TurnInterruptResponse =
|
||||
to_response::<TurnInterruptResponse>(interrupt_resp)?;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("codex/event/turn_aborted"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -462,9 +491,10 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
|
|||
.await??;
|
||||
primary.clear_message_buffer();
|
||||
|
||||
let running_turn_id = primary
|
||||
let thread_id = thread.id.clone();
|
||||
let running_turn_request_id = primary
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
thread_id: thread_id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "keep running".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
|
|
@ -472,11 +502,13 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
|
|||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
let running_turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
|
||||
primary.read_stream_until_response_message(RequestId::Integer(running_turn_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn: running_turn } =
|
||||
to_response::<TurnStartResponse>(running_turn_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("turn/started"),
|
||||
|
|
@ -485,7 +517,7 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
|
|||
|
||||
let resume_id = primary
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: thread.id,
|
||||
thread_id: thread_id.clone(),
|
||||
path: Some(PathBuf::from("/tmp/does-not-match-running-rollout.jsonl")),
|
||||
..Default::default()
|
||||
})
|
||||
|
|
@ -501,6 +533,29 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
|
|||
resume_err.error.message
|
||||
);
|
||||
|
||||
// This test intentionally keeps a turn running to exercise the resume error path.
|
||||
// Keep this explicit interrupt + turn_aborted wait so teardown does not leave
|
||||
// in-flight work behind (which can show up as LEAK in nextest).
|
||||
let interrupt_id = primary
|
||||
.send_turn_interrupt_request(TurnInterruptParams {
|
||||
thread_id,
|
||||
turn_id: running_turn.id,
|
||||
})
|
||||
.await?;
|
||||
let interrupt_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)),
|
||||
)
|
||||
.await??;
|
||||
let _turn_interrupt_response: TurnInterruptResponse =
|
||||
to_response::<TurnInterruptResponse>(interrupt_resp)?;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("codex/event/turn_aborted"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue