From 27724f6ead1a47720ac14173d84bb4fa0d7812df Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Wed, 4 Mar 2026 11:09:17 -0800 Subject: [PATCH] feat(core, tracing): add a span representing a turn (#13424) This is PR 3 of the app-server tracing rollout. PRs https://github.com/openai/codex/pull/13285 and https://github.com/openai/codex/pull/13368 gave us inbound request spans in app-server and propagated trace context through Submission. This change finishes the next piece in core: when a request actually starts a turn, we now create a core-owned long-lived span that stays open for the real lifetime of the turn. What changed: - `Session::spawn_task` can now optionally create a long-lived turn span and run the spawned task inside it - `turn/start` uses that path, so normal turn execution stays under a single core-owned span after the async handoff - `review/start` uses the same pattern - added a unit test that verifies the spawned turn task inherits the submission dispatch trace ancestry **Why** The app-server request span is intentionally short-lived. Once work crosses into core, we still want one span that covers the actual execution window until completion or interruption. This keeps that ownership where it belongs: in the layer that owns the runtime lifecycle. --- codex-rs/core/src/codex.rs | 127 ++++++++++++++++++++-- codex-rs/core/src/tasks/compact.rs | 4 + codex-rs/core/src/tasks/ghost_snapshot.rs | 4 + codex-rs/core/src/tasks/mod.rs | 18 ++- codex-rs/core/src/tasks/regular.rs | 4 + codex-rs/core/src/tasks/review.rs | 4 + codex-rs/core/src/tasks/undo.rs | 4 + codex-rs/core/src/tasks/user_shell.rs | 4 + 8 files changed, 158 insertions(+), 11 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 6cb7b0f61..1e1d96d4a 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -6710,6 +6710,7 @@ mod tests { use serde_json::json; use std::path::PathBuf; use std::sync::Arc; + use std::sync::Once; use std::time::Duration as StdDuration; struct InstructionsTestCase { @@ -8176,10 +8177,16 @@ mod tests { }) } - fn test_tracing_subscriber() -> impl tracing::Subscriber + Send + Sync { - let provider = SdkTracerProvider::builder().build(); - let tracer = provider.tracer("codex-core-tests"); - tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer)) + fn init_test_tracing() { + static INIT: Once = Once::new(); + INIT.call_once(|| { + let provider = SdkTracerProvider::builder().build(); + let tracer = provider.tracer("codex-core-tests"); + let subscriber = tracing_subscriber::registry() + .with(tracing_opentelemetry::layer().with_tracer(tracer)); + tracing::subscriber::set_global_default(subscriber) + .expect("global tracing subscriber should only be installed once"); + }); } async fn build_test_config(codex_home: &Path) -> Config { @@ -8525,8 +8532,7 @@ mod tests { session: Arc::new(session), }; - let subscriber = test_tracing_subscriber(); - let _guard = tracing::subscriber::set_default(subscriber); + init_test_tracing(); let request_parent = W3cTraceContext { traceparent: Some("00-00000000000000000000000000000011-0000000000000022-01".into()), @@ -8560,8 +8566,7 @@ mod tests { #[test] fn submission_dispatch_span_prefers_submission_trace_context() { - let subscriber = test_tracing_subscriber(); - let _guard = tracing::subscriber::set_default(subscriber); + init_test_tracing(); let ambient_parent = W3cTraceContext { traceparent: Some("00-00000000000000000000000000000033-0000000000000044-01".into()), @@ -8592,6 +8597,108 @@ mod tests { ); } + #[tokio::test] + async fn spawn_task_turn_span_inherits_dispatch_trace_context() { + struct TraceCaptureTask { + captured_trace: Arc>>, + } + + #[async_trait::async_trait] + impl SessionTask for TraceCaptureTask { + fn kind(&self) -> TaskKind { + TaskKind::Regular + } + + fn span_name(&self) -> &'static str { + "session_task.trace_capture" + } + + async fn run( + self: Arc, + _session: Arc, + _ctx: Arc, + _input: Vec, + _cancellation_token: CancellationToken, + ) -> Option { + let mut trace = self + .captured_trace + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *trace = current_span_w3c_trace_context(); + None + } + } + + init_test_tracing(); + + let request_parent = W3cTraceContext { + traceparent: Some("00-00000000000000000000000000000011-0000000000000022-01".into()), + tracestate: Some("vendor=value".into()), + }; + let request_span = tracing::info_span!("app_server.request"); + assert!(set_parent_from_w3c_trace_context( + &request_span, + &request_parent + )); + + let submission_trace = async { + current_span_w3c_trace_context().expect("request span should have trace context") + } + .instrument(request_span) + .await; + + let dispatch_span = submission_dispatch_span(&Submission { + id: "sub-1".into(), + op: Op::Interrupt, + trace: Some(submission_trace.clone()), + }); + let dispatch_span_id = dispatch_span.context().span().span_context().span_id(); + + let (sess, tc, rx) = make_session_and_context_with_rx().await; + let captured_trace = Arc::new(std::sync::Mutex::new(None)); + + async { + sess.spawn_task( + Arc::clone(&tc), + vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }], + TraceCaptureTask { + captured_trace: Arc::clone(&captured_trace), + }, + ) + .await; + } + .instrument(dispatch_span) + .await; + + let evt = tokio::time::timeout(StdDuration::from_secs(2), rx.recv()) + .await + .expect("timeout waiting for turn completion") + .expect("event"); + assert!(matches!(evt.msg, EventMsg::TurnComplete(_))); + + let task_trace = captured_trace + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone() + .expect("turn task should capture the current span trace context"); + let submission_context = + codex_otel::context_from_w3c_trace_context(&submission_trace).expect("submission"); + let task_context = + codex_otel::context_from_w3c_trace_context(&task_trace).expect("task trace"); + + assert_eq!( + task_context.span().span_context().trace_id(), + submission_context.span().span_context().trace_id() + ); + assert_ne!( + task_context.span().span_context().span_id(), + dispatch_span_id + ); + } + pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( dynamic_tools: Vec, ) -> ( @@ -9396,6 +9503,10 @@ mod tests { self.kind } + fn span_name(&self) -> &'static str { + "session_task.never_ending" + } + async fn run( self: Arc, _session: Arc, diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index db2a494c3..a8c0a5e06 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -17,6 +17,10 @@ impl SessionTask for CompactTask { TaskKind::Compact } + fn span_name(&self) -> &'static str { + "session_task.compact" + } + async fn run( self: Arc, session: Arc, diff --git a/codex-rs/core/src/tasks/ghost_snapshot.rs b/codex-rs/core/src/tasks/ghost_snapshot.rs index 48a6137c6..ded8533a2 100644 --- a/codex-rs/core/src/tasks/ghost_snapshot.rs +++ b/codex-rs/core/src/tasks/ghost_snapshot.rs @@ -32,6 +32,10 @@ impl SessionTask for GhostSnapshotTask { TaskKind::Regular } + fn span_name(&self) -> &'static str { + "session_task.ghost_snapshot" + } + async fn run( self: Arc, session: Arc, diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 301d25b6d..a8feeb513 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -14,7 +14,7 @@ use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use tokio_util::task::AbortOnDropHandle; use tracing::Instrument; -use tracing::Span; +use tracing::info_span; use tracing::trace; use tracing::warn; @@ -89,6 +89,9 @@ pub(crate) trait SessionTask: Send + Sync + 'static { /// surface it in telemetry and UI. fn kind(&self) -> TaskKind; + /// Returns the tracing name for a spawned task span. + fn span_name(&self) -> &'static str; + /// Executes the task until completion or cancellation. /// /// Implementations typically stream protocol events using `session` and @@ -127,6 +130,7 @@ impl Session { let task: Arc = Arc::new(task); let task_kind = task.kind(); + let span_name = task.span_name(); let cancellation_token = CancellationToken::new(); let done = Arc::new(Notify::new()); @@ -137,7 +141,15 @@ impl Session { let ctx = Arc::clone(&turn_context); let task_for_run = Arc::clone(&task); let task_cancellation_token = cancellation_token.child_token(); - let session_span = Span::current(); + // Task-owned turn spans keep a core-owned span open for the + // full task lifecycle after the submission dispatch span ends. + let task_span = info_span!( + "turn", + otel.name = span_name, + thread.id = %self.conversation_id, + turn.id = %turn_context.sub_id, + model = %turn_context.model_info.slug, + ); tokio::spawn( async move { let ctx_for_finish = Arc::clone(&ctx); @@ -158,7 +170,7 @@ impl Session { } done_clone.notify_waiters(); } - .instrument(session_span), + .instrument(task_span), ) }; diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index b4af4e11f..e3a9ef4c3 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -68,6 +68,10 @@ impl SessionTask for RegularTask { TaskKind::Regular } + fn span_name(&self) -> &'static str { + "session_task.turn" + } + async fn run( self: Arc, session: Arc, diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index 4192236b4..4787ee782 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -43,6 +43,10 @@ impl SessionTask for ReviewTask { TaskKind::Review } + fn span_name(&self) -> &'static str { + "session_task.review" + } + async fn run( self: Arc, session: Arc, diff --git a/codex-rs/core/src/tasks/undo.rs b/codex-rs/core/src/tasks/undo.rs index b2fb4577a..95f51c1ec 100644 --- a/codex-rs/core/src/tasks/undo.rs +++ b/codex-rs/core/src/tasks/undo.rs @@ -31,6 +31,10 @@ impl SessionTask for UndoTask { TaskKind::Regular } + fn span_name(&self) -> &'static str { + "session_task.undo" + } + async fn run( self: Arc, session: Arc, diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index 18dc5006e..deab56bc5 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -66,6 +66,10 @@ impl SessionTask for UserShellCommandTask { TaskKind::Regular } + fn span_name(&self) -> &'static str { + "session_task.user_shell" + } + async fn run( self: Arc, session: Arc,