diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 576f03156..4a57767ab 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -15,7 +15,10 @@ use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::TurnInterruptParams; +use codex_app_server_protocol::TurnInterruptResponse; use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput; use codex_protocol::config_types::Personality; @@ -358,9 +361,10 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { .await??; primary.clear_message_buffer(); - let running_turn_id = primary + let thread_id = thread.id.clone(); + let running_turn_request_id = primary .send_turn_start_request(TurnStartParams { - thread_id: thread.id.clone(), + thread_id: thread_id.clone(), input: vec![UserInput::Text { text: "keep running".to_string(), text_elements: Vec::new(), @@ -368,11 +372,13 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { ..Default::default() }) .await?; - timeout( + let running_turn_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, - primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)), + primary.read_stream_until_response_message(RequestId::Integer(running_turn_request_id)), ) .await??; + let TurnStartResponse { turn: running_turn } = + to_response::(running_turn_resp)?; timeout( DEFAULT_READ_TIMEOUT, primary.read_stream_until_notification_message("turn/started"), @@ -381,7 +387,7 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { let resume_id = primary .send_thread_resume_request(ThreadResumeParams { - thread_id: thread.id, + thread_id: thread_id.clone(), history: Some(vec![ResponseItem::Message { id: None, role: "user".to_string(), @@ -407,6 +413,29 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { resume_err.error.message ); + // This test intentionally keeps a turn running to exercise the resume error path. + // Keep this explicit interrupt + turn_aborted wait so teardown does not leave + // in-flight work behind (which can show up as LEAK in nextest). + let interrupt_id = primary + .send_turn_interrupt_request(TurnInterruptParams { + thread_id, + turn_id: running_turn.id, + }) + .await?; + let interrupt_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)), + ) + .await??; + let _turn_interrupt_response: TurnInterruptResponse = + to_response::(interrupt_resp)?; + + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("codex/event/turn_aborted"), + ) + .await??; + Ok(()) } @@ -462,9 +491,10 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul .await??; primary.clear_message_buffer(); - let running_turn_id = primary + let thread_id = thread.id.clone(); + let running_turn_request_id = primary .send_turn_start_request(TurnStartParams { - thread_id: thread.id.clone(), + thread_id: thread_id.clone(), input: vec![UserInput::Text { text: "keep running".to_string(), text_elements: Vec::new(), @@ -472,11 +502,13 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul ..Default::default() }) .await?; - timeout( + let running_turn_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, - primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)), + primary.read_stream_until_response_message(RequestId::Integer(running_turn_request_id)), ) .await??; + let TurnStartResponse { turn: running_turn } = + to_response::(running_turn_resp)?; timeout( DEFAULT_READ_TIMEOUT, primary.read_stream_until_notification_message("turn/started"), @@ -485,7 +517,7 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul let resume_id = primary .send_thread_resume_request(ThreadResumeParams { - thread_id: thread.id, + thread_id: thread_id.clone(), path: Some(PathBuf::from("/tmp/does-not-match-running-rollout.jsonl")), ..Default::default() }) @@ -501,6 +533,29 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul resume_err.error.message ); + // This test intentionally keeps a turn running to exercise the resume error path. + // Keep this explicit interrupt + turn_aborted wait so teardown does not leave + // in-flight work behind (which can show up as LEAK in nextest). + let interrupt_id = primary + .send_turn_interrupt_request(TurnInterruptParams { + thread_id, + turn_id: running_turn.id, + }) + .await?; + let interrupt_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)), + ) + .await??; + let _turn_interrupt_response: TurnInterruptResponse = + to_response::(interrupt_resp)?; + + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("codex/event/turn_aborted"), + ) + .await??; + Ok(()) }