Stabilize incomplete SSE retry test (#13879)

## What changed
- The retry test now uses the same streaming SSE test server used by
production-style tests instead of a wiremock sequence.
- The fixture is resolved via `find_resource!`, and the test asserts
that exactly two outbound requests were sent.

## Why this fixes the flake
- The old wiremock sequence approximated early-close behavior, but it
did not reproduce the same streaming semantics the real client sees.
- That meant the retry path depended on mock implementation details
instead of on the actual transport behavior we care about.
- Switching to the streaming SSE helper makes the test exercise the real
early-close/retry contract, and counting requests directly verifies that
we retried exactly once rather than merely hoping the sequence aligned.

## Scope
- Test-only change.
This commit is contained in:
Ahmed Ibrahim 2026-03-09 22:34:44 -07:00 committed by GitHub
parent 2e24be2134
commit aa6a57dfa2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -6,63 +6,40 @@ use codex_core::WireApi;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use codex_utils_cargo_bin::find_resource;
use core_test_support::load_sse_fixture;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::sse;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
fn sse_incomplete() -> String {
load_sse_fixture("tests/fixtures/incomplete_sse.json")
let fixture = find_resource!("tests/fixtures/incomplete_sse.json")
.unwrap_or_else(|err| panic!("failed to resolve incomplete_sse fixture: {err}"));
load_sse_fixture(fixture)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retries_on_early_close() {
skip_if_no_network!();
let server = MockServer::start().await;
let incomplete_sse = sse_incomplete();
let completed_sse = responses::sse_completed("resp_ok");
struct SeqResponder;
impl Respond for SeqResponder {
fn respond(&self, _: &Request) -> ResponseTemplate {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
static CALLS: AtomicUsize = AtomicUsize::new(0);
let n = CALLS.fetch_add(1, Ordering::SeqCst);
if n == 0 {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_incomplete(), "text/event-stream")
} else {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(
sse(vec![
ev_response_created("resp_ok"),
ev_completed("resp_ok"),
]),
"text/event-stream",
)
}
}
}
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(SeqResponder {})
.expect(2)
.mount(&server)
.await;
let (server, _) = start_streaming_sse_server(vec![
vec![StreamingSseChunk {
gate: None,
body: incomplete_sse,
}],
vec![StreamingSseChunk {
gate: None,
body: completed_sse,
}],
])
.await;
// Configure retry behavior explicitly to avoid mutating process-wide
// environment variables.
@ -92,7 +69,7 @@ async fn retries_on_early_close() {
.with_config(move |config| {
config.model_provider = model_provider;
})
.build(&server)
.build_with_streaming_server(&server)
.await
.unwrap();
@ -109,4 +86,13 @@ async fn retries_on_early_close() {
// Wait until TurnComplete (should succeed after retry).
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let requests = server.requests().await;
assert_eq!(
requests.len(),
2,
"expected retry after incomplete SSE stream"
);
server.shutdown().await;
}