From 20f2a216df3e2d534069438ca7126811de9ff89a Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Wed, 18 Mar 2026 20:41:06 -0700 Subject: [PATCH] feat(core, tracing): create turn spans over websockets (#14632) ## Description Dependent on: - [responsesapi] https://github.com/openai/openai/pull/760991 - [codex-backend] https://github.com/openai/openai/pull/760985 `codex app-server -> codex-backend -> responsesapi` now reuses a persistent websocket connection across many turns. This PR updates tracing when using websockets so that each `response.create` websocket request propagates the current tracing context, so we can get a holistic end-to-end trace for each turn. Tracing is propagated via special keys (`ws_request_header_traceparent`, `ws_request_header_tracestate`) set in the `client_metadata` param in the Responses API. Currently tracing on websockets is a bit broken because we only set tracing context at websocket connection time, so it's detached from a `turn/start` request. --- codex-rs/Cargo.lock | 5 + codex-rs/codex-api/src/common.rs | 26 ++++ codex-rs/codex-api/src/lib.rs | 3 + codex-rs/core/src/client.rs | 12 +- codex-rs/core/src/codex_tests.rs | 27 +--- codex-rs/core/tests/common/Cargo.toml | 5 + codex-rs/core/tests/common/lib.rs | 1 + codex-rs/core/tests/common/tracing.rs | 26 ++++ .../core/tests/suite/client_websockets.rs | 138 ++++++++++++++++++ 9 files changed, 221 insertions(+), 22 deletions(-) create mode 100644 codex-rs/core/tests/common/tracing.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index a039c60d9..0f03c455c 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -3086,6 +3086,8 @@ dependencies = [ "ctor 0.6.3", "futures", "notify", + "opentelemetry", + "opentelemetry_sdk", "pretty_assertions", "regex-lite", "reqwest", @@ -3094,6 +3096,9 @@ dependencies = [ "tempfile", "tokio", "tokio-tungstenite", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", "walkdir", "wiremock", "zstd", diff --git a/codex-rs/codex-api/src/common.rs 
b/codex-rs/codex-api/src/common.rs index 85ac96520..39fb976e6 100644 --- a/codex-rs/codex-api/src/common.rs +++ b/codex-rs/codex-api/src/common.rs @@ -5,6 +5,7 @@ use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::RateLimitSnapshot; use codex_protocol::protocol::TokenUsage; +use codex_protocol::protocol::W3cTraceContext; use futures::Stream; use serde::Deserialize; use serde::Serialize; @@ -15,6 +16,9 @@ use std::task::Context; use std::task::Poll; use tokio::sync::mpsc; +pub const WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY: &str = "ws_request_header_traceparent"; +pub const WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY: &str = "ws_request_header_tracestate"; + /// Canonical input payload for the compaction endpoint. #[derive(Debug, Clone, Serialize)] pub struct CompactionInput<'a> { @@ -215,6 +219,28 @@ pub struct ResponseCreateWsRequest { pub client_metadata: Option>, } +pub fn response_create_client_metadata( + client_metadata: Option>, + trace: Option<&W3cTraceContext>, +) -> Option> { + let mut client_metadata = client_metadata.unwrap_or_default(); + + if let Some(traceparent) = trace.and_then(|trace| trace.traceparent.as_deref()) { + client_metadata.insert( + WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY.to_string(), + traceparent.to_string(), + ); + } + if let Some(tracestate) = trace.and_then(|trace| trace.tracestate.as_deref()) { + client_metadata.insert( + WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY.to_string(), + tracestate.to_string(), + ); + } + + (!client_metadata.is_empty()).then_some(client_metadata) +} + #[derive(Debug, Serialize)] #[serde(tag = "type")] #[allow(clippy::large_enum_variant)] diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index a1588a983..865abf8a7 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -23,7 +23,10 @@ pub use crate::common::ResponseCreateWsRequest; 
pub use crate::common::ResponseEvent; pub use crate::common::ResponseStream; pub use crate::common::ResponsesApiRequest; +pub use crate::common::WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY; +pub use crate::common::WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY; pub use crate::common::create_text_param_for_request; +pub use crate::common::response_create_client_metadata; pub use crate::endpoint::compact::CompactClient; pub use crate::endpoint::memories::MemoriesClient; pub use crate::endpoint::models::ModelsClient; diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index eb5cb4c08..ba71033c3 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -59,7 +59,9 @@ use codex_api::common::ResponsesWsRequest; use codex_api::create_text_param_for_request; use codex_api::error::ApiError; use codex_api::requests::responses::Compression; +use codex_api::response_create_client_metadata; use codex_otel::SessionTelemetry; +use codex_otel::current_span_w3c_trace_context; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; @@ -69,6 +71,7 @@ use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::W3cTraceContext; use eventsource_stream::Event; use eventsource_stream::EventStreamError; use futures::StreamExt; @@ -1099,6 +1102,7 @@ impl ModelClientSession { service_tier: Option, turn_metadata_header: Option<&str>, warmup: bool, + request_trace: Option, ) -> Result { let auth_manager = self.client.state.auth_manager.clone(); @@ -1125,7 +1129,10 @@ impl ModelClientSession { service_tier, )?; let mut ws_payload = ResponseCreateWsRequest { - client_metadata: build_ws_client_metadata(turn_metadata_header), + client_metadata: response_create_client_metadata( + 
build_ws_client_metadata(turn_metadata_header), + request_trace.as_ref(), + ), ..ResponseCreateWsRequest::from(&request) }; if warmup { @@ -1249,6 +1256,7 @@ impl ModelClientSession { service_tier, turn_metadata_header, /*warmup*/ true, + current_span_w3c_trace_context(), ) .await { @@ -1292,6 +1300,7 @@ impl ModelClientSession { match wire_api { WireApi::Responses => { if self.client.responses_websocket_enabled() { + let request_trace = current_span_w3c_trace_context(); match self .stream_responses_websocket( prompt, @@ -1302,6 +1311,7 @@ impl ModelClientSession { service_tier, turn_metadata_header, /*warmup*/ false, + request_trace, ) .await? { diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 89d14e37e..ddec552c7 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -72,15 +72,13 @@ use codex_protocol::protocol::ConversationAudioParams; use codex_protocol::protocol::RealtimeAudioFrame; use codex_protocol::protocol::Submission; use codex_protocol::protocol::W3cTraceContext; +use core_test_support::tracing::install_test_tracing; use opentelemetry::trace::TraceContextExt; use opentelemetry::trace::TraceId; -use opentelemetry::trace::TracerProvider as _; -use opentelemetry_sdk::trace::SdkTracerProvider; use std::path::Path; use std::time::Duration; use tokio::time::sleep; use tracing_opentelemetry::OpenTelemetrySpanExt; -use tracing_subscriber::prelude::*; use codex_protocol::mcp::CallToolResult as McpCallToolResult; use pretty_assertions::assert_eq; @@ -90,7 +88,6 @@ use serde::Deserialize; use serde_json::json; use std::path::PathBuf; use std::sync::Arc; -use std::sync::Once; use std::time::Duration as StdDuration; #[path = "codex_tests_guardian.rs"] @@ -2031,18 +2028,6 @@ fn text_block(s: &str) -> serde_json::Value { }) } -fn init_test_tracing() { - static INIT: Once = Once::new(); - INIT.call_once(|| { - let provider = SdkTracerProvider::builder().build(); - let tracer = 
provider.tracer("codex-core-tests"); - let subscriber = - tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer)); - tracing::subscriber::set_global_default(subscriber) - .expect("global tracing subscriber should only be installed once"); - }); -} - async fn build_test_config(codex_home: &Path) -> Config { ConfigBuilder::default() .codex_home(codex_home.to_path_buf()) @@ -2730,7 +2715,7 @@ async fn submit_with_id_captures_current_span_trace_context() { session_loop_termination: completed_session_loop_termination(), }; - init_test_tracing(); + let _trace_test_context = install_test_tracing("codex-core-tests"); let request_parent = W3cTraceContext { traceparent: Some("00-00000000000000000000000000000011-0000000000000022-01".into()), @@ -2766,7 +2751,7 @@ async fn submit_with_id_captures_current_span_trace_context() { async fn new_default_turn_captures_current_span_trace_id() { let (session, _turn_context) = make_session_and_context().await; - init_test_tracing(); + let _trace_test_context = install_test_tracing("codex-core-tests"); let request_parent = W3cTraceContext { traceparent: Some("00-00000000000000000000000000000011-0000000000000022-01".into()), @@ -2801,7 +2786,7 @@ async fn new_default_turn_captures_current_span_trace_id() { #[test] fn submission_dispatch_span_prefers_submission_trace_context() { - init_test_tracing(); + let _trace_test_context = install_test_tracing("codex-core-tests"); let ambient_parent = W3cTraceContext { traceparent: Some("00-00000000000000000000000000000033-0000000000000044-01".into()), @@ -2834,7 +2819,7 @@ fn submission_dispatch_span_prefers_submission_trace_context() { #[test] fn submission_dispatch_span_uses_debug_for_realtime_audio() { - init_test_tracing(); + let _trace_test_context = install_test_tracing("codex-core-tests"); let dispatch_span = submission_dispatch_span(&Submission { id: "sub-1".into(), @@ -2917,7 +2902,7 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() { } } - 
init_test_tracing(); + let _trace_test_context = install_test_tracing("codex-core-tests"); let request_parent = W3cTraceContext { traceparent: Some("00-00000000000000000000000000000011-0000000000000022-01".into()), diff --git a/codex-rs/core/tests/common/Cargo.toml b/codex-rs/core/tests/common/Cargo.toml index a7d35c0de..86ecf2921 100644 --- a/codex-rs/core/tests/common/Cargo.toml +++ b/codex-rs/core/tests/common/Cargo.toml @@ -18,11 +18,16 @@ codex-utils-cargo-bin = { workspace = true } ctor = { workspace = true } futures = { workspace = true } notify = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } regex-lite = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["net", "time"] } tokio-tungstenite = { workspace = true } +tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } walkdir = { workspace = true } wiremock = { workspace = true } shlex = { workspace = true } diff --git a/codex-rs/core/tests/common/lib.rs b/codex-rs/core/tests/common/lib.rs index 9b592301c..17f949beb 100644 --- a/codex-rs/core/tests/common/lib.rs +++ b/codex-rs/core/tests/common/lib.rs @@ -21,6 +21,7 @@ pub mod responses; pub mod streaming_sse; pub mod test_codex; pub mod test_codex_exec; +pub mod tracing; pub mod zsh_fork; #[ctor] diff --git a/codex-rs/core/tests/common/tracing.rs b/codex-rs/core/tests/common/tracing.rs new file mode 100644 index 000000000..5470e0d31 --- /dev/null +++ b/codex-rs/core/tests/common/tracing.rs @@ -0,0 +1,26 @@ +use opentelemetry::global; +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::trace::SdkTracerProvider; +use tracing::dispatcher::DefaultGuard; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +pub struct TestTracingContext { + 
_provider: SdkTracerProvider, + _guard: DefaultGuard, +} + +pub fn install_test_tracing(tracer_name: &str) -> TestTracingContext { + global::set_text_map_propagator(TraceContextPropagator::new()); + + let provider = SdkTracerProvider::builder().build(); + let tracer = provider.tracer(tracer_name.to_string()); + let subscriber = + tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer)); + + TestTracingContext { + _provider: provider, + _guard: subscriber.set_default(), + } +} diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 38ef3b682..4416ff108 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -1,4 +1,6 @@ #![allow(clippy::expect_used, clippy::unwrap_used)] +use codex_api::WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY; +use codex_api::WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY; use codex_core::CodexAuth; use codex_core::ModelClient; use codex_core::ModelClientSession; @@ -10,6 +12,7 @@ use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER; use codex_core::features::Feature; use codex_otel::SessionTelemetry; use codex_otel::TelemetryAuthMode; +use codex_otel::current_span_w3c_trace_context; use codex_otel::metrics::MetricsClient; use codex_otel::metrics::MetricsConfig; use codex_protocol::ThreadId; @@ -24,6 +27,7 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::responses::WebSocketConnectionConfig; @@ -35,6 +39,7 @@ use core_test_support::responses::start_websocket_server; use core_test_support::responses::start_websocket_server_with_headers; use 
core_test_support::skip_if_no_network; use core_test_support::test_codex::test_codex; +use core_test_support::tracing::install_test_tracing; use core_test_support::wait_for_event; use futures::StreamExt; use opentelemetry_sdk::metrics::InMemoryMetricExporter; @@ -43,6 +48,7 @@ use serde_json::json; use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; +use tracing::Instrument; use tracing_test::traced_test; const MODEL: &str = "gpt-5.2-codex"; @@ -50,6 +56,32 @@ const OPENAI_BETA_HEADER: &str = "OpenAI-Beta"; const WS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06"; const X_CLIENT_REQUEST_ID_HEADER: &str = "x-client-request-id"; +fn assert_request_trace_matches(body: &serde_json::Value, expected_trace: &W3cTraceContext) { + let client_metadata = body["client_metadata"] + .as_object() + .expect("missing client_metadata payload"); + let actual_traceparent = client_metadata + .get(WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY) + .and_then(serde_json::Value::as_str) + .expect("missing traceparent"); + let expected_traceparent = expected_trace + .traceparent + .as_deref() + .expect("missing expected traceparent"); + + assert_eq!(actual_traceparent, expected_traceparent); + assert_eq!( + client_metadata + .get(WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY) + .and_then(serde_json::Value::as_str), + expected_trace.tracestate.as_deref() + ); + assert!( + body.get("trace").is_none(), + "top-level trace should not be sent" + ); +} + struct WebsocketTestHarness { _codex_home: TempDir, client: ModelClient, @@ -119,6 +151,112 @@ async fn responses_websocket_streams_without_feature_flag_when_provider_supports server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn responses_websocket_reuses_connection_with_per_turn_trace_payloads() { + skip_if_no_network!(); + + let _trace_test_context = install_test_tracing("client-websocket-test"); + + let server = start_websocket_server(vec![vec![ + 
vec![ev_response_created("resp-1"), ev_completed("resp-1")], + vec![ev_response_created("resp-2"), ev_completed("resp-2")], + ]]) + .await; + + let harness = websocket_harness(&server).await; + let prompt_one = prompt_with_input(vec![message_item("hello")]); + let prompt_two = prompt_with_input(vec![message_item("again")]); + + let first_trace = { + let mut client_session = harness.client.new_session(); + async { + let expected_trace = + current_span_w3c_trace_context().expect("current span should have trace context"); + stream_until_complete(&mut client_session, &harness, &prompt_one).await; + expected_trace + } + .instrument(tracing::info_span!("client.websocket.turn_one")) + .await + }; + + let second_trace = { + let mut client_session = harness.client.new_session(); + async { + let expected_trace = + current_span_w3c_trace_context().expect("current span should have trace context"); + stream_until_complete(&mut client_session, &harness, &prompt_two).await; + expected_trace + } + .instrument(tracing::info_span!("client.websocket.turn_two")) + .await + }; + + assert_eq!(server.handshakes().len(), 1); + let connection = server.single_connection(); + assert_eq!(connection.len(), 2); + + let first_request = connection + .first() + .expect("missing first request") + .body_json(); + let second_request = connection + .get(1) + .expect("missing second request") + .body_json(); + assert_request_trace_matches(&first_request, &first_trace); + assert_request_trace_matches(&second_request, &second_trace); + + let first_traceparent = first_request["client_metadata"] + [WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY] + .as_str() + .expect("missing first traceparent"); + let second_traceparent = second_request["client_metadata"] + [WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY] + .as_str() + .expect("missing second traceparent"); + assert_ne!(first_traceparent, second_traceparent); + + server.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 
2)] +async fn responses_websocket_preconnect_does_not_replace_turn_trace_payload() { + skip_if_no_network!(); + + let _trace_test_context = install_test_tracing("client-websocket-test"); + + let server = start_websocket_server(vec![vec![vec![ + ev_response_created("resp-1"), + ev_completed("resp-1"), + ]]]) + .await; + + let harness = websocket_harness(&server).await; + let mut client_session = harness.client.new_session(); + client_session + .preconnect_websocket(&harness.session_telemetry, &harness.model_info) + .await + .expect("websocket preconnect failed"); + let prompt = prompt_with_input(vec![message_item("hello")]); + + let expected_trace = async { + let expected_trace = + current_span_w3c_trace_context().expect("current span should have trace context"); + stream_until_complete(&mut client_session, &harness, &prompt).await; + expected_trace + } + .instrument(tracing::info_span!("client.websocket.request")) + .await; + + assert_eq!(server.handshakes().len(), 1); + let connection = server.single_connection(); + assert_eq!(connection.len(), 1); + let request = connection.first().expect("missing request").body_json(); + assert_request_trace_matches(&request, &expected_trace); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn responses_websocket_preconnect_reuses_connection() { skip_if_no_network!();