feat(app-server, core): add more spans (#14479)
## Description This PR expands tracing coverage across app-server thread startup, core session initialization, and the Responses transport layer. It also gives core dispatch spans stable operation-specific names so traces are easier to follow than the old generic `submission_dispatch` spans. Also use `fmt::Display` for types that we serialize in traces so we send strings instead of rust types
This commit is contained in:
parent
914f7c7317
commit
014e19510d
17 changed files with 473 additions and 88 deletions
|
|
@ -5,6 +5,7 @@ use codex_protocol::protocol::W3cTraceContext;
|
|||
use schemars::JsonSchema;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::fmt;
|
||||
use ts_rs::TS;
|
||||
|
||||
pub const JSONRPC_VERSION: &str = "2.0";
|
||||
|
|
@ -19,6 +20,15 @@ pub enum RequestId {
|
|||
Integer(i64),
|
||||
}
|
||||
|
||||
impl fmt::Display for RequestId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::String(value) => f.write_str(value),
|
||||
Self::Integer(value) => write!(f, "{value}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type Result = serde_json::Value;
|
||||
|
||||
/// Refers to any valid JSON-RPC object that can be decoded off the wire, or encoded to be sent.
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ fn transport_name(transport: AppServerTransport) -> &'static str {
|
|||
fn app_server_request_span_template(
|
||||
method: &str,
|
||||
transport: &'static str,
|
||||
request_id: &impl std::fmt::Debug,
|
||||
request_id: &impl std::fmt::Display,
|
||||
connection_id: ConnectionId,
|
||||
) -> Span {
|
||||
info_span!(
|
||||
|
|
@ -102,8 +102,8 @@ fn app_server_request_span_template(
|
|||
rpc.system = "jsonrpc",
|
||||
rpc.method = method,
|
||||
rpc.transport = transport,
|
||||
rpc.request_id = ?request_id,
|
||||
app_server.connection_id = ?connection_id,
|
||||
rpc.request_id = %request_id,
|
||||
app_server.connection_id = %connection_id,
|
||||
app_server.api_version = "v2",
|
||||
app_server.client_name = field::Empty,
|
||||
app_server.client_version = field::Empty,
|
||||
|
|
@ -122,14 +122,14 @@ fn record_client_info(span: &Span, client_name: Option<&str>, client_version: Op
|
|||
fn attach_parent_context(
|
||||
span: &Span,
|
||||
method: &str,
|
||||
request_id: &impl std::fmt::Debug,
|
||||
request_id: &impl std::fmt::Display,
|
||||
parent_trace: Option<&W3cTraceContext>,
|
||||
) {
|
||||
if let Some(trace) = parent_trace {
|
||||
if !set_parent_from_w3c_trace_context(span, trace) {
|
||||
tracing::warn!(
|
||||
rpc_method = method,
|
||||
rpc_request_id = ?request_id,
|
||||
rpc_request_id = %request_id,
|
||||
"ignoring invalid inbound request trace carrier"
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1999,6 +1999,7 @@ impl CodexMessageProcessor {
|
|||
})
|
||||
.collect()
|
||||
};
|
||||
let core_dynamic_tool_count = core_dynamic_tools.len();
|
||||
|
||||
match listener_task_context
|
||||
.thread_manager
|
||||
|
|
@ -2009,6 +2010,12 @@ impl CodexMessageProcessor {
|
|||
service_name,
|
||||
request_trace,
|
||||
)
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.create_thread",
|
||||
otel.name = "app_server.thread_start.create_thread",
|
||||
thread_start.dynamic_tool_count = core_dynamic_tool_count,
|
||||
thread_start.persist_extended_history = persist_extended_history,
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(new_conv) => {
|
||||
|
|
@ -2018,7 +2025,13 @@ impl CodexMessageProcessor {
|
|||
session_configured,
|
||||
..
|
||||
} = new_conv;
|
||||
let config_snapshot = thread.config_snapshot().await;
|
||||
let config_snapshot = thread
|
||||
.config_snapshot()
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.config_snapshot",
|
||||
otel.name = "app_server.thread_start.config_snapshot",
|
||||
))
|
||||
.await;
|
||||
let mut thread = build_thread_from_snapshot(
|
||||
thread_id,
|
||||
&config_snapshot,
|
||||
|
|
@ -2034,6 +2047,11 @@ impl CodexMessageProcessor {
|
|||
experimental_raw_events,
|
||||
ApiVersion::V2,
|
||||
)
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.attach_listener",
|
||||
otel.name = "app_server.thread_start.attach_listener",
|
||||
thread_start.experimental_raw_events = experimental_raw_events,
|
||||
))
|
||||
.await,
|
||||
thread_id,
|
||||
request_id.connection_id,
|
||||
|
|
@ -2043,12 +2061,20 @@ impl CodexMessageProcessor {
|
|||
listener_task_context
|
||||
.thread_watch_manager
|
||||
.upsert_thread_silently(thread.clone())
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.upsert_thread",
|
||||
otel.name = "app_server.thread_start.upsert_thread",
|
||||
))
|
||||
.await;
|
||||
|
||||
thread.status = resolve_thread_status(
|
||||
listener_task_context
|
||||
.thread_watch_manager
|
||||
.loaded_status_for_thread(&thread.id)
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.resolve_status",
|
||||
otel.name = "app_server.thread_start.resolve_status",
|
||||
))
|
||||
.await,
|
||||
false,
|
||||
);
|
||||
|
|
@ -2067,12 +2093,20 @@ impl CodexMessageProcessor {
|
|||
listener_task_context
|
||||
.outgoing
|
||||
.send_response(request_id, response)
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.send_response",
|
||||
otel.name = "app_server.thread_start.send_response",
|
||||
))
|
||||
.await;
|
||||
|
||||
let notif = ThreadStartedNotification { thread };
|
||||
listener_task_context
|
||||
.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadStarted(notif))
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.notify_started",
|
||||
otel.name = "app_server.thread_start.notify_started",
|
||||
))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
|
|
|
|||
|
|
@ -47,8 +47,6 @@ use tracing_subscriber::layer::SubscriberExt;
|
|||
use wiremock::MockServer;
|
||||
|
||||
const TEST_CONNECTION_ID: ConnectionId = ConnectionId(7);
|
||||
const CORE_TURN_SANITY_SPAN_NAMES: &[&str] =
|
||||
&["submission_dispatch", "session_task.turn", "run_turn"];
|
||||
|
||||
struct TestTracing {
|
||||
exporter: InMemorySpanExporter,
|
||||
|
|
@ -284,17 +282,21 @@ fn find_rpc_span_with_trace<'a>(
|
|||
})
|
||||
}
|
||||
|
||||
fn find_span_by_name_with_trace<'a>(
|
||||
fn find_span_with_trace<'a, F>(
|
||||
spans: &'a [SpanData],
|
||||
name: &str,
|
||||
trace_id: TraceId,
|
||||
) -> &'a SpanData {
|
||||
description: &str,
|
||||
predicate: F,
|
||||
) -> &'a SpanData
|
||||
where
|
||||
F: Fn(&SpanData) -> bool,
|
||||
{
|
||||
spans
|
||||
.iter()
|
||||
.find(|span| span.name.as_ref() == name && span.span_context.trace_id() == trace_id)
|
||||
.find(|span| span.span_context.trace_id() == trace_id && predicate(span))
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"missing span named {name} for trace={trace_id}; exported spans:\n{}",
|
||||
"missing span matching {description} for trace={trace_id}; exported spans:\n{}",
|
||||
format_spans(spans)
|
||||
)
|
||||
})
|
||||
|
|
@ -319,12 +321,17 @@ fn format_spans(spans: &[SpanData]) -> String {
|
|||
.join("\n")
|
||||
}
|
||||
|
||||
fn assert_span_descends_from(spans: &[SpanData], child: &SpanData, ancestor: &SpanData) {
|
||||
fn span_depth_from_ancestor(
|
||||
spans: &[SpanData],
|
||||
child: &SpanData,
|
||||
ancestor: &SpanData,
|
||||
) -> Option<usize> {
|
||||
let ancestor_span_id = ancestor.span_context.span_id();
|
||||
let mut parent_span_id = child.parent_span_id;
|
||||
let mut depth = 1;
|
||||
while parent_span_id != SpanId::INVALID {
|
||||
if parent_span_id == ancestor_span_id {
|
||||
return;
|
||||
return Some(depth);
|
||||
}
|
||||
let Some(parent_span) = spans
|
||||
.iter()
|
||||
|
|
@ -333,6 +340,15 @@ fn assert_span_descends_from(spans: &[SpanData], child: &SpanData, ancestor: &Sp
|
|||
break;
|
||||
};
|
||||
parent_span_id = parent_span.parent_span_id;
|
||||
depth += 1;
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn assert_span_descends_from(spans: &[SpanData], child: &SpanData, ancestor: &SpanData) {
|
||||
if span_depth_from_ancestor(spans, child, ancestor).is_some() {
|
||||
return;
|
||||
}
|
||||
|
||||
panic!(
|
||||
|
|
@ -343,6 +359,27 @@ fn assert_span_descends_from(spans: &[SpanData], child: &SpanData, ancestor: &Sp
|
|||
);
|
||||
}
|
||||
|
||||
fn assert_has_internal_descendant_at_min_depth(
|
||||
spans: &[SpanData],
|
||||
ancestor: &SpanData,
|
||||
min_depth: usize,
|
||||
) {
|
||||
if spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Internal
|
||||
&& span.span_context.trace_id() == ancestor.span_context.trace_id()
|
||||
&& span_depth_from_ancestor(spans, span, ancestor)
|
||||
.is_some_and(|depth| depth >= min_depth)
|
||||
}) {
|
||||
return;
|
||||
}
|
||||
|
||||
panic!(
|
||||
"missing internal descendant at depth >= {min_depth} below {}; exported spans:\n{}",
|
||||
ancestor.name,
|
||||
format_spans(spans)
|
||||
);
|
||||
}
|
||||
|
||||
async fn read_response<T: serde::de::DeserializeOwned>(
|
||||
outgoing_rx: &mut mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
|
||||
request_id: i64,
|
||||
|
|
@ -443,6 +480,21 @@ where
|
|||
);
|
||||
}
|
||||
|
||||
async fn wait_for_new_exported_spans<F>(
|
||||
tracing: &TestTracing,
|
||||
baseline_len: usize,
|
||||
predicate: F,
|
||||
) -> Vec<SpanData>
|
||||
where
|
||||
F: Fn(&[SpanData]) -> bool,
|
||||
{
|
||||
let spans = wait_for_exported_spans(tracing, |spans| {
|
||||
spans.len() > baseline_len && predicate(&spans[baseline_len..])
|
||||
})
|
||||
.await;
|
||||
spans.into_iter().skip(baseline_len).collect()
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> {
|
||||
let _guard = tracing_test_guard().lock().await;
|
||||
|
|
@ -450,33 +502,65 @@ async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() ->
|
|||
|
||||
let RemoteTrace {
|
||||
trace_id: remote_trace_id,
|
||||
parent_span_id: remote_parent_span_id,
|
||||
context: remote_trace,
|
||||
..
|
||||
} = RemoteTrace::new("00000000000000000000000000000011", "0000000000000022");
|
||||
|
||||
let _: ThreadStartResponse = harness.start_thread(2, Some(remote_trace)).await;
|
||||
let spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
let _: ThreadStartResponse = harness.start_thread(20_002, None).await;
|
||||
let untraced_spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
})
|
||||
})
|
||||
.await;
|
||||
let untraced_server_span = find_rpc_span_with_trace(
|
||||
&untraced_spans,
|
||||
SpanKind::Server,
|
||||
"thread/start",
|
||||
untraced_spans
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.system") == Some("jsonrpc")
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"missing latest thread/start server span; exported spans:\n{}",
|
||||
format_spans(&untraced_spans)
|
||||
)
|
||||
})
|
||||
.span_context
|
||||
.trace_id(),
|
||||
);
|
||||
assert_has_internal_descendant_at_min_depth(&untraced_spans, untraced_server_span, 1);
|
||||
|
||||
let baseline_len = untraced_spans.len();
|
||||
let _: ThreadStartResponse = harness.start_thread(20_003, Some(remote_trace)).await;
|
||||
let spans = wait_for_new_exported_spans(harness.tracing, baseline_len, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span.name.as_ref() == "thread_spawn" && span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span.name.as_ref() == "session_init" && span.span_context.trace_id() == remote_trace_id
|
||||
span.name.as_ref() == "app_server.thread_start.notify_started"
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
})
|
||||
.await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "thread/start", remote_trace_id);
|
||||
let thread_spawn_span = find_span_by_name_with_trace(&spans, "thread_spawn", remote_trace_id);
|
||||
let session_init_span = find_span_by_name_with_trace(&spans, "session_init", remote_trace_id);
|
||||
assert_eq!(server_request_span.name.as_ref(), "thread/start");
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_ne!(server_request_span.span_context.span_id(), SpanId::INVALID);
|
||||
assert_span_descends_from(&spans, thread_spawn_span, server_request_span);
|
||||
assert_span_descends_from(&spans, session_init_span, server_request_span);
|
||||
assert_has_internal_descendant_at_min_depth(&spans, server_request_span, 1);
|
||||
assert_has_internal_descendant_at_min_depth(&spans, server_request_span, 2);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
|
|
@ -527,7 +611,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
|||
&& span_attr(span, "rpc.method") == Some("turn/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
CORE_TURN_SANITY_SPAN_NAMES.contains(&span.name.as_ref())
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
})
|
||||
|
|
@ -535,17 +619,9 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
|||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id);
|
||||
let core_turn_span = spans
|
||||
.iter()
|
||||
.find(|span| {
|
||||
CORE_TURN_SANITY_SPAN_NAMES.contains(&span.name.as_ref())
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"missing representative core turn span for trace={remote_trace_id}; exported spans:\n{}",
|
||||
format_spans(&spans)
|
||||
)
|
||||
let core_turn_span =
|
||||
find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
});
|
||||
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
|
@ -32,6 +33,12 @@ pub(crate) type ClientRequestResult = std::result::Result<Result, JSONRPCErrorEr
|
|||
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
|
||||
pub(crate) struct ConnectionId(pub(crate) u64);
|
||||
|
||||
impl fmt::Display for ConnectionId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Stable identifier for a client request scoped to a transport connection.
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
pub(crate) struct ConnectionRequestId {
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ use http::Method;
|
|||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use tracing::instrument;
|
||||
|
||||
pub struct ResponsesClient<T: HttpTransport, A: AuthProvider> {
|
||||
session: EndpointSession<T, A>,
|
||||
|
|
@ -55,6 +56,16 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "responses.stream_request",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
transport = "responses_http",
|
||||
http.method = "POST",
|
||||
api.path = "responses"
|
||||
)
|
||||
)]
|
||||
pub async fn stream_request(
|
||||
&self,
|
||||
request: ResponsesApiRequest,
|
||||
|
|
@ -90,6 +101,17 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
|||
"responses"
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "responses.stream",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
transport = "responses_http",
|
||||
http.method = "POST",
|
||||
api.path = "responses",
|
||||
turn.has_state = turn_state.is_some()
|
||||
)
|
||||
)]
|
||||
pub async fn stream(
|
||||
&self,
|
||||
body: Value,
|
||||
|
|
|
|||
|
|
@ -35,9 +35,12 @@ use tokio_tungstenite::connect_async_tls_with_config;
|
|||
use tokio_tungstenite::tungstenite::Error as WsError;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
||||
use tracing::Instrument;
|
||||
use tracing::Span;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::instrument;
|
||||
use tracing::trace;
|
||||
use tungstenite::extensions::ExtensionsConfig;
|
||||
use tungstenite::extensions::compression::deflate::DeflateConfig;
|
||||
|
|
@ -202,6 +205,12 @@ impl ResponsesWebsocketConnection {
|
|||
self.stream.lock().await.is_none()
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "responses_websocket.stream_request",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(transport = "responses_websocket", api.path = "responses")
|
||||
)]
|
||||
pub async fn stream_request(
|
||||
&self,
|
||||
request: ResponsesWsRequest,
|
||||
|
|
@ -218,48 +227,52 @@ impl ResponsesWebsocketConnection {
|
|||
ApiError::Stream(format!("failed to encode websocket request: {err}"))
|
||||
})?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Some(model) = server_model {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::ServerModel(model))).await;
|
||||
}
|
||||
if let Some(etag) = models_etag {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::ModelsEtag(etag))).await;
|
||||
}
|
||||
if server_reasoning_included {
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
|
||||
.await;
|
||||
}
|
||||
let mut guard = stream.lock().await;
|
||||
let result = {
|
||||
let Some(ws_stream) = guard.as_mut() else {
|
||||
let current_span = Span::current();
|
||||
tokio::spawn(
|
||||
async move {
|
||||
if let Some(model) = server_model {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::ServerModel(model))).await;
|
||||
}
|
||||
if let Some(etag) = models_etag {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::ModelsEtag(etag))).await;
|
||||
}
|
||||
if server_reasoning_included {
|
||||
let _ = tx_event
|
||||
.send(Err(ApiError::Stream(
|
||||
"websocket connection is closed".to_string(),
|
||||
)))
|
||||
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
let mut guard = stream.lock().await;
|
||||
let result = {
|
||||
let Some(ws_stream) = guard.as_mut() else {
|
||||
let _ = tx_event
|
||||
.send(Err(ApiError::Stream(
|
||||
"websocket connection is closed".to_string(),
|
||||
)))
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
run_websocket_response_stream(
|
||||
ws_stream,
|
||||
tx_event.clone(),
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
run_websocket_response_stream(
|
||||
ws_stream,
|
||||
tx_event.clone(),
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
if let Err(err) = result {
|
||||
// A terminal stream error should reach the caller immediately. Waiting for a
|
||||
// graceful close handshake here can stall indefinitely and mask the error.
|
||||
let failed_stream = guard.take();
|
||||
drop(guard);
|
||||
drop(failed_stream);
|
||||
let _ = tx_event.send(Err(err)).await;
|
||||
if let Err(err) = result {
|
||||
// A terminal stream error should reach the caller immediately. Waiting for a
|
||||
// graceful close handshake here can stall indefinitely and mask the error.
|
||||
let failed_stream = guard.take();
|
||||
drop(guard);
|
||||
drop(failed_stream);
|
||||
let _ = tx_event.send(Err(err)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
.instrument(current_span),
|
||||
);
|
||||
|
||||
Ok(ResponseStream { rx_event })
|
||||
}
|
||||
|
|
@ -275,6 +288,12 @@ impl<A: AuthProvider> ResponsesWebsocketClient<A> {
|
|||
Self { provider, auth }
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "responses_websocket.connect",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(transport = "responses_websocket", api.path = "responses")
|
||||
)]
|
||||
pub async fn connect(
|
||||
&self,
|
||||
extra_headers: HeaderMap,
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ use http::HeaderMap;
|
|||
use http::Method;
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
|
||||
pub(crate) struct EndpointSession<T: HttpTransport, A: AuthProvider> {
|
||||
transport: T,
|
||||
|
|
@ -68,6 +69,12 @@ impl<T: HttpTransport, A: AuthProvider> EndpointSession<T, A> {
|
|||
.await
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "endpoint_session.execute_with",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(http.method = %method, api.path = path)
|
||||
)]
|
||||
pub(crate) async fn execute_with<C>(
|
||||
&self,
|
||||
method: Method,
|
||||
|
|
@ -96,6 +103,12 @@ impl<T: HttpTransport, A: AuthProvider> EndpointSession<T, A> {
|
|||
Ok(response)
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "endpoint_session.stream_with",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(http.method = %method, api.path = path)
|
||||
)]
|
||||
pub(crate) async fn stream_with<C>(
|
||||
&self,
|
||||
method: Method,
|
||||
|
|
|
|||
|
|
@ -1,12 +1,12 @@
|
|||
use http::Error as HttpError;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderName;
|
||||
use http::HeaderValue;
|
||||
use opentelemetry::global;
|
||||
use opentelemetry::propagation::Injector;
|
||||
use reqwest::IntoUrl;
|
||||
use reqwest::Method;
|
||||
use reqwest::Response;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderName;
|
||||
use reqwest::header::HeaderValue;
|
||||
use serde::Serialize;
|
||||
use std::fmt::Display;
|
||||
use std::time::Duration;
|
||||
|
|
|
|||
|
|
@ -80,6 +80,7 @@ use tokio::sync::oneshot;
|
|||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio_tungstenite::tungstenite::Error;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::instrument;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
|
|
@ -732,6 +733,18 @@ impl ModelClientSession {
|
|||
Ok(())
|
||||
}
|
||||
/// Returns a websocket connection for this turn.
|
||||
#[instrument(
|
||||
name = "model_client.websocket_connection",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
provider = %self.client.state.provider.name,
|
||||
wire_api = %self.client.state.provider.wire_api,
|
||||
transport = "responses_websocket",
|
||||
api.path = "responses",
|
||||
turn.has_metadata_header = turn_metadata_header.is_some()
|
||||
)
|
||||
)]
|
||||
async fn websocket_connection(
|
||||
&mut self,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
|
|
@ -789,6 +802,19 @@ impl ModelClientSession {
|
|||
/// Handles SSE fixtures, reasoning summaries, verbosity, and the
|
||||
/// `text` controls used for output schemas.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(
|
||||
name = "model_client.stream_responses_api",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
model = %model_info.slug,
|
||||
wire_api = %self.client.state.provider.wire_api,
|
||||
transport = "responses_http",
|
||||
http.method = "POST",
|
||||
api.path = "responses",
|
||||
turn.has_metadata_header = turn_metadata_header.is_some()
|
||||
)
|
||||
)]
|
||||
async fn stream_responses_api(
|
||||
&self,
|
||||
prompt: &Prompt,
|
||||
|
|
@ -856,6 +882,19 @@ impl ModelClientSession {
|
|||
|
||||
/// Streams a turn via the Responses API over WebSocket transport.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(
|
||||
name = "model_client.stream_responses_websocket",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
model = %model_info.slug,
|
||||
wire_api = %self.client.state.provider.wire_api,
|
||||
transport = "responses_websocket",
|
||||
api.path = "responses",
|
||||
turn.has_metadata_header = turn_metadata_header.is_some(),
|
||||
websocket.warmup = warmup
|
||||
)
|
||||
)]
|
||||
async fn stream_responses_websocket(
|
||||
&mut self,
|
||||
prompt: &Prompt,
|
||||
|
|
|
|||
|
|
@ -584,7 +584,6 @@ impl Codex {
|
|||
let session_source_clone = session_configuration.session_source.clone();
|
||||
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);
|
||||
|
||||
let session_init_span = info_span!("session_init");
|
||||
let session = Session::new(
|
||||
session_configuration,
|
||||
config.clone(),
|
||||
|
|
@ -601,7 +600,6 @@ impl Codex {
|
|||
file_watcher,
|
||||
agent_control,
|
||||
)
|
||||
.instrument(session_init_span)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Failed to create session: {e:#}");
|
||||
|
|
@ -1340,6 +1338,7 @@ impl Session {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(name = "session_init", level = "info", skip_all)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
mut session_configuration: SessionConfiguration,
|
||||
|
|
@ -1431,18 +1430,29 @@ impl Session {
|
|||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx))
|
||||
}
|
||||
};
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.rollout",
|
||||
otel.name = "session_init.rollout",
|
||||
session_init.ephemeral = config.ephemeral,
|
||||
));
|
||||
|
||||
let is_subagent = matches!(
|
||||
session_configuration.session_source,
|
||||
SessionSource::SubAgent(_)
|
||||
);
|
||||
let history_meta_fut = async {
|
||||
if matches!(
|
||||
session_configuration.session_source,
|
||||
SessionSource::SubAgent(_)
|
||||
) {
|
||||
if is_subagent {
|
||||
(0, 0)
|
||||
} else {
|
||||
crate::message_history::history_metadata(&config).await
|
||||
}
|
||||
};
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.history_metadata",
|
||||
otel.name = "session_init.history_metadata",
|
||||
session_init.is_subagent = is_subagent,
|
||||
));
|
||||
let auth_manager_clone = Arc::clone(&auth_manager);
|
||||
let config_for_mcp = Arc::clone(&config);
|
||||
let mcp_manager_for_mcp = Arc::clone(&mcp_manager);
|
||||
|
|
@ -1455,7 +1465,11 @@ impl Session {
|
|||
)
|
||||
.await;
|
||||
(auth, mcp_servers, auth_statuses)
|
||||
};
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.auth_mcp",
|
||||
otel.name = "session_init.auth_mcp",
|
||||
));
|
||||
|
||||
// Join all independent futures.
|
||||
let (
|
||||
|
|
@ -1613,7 +1627,12 @@ impl Session {
|
|||
tx
|
||||
};
|
||||
let thread_name =
|
||||
match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id).await
|
||||
match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id)
|
||||
.instrument(info_span!(
|
||||
"session_init.thread_name_lookup",
|
||||
otel.name = "session_init.thread_name_lookup",
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(name) => name,
|
||||
Err(err) => {
|
||||
|
|
@ -1663,6 +1682,12 @@ impl Session {
|
|||
managed_network_requirements_enabled,
|
||||
network_proxy_audit_metadata,
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"session_init.network_proxy",
|
||||
otel.name = "session_init.network_proxy",
|
||||
session_init.managed_network_requirements_enabled =
|
||||
managed_network_requirements_enabled,
|
||||
))
|
||||
.await?;
|
||||
(Some(network_proxy), Some(session_network_proxy))
|
||||
} else {
|
||||
|
|
@ -1812,6 +1837,8 @@ impl Session {
|
|||
.map(|(name, _)| name.clone())
|
||||
.collect();
|
||||
required_mcp_servers.sort();
|
||||
let enabled_mcp_server_count = mcp_servers.values().filter(|server| server.enabled).count();
|
||||
let required_mcp_server_count = required_mcp_servers.len();
|
||||
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref());
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
|
|
@ -1829,6 +1856,12 @@ impl Session {
|
|||
codex_apps_tools_cache_key(auth),
|
||||
tool_plugin_provenance,
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"session_init.mcp_manager_init",
|
||||
otel.name = "session_init.mcp_manager_init",
|
||||
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
{
|
||||
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
|
||||
|
|
@ -1848,6 +1881,11 @@ impl Session {
|
|||
.read()
|
||||
.await
|
||||
.required_startup_failures(&required_mcp_servers)
|
||||
.instrument(info_span!(
|
||||
"session_init.required_mcp_wait",
|
||||
otel.name = "session_init.required_mcp_wait",
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
if !failures.is_empty() {
|
||||
let details = failures
|
||||
|
|
@ -4269,11 +4307,23 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
|||
}
|
||||
|
||||
fn submission_dispatch_span(sub: &Submission) -> tracing::Span {
|
||||
let op_name = sub.op.kind();
|
||||
let span_name = format!("op.dispatch.{op_name}");
|
||||
let dispatch_span = match &sub.op {
|
||||
Op::RealtimeConversationAudio(_) => {
|
||||
debug_span!("submission_dispatch", submission.id = sub.id.as_str())
|
||||
debug_span!(
|
||||
"submission_dispatch",
|
||||
otel.name = span_name.as_str(),
|
||||
submission.id = sub.id.as_str(),
|
||||
codex.op = op_name
|
||||
)
|
||||
}
|
||||
_ => info_span!("submission_dispatch", submission.id = sub.id.as_str()),
|
||||
_ => info_span!(
|
||||
"submission_dispatch",
|
||||
otel.name = span_name.as_str(),
|
||||
submission.id = sub.id.as_str(),
|
||||
codex.op = op_name
|
||||
),
|
||||
};
|
||||
if let Some(trace) = sub.trace.as_ref()
|
||||
&& !set_parent_from_w3c_trace_context(&dispatch_span, trace)
|
||||
|
|
|
|||
|
|
@ -2491,6 +2491,34 @@ fn submission_dispatch_span_uses_debug_for_realtime_audio() {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn op_kind_distinguishes_turn_ops() {
|
||||
assert_eq!(
|
||||
Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
windows_sandbox_level: None,
|
||||
model: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
}
|
||||
.kind(),
|
||||
"override_turn_context"
|
||||
);
|
||||
assert_eq!(
|
||||
Op::UserInput {
|
||||
items: vec![],
|
||||
final_output_json_schema: None,
|
||||
}
|
||||
.kind(),
|
||||
"user_input"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
|
||||
struct TraceCaptureTask {
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ use codex_protocol::protocol::SandboxPolicy;
|
|||
use thiserror::Error;
|
||||
use tokio::fs;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::bash::parse_shell_lc_plain_commands;
|
||||
use crate::bash::parse_shell_lc_single_command_prefix;
|
||||
|
|
@ -187,6 +188,7 @@ impl ExecPolicyManager {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "info", skip_all)]
|
||||
pub(crate) async fn load(config_stack: &ConfigLayerStack) -> Result<Self, ExecPolicyError> {
|
||||
let (policy, warning) = load_exec_policy_with_warning(config_stack).await?;
|
||||
if let Some(err) = warning.as_ref() {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ use schemars::JsonSchema;
|
|||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000;
|
||||
|
|
@ -40,6 +41,15 @@ pub enum WireApi {
|
|||
Responses,
|
||||
}
|
||||
|
||||
impl fmt::Display for WireApi {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let value = match self {
|
||||
Self::Responses => "responses",
|
||||
};
|
||||
f.write_str(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for WireApi {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ use codex_protocol::openai_models::ModelInfo;
|
|||
use codex_protocol::openai_models::ModelPreset;
|
||||
use codex_protocol::openai_models::ModelsResponse;
|
||||
use http::HeaderMap;
|
||||
use std::fmt;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
|
@ -26,6 +27,7 @@ use tokio::sync::TryLockError;
|
|||
use tokio::time::timeout;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::instrument;
|
||||
|
||||
const MODEL_CACHE_FILE: &str = "models_cache.json";
|
||||
const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300);
|
||||
|
|
@ -42,6 +44,22 @@ pub enum RefreshStrategy {
|
|||
OnlineIfUncached,
|
||||
}
|
||||
|
||||
impl RefreshStrategy {
|
||||
const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::Online => "online",
|
||||
Self::Offline => "offline",
|
||||
Self::OnlineIfUncached => "online_if_uncached",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for RefreshStrategy {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
/// How the manager's base catalog is sourced for the lifetime of the process.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum CatalogMode {
|
||||
|
|
@ -102,6 +120,11 @@ impl ModelsManager {
|
|||
/// List all available models, refreshing according to the specified strategy.
|
||||
///
|
||||
/// Returns model presets sorted by priority and filtered by auth mode and visibility.
|
||||
#[instrument(
|
||||
level = "info",
|
||||
skip(self),
|
||||
fields(refresh_strategy = %refresh_strategy)
|
||||
)]
|
||||
pub async fn list_models(&self, refresh_strategy: RefreshStrategy) -> Vec<ModelPreset> {
|
||||
if let Err(err) = self.refresh_available_models(refresh_strategy).await {
|
||||
error!("failed to refresh available models: {err}");
|
||||
|
|
@ -137,6 +160,14 @@ impl ModelsManager {
|
|||
///
|
||||
/// If `model` is provided, returns it directly. Otherwise selects the default based on
|
||||
/// auth mode and available models.
|
||||
#[instrument(
|
||||
level = "info",
|
||||
skip(self, model),
|
||||
fields(
|
||||
model.provided = model.is_some(),
|
||||
refresh_strategy = %refresh_strategy
|
||||
)
|
||||
)]
|
||||
pub async fn get_default_model(
|
||||
&self,
|
||||
model: &Option<String>,
|
||||
|
|
@ -160,6 +191,7 @@ impl ModelsManager {
|
|||
|
||||
// todo(aibrahim): look if we can tighten it to pub(crate)
|
||||
/// Look up model metadata, applying remote overrides and config adjustments.
|
||||
#[instrument(level = "info", skip(self, config), fields(model = model))]
|
||||
pub async fn get_model_info(&self, model: &str, config: &Config) -> ModelInfo {
|
||||
let remote_models = self.get_remote_models().await;
|
||||
Self::construct_model_info_from_candidates(model, &remote_models, config)
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ use std::path::PathBuf;
|
|||
use tokio::io::AsyncReadExt;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::error;
|
||||
use tracing::instrument;
|
||||
|
||||
pub(crate) const HIERARCHICAL_AGENTS_MESSAGE: &str =
|
||||
include_str!("../hierarchical_agents_message.md");
|
||||
|
|
@ -80,6 +81,7 @@ fn render_js_repl_instructions(config: &Config) -> Option<String> {
|
|||
|
||||
/// Combines `Config::instructions` and `AGENTS.md` (if present) into a single
|
||||
/// string of instructions.
|
||||
#[instrument(level = "info", skip_all)]
|
||||
pub(crate) async fn get_user_instructions(
|
||||
config: &Config,
|
||||
skills: Option<&[SkillMetadata]>,
|
||||
|
|
|
|||
|
|
@ -478,6 +478,47 @@ pub enum Op {
|
|||
ListModels,
|
||||
}
|
||||
|
||||
impl Op {
|
||||
pub fn kind(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Interrupt => "interrupt",
|
||||
Self::CleanBackgroundTerminals => "clean_background_terminals",
|
||||
Self::RealtimeConversationStart(_) => "realtime_conversation_start",
|
||||
Self::RealtimeConversationAudio(_) => "realtime_conversation_audio",
|
||||
Self::RealtimeConversationText(_) => "realtime_conversation_text",
|
||||
Self::RealtimeConversationClose => "realtime_conversation_close",
|
||||
Self::UserInput { .. } => "user_input",
|
||||
Self::UserTurn { .. } => "user_turn",
|
||||
Self::OverrideTurnContext { .. } => "override_turn_context",
|
||||
Self::ExecApproval { .. } => "exec_approval",
|
||||
Self::PatchApproval { .. } => "patch_approval",
|
||||
Self::ResolveElicitation { .. } => "resolve_elicitation",
|
||||
Self::UserInputAnswer { .. } => "user_input_answer",
|
||||
Self::RequestPermissionsResponse { .. } => "request_permissions_response",
|
||||
Self::DynamicToolResponse { .. } => "dynamic_tool_response",
|
||||
Self::AddToHistory { .. } => "add_to_history",
|
||||
Self::GetHistoryEntryRequest { .. } => "get_history_entry_request",
|
||||
Self::ListMcpTools => "list_mcp_tools",
|
||||
Self::RefreshMcpServers { .. } => "refresh_mcp_servers",
|
||||
Self::ReloadUserConfig => "reload_user_config",
|
||||
Self::ListCustomPrompts => "list_custom_prompts",
|
||||
Self::ListSkills { .. } => "list_skills",
|
||||
Self::ListRemoteSkills { .. } => "list_remote_skills",
|
||||
Self::DownloadRemoteSkill { .. } => "download_remote_skill",
|
||||
Self::Compact => "compact",
|
||||
Self::DropMemories => "drop_memories",
|
||||
Self::UpdateMemories => "update_memories",
|
||||
Self::SetThreadName { .. } => "set_thread_name",
|
||||
Self::Undo => "undo",
|
||||
Self::ThreadRollback { .. } => "thread_rollback",
|
||||
Self::Review { .. } => "review",
|
||||
Self::Shutdown => "shutdown",
|
||||
Self::RunUserShellCommand { .. } => "run_user_shell_command",
|
||||
Self::ListModels => "list_models",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines the conditions under which the user is consulted to approve
|
||||
/// running the command proposed by Codex.
|
||||
#[derive(
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue