feat(app-server-test-client): OTEL setup for tracing (#13493)

### Overview
This PR:
- Updates `app-server-test-client` to load OTEL settings from
`$CODEX_HOME/config.toml` and initializes its own OTEL provider.
- Add real client root spans to app-server test client traces.

This updates `codex-app-server-test-client` so its Datadog traces
reflect the full client-driven flow instead of a set of server spans
stitched together under a synthetic parent.

Before this change, the test client generated a fake `traceparent` once
and reused it for every JSON-RPC request. That kept the requests in one
trace, but there was no real client span at the top, so Datadog ended up
showing the sequence in a slightly misleading way, where all RPCs were
anchored under `initialize`.

Now the test client:
- loads OTEL settings from the normal Codex config path, including
`$CODEX_HOME/config.toml` and existing --config overrides
- initializes tracing the same way other Codex binaries do when trace
export is enabled
- creates a real client root span for each scripted command
- creates per-request client spans for JSON-RPC methods like
`initialize`, `thread/start`, and `turn/start`
- injects W3C trace context from the current client span into
request.trace instead of reusing a fabricated carrier

This gives us a cleaner trace shape in Datadog:
- one trace URL for the whole scripted flow
- a visible client root span
- proper client/server parent-child relationships for each app-server
request
This commit is contained in:
Owen Lin 2026-03-04 13:30:09 -08:00 committed by GitHub
parent 2322e49549
commit 8dfd654196
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 379 additions and 228 deletions

6
codex-rs/Cargo.lock generated
View file

@ -1490,9 +1490,15 @@ dependencies = [
"anyhow",
"clap",
"codex-app-server-protocol",
"codex-core",
"codex-otel",
"codex-protocol",
"codex-utils-cli",
"serde",
"serde_json",
"tokio",
"tracing",
"tracing-subscriber",
"tungstenite",
"url",
"uuid",

View file

@ -3914,4 +3914,4 @@
}
],
"title": "ClientRequest"
}
}

View file

@ -15391,4 +15391,4 @@
},
"title": "CodexAppServerProtocol",
"type": "object"
}
}

View file

@ -14291,4 +14291,4 @@
},
"title": "CodexAppServerProtocolV2",
"type": "object"
}
}

View file

@ -25,4 +25,4 @@
],
"title": "WindowsSandboxSetupStartParams",
"type": "object"
}
}

View file

@ -11,9 +11,15 @@ workspace = true
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
codex-app-server-protocol = { workspace = true }
codex-core = { workspace = true }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-cli = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tungstenite = { workspace = true }
url = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

View file

@ -62,10 +62,17 @@ use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::config::Config;
use codex_otel::current_span_w3c_trace_context;
use codex_otel::otel_provider::OtelProvider;
use codex_protocol::protocol::W3cTraceContext;
use codex_utils_cli::CliConfigOverrides;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
use tracing::info_span;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tungstenite::Message;
use tungstenite::WebSocket;
use tungstenite::connect;
@ -98,6 +105,10 @@ const NOTIFICATIONS_TO_OPT_OUT: &[&str] = &[
];
const APP_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
const APP_SERVER_GRACEFUL_SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(100);
const DEFAULT_ANALYTICS_ENABLED: bool = true;
const OTEL_SERVICE_NAME: &str = "codex-app-server-test-client";
const TRACE_DISABLED_MESSAGE: &str =
"Not enabled - enable tracing in $CODEX_HOME/config.toml to get a trace URL!";
/// Minimal launcher that initializes the Codex app-server and logs the handshake.
#[derive(Parser)]
@ -236,7 +247,7 @@ enum CliCommand {
},
}
pub fn run() -> Result<()> {
pub async fn run() -> Result<()> {
let Cli {
codex_bin,
url,
@ -256,7 +267,7 @@ pub fn run() -> Result<()> {
CliCommand::SendMessage { user_message } => {
ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
send_message(&endpoint, &config_overrides, user_message)
send_message(&endpoint, &config_overrides, user_message).await
}
CliCommand::SendMessageV2 {
experimental_api,
@ -270,6 +281,7 @@ pub fn run() -> Result<()> {
experimental_api,
&dynamic_tools,
)
.await
}
CliCommand::ResumeMessageV2 {
thread_id,
@ -283,28 +295,29 @@ pub fn run() -> Result<()> {
user_message,
&dynamic_tools,
)
.await
}
CliCommand::ThreadResume { thread_id } => {
ensure_dynamic_tools_unused(&dynamic_tools, "thread-resume")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
thread_resume_follow(&endpoint, &config_overrides, thread_id)
thread_resume_follow(&endpoint, &config_overrides, thread_id).await
}
CliCommand::Watch => {
ensure_dynamic_tools_unused(&dynamic_tools, "watch")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
watch(&endpoint, &config_overrides)
watch(&endpoint, &config_overrides).await
}
CliCommand::TriggerCmdApproval { user_message } => {
let endpoint = resolve_endpoint(codex_bin, url)?;
trigger_cmd_approval(&endpoint, &config_overrides, user_message, &dynamic_tools)
trigger_cmd_approval(&endpoint, &config_overrides, user_message, &dynamic_tools).await
}
CliCommand::TriggerPatchApproval { user_message } => {
let endpoint = resolve_endpoint(codex_bin, url)?;
trigger_patch_approval(&endpoint, &config_overrides, user_message, &dynamic_tools)
trigger_patch_approval(&endpoint, &config_overrides, user_message, &dynamic_tools).await
}
CliCommand::NoTriggerCmdApproval => {
let endpoint = resolve_endpoint(codex_bin, url)?;
no_trigger_cmd_approval(&endpoint, &config_overrides, &dynamic_tools)
no_trigger_cmd_approval(&endpoint, &config_overrides, &dynamic_tools).await
}
CliCommand::SendFollowUpV2 {
first_message,
@ -318,6 +331,7 @@ pub fn run() -> Result<()> {
follow_up_message,
&dynamic_tools,
)
.await
}
CliCommand::TriggerZshForkMultiCmdApproval {
user_message,
@ -333,26 +347,27 @@ pub fn run() -> Result<()> {
abort_on,
&dynamic_tools,
)
.await
}
CliCommand::TestLogin => {
ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
test_login(&endpoint, &config_overrides)
test_login(&endpoint, &config_overrides).await
}
CliCommand::GetAccountRateLimits => {
ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
get_account_rate_limits(&endpoint, &config_overrides)
get_account_rate_limits(&endpoint, &config_overrides).await
}
CliCommand::ModelList => {
ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
model_list(&endpoint, &config_overrides)
model_list(&endpoint, &config_overrides).await
}
CliCommand::ThreadList { limit } => {
ensure_dynamic_tools_unused(&dynamic_tools, "thread-list")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
thread_list(&endpoint, &config_overrides, limit)
thread_list(&endpoint, &config_overrides, limit).await
}
}
}
@ -487,7 +502,15 @@ fn shell_quote(input: &str) -> String {
format!("'{}'", input.replace('\'', "'\\''"))
}
fn send_message(
struct SendMessagePolicies<'a> {
command_name: &'static str,
experimental_api: bool,
approval_policy: Option<AskForApproval>,
sandbox_policy: Option<SandboxPolicy>,
dynamic_tools: &'a Option<Vec<DynamicToolSpec>>,
}
async fn send_message(
endpoint: &Endpoint,
config_overrides: &[String],
user_message: String,
@ -497,14 +520,18 @@ fn send_message(
endpoint,
config_overrides,
user_message,
false,
None,
None,
&dynamic_tools,
SendMessagePolicies {
command_name: "send-message",
experimental_api: false,
approval_policy: None,
sandbox_policy: None,
dynamic_tools: &dynamic_tools,
},
)
.await
}
pub fn send_message_v2(
pub async fn send_message_v2(
codex_bin: &Path,
config_overrides: &[String],
user_message: String,
@ -518,9 +545,10 @@ pub fn send_message_v2(
true,
dynamic_tools,
)
.await
}
fn send_message_v2_endpoint(
async fn send_message_v2_endpoint(
endpoint: &Endpoint,
config_overrides: &[String],
user_message: String,
@ -535,14 +563,18 @@ fn send_message_v2_endpoint(
endpoint,
config_overrides,
user_message,
experimental_api,
None,
None,
dynamic_tools,
SendMessagePolicies {
command_name: "send-message-v2",
experimental_api,
approval_policy: None,
sandbox_policy: None,
dynamic_tools,
},
)
.await
}
fn trigger_zsh_fork_multi_cmd_approval(
async fn trigger_zsh_fork_multi_cmd_approval(
endpoint: &Endpoint,
config_overrides: &[String],
user_message: Option<String>,
@ -559,89 +591,96 @@ fn trigger_zsh_fork_multi_cmd_approval(
let default_prompt = "Run this exact command using shell command execution without rewriting or splitting it: /usr/bin/true && /usr/bin/true";
let message = user_message.unwrap_or_else(|| default_prompt.to_string());
with_client(endpoint, config_overrides, |client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
with_client(
"trigger-zsh-fork-multi-cmd-approval",
endpoint,
config_overrides,
|client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
client.command_approval_behavior = match abort_on {
Some(index) => CommandApprovalBehavior::AbortOn(index),
None => CommandApprovalBehavior::AlwaysAccept,
};
client.command_approval_count = 0;
client.command_approval_item_ids.clear();
client.command_execution_statuses.clear();
client.last_turn_status = None;
client.command_approval_behavior = match abort_on {
Some(index) => CommandApprovalBehavior::AbortOn(index),
None => CommandApprovalBehavior::AlwaysAccept,
};
client.command_approval_count = 0;
client.command_approval_item_ids.clear();
client.command_execution_statuses.clear();
client.last_turn_status = None;
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: message,
text_elements: Vec::new(),
}],
..Default::default()
};
turn_params.approval_policy = Some(AskForApproval::OnRequest);
turn_params.sandbox_policy = Some(SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
network_access: false,
});
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: message,
text_elements: Vec::new(),
}],
..Default::default()
};
turn_params.approval_policy = Some(AskForApproval::OnRequest);
turn_params.sandbox_policy = Some(SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
network_access: false,
});
let turn_response = client.turn_start(turn_params)?;
println!("< turn/start response: {turn_response:?}");
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
let turn_response = client.turn_start(turn_params)?;
println!("< turn/start response: {turn_response:?}");
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
if client.command_approval_count < min_approvals {
bail!(
"expected at least {min_approvals} command approvals, got {}",
client.command_approval_count
);
}
let mut approvals_per_item = std::collections::BTreeMap::new();
for item_id in &client.command_approval_item_ids {
*approvals_per_item.entry(item_id.clone()).or_insert(0usize) += 1;
}
let max_approvals_for_one_item = approvals_per_item.values().copied().max().unwrap_or(0);
if max_approvals_for_one_item < min_approvals {
bail!(
"expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}"
);
}
let last_command_status = client.command_execution_statuses.last();
if abort_on.is_none() {
if last_command_status != Some(&CommandExecutionStatus::Completed) {
bail!("expected completed command execution, got {last_command_status:?}");
}
if client.last_turn_status != Some(TurnStatus::Completed) {
if client.command_approval_count < min_approvals {
bail!(
"expected completed turn in all-accept flow, got {:?}",
client.last_turn_status
"expected at least {min_approvals} command approvals, got {}",
client.command_approval_count
);
}
} else if last_command_status == Some(&CommandExecutionStatus::Completed) {
bail!(
"expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}"
let mut approvals_per_item = std::collections::BTreeMap::new();
for item_id in &client.command_approval_item_ids {
*approvals_per_item.entry(item_id.clone()).or_insert(0usize) += 1;
}
let max_approvals_for_one_item =
approvals_per_item.values().copied().max().unwrap_or(0);
if max_approvals_for_one_item < min_approvals {
bail!(
"expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}"
);
}
let last_command_status = client.command_execution_statuses.last();
if abort_on.is_none() {
if last_command_status != Some(&CommandExecutionStatus::Completed) {
bail!("expected completed command execution, got {last_command_status:?}");
}
if client.last_turn_status != Some(TurnStatus::Completed) {
bail!(
"expected completed turn in all-accept flow, got {:?}",
client.last_turn_status
);
}
} else if last_command_status == Some(&CommandExecutionStatus::Completed) {
bail!(
"expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}"
);
}
println!(
"[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}",
client.command_approval_count,
client.command_execution_statuses,
client.last_turn_status
);
}
println!(
"[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}",
client.command_approval_count,
client.command_execution_statuses,
client.last_turn_status
);
Ok(())
})
Ok(())
},
)
.await
}
fn resume_message_v2(
async fn resume_message_v2(
endpoint: &Endpoint,
config_overrides: &[String],
thread_id: String,
@ -650,7 +689,7 @@ fn resume_message_v2(
) -> Result<()> {
ensure_dynamic_tools_unused(dynamic_tools, "resume-message-v2")?;
with_client(endpoint, config_overrides, |client| {
with_client("resume-message-v2", endpoint, config_overrides, |client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -674,39 +713,42 @@ fn resume_message_v2(
Ok(())
})
.await
}
fn thread_resume_follow(
async fn thread_resume_follow(
endpoint: &Endpoint,
config_overrides: &[String],
thread_id: String,
) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
with_client("thread-resume", endpoint, config_overrides, |client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let resume_response = client.thread_resume(ThreadResumeParams {
thread_id,
..Default::default()
})?;
println!("< thread/resume response: {resume_response:?}");
println!("< streaming notifications until process is terminated");
let resume_response = client.thread_resume(ThreadResumeParams {
thread_id,
..Default::default()
})?;
println!("< thread/resume response: {resume_response:?}");
println!("< streaming notifications until process is terminated");
client.stream_notifications_forever()
client.stream_notifications_forever()
})
.await
}
fn watch(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
async fn watch(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
with_client("watch", endpoint, config_overrides, |client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
println!("< streaming inbound messages until process is terminated");
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
println!("< streaming inbound messages until process is terminated");
client.stream_notifications_forever()
client.stream_notifications_forever()
})
.await
}
fn trigger_cmd_approval(
async fn trigger_cmd_approval(
endpoint: &Endpoint,
config_overrides: &[String],
user_message: Option<String>,
@ -719,17 +761,21 @@ fn trigger_cmd_approval(
endpoint,
config_overrides,
message,
true,
Some(AskForApproval::OnRequest),
Some(SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
network_access: false,
}),
dynamic_tools,
SendMessagePolicies {
command_name: "trigger-cmd-approval",
experimental_api: true,
approval_policy: Some(AskForApproval::OnRequest),
sandbox_policy: Some(SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
network_access: false,
}),
dynamic_tools,
},
)
.await
}
fn trigger_patch_approval(
async fn trigger_patch_approval(
endpoint: &Endpoint,
config_overrides: &[String],
user_message: Option<String>,
@ -742,17 +788,21 @@ fn trigger_patch_approval(
endpoint,
config_overrides,
message,
true,
Some(AskForApproval::OnRequest),
Some(SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
network_access: false,
}),
dynamic_tools,
SendMessagePolicies {
command_name: "trigger-patch-approval",
experimental_api: true,
approval_policy: Some(AskForApproval::OnRequest),
sandbox_policy: Some(SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
network_access: false,
}),
dynamic_tools,
},
)
.await
}
fn no_trigger_cmd_approval(
async fn no_trigger_cmd_approval(
endpoint: &Endpoint,
config_overrides: &[String],
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
@ -762,60 +812,67 @@ fn no_trigger_cmd_approval(
endpoint,
config_overrides,
prompt.to_string(),
true,
None,
None,
dynamic_tools,
SendMessagePolicies {
command_name: "no-trigger-cmd-approval",
experimental_api: true,
approval_policy: None,
sandbox_policy: None,
dynamic_tools,
},
)
.await
}
fn send_message_v2_with_policies(
async fn send_message_v2_with_policies(
endpoint: &Endpoint,
config_overrides: &[String],
user_message: String,
experimental_api: bool,
approval_policy: Option<AskForApproval>,
sandbox_policy: Option<SandboxPolicy>,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
policies: SendMessagePolicies<'_>,
) -> Result<()> {
with_client(endpoint, config_overrides, |client| {
let initialize = client.initialize_with_experimental_api(experimental_api)?;
println!("< initialize response: {initialize:?}");
with_client(
policies.command_name,
endpoint,
config_overrides,
|client| {
let initialize = client.initialize_with_experimental_api(policies.experimental_api)?;
println!("< initialize response: {initialize:?}");
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: user_message,
// Test client sends plain text without UI element ranges.
text_elements: Vec::new(),
}],
..Default::default()
};
turn_params.approval_policy = approval_policy;
turn_params.sandbox_policy = sandbox_policy;
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: policies.dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: user_message,
// Test client sends plain text without UI element ranges.
text_elements: Vec::new(),
}],
..Default::default()
};
turn_params.approval_policy = policies.approval_policy;
turn_params.sandbox_policy = policies.sandbox_policy;
let turn_response = client.turn_start(turn_params)?;
println!("< turn/start response: {turn_response:?}");
let turn_response = client.turn_start(turn_params)?;
println!("< turn/start response: {turn_response:?}");
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
Ok(())
})
Ok(())
},
)
.await
}
fn send_follow_up_v2(
async fn send_follow_up_v2(
endpoint: &Endpoint,
config_overrides: &[String],
first_message: String,
follow_up_message: String,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
with_client(endpoint, config_overrides, |client| {
with_client("send-follow-up-v2", endpoint, config_overrides, |client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -853,10 +910,11 @@ fn send_follow_up_v2(
Ok(())
})
.await
}
fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
with_client(endpoint, config_overrides, |client| {
async fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
with_client("test-login", endpoint, config_overrides, |client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -883,22 +941,29 @@ fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
);
}
})
.await
}
fn get_account_rate_limits(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
with_client(endpoint, config_overrides, |client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
async fn get_account_rate_limits(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
with_client(
"get-account-rate-limits",
endpoint,
config_overrides,
|client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let response = client.get_account_rate_limits()?;
println!("< account/rateLimits/read response: {response:?}");
let response = client.get_account_rate_limits()?;
println!("< account/rateLimits/read response: {response:?}");
Ok(())
})
Ok(())
},
)
.await
}
fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
with_client(endpoint, config_overrides, |client| {
async fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
with_client("model-list", endpoint, config_overrides, |client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -907,10 +972,11 @@ fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
Ok(())
})
.await
}
fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) -> Result<()> {
with_client(endpoint, config_overrides, |client| {
async fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) -> Result<()> {
with_client("thread-list", endpoint, config_overrides, |client| {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -928,16 +994,28 @@ fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) ->
Ok(())
})
.await
}
fn with_client<T>(
async fn with_client<T>(
command_name: &'static str,
endpoint: &Endpoint,
config_overrides: &[String],
f: impl FnOnce(&mut CodexClient) -> Result<T>,
) -> Result<T> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let result = f(&mut client);
client.print_trace_summary();
let tracing = TestClientTracing::initialize(config_overrides).await?;
let command_span = info_span!(
"app_server_test_client.command",
otel.kind = "client",
otel.name = command_name,
app_server_test_client.command = command_name,
);
let trace_summary = command_span.in_scope(|| TraceSummary::capture(tracing.traces_enabled));
let result = command_span.in_scope(|| {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
f(&mut client)
});
print_trace_summary(&trace_summary);
result
}
@ -995,8 +1073,6 @@ struct CodexClient {
command_approval_item_ids: Vec<String>,
command_execution_statuses: Vec<CommandExecutionStatus>,
last_turn_status: Option<TurnStatus>,
trace_id: String,
trace_root_span_id: String,
}
#[derive(Debug, Clone, Copy)]
@ -1056,8 +1132,6 @@ impl CodexClient {
command_approval_item_ids: Vec::new(),
command_execution_statuses: Vec::new(),
last_turn_status: None,
trace_id: generate_trace_id(),
trace_root_span_id: generate_parent_span_id(),
})
}
@ -1079,8 +1153,6 @@ impl CodexClient {
command_approval_item_ids: Vec::new(),
command_execution_statuses: Vec::new(),
last_turn_status: None,
trace_id: generate_trace_id(),
trace_root_span_id: generate_parent_span_id(),
})
}
@ -1302,37 +1374,31 @@ impl CodexClient {
where
T: DeserializeOwned,
{
self.write_request(&request)?;
self.wait_for_response(request_id, method)
let request_span = info_span!(
"app_server_test_client.request",
otel.kind = "client",
otel.name = method,
rpc.system = "jsonrpc",
rpc.method = method,
rpc.request_id = ?request_id,
);
request_span.in_scope(|| {
self.write_request(&request)?;
self.wait_for_response(request_id, method)
})
}
fn write_request(&mut self, request: &ClientRequest) -> Result<()> {
let request = self.jsonrpc_request_with_trace(request)?;
let request_value = serde_json::to_value(request)?;
let mut request: JSONRPCRequest = serde_json::from_value(request_value)
.context("client request was not a valid JSON-RPC request")?;
request.trace = current_span_w3c_trace_context();
let request_json = serde_json::to_string(&request)?;
let request_pretty = serde_json::to_string_pretty(&request)?;
print_multiline_with_prefix("> ", &request_pretty);
self.write_payload(&request_json)
}
fn jsonrpc_request_with_trace(&self, request: &ClientRequest) -> Result<JSONRPCRequest> {
let request_value = serde_json::to_value(request)?;
let mut request: JSONRPCRequest = serde_json::from_value(request_value)
.context("client request was not a valid JSON-RPC request")?;
request.trace = Some(W3cTraceContext {
traceparent: Some(format!(
"00-{}-{}-01",
self.trace_id, self.trace_root_span_id
)),
tracestate: None,
});
Ok(request)
}
fn print_trace_summary(&self) {
println!("\n[Datadog trace]");
println!("go/trace/{}\n", self.trace_id);
}
fn wait_for_response<T>(&mut self, request_id: RequestId, method: &str) -> Result<T>
where
T: DeserializeOwned,
@ -1598,21 +1664,91 @@ impl CodexClient {
}
}
fn generate_trace_id() -> String {
Uuid::new_v4().simple().to_string()
}
fn generate_parent_span_id() -> String {
let uuid = Uuid::new_v4().simple().to_string();
uuid[..16].to_string()
}
fn print_multiline_with_prefix(prefix: &str, payload: &str) {
for line in payload.lines() {
println!("{prefix}{line}");
}
}
struct TestClientTracing {
_otel_provider: Option<OtelProvider>,
traces_enabled: bool,
}
impl TestClientTracing {
async fn initialize(config_overrides: &[String]) -> Result<Self> {
let cli_kv_overrides = CliConfigOverrides {
raw_overrides: config_overrides.to_vec(),
}
.parse_overrides()
.map_err(|e| anyhow::anyhow!("error parsing -c overrides: {e}"))?;
let config = Config::load_with_cli_overrides(cli_kv_overrides)
.await
.context("error loading config")?;
let otel_provider = codex_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
Some(OTEL_SERVICE_NAME),
DEFAULT_ANALYTICS_ENABLED,
)
.map_err(|e| anyhow::anyhow!("error loading otel config: {e}"))?;
let traces_enabled = otel_provider
.as_ref()
.and_then(|provider| provider.tracer_provider.as_ref())
.is_some();
if let Some(provider) = otel_provider.as_ref()
&& traces_enabled
{
let _ = tracing_subscriber::registry()
.with(provider.tracing_layer())
.try_init();
}
Ok(Self {
traces_enabled,
_otel_provider: otel_provider,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum TraceSummary {
Enabled { url: String },
Disabled,
}
impl TraceSummary {
fn capture(traces_enabled: bool) -> Self {
if !traces_enabled {
return Self::Disabled;
}
current_span_w3c_trace_context()
.as_ref()
.and_then(trace_url_from_context)
.map_or(Self::Disabled, |url| Self::Enabled { url })
}
}
fn trace_url_from_context(trace: &W3cTraceContext) -> Option<String> {
let traceparent = trace.traceparent.as_deref()?;
let mut parts = traceparent.split('-');
match (parts.next(), parts.next(), parts.next(), parts.next()) {
(Some(_version), Some(trace_id), Some(_span_id), Some(_trace_flags))
if trace_id.len() == 32 =>
{
Some(format!("go/trace/{trace_id}"))
}
_ => None,
}
}
fn print_trace_summary(trace_summary: &TraceSummary) {
println!("\n[Datadog trace]");
match trace_summary {
TraceSummary::Enabled { url } => println!("{url}\n"),
TraceSummary::Disabled => println!("{TRACE_DISABLED_MESSAGE}\n"),
}
}
impl Drop for CodexClient {
fn drop(&mut self) {
let ClientTransport::Stdio { child, stdin, .. } = &mut self.transport else {

View file

@ -1,5 +1,7 @@
use anyhow::Result;
use tokio::runtime::Builder;
fn main() -> Result<()> {
codex_app_server_test_client::run()
let runtime = Builder::new_current_thread().enable_all().build()?;
runtime.block_on(codex_app_server_test_client::run())
}

View file

@ -473,11 +473,12 @@ fn run_execpolicycheck(cmd: ExecPolicyCheckCommand) -> anyhow::Result<()> {
cmd.run()
}
fn run_debug_app_server_command(cmd: DebugAppServerCommand) -> anyhow::Result<()> {
async fn run_debug_app_server_command(cmd: DebugAppServerCommand) -> anyhow::Result<()> {
match cmd.subcommand {
DebugAppServerSubcommand::SendMessageV2(cmd) => {
let codex_bin = std::env::current_exe()?;
codex_app_server_test_client::send_message_v2(&codex_bin, &[], cmd.user_message, &None)
.await
}
}
}
@ -755,7 +756,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
},
Some(Subcommand::Debug(DebugCommand { subcommand })) => match subcommand {
DebugSubcommand::AppServer(cmd) => {
run_debug_app_server_command(cmd)?;
run_debug_app_server_command(cmd).await?;
}
DebugSubcommand::ClearMemories => {
run_debug_clear_memories_command(&root_config_overrides, &interactive).await?;