From 48aeb67f7a1cf54c37fc07955fd69131e0d256c3 Mon Sep 17 00:00:00 2001 From: gt-oai Date: Mon, 26 Jan 2026 15:58:14 +0000 Subject: [PATCH] Fix flakey conversation flow test (#9784) I've seen this test fail with: ``` - Mock #1. Expected range of matching incoming requests: == 2 Number of matched incoming requests: 1 ``` This is because we pop the wrong task_complete events and then the test exits. I think this is because the MCP events are now buffered after https://github.com/openai/codex/pull/8874. So: 1. clear the buffer before we do any user message sending 2. additionally listen for task start before task complete 3. use the ID from task start to find the correct task complete event. --- .../suite/codex_message_processor_flow.rs | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs index b97acfc40..d905c3c1b 100644 --- a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs +++ b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs @@ -108,6 +108,10 @@ async fn test_codex_jsonrpc_conversation_flow() -> Result<()> { let AddConversationSubscriptionResponse { subscription_id } = to_response::(add_listener_resp)?; + // Drop any buffered events from conversation setup to avoid + // matching an earlier task_complete. + mcp.clear_message_buffer(); + // 3) sendUserMessage (should trigger notifications; we only validate an OK response) let send_user_id = mcp .send_send_user_message_request(SendUserMessageParams { @@ -125,13 +129,38 @@ async fn test_codex_jsonrpc_conversation_flow() -> Result<()> { .await??; let SendUserMessageResponse {} = to_response::(send_user_resp)?; - // Verify the task_finished notification is received. - // Note this also ensures that the final request to the server was made. - let task_finished_notification: JSONRPCNotification = timeout( + let task_started_notification: JSONRPCNotification = timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/task_complete"), + mcp.read_stream_until_notification_message("codex/event/task_started"), ) .await??; + let task_started_event: Event = serde_json::from_value( + task_started_notification + .params + .clone() + .expect("task_started should have params"), + ) + .expect("task_started should deserialize to Event"); + + // Verify the task_finished notification for this turn is received. + // Note this also ensures that the final request to the server was made. + let task_finished_notification: JSONRPCNotification = loop { + let notification: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + let event: Event = serde_json::from_value( + notification + .params + .clone() + .expect("task_complete should have params"), + ) + .expect("task_complete should deserialize to Event"); + if event.id == task_started_event.id { + break notification; + } + }; let serde_json::Value::Object(map) = task_finished_notification .params .expect("notification should have params")