diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index ba3156dad..ab8743e84 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1490,9 +1490,15 @@ dependencies = [ "anyhow", "clap", "codex-app-server-protocol", + "codex-core", + "codex-otel", "codex-protocol", + "codex-utils-cli", "serde", "serde_json", + "tokio", + "tracing", + "tracing-subscriber", "tungstenite", "url", "uuid", diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 62495a79a..0e3c68de3 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3914,4 +3914,4 @@ } ], "title": "ClientRequest" -} +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index f09f2fc55..86cbda281 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -15391,4 +15391,4 @@ }, "title": "CodexAppServerProtocol", "type": "object" -} +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 7f37aaecb..342820599 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -14291,4 +14291,4 @@ }, "title": "CodexAppServerProtocolV2", "type": "object" -} +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/WindowsSandboxSetupStartParams.json b/codex-rs/app-server-protocol/schema/json/v2/WindowsSandboxSetupStartParams.json index 663d36064..00edc8b02 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/WindowsSandboxSetupStartParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/WindowsSandboxSetupStartParams.json @@ -25,4 +25,4 @@ ], "title": "WindowsSandboxSetupStartParams", "type": "object" -} +} \ No newline at end of file diff --git a/codex-rs/app-server-test-client/Cargo.toml b/codex-rs/app-server-test-client/Cargo.toml index 05c8938f0..32d2588fd 100644 --- a/codex-rs/app-server-test-client/Cargo.toml +++ b/codex-rs/app-server-test-client/Cargo.toml @@ -11,9 +11,15 @@ workspace = true anyhow = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } codex-app-server-protocol = { workspace = true } +codex-core = { workspace = true } +codex-otel = { workspace = true } codex-protocol = { workspace = true } +codex-utils-cli = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +tokio = { workspace = true, features = ["rt"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } tungstenite = { workspace = true } url = { workspace = true } uuid = { workspace = true, features = ["v4"] } diff --git a/codex-rs/app-server-test-client/src/lib.rs b/codex-rs/app-server-test-client/src/lib.rs index 06f677ceb..28579c6c8 100644 --- a/codex-rs/app-server-test-client/src/lib.rs +++ b/codex-rs/app-server-test-client/src/lib.rs @@ -62,10 +62,17 @@ use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput as V2UserInput; +use codex_core::config::Config; +use codex_otel::current_span_w3c_trace_context; +use codex_otel::otel_provider::OtelProvider; use codex_protocol::protocol::W3cTraceContext; +use codex_utils_cli::CliConfigOverrides; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; +use tracing::info_span; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; use tungstenite::Message; use tungstenite::WebSocket; use tungstenite::connect; @@ -98,6 +105,10 @@ const NOTIFICATIONS_TO_OPT_OUT: &[&str] = &[ ]; const APP_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); const APP_SERVER_GRACEFUL_SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(100); +const DEFAULT_ANALYTICS_ENABLED: bool = true; +const OTEL_SERVICE_NAME: &str = "codex-app-server-test-client"; +const TRACE_DISABLED_MESSAGE: &str = + "Not enabled - enable tracing in $CODEX_HOME/config.toml to get a trace URL!"; /// Minimal launcher that initializes the Codex app-server and logs the handshake. #[derive(Parser)] @@ -236,7 +247,7 @@ enum CliCommand { }, } -pub fn run() -> Result<()> { +pub async fn run() -> Result<()> { let Cli { codex_bin, url, @@ -256,7 +267,7 @@ pub fn run() -> Result<()> { CliCommand::SendMessage { user_message } => { ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?; let endpoint = resolve_endpoint(codex_bin, url)?; - send_message(&endpoint, &config_overrides, user_message) + send_message(&endpoint, &config_overrides, user_message).await } CliCommand::SendMessageV2 { experimental_api, @@ -270,6 +281,7 @@ pub fn run() -> Result<()> { experimental_api, &dynamic_tools, ) + .await } CliCommand::ResumeMessageV2 { thread_id, @@ -283,28 +295,29 @@ pub fn run() -> Result<()> { user_message, &dynamic_tools, ) + .await } CliCommand::ThreadResume { thread_id } => { ensure_dynamic_tools_unused(&dynamic_tools, "thread-resume")?; let endpoint = resolve_endpoint(codex_bin, url)?; - thread_resume_follow(&endpoint, &config_overrides, thread_id) + thread_resume_follow(&endpoint, &config_overrides, thread_id).await } CliCommand::Watch => { ensure_dynamic_tools_unused(&dynamic_tools, "watch")?; let endpoint = resolve_endpoint(codex_bin, url)?; - watch(&endpoint, &config_overrides) + watch(&endpoint, &config_overrides).await } CliCommand::TriggerCmdApproval { user_message } => { let endpoint = resolve_endpoint(codex_bin, url)?; - trigger_cmd_approval(&endpoint, &config_overrides, user_message, &dynamic_tools) + trigger_cmd_approval(&endpoint, &config_overrides, user_message, &dynamic_tools).await } CliCommand::TriggerPatchApproval { user_message } => { let endpoint = resolve_endpoint(codex_bin, url)?; - trigger_patch_approval(&endpoint, &config_overrides, user_message, &dynamic_tools) + trigger_patch_approval(&endpoint, &config_overrides, user_message, &dynamic_tools).await } CliCommand::NoTriggerCmdApproval => { let endpoint = resolve_endpoint(codex_bin, url)?; - no_trigger_cmd_approval(&endpoint, &config_overrides, &dynamic_tools) + no_trigger_cmd_approval(&endpoint, &config_overrides, &dynamic_tools).await } CliCommand::SendFollowUpV2 { first_message, @@ -318,6 +331,7 @@ pub fn run() -> Result<()> { follow_up_message, &dynamic_tools, ) + .await } CliCommand::TriggerZshForkMultiCmdApproval { user_message, @@ -333,26 +347,27 @@ pub fn run() -> Result<()> { abort_on, &dynamic_tools, ) + .await } CliCommand::TestLogin => { ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?; let endpoint = resolve_endpoint(codex_bin, url)?; - test_login(&endpoint, &config_overrides) + test_login(&endpoint, &config_overrides).await } CliCommand::GetAccountRateLimits => { ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?; let endpoint = resolve_endpoint(codex_bin, url)?; - get_account_rate_limits(&endpoint, &config_overrides) + get_account_rate_limits(&endpoint, &config_overrides).await } CliCommand::ModelList => { ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?; let endpoint = resolve_endpoint(codex_bin, url)?; - model_list(&endpoint, &config_overrides) + model_list(&endpoint, &config_overrides).await } CliCommand::ThreadList { limit } => { ensure_dynamic_tools_unused(&dynamic_tools, "thread-list")?; let endpoint = resolve_endpoint(codex_bin, url)?; - thread_list(&endpoint, &config_overrides, limit) + thread_list(&endpoint, &config_overrides, limit).await } } } @@ -487,7 +502,15 @@ fn shell_quote(input: &str) -> String { format!("'{}'", input.replace('\'', "'\\''")) } -fn send_message( +struct SendMessagePolicies<'a> { + command_name: &'static str, + experimental_api: bool, + approval_policy: Option, + sandbox_policy: Option, + dynamic_tools: &'a Option>, +} + +async fn send_message( endpoint: &Endpoint, config_overrides: &[String], user_message: String, @@ -497,14 +520,18 @@ fn send_message( endpoint, config_overrides, user_message, - false, - None, - None, - &dynamic_tools, + SendMessagePolicies { + command_name: "send-message", + experimental_api: false, + approval_policy: None, + sandbox_policy: None, + dynamic_tools: &dynamic_tools, + }, ) + .await } -pub fn send_message_v2( +pub async fn send_message_v2( codex_bin: &Path, config_overrides: &[String], user_message: String, @@ -518,9 +545,10 @@ pub fn send_message_v2( true, dynamic_tools, ) + .await } -fn send_message_v2_endpoint( +async fn send_message_v2_endpoint( endpoint: &Endpoint, config_overrides: &[String], user_message: String, @@ -535,14 +563,18 @@ fn send_message_v2_endpoint( endpoint, config_overrides, user_message, - experimental_api, - None, - None, - dynamic_tools, + SendMessagePolicies { + command_name: "send-message-v2", + experimental_api, + approval_policy: None, + sandbox_policy: None, + dynamic_tools, + }, ) + .await } -fn trigger_zsh_fork_multi_cmd_approval( +async fn trigger_zsh_fork_multi_cmd_approval( endpoint: &Endpoint, config_overrides: &[String], user_message: Option, @@ -559,89 +591,96 @@ fn trigger_zsh_fork_multi_cmd_approval( let default_prompt = "Run this exact command using shell command execution without rewriting or splitting it: /usr/bin/true && /usr/bin/true"; let message = user_message.unwrap_or_else(|| default_prompt.to_string()); - with_client(endpoint, config_overrides, |client| { - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); + with_client( + "trigger-zsh-fork-multi-cmd-approval", + endpoint, + config_overrides, + |client| { + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); - let thread_response = client.thread_start(ThreadStartParams { - dynamic_tools: dynamic_tools.clone(), - ..Default::default() - })?; - println!("< thread/start response: {thread_response:?}"); + let thread_response = client.thread_start(ThreadStartParams { + dynamic_tools: dynamic_tools.clone(), + ..Default::default() + })?; + println!("< thread/start response: {thread_response:?}"); - client.command_approval_behavior = match abort_on { - Some(index) => CommandApprovalBehavior::AbortOn(index), - None => CommandApprovalBehavior::AlwaysAccept, - }; - client.command_approval_count = 0; - client.command_approval_item_ids.clear(); - client.command_execution_statuses.clear(); - client.last_turn_status = None; + client.command_approval_behavior = match abort_on { + Some(index) => CommandApprovalBehavior::AbortOn(index), + None => CommandApprovalBehavior::AlwaysAccept, + }; + client.command_approval_count = 0; + client.command_approval_item_ids.clear(); + client.command_execution_statuses.clear(); + client.last_turn_status = None; - let mut turn_params = TurnStartParams { - thread_id: thread_response.thread.id.clone(), - input: vec![V2UserInput::Text { - text: message, - text_elements: Vec::new(), - }], - ..Default::default() - }; - turn_params.approval_policy = Some(AskForApproval::OnRequest); - turn_params.sandbox_policy = Some(SandboxPolicy::ReadOnly { - access: ReadOnlyAccess::FullAccess, - network_access: false, - }); + let mut turn_params = TurnStartParams { + thread_id: thread_response.thread.id.clone(), + input: vec![V2UserInput::Text { + text: message, + text_elements: Vec::new(), + }], + ..Default::default() + }; + turn_params.approval_policy = Some(AskForApproval::OnRequest); + turn_params.sandbox_policy = Some(SandboxPolicy::ReadOnly { + access: ReadOnlyAccess::FullAccess, + network_access: false, + }); - let turn_response = client.turn_start(turn_params)?; - println!("< turn/start response: {turn_response:?}"); - client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?; + let turn_response = client.turn_start(turn_params)?; + println!("< turn/start response: {turn_response:?}"); + client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?; - if client.command_approval_count < min_approvals { - bail!( - "expected at least {min_approvals} command approvals, got {}", - client.command_approval_count - ); - } - let mut approvals_per_item = std::collections::BTreeMap::new(); - for item_id in &client.command_approval_item_ids { - *approvals_per_item.entry(item_id.clone()).or_insert(0usize) += 1; - } - let max_approvals_for_one_item = approvals_per_item.values().copied().max().unwrap_or(0); - if max_approvals_for_one_item < min_approvals { - bail!( - "expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}" - ); - } - - let last_command_status = client.command_execution_statuses.last(); - if abort_on.is_none() { - if last_command_status != Some(&CommandExecutionStatus::Completed) { - bail!("expected completed command execution, got {last_command_status:?}"); - } - if client.last_turn_status != Some(TurnStatus::Completed) { + if client.command_approval_count < min_approvals { bail!( - "expected completed turn in all-accept flow, got {:?}", - client.last_turn_status + "expected at least {min_approvals} command approvals, got {}", + client.command_approval_count ); } - } else if last_command_status == Some(&CommandExecutionStatus::Completed) { - bail!( - "expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}" + let mut approvals_per_item = std::collections::BTreeMap::new(); + for item_id in &client.command_approval_item_ids { + *approvals_per_item.entry(item_id.clone()).or_insert(0usize) += 1; + } + let max_approvals_for_one_item = + approvals_per_item.values().copied().max().unwrap_or(0); + if max_approvals_for_one_item < min_approvals { + bail!( + "expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}" + ); + } + + let last_command_status = client.command_execution_statuses.last(); + if abort_on.is_none() { + if last_command_status != Some(&CommandExecutionStatus::Completed) { + bail!("expected completed command execution, got {last_command_status:?}"); + } + if client.last_turn_status != Some(TurnStatus::Completed) { + bail!( + "expected completed turn in all-accept flow, got {:?}", + client.last_turn_status + ); + } + } else if last_command_status == Some(&CommandExecutionStatus::Completed) { + bail!( + "expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}" + ); + } + + println!( + "[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}", + client.command_approval_count, + client.command_execution_statuses, + client.last_turn_status ); - } - println!( - "[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}", - client.command_approval_count, - client.command_execution_statuses, - client.last_turn_status - ); - - Ok(()) - }) + Ok(()) + }, + ) + .await } -fn resume_message_v2( +async fn resume_message_v2( endpoint: &Endpoint, config_overrides: &[String], thread_id: String, @@ -650,7 +689,7 @@ fn resume_message_v2( ) -> Result<()> { ensure_dynamic_tools_unused(dynamic_tools, "resume-message-v2")?; - with_client(endpoint, config_overrides, |client| { + with_client("resume-message-v2", endpoint, config_overrides, |client| { let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -674,39 +713,42 @@ fn resume_message_v2( Ok(()) }) + .await } -fn thread_resume_follow( +async fn thread_resume_follow( endpoint: &Endpoint, config_overrides: &[String], thread_id: String, ) -> Result<()> { - let mut client = CodexClient::connect(endpoint, config_overrides)?; + with_client("thread-resume", endpoint, config_overrides, |client| { + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); + let resume_response = client.thread_resume(ThreadResumeParams { + thread_id, + ..Default::default() + })?; + println!("< thread/resume response: {resume_response:?}"); + println!("< streaming notifications until process is terminated"); - let resume_response = client.thread_resume(ThreadResumeParams { - thread_id, - ..Default::default() - })?; - println!("< thread/resume response: {resume_response:?}"); - println!("< streaming notifications until process is terminated"); - - client.stream_notifications_forever() + client.stream_notifications_forever() + }) + .await } -fn watch(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { - let mut client = CodexClient::connect(endpoint, config_overrides)?; +async fn watch(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { + with_client("watch", endpoint, config_overrides, |client| { + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + println!("< streaming inbound messages until process is terminated"); - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); - println!("< streaming inbound messages until process is terminated"); - - client.stream_notifications_forever() + client.stream_notifications_forever() + }) + .await } -fn trigger_cmd_approval( +async fn trigger_cmd_approval( endpoint: &Endpoint, config_overrides: &[String], user_message: Option, @@ -719,17 +761,21 @@ fn trigger_cmd_approval( endpoint, config_overrides, message, - true, - Some(AskForApproval::OnRequest), - Some(SandboxPolicy::ReadOnly { - access: ReadOnlyAccess::FullAccess, - network_access: false, - }), - dynamic_tools, + SendMessagePolicies { + command_name: "trigger-cmd-approval", + experimental_api: true, + approval_policy: Some(AskForApproval::OnRequest), + sandbox_policy: Some(SandboxPolicy::ReadOnly { + access: ReadOnlyAccess::FullAccess, + network_access: false, + }), + dynamic_tools, + }, ) + .await } -fn trigger_patch_approval( +async fn trigger_patch_approval( endpoint: &Endpoint, config_overrides: &[String], user_message: Option, @@ -742,17 +788,21 @@ fn trigger_patch_approval( endpoint, config_overrides, message, - true, - Some(AskForApproval::OnRequest), - Some(SandboxPolicy::ReadOnly { - access: ReadOnlyAccess::FullAccess, - network_access: false, - }), - dynamic_tools, + SendMessagePolicies { + command_name: "trigger-patch-approval", + experimental_api: true, + approval_policy: Some(AskForApproval::OnRequest), + sandbox_policy: Some(SandboxPolicy::ReadOnly { + access: ReadOnlyAccess::FullAccess, + network_access: false, + }), + dynamic_tools, + }, ) + .await } -fn no_trigger_cmd_approval( +async fn no_trigger_cmd_approval( endpoint: &Endpoint, config_overrides: &[String], dynamic_tools: &Option>, @@ -762,60 +812,67 @@ fn no_trigger_cmd_approval( endpoint, config_overrides, prompt.to_string(), - true, - None, - None, - dynamic_tools, + SendMessagePolicies { + command_name: "no-trigger-cmd-approval", + experimental_api: true, + approval_policy: None, + sandbox_policy: None, + dynamic_tools, + }, ) + .await } -fn send_message_v2_with_policies( +async fn send_message_v2_with_policies( endpoint: &Endpoint, config_overrides: &[String], user_message: String, - experimental_api: bool, - approval_policy: Option, - sandbox_policy: Option, - dynamic_tools: &Option>, + policies: SendMessagePolicies<'_>, ) -> Result<()> { - with_client(endpoint, config_overrides, |client| { - let initialize = client.initialize_with_experimental_api(experimental_api)?; - println!("< initialize response: {initialize:?}"); + with_client( + policies.command_name, + endpoint, + config_overrides, + |client| { + let initialize = client.initialize_with_experimental_api(policies.experimental_api)?; + println!("< initialize response: {initialize:?}"); - let thread_response = client.thread_start(ThreadStartParams { - dynamic_tools: dynamic_tools.clone(), - ..Default::default() - })?; - println!("< thread/start response: {thread_response:?}"); - let mut turn_params = TurnStartParams { - thread_id: thread_response.thread.id.clone(), - input: vec![V2UserInput::Text { - text: user_message, - // Test client sends plain text without UI element ranges. - text_elements: Vec::new(), - }], - ..Default::default() - }; - turn_params.approval_policy = approval_policy; - turn_params.sandbox_policy = sandbox_policy; + let thread_response = client.thread_start(ThreadStartParams { + dynamic_tools: policies.dynamic_tools.clone(), + ..Default::default() + })?; + println!("< thread/start response: {thread_response:?}"); + let mut turn_params = TurnStartParams { + thread_id: thread_response.thread.id.clone(), + input: vec![V2UserInput::Text { + text: user_message, + // Test client sends plain text without UI element ranges. + text_elements: Vec::new(), + }], + ..Default::default() + }; + turn_params.approval_policy = policies.approval_policy; + turn_params.sandbox_policy = policies.sandbox_policy; - let turn_response = client.turn_start(turn_params)?; - println!("< turn/start response: {turn_response:?}"); + let turn_response = client.turn_start(turn_params)?; + println!("< turn/start response: {turn_response:?}"); - client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?; + client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?; - Ok(()) - }) + Ok(()) + }, + ) + .await } -fn send_follow_up_v2( +async fn send_follow_up_v2( endpoint: &Endpoint, config_overrides: &[String], first_message: String, follow_up_message: String, dynamic_tools: &Option>, ) -> Result<()> { - with_client(endpoint, config_overrides, |client| { + with_client("send-follow-up-v2", endpoint, config_overrides, |client| { let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -853,10 +910,11 @@ fn send_follow_up_v2( Ok(()) }) + .await } -fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { - with_client(endpoint, config_overrides, |client| { +async fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { + with_client("test-login", endpoint, config_overrides, |client| { let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -883,22 +941,29 @@ fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { ); } }) + .await } -fn get_account_rate_limits(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { - with_client(endpoint, config_overrides, |client| { - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); +async fn get_account_rate_limits(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { + with_client( + "get-account-rate-limits", + endpoint, + config_overrides, + |client| { + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); - let response = client.get_account_rate_limits()?; - println!("< account/rateLimits/read response: {response:?}"); + let response = client.get_account_rate_limits()?; + println!("< account/rateLimits/read response: {response:?}"); - Ok(()) - }) + Ok(()) + }, + ) + .await } -fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { - with_client(endpoint, config_overrides, |client| { +async fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { + with_client("model-list", endpoint, config_overrides, |client| { let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -907,10 +972,11 @@ fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { Ok(()) }) + .await } -fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) -> Result<()> { - with_client(endpoint, config_overrides, |client| { +async fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) -> Result<()> { + with_client("thread-list", endpoint, config_overrides, |client| { let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -928,16 +994,28 @@ fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) -> Ok(()) }) + .await } -fn with_client( +async fn with_client( + command_name: &'static str, endpoint: &Endpoint, config_overrides: &[String], f: impl FnOnce(&mut CodexClient) -> Result, ) -> Result { - let mut client = CodexClient::connect(endpoint, config_overrides)?; - let result = f(&mut client); - client.print_trace_summary(); + let tracing = TestClientTracing::initialize(config_overrides).await?; + let command_span = info_span!( + "app_server_test_client.command", + otel.kind = "client", + otel.name = command_name, + app_server_test_client.command = command_name, + ); + let trace_summary = command_span.in_scope(|| TraceSummary::capture(tracing.traces_enabled)); + let result = command_span.in_scope(|| { + let mut client = CodexClient::connect(endpoint, config_overrides)?; + f(&mut client) + }); + print_trace_summary(&trace_summary); result } @@ -995,8 +1073,6 @@ struct CodexClient { command_approval_item_ids: Vec, command_execution_statuses: Vec, last_turn_status: Option, - trace_id: String, - trace_root_span_id: String, } #[derive(Debug, Clone, Copy)] @@ -1056,8 +1132,6 @@ impl CodexClient { command_approval_item_ids: Vec::new(), command_execution_statuses: Vec::new(), last_turn_status: None, - trace_id: generate_trace_id(), - trace_root_span_id: generate_parent_span_id(), }) } @@ -1079,8 +1153,6 @@ impl CodexClient { command_approval_item_ids: Vec::new(), command_execution_statuses: Vec::new(), last_turn_status: None, - trace_id: generate_trace_id(), - trace_root_span_id: generate_parent_span_id(), }) } @@ -1302,37 +1374,31 @@ impl CodexClient { where T: DeserializeOwned, { - self.write_request(&request)?; - self.wait_for_response(request_id, method) + let request_span = info_span!( + "app_server_test_client.request", + otel.kind = "client", + otel.name = method, + rpc.system = "jsonrpc", + rpc.method = method, + rpc.request_id = ?request_id, + ); + request_span.in_scope(|| { + self.write_request(&request)?; + self.wait_for_response(request_id, method) + }) } fn write_request(&mut self, request: &ClientRequest) -> Result<()> { - let request = self.jsonrpc_request_with_trace(request)?; + let request_value = serde_json::to_value(request)?; + let mut request: JSONRPCRequest = serde_json::from_value(request_value) + .context("client request was not a valid JSON-RPC request")?; + request.trace = current_span_w3c_trace_context(); let request_json = serde_json::to_string(&request)?; let request_pretty = serde_json::to_string_pretty(&request)?; print_multiline_with_prefix("> ", &request_pretty); self.write_payload(&request_json) } - fn jsonrpc_request_with_trace(&self, request: &ClientRequest) -> Result { - let request_value = serde_json::to_value(request)?; - let mut request: JSONRPCRequest = serde_json::from_value(request_value) - .context("client request was not a valid JSON-RPC request")?; - request.trace = Some(W3cTraceContext { - traceparent: Some(format!( - "00-{}-{}-01", - self.trace_id, self.trace_root_span_id - )), - tracestate: None, - }); - Ok(request) - } - - fn print_trace_summary(&self) { - println!("\n[Datadog trace]"); - println!("go/trace/{}\n", self.trace_id); - } - fn wait_for_response(&mut self, request_id: RequestId, method: &str) -> Result where T: DeserializeOwned, @@ -1598,21 +1664,91 @@ impl CodexClient { } } -fn generate_trace_id() -> String { - Uuid::new_v4().simple().to_string() -} - -fn generate_parent_span_id() -> String { - let uuid = Uuid::new_v4().simple().to_string(); - uuid[..16].to_string() -} - fn print_multiline_with_prefix(prefix: &str, payload: &str) { for line in payload.lines() { println!("{prefix}{line}"); } } +struct TestClientTracing { + _otel_provider: Option, + traces_enabled: bool, +} + +impl TestClientTracing { + async fn initialize(config_overrides: &[String]) -> Result { + let cli_kv_overrides = CliConfigOverrides { + raw_overrides: config_overrides.to_vec(), + } + .parse_overrides() + .map_err(|e| anyhow::anyhow!("error parsing -c overrides: {e}"))?; + let config = Config::load_with_cli_overrides(cli_kv_overrides) + .await + .context("error loading config")?; + let otel_provider = codex_core::otel_init::build_provider( + &config, + env!("CARGO_PKG_VERSION"), + Some(OTEL_SERVICE_NAME), + DEFAULT_ANALYTICS_ENABLED, + ) + .map_err(|e| anyhow::anyhow!("error loading otel config: {e}"))?; + let traces_enabled = otel_provider + .as_ref() + .and_then(|provider| provider.tracer_provider.as_ref()) + .is_some(); + if let Some(provider) = otel_provider.as_ref() + && traces_enabled + { + let _ = tracing_subscriber::registry() + .with(provider.tracing_layer()) + .try_init(); + } + Ok(Self { + traces_enabled, + _otel_provider: otel_provider, + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum TraceSummary { + Enabled { url: String }, + Disabled, +} + +impl TraceSummary { + fn capture(traces_enabled: bool) -> Self { + if !traces_enabled { + return Self::Disabled; + } + current_span_w3c_trace_context() + .as_ref() + .and_then(trace_url_from_context) + .map_or(Self::Disabled, |url| Self::Enabled { url }) + } +} + +fn trace_url_from_context(trace: &W3cTraceContext) -> Option { + let traceparent = trace.traceparent.as_deref()?; + let mut parts = traceparent.split('-'); + match (parts.next(), parts.next(), parts.next(), parts.next()) { + (Some(_version), Some(trace_id), Some(_span_id), Some(_trace_flags)) + if trace_id.len() == 32 => + { + Some(format!("go/trace/{trace_id}")) + } + _ => None, + } +} + +fn print_trace_summary(trace_summary: &TraceSummary) { + println!("\n[Datadog trace]"); + match trace_summary { + TraceSummary::Enabled { url } => println!("{url}\n"), + TraceSummary::Disabled => println!("{TRACE_DISABLED_MESSAGE}\n"), + } +} + impl Drop for CodexClient { fn drop(&mut self) { let ClientTransport::Stdio { child, stdin, .. } = &mut self.transport else { diff --git a/codex-rs/app-server-test-client/src/main.rs b/codex-rs/app-server-test-client/src/main.rs index a4da2e402..794bede1c 100644 --- a/codex-rs/app-server-test-client/src/main.rs +++ b/codex-rs/app-server-test-client/src/main.rs @@ -1,5 +1,7 @@ use anyhow::Result; +use tokio::runtime::Builder; fn main() -> Result<()> { - codex_app_server_test_client::run() + let runtime = Builder::new_current_thread().enable_all().build()?; + runtime.block_on(codex_app_server_test_client::run()) } diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index f818b743e..41b836186 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -473,11 +473,12 @@ fn run_execpolicycheck(cmd: ExecPolicyCheckCommand) -> anyhow::Result<()> { cmd.run() } -fn run_debug_app_server_command(cmd: DebugAppServerCommand) -> anyhow::Result<()> { +async fn run_debug_app_server_command(cmd: DebugAppServerCommand) -> anyhow::Result<()> { match cmd.subcommand { DebugAppServerSubcommand::SendMessageV2(cmd) => { let codex_bin = std::env::current_exe()?; codex_app_server_test_client::send_message_v2(&codex_bin, &[], cmd.user_message, &None) + .await } } } @@ -755,7 +756,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { }, Some(Subcommand::Debug(DebugCommand { subcommand })) => match subcommand { DebugSubcommand::AppServer(cmd) => { - run_debug_app_server_command(cmd)?; + run_debug_app_server_command(cmd).await?; } DebugSubcommand::ClearMemories => { run_debug_clear_memories_command(&root_config_overrides, &interactive).await?;