From 014e19510d9fb4bc09c3b8e90fb05d7f3aa39700 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Fri, 13 Mar 2026 13:16:33 -0700 Subject: [PATCH] feat(app-server, core): add more spans (#14479) ## Description This PR expands tracing coverage across app-server thread startup, core session initialization, and the Responses transport layer. It also gives core dispatch spans stable operation-specific names so traces are easier to follow than the old generic `submission_dispatch` spans. Also use `fmt::Display` for types that we serialize in traces so we send strings instead of rust types --- .../app-server-protocol/src/jsonrpc_lite.rs | 10 ++ codex-rs/app-server/src/app_server_tracing.rs | 10 +- .../app-server/src/codex_message_processor.rs | 36 ++++- .../src/message_processor/tracing_tests.rs | 136 ++++++++++++++---- codex-rs/app-server/src/outgoing_message.rs | 7 + codex-rs/codex-api/src/endpoint/responses.rs | 22 +++ .../src/endpoint/responses_websocket.rs | 93 +++++++----- codex-rs/codex-api/src/endpoint/session.rs | 13 ++ codex-rs/codex-client/src/default_client.rs | 6 +- codex-rs/core/src/client.rs | 39 +++++ codex-rs/core/src/codex.rs | 74 ++++++++-- codex-rs/core/src/codex_tests.rs | 28 ++++ codex-rs/core/src/exec_policy.rs | 2 + codex-rs/core/src/model_provider_info.rs | 10 ++ codex-rs/core/src/models_manager/manager.rs | 32 +++++ codex-rs/core/src/project_doc.rs | 2 + codex-rs/protocol/src/protocol.rs | 41 ++++++ 17 files changed, 473 insertions(+), 88 deletions(-) diff --git a/codex-rs/app-server-protocol/src/jsonrpc_lite.rs b/codex-rs/app-server-protocol/src/jsonrpc_lite.rs index 13d3e0cc9..4e8858ce0 100644 --- a/codex-rs/app-server-protocol/src/jsonrpc_lite.rs +++ b/codex-rs/app-server-protocol/src/jsonrpc_lite.rs @@ -5,6 +5,7 @@ use codex_protocol::protocol::W3cTraceContext; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; +use std::fmt; use ts_rs::TS; pub const JSONRPC_VERSION: &str = "2.0"; @@ -19,6 +20,15 @@ pub enum RequestId { Integer(i64), } +impl fmt::Display for RequestId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::String(value) => f.write_str(value), + Self::Integer(value) => write!(f, "{value}"), + } + } +} + pub type Result = serde_json::Value; /// Refers to any valid JSON-RPC object that can be decoded off the wire, or encoded to be sent. diff --git a/codex-rs/app-server/src/app_server_tracing.rs b/codex-rs/app-server/src/app_server_tracing.rs index d1555e28e..564b2eb2d 100644 --- a/codex-rs/app-server/src/app_server_tracing.rs +++ b/codex-rs/app-server/src/app_server_tracing.rs @@ -92,7 +92,7 @@ fn transport_name(transport: AppServerTransport) -> &'static str { fn app_server_request_span_template( method: &str, transport: &'static str, - request_id: &impl std::fmt::Debug, + request_id: &impl std::fmt::Display, connection_id: ConnectionId, ) -> Span { info_span!( @@ -102,8 +102,8 @@ fn app_server_request_span_template( rpc.system = "jsonrpc", rpc.method = method, rpc.transport = transport, - rpc.request_id = ?request_id, - app_server.connection_id = ?connection_id, + rpc.request_id = %request_id, + app_server.connection_id = %connection_id, app_server.api_version = "v2", app_server.client_name = field::Empty, app_server.client_version = field::Empty, @@ -122,14 +122,14 @@ fn record_client_info(span: &Span, client_name: Option<&str>, client_version: Op fn attach_parent_context( span: &Span, method: &str, - request_id: &impl std::fmt::Debug, + request_id: &impl std::fmt::Display, parent_trace: Option<&W3cTraceContext>, ) { if let Some(trace) = parent_trace { if !set_parent_from_w3c_trace_context(span, trace) { tracing::warn!( rpc_method = method, - rpc_request_id = ?request_id, + rpc_request_id = %request_id, "ignoring invalid inbound request trace carrier" ); } diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 5b14ccb4f..81a001dcd 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1999,6 +1999,7 @@ impl CodexMessageProcessor { }) .collect() }; + let core_dynamic_tool_count = core_dynamic_tools.len(); match listener_task_context .thread_manager @@ -2009,6 +2010,12 @@ impl CodexMessageProcessor { service_name, request_trace, ) + .instrument(tracing::info_span!( + "app_server.thread_start.create_thread", + otel.name = "app_server.thread_start.create_thread", + thread_start.dynamic_tool_count = core_dynamic_tool_count, + thread_start.persist_extended_history = persist_extended_history, + )) .await { Ok(new_conv) => { @@ -2018,7 +2025,13 @@ impl CodexMessageProcessor { session_configured, .. } = new_conv; - let config_snapshot = thread.config_snapshot().await; + let config_snapshot = thread + .config_snapshot() + .instrument(tracing::info_span!( + "app_server.thread_start.config_snapshot", + otel.name = "app_server.thread_start.config_snapshot", + )) + .await; let mut thread = build_thread_from_snapshot( thread_id, &config_snapshot, @@ -2034,6 +2047,11 @@ impl CodexMessageProcessor { experimental_raw_events, ApiVersion::V2, ) + .instrument(tracing::info_span!( + "app_server.thread_start.attach_listener", + otel.name = "app_server.thread_start.attach_listener", + thread_start.experimental_raw_events = experimental_raw_events, + )) .await, thread_id, request_id.connection_id, @@ -2043,12 +2061,20 @@ impl CodexMessageProcessor { listener_task_context .thread_watch_manager .upsert_thread_silently(thread.clone()) + .instrument(tracing::info_span!( + "app_server.thread_start.upsert_thread", + otel.name = "app_server.thread_start.upsert_thread", + )) .await; thread.status = resolve_thread_status( listener_task_context .thread_watch_manager .loaded_status_for_thread(&thread.id) + .instrument(tracing::info_span!( + "app_server.thread_start.resolve_status", + otel.name = "app_server.thread_start.resolve_status", + )) .await, false, ); @@ -2067,12 +2093,20 @@ impl CodexMessageProcessor { listener_task_context .outgoing .send_response(request_id, response) + .instrument(tracing::info_span!( + "app_server.thread_start.send_response", + otel.name = "app_server.thread_start.send_response", + )) .await; let notif = ThreadStartedNotification { thread }; listener_task_context .outgoing .send_server_notification(ServerNotification::ThreadStarted(notif)) + .instrument(tracing::info_span!( + "app_server.thread_start.notify_started", + otel.name = "app_server.thread_start.notify_started", + )) .await; } Err(err) => { diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index 40d7fb234..76362406a 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -47,8 +47,6 @@ use tracing_subscriber::layer::SubscriberExt; use wiremock::MockServer; const TEST_CONNECTION_ID: ConnectionId = ConnectionId(7); -const CORE_TURN_SANITY_SPAN_NAMES: &[&str] = - &["submission_dispatch", "session_task.turn", "run_turn"]; struct TestTracing { exporter: InMemorySpanExporter, @@ -284,17 +282,21 @@ fn find_rpc_span_with_trace<'a>( }) } -fn find_span_by_name_with_trace<'a>( +fn find_span_with_trace<'a, F>( spans: &'a [SpanData], - name: &str, trace_id: TraceId, -) -> &'a SpanData { + description: &str, + predicate: F, +) -> &'a SpanData +where + F: Fn(&SpanData) -> bool, +{ spans .iter() - .find(|span| span.name.as_ref() == name && span.span_context.trace_id() == trace_id) + .find(|span| span.span_context.trace_id() == trace_id && predicate(span)) .unwrap_or_else(|| { panic!( - "missing span named {name} for trace={trace_id}; exported spans:\n{}", + "missing span matching {description} for trace={trace_id}; exported spans:\n{}", format_spans(spans) ) }) @@ -319,12 +321,17 @@ fn format_spans(spans: &[SpanData]) -> String { .join("\n") } -fn assert_span_descends_from(spans: &[SpanData], child: &SpanData, ancestor: &SpanData) { +fn span_depth_from_ancestor( + spans: &[SpanData], + child: &SpanData, + ancestor: &SpanData, +) -> Option { let ancestor_span_id = ancestor.span_context.span_id(); let mut parent_span_id = child.parent_span_id; + let mut depth = 1; while parent_span_id != SpanId::INVALID { if parent_span_id == ancestor_span_id { - return; + return Some(depth); } let Some(parent_span) = spans .iter() @@ -333,6 +340,15 @@ fn assert_span_descends_from(spans: &[SpanData], child: &SpanData, ancestor: &Sp break; }; parent_span_id = parent_span.parent_span_id; + depth += 1; + } + + None +} + +fn assert_span_descends_from(spans: &[SpanData], child: &SpanData, ancestor: &SpanData) { + if span_depth_from_ancestor(spans, child, ancestor).is_some() { + return; } panic!( @@ -343,6 +359,27 @@ fn assert_span_descends_from(spans: &[SpanData], child: &SpanData, ancestor: &Sp ); } +fn assert_has_internal_descendant_at_min_depth( + spans: &[SpanData], + ancestor: &SpanData, + min_depth: usize, +) { + if spans.iter().any(|span| { + span.span_kind == SpanKind::Internal + && span.span_context.trace_id() == ancestor.span_context.trace_id() + && span_depth_from_ancestor(spans, span, ancestor) + .is_some_and(|depth| depth >= min_depth) + }) { + return; + } + + panic!( + "missing internal descendant at depth >= {min_depth} below {}; exported spans:\n{}", + ancestor.name, + format_spans(spans) + ); +} + async fn read_response( outgoing_rx: &mut mpsc::Receiver, request_id: i64, @@ -443,6 +480,21 @@ where ); } +async fn wait_for_new_exported_spans( + tracing: &TestTracing, + baseline_len: usize, + predicate: F, +) -> Vec +where + F: Fn(&[SpanData]) -> bool, +{ + let spans = wait_for_exported_spans(tracing, |spans| { + spans.len() > baseline_len && predicate(&spans[baseline_len..]) + }) + .await; + spans.into_iter().skip(baseline_len).collect() +} + #[tokio::test(flavor = "current_thread")] async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> { let _guard = tracing_test_guard().lock().await; @@ -450,33 +502,65 @@ async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> let RemoteTrace { trace_id: remote_trace_id, + parent_span_id: remote_parent_span_id, context: remote_trace, .. } = RemoteTrace::new("00000000000000000000000000000011", "0000000000000022"); - let _: ThreadStartResponse = harness.start_thread(2, Some(remote_trace)).await; - let spans = wait_for_exported_spans(harness.tracing, |spans| { + let _: ThreadStartResponse = harness.start_thread(20_002, None).await; + let untraced_spans = wait_for_exported_spans(harness.tracing, |spans| { + spans.iter().any(|span| { + span.span_kind == SpanKind::Server + && span_attr(span, "rpc.method") == Some("thread/start") + }) + }) + .await; + let untraced_server_span = find_rpc_span_with_trace( + &untraced_spans, + SpanKind::Server, + "thread/start", + untraced_spans + .iter() + .rev() + .find(|span| { + span.span_kind == SpanKind::Server + && span_attr(span, "rpc.system") == Some("jsonrpc") + && span_attr(span, "rpc.method") == Some("thread/start") + }) + .unwrap_or_else(|| { + panic!( + "missing latest thread/start server span; exported spans:\n{}", + format_spans(&untraced_spans) + ) + }) + .span_context + .trace_id(), + ); + assert_has_internal_descendant_at_min_depth(&untraced_spans, untraced_server_span, 1); + + let baseline_len = untraced_spans.len(); + let _: ThreadStartResponse = harness.start_thread(20_003, Some(remote_trace)).await; + let spans = wait_for_new_exported_spans(harness.tracing, baseline_len, |spans| { spans.iter().any(|span| { span.span_kind == SpanKind::Server && span_attr(span, "rpc.method") == Some("thread/start") && span.span_context.trace_id() == remote_trace_id }) && spans.iter().any(|span| { - span.name.as_ref() == "thread_spawn" && span.span_context.trace_id() == remote_trace_id - }) && spans.iter().any(|span| { - span.name.as_ref() == "session_init" && span.span_context.trace_id() == remote_trace_id + span.name.as_ref() == "app_server.thread_start.notify_started" + && span.span_context.trace_id() == remote_trace_id }) }) .await; let server_request_span = find_rpc_span_with_trace(&spans, SpanKind::Server, "thread/start", remote_trace_id); - let thread_spawn_span = find_span_by_name_with_trace(&spans, "thread_spawn", remote_trace_id); - let session_init_span = find_span_by_name_with_trace(&spans, "session_init", remote_trace_id); assert_eq!(server_request_span.name.as_ref(), "thread/start"); + assert_eq!(server_request_span.parent_span_id, remote_parent_span_id); + assert!(server_request_span.parent_span_is_remote); assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id); assert_ne!(server_request_span.span_context.span_id(), SpanId::INVALID); - assert_span_descends_from(&spans, thread_spawn_span, server_request_span); - assert_span_descends_from(&spans, session_init_span, server_request_span); + assert_has_internal_descendant_at_min_depth(&spans, server_request_span, 1); + assert_has_internal_descendant_at_min_depth(&spans, server_request_span, 2); harness.shutdown().await; Ok(()) @@ -527,7 +611,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> { && span_attr(span, "rpc.method") == Some("turn/start") && span.span_context.trace_id() == remote_trace_id }) && spans.iter().any(|span| { - CORE_TURN_SANITY_SPAN_NAMES.contains(&span.name.as_ref()) + span_attr(span, "codex.op") == Some("user_input") && span.span_context.trace_id() == remote_trace_id }) }) @@ -535,17 +619,9 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> { let server_request_span = find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id); - let core_turn_span = spans - .iter() - .find(|span| { - CORE_TURN_SANITY_SPAN_NAMES.contains(&span.name.as_ref()) - && span.span_context.trace_id() == remote_trace_id - }) - .unwrap_or_else(|| { - panic!( - "missing representative core turn span for trace={remote_trace_id}; exported spans:\n{}", - format_spans(&spans) - ) + let core_turn_span = + find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| { + span_attr(span, "codex.op") == Some("user_input") }); assert_eq!(server_request_span.parent_span_id, remote_parent_span_id); diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index baf4b65a0..43c6304ae 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fmt; use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; @@ -32,6 +33,12 @@ pub(crate) type ClientRequestResult = std::result::Result) -> fmt::Result { + write!(f, "{}", self.0) + } +} + /// Stable identifier for a client request scoped to a transport connection. #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub(crate) struct ConnectionRequestId { diff --git a/codex-rs/codex-api/src/endpoint/responses.rs b/codex-rs/codex-api/src/endpoint/responses.rs index d21208619..57a44d2e2 100644 --- a/codex-rs/codex-api/src/endpoint/responses.rs +++ b/codex-rs/codex-api/src/endpoint/responses.rs @@ -21,6 +21,7 @@ use http::Method; use serde_json::Value; use std::sync::Arc; use std::sync::OnceLock; +use tracing::instrument; pub struct ResponsesClient { session: EndpointSession, @@ -55,6 +56,16 @@ impl ResponsesClient { } } + #[instrument( + name = "responses.stream_request", + level = "info", + skip_all, + fields( + transport = "responses_http", + http.method = "POST", + api.path = "responses" + ) + )] pub async fn stream_request( &self, request: ResponsesApiRequest, @@ -90,6 +101,17 @@ impl ResponsesClient { "responses" } + #[instrument( + name = "responses.stream", + level = "info", + skip_all, + fields( + transport = "responses_http", + http.method = "POST", + api.path = "responses", + turn.has_state = turn_state.is_some() + ) + )] pub async fn stream( &self, body: Value, diff --git a/codex-rs/codex-api/src/endpoint/responses_websocket.rs b/codex-rs/codex-api/src/endpoint/responses_websocket.rs index d5bc6fd4b..28923238b 100644 --- a/codex-rs/codex-api/src/endpoint/responses_websocket.rs +++ b/codex-rs/codex-api/src/endpoint/responses_websocket.rs @@ -35,9 +35,12 @@ use tokio_tungstenite::connect_async_tls_with_config; use tokio_tungstenite::tungstenite::Error as WsError; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::client::IntoClientRequest; +use tracing::Instrument; +use tracing::Span; use tracing::debug; use tracing::error; use tracing::info; +use tracing::instrument; use tracing::trace; use tungstenite::extensions::ExtensionsConfig; use tungstenite::extensions::compression::deflate::DeflateConfig; @@ -202,6 +205,12 @@ impl ResponsesWebsocketConnection { self.stream.lock().await.is_none() } + #[instrument( + name = "responses_websocket.stream_request", + level = "info", + skip_all, + fields(transport = "responses_websocket", api.path = "responses") + )] pub async fn stream_request( &self, request: ResponsesWsRequest, @@ -218,48 +227,52 @@ impl ResponsesWebsocketConnection { ApiError::Stream(format!("failed to encode websocket request: {err}")) })?; - tokio::spawn(async move { - if let Some(model) = server_model { - let _ = tx_event.send(Ok(ResponseEvent::ServerModel(model))).await; - } - if let Some(etag) = models_etag { - let _ = tx_event.send(Ok(ResponseEvent::ModelsEtag(etag))).await; - } - if server_reasoning_included { - let _ = tx_event - .send(Ok(ResponseEvent::ServerReasoningIncluded(true))) - .await; - } - let mut guard = stream.lock().await; - let result = { - let Some(ws_stream) = guard.as_mut() else { + let current_span = Span::current(); + tokio::spawn( + async move { + if let Some(model) = server_model { + let _ = tx_event.send(Ok(ResponseEvent::ServerModel(model))).await; + } + if let Some(etag) = models_etag { + let _ = tx_event.send(Ok(ResponseEvent::ModelsEtag(etag))).await; + } + if server_reasoning_included { let _ = tx_event - .send(Err(ApiError::Stream( - "websocket connection is closed".to_string(), - ))) + .send(Ok(ResponseEvent::ServerReasoningIncluded(true))) .await; - return; + } + let mut guard = stream.lock().await; + let result = { + let Some(ws_stream) = guard.as_mut() else { + let _ = tx_event + .send(Err(ApiError::Stream( + "websocket connection is closed".to_string(), + ))) + .await; + return; + }; + + run_websocket_response_stream( + ws_stream, + tx_event.clone(), + request_body, + idle_timeout, + telemetry, + ) + .await }; - run_websocket_response_stream( - ws_stream, - tx_event.clone(), - request_body, - idle_timeout, - telemetry, - ) - .await - }; - - if let Err(err) = result { - // A terminal stream error should reach the caller immediately. Waiting for a - // graceful close handshake here can stall indefinitely and mask the error. - let failed_stream = guard.take(); - drop(guard); - drop(failed_stream); - let _ = tx_event.send(Err(err)).await; + if let Err(err) = result { + // A terminal stream error should reach the caller immediately. Waiting for a + // graceful close handshake here can stall indefinitely and mask the error. + let failed_stream = guard.take(); + drop(guard); + drop(failed_stream); + let _ = tx_event.send(Err(err)).await; + } } - }); + .instrument(current_span), + ); Ok(ResponseStream { rx_event }) } @@ -275,6 +288,12 @@ impl ResponsesWebsocketClient { Self { provider, auth } } + #[instrument( + name = "responses_websocket.connect", + level = "info", + skip_all, + fields(transport = "responses_websocket", api.path = "responses") + )] pub async fn connect( &self, extra_headers: HeaderMap, diff --git a/codex-rs/codex-api/src/endpoint/session.rs b/codex-rs/codex-api/src/endpoint/session.rs index a6cd7bfe3..0086b4aa1 100644 --- a/codex-rs/codex-api/src/endpoint/session.rs +++ b/codex-rs/codex-api/src/endpoint/session.rs @@ -12,6 +12,7 @@ use http::HeaderMap; use http::Method; use serde_json::Value; use std::sync::Arc; +use tracing::instrument; pub(crate) struct EndpointSession { transport: T, @@ -68,6 +69,12 @@ impl EndpointSession { .await } + #[instrument( + name = "endpoint_session.execute_with", + level = "info", + skip_all, + fields(http.method = %method, api.path = path) + )] pub(crate) async fn execute_with( &self, method: Method, @@ -96,6 +103,12 @@ impl EndpointSession { Ok(response) } + #[instrument( + name = "endpoint_session.stream_with", + level = "info", + skip_all, + fields(http.method = %method, api.path = path) + )] pub(crate) async fn stream_with( &self, method: Method, diff --git a/codex-rs/codex-client/src/default_client.rs b/codex-rs/codex-client/src/default_client.rs index 4e328f7ae..56b3ce4b1 100644 --- a/codex-rs/codex-client/src/default_client.rs +++ b/codex-rs/codex-client/src/default_client.rs @@ -1,12 +1,12 @@ use http::Error as HttpError; +use http::HeaderMap; +use http::HeaderName; +use http::HeaderValue; use opentelemetry::global; use opentelemetry::propagation::Injector; use reqwest::IntoUrl; use reqwest::Method; use reqwest::Response; -use reqwest::header::HeaderMap; -use reqwest::header::HeaderName; -use reqwest::header::HeaderValue; use serde::Serialize; use std::fmt::Display; use std::time::Duration; diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 436d45545..32c1653ae 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -80,6 +80,7 @@ use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; use tokio_tungstenite::tungstenite::Error; use tokio_tungstenite::tungstenite::Message; +use tracing::instrument; use tracing::trace; use tracing::warn; @@ -732,6 +733,18 @@ impl ModelClientSession { Ok(()) } /// Returns a websocket connection for this turn. + #[instrument( + name = "model_client.websocket_connection", + level = "info", + skip_all, + fields( + provider = %self.client.state.provider.name, + wire_api = %self.client.state.provider.wire_api, + transport = "responses_websocket", + api.path = "responses", + turn.has_metadata_header = turn_metadata_header.is_some() + ) + )] async fn websocket_connection( &mut self, session_telemetry: &SessionTelemetry, @@ -789,6 +802,19 @@ impl ModelClientSession { /// Handles SSE fixtures, reasoning summaries, verbosity, and the /// `text` controls used for output schemas. #[allow(clippy::too_many_arguments)] + #[instrument( + name = "model_client.stream_responses_api", + level = "info", + skip_all, + fields( + model = %model_info.slug, + wire_api = %self.client.state.provider.wire_api, + transport = "responses_http", + http.method = "POST", + api.path = "responses", + turn.has_metadata_header = turn_metadata_header.is_some() + ) + )] async fn stream_responses_api( &self, prompt: &Prompt, @@ -856,6 +882,19 @@ impl ModelClientSession { /// Streams a turn via the Responses API over WebSocket transport. #[allow(clippy::too_many_arguments)] + #[instrument( + name = "model_client.stream_responses_websocket", + level = "info", + skip_all, + fields( + model = %model_info.slug, + wire_api = %self.client.state.provider.wire_api, + transport = "responses_websocket", + api.path = "responses", + turn.has_metadata_header = turn_metadata_header.is_some(), + websocket.warmup = warmup + ) + )] async fn stream_responses_websocket( &mut self, prompt: &Prompt, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index fb35ab4a3..10e8bebf8 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -584,7 +584,6 @@ impl Codex { let session_source_clone = session_configuration.session_source.clone(); let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit); - let session_init_span = info_span!("session_init"); let session = Session::new( session_configuration, config.clone(), @@ -601,7 +600,6 @@ impl Codex { file_watcher, agent_control, ) - .instrument(session_init_span) .await .map_err(|e| { error!("Failed to create session: {e:#}"); @@ -1340,6 +1338,7 @@ impl Session { } } + #[instrument(name = "session_init", level = "info", skip_all)] #[allow(clippy::too_many_arguments)] async fn new( mut session_configuration: SessionConfiguration, @@ -1431,18 +1430,29 @@ impl Session { .await?; Ok((Some(rollout_recorder), state_db_ctx)) } - }; + } + .instrument(info_span!( + "session_init.rollout", + otel.name = "session_init.rollout", + session_init.ephemeral = config.ephemeral, + )); + let is_subagent = matches!( + session_configuration.session_source, + SessionSource::SubAgent(_) + ); let history_meta_fut = async { - if matches!( - session_configuration.session_source, - SessionSource::SubAgent(_) - ) { + if is_subagent { (0, 0) } else { crate::message_history::history_metadata(&config).await } - }; + } + .instrument(info_span!( + "session_init.history_metadata", + otel.name = "session_init.history_metadata", + session_init.is_subagent = is_subagent, + )); let auth_manager_clone = Arc::clone(&auth_manager); let config_for_mcp = Arc::clone(&config); let mcp_manager_for_mcp = Arc::clone(&mcp_manager); @@ -1455,7 +1465,11 @@ impl Session { ) .await; (auth, mcp_servers, auth_statuses) - }; + } + .instrument(info_span!( + "session_init.auth_mcp", + otel.name = "session_init.auth_mcp", + )); // Join all independent futures. let ( @@ -1613,7 +1627,12 @@ impl Session { tx }; let thread_name = - match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id).await + match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id) + .instrument(info_span!( + "session_init.thread_name_lookup", + otel.name = "session_init.thread_name_lookup", + )) + .await { Ok(name) => name, Err(err) => { @@ -1663,6 +1682,12 @@ impl Session { managed_network_requirements_enabled, network_proxy_audit_metadata, ) + .instrument(info_span!( + "session_init.network_proxy", + otel.name = "session_init.network_proxy", + session_init.managed_network_requirements_enabled = + managed_network_requirements_enabled, + )) .await?; (Some(network_proxy), Some(session_network_proxy)) } else { @@ -1812,6 +1837,8 @@ impl Session { .map(|(name, _)| name.clone()) .collect(); required_mcp_servers.sort(); + let enabled_mcp_server_count = mcp_servers.values().filter(|server| server.enabled).count(); + let required_mcp_server_count = required_mcp_servers.len(); let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()); { let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await; @@ -1829,6 +1856,12 @@ impl Session { codex_apps_tools_cache_key(auth), tool_plugin_provenance, ) + .instrument(info_span!( + "session_init.mcp_manager_init", + otel.name = "session_init.mcp_manager_init", + session_init.enabled_mcp_server_count = enabled_mcp_server_count, + session_init.required_mcp_server_count = required_mcp_server_count, + )) .await; { let mut manager_guard = sess.services.mcp_connection_manager.write().await; @@ -1848,6 +1881,11 @@ impl Session { .read() .await .required_startup_failures(&required_mcp_servers) + .instrument(info_span!( + "session_init.required_mcp_wait", + otel.name = "session_init.required_mcp_wait", + session_init.required_mcp_server_count = required_mcp_server_count, + )) .await; if !failures.is_empty() { let details = failures @@ -4269,11 +4307,23 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv } fn submission_dispatch_span(sub: &Submission) -> tracing::Span { + let op_name = sub.op.kind(); + let span_name = format!("op.dispatch.{op_name}"); let dispatch_span = match &sub.op { Op::RealtimeConversationAudio(_) => { - debug_span!("submission_dispatch", submission.id = sub.id.as_str()) + debug_span!( + "submission_dispatch", + otel.name = span_name.as_str(), + submission.id = sub.id.as_str(), + codex.op = op_name + ) } - _ => info_span!("submission_dispatch", submission.id = sub.id.as_str()), + _ => info_span!( + "submission_dispatch", + otel.name = span_name.as_str(), + submission.id = sub.id.as_str(), + codex.op = op_name + ), }; if let Some(trace) = sub.trace.as_ref() && !set_parent_from_w3c_trace_context(&dispatch_span, trace) diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index a1d88ff3d..3059a34d7 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -2491,6 +2491,34 @@ fn submission_dispatch_span_uses_debug_for_realtime_audio() { ); } +#[test] +fn op_kind_distinguishes_turn_ops() { + assert_eq!( + Op::OverrideTurnContext { + cwd: None, + approval_policy: None, + sandbox_policy: None, + windows_sandbox_level: None, + model: None, + effort: None, + summary: None, + service_tier: None, + collaboration_mode: None, + personality: None, + } + .kind(), + "override_turn_context" + ); + assert_eq!( + Op::UserInput { + items: vec![], + final_output_json_schema: None, + } + .kind(), + "user_input" + ); +} + #[tokio::test] async fn spawn_task_turn_span_inherits_dispatch_trace_context() { struct TraceCaptureTask { diff --git a/codex-rs/core/src/exec_policy.rs b/codex-rs/core/src/exec_policy.rs index aa0691f88..10171b574 100644 --- a/codex-rs/core/src/exec_policy.rs +++ b/codex-rs/core/src/exec_policy.rs @@ -28,6 +28,7 @@ use codex_protocol::protocol::SandboxPolicy; use thiserror::Error; use tokio::fs; use tokio::task::spawn_blocking; +use tracing::instrument; use crate::bash::parse_shell_lc_plain_commands; use crate::bash::parse_shell_lc_single_command_prefix; @@ -187,6 +188,7 @@ impl ExecPolicyManager { } } + #[instrument(level = "info", skip_all)] pub(crate) async fn load(config_stack: &ConfigLayerStack) -> Result { let (policy, warning) = load_exec_policy_with_warning(config_stack).await?; if let Some(err) = warning.as_ref() { diff --git a/codex-rs/core/src/model_provider_info.rs b/codex-rs/core/src/model_provider_info.rs index d8e2ea35e..fe78a846f 100644 --- a/codex-rs/core/src/model_provider_info.rs +++ b/codex-rs/core/src/model_provider_info.rs @@ -16,6 +16,7 @@ use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; use std::collections::HashMap; +use std::fmt; use std::time::Duration; const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000; @@ -40,6 +41,15 @@ pub enum WireApi { Responses, } +impl fmt::Display for WireApi { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let value = match self { + Self::Responses => "responses", + }; + f.write_str(value) + } +} + impl<'de> Deserialize<'de> for WireApi { fn deserialize(deserializer: D) -> Result where diff --git a/codex-rs/core/src/models_manager/manager.rs b/codex-rs/core/src/models_manager/manager.rs index 89c6cdcb3..fed50cb5f 100644 --- a/codex-rs/core/src/models_manager/manager.rs +++ b/codex-rs/core/src/models_manager/manager.rs @@ -18,6 +18,7 @@ use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ModelPreset; use codex_protocol::openai_models::ModelsResponse; use http::HeaderMap; +use std::fmt; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -26,6 +27,7 @@ use tokio::sync::TryLockError; use tokio::time::timeout; use tracing::error; use tracing::info; +use tracing::instrument; const MODEL_CACHE_FILE: &str = "models_cache.json"; const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300); @@ -42,6 +44,22 @@ pub enum RefreshStrategy { OnlineIfUncached, } +impl RefreshStrategy { + const fn as_str(self) -> &'static str { + match self { + Self::Online => "online", + Self::Offline => "offline", + Self::OnlineIfUncached => "online_if_uncached", + } + } +} + +impl fmt::Display for RefreshStrategy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) + } +} + /// How the manager's base catalog is sourced for the lifetime of the process. #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum CatalogMode { @@ -102,6 +120,11 @@ impl ModelsManager { /// List all available models, refreshing according to the specified strategy. /// /// Returns model presets sorted by priority and filtered by auth mode and visibility. + #[instrument( + level = "info", + skip(self), + fields(refresh_strategy = %refresh_strategy) + )] pub async fn list_models(&self, refresh_strategy: RefreshStrategy) -> Vec { if let Err(err) = self.refresh_available_models(refresh_strategy).await { error!("failed to refresh available models: {err}"); @@ -137,6 +160,14 @@ impl ModelsManager { /// /// If `model` is provided, returns it directly. Otherwise selects the default based on /// auth mode and available models. + #[instrument( + level = "info", + skip(self, model), + fields( + model.provided = model.is_some(), + refresh_strategy = %refresh_strategy + ) + )] pub async fn get_default_model( &self, model: &Option, @@ -160,6 +191,7 @@ impl ModelsManager { // todo(aibrahim): look if we can tighten it to pub(crate) /// Look up model metadata, applying remote overrides and config adjustments. + #[instrument(level = "info", skip(self, config), fields(model = model))] pub async fn get_model_info(&self, model: &str, config: &Config) -> ModelInfo { let remote_models = self.get_remote_models().await; Self::construct_model_info_from_candidates(model, &remote_models, config) diff --git a/codex-rs/core/src/project_doc.rs b/codex-rs/core/src/project_doc.rs index 0aa94c836..bde0fbe84 100644 --- a/codex-rs/core/src/project_doc.rs +++ b/codex-rs/core/src/project_doc.rs @@ -31,6 +31,7 @@ use std::path::PathBuf; use tokio::io::AsyncReadExt; use toml::Value as TomlValue; use tracing::error; +use tracing::instrument; pub(crate) const HIERARCHICAL_AGENTS_MESSAGE: &str = include_str!("../hierarchical_agents_message.md"); @@ -80,6 +81,7 @@ fn render_js_repl_instructions(config: &Config) -> Option { /// Combines `Config::instructions` and `AGENTS.md` (if present) into a single /// string of instructions. +#[instrument(level = "info", skip_all)] pub(crate) async fn get_user_instructions( config: &Config, skills: Option<&[SkillMetadata]>, diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index b450db37f..25c9e3fda 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -478,6 +478,47 @@ pub enum Op { ListModels, } +impl Op { + pub fn kind(&self) -> &'static str { + match self { + Self::Interrupt => "interrupt", + Self::CleanBackgroundTerminals => "clean_background_terminals", + Self::RealtimeConversationStart(_) => "realtime_conversation_start", + Self::RealtimeConversationAudio(_) => "realtime_conversation_audio", + Self::RealtimeConversationText(_) => "realtime_conversation_text", + Self::RealtimeConversationClose => "realtime_conversation_close", + Self::UserInput { .. } => "user_input", + Self::UserTurn { .. } => "user_turn", + Self::OverrideTurnContext { .. } => "override_turn_context", + Self::ExecApproval { .. } => "exec_approval", + Self::PatchApproval { .. } => "patch_approval", + Self::ResolveElicitation { .. } => "resolve_elicitation", + Self::UserInputAnswer { .. } => "user_input_answer", + Self::RequestPermissionsResponse { .. } => "request_permissions_response", + Self::DynamicToolResponse { .. } => "dynamic_tool_response", + Self::AddToHistory { .. } => "add_to_history", + Self::GetHistoryEntryRequest { .. } => "get_history_entry_request", + Self::ListMcpTools => "list_mcp_tools", + Self::RefreshMcpServers { .. } => "refresh_mcp_servers", + Self::ReloadUserConfig => "reload_user_config", + Self::ListCustomPrompts => "list_custom_prompts", + Self::ListSkills { .. } => "list_skills", + Self::ListRemoteSkills { .. } => "list_remote_skills", + Self::DownloadRemoteSkill { .. } => "download_remote_skill", + Self::Compact => "compact", + Self::DropMemories => "drop_memories", + Self::UpdateMemories => "update_memories", + Self::SetThreadName { .. } => "set_thread_name", + Self::Undo => "undo", + Self::ThreadRollback { .. } => "thread_rollback", + Self::Review { .. } => "review", + Self::Shutdown => "shutdown", + Self::RunUserShellCommand { .. } => "run_user_shell_command", + Self::ListModels => "list_models", + } + } +} + /// Determines the conditions under which the user is consulted to approve /// running the command proposed by Codex. #[derive(