diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index c1ab56b65..03ad23ff7 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -612,10 +612,6 @@ "arrayvec_0.7.6": "{\"dependencies\":[{\"kind\":\"dev\",\"name\":\"bencher\",\"req\":\"^0.1.4\"},{\"default_features\":false,\"name\":\"borsh\",\"optional\":true,\"req\":\"^1.2.0\"},{\"kind\":\"dev\",\"name\":\"matches\",\"req\":\"^0.1\"},{\"default_features\":false,\"name\":\"serde\",\"optional\":true,\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"serde_test\",\"req\":\"^1.0\"},{\"default_features\":false,\"name\":\"zeroize\",\"optional\":true,\"req\":\"^1.4\"}],\"features\":{\"default\":[\"std\"],\"std\":[]}}", "ascii-canvas_3.0.0": "{\"dependencies\":[{\"kind\":\"dev\",\"name\":\"diff\",\"req\":\"^0.1\"},{\"name\":\"term\",\"req\":\"^0.7\"}],\"features\":{}}", "ascii_1.1.0": "{\"dependencies\":[{\"name\":\"serde\",\"optional\":true,\"req\":\"^1.0.25\"},{\"name\":\"serde_test\",\"optional\":true,\"req\":\"^1.0\"}],\"features\":{\"alloc\":[],\"default\":[\"std\"],\"std\":[\"alloc\"]}}", - "askama_0.15.4": "{\"dependencies\":[{\"default_features\":false,\"name\":\"askama_macros\",\"optional\":true,\"req\":\"=0.15.4\"},{\"kind\":\"dev\",\"name\":\"assert_matches\",\"req\":\"^1.5.0\"},{\"kind\":\"dev\",\"name\":\"criterion\",\"req\":\"^0.8\"},{\"name\":\"itoa\",\"req\":\"^1.0.11\"},{\"default_features\":false,\"name\":\"percent-encoding\",\"optional\":true,\"req\":\"^2.1.0\"},{\"default_features\":false,\"name\":\"serde\",\"optional\":true,\"req\":\"^1.0\"},{\"default_features\":false,\"name\":\"serde_json\",\"optional\":true,\"req\":\"^1.0\"}],\"features\":{\"alloc\":[\"askama_macros?/alloc\",\"serde?/alloc\",\"serde_json?/alloc\",\"percent-encoding?/alloc\"],\"code-in-doc\":[\"askama_macros?/code-in-doc\"],\"config\":[\"askama_macros?/config\"],\"default\":[\"config\",\"derive\",\"std\",\"urlencode\"],\"derive\":[\"dep:askama_macros\",\"dep:askama_macros\"],\"full\":[\"default\",\"code-in-doc\",\"serde_json\"],\"nightly-spans\":[\"askama_macros/nightly-spans\"],\"serde_json\":[\"std\",\"askama_macros?/serde_json\",\"dep:serde\",\"dep:serde_json\"],\"std\":[\"alloc\",\"askama_macros?/std\",\"serde?/std\",\"serde_json?/std\",\"percent-encoding?/std\"],\"urlencode\":[\"askama_macros?/urlencode\",\"dep:percent-encoding\"]}}", - "askama_derive_0.15.4": "{\"dependencies\":[{\"name\":\"basic-toml\",\"optional\":true,\"req\":\"^0.1.1\"},{\"kind\":\"dev\",\"name\":\"console\",\"req\":\"^0.16.0\"},{\"kind\":\"dev\",\"name\":\"criterion\",\"req\":\"^0.8\"},{\"name\":\"memchr\",\"req\":\"^2\"},{\"name\":\"parser\",\"package\":\"askama_parser\",\"req\":\"=0.15.4\"},{\"kind\":\"dev\",\"name\":\"prettyplease\",\"req\":\"^0.2.20\"},{\"default_features\":false,\"name\":\"proc-macro2\",\"req\":\"^1\"},{\"default_features\":false,\"name\":\"pulldown-cmark\",\"optional\":true,\"req\":\"^0.13.0\"},{\"default_features\":false,\"name\":\"quote\",\"req\":\"^1\"},{\"name\":\"rustc-hash\",\"req\":\"^2.0.0\"},{\"name\":\"serde\",\"optional\":true,\"req\":\"^1.0\"},{\"name\":\"serde_derive\",\"optional\":true,\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"similar\",\"req\":\"^2.6.0\"},{\"default_features\":false,\"features\":[\"clone-impls\",\"derive\",\"full\",\"parsing\",\"printing\"],\"name\":\"syn\",\"req\":\"^2.0.3\"}],\"features\":{\"alloc\":[],\"code-in-doc\":[\"dep:pulldown-cmark\"],\"config\":[\"external-sources\",\"dep:basic-toml\",\"dep:serde\",\"dep:serde_derive\",\"parser/config\"],\"default\":[\"alloc\",\"code-in-doc\",\"config\",\"external-sources\",\"proc-macro\",\"serde_json\",\"std\",\"urlencode\"],\"external-sources\":[],\"nightly-spans\":[],\"proc-macro\":[\"proc-macro2/proc-macro\"],\"serde_json\":[],\"std\":[\"alloc\"],\"urlencode\":[]}}", - "askama_macros_0.15.4": "{\"dependencies\":[{\"default_features\":false,\"features\":[\"external-sources\",\"proc-macro\"],\"name\":\"askama_derive\",\"package\":\"askama_derive\",\"req\":\"=0.15.4\"}],\"features\":{\"alloc\":[\"askama_derive/alloc\"],\"code-in-doc\":[\"askama_derive/code-in-doc\"],\"config\":[\"askama_derive/config\"],\"default\":[\"config\",\"derive\",\"std\",\"urlencode\"],\"derive\":[],\"full\":[\"default\",\"code-in-doc\",\"serde_json\"],\"nightly-spans\":[\"askama_derive/nightly-spans\"],\"serde_json\":[\"askama_derive/serde_json\"],\"std\":[\"askama_derive/std\"],\"urlencode\":[\"askama_derive/urlencode\"]}}", - "askama_parser_0.15.4": "{\"dependencies\":[{\"kind\":\"dev\",\"name\":\"criterion\",\"req\":\"^0.8\"},{\"name\":\"rustc-hash\",\"req\":\"^2.0.0\"},{\"name\":\"serde\",\"optional\":true,\"req\":\"^1.0\"},{\"name\":\"serde_derive\",\"optional\":true,\"req\":\"^1.0\"},{\"name\":\"unicode-ident\",\"req\":\"^1.0.12\"},{\"features\":[\"simd\"],\"name\":\"winnow\",\"req\":\"^0.7.0\"}],\"features\":{\"config\":[\"dep:serde\",\"dep:serde_derive\"]}}", "asn1-rs-derive_0.6.0": "{\"dependencies\":[{\"name\":\"proc-macro2\",\"req\":\"^1.0\"},{\"name\":\"quote\",\"req\":\"^1.0\"},{\"features\":[\"full\"],\"name\":\"syn\",\"req\":\"^2.0\"},{\"name\":\"synstructure\",\"req\":\"^0.13\"}],\"features\":{}}", "asn1-rs-impl_0.2.0": "{\"dependencies\":[{\"name\":\"proc-macro2\",\"req\":\"^1\"},{\"name\":\"quote\",\"req\":\"^1\"},{\"name\":\"syn\",\"req\":\"^2.0\"}],\"features\":{}}", "asn1-rs_0.7.1": "{\"dependencies\":[{\"name\":\"asn1-rs-derive\",\"req\":\"^0.6\"},{\"name\":\"asn1-rs-impl\",\"req\":\"^0.2\"},{\"name\":\"bitvec\",\"optional\":true,\"req\":\"^1.0\"},{\"name\":\"colored\",\"optional\":true,\"req\":\"^3.0\"},{\"kind\":\"dev\",\"name\":\"colored\",\"req\":\"^3.0\"},{\"name\":\"cookie-factory\",\"optional\":true,\"req\":\"^0.3.0\"},{\"name\":\"displaydoc\",\"req\":\"^0.2.2\"},{\"kind\":\"dev\",\"name\":\"hex-literal\",\"req\":\"^0.4\"},{\"default_features\":false,\"features\":[\"std\"],\"name\":\"nom\",\"req\":\"^7.0\"},{\"name\":\"num-bigint\",\"optional\":true,\"req\":\"^0.4\"},{\"name\":\"num-traits\",\"req\":\"^0.2.14\"},{\"kind\":\"dev\",\"name\":\"pem\",\"req\":\"^3.0\"},{\"name\":\"rusticata-macros\",\"req\":\"^4.0\"},{\"name\":\"thiserror\",\"req\":\"^2.0.0\"},{\"features\":[\"macros\",\"parsing\",\"formatting\"],\"name\":\"time\",\"optional\":true,\"req\":\"^0.3\"},{\"kind\":\"dev\",\"name\":\"trybuild\",\"req\":\"^1.0\"}],\"features\":{\"bigint\":[\"num-bigint\"],\"bits\":[\"bitvec\"],\"datetime\":[\"time\"],\"debug\":[\"std\",\"colored\"],\"default\":[\"std\"],\"serialize\":[\"cookie-factory\"],\"std\":[],\"trace\":[\"debug\"]}}", diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 77212f943..249a28021 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -61,6 +61,7 @@ use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadRollbackParams; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadUnarchiveParams; +use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnInterruptParams; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnSteerParams; @@ -572,6 +573,63 @@ impl McpProcess { self.send_request("turn/interrupt", params).await } + /// Deterministically clean up an intentionally in-flight turn. + /// + /// Some tests assert behavior while a turn is still running. Returning from those tests + /// without an explicit interrupt + `codex/event/turn_aborted` wait can leave in-flight work + /// racing teardown and intermittently show up as `LEAK` in nextest. + /// + /// In rare races, the turn can also fail or complete on its own after we send + /// `turn/interrupt` but before the server emits the interrupt response. The helper treats a + /// buffered matching `turn/completed` notification as sufficient terminal cleanup in that + /// case so teardown does not flap on timing. + pub async fn interrupt_turn_and_wait_for_aborted( + &mut self, + thread_id: String, + turn_id: String, + read_timeout: std::time::Duration, + ) -> anyhow::Result<()> { + let interrupt_request_id = self + .send_turn_interrupt_request(TurnInterruptParams { + thread_id: thread_id.clone(), + turn_id: turn_id.clone(), + }) + .await?; + match tokio::time::timeout( + read_timeout, + self.read_stream_until_response_message(RequestId::Integer(interrupt_request_id)), + ) + .await + { + Ok(result) => { + result.with_context(|| "failed while waiting for turn interrupt response")?; + } + Err(err) => { + if self.pending_turn_completed_notification(&thread_id, &turn_id) { + return Ok(()); + } + return Err(err).with_context(|| "timed out waiting for turn interrupt response"); + } + } + match tokio::time::timeout( + read_timeout, + self.read_stream_until_notification_message("codex/event/turn_aborted"), + ) + .await + { + Ok(result) => { + result.with_context(|| "failed while waiting for turn aborted notification")?; + } + Err(err) => { + if self.pending_turn_completed_notification(&thread_id, &turn_id) { + return Ok(()); + } + return Err(err).with_context(|| "timed out waiting for turn aborted notification"); + } + } + Ok(()) + } + /// Send a `turn/steer` JSON-RPC request (v2). pub async fn send_turn_steer_request( &mut self, @@ -940,6 +998,25 @@ impl McpProcess { None } + fn pending_turn_completed_notification(&self, thread_id: &str, turn_id: &str) -> bool { + self.pending_messages.iter().any(|message| { + let JSONRPCMessage::Notification(notification) = message else { + return false; + }; + if notification.method != "turn/completed" { + return false; + } + let Some(params) = notification.params.as_ref() else { + return false; + }; + let Ok(payload) = serde_json::from_value::(params.clone()) + else { + return false; + }; + payload.thread_id == thread_id && payload.turn.id == turn_id + }) + } + fn message_request_id(message: &JSONRPCMessage) -> Option<&RequestId> { match message { JSONRPCMessage::Request(request) => Some(&request.id), diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 4a57767ab..15bc4012e 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -15,8 +15,6 @@ use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus; -use codex_app_server_protocol::TurnInterruptParams; -use codex_app_server_protocol::TurnInterruptResponse; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStatus; @@ -317,9 +315,14 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { responses::ev_assistant_message("msg-1", "Done"), responses::ev_completed("resp-1"), ]); - let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]); + let second_response = responses::sse_response(responses::sse(vec![ + responses::ev_response_created("resp-2"), + responses::ev_assistant_message("msg-2", "Done"), + responses::ev_completed("resp-2"), + ])) + .set_delay(std::time::Duration::from_millis(500)); let _first_response_mock = responses::mount_sse_once(&server, first_body).await; - let _second_response_mock = responses::mount_sse_once(&server, second_body).await; + let _second_response_mock = responses::mount_response_once(&server, second_response).await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; @@ -413,28 +416,9 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> { resume_err.error.message ); - // This test intentionally keeps a turn running to exercise the resume error path. - // Keep this explicit interrupt + turn_aborted wait so teardown does not leave - // in-flight work behind (which can show up as LEAK in nextest). - let interrupt_id = primary - .send_turn_interrupt_request(TurnInterruptParams { - thread_id, - turn_id: running_turn.id, - }) + primary + .interrupt_turn_and_wait_for_aborted(thread_id, running_turn.id, DEFAULT_READ_TIMEOUT) .await?; - let interrupt_resp: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)), - ) - .await??; - let _turn_interrupt_response: TurnInterruptResponse = - to_response::(interrupt_resp)?; - - timeout( - DEFAULT_READ_TIMEOUT, - primary.read_stream_until_notification_message("codex/event/turn_aborted"), - ) - .await??; Ok(()) } @@ -447,9 +431,14 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul responses::ev_assistant_message("msg-1", "Done"), responses::ev_completed("resp-1"), ]); - let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]); + let second_response = responses::sse_response(responses::sse(vec![ + responses::ev_response_created("resp-2"), + responses::ev_assistant_message("msg-2", "Done"), + responses::ev_completed("resp-2"), + ])) + .set_delay(std::time::Duration::from_millis(500)); let _first_response_mock = responses::mount_sse_once(&server, first_body).await; - let _second_response_mock = responses::mount_sse_once(&server, second_body).await; + let _second_response_mock = responses::mount_response_once(&server, second_response).await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; @@ -533,28 +522,9 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul resume_err.error.message ); - // This test intentionally keeps a turn running to exercise the resume error path. - // Keep this explicit interrupt + turn_aborted wait so teardown does not leave - // in-flight work behind (which can show up as LEAK in nextest). - let interrupt_id = primary - .send_turn_interrupt_request(TurnInterruptParams { - thread_id, - turn_id: running_turn.id, - }) + primary + .interrupt_turn_and_wait_for_aborted(thread_id, running_turn.id, DEFAULT_READ_TIMEOUT) .await?; - let interrupt_resp: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)), - ) - .await??; - let _turn_interrupt_response: TurnInterruptResponse = - to_response::(interrupt_resp)?; - - timeout( - DEFAULT_READ_TIMEOUT, - primary.read_stream_until_notification_message("codex/event/turn_aborted"), - ) - .await??; Ok(()) } diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 2eecfc559..d2d850176 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -747,7 +747,12 @@ async fn turn_start_accepts_local_image_input() -> Result<()> { let TurnStartResponse { turn } = to_response::(turn_resp)?; assert!(!turn.id.is_empty()); - // This test only validates that turn/start responds and returns a turn. + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + Ok(()) } diff --git a/codex-rs/app-server/tests/suite/v2/turn_start_zsh_fork.rs b/codex-rs/app-server/tests/suite/v2/turn_start_zsh_fork.rs index 4233ce649..4be127dff 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start_zsh_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start_zsh_fork.rs @@ -29,6 +29,7 @@ use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput as V2UserInput; use codex_core::features::FEATURES; @@ -100,7 +101,7 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> { let turn_id = mcp .send_turn_start_request(TurnStartParams { - thread_id: thread.id, + thread_id: thread.id.clone(), input: vec![V2UserInput::Text { text: "run echo hi".to_string(), text_elements: Vec::new(), @@ -114,11 +115,12 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> { ..Default::default() }) .await?; - timeout( + let turn_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), ) .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; let started_command_execution = timeout(DEFAULT_READ_TIMEOUT, async { loop { @@ -149,6 +151,9 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> { assert!(command.contains(" -lc 'echo hi'")); assert_eq!(cwd, workspace); + mcp.interrupt_turn_and_wait_for_aborted(thread.id, turn.id, DEFAULT_READ_TIMEOUT) + .await?; + Ok(()) } @@ -504,11 +509,12 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2() ..Default::default() }) .await?; - timeout( + let turn_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), ) .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; let mut approval_ids = Vec::new(); for decision in [ @@ -577,6 +583,9 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2() assert_eq!(approval_ids.len(), 2); assert_ne!(approval_ids[0], approval_ids[1]); + mcp.interrupt_turn_and_wait_for_aborted(thread.id, turn.id, DEFAULT_READ_TIMEOUT) + .await?; + Ok(()) } diff --git a/codex-rs/app-server/tests/suite/v2/turn_steer.rs b/codex-rs/app-server/tests/suite/v2/turn_steer.rs index 89704326f..779e77577 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_steer.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_steer.rs @@ -136,7 +136,7 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> { let steer_req = mcp .send_turn_steer_request(TurnSteerParams { - thread_id: thread.id, + thread_id: thread.id.clone(), input: vec![V2UserInput::Text { text: "steer".to_string(), text_elements: Vec::new(), @@ -152,6 +152,9 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> { let steer: TurnSteerResponse = to_response::(steer_resp)?; assert_eq!(steer.turn_id, turn.id); + mcp.interrupt_turn_and_wait_for_aborted(thread.id, steer.turn_id, DEFAULT_READ_TIMEOUT) + .await?; + Ok(()) }