From cfcc87a953fa2085052b1f62df2e9a86e7b72681 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Fri, 14 Nov 2025 14:54:11 -0800 Subject: [PATCH] Order outputs before inputs (#6691) For better caching performance all output items should be rendered in the order they were produced before all new input items (for example, all function_call before all function_call_output). --- codex-rs/core/src/response_processing.rs | 80 ++++++------------- codex-rs/core/tests/suite/tool_parallelism.rs | 80 +++++++++++++++++++ 2 files changed, 103 insertions(+), 57 deletions(-) diff --git a/codex-rs/core/src/response_processing.rs b/codex-rs/core/src/response_processing.rs index d1767b74c..458f82526 100644 --- a/codex-rs/core/src/response_processing.rs +++ b/codex-rs/core/src/response_processing.rs @@ -13,92 +13,58 @@ pub(crate) async fn process_items( sess: &Session, turn_context: &TurnContext, ) -> (Vec, Vec) { - let mut items_to_record_in_conversation_history = Vec::::new(); + let mut outputs_to_record = Vec::::new(); + let mut new_inputs_to_record = Vec::::new(); let mut responses = Vec::::new(); for processed_response_item in processed_items { let crate::codex::ProcessedResponseItem { item, response } = processed_response_item; - match (&item, &response) { - (ResponseItem::Message { role, .. }, None) if role == "assistant" => { - // If the model returned a message, we need to record it. - items_to_record_in_conversation_history.push(item); - } - ( - ResponseItem::LocalShellCall { .. }, - Some(ResponseInputItem::FunctionCallOutput { call_id, output }), - ) => { - items_to_record_in_conversation_history.push(item); - items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput { + + if let Some(response) = &response { + responses.push(response.clone()); + } + + match response { + Some(ResponseInputItem::FunctionCallOutput { call_id, output }) => { + new_inputs_to_record.push(ResponseItem::FunctionCallOutput { call_id: call_id.clone(), output: output.clone(), }); } - ( - ResponseItem::FunctionCall { .. }, - Some(ResponseInputItem::FunctionCallOutput { call_id, output }), - ) => { - items_to_record_in_conversation_history.push(item); - items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput { + + Some(ResponseInputItem::CustomToolCallOutput { call_id, output }) => { + new_inputs_to_record.push(ResponseItem::CustomToolCallOutput { call_id: call_id.clone(), output: output.clone(), }); } - ( - ResponseItem::CustomToolCall { .. }, - Some(ResponseInputItem::CustomToolCallOutput { call_id, output }), - ) => { - items_to_record_in_conversation_history.push(item); - items_to_record_in_conversation_history.push(ResponseItem::CustomToolCallOutput { - call_id: call_id.clone(), - output: output.clone(), - }); - } - ( - ResponseItem::FunctionCall { .. }, - Some(ResponseInputItem::McpToolCallOutput { call_id, result }), - ) => { - items_to_record_in_conversation_history.push(item); + Some(ResponseInputItem::McpToolCallOutput { call_id, result }) => { let output = match result { - Ok(call_tool_result) => FunctionCallOutputPayload::from(call_tool_result), + Ok(call_tool_result) => FunctionCallOutputPayload::from(&call_tool_result), Err(err) => FunctionCallOutputPayload { content: err.clone(), success: Some(false), ..Default::default() }, }; - items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput { + new_inputs_to_record.push(ResponseItem::FunctionCallOutput { call_id: call_id.clone(), output, }); } - ( - ResponseItem::Reasoning { - id, - summary, - content, - encrypted_content, - }, - None, - ) => { - items_to_record_in_conversation_history.push(ResponseItem::Reasoning { - id: id.clone(), - summary: summary.clone(), - content: content.clone(), - encrypted_content: encrypted_content.clone(), - }); - } + None => {} _ => { warn!("Unexpected response item: {item:?} with response: {response:?}"); } }; - if let Some(response) = response { - responses.push(response); - } + + outputs_to_record.push(item); } + let all_items_to_record = [outputs_to_record, new_inputs_to_record].concat(); // Only attempt to take the lock if there is something to record. - if !items_to_record_in_conversation_history.is_empty() { - sess.record_conversation_items(turn_context, &items_to_record_in_conversation_history) + if !all_items_to_record.is_empty() { + sess.record_conversation_items(turn_context, &all_items_to_record) .await; } - (responses, items_to_record_in_conversation_history) + (responses, all_items_to_record) } diff --git a/codex-rs/core/tests/suite/tool_parallelism.rs b/codex-rs/core/tests/suite/tool_parallelism.rs index a1e96fa02..30ea0f7d2 100644 --- a/codex-rs/core/tests/suite/tool_parallelism.rs +++ b/codex-rs/core/tests/suite/tool_parallelism.rs @@ -14,6 +14,7 @@ use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; +use core_test_support::responses::mount_sse_once; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; @@ -21,6 +22,7 @@ use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; +use serde_json::Value; use serde_json::json; async fn run_turn(test: &TestCodex, prompt: &str) -> anyhow::Result<()> { @@ -204,3 +206,81 @@ async fn mixed_tools_fall_back_to_serial() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn tool_results_grouped() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let test = build_codex_with_test_tool(&server).await?; + + let shell_args = serde_json::to_string(&json!({ + "command": ["/bin/sh", "-c", "echo 'shell output'"], + "timeout_ms": 1_000, + }))?; + + mount_sse_once( + &server, + sse(vec![ + json!({"type": "response.created", "response": {"id": "resp-1"}}), + ev_function_call("call-1", "shell", &shell_args), + ev_function_call("call-2", "shell", &shell_args), + ev_function_call("call-3", "shell", &shell_args), + ev_completed("resp-1"), + ]), + ) + .await; + let tool_output_request = mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]), + ) + .await; + + run_turn(&test, "run shell three times").await?; + + let input = tool_output_request.single_request().input(); + + // find all function_call inputs with indexes + let function_calls = input + .iter() + .enumerate() + .filter(|(_, item)| item.get("type").and_then(Value::as_str) == Some("function_call")) + .collect::>(); + + let function_call_outputs = input + .iter() + .enumerate() + .filter(|(_, item)| { + item.get("type").and_then(Value::as_str) == Some("function_call_output") + }) + .collect::>(); + + assert_eq!(function_calls.len(), 3); + assert_eq!(function_call_outputs.len(), 3); + + for (index, _) in &function_calls { + for (output_index, _) in &function_call_outputs { + assert!( + *index < *output_index, + "all function calls must come before outputs" + ); + } + } + + // output should come in the order of the function calls + let zipped = function_calls + .iter() + .zip(function_call_outputs.iter()) + .collect::>(); + for (call, output) in zipped { + assert_eq!( + call.1.get("call_id").and_then(Value::as_str), + output.1.get("call_id").and_then(Value::as_str) + ); + } + + Ok(()) +}