diff --git a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs index b97acfc40..d905c3c1b 100644 --- a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs +++ b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs @@ -108,6 +108,10 @@ async fn test_codex_jsonrpc_conversation_flow() -> Result<()> { let AddConversationSubscriptionResponse { subscription_id } = to_response::(add_listener_resp)?; + // Drop any buffered events from conversation setup to avoid + // matching an earlier task_complete. + mcp.clear_message_buffer(); + // 3) sendUserMessage (should trigger notifications; we only validate an OK response) let send_user_id = mcp .send_send_user_message_request(SendUserMessageParams { @@ -125,13 +129,38 @@ async fn test_codex_jsonrpc_conversation_flow() -> Result<()> { .await??; let SendUserMessageResponse {} = to_response::(send_user_resp)?; - // Verify the task_finished notification is received. - // Note this also ensures that the final request to the server was made. - let task_finished_notification: JSONRPCNotification = timeout( + let task_started_notification: JSONRPCNotification = timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/task_complete"), + mcp.read_stream_until_notification_message("codex/event/task_started"), ) .await??; + let task_started_event: Event = serde_json::from_value( + task_started_notification + .params + .clone() + .expect("task_started should have params"), + ) + .expect("task_started should deserialize to Event"); + + // Verify the task_finished notification for this turn is received. + // Note this also ensures that the final request to the server was made. + let task_finished_notification: JSONRPCNotification = loop { + let notification: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + let event: Event = serde_json::from_value( + notification + .params + .clone() + .expect("task_complete should have params"), + ) + .expect("task_complete should deserialize to Event"); + if event.id == task_started_event.id { + break notification; + } + }; let serde_json::Value::Object(map) = task_finished_notification .params .expect("notification should have params")