diff --git a/codex-rs/app-server-test-client/src/lib.rs b/codex-rs/app-server-test-client/src/lib.rs index 3aaa0d1fe..14ca0cff5 100644 --- a/codex-rs/app-server-test-client/src/lib.rs +++ b/codex-rs/app-server-test-client/src/lib.rs @@ -69,8 +69,8 @@ use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput as V2UserInput; use codex_core::config::Config; +use codex_otel::OtelProvider; use codex_otel::current_span_w3c_trace_context; -use codex_otel::otel_provider::OtelProvider; use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::protocol::W3cTraceContext; use codex_utils_cli::CliConfigOverrides; diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 8ad14b8e1..887c2c650 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -513,7 +513,6 @@ pub async fn run_main_with_transport( .map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE))); let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); - let _ = tracing_subscriber::registry() .with(stderr_fmt) .with(feedback_layer) @@ -826,6 +825,10 @@ pub async fn run_main_with_transport( let _ = handle.await; } + if let Some(otel) = otel { + otel.shutdown(); + } + Ok(()) } diff --git a/codex-rs/core/src/otel_init.rs b/codex-rs/core/src/otel_init.rs index 2db80e8d4..74e30ef82 100644 --- a/codex-rs/core/src/otel_init.rs +++ b/codex-rs/core/src/otel_init.rs @@ -3,11 +3,11 @@ use crate::config::types::OtelExporterKind as Kind; use crate::config::types::OtelHttpProtocol as Protocol; use crate::default_client::originator; use crate::features::Feature; +use codex_otel::OtelProvider; use codex_otel::config::OtelExporter; use codex_otel::config::OtelHttpProtocol; use codex_otel::config::OtelSettings; use codex_otel::config::OtelTlsConfig as OtelTlsSettings; -use codex_otel::otel_provider::OtelProvider; use std::error::Error; /// Build an OpenTelemetry provider from the app Config. diff --git a/codex-rs/otel/Cargo.toml b/codex-rs/otel/Cargo.toml index 0fa14ff54..154c305ac 100644 --- a/codex-rs/otel/Cargo.toml +++ b/codex-rs/otel/Cargo.toml @@ -43,6 +43,7 @@ opentelemetry-otlp = { workspace = true, features = [ ]} opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = [ + "experimental_trace_batch_span_processor_with_async_runtime", "experimental_metrics_custom_reader", "logs", "metrics", diff --git a/codex-rs/otel/README.md b/codex-rs/otel/README.md index be90d6141..3739f5f02 100644 --- a/codex-rs/otel/README.md +++ b/codex-rs/otel/README.md @@ -2,8 +2,8 @@ `codex-otel` is the OpenTelemetry integration crate for Codex. It provides: -- Provider wiring for log/trace/metric exporters (`codex_otel::OtelProvider`, - `codex_otel::provider`, and the compatibility shim `codex_otel::otel_provider`). +- Provider wiring for log/trace/metric exporters (`codex_otel::OtelProvider` + and `codex_otel::provider`). - Session-scoped business event emission via `codex_otel::SessionTelemetry`. - Low-level metrics APIs via `codex_otel::metrics`. - Trace-context helpers via `codex_otel::trace_context` and crate-root re-exports. diff --git a/codex-rs/otel/src/lib.rs b/codex-rs/otel/src/lib.rs index 5a4ba31e4..cd1bbe5ce 100644 --- a/codex-rs/otel/src/lib.rs +++ b/codex-rs/otel/src/lib.rs @@ -1,7 +1,6 @@ pub mod config; mod events; pub mod metrics; -pub mod otel_provider; pub mod provider; pub mod trace_context; @@ -24,6 +23,7 @@ pub use crate::trace_context::current_span_trace_id; pub use crate::trace_context::current_span_w3c_trace_context; pub use crate::trace_context::set_parent_from_context; pub use crate::trace_context::set_parent_from_w3c_trace_context; +pub use crate::trace_context::span_w3c_trace_context; pub use crate::trace_context::traceparent_context_from_env; pub use codex_utils_string::sanitize_metric_tag_value; diff --git a/codex-rs/otel/src/otel_provider.rs b/codex-rs/otel/src/otel_provider.rs deleted file mode 100644 index 97db9ee8d..000000000 --- a/codex-rs/otel/src/otel_provider.rs +++ /dev/null @@ -1,4 +0,0 @@ -//! Compatibility shim for `codex_otel::otel_provider`. - -pub use crate::provider::*; -pub use crate::trace_context::traceparent_context_from_env; diff --git a/codex-rs/otel/src/otlp.rs b/codex-rs/otel/src/otlp.rs index c70e5e55e..f098542d5 100644 --- a/codex-rs/otel/src/otlp.rs +++ b/codex-rs/otel/src/otlp.rs @@ -75,13 +75,29 @@ pub(crate) fn build_http_client( tls: &OtelTlsConfig, timeout_var: &str, ) -> Result> { - if tokio::runtime::Handle::try_current().is_ok() { + if current_tokio_runtime_is_multi_thread() { tokio::task::block_in_place(|| build_http_client_inner(tls, timeout_var)) + } else if tokio::runtime::Handle::try_current().is_ok() { + let tls = tls.clone(); + let timeout_var = timeout_var.to_string(); + std::thread::spawn(move || { + build_http_client_inner(&tls, &timeout_var).map_err(|err| err.to_string()) + }) + .join() + .map_err(|_| config_error("failed to join OTLP blocking HTTP client builder thread"))? + .map_err(config_error) } else { build_http_client_inner(tls, timeout_var) } } +pub(crate) fn current_tokio_runtime_is_multi_thread() -> bool { + match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread, + Err(_) => false, + } +} + fn build_http_client_inner( tls: &OtelTlsConfig, timeout_var: &str, @@ -129,6 +145,54 @@ fn build_http_client_inner( .map_err(|error| Box::new(error) as Box) } +pub(crate) fn build_async_http_client( + tls: Option<&OtelTlsConfig>, + timeout_var: &str, +) -> Result> { + let mut builder = reqwest::Client::builder().timeout(resolve_otlp_timeout(timeout_var)); + + if let Some(tls) = tls { + if let Some(path) = tls.ca_certificate.as_ref() { + let (pem, location) = read_bytes(path)?; + let certificate = ReqwestCertificate::from_pem(pem.as_slice()).map_err(|error| { + config_error(format!( + "failed to parse certificate {}: {error}", + location.display() + )) + })?; + builder = builder + .tls_built_in_root_certs(false) + .add_root_certificate(certificate); + } + + match (&tls.client_certificate, &tls.client_private_key) { + (Some(cert_path), Some(key_path)) => { + let (mut cert_pem, cert_location) = read_bytes(cert_path)?; + let (key_pem, key_location) = read_bytes(key_path)?; + cert_pem.extend_from_slice(key_pem.as_slice()); + let identity = ReqwestIdentity::from_pem(cert_pem.as_slice()).map_err(|error| { + config_error(format!( + "failed to parse client identity using {} and {}: {error}", + cert_location.display(), + key_location.display() + )) + })?; + builder = builder.identity(identity).https_only(true); + } + (Some(_), None) | (None, Some(_)) => { + return Err(config_error( + "client_certificate and client_private_key must both be provided for mTLS", + )); + } + (None, None) => {} + } + } + + builder + .build() + .map_err(|error| Box::new(error) as Box) +} + pub(crate) fn resolve_otlp_timeout(signal_var: &str) -> Duration { if let Some(timeout) = read_timeout_env(signal_var) { return timeout; @@ -161,3 +225,48 @@ fn read_bytes(path: &AbsolutePathBuf) -> Result<(Vec, PathBuf), Box) -> Box { Box::new(io::Error::new(ErrorKind::InvalidData, message.into())) } + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use tokio::runtime::Builder; + + #[test] + fn current_tokio_runtime_is_multi_thread_detects_runtime_flavor() { + assert!(!current_tokio_runtime_is_multi_thread()); + + let current_thread_runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("current-thread runtime"); + assert_eq!( + current_thread_runtime.block_on(async { current_tokio_runtime_is_multi_thread() }), + false + ); + + let multi_thread_runtime = Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .expect("multi-thread runtime"); + assert_eq!( + multi_thread_runtime.block_on(async { current_tokio_runtime_is_multi_thread() }), + true + ); + } + + #[test] + fn build_http_client_works_in_current_thread_runtime() { + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("current-thread runtime"); + + let client = runtime.block_on(async { + build_http_client(&OtelTlsConfig::default(), OTEL_EXPORTER_OTLP_TIMEOUT) + }); + + assert!(client.is_ok()); + } +} diff --git a/codex-rs/otel/src/provider.rs b/codex-rs/otel/src/provider.rs index dad09156a..6227e33bd 100644 --- a/codex-rs/otel/src/provider.rs +++ b/codex-rs/otel/src/provider.rs @@ -23,9 +23,11 @@ use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig; use opentelemetry_sdk::Resource; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::runtime; use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::SdkTracerProvider; use opentelemetry_sdk::trace::Tracer; +use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor as TokioBatchSpanProcessor; use opentelemetry_semantic_conventions as semconv; use std::error::Error; use tracing::debug; @@ -50,15 +52,16 @@ pub struct OtelProvider { impl OtelProvider { pub fn shutdown(&self) { - if let Some(logger) = &self.logger { - let _ = logger.shutdown(); - } if let Some(tracer_provider) = &self.tracer_provider { + let _ = tracer_provider.force_flush(); let _ = tracer_provider.shutdown(); } if let Some(metrics) = &self.metrics { let _ = metrics.shutdown(); } + if let Some(logger) = &self.logger { + let _ = logger.shutdown(); + } } pub fn from(settings: &OtelSettings) -> Result, Box> { @@ -159,15 +162,16 @@ impl OtelProvider { impl Drop for OtelProvider { fn drop(&mut self) { - if let Some(logger) = &self.logger { - let _ = logger.shutdown(); - } if let Some(tracer_provider) = &self.tracer_provider { + let _ = tracer_provider.force_flush(); let _ = tracer_provider.shutdown(); } if let Some(metrics) = &self.metrics { let _ = metrics.shutdown(); } + if let Some(logger) = &self.logger { + let _ = logger.shutdown(); + } } } @@ -321,6 +325,34 @@ fn build_tracer_provider( } => { debug!("Using OTLP Http exporter for traces: {endpoint}"); + if crate::otlp::current_tokio_runtime_is_multi_thread() { + let protocol = match protocol { + OtelHttpProtocol::Binary => Protocol::HttpBinary, + OtelHttpProtocol::Json => Protocol::HttpJson, + }; + + let mut exporter_builder = SpanExporter::builder() + .with_http() + .with_endpoint(endpoint) + .with_protocol(protocol) + .with_headers(headers); + + let client = crate::otlp::build_async_http_client( + tls.as_ref(), + OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, + )?; + exporter_builder = exporter_builder.with_http_client(client); + + let processor = + TokioBatchSpanProcessor::builder(exporter_builder.build()?, runtime::Tokio) + .build(); + + return Ok(SdkTracerProvider::builder() + .with_resource(resource.clone()) + .with_span_processor(processor) + .build()); + } + let protocol = match protocol { OtelHttpProtocol::Binary => Protocol::HttpBinary, OtelHttpProtocol::Json => Protocol::HttpJson, diff --git a/codex-rs/otel/src/trace_context.rs b/codex-rs/otel/src/trace_context.rs index 913bbb205..b2a57a951 100644 --- a/codex-rs/otel/src/trace_context.rs +++ b/codex-rs/otel/src/trace_context.rs @@ -17,7 +17,11 @@ const TRACESTATE_ENV_VAR: &str = "TRACESTATE"; static TRACEPARENT_CONTEXT: OnceLock> = OnceLock::new(); pub fn current_span_w3c_trace_context() -> Option { - let context = Span::current().context(); + span_w3c_trace_context(&Span::current()) +} + +pub fn span_w3c_trace_context(span: &Span) -> Option { + let context = span.context(); if !context.span().span_context().is_valid() { return None; } diff --git a/codex-rs/otel/tests/suite/otel_export_routing_policy.rs b/codex-rs/otel/tests/suite/otel_export_routing_policy.rs index 75d9bde83..317c6a691 100644 --- a/codex-rs/otel/tests/suite/otel_export_routing_policy.rs +++ b/codex-rs/otel/tests/suite/otel_export_routing_policy.rs @@ -1,6 +1,6 @@ +use codex_otel::OtelProvider; use codex_otel::SessionTelemetry; use codex_otel::TelemetryAuthMode; -use codex_otel::otel_provider::OtelProvider; use opentelemetry::KeyValue; use opentelemetry::logs::AnyValue; use opentelemetry::trace::TracerProvider as _; diff --git a/codex-rs/otel/tests/suite/otlp_http_loopback.rs b/codex-rs/otel/tests/suite/otlp_http_loopback.rs index 0a9b1f390..c3bb042fe 100644 --- a/codex-rs/otel/tests/suite/otlp_http_loopback.rs +++ b/codex-rs/otel/tests/suite/otlp_http_loopback.rs @@ -1,5 +1,7 @@ +use codex_otel::OtelProvider; use codex_otel::config::OtelExporter; use codex_otel::config::OtelHttpProtocol; +use codex_otel::config::OtelSettings; use codex_otel::metrics::MetricsClient; use codex_otel::metrics::MetricsConfig; use codex_otel::metrics::Result; @@ -8,10 +10,12 @@ use std::io::Read as _; use std::io::Write as _; use std::net::TcpListener; use std::net::TcpStream; +use std::path::PathBuf; use std::sync::mpsc; use std::thread; use std::time::Duration; use std::time::Instant; +use tracing_subscriber::layer::SubscriberExt; struct CapturedRequest { path: String, @@ -212,3 +216,346 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> { Ok(()) } + +#[test] +fn otlp_http_exporter_sends_traces_to_collector() +-> std::result::Result<(), Box> { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind"); + let addr = listener.local_addr().expect("local_addr"); + listener.set_nonblocking(true).expect("set_nonblocking"); + + let (tx, rx) = mpsc::channel::>(); + let server = thread::spawn(move || { + let mut captured = Vec::new(); + let deadline = Instant::now() + Duration::from_secs(3); + + while Instant::now() < deadline { + match listener.accept() { + Ok((mut stream, _)) => { + let result = read_http_request(&mut stream); + let _ = write_http_response(&mut stream, "202 Accepted"); + if let Ok((path, headers, body)) = result { + captured.push(CapturedRequest { + path, + content_type: headers.get("content-type").cloned(), + body, + }); + } + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(10)); + } + Err(_) => break, + } + } + + let _ = tx.send(captured); + }); + + let otel = OtelProvider::from(&OtelSettings { + environment: "test".to_string(), + service_name: "codex-cli".to_string(), + service_version: env!("CARGO_PKG_VERSION").to_string(), + codex_home: PathBuf::from("."), + exporter: OtelExporter::None, + trace_exporter: OtelExporter::OtlpHttp { + endpoint: format!("http://{addr}/v1/traces"), + headers: HashMap::new(), + protocol: OtelHttpProtocol::Json, + tls: None, + }, + metrics_exporter: OtelExporter::None, + runtime_metrics: false, + })? + .expect("otel provider"); + let tracing_layer = otel.tracing_layer().expect("tracing layer"); + let subscriber = tracing_subscriber::registry().with(tracing_layer); + + tracing::subscriber::with_default(subscriber, || { + let span = tracing::info_span!( + "trace-loopback", + otel.name = "trace-loopback", + otel.kind = "server", + rpc.system = "jsonrpc", + rpc.method = "trace-loopback", + ); + let _guard = span.enter(); + tracing::info!("trace loopback event"); + }); + otel.shutdown(); + + server.join().expect("server join"); + let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured"); + + let request = captured + .iter() + .find(|req| req.path == "/v1/traces") + .unwrap_or_else(|| { + let paths = captured + .iter() + .map(|req| req.path.as_str()) + .collect::>() + .join(", "); + panic!( + "missing /v1/traces request; got {}: {paths}", + captured.len() + ); + }); + let content_type = request + .content_type + .as_deref() + .unwrap_or(""); + assert!( + content_type.starts_with("application/json"), + "unexpected content-type: {content_type}" + ); + + let body = String::from_utf8_lossy(&request.body); + assert!( + body.contains("trace-loopback"), + "expected span name not found; body prefix: {}", + &body.chars().take(2000).collect::() + ); + assert!( + body.contains("codex-cli"), + "expected service name not found; body prefix: {}", + &body.chars().take(2000).collect::() + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn otlp_http_exporter_sends_traces_to_collector_in_tokio_runtime() +-> std::result::Result<(), Box> { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind"); + let addr = listener.local_addr().expect("local_addr"); + listener.set_nonblocking(true).expect("set_nonblocking"); + + let (tx, rx) = mpsc::channel::>(); + let server = thread::spawn(move || { + let mut captured = Vec::new(); + let deadline = Instant::now() + Duration::from_secs(3); + + while Instant::now() < deadline { + match listener.accept() { + Ok((mut stream, _)) => { + let result = read_http_request(&mut stream); + let _ = write_http_response(&mut stream, "202 Accepted"); + if let Ok((path, headers, body)) = result { + captured.push(CapturedRequest { + path, + content_type: headers.get("content-type").cloned(), + body, + }); + } + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(10)); + } + Err(_) => break, + } + } + + let _ = tx.send(captured); + }); + + let otel = OtelProvider::from(&OtelSettings { + environment: "test".to_string(), + service_name: "codex-cli".to_string(), + service_version: env!("CARGO_PKG_VERSION").to_string(), + codex_home: PathBuf::from("."), + exporter: OtelExporter::None, + trace_exporter: OtelExporter::OtlpHttp { + endpoint: format!("http://{addr}/v1/traces"), + headers: HashMap::new(), + protocol: OtelHttpProtocol::Json, + tls: None, + }, + metrics_exporter: OtelExporter::None, + runtime_metrics: false, + })? + .expect("otel provider"); + let tracing_layer = otel.tracing_layer().expect("tracing layer"); + let subscriber = tracing_subscriber::registry().with(tracing_layer); + + tracing::subscriber::with_default(subscriber, || { + let span = tracing::info_span!( + "trace-loopback-tokio", + otel.name = "trace-loopback-tokio", + otel.kind = "server", + rpc.system = "jsonrpc", + rpc.method = "trace-loopback-tokio", + ); + let _guard = span.enter(); + tracing::info!("trace loopback event from tokio runtime"); + }); + otel.shutdown(); + + server.join().expect("server join"); + let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured"); + + let request = captured + .iter() + .find(|req| req.path == "/v1/traces") + .unwrap_or_else(|| { + let paths = captured + .iter() + .map(|req| req.path.as_str()) + .collect::>() + .join(", "); + panic!( + "missing /v1/traces request; got {}: {paths}", + captured.len() + ); + }); + let content_type = request + .content_type + .as_deref() + .unwrap_or(""); + assert!( + content_type.starts_with("application/json"), + "unexpected content-type: {content_type}" + ); + + let body = String::from_utf8_lossy(&request.body); + assert!( + body.contains("trace-loopback-tokio"), + "expected span name not found; body prefix: {}", + &body.chars().take(2000).collect::() + ); + assert!( + body.contains("codex-cli"), + "expected service name not found; body prefix: {}", + &body.chars().take(2000).collect::() + ); + + Ok(()) +} + +#[test] +fn otlp_http_exporter_sends_traces_to_collector_in_current_thread_tokio_runtime() +-> std::result::Result<(), Box> { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind"); + let addr = listener.local_addr().expect("local_addr"); + listener.set_nonblocking(true).expect("set_nonblocking"); + + let (tx, rx) = mpsc::channel::>(); + let server = thread::spawn(move || { + let mut captured = Vec::new(); + let deadline = Instant::now() + Duration::from_secs(3); + + while Instant::now() < deadline { + match listener.accept() { + Ok((mut stream, _)) => { + let result = read_http_request(&mut stream); + let _ = write_http_response(&mut stream, "202 Accepted"); + if let Ok((path, headers, body)) = result { + captured.push(CapturedRequest { + path, + content_type: headers.get("content-type").cloned(), + body, + }); + } + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(10)); + } + Err(_) => break, + } + } + + let _ = tx.send(captured); + }); + + let (runtime_result_tx, runtime_result_rx) = mpsc::channel::>(); + let runtime_thread = thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("current-thread runtime"); + + let result = runtime.block_on(async move { + let otel = OtelProvider::from(&OtelSettings { + environment: "test".to_string(), + service_name: "codex-cli".to_string(), + service_version: env!("CARGO_PKG_VERSION").to_string(), + codex_home: PathBuf::from("."), + exporter: OtelExporter::None, + trace_exporter: OtelExporter::OtlpHttp { + endpoint: format!("http://{addr}/v1/traces"), + headers: HashMap::new(), + protocol: OtelHttpProtocol::Json, + tls: None, + }, + metrics_exporter: OtelExporter::None, + runtime_metrics: false, + }) + .map_err(|err| err.to_string())? + .expect("otel provider"); + let tracing_layer = otel.tracing_layer().expect("tracing layer"); + let subscriber = tracing_subscriber::registry().with(tracing_layer); + + tracing::subscriber::with_default(subscriber, || { + let span = tracing::info_span!( + "trace-loopback-current-thread", + otel.name = "trace-loopback-current-thread", + otel.kind = "server", + rpc.system = "jsonrpc", + rpc.method = "trace-loopback-current-thread", + ); + let _guard = span.enter(); + tracing::info!("trace loopback event from current-thread tokio runtime"); + }); + otel.shutdown(); + Ok::<(), String>(()) + }); + let _ = runtime_result_tx.send(result); + }); + + runtime_result_rx + .recv_timeout(Duration::from_secs(5)) + .expect("current-thread runtime should complete") + .map_err(std::io::Error::other)?; + runtime_thread.join().expect("runtime thread"); + + server.join().expect("server join"); + let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured"); + + let request = captured + .iter() + .find(|req| req.path == "/v1/traces") + .unwrap_or_else(|| { + let paths = captured + .iter() + .map(|req| req.path.as_str()) + .collect::>() + .join(", "); + panic!( + "missing /v1/traces request; got {}: {paths}", + captured.len() + ); + }); + let content_type = request + .content_type + .as_deref() + .unwrap_or(""); + assert!( + content_type.starts_with("application/json"), + "unexpected content-type: {content_type}" + ); + + let body = String::from_utf8_lossy(&request.body); + assert!( + body.contains("trace-loopback-current-thread"), + "expected span name not found; body prefix: {}", + &body.chars().take(2000).collect::() + ); + assert!( + body.contains("codex-cli"), + "expected service name not found; body prefix: {}", + &body.chars().take(2000).collect::() + ); + + Ok(()) +}