diff --git a/codex-rs/codex-api/src/common.rs b/codex-rs/codex-api/src/common.rs index 3ef96c185..f27f936b5 100644 --- a/codex-rs/codex-api/src/common.rs +++ b/codex-rs/codex-api/src/common.rs @@ -9,6 +9,7 @@ use futures::Stream; use serde::Deserialize; use serde::Serialize; use serde_json::Value; +use std::collections::HashMap; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -175,6 +176,7 @@ impl From<&ResponsesApiRequest> for ResponseCreateWsRequest { include: request.include.clone(), prompt_cache_key: request.prompt_cache_key.clone(), text: request.text.clone(), + client_metadata: None, } } } @@ -197,11 +199,15 @@ pub struct ResponseCreateWsRequest { pub prompt_cache_key: Option, #[serde(skip_serializing_if = "Option::is_none")] pub text: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub client_metadata: Option>, } #[derive(Debug, Serialize)] pub struct ResponseAppendWsRequest { pub input: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub client_metadata: Option>, } #[derive(Debug, Serialize)] #[serde(tag = "type")] diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 09ae0eb0d..658e212fa 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -26,6 +26,7 @@ //! back. This avoids duplicate handshakes but means a failed prewarm can consume one retry //! budget slot before any turn payload is sent. +use std::collections::HashMap; use std::sync::Arc; use std::sync::OnceLock; use std::sync::atomic::AtomicBool; @@ -623,6 +624,7 @@ impl ModelClientSession { if !responses_websockets_v2_enabled { return ResponsesWsRequest::ResponseAppend(ResponseAppendWsRequest { input: append_items, + client_metadata: payload.client_metadata, }); } } @@ -811,7 +813,10 @@ impl ModelClientSession { effort, summary, )?; - let ws_payload = ResponseCreateWsRequest::from(&request); + let ws_payload = ResponseCreateWsRequest { + client_metadata: build_ws_client_metadata(turn_metadata_header), + ..ResponseCreateWsRequest::from(&request) + }; match self .websocket_connection( @@ -968,6 +973,14 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option) -> Option> { + let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header)?; + let turn_metadata = turn_metadata_header.to_str().ok()?.to_string(); + let mut client_metadata = HashMap::new(); + client_metadata.insert(X_CODEX_TURN_METADATA_HEADER.to_string(), turn_metadata); + Some(client_metadata) +} + /// Builds the extra headers attached to Responses API requests. /// /// These headers implement Codex-specific conventions: diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 626d0695b..1c7c578ca 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -714,6 +714,77 @@ async fn responses_websocket_appends_on_prefix() { server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn responses_websocket_forwards_turn_metadata_on_create_and_append() { + skip_if_no_network!(); + + let server = start_websocket_server(vec![vec![ + vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "assistant output"), + ev_done(), + ], + vec![ev_response_created("resp-2"), ev_completed("resp-2")], + ]]) + .await; + + let harness = websocket_harness(&server).await; + let mut client_session = harness.client.new_session(); + let first_turn_metadata = r#"{"turn_id":"turn-123","sandbox":"workspace-write"}"#; + let enriched_turn_metadata = r#"{"turn_id":"turn-123","sandbox":"workspace-write","workspaces":[{"root_path":"/tmp/repo","latest_git_commit_hash":"abc123","associated_remote_urls":["git@github.com:openai/codex.git"],"has_changes":true}]}"#; + let prompt_one = prompt_with_input(vec![message_item("hello")]); + let prompt_two = prompt_with_input(vec![ + message_item("hello"), + assistant_message_item("msg-1", "assistant output"), + message_item("second"), + ]); + + stream_until_complete_with_turn_metadata( + &mut client_session, + &harness, + &prompt_one, + Some(first_turn_metadata), + ) + .await; + stream_until_complete_with_turn_metadata( + &mut client_session, + &harness, + &prompt_two, + Some(enriched_turn_metadata), + ) + .await; + + let connection = server.single_connection(); + assert_eq!(connection.len(), 2); + let first = connection.first().expect("missing request").body_json(); + let second = connection.get(1).expect("missing request").body_json(); + + assert_eq!(first["type"].as_str(), Some("response.create")); + assert_eq!( + first["client_metadata"]["x-codex-turn-metadata"].as_str(), + Some(first_turn_metadata) + ); + assert_eq!(second["type"].as_str(), Some("response.append")); + assert_eq!( + second["client_metadata"]["x-codex-turn-metadata"].as_str(), + Some(enriched_turn_metadata) + ); + + let first_metadata: serde_json::Value = + serde_json::from_str(first_turn_metadata).expect("first metadata should be valid json"); + let second_metadata: serde_json::Value = serde_json::from_str(enriched_turn_metadata) + .expect("enriched metadata should be valid json"); + + assert_eq!(first_metadata["turn_id"].as_str(), Some("turn-123")); + assert_eq!(second_metadata["turn_id"].as_str(), Some("turn-123")); + assert_eq!( + second_metadata["workspaces"][0]["has_changes"].as_bool(), + Some(true) + ); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn responses_websocket_creates_on_prefix_when_previous_completion_cannot_append() { skip_if_no_network!(); @@ -1173,6 +1244,15 @@ async fn stream_until_complete( client_session: &mut ModelClientSession, harness: &WebsocketTestHarness, prompt: &Prompt, +) { + stream_until_complete_with_turn_metadata(client_session, harness, prompt, None).await; +} + +async fn stream_until_complete_with_turn_metadata( + client_session: &mut ModelClientSession, + harness: &WebsocketTestHarness, + prompt: &Prompt, + turn_metadata_header: Option<&str>, ) { let mut stream = client_session .stream( @@ -1181,7 +1261,7 @@ async fn stream_until_complete( &harness.otel_manager, harness.effort, harness.summary, - None, + turn_metadata_header, ) .await .expect("websocket stream failed");