feat(core, tracing): create turn spans over websockets (#14632)
## Description Dependent on: - [responsesapi] https://github.com/openai/openai/pull/760991 - [codex-backend] https://github.com/openai/openai/pull/760985 `codex app-server -> codex-backend -> responsesapi` now reuses a persistent websocket connection across many turns. This PR updates tracing when using websockets so that each `response.create` websocket request propagates the current tracing context, so we can get a holistic end-to-end trace for each turn. Tracing is propagated via special keys (`ws_request_header_traceparent`, `ws_request_header_tracestate`) set in the `client_metadata` param in Responses API. Currently tracing on websockets is a bit broken because we only set tracing context on ws connection time, so it's detached from a `turn/start` request.
This commit is contained in:
parent
903660edba
commit
20f2a216df
9 changed files with 221 additions and 22 deletions
5
codex-rs/Cargo.lock
generated
5
codex-rs/Cargo.lock
generated
|
|
@ -3086,6 +3086,8 @@ dependencies = [
|
|||
"ctor 0.6.3",
|
||||
"futures",
|
||||
"notify",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"pretty_assertions",
|
||||
"regex-lite",
|
||||
"reqwest",
|
||||
|
|
@ -3094,6 +3096,9 @@ dependencies = [
|
|||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
"walkdir",
|
||||
"wiremock",
|
||||
"zstd",
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use codex_protocol::models::ResponseItem;
|
|||
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use futures::Stream;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
|
@ -15,6 +16,9 @@ use std::task::Context;
|
|||
use std::task::Poll;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub const WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY: &str = "ws_request_header_traceparent";
|
||||
pub const WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY: &str = "ws_request_header_tracestate";
|
||||
|
||||
/// Canonical input payload for the compaction endpoint.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct CompactionInput<'a> {
|
||||
|
|
@ -215,6 +219,28 @@ pub struct ResponseCreateWsRequest {
|
|||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
pub fn response_create_client_metadata(
|
||||
client_metadata: Option<HashMap<String, String>>,
|
||||
trace: Option<&W3cTraceContext>,
|
||||
) -> Option<HashMap<String, String>> {
|
||||
let mut client_metadata = client_metadata.unwrap_or_default();
|
||||
|
||||
if let Some(traceparent) = trace.and_then(|trace| trace.traceparent.as_deref()) {
|
||||
client_metadata.insert(
|
||||
WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY.to_string(),
|
||||
traceparent.to_string(),
|
||||
);
|
||||
}
|
||||
if let Some(tracestate) = trace.and_then(|trace| trace.tracestate.as_deref()) {
|
||||
client_metadata.insert(
|
||||
WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY.to_string(),
|
||||
tracestate.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
(!client_metadata.is_empty()).then_some(client_metadata)
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
|
|
|
|||
|
|
@ -23,7 +23,10 @@ pub use crate::common::ResponseCreateWsRequest;
|
|||
pub use crate::common::ResponseEvent;
|
||||
pub use crate::common::ResponseStream;
|
||||
pub use crate::common::ResponsesApiRequest;
|
||||
pub use crate::common::WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY;
|
||||
pub use crate::common::WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY;
|
||||
pub use crate::common::create_text_param_for_request;
|
||||
pub use crate::common::response_create_client_metadata;
|
||||
pub use crate::endpoint::compact::CompactClient;
|
||||
pub use crate::endpoint::memories::MemoriesClient;
|
||||
pub use crate::endpoint::models::ModelsClient;
|
||||
|
|
|
|||
|
|
@ -59,7 +59,9 @@ use codex_api::common::ResponsesWsRequest;
|
|||
use codex_api::create_text_param_for_request;
|
||||
use codex_api::error::ApiError;
|
||||
use codex_api::requests::responses::Compression;
|
||||
use codex_api::response_create_client_metadata;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::current_span_w3c_trace_context;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
|
|
@ -69,6 +71,7 @@ use codex_protocol::models::ResponseItem;
|
|||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use eventsource_stream::Event;
|
||||
use eventsource_stream::EventStreamError;
|
||||
use futures::StreamExt;
|
||||
|
|
@ -1099,6 +1102,7 @@ impl ModelClientSession {
|
|||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
warmup: bool,
|
||||
request_trace: Option<W3cTraceContext>,
|
||||
) -> Result<WebsocketStreamOutcome> {
|
||||
let auth_manager = self.client.state.auth_manager.clone();
|
||||
|
||||
|
|
@ -1125,7 +1129,10 @@ impl ModelClientSession {
|
|||
service_tier,
|
||||
)?;
|
||||
let mut ws_payload = ResponseCreateWsRequest {
|
||||
client_metadata: build_ws_client_metadata(turn_metadata_header),
|
||||
client_metadata: response_create_client_metadata(
|
||||
build_ws_client_metadata(turn_metadata_header),
|
||||
request_trace.as_ref(),
|
||||
),
|
||||
..ResponseCreateWsRequest::from(&request)
|
||||
};
|
||||
if warmup {
|
||||
|
|
@ -1249,6 +1256,7 @@ impl ModelClientSession {
|
|||
service_tier,
|
||||
turn_metadata_header,
|
||||
/*warmup*/ true,
|
||||
current_span_w3c_trace_context(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
|
@ -1292,6 +1300,7 @@ impl ModelClientSession {
|
|||
match wire_api {
|
||||
WireApi::Responses => {
|
||||
if self.client.responses_websocket_enabled() {
|
||||
let request_trace = current_span_w3c_trace_context();
|
||||
match self
|
||||
.stream_responses_websocket(
|
||||
prompt,
|
||||
|
|
@ -1302,6 +1311,7 @@ impl ModelClientSession {
|
|||
service_tier,
|
||||
turn_metadata_header,
|
||||
/*warmup*/ false,
|
||||
request_trace,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
|
|
|
|||
|
|
@ -72,15 +72,13 @@ use codex_protocol::protocol::ConversationAudioParams;
|
|||
use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
use codex_protocol::protocol::Submission;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use core_test_support::tracing::install_test_tracing;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use opentelemetry::trace::TraceId;
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
use codex_protocol::mcp::CallToolResult as McpCallToolResult;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
|
@ -90,7 +88,6 @@ use serde::Deserialize;
|
|||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Once;
|
||||
use std::time::Duration as StdDuration;
|
||||
|
||||
#[path = "codex_tests_guardian.rs"]
|
||||
|
|
@ -2031,18 +2028,6 @@ fn text_block(s: &str) -> serde_json::Value {
|
|||
})
|
||||
}
|
||||
|
||||
fn init_test_tracing() {
|
||||
static INIT: Once = Once::new();
|
||||
INIT.call_once(|| {
|
||||
let provider = SdkTracerProvider::builder().build();
|
||||
let tracer = provider.tracer("codex-core-tests");
|
||||
let subscriber =
|
||||
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
|
||||
tracing::subscriber::set_global_default(subscriber)
|
||||
.expect("global tracing subscriber should only be installed once");
|
||||
});
|
||||
}
|
||||
|
||||
async fn build_test_config(codex_home: &Path) -> Config {
|
||||
ConfigBuilder::default()
|
||||
.codex_home(codex_home.to_path_buf())
|
||||
|
|
@ -2730,7 +2715,7 @@ async fn submit_with_id_captures_current_span_trace_context() {
|
|||
session_loop_termination: completed_session_loop_termination(),
|
||||
};
|
||||
|
||||
init_test_tracing();
|
||||
let _trace_test_context = install_test_tracing("codex-core-tests");
|
||||
|
||||
let request_parent = W3cTraceContext {
|
||||
traceparent: Some("00-00000000000000000000000000000011-0000000000000022-01".into()),
|
||||
|
|
@ -2766,7 +2751,7 @@ async fn submit_with_id_captures_current_span_trace_context() {
|
|||
async fn new_default_turn_captures_current_span_trace_id() {
|
||||
let (session, _turn_context) = make_session_and_context().await;
|
||||
|
||||
init_test_tracing();
|
||||
let _trace_test_context = install_test_tracing("codex-core-tests");
|
||||
|
||||
let request_parent = W3cTraceContext {
|
||||
traceparent: Some("00-00000000000000000000000000000011-0000000000000022-01".into()),
|
||||
|
|
@ -2801,7 +2786,7 @@ async fn new_default_turn_captures_current_span_trace_id() {
|
|||
|
||||
#[test]
|
||||
fn submission_dispatch_span_prefers_submission_trace_context() {
|
||||
init_test_tracing();
|
||||
let _trace_test_context = install_test_tracing("codex-core-tests");
|
||||
|
||||
let ambient_parent = W3cTraceContext {
|
||||
traceparent: Some("00-00000000000000000000000000000033-0000000000000044-01".into()),
|
||||
|
|
@ -2834,7 +2819,7 @@ fn submission_dispatch_span_prefers_submission_trace_context() {
|
|||
|
||||
#[test]
|
||||
fn submission_dispatch_span_uses_debug_for_realtime_audio() {
|
||||
init_test_tracing();
|
||||
let _trace_test_context = install_test_tracing("codex-core-tests");
|
||||
|
||||
let dispatch_span = submission_dispatch_span(&Submission {
|
||||
id: "sub-1".into(),
|
||||
|
|
@ -2917,7 +2902,7 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
|
|||
}
|
||||
}
|
||||
|
||||
init_test_tracing();
|
||||
let _trace_test_context = install_test_tracing("codex-core-tests");
|
||||
|
||||
let request_parent = W3cTraceContext {
|
||||
traceparent: Some("00-00000000000000000000000000000011-0000000000000022-01".into()),
|
||||
|
|
|
|||
|
|
@ -18,11 +18,16 @@ codex-utils-cargo-bin = { workspace = true }
|
|||
ctor = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
notify = { workspace = true }
|
||||
opentelemetry = { workspace = true }
|
||||
opentelemetry_sdk = { workspace = true }
|
||||
regex-lite = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tokio = { workspace = true, features = ["net", "time"] }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-opentelemetry = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
walkdir = { workspace = true }
|
||||
wiremock = { workspace = true }
|
||||
shlex = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ pub mod responses;
|
|||
pub mod streaming_sse;
|
||||
pub mod test_codex;
|
||||
pub mod test_codex_exec;
|
||||
pub mod tracing;
|
||||
pub mod zsh_fork;
|
||||
|
||||
#[ctor]
|
||||
|
|
|
|||
26
codex-rs/core/tests/common/tracing.rs
Normal file
26
codex-rs/core/tests/common/tracing.rs
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
use opentelemetry::global;
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_sdk::propagation::TraceContextPropagator;
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use tracing::dispatcher::DefaultGuard;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
pub struct TestTracingContext {
|
||||
_provider: SdkTracerProvider,
|
||||
_guard: DefaultGuard,
|
||||
}
|
||||
|
||||
pub fn install_test_tracing(tracer_name: &str) -> TestTracingContext {
|
||||
global::set_text_map_propagator(TraceContextPropagator::new());
|
||||
|
||||
let provider = SdkTracerProvider::builder().build();
|
||||
let tracer = provider.tracer(tracer_name.to_string());
|
||||
let subscriber =
|
||||
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
|
||||
|
||||
TestTracingContext {
|
||||
_provider: provider,
|
||||
_guard: subscriber.set_default(),
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,6 @@
|
|||
#![allow(clippy::expect_used, clippy::unwrap_used)]
|
||||
use codex_api::WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY;
|
||||
use codex_api::WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::ModelClient;
|
||||
use codex_core::ModelClientSession;
|
||||
|
|
@ -10,6 +12,7 @@ use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
|
|||
use codex_core::features::Feature;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
use codex_otel::current_span_w3c_trace_context;
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_protocol::ThreadId;
|
||||
|
|
@ -24,6 +27,7 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
|||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::responses::WebSocketConnectionConfig;
|
||||
|
|
@ -35,6 +39,7 @@ use core_test_support::responses::start_websocket_server;
|
|||
use core_test_support::responses::start_websocket_server_with_headers;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::tracing::install_test_tracing;
|
||||
use core_test_support::wait_for_event;
|
||||
use futures::StreamExt;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
|
|
@ -43,6 +48,7 @@ use serde_json::json;
|
|||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tracing::Instrument;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
const MODEL: &str = "gpt-5.2-codex";
|
||||
|
|
@ -50,6 +56,32 @@ const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
|
|||
const WS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
|
||||
const X_CLIENT_REQUEST_ID_HEADER: &str = "x-client-request-id";
|
||||
|
||||
fn assert_request_trace_matches(body: &serde_json::Value, expected_trace: &W3cTraceContext) {
|
||||
let client_metadata = body["client_metadata"]
|
||||
.as_object()
|
||||
.expect("missing client_metadata payload");
|
||||
let actual_traceparent = client_metadata
|
||||
.get(WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY)
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.expect("missing traceparent");
|
||||
let expected_traceparent = expected_trace
|
||||
.traceparent
|
||||
.as_deref()
|
||||
.expect("missing expected traceparent");
|
||||
|
||||
assert_eq!(actual_traceparent, expected_traceparent);
|
||||
assert_eq!(
|
||||
client_metadata
|
||||
.get(WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY)
|
||||
.and_then(serde_json::Value::as_str),
|
||||
expected_trace.tracestate.as_deref()
|
||||
);
|
||||
assert!(
|
||||
body.get("trace").is_none(),
|
||||
"top-level trace should not be sent"
|
||||
);
|
||||
}
|
||||
|
||||
struct WebsocketTestHarness {
|
||||
_codex_home: TempDir,
|
||||
client: ModelClient,
|
||||
|
|
@ -119,6 +151,112 @@ async fn responses_websocket_streams_without_feature_flag_when_provider_supports
|
|||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_reuses_connection_with_per_turn_trace_payloads() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let _trace_test_context = install_test_tracing("client-websocket-test");
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
|
||||
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
let prompt_one = prompt_with_input(vec![message_item("hello")]);
|
||||
let prompt_two = prompt_with_input(vec![message_item("again")]);
|
||||
|
||||
let first_trace = {
|
||||
let mut client_session = harness.client.new_session();
|
||||
async {
|
||||
let expected_trace =
|
||||
current_span_w3c_trace_context().expect("current span should have trace context");
|
||||
stream_until_complete(&mut client_session, &harness, &prompt_one).await;
|
||||
expected_trace
|
||||
}
|
||||
.instrument(tracing::info_span!("client.websocket.turn_one"))
|
||||
.await
|
||||
};
|
||||
|
||||
let second_trace = {
|
||||
let mut client_session = harness.client.new_session();
|
||||
async {
|
||||
let expected_trace =
|
||||
current_span_w3c_trace_context().expect("current span should have trace context");
|
||||
stream_until_complete(&mut client_session, &harness, &prompt_two).await;
|
||||
expected_trace
|
||||
}
|
||||
.instrument(tracing::info_span!("client.websocket.turn_two"))
|
||||
.await
|
||||
};
|
||||
|
||||
assert_eq!(server.handshakes().len(), 1);
|
||||
let connection = server.single_connection();
|
||||
assert_eq!(connection.len(), 2);
|
||||
|
||||
let first_request = connection
|
||||
.first()
|
||||
.expect("missing first request")
|
||||
.body_json();
|
||||
let second_request = connection
|
||||
.get(1)
|
||||
.expect("missing second request")
|
||||
.body_json();
|
||||
assert_request_trace_matches(&first_request, &first_trace);
|
||||
assert_request_trace_matches(&second_request, &second_trace);
|
||||
|
||||
let first_traceparent = first_request["client_metadata"]
|
||||
[WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY]
|
||||
.as_str()
|
||||
.expect("missing first traceparent");
|
||||
let second_traceparent = second_request["client_metadata"]
|
||||
[WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY]
|
||||
.as_str()
|
||||
.expect("missing second traceparent");
|
||||
assert_ne!(first_traceparent, second_traceparent);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_preconnect_does_not_replace_turn_trace_payload() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let _trace_test_context = install_test_tracing("client-websocket-test");
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
let mut client_session = harness.client.new_session();
|
||||
client_session
|
||||
.preconnect_websocket(&harness.session_telemetry, &harness.model_info)
|
||||
.await
|
||||
.expect("websocket preconnect failed");
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
|
||||
let expected_trace = async {
|
||||
let expected_trace =
|
||||
current_span_w3c_trace_context().expect("current span should have trace context");
|
||||
stream_until_complete(&mut client_session, &harness, &prompt).await;
|
||||
expected_trace
|
||||
}
|
||||
.instrument(tracing::info_span!("client.websocket.request"))
|
||||
.await;
|
||||
|
||||
assert_eq!(server.handshakes().len(), 1);
|
||||
let connection = server.single_connection();
|
||||
assert_eq!(connection.len(), 1);
|
||||
let request = connection.first().expect("missing request").body_json();
|
||||
assert_request_trace_matches(&request, &expected_trace);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_preconnect_reuses_connection() {
|
||||
skip_if_no_network!();
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue