Order outputs before inputs (#6691)
For better caching performance all output items should be rendered in the order they were produced before all new input items (for example, all function_call before all function_call_output).
This commit is contained in:
parent
c3951e505d
commit
cfcc87a953
2 changed files with 103 additions and 57 deletions
|
|
@ -13,92 +13,58 @@ pub(crate) async fn process_items(
|
|||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
|
||||
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
|
||||
let mut outputs_to_record = Vec::<ResponseItem>::new();
|
||||
let mut new_inputs_to_record = Vec::<ResponseItem>::new();
|
||||
let mut responses = Vec::<ResponseInputItem>::new();
|
||||
for processed_response_item in processed_items {
|
||||
let crate::codex::ProcessedResponseItem { item, response } = processed_response_item;
|
||||
match (&item, &response) {
|
||||
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
|
||||
// If the model returned a message, we need to record it.
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
}
|
||||
(
|
||||
ResponseItem::LocalShellCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
|
||||
|
||||
if let Some(response) = &response {
|
||||
responses.push(response.clone());
|
||||
}
|
||||
|
||||
match response {
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }) => {
|
||||
new_inputs_to_record.push(ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
|
||||
|
||||
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }) => {
|
||||
new_inputs_to_record.push(ResponseItem::CustomToolCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::CustomToolCall { .. },
|
||||
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(ResponseItem::CustomToolCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
Some(ResponseInputItem::McpToolCallOutput { call_id, result }) => {
|
||||
let output = match result {
|
||||
Ok(call_tool_result) => FunctionCallOutputPayload::from(call_tool_result),
|
||||
Ok(call_tool_result) => FunctionCallOutputPayload::from(&call_tool_result),
|
||||
Err(err) => FunctionCallOutputPayload {
|
||||
content: err.clone(),
|
||||
success: Some(false),
|
||||
..Default::default()
|
||||
},
|
||||
};
|
||||
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
|
||||
new_inputs_to_record.push(ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output,
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::Reasoning {
|
||||
id,
|
||||
summary,
|
||||
content,
|
||||
encrypted_content,
|
||||
},
|
||||
None,
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
|
||||
id: id.clone(),
|
||||
summary: summary.clone(),
|
||||
content: content.clone(),
|
||||
encrypted_content: encrypted_content.clone(),
|
||||
});
|
||||
}
|
||||
None => {}
|
||||
_ => {
|
||||
warn!("Unexpected response item: {item:?} with response: {response:?}");
|
||||
}
|
||||
};
|
||||
if let Some(response) = response {
|
||||
responses.push(response);
|
||||
}
|
||||
|
||||
outputs_to_record.push(item);
|
||||
}
|
||||
|
||||
let all_items_to_record = [outputs_to_record, new_inputs_to_record].concat();
|
||||
// Only attempt to take the lock if there is something to record.
|
||||
if !items_to_record_in_conversation_history.is_empty() {
|
||||
sess.record_conversation_items(turn_context, &items_to_record_in_conversation_history)
|
||||
if !all_items_to_record.is_empty() {
|
||||
sess.record_conversation_items(turn_context, &all_items_to_record)
|
||||
.await;
|
||||
}
|
||||
(responses, items_to_record_in_conversation_history)
|
||||
(responses, all_items_to_record)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ use codex_protocol::user_input::UserInput;
|
|||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
|
|
@ -21,6 +22,7 @@ use core_test_support::skip_if_no_network;
|
|||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
|
||||
async fn run_turn(test: &TestCodex, prompt: &str) -> anyhow::Result<()> {
|
||||
|
|
@ -204,3 +206,81 @@ async fn mixed_tools_fall_back_to_serial() -> anyhow::Result<()> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn tool_results_grouped() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let test = build_codex_with_test_tool(&server).await?;
|
||||
|
||||
let shell_args = serde_json::to_string(&json!({
|
||||
"command": ["/bin/sh", "-c", "echo 'shell output'"],
|
||||
"timeout_ms": 1_000,
|
||||
}))?;
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
json!({"type": "response.created", "response": {"id": "resp-1"}}),
|
||||
ev_function_call("call-1", "shell", &shell_args),
|
||||
ev_function_call("call-2", "shell", &shell_args),
|
||||
ev_function_call("call-3", "shell", &shell_args),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let tool_output_request = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
run_turn(&test, "run shell three times").await?;
|
||||
|
||||
let input = tool_output_request.single_request().input();
|
||||
|
||||
// find all function_call inputs with indexes
|
||||
let function_calls = input
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, item)| item.get("type").and_then(Value::as_str) == Some("function_call"))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let function_call_outputs = input
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, item)| {
|
||||
item.get("type").and_then(Value::as_str) == Some("function_call_output")
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(function_calls.len(), 3);
|
||||
assert_eq!(function_call_outputs.len(), 3);
|
||||
|
||||
for (index, _) in &function_calls {
|
||||
for (output_index, _) in &function_call_outputs {
|
||||
assert!(
|
||||
*index < *output_index,
|
||||
"all function calls must come before outputs"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// output should come in the order of the function calls
|
||||
let zipped = function_calls
|
||||
.iter()
|
||||
.zip(function_call_outputs.iter())
|
||||
.collect::<Vec<_>>();
|
||||
for (call, output) in zipped {
|
||||
assert_eq!(
|
||||
call.1.get("call_id").and_then(Value::as_str),
|
||||
output.1.get("call_id").and_then(Value::as_str)
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue