diff --git a/codex-rs/core/tests/suite/stream_no_completed.rs b/codex-rs/core/tests/suite/stream_no_completed.rs index ac6d59b21..e6fc7ee8c 100644 --- a/codex-rs/core/tests/suite/stream_no_completed.rs +++ b/codex-rs/core/tests/suite/stream_no_completed.rs @@ -6,63 +6,40 @@ use codex_core::WireApi; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; use codex_protocol::user_input::UserInput; +use codex_utils_cargo_bin::find_resource; use core_test_support::load_sse_fixture; -use core_test_support::responses::ev_completed; -use core_test_support::responses::ev_response_created; -use core_test_support::responses::sse; +use core_test_support::responses; use core_test_support::skip_if_no_network; +use core_test_support::streaming_sse::StreamingSseChunk; +use core_test_support::streaming_sse::start_streaming_sse_server; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; -use wiremock::Mock; -use wiremock::MockServer; -use wiremock::Request; -use wiremock::Respond; -use wiremock::ResponseTemplate; -use wiremock::matchers::method; -use wiremock::matchers::path; fn sse_incomplete() -> String { - load_sse_fixture("tests/fixtures/incomplete_sse.json") + let fixture = find_resource!("tests/fixtures/incomplete_sse.json") + .unwrap_or_else(|err| panic!("failed to resolve incomplete_sse fixture: {err}")); + load_sse_fixture(fixture) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn retries_on_early_close() { skip_if_no_network!(); - let server = MockServer::start().await; + let incomplete_sse = sse_incomplete(); + let completed_sse = responses::sse_completed("resp_ok"); - struct SeqResponder; - impl Respond for SeqResponder { - fn respond(&self, _: &Request) -> ResponseTemplate { - use std::sync::atomic::AtomicUsize; - use std::sync::atomic::Ordering; - static CALLS: AtomicUsize = AtomicUsize::new(0); - let n = CALLS.fetch_add(1, Ordering::SeqCst); - if n == 0 { - ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_raw(sse_incomplete(), "text/event-stream") - } else { - ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_raw( - sse(vec![ - ev_response_created("resp_ok"), - ev_completed("resp_ok"), - ]), - "text/event-stream", - ) - } - } - } - - Mock::given(method("POST")) - .and(path("/v1/responses")) - .respond_with(SeqResponder {}) - .expect(2) - .mount(&server) - .await; + let (server, _) = start_streaming_sse_server(vec![ + vec![StreamingSseChunk { + gate: None, + body: incomplete_sse, + }], + vec![StreamingSseChunk { + gate: None, + body: completed_sse, + }], + ]) + .await; // Configure retry behavior explicitly to avoid mutating process-wide // environment variables. @@ -92,7 +69,7 @@ async fn retries_on_early_close() { .with_config(move |config| { config.model_provider = model_provider; }) - .build(&server) + .build_with_streaming_server(&server) .await .unwrap(); @@ -109,4 +86,13 @@ async fn retries_on_early_close() { // Wait until TurnComplete (should succeed after retry). wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + let requests = server.requests().await; + assert_eq!( + requests.len(), + 2, + "expected retry after incomplete SSE stream" + ); + + server.shutdown().await; }