ws turn metadata via client_metadata (#11953)
This commit is contained in:
parent
2f3d0b186b
commit
429cc4860e
3 changed files with 101 additions and 2 deletions
|
|
@ -9,6 +9,7 @@ use futures::Stream;
|
|||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
|
@ -175,6 +176,7 @@ impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
|
|||
include: request.include.clone(),
|
||||
prompt_cache_key: request.prompt_cache_key.clone(),
|
||||
text: request.text.clone(),
|
||||
client_metadata: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -197,11 +199,15 @@ pub struct ResponseCreateWsRequest {
|
|||
pub prompt_cache_key: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub text: Option<TextControls>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ResponseAppendWsRequest {
|
||||
pub input: Vec<ResponseItem>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
}
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@
|
|||
//! back. This avoids duplicate handshakes but means a failed prewarm can consume one retry
|
||||
//! budget slot before any turn payload is sent.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
|
@ -623,6 +624,7 @@ impl ModelClientSession {
|
|||
if !responses_websockets_v2_enabled {
|
||||
return ResponsesWsRequest::ResponseAppend(ResponseAppendWsRequest {
|
||||
input: append_items,
|
||||
client_metadata: payload.client_metadata,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -811,7 +813,10 @@ impl ModelClientSession {
|
|||
effort,
|
||||
summary,
|
||||
)?;
|
||||
let ws_payload = ResponseCreateWsRequest::from(&request);
|
||||
let ws_payload = ResponseCreateWsRequest {
|
||||
client_metadata: build_ws_client_metadata(turn_metadata_header),
|
||||
..ResponseCreateWsRequest::from(&request)
|
||||
};
|
||||
|
||||
match self
|
||||
.websocket_connection(
|
||||
|
|
@ -968,6 +973,14 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<Head
|
|||
turn_metadata_header.and_then(|value| HeaderValue::from_str(value).ok())
|
||||
}
|
||||
|
||||
fn build_ws_client_metadata(turn_metadata_header: Option<&str>) -> Option<HashMap<String, String>> {
|
||||
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header)?;
|
||||
let turn_metadata = turn_metadata_header.to_str().ok()?.to_string();
|
||||
let mut client_metadata = HashMap::new();
|
||||
client_metadata.insert(X_CODEX_TURN_METADATA_HEADER.to_string(), turn_metadata);
|
||||
Some(client_metadata)
|
||||
}
|
||||
|
||||
/// Builds the extra headers attached to Responses API requests.
|
||||
///
|
||||
/// These headers implement Codex-specific conventions:
|
||||
|
|
|
|||
|
|
@ -714,6 +714,77 @@ async fn responses_websocket_appends_on_prefix() {
|
|||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_forwards_turn_metadata_on_create_and_append() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "assistant output"),
|
||||
ev_done(),
|
||||
],
|
||||
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
let mut client_session = harness.client.new_session();
|
||||
let first_turn_metadata = r#"{"turn_id":"turn-123","sandbox":"workspace-write"}"#;
|
||||
let enriched_turn_metadata = r#"{"turn_id":"turn-123","sandbox":"workspace-write","workspaces":[{"root_path":"/tmp/repo","latest_git_commit_hash":"abc123","associated_remote_urls":["git@github.com:openai/codex.git"],"has_changes":true}]}"#;
|
||||
let prompt_one = prompt_with_input(vec![message_item("hello")]);
|
||||
let prompt_two = prompt_with_input(vec![
|
||||
message_item("hello"),
|
||||
assistant_message_item("msg-1", "assistant output"),
|
||||
message_item("second"),
|
||||
]);
|
||||
|
||||
stream_until_complete_with_turn_metadata(
|
||||
&mut client_session,
|
||||
&harness,
|
||||
&prompt_one,
|
||||
Some(first_turn_metadata),
|
||||
)
|
||||
.await;
|
||||
stream_until_complete_with_turn_metadata(
|
||||
&mut client_session,
|
||||
&harness,
|
||||
&prompt_two,
|
||||
Some(enriched_turn_metadata),
|
||||
)
|
||||
.await;
|
||||
|
||||
let connection = server.single_connection();
|
||||
assert_eq!(connection.len(), 2);
|
||||
let first = connection.first().expect("missing request").body_json();
|
||||
let second = connection.get(1).expect("missing request").body_json();
|
||||
|
||||
assert_eq!(first["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(
|
||||
first["client_metadata"]["x-codex-turn-metadata"].as_str(),
|
||||
Some(first_turn_metadata)
|
||||
);
|
||||
assert_eq!(second["type"].as_str(), Some("response.append"));
|
||||
assert_eq!(
|
||||
second["client_metadata"]["x-codex-turn-metadata"].as_str(),
|
||||
Some(enriched_turn_metadata)
|
||||
);
|
||||
|
||||
let first_metadata: serde_json::Value =
|
||||
serde_json::from_str(first_turn_metadata).expect("first metadata should be valid json");
|
||||
let second_metadata: serde_json::Value = serde_json::from_str(enriched_turn_metadata)
|
||||
.expect("enriched metadata should be valid json");
|
||||
|
||||
assert_eq!(first_metadata["turn_id"].as_str(), Some("turn-123"));
|
||||
assert_eq!(second_metadata["turn_id"].as_str(), Some("turn-123"));
|
||||
assert_eq!(
|
||||
second_metadata["workspaces"][0]["has_changes"].as_bool(),
|
||||
Some(true)
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_creates_on_prefix_when_previous_completion_cannot_append() {
|
||||
skip_if_no_network!();
|
||||
|
|
@ -1173,6 +1244,15 @@ async fn stream_until_complete(
|
|||
client_session: &mut ModelClientSession,
|
||||
harness: &WebsocketTestHarness,
|
||||
prompt: &Prompt,
|
||||
) {
|
||||
stream_until_complete_with_turn_metadata(client_session, harness, prompt, None).await;
|
||||
}
|
||||
|
||||
async fn stream_until_complete_with_turn_metadata(
|
||||
client_session: &mut ModelClientSession,
|
||||
harness: &WebsocketTestHarness,
|
||||
prompt: &Prompt,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) {
|
||||
let mut stream = client_session
|
||||
.stream(
|
||||
|
|
@ -1181,7 +1261,7 @@ async fn stream_until_complete(
|
|||
&harness.otel_manager,
|
||||
harness.effort,
|
||||
harness.summary,
|
||||
None,
|
||||
turn_metadata_header,
|
||||
)
|
||||
.await
|
||||
.expect("websocket stream failed");
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue