From da3689f0ef7422c3857e1156d4b78d3482cc26d6 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Sun, 8 Mar 2026 18:43:55 -0600 Subject: [PATCH] Add in-process app server and wire up exec to use it (#14005) This is a subset of PR #13636. See that PR for a full overview of the architectural change. This PR implements the in-process app server and modifies the non-interactive "exec" entry point to use the app server. --------- Co-authored-by: Felipe Coury --- codex-rs/Cargo.lock | 21 + codex-rs/Cargo.toml | 2 + codex-rs/app-server-client/Cargo.toml | 30 + codex-rs/app-server-client/README.md | 67 + codex-rs/app-server-client/src/lib.rs | 801 ++++++++++++ .../src/protocol/common.rs | 22 + codex-rs/app-server/src/app_server_tracing.rs | 65 + codex-rs/app-server/src/in_process.rs | 884 +++++++++++++ codex-rs/app-server/src/lib.rs | 4 + codex-rs/app-server/src/message_processor.rs | 426 ++++--- codex-rs/app-server/src/outgoing_message.rs | 19 + codex-rs/exec/Cargo.toml | 3 + codex-rs/exec/src/lib.rs | 1106 +++++++++++++---- 13 files changed, 3019 insertions(+), 431 deletions(-) create mode 100644 codex-rs/app-server-client/Cargo.toml create mode 100644 codex-rs/app-server-client/README.md create mode 100644 codex-rs/app-server-client/src/lib.rs create mode 100644 codex-rs/app-server/src/in_process.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 7fc34083f..008cd5722 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1458,6 +1458,24 @@ dependencies = [ "wiremock", ] +[[package]] +name = "codex-app-server-client" +version = "0.0.0" +dependencies = [ + "codex-app-server", + "codex-app-server-protocol", + "codex-arg0", + "codex-core", + "codex-feedback", + "codex-protocol", + "pretty_assertions", + "serde", + "serde_json", + "tokio", + "toml 0.9.11+spec-1.1.0", + "tracing", +] + [[package]] name = "codex-app-server-protocol" version = "0.0.0" @@ -1899,10 +1917,13 @@ dependencies = [ "anyhow", "assert_cmd", "clap", + "codex-app-server-client", + 
"codex-app-server-protocol", "codex-apply-patch", "codex-arg0", "codex-cloud-requirements", "codex-core", + "codex-feedback", "codex-otel", "codex-protocol", "codex-utils-absolute-path", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index cbf7bb6ab..681487c09 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -4,6 +4,7 @@ members = [ "ansi-escape", "async-utils", "app-server", + "app-server-client", "app-server-protocol", "app-server-test-client", "debug-client", @@ -86,6 +87,7 @@ codex-api = { path = "codex-api" } codex-artifacts = { path = "artifacts" } codex-package-manager = { path = "package-manager" } codex-app-server = { path = "app-server" } +codex-app-server-client = { path = "app-server-client" } codex-app-server-protocol = { path = "app-server-protocol" } codex-app-server-test-client = { path = "app-server-test-client" } codex-apply-patch = { path = "apply-patch" } diff --git a/codex-rs/app-server-client/Cargo.toml b/codex-rs/app-server-client/Cargo.toml new file mode 100644 index 000000000..addde4e52 --- /dev/null +++ b/codex-rs/app-server-client/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "codex-app-server-client" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lib] +name = "codex_app_server_client" +path = "src/lib.rs" + +[lints] +workspace = true + +[dependencies] +codex-app-server = { workspace = true } +codex-app-server-protocol = { workspace = true } +codex-arg0 = { workspace = true } +codex-core = { workspace = true } +codex-feedback = { workspace = true } +codex-protocol = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["sync", "time", "rt"] } +toml = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +pretty_assertions = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git 
a/codex-rs/app-server-client/README.md b/codex-rs/app-server-client/README.md new file mode 100644 index 000000000..8944ab4ba --- /dev/null +++ b/codex-rs/app-server-client/README.md @@ -0,0 +1,67 @@ +# codex-app-server-client + +Shared in-process app-server client used by conversational CLI surfaces: + +- `codex-exec` +- `codex-tui` + +## Purpose + +This crate centralizes startup and lifecycle management for an in-process +`codex-app-server` runtime, so CLI clients do not need to duplicate: + +- app-server bootstrap and initialize handshake +- in-memory request/event transport wiring +- lifecycle orchestration around caller-provided startup identity +- graceful shutdown behavior + +## Startup identity + +Callers pass both the app-server `SessionSource` and the initialize +`client_info.name` explicitly when starting the facade. + +That keeps thread metadata (for example in `thread/list` and `thread/read`) +aligned with the originating runtime without baking TUI/exec-specific policy +into the shared client layer. + +## Transport model + +The in-process path uses typed channels: + +- client -> server: `ClientRequest` / `ClientNotification` +- server -> client: `InProcessServerEvent` + - `ServerRequest` + - `ServerNotification` + - `LegacyNotification` + +JSON serialization is still used at external transport boundaries +(stdio/websocket), but the in-process hot path is typed. + +Typed requests still receive app-server responses through the JSON-RPC +result envelope internally. That is intentional: the in-process path is +meant to preserve app-server semantics while removing the process +boundary, not to introduce a second response contract. 
+ +## Bootstrap behavior + +The client facade starts an already-initialized in-process runtime, but +thread bootstrap still follows normal app-server flow: + +- caller sends `thread/start` or `thread/resume` +- app-server returns the immediate typed response +- richer session metadata may arrive later as a `SessionConfigured` + legacy event + +Surfaces such as TUI and exec may therefore need a short bootstrap +phase where they reconcile startup response data with later events. + +## Backpressure and shutdown + +- Queues are bounded and use `DEFAULT_IN_PROCESS_CHANNEL_CAPACITY` by default. +- Full queues return explicit overload behavior instead of unbounded growth. +- `shutdown()` performs a bounded graceful shutdown and then aborts if timeout + is exceeded. + +If the client falls behind on event consumption, the worker emits +`InProcessServerEvent::Lagged` and may reject pending server requests so +approval flows do not hang indefinitely behind a saturated queue. diff --git a/codex-rs/app-server-client/src/lib.rs b/codex-rs/app-server-client/src/lib.rs new file mode 100644 index 000000000..80a328384 --- /dev/null +++ b/codex-rs/app-server-client/src/lib.rs @@ -0,0 +1,801 @@ +//! Shared in-process app-server client facade for CLI surfaces. +//! +//! This crate wraps [`codex_app_server::in_process`] behind a single async API +//! used by surfaces like TUI and exec. It centralizes: +//! +//! - Runtime startup and initialize-capabilities handshake. +//! - Typed caller-provided startup identity (`SessionSource` + client name). +//! - Typed and raw request/notification dispatch. +//! - Server request resolution and rejection. +//! - Event consumption with backpressure signaling ([`InProcessServerEvent::Lagged`]). +//! - Bounded graceful shutdown with abort fallback. +//! +//! The facade interposes a worker task between the caller and the underlying +//! [`InProcessClientHandle`](codex_app_server::in_process::InProcessClientHandle), +//! 
bridging async `mpsc` channels on both sides. Queues are bounded so overload +//! surfaces as channel-full errors rather than unbounded memory growth. + +use std::error::Error; +use std::fmt; +use std::io::Error as IoError; +use std::io::ErrorKind; +use std::io::Result as IoResult; +use std::sync::Arc; +use std::time::Duration; + +pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY; +pub use codex_app_server::in_process::InProcessServerEvent; +use codex_app_server::in_process::InProcessStartArgs; +use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::ClientNotification; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::ConfigWarningNotification; +use codex_app_server_protocol::InitializeCapabilities; +use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::JSONRPCErrorError; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::Result as JsonRpcResult; +use codex_arg0::Arg0DispatchPaths; +use codex_core::config::Config; +use codex_core::config_loader::CloudRequirementsLoader; +use codex_core::config_loader::LoaderOverrides; +use codex_feedback::CodexFeedback; +use codex_protocol::protocol::SessionSource; +use serde::de::DeserializeOwned; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::time::timeout; +use toml::Value as TomlValue; +use tracing::warn; + +const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); + +/// Raw app-server request result for typed in-process requests. +/// +/// Even on the in-process path, successful responses still travel back through +/// the same JSON-RPC result envelope used by socket/stdio transports because +/// `MessageProcessor` continues to produce that shape internally. +pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>; + +fn event_requires_delivery(event: &InProcessServerEvent) -> bool { + // These terminal events drive surface shutdown/completion state.
Dropping + // them under backpressure can leave exec/TUI waiting forever even though + // the underlying turn has already ended. + match event { + InProcessServerEvent::ServerNotification( + codex_app_server_protocol::ServerNotification::TurnCompleted(_), + ) => true, + InProcessServerEvent::LegacyNotification(notification) => matches!( + notification + .method + .strip_prefix("codex/event/") + .unwrap_or(&notification.method), + "task_complete" | "turn_aborted" | "shutdown_complete" + ), + _ => false, + } +} + +/// Layered error for [`InProcessAppServerClient::request_typed`]. +/// +/// This keeps transport failures, server-side JSON-RPC failures, and response +/// decode failures distinct so callers can decide whether to retry, surface a +/// server error, or treat the response as an internal request/response mismatch. +#[derive(Debug)] +pub enum TypedRequestError { + Transport { + method: String, + source: IoError, + }, + Server { + method: String, + source: JSONRPCErrorError, + }, + Deserialize { + method: String, + source: serde_json::Error, + }, +} + +impl fmt::Display for TypedRequestError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Transport { method, source } => { + write!(f, "{method} transport error: {source}") + } + Self::Server { method, source } => { + write!(f, "{method} failed: {}", source.message) + } + Self::Deserialize { method, source } => { + write!(f, "{method} response decode error: {source}") + } + } + } +} + +impl Error for TypedRequestError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::Transport { source, .. } => Some(source), + Self::Server { .. } => None, + Self::Deserialize { source, .. } => Some(source), + } + } +} + +#[derive(Clone)] +pub struct InProcessClientStartArgs { + /// Resolved argv0 dispatch paths used by command execution internals. + pub arg0_paths: Arg0DispatchPaths, + /// Shared config used to initialize app-server runtime.
+ pub config: Arc<Config>, + /// CLI config overrides that are already parsed into TOML values. + pub cli_overrides: Vec<(String, TomlValue)>, + /// Loader override knobs used by config API paths. + pub loader_overrides: LoaderOverrides, + /// Preloaded cloud requirements provider. + pub cloud_requirements: CloudRequirementsLoader, + /// Feedback sink used by app-server/core telemetry and logs. + pub feedback: CodexFeedback, + /// Startup warnings emitted after initialize succeeds. + pub config_warnings: Vec<ConfigWarningNotification>, + /// Session source recorded in app-server thread metadata. + pub session_source: SessionSource, + /// Whether auth loading should honor the `CODEX_API_KEY` environment variable. + pub enable_codex_api_key_env: bool, + /// Client name reported during initialize. + pub client_name: String, + /// Client version reported during initialize. + pub client_version: String, + /// Whether experimental APIs are requested at initialize time. + pub experimental_api: bool, + /// Notification methods this client opts out of receiving. + pub opt_out_notification_methods: Vec<String>, + /// Queue capacity for command/event channels (clamped to at least 1). + pub channel_capacity: usize, +} + +impl InProcessClientStartArgs { + /// Builds initialize params from caller-provided metadata.
+ pub fn initialize_params(&self) -> InitializeParams { + let capabilities = InitializeCapabilities { + experimental_api: self.experimental_api, + opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() { + None + } else { + Some(self.opt_out_notification_methods.clone()) + }, + }; + + InitializeParams { + client_info: ClientInfo { + name: self.client_name.clone(), + title: None, + version: self.client_version.clone(), + }, + capabilities: Some(capabilities), + } + } + + fn into_runtime_start_args(self) -> InProcessStartArgs { + let initialize = self.initialize_params(); + InProcessStartArgs { + arg0_paths: self.arg0_paths, + config: self.config, + cli_overrides: self.cli_overrides, + loader_overrides: self.loader_overrides, + cloud_requirements: self.cloud_requirements, + feedback: self.feedback, + config_warnings: self.config_warnings, + session_source: self.session_source, + enable_codex_api_key_env: self.enable_codex_api_key_env, + initialize, + channel_capacity: self.channel_capacity, + } + } +} + +/// Internal command sent from public facade methods to the worker task. +/// +/// Each variant carries a oneshot sender so the caller can `await` the +/// result without holding a mutable reference to the client. +enum ClientCommand { + Request { + request: Box<ClientRequest>, + response_tx: oneshot::Sender<IoResult<RequestResult>>, + }, + Notify { + notification: ClientNotification, + response_tx: oneshot::Sender<IoResult<()>>, + }, + ResolveServerRequest { + request_id: RequestId, + result: JsonRpcResult, + response_tx: oneshot::Sender<IoResult<()>>, + }, + RejectServerRequest { + request_id: RequestId, + error: JSONRPCErrorError, + response_tx: oneshot::Sender<IoResult<()>>, + }, + Shutdown { + response_tx: oneshot::Sender<IoResult<()>>, + }, +} + +/// Async facade over the in-process app-server runtime.
+/// +/// This type owns a worker task that bridges between: +/// - caller-facing async `mpsc` channels used by TUI/exec +/// - [`codex_app_server::in_process::InProcessClientHandle`], which speaks to +/// the embedded `MessageProcessor` +/// +/// The facade intentionally preserves the server's request/notification/event +/// model instead of exposing direct core runtime handles. That keeps in-process +/// callers aligned with app-server behavior while still avoiding a process +/// boundary. +pub struct InProcessAppServerClient { + command_tx: mpsc::Sender<ClientCommand>, + event_rx: mpsc::Receiver<InProcessServerEvent>, + worker_handle: tokio::task::JoinHandle<()>, +} + +impl InProcessAppServerClient { + /// Starts the in-process runtime and facade worker task. + /// + /// The returned client is ready for requests and event consumption. If the + /// internal event queue is saturated later, server requests are rejected + /// with overload error instead of being silently dropped. + pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> { + let channel_capacity = args.channel_capacity.max(1); + let mut handle = + codex_app_server::in_process::start(args.into_runtime_start_args()).await?; + let request_sender = handle.sender(); + let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity); + let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity); + + let worker_handle = tokio::spawn(async move { + let mut event_stream_enabled = true; + let mut skipped_events = 0usize; + loop { + tokio::select! { + command = command_rx.recv() => { + match command { + Some(ClientCommand::Request { request, response_tx }) => { + let request_sender = request_sender.clone(); + // Request waits happen on a detached task so + // this loop can keep draining runtime events + // while the request is blocked on client input.
+ tokio::spawn(async move { + let result = request_sender.request(*request).await; + let _ = response_tx.send(result); + }); + } + Some(ClientCommand::Notify { + notification, + response_tx, + }) => { + let result = request_sender.notify(notification); + let _ = response_tx.send(result); + } + Some(ClientCommand::ResolveServerRequest { + request_id, + result, + response_tx, + }) => { + let send_result = + request_sender.respond_to_server_request(request_id, result); + let _ = response_tx.send(send_result); + } + Some(ClientCommand::RejectServerRequest { + request_id, + error, + response_tx, + }) => { + let send_result = request_sender.fail_server_request(request_id, error); + let _ = response_tx.send(send_result); + } + Some(ClientCommand::Shutdown { response_tx }) => { + let shutdown_result = handle.shutdown().await; + let _ = response_tx.send(shutdown_result); + break; + } + None => { + let _ = handle.shutdown().await; + break; + } + } + } + event = handle.next_event(), if event_stream_enabled => { + let Some(event) = event else { + break; + }; + + if skipped_events > 0 { + if event_requires_delivery(&event) { + // Surface lag before the terminal event, but + // do not let the lag marker itself cause us to + // drop the completion/abort notification that + // the caller is blocked on. 
+ if event_tx + .send(InProcessServerEvent::Lagged { + skipped: skipped_events, + }) + .await + .is_err() + { + event_stream_enabled = false; + continue; + } + skipped_events = 0; + } else { + match event_tx.try_send(InProcessServerEvent::Lagged { + skipped: skipped_events, + }) { + Ok(()) => { + skipped_events = 0; + } + Err(mpsc::error::TrySendError::Full(_)) => { + skipped_events = skipped_events.saturating_add(1); + warn!( + "dropping in-process app-server event because consumer queue is full" + ); + if let InProcessServerEvent::ServerRequest(request) = event { + let _ = request_sender.fail_server_request( + request.id().clone(), + JSONRPCErrorError { + code: -32001, + message: "in-process app-server event queue is full".to_string(), + data: None, + }, + ); + } + continue; + } + Err(mpsc::error::TrySendError::Closed(_)) => { + event_stream_enabled = false; + continue; + } + } + } + } + + if event_requires_delivery(&event) { + // Block until the consumer catches up for + // terminal notifications; this preserves the + // completion signal even when the queue is + // otherwise saturated. + if event_tx.send(event).await.is_err() { + event_stream_enabled = false; + } + continue; + } + + match event_tx.try_send(event) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(event)) => { + skipped_events = skipped_events.saturating_add(1); + warn!("dropping in-process app-server event because consumer queue is full"); + if let InProcessServerEvent::ServerRequest(request) = event { + let _ = request_sender.fail_server_request( + request.id().clone(), + JSONRPCErrorError { + code: -32001, + message: "in-process app-server event queue is full".to_string(), + data: None, + }, + ); + } + } + Err(mpsc::error::TrySendError::Closed(_)) => { + event_stream_enabled = false; + } + } + } + } + } + }); + + Ok(Self { + command_tx, + event_rx, + worker_handle, + }) + } + + /// Sends a typed client request and returns raw JSON-RPC result. 
+ /// + /// Callers that expect a concrete response type should usually prefer + /// [`request_typed`](Self::request_typed). + pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> { + let (response_tx, response_rx) = oneshot::channel(); + self.command_tx + .send(ClientCommand::Request { + request: Box::new(request), + response_tx, + }) + .await + .map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server worker channel is closed", + ) + })?; + response_rx.await.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server request channel is closed", + ) + })? + } + + /// Sends a typed client request and decodes the successful response body. + /// + /// This still deserializes from a JSON value produced by app-server's + /// JSON-RPC result envelope. Because the caller chooses `T`, `Deserialize` + /// failures indicate an internal request/response mismatch at the call site + /// (or an in-process bug), not transport skew from an external client. + pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError> + where + T: DeserializeOwned, + { + let method = request_method_name(&request); + let response = + self.request(request) + .await + .map_err(|source| TypedRequestError::Transport { + method: method.clone(), + source, + })?; + let result = response.map_err(|source| TypedRequestError::Server { + method: method.clone(), + source, + })?; + serde_json::from_value(result) + .map_err(|source| TypedRequestError::Deserialize { method, source }) + } + + /// Sends a typed client notification.
+ pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> { + let (response_tx, response_rx) = oneshot::channel(); + self.command_tx + .send(ClientCommand::Notify { + notification, + response_tx, + }) + .await + .map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server worker channel is closed", + ) + })?; + response_rx.await.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server notify channel is closed", + ) + })? + } + + /// Resolves a pending server request. + /// + /// This should only be called with request IDs obtained from the current + /// client's event stream. + pub async fn resolve_server_request( + &self, + request_id: RequestId, + result: JsonRpcResult, + ) -> IoResult<()> { + let (response_tx, response_rx) = oneshot::channel(); + self.command_tx + .send(ClientCommand::ResolveServerRequest { + request_id, + result, + response_tx, + }) + .await + .map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server worker channel is closed", + ) + })?; + response_rx.await.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server resolve channel is closed", + ) + })? + } + + /// Rejects a pending server request with JSON-RPC error payload. + pub async fn reject_server_request( + &self, + request_id: RequestId, + error: JSONRPCErrorError, + ) -> IoResult<()> { + let (response_tx, response_rx) = oneshot::channel(); + self.command_tx + .send(ClientCommand::RejectServerRequest { + request_id, + error, + response_tx, + }) + .await + .map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server worker channel is closed", + ) + })?; + response_rx.await.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server reject channel is closed", + ) + })? + } + + /// Returns the next in-process event, or `None` when worker exits. + /// + /// Callers are expected to drain this stream promptly. 
If they fall behind, + /// the worker emits [`InProcessServerEvent::Lagged`] markers and may reject + /// pending server requests rather than letting approval flows hang. + pub async fn next_event(&mut self) -> Option<InProcessServerEvent> { + self.event_rx.recv().await + } + + /// Shuts down worker and in-process runtime with bounded wait. + /// + /// If graceful shutdown exceeds timeout, the worker task is aborted to + /// avoid leaking background tasks in embedding callers. + pub async fn shutdown(self) -> IoResult<()> { + let Self { + command_tx, + event_rx, + worker_handle, + } = self; + let mut worker_handle = worker_handle; + // Drop the caller-facing receiver before asking the worker to shut + // down. That unblocks any pending must-deliver `event_tx.send(..)` + // so the worker can reach `handle.shutdown()` instead of timing out + // and getting aborted with the runtime still attached. + drop(event_rx); + let (response_tx, response_rx) = oneshot::channel(); + if command_tx + .send(ClientCommand::Shutdown { response_tx }) + .await + .is_ok() + && let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await + { + command_result.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server shutdown channel is closed", + ) + })??; + } + + if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await { + worker_handle.abort(); + let _ = worker_handle.await; + } + Ok(()) + } +} + +/// Extracts the JSON-RPC method name for diagnostics without extending the +/// protocol crate with in-process-only helpers.
+fn request_method_name(request: &ClientRequest) -> String { + serde_json::to_value(request) + .ok() + .and_then(|value| { + value + .get("method") + .and_then(serde_json::Value::as_str) + .map(ToOwned::to_owned) + }) + .unwrap_or_else(|| "".to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + use codex_app_server_protocol::ConfigRequirementsReadResponse; + use codex_app_server_protocol::SessionSource as ApiSessionSource; + use codex_app_server_protocol::ThreadStartParams; + use codex_app_server_protocol::ThreadStartResponse; + use codex_core::config::ConfigBuilder; + use pretty_assertions::assert_eq; + use tokio::time::Duration; + use tokio::time::timeout; + + async fn build_test_config() -> Config { + match ConfigBuilder::default().build().await { + Ok(config) => config, + Err(_) => Config::load_default_with_cli_overrides(Vec::new()) + .expect("default config should load"), + } + } + + async fn start_test_client_with_capacity( + session_source: SessionSource, + channel_capacity: usize, + ) -> InProcessAppServerClient { + InProcessAppServerClient::start(InProcessClientStartArgs { + arg0_paths: Arg0DispatchPaths::default(), + config: Arc::new(build_test_config().await), + cli_overrides: Vec::new(), + loader_overrides: LoaderOverrides::default(), + cloud_requirements: CloudRequirementsLoader::default(), + feedback: CodexFeedback::new(), + config_warnings: Vec::new(), + session_source, + enable_codex_api_key_env: false, + client_name: "codex-app-server-client-test".to_string(), + client_version: "0.0.0-test".to_string(), + experimental_api: true, + opt_out_notification_methods: Vec::new(), + channel_capacity, + }) + .await + .expect("in-process app-server client should start") + } + + async fn start_test_client(session_source: SessionSource) -> InProcessAppServerClient { + start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await + } + + #[tokio::test] + async fn typed_request_roundtrip_works() { + let client = 
start_test_client(SessionSource::Exec).await; + let _response: ConfigRequirementsReadResponse = client + .request_typed(ClientRequest::ConfigRequirementsRead { + request_id: RequestId::Integer(1), + params: None, + }) + .await + .expect("typed request should succeed"); + client.shutdown().await.expect("shutdown should complete"); + } + + #[tokio::test] + async fn typed_request_reports_json_rpc_errors() { + let client = start_test_client(SessionSource::Exec).await; + let err = client + .request_typed::(ClientRequest::ThreadRead { + request_id: RequestId::Integer(99), + params: codex_app_server_protocol::ThreadReadParams { + thread_id: "missing-thread".to_string(), + include_turns: false, + }, + }) + .await + .expect_err("missing thread should return a JSON-RPC error"); + assert!( + err.to_string().starts_with("thread/read failed:"), + "expected method-qualified JSON-RPC failure message" + ); + client.shutdown().await.expect("shutdown should complete"); + } + + #[tokio::test] + async fn caller_provided_session_source_is_applied() { + for (session_source, expected_source) in [ + (SessionSource::Exec, ApiSessionSource::Exec), + (SessionSource::Cli, ApiSessionSource::Cli), + ] { + let client = start_test_client(session_source).await; + let parsed: ThreadStartResponse = client + .request_typed(ClientRequest::ThreadStart { + request_id: RequestId::Integer(2), + params: ThreadStartParams { + ephemeral: Some(true), + ..ThreadStartParams::default() + }, + }) + .await + .expect("thread/start should succeed"); + assert_eq!(parsed.thread.source, expected_source); + client.shutdown().await.expect("shutdown should complete"); + } + } + + #[tokio::test] + async fn tiny_channel_capacity_still_supports_request_roundtrip() { + let client = start_test_client_with_capacity(SessionSource::Exec, 1).await; + let _response: ConfigRequirementsReadResponse = client + .request_typed(ClientRequest::ConfigRequirementsRead { + request_id: RequestId::Integer(1), + params: None, + }) + .await + 
.expect("typed request should succeed"); + client.shutdown().await.expect("shutdown should complete"); + } + + #[test] + fn typed_request_error_exposes_sources() { + let transport = TypedRequestError::Transport { + method: "config/read".to_string(), + source: IoError::new(ErrorKind::BrokenPipe, "closed"), + }; + assert_eq!(std::error::Error::source(&transport).is_some(), true); + + let server = TypedRequestError::Server { + method: "thread/read".to_string(), + source: JSONRPCErrorError { + code: -32603, + data: None, + message: "internal".to_string(), + }, + }; + assert_eq!(std::error::Error::source(&server).is_some(), false); + + let deserialize = TypedRequestError::Deserialize { + method: "thread/start".to_string(), + source: serde_json::from_str::("\"nope\"") + .expect_err("invalid integer should return deserialize error"), + }; + assert_eq!(std::error::Error::source(&deserialize).is_some(), true); + } + + #[tokio::test] + async fn next_event_surfaces_lagged_markers() { + let (command_tx, _command_rx) = mpsc::channel(1); + let (event_tx, event_rx) = mpsc::channel(1); + let worker_handle = tokio::spawn(async {}); + event_tx + .send(InProcessServerEvent::Lagged { skipped: 3 }) + .await + .expect("lagged marker should enqueue"); + drop(event_tx); + + let mut client = InProcessAppServerClient { + command_tx, + event_rx, + worker_handle, + }; + + let event = timeout(Duration::from_secs(2), client.next_event()) + .await + .expect("lagged marker should arrive before timeout"); + assert!(matches!( + event, + Some(InProcessServerEvent::Lagged { skipped: 3 }) + )); + + client.shutdown().await.expect("shutdown should complete"); + } + + #[test] + fn event_requires_delivery_marks_terminal_events() { + assert!(event_requires_delivery( + &InProcessServerEvent::ServerNotification( + codex_app_server_protocol::ServerNotification::TurnCompleted( + codex_app_server_protocol::TurnCompletedNotification { + thread_id: "thread".to_string(), + turn: codex_app_server_protocol::Turn { + 
id: "turn".to_string(), + items: Vec::new(), + status: codex_app_server_protocol::TurnStatus::Completed, + error: None, + }, + } + ) + ) + )); + assert!(event_requires_delivery( + &InProcessServerEvent::LegacyNotification( + codex_app_server_protocol::JSONRPCNotification { + method: "codex/event/turn_aborted".to_string(), + params: None, + } + ) + )); + assert!(!event_requires_delivery(&InProcessServerEvent::Lagged { + skipped: 1 + })); + } +} diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 54c042525..8e87aa9ec 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -110,6 +110,26 @@ macro_rules! client_request_definitions { )* } + impl ClientRequest { + pub fn id(&self) -> &RequestId { + match self { + $(Self::$variant { request_id, .. } => request_id,)* + } + } + + pub fn method(&self) -> String { + serde_json::to_value(self) + .ok() + .and_then(|value| { + value + .get("method") + .and_then(serde_json::Value::as_str) + .map(str::to_owned) + }) + .unwrap_or_else(|| "".to_string()) + } + } + impl crate::experimental_api::ExperimentalApi for ClientRequest { fn experimental_reason(&self) -> Option<&'static str> { match self { @@ -1136,6 +1156,8 @@ mod tests { request_id: RequestId::Integer(1), params: None, }; + assert_eq!(request.id(), &RequestId::Integer(1)); + assert_eq!(request.method(), "account/rateLimits/read"); assert_eq!( json!({ "method": "account/rateLimits/read", diff --git a/codex-rs/app-server/src/app_server_tracing.rs b/codex-rs/app-server/src/app_server_tracing.rs index 779cef967..fd32be87c 100644 --- a/codex-rs/app-server/src/app_server_tracing.rs +++ b/codex-rs/app-server/src/app_server_tracing.rs @@ -1,6 +1,16 @@ +//! Tracing helpers shared by socket and in-process app-server entry points. +//! +//! The in-process path intentionally reuses the same span shape as JSON-RPC +//! 
transports so request telemetry stays comparable across stdio, websocket, +//! and embedded callers. [`typed_request_span`] is the in-process counterpart +//! of [`request_span`] and stamps `rpc.transport` as `"in-process"` while +//! deriving client identity from the typed [`ClientRequest`] rather than +//! from a parsed JSON envelope. + use crate::message_processor::ConnectionSessionState; use crate::outgoing_message::ConnectionId; use crate::transport::AppServerTransport; +use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCRequest; use codex_otel::set_parent_from_context; @@ -65,6 +75,51 @@ pub(crate) fn request_span( span } +/// Builds tracing span metadata for typed in-process requests. +/// +/// This mirrors `request_span` semantics while stamping transport as +/// `in-process` and deriving client info either from initialize params or +/// from existing connection session state. +pub(crate) fn typed_request_span( + request: &ClientRequest, + connection_id: ConnectionId, + session: &ConnectionSessionState, +) -> Span { + let method = request.method(); + let span = info_span!( + "app_server.request", + otel.kind = "server", + otel.name = method, + rpc.system = "jsonrpc", + rpc.method = method, + rpc.transport = "in-process", + rpc.request_id = ?request.id(), + app_server.connection_id = ?connection_id, + app_server.api_version = "v2", + app_server.client_name = field::Empty, + app_server.client_version = field::Empty, + ); + + if let Some((client_name, client_version)) = initialize_client_info_from_typed_request(request) + { + span.record("app_server.client_name", client_name); + span.record("app_server.client_version", client_version); + } else { + if let Some(client_name) = session.app_server_client_name.as_deref() { + span.record("app_server.client_name", client_name); + } + if let Some(client_version) = session.client_version.as_deref() { + 
span.record("app_server.client_version", client_version); + } + } + + if let Some(context) = traceparent_context_from_env() { + set_parent_from_context(&span, context); + } + + span +} + fn transport_name(transport: AppServerTransport) -> &'static str { match transport { AppServerTransport::Stdio => "stdio", @@ -99,3 +154,13 @@ fn initialize_client_info(request: &JSONRPCRequest) -> Option let params = request.params.clone()?; serde_json::from_value(params).ok() } + +fn initialize_client_info_from_typed_request(request: &ClientRequest) -> Option<(&str, &str)> { + match request { + ClientRequest::Initialize { params, .. } => Some(( + params.client_info.name.as_str(), + params.client_info.version.as_str(), + )), + _ => None, + } +} diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs new file mode 100644 index 000000000..4e3572ee5 --- /dev/null +++ b/codex-rs/app-server/src/in_process.rs @@ -0,0 +1,884 @@ +//! In-process app-server runtime host for local embedders. +//! +//! This module runs the existing [`MessageProcessor`] and outbound routing logic +//! on Tokio tasks, but replaces socket/stdio transports with bounded in-memory +//! channels. The intent is to preserve app-server semantics while avoiding a +//! process boundary for CLI surfaces that run in the same process. +//! +//! # Lifecycle +//! +//! 1. Construct runtime state with [`InProcessStartArgs`]. +//! 2. Call [`start`], which performs the `initialize` / `initialized` handshake +//! internally and returns a ready-to-use [`InProcessClientHandle`]. +//! 3. Send requests via [`InProcessClientHandle::request`], notifications via +//! [`InProcessClientHandle::notify`], and consume events via +//! [`InProcessClientHandle::next_event`]. +//! 4. Terminate with [`InProcessClientHandle::shutdown`]. +//! +//! # Transport model +//! +//! The runtime is transport-local but not protocol-free. Incoming requests are +//! 
typed [`ClientRequest`] values, yet responses still come back through the +//! same JSON-RPC result envelope that `MessageProcessor` uses for stdio and +//! websocket transports. This keeps in-process behavior aligned with +//! app-server rather than creating a second execution contract. +//! +//! # Backpressure +//! +//! Command submission uses `try_send` and can return `WouldBlock`, while event +//! fanout may drop notifications under saturation. Server requests are never +//! silently abandoned: if they cannot be queued they are failed back into +//! `MessageProcessor` with overload or internal errors so approval flows do +//! not hang indefinitely. +//! +//! # Relationship to `codex-app-server-client` +//! +//! This module provides the low-level runtime handle ([`InProcessClientHandle`]). +//! Higher-level callers (TUI, exec) should go through `codex-app-server-client`, +//! which wraps this module behind a worker task with async request/response +//! helpers, surface-specific startup policy, and bounded shutdown. 
+ +use std::collections::HashMap; +use std::collections::HashSet; +use std::collections::hash_map::Entry; +use std::io::Error as IoError; +use std::io::ErrorKind; +use std::io::Result as IoResult; +use std::sync::Arc; +use std::sync::RwLock; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use crate::error_code::INTERNAL_ERROR_CODE; +use crate::error_code::INVALID_REQUEST_ERROR_CODE; +use crate::error_code::OVERLOADED_ERROR_CODE; +use crate::message_processor::ConnectionSessionState; +use crate::message_processor::MessageProcessor; +use crate::message_processor::MessageProcessorArgs; +use crate::outgoing_message::ConnectionId; +use crate::outgoing_message::OutgoingEnvelope; +use crate::outgoing_message::OutgoingMessage; +use crate::outgoing_message::OutgoingMessageSender; +use crate::transport::CHANNEL_CAPACITY; +use crate::transport::OutboundConnectionState; +use crate::transport::route_outgoing_envelope; +use codex_app_server_protocol::ClientNotification; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::ConfigWarningNotification; +use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::JSONRPCErrorError; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::Result; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_arg0::Arg0DispatchPaths; +use codex_core::config::Config; +use codex_core::config_loader::CloudRequirementsLoader; +use codex_core::config_loader::LoaderOverrides; +use codex_feedback::CodexFeedback; +use codex_protocol::protocol::SessionSource; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::time::timeout; +use toml::Value as TomlValue; +use tracing::warn; + +const IN_PROCESS_CONNECTION_ID: ConnectionId = ConnectionId(0); +const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); +/// Default bounded 
channel capacity for in-process runtime queues. +pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY; + +type PendingClientRequestResponse = std::result::Result; + +fn server_notification_requires_delivery(notification: &ServerNotification) -> bool { + matches!(notification, ServerNotification::TurnCompleted(_)) +} + +fn legacy_notification_requires_delivery(notification: &JSONRPCNotification) -> bool { + matches!( + notification + .method + .strip_prefix("codex/event/") + .unwrap_or(¬ification.method), + "task_complete" | "turn_aborted" | "shutdown_complete" + ) +} + +/// Input needed to start an in-process app-server runtime. +/// +/// These fields mirror the pieces of ambient process state that stdio and +/// websocket transports normally assemble before `MessageProcessor` starts. +#[derive(Clone)] +pub struct InProcessStartArgs { + /// Resolved argv0 dispatch paths used by command execution internals. + pub arg0_paths: Arg0DispatchPaths, + /// Shared base config used to initialize core components. + pub config: Arc, + /// CLI config overrides that are already parsed into TOML values. + pub cli_overrides: Vec<(String, TomlValue)>, + /// Loader override knobs used by config API paths. + pub loader_overrides: LoaderOverrides, + /// Preloaded cloud requirements provider. + pub cloud_requirements: CloudRequirementsLoader, + /// Feedback sink used by app-server/core telemetry and logs. + pub feedback: CodexFeedback, + /// Startup warnings emitted after initialize succeeds. + pub config_warnings: Vec, + /// Session source stamped into thread/session metadata. + pub session_source: SessionSource, + /// Whether auth loading should honor the `CODEX_API_KEY` environment variable. + pub enable_codex_api_key_env: bool, + /// Initialize params used for initial handshake. + pub initialize: InitializeParams, + /// Capacity used for all runtime queues (clamped to at least 1). 
+ pub channel_capacity: usize, +} + +/// Event emitted from the app-server to the in-process client. +/// +/// The stream carries three event families because CLI surfaces are mid-migration +/// from the legacy `codex_protocol::Event` model to the typed app-server +/// notification model. Once all surfaces consume only [`ServerNotification`], +/// [`LegacyNotification`](Self::LegacyNotification) can be removed. +/// +/// [`Lagged`](Self::Lagged) is a transport health marker, not an application +/// event — it signals that the consumer fell behind and some events were dropped. +#[derive(Debug, Clone)] +pub enum InProcessServerEvent { + /// Server request that requires client response/rejection. + ServerRequest(ServerRequest), + /// App-server notification directed to the embedded client. + ServerNotification(ServerNotification), + /// Legacy JSON-RPC notification from core event bridge. + LegacyNotification(JSONRPCNotification), + /// Indicates one or more events were dropped due to backpressure. + Lagged { skipped: usize }, +} + +/// Internal message sent from [`InProcessClientHandle`] methods to the runtime task. +/// +/// Requests carry a oneshot sender for the response; notifications and server-request +/// replies are fire-and-forget from the caller's perspective (transport errors are +/// caught by `try_send` on the outer channel). 
+enum InProcessClientMessage { + Request { + request: Box, + response_tx: oneshot::Sender, + }, + Notification { + notification: ClientNotification, + }, + ServerRequestResponse { + request_id: RequestId, + result: Result, + }, + ServerRequestError { + request_id: RequestId, + error: JSONRPCErrorError, + }, + Shutdown { + done_tx: oneshot::Sender<()>, + }, +} + +enum ProcessorCommand { + Request(Box), + Notification(ClientNotification), +} + +#[derive(Clone)] +pub struct InProcessClientSender { + client_tx: mpsc::Sender, +} + +impl InProcessClientSender { + pub async fn request(&self, request: ClientRequest) -> IoResult { + let (response_tx, response_rx) = oneshot::channel(); + self.try_send_client_message(InProcessClientMessage::Request { + request: Box::new(request), + response_tx, + })?; + response_rx.await.map_err(|err| { + IoError::new( + ErrorKind::BrokenPipe, + format!("in-process request response channel closed: {err}"), + ) + }) + } + + pub fn notify(&self, notification: ClientNotification) -> IoResult<()> { + self.try_send_client_message(InProcessClientMessage::Notification { notification }) + } + + pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> { + self.try_send_client_message(InProcessClientMessage::ServerRequestResponse { + request_id, + result, + }) + } + + pub fn fail_server_request( + &self, + request_id: RequestId, + error: JSONRPCErrorError, + ) -> IoResult<()> { + self.try_send_client_message(InProcessClientMessage::ServerRequestError { + request_id, + error, + }) + } + + fn try_send_client_message(&self, message: InProcessClientMessage) -> IoResult<()> { + match self.client_tx.try_send(message) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Full(_)) => Err(IoError::new( + ErrorKind::WouldBlock, + "in-process app-server client queue is full", + )), + Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new( + ErrorKind::BrokenPipe, + "in-process app-server runtime is closed", + )), + } 
+ } +} + +/// Handle used by an in-process client to call app-server and consume events. +/// +/// This is the low-level runtime handle. Higher-level callers should usually go +/// through `codex-app-server-client`, which adds worker-task buffering, +/// request/response helpers, and surface-specific startup policy. +pub struct InProcessClientHandle { + client: InProcessClientSender, + event_rx: mpsc::Receiver, + runtime_handle: tokio::task::JoinHandle<()>, +} + +impl InProcessClientHandle { + /// Sends a typed client request into the in-process runtime. + /// + /// The returned value is a transport-level `IoResult` containing either a + /// JSON-RPC success payload or JSON-RPC error payload. Callers must keep + /// request IDs unique among concurrent requests; reusing an in-flight ID + /// produces an `INVALID_REQUEST` response and can make request routing + /// ambiguous in the caller. + pub async fn request(&self, request: ClientRequest) -> IoResult { + self.client.request(request).await + } + + /// Sends a typed client notification into the in-process runtime. + /// + /// Notifications do not have an application-level response. Transport + /// errors indicate queue saturation or closed runtime. + pub fn notify(&self, notification: ClientNotification) -> IoResult<()> { + self.client.notify(notification) + } + + /// Resolves a pending [`ServerRequest`](InProcessServerEvent::ServerRequest). + /// + /// This should be used only with request IDs received from the current + /// runtime event stream; sending arbitrary IDs has no effect on app-server + /// state and can mask a stuck approval flow in the caller. + pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> { + self.client.respond_to_server_request(request_id, result) + } + + /// Rejects a pending [`ServerRequest`](InProcessServerEvent::ServerRequest). 
+ /// + /// Use this when the embedder cannot satisfy a server request; leaving + /// requests unanswered can stall turn progress. + pub fn fail_server_request( + &self, + request_id: RequestId, + error: JSONRPCErrorError, + ) -> IoResult<()> { + self.client.fail_server_request(request_id, error) + } + + /// Receives the next server event from the in-process runtime. + /// + /// Returns `None` when the runtime task exits and no more events are + /// available. + pub async fn next_event(&mut self) -> Option { + self.event_rx.recv().await + } + + /// Requests runtime shutdown and waits for worker termination. + /// + /// Shutdown is bounded by internal timeouts and may abort background tasks + /// if graceful drain does not complete in time. + pub async fn shutdown(self) -> IoResult<()> { + let mut runtime_handle = self.runtime_handle; + let (done_tx, done_rx) = oneshot::channel(); + + if self + .client + .client_tx + .send(InProcessClientMessage::Shutdown { done_tx }) + .await + .is_ok() + { + let _ = timeout(SHUTDOWN_TIMEOUT, done_rx).await; + } + + if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut runtime_handle).await { + runtime_handle.abort(); + let _ = runtime_handle.await; + } + Ok(()) + } + + pub fn sender(&self) -> InProcessClientSender { + self.client.clone() + } +} + +/// Starts an in-process app-server runtime and performs initialize handshake. +/// +/// This function sends `initialize` followed by `initialized` before returning +/// the handle, so callers receive a ready-to-use runtime. If initialize fails, +/// the runtime is shut down and an `InvalidData` error is returned. 
+pub async fn start(args: InProcessStartArgs) -> IoResult { + let initialize = args.initialize.clone(); + let client = start_uninitialized(args); + + let initialize_response = client + .request(ClientRequest::Initialize { + request_id: RequestId::Integer(0), + params: initialize, + }) + .await?; + if let Err(error) = initialize_response { + let _ = client.shutdown().await; + return Err(IoError::new( + ErrorKind::InvalidData, + format!("in-process initialize failed: {}", error.message), + )); + } + client.notify(ClientNotification::Initialized)?; + + Ok(client) +} + +fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { + let channel_capacity = args.channel_capacity.max(1); + let (client_tx, mut client_rx) = mpsc::channel::(channel_capacity); + let (event_tx, event_rx) = mpsc::channel::(channel_capacity); + + let runtime_handle = tokio::spawn(async move { + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(channel_capacity); + let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + + let (writer_tx, mut writer_rx) = mpsc::channel::(channel_capacity); + let outbound_initialized = Arc::new(AtomicBool::new(false)); + let outbound_experimental_api_enabled = Arc::new(AtomicBool::new(false)); + let outbound_opted_out_notification_methods = Arc::new(RwLock::new(HashSet::new())); + + let mut outbound_connections = HashMap::::new(); + outbound_connections.insert( + IN_PROCESS_CONNECTION_ID, + OutboundConnectionState::new( + writer_tx, + Arc::clone(&outbound_initialized), + Arc::clone(&outbound_experimental_api_enabled), + Arc::clone(&outbound_opted_out_notification_methods), + None, + ), + ); + let mut outbound_handle = tokio::spawn(async move { + while let Some(envelope) = outgoing_rx.recv().await { + route_outgoing_envelope(&mut outbound_connections, envelope).await; + } + }); + + let processor_outgoing = Arc::clone(&outgoing_message_sender); + let (processor_tx, mut processor_rx) = mpsc::channel::(channel_capacity); + 
let mut processor_handle = tokio::spawn(async move { + let mut processor = MessageProcessor::new(MessageProcessorArgs { + outgoing: Arc::clone(&processor_outgoing), + arg0_paths: args.arg0_paths, + config: args.config, + cli_overrides: args.cli_overrides, + loader_overrides: args.loader_overrides, + cloud_requirements: args.cloud_requirements, + feedback: args.feedback, + log_db: None, + config_warnings: args.config_warnings, + session_source: args.session_source, + enable_codex_api_key_env: args.enable_codex_api_key_env, + }); + let mut thread_created_rx = processor.thread_created_receiver(); + let mut session = ConnectionSessionState::default(); + let mut listen_for_threads = true; + + loop { + tokio::select! { + command = processor_rx.recv() => { + match command { + Some(ProcessorCommand::Request(request)) => { + let was_initialized = session.initialized; + processor + .process_client_request( + IN_PROCESS_CONNECTION_ID, + *request, + &mut session, + &outbound_initialized, + ) + .await; + if let Ok(mut opted_out_notification_methods) = + outbound_opted_out_notification_methods.write() + { + *opted_out_notification_methods = + session.opted_out_notification_methods.clone(); + } else { + warn!("failed to update outbound opted-out notifications"); + } + outbound_experimental_api_enabled.store( + session.experimental_api_enabled, + Ordering::Release, + ); + if !was_initialized && session.initialized { + processor.send_initialize_notifications().await; + } + } + Some(ProcessorCommand::Notification(notification)) => { + processor.process_client_notification(notification).await; + } + None => { + break; + } + } + } + created = thread_created_rx.recv(), if listen_for_threads => { + match created { + Ok(thread_id) => { + let connection_ids = if session.initialized { + vec![IN_PROCESS_CONNECTION_ID] + } else { + Vec::::new() + }; + processor + .try_attach_thread_listener(thread_id, connection_ids) + .await; + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => 
{ + warn!("thread_created receiver lagged; skipping resync"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + listen_for_threads = false; + } + } + } + } + } + + processor.connection_closed(IN_PROCESS_CONNECTION_ID).await; + }); + let mut pending_request_responses = + HashMap::>::new(); + let mut shutdown_ack = None; + + loop { + tokio::select! { + message = client_rx.recv() => { + match message { + Some(InProcessClientMessage::Request { request, response_tx }) => { + let request = *request; + let request_id = request.id().clone(); + match pending_request_responses.entry(request_id.clone()) { + Entry::Vacant(entry) => { + entry.insert(response_tx); + } + Entry::Occupied(_) => { + let _ = response_tx.send(Err(JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("duplicate request id: {request_id:?}"), + data: None, + })); + continue; + } + } + + match processor_tx.try_send(ProcessorCommand::Request(Box::new(request))) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + if let Some(response_tx) = + pending_request_responses.remove(&request_id) + { + let _ = response_tx.send(Err(JSONRPCErrorError { + code: OVERLOADED_ERROR_CODE, + message: "in-process app-server request queue is full" + .to_string(), + data: None, + })); + } + } + Err(mpsc::error::TrySendError::Closed(_)) => { + if let Some(response_tx) = + pending_request_responses.remove(&request_id) + { + let _ = response_tx.send(Err(JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: + "in-process app-server request processor is closed" + .to_string(), + data: None, + })); + } + break; + } + } + } + Some(InProcessClientMessage::Notification { notification }) => { + match processor_tx.try_send(ProcessorCommand::Notification(notification)) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + warn!("dropping in-process client notification (queue full)"); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + break; + } + } + } + 
Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => { + outgoing_message_sender + .notify_client_response(request_id, result) + .await; + } + Some(InProcessClientMessage::ServerRequestError { request_id, error }) => { + outgoing_message_sender + .notify_client_error(request_id, error) + .await; + } + Some(InProcessClientMessage::Shutdown { done_tx }) => { + shutdown_ack = Some(done_tx); + break; + } + None => { + break; + } + } + } + outgoing_message = writer_rx.recv() => { + let Some(outgoing_message) = outgoing_message else { + break; + }; + match outgoing_message { + OutgoingMessage::Response(response) => { + if let Some(response_tx) = pending_request_responses.remove(&response.id) { + let _ = response_tx.send(Ok(response.result)); + } else { + warn!( + request_id = ?response.id, + "dropping unmatched in-process response" + ); + } + } + OutgoingMessage::Error(error) => { + if let Some(response_tx) = pending_request_responses.remove(&error.id) { + let _ = response_tx.send(Err(error.error)); + } else { + warn!( + request_id = ?error.id, + "dropping unmatched in-process error response" + ); + } + } + OutgoingMessage::Request(request) => { + // Send directly to avoid cloning; on failure the + // original value is returned inside the error. 
+ if let Err(send_error) = event_tx + .try_send(InProcessServerEvent::ServerRequest(request)) + { + let (code, message, inner) = match send_error { + mpsc::error::TrySendError::Full(inner) => ( + OVERLOADED_ERROR_CODE, + "in-process server request queue is full", + inner, + ), + mpsc::error::TrySendError::Closed(inner) => ( + INTERNAL_ERROR_CODE, + "in-process server request consumer is closed", + inner, + ), + }; + let request_id = match inner { + InProcessServerEvent::ServerRequest(req) => req.id().clone(), + _ => unreachable!("we just sent a ServerRequest variant"), + }; + outgoing_message_sender + .notify_client_error( + request_id, + JSONRPCErrorError { + code, + message: message.to_string(), + data: None, + }, + ) + .await; + } + } + OutgoingMessage::AppServerNotification(notification) => { + if server_notification_requires_delivery(¬ification) { + if event_tx + .send(InProcessServerEvent::ServerNotification(notification)) + .await + .is_err() + { + break; + } + } else if let Err(send_error) = + event_tx.try_send(InProcessServerEvent::ServerNotification(notification)) + { + match send_error { + mpsc::error::TrySendError::Full(_) => { + warn!("dropping in-process server notification (queue full)"); + } + mpsc::error::TrySendError::Closed(_) => { + break; + } + } + } + } + OutgoingMessage::Notification(notification) => { + let notification = JSONRPCNotification { + method: notification.method, + params: notification.params, + }; + if legacy_notification_requires_delivery(¬ification) { + if event_tx + .send(InProcessServerEvent::LegacyNotification(notification)) + .await + .is_err() + { + break; + } + } else if let Err(send_error) = + event_tx.try_send(InProcessServerEvent::LegacyNotification(notification)) + { + match send_error { + mpsc::error::TrySendError::Full(_) => { + warn!("dropping in-process legacy notification (queue full)"); + } + mpsc::error::TrySendError::Closed(_) => { + break; + } + } + } + } + } + } + } + } + + drop(writer_rx); + 
drop(processor_tx); + outgoing_message_sender + .cancel_all_requests(Some(JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: "in-process app-server runtime is shutting down".to_string(), + data: None, + })) + .await; + // Drop the runtime's last sender before awaiting the router task so + // `outgoing_rx.recv()` can observe channel closure and exit cleanly. + drop(outgoing_message_sender); + for (_, response_tx) in pending_request_responses { + let _ = response_tx.send(Err(JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: "in-process app-server runtime is shutting down".to_string(), + data: None, + })); + } + + if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut processor_handle).await { + processor_handle.abort(); + let _ = processor_handle.await; + } + if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut outbound_handle).await { + outbound_handle.abort(); + let _ = outbound_handle.await; + } + + if let Some(done_tx) = shutdown_ack { + let _ = done_tx.send(()); + } + }); + + InProcessClientHandle { + client: InProcessClientSender { client_tx }, + event_rx, + runtime_handle, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use codex_app_server_protocol::ClientInfo; + use codex_app_server_protocol::ConfigRequirementsReadResponse; + use codex_app_server_protocol::SessionSource as ApiSessionSource; + use codex_app_server_protocol::ThreadStartParams; + use codex_app_server_protocol::ThreadStartResponse; + use codex_app_server_protocol::Turn; + use codex_app_server_protocol::TurnCompletedNotification; + use codex_app_server_protocol::TurnStatus; + use codex_core::config::ConfigBuilder; + use pretty_assertions::assert_eq; + + async fn build_test_config() -> Config { + match ConfigBuilder::default().build().await { + Ok(config) => config, + Err(_) => Config::load_default_with_cli_overrides(Vec::new()) + .expect("default config should load"), + } + } + + async fn start_test_client_with_capacity( + session_source: SessionSource, + 
channel_capacity: usize, + ) -> InProcessClientHandle { + let args = InProcessStartArgs { + arg0_paths: Arg0DispatchPaths::default(), + config: Arc::new(build_test_config().await), + cli_overrides: Vec::new(), + loader_overrides: LoaderOverrides::default(), + cloud_requirements: CloudRequirementsLoader::default(), + feedback: CodexFeedback::new(), + config_warnings: Vec::new(), + session_source, + enable_codex_api_key_env: false, + initialize: InitializeParams { + client_info: ClientInfo { + name: "codex-in-process-test".to_string(), + title: None, + version: "0.0.0".to_string(), + }, + capabilities: None, + }, + channel_capacity, + }; + start(args).await.expect("in-process runtime should start") + } + + async fn start_test_client(session_source: SessionSource) -> InProcessClientHandle { + start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await + } + + #[tokio::test] + async fn in_process_start_initializes_and_handles_typed_v2_request() { + let client = start_test_client(SessionSource::Cli).await; + let response = client + .request(ClientRequest::ConfigRequirementsRead { + request_id: RequestId::Integer(1), + params: None, + }) + .await + .expect("request transport should work") + .expect("request should succeed"); + assert!(response.is_object()); + + let _parsed: ConfigRequirementsReadResponse = + serde_json::from_value(response).expect("response should match v2 schema"); + client + .shutdown() + .await + .expect("in-process runtime should shutdown cleanly"); + } + + #[tokio::test] + async fn in_process_start_uses_requested_session_source_for_thread_start() { + for (requested_source, expected_source) in [ + (SessionSource::Cli, ApiSessionSource::Cli), + (SessionSource::Exec, ApiSessionSource::Exec), + ] { + let client = start_test_client(requested_source).await; + let response = client + .request(ClientRequest::ThreadStart { + request_id: RequestId::Integer(2), + params: ThreadStartParams { + ephemeral: Some(true), + 
..ThreadStartParams::default() + }, + }) + .await + .expect("request transport should work") + .expect("thread/start should succeed"); + let parsed: ThreadStartResponse = + serde_json::from_value(response).expect("thread/start response should parse"); + assert_eq!(parsed.thread.source, expected_source); + client + .shutdown() + .await + .expect("in-process runtime should shutdown cleanly"); + } + } + + #[tokio::test] + async fn in_process_start_clamps_zero_channel_capacity() { + let client = start_test_client_with_capacity(SessionSource::Cli, 0).await; + let response = loop { + match client + .request(ClientRequest::ConfigRequirementsRead { + request_id: RequestId::Integer(4), + params: None, + }) + .await + { + Ok(response) => break response.expect("request should succeed"), + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + tokio::task::yield_now().await; + } + Err(err) => panic!("request transport should work: {err}"), + } + }; + let _parsed: ConfigRequirementsReadResponse = + serde_json::from_value(response).expect("response should match v2 schema"); + client + .shutdown() + .await + .expect("in-process runtime should shutdown cleanly"); + } + + #[test] + fn guaranteed_delivery_helpers_cover_terminal_notifications() { + assert!(server_notification_requires_delivery( + &ServerNotification::TurnCompleted(TurnCompletedNotification { + thread_id: "thread-1".to_string(), + turn: Turn { + id: "turn-1".to_string(), + items: Vec::new(), + status: TurnStatus::Completed, + error: None, + }, + }) + )); + + assert!(legacy_notification_requires_delivery( + &JSONRPCNotification { + method: "codex/event/task_complete".to_string(), + params: None, + } + )); + assert!(legacy_notification_requires_delivery( + &JSONRPCNotification { + method: "codex/event/turn_aborted".to_string(), + params: None, + } + )); + assert!(legacy_notification_requires_delivery( + &JSONRPCNotification { + method: "codex/event/shutdown_complete".to_string(), + params: None, + } + )); + 
assert!(!legacy_notification_requires_delivery( + &JSONRPCNotification { + method: "codex/event/item_started".to_string(), + params: None, + } + )); + } +} diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 34796c575..26f90eb61 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -39,6 +39,7 @@ use codex_core::check_execpolicy_for_warnings; use codex_core::config_loader::ConfigLoadError; use codex_core::config_loader::TextRange as CoreTextRange; use codex_feedback::CodexFeedback; +use codex_protocol::protocol::SessionSource; use codex_state::log_db; use tokio::sync::mpsc; use tokio::task::JoinHandle; @@ -65,6 +66,7 @@ mod error_code; mod external_agent_config_api; mod filters; mod fuzzy_file_search; +pub mod in_process; mod message_processor; mod models; mod outgoing_message; @@ -597,6 +599,8 @@ pub async fn run_main_with_transport( feedback: feedback.clone(), log_db, config_warnings, + session_source: SessionSource::VSCode, + enable_codex_api_key_env: false, }); let mut thread_created_rx = processor.thread_created_receiver(); let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count(); diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index d9c7d08bd..76ca255c9 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -18,6 +18,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshParams; use codex_app_server_protocol::ChatgptAuthTokensRefreshReason; use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse; use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ConfigBatchWriteParams; use codex_app_server_protocol::ConfigReadParams; @@ -157,6 +158,8 @@ pub(crate) struct MessageProcessorArgs { pub(crate) feedback: CodexFeedback, pub(crate) 
log_db: Option, pub(crate) config_warnings: Vec, + pub(crate) session_source: SessionSource, + pub(crate) enable_codex_api_key_env: bool, } impl MessageProcessor { @@ -173,10 +176,12 @@ impl MessageProcessor { feedback, log_db, config_warnings, + session_source, + enable_codex_api_key_env, } = args; let auth_manager = AuthManager::shared( config.codex_home.clone(), - false, + enable_codex_api_key_env, config.cli_auth_credentials_store_mode, ); auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone()); @@ -186,7 +191,7 @@ impl MessageProcessor { let thread_manager = Arc::new(ThreadManager::new( config.codex_home.clone(), auth_manager.clone(), - SessionSource::VSCode, + session_source, config.model_catalog.clone(), CollaborationModesConfig { default_mode_request_user_input: config @@ -275,187 +280,50 @@ impl MessageProcessor { } }; - match codex_request { - // Handle Initialize internally so CodexMessageProcessor does not have to concern - // itself with the `initialized` bool. - ClientRequest::Initialize { request_id, params } => { - let request_id = ConnectionRequestId { - connection_id, - request_id, - }; - if session.initialized { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: "Already initialized".to_string(), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } else { - // TODO(maxj): Revisit capability scoping for `experimental_api_enabled`. - // Current behavior is per-connection. Reviewer feedback notes this can - // create odd cross-client behavior (for example dynamic tool calls on a - // shared thread when another connected client did not opt into - // experimental API). Proposed direction is instance-global first-write-wins - // with initialize-time mismatch rejection. 
- let (experimental_api_enabled, opt_out_notification_methods) = - match params.capabilities { - Some(capabilities) => ( - capabilities.experimental_api, - capabilities - .opt_out_notification_methods - .unwrap_or_default(), - ), - None => (false, Vec::new()), - }; - session.experimental_api_enabled = experimental_api_enabled; - session.opted_out_notification_methods = - opt_out_notification_methods.into_iter().collect(); - let ClientInfo { - name, - title: _title, - version, - } = params.client_info; - session.app_server_client_name = Some(name.clone()); - session.client_version = Some(version.clone()); - if let Err(error) = set_default_originator(name.clone()) { - match error { - SetOriginatorError::InvalidHeaderValue => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!( - "Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value." - ), - data: None, - }; - self.outgoing.send_error(request_id.clone(), error).await; - return; - } - SetOriginatorError::AlreadyInitialized => { - // No-op. This is expected to happen if the originator is already set via env var. - // TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE, - // this will be an unexpected state and we can return a JSON-RPC error indicating - // internal server error. 
- } - } - } - set_default_client_residency_requirement(self.config.enforce_residency.value()); - let user_agent_suffix = format!("{name}; {version}"); - if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() { - *suffix = Some(user_agent_suffix); - } + self.handle_client_request( + connection_id, + request_id, + codex_request, + session, + outbound_initialized, + ) + .await; + } + .instrument(request_span) + .await; + } - let user_agent = get_codex_user_agent(); - let response = InitializeResponse { user_agent }; - self.outgoing.send_response(request_id, response).await; - - session.initialized = true; - outbound_initialized.store(true, Ordering::Release); - self.codex_message_processor - .connection_initialized(connection_id) - .await; - return; - } - } - _ => { - if !session.initialized { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: "Not initialized".to_string(), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } - } - } - if let Some(reason) = codex_request.experimental_reason() - && !session.experimental_api_enabled - { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: experimental_required_message(reason), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } - - match codex_request { - ClientRequest::ConfigRead { request_id, params } => { - self.handle_config_read( - ConnectionRequestId { - connection_id, - request_id, - }, - params, - ) - .await; - } - ClientRequest::ExternalAgentConfigDetect { request_id, params } => { - self.handle_external_agent_config_detect( - ConnectionRequestId { - connection_id, - request_id, - }, - params, - ) - .await; - } - ClientRequest::ExternalAgentConfigImport { request_id, params } => { - self.handle_external_agent_config_import( - ConnectionRequestId { - connection_id, - request_id, - }, - params, - ) - .await; - } - ClientRequest::ConfigValueWrite { request_id, params } => { - 
self.handle_config_value_write( - ConnectionRequestId { - connection_id, - request_id, - }, - params, - ) - .await; - } - ClientRequest::ConfigBatchWrite { request_id, params } => { - self.handle_config_batch_write( - ConnectionRequestId { - connection_id, - request_id, - }, - params, - ) - .await; - } - ClientRequest::ConfigRequirementsRead { - request_id, - params: _, - } => { - self.handle_config_requirements_read(ConnectionRequestId { - connection_id, - request_id, - }) - .await; - } - other => { - // Box the delegated future so this wrapper's async state machine does not - // inline the full `CodexMessageProcessor::process_request` future, which - // can otherwise push worker-thread stack usage over the edge. - self.codex_message_processor - .process_request( - connection_id, - other, - session.app_server_client_name.clone(), - ) - .boxed() - .await; - } - } + /// Handles a typed request path used by in-process embedders. + /// + /// This bypasses JSON request deserialization but keeps identical request + /// semantics by delegating to `handle_client_request`. + pub(crate) async fn process_client_request( + &mut self, + connection_id: ConnectionId, + request: ClientRequest, + session: &mut ConnectionSessionState, + outbound_initialized: &AtomicBool, + ) { + let request_span = + crate::app_server_tracing::typed_request_span(&request, connection_id, session); + async { + let request_id = ConnectionRequestId { + connection_id, + request_id: request.id().clone(), + }; + tracing::trace!( + ?connection_id, + request_id = ?request_id.request_id, + "app-server typed request" + ); + self.handle_client_request( + connection_id, + request_id, + request, + session, + outbound_initialized, + ) + .await; } .instrument(request_span) .await; @@ -467,6 +335,13 @@ impl MessageProcessor { tracing::info!("<- notification: {:?}", notification); } + /// Handles typed notifications from in-process clients. 
+ pub(crate) async fn process_client_notification(&self, notification: ClientNotification) { + // Currently, we do not expect to receive any typed notifications from + // in-process clients, so we just log them. + tracing::info!("<- typed notification: {:?}", notification); + } + pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver { self.codex_message_processor.thread_created_receiver() } @@ -513,6 +388,193 @@ impl MessageProcessor { self.outgoing.notify_client_error(err.id, err.error).await; } + async fn handle_client_request( + &mut self, + connection_id: ConnectionId, + request_id: ConnectionRequestId, + codex_request: ClientRequest, + session: &mut ConnectionSessionState, + outbound_initialized: &AtomicBool, + ) { + match codex_request { + // Handle Initialize internally so CodexMessageProcessor does not have to concern + // itself with the `initialized` bool. + ClientRequest::Initialize { request_id, params } => { + let request_id = ConnectionRequestId { + connection_id, + request_id, + }; + if session.initialized { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: "Already initialized".to_string(), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + + // TODO(maxj): Revisit capability scoping for `experimental_api_enabled`. + // Current behavior is per-connection. Reviewer feedback notes this can + // create odd cross-client behavior (for example dynamic tool calls on a + // shared thread when another connected client did not opt into + // experimental API). Proposed direction is instance-global first-write-wins + // with initialize-time mismatch rejection. 
+ let (experimental_api_enabled, opt_out_notification_methods) = + match params.capabilities { + Some(capabilities) => ( + capabilities.experimental_api, + capabilities + .opt_out_notification_methods + .unwrap_or_default(), + ), + None => (false, Vec::new()), + }; + session.experimental_api_enabled = experimental_api_enabled; + session.opted_out_notification_methods = + opt_out_notification_methods.into_iter().collect(); + let ClientInfo { + name, + title: _title, + version, + } = params.client_info; + session.app_server_client_name = Some(name.clone()); + session.client_version = Some(version.clone()); + if let Err(error) = set_default_originator(name.clone()) { + match error { + SetOriginatorError::InvalidHeaderValue => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!( + "Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value." + ), + data: None, + }; + self.outgoing.send_error(request_id.clone(), error).await; + return; + } + SetOriginatorError::AlreadyInitialized => { + // No-op. This is expected to happen if the originator is already set via env var. + // TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE, + // this will be an unexpected state and we can return a JSON-RPC error indicating + // internal server error. 
+ } + } + } + set_default_client_residency_requirement(self.config.enforce_residency.value()); + let user_agent_suffix = format!("{name}; {version}"); + if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() { + *suffix = Some(user_agent_suffix); + } + + let user_agent = get_codex_user_agent(); + let response = InitializeResponse { user_agent }; + self.outgoing.send_response(request_id, response).await; + + session.initialized = true; + outbound_initialized.store(true, Ordering::Release); + self.codex_message_processor + .connection_initialized(connection_id) + .await; + return; + } + _ => { + if !session.initialized { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: "Not initialized".to_string(), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + } + } + if let Some(reason) = codex_request.experimental_reason() + && !session.experimental_api_enabled + { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: experimental_required_message(reason), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + + match codex_request { + ClientRequest::ConfigRead { request_id, params } => { + self.handle_config_read( + ConnectionRequestId { + connection_id, + request_id, + }, + params, + ) + .await; + } + ClientRequest::ExternalAgentConfigDetect { request_id, params } => { + self.handle_external_agent_config_detect( + ConnectionRequestId { + connection_id, + request_id, + }, + params, + ) + .await; + } + ClientRequest::ExternalAgentConfigImport { request_id, params } => { + self.handle_external_agent_config_import( + ConnectionRequestId { + connection_id, + request_id, + }, + params, + ) + .await; + } + ClientRequest::ConfigValueWrite { request_id, params } => { + self.handle_config_value_write( + ConnectionRequestId { + connection_id, + request_id, + }, + params, + ) + .await; + } + ClientRequest::ConfigBatchWrite { request_id, params } => { + 
self.handle_config_batch_write( + ConnectionRequestId { + connection_id, + request_id, + }, + params, + ) + .await; + } + ClientRequest::ConfigRequirementsRead { + request_id, + params: _, + } => { + self.handle_config_requirements_read(ConnectionRequestId { + connection_id, + request_id, + }) + .await; + } + other => { + // Box the delegated future so this wrapper's async state machine does not + // inline the full `CodexMessageProcessor::process_request` future, which + // can otherwise push worker-thread stack usage over the edge. + self.codex_message_processor + .process_request(connection_id, other, session.app_server_client_name.clone()) + .boxed() + .await; + } + } + } + async fn handle_config_read(&self, request_id: ConnectionRequestId, params: ConfigReadParams) { match self.config_api.read(params).await { Ok(response) => self.outgoing.send_response(request_id, response).await, diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index bc38bd36c..d615e5151 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -273,6 +273,25 @@ impl OutgoingMessageSender { self.take_request_callback(id).await.is_some() } + pub(crate) async fn cancel_all_requests(&self, error: Option) { + let entries = { + let mut request_id_to_callback = self.request_id_to_callback.lock().await; + request_id_to_callback + .drain() + .map(|(_, entry)| entry) + .collect::>() + }; + + if let Some(error) = error { + for entry in entries { + if let Err(err) = entry.callback.send(Err(error.clone())) { + let request_id = entry.request.id(); + warn!("could not notify callback for {request_id:?} due to: {err:?}"); + } + } + } + } + async fn take_request_callback( &self, id: &RequestId, diff --git a/codex-rs/exec/Cargo.toml b/codex-rs/exec/Cargo.toml index 1eb4f7e27..378523560 100644 --- a/codex-rs/exec/Cargo.toml +++ b/codex-rs/exec/Cargo.toml @@ -19,8 +19,11 @@ workspace = true anyhow = { 
workspace = true } clap = { workspace = true, features = ["derive"] } codex-arg0 = { workspace = true } +codex-app-server-client = { workspace = true } +codex-app-server-protocol = { workspace = true } codex-cloud-requirements = { workspace = true } codex-core = { workspace = true } +codex-feedback = { workspace = true } codex-otel = { workspace = true } codex-protocol = { workspace = true } codex-utils-absolute-path = { workspace = true } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 55978021f..b4336db81 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -13,13 +13,38 @@ pub mod exec_events; pub use cli::Cli; pub use cli::Command; pub use cli::ReviewArgs; +use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY; +use codex_app_server_client::InProcessAppServerClient; +use codex_app_server_client::InProcessClientStartArgs; +use codex_app_server_client::InProcessServerEvent; +use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::ConfigWarningNotification; +use codex_app_server_protocol::JSONRPCErrorError; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::McpServerElicitationAction; +use codex_app_server_protocol::McpServerElicitationRequestResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ReviewStartParams; +use codex_app_server_protocol::ReviewStartResponse; +use codex_app_server_protocol::ReviewTarget as ApiReviewTarget; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ThreadUnsubscribeParams; +use codex_app_server_protocol::ThreadUnsubscribeResponse; +use 
codex_app_server_protocol::TurnInterruptParams; +use codex_app_server_protocol::TurnInterruptResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; use codex_arg0::Arg0DispatchPaths; use codex_cloud_requirements::cloud_requirements_loader; use codex_core::AuthManager; use codex_core::LMSTUDIO_OSS_PROVIDER_ID; -use codex_core::NewThread; use codex_core::OLLAMA_OSS_PROVIDER_ID; -use codex_core::ThreadManager; use codex_core::auth::enforce_login_restrictions; use codex_core::check_execpolicy_for_warnings; use codex_core::config::Config; @@ -29,23 +54,22 @@ use codex_core::config::find_codex_home; use codex_core::config::load_config_as_toml_with_cli_overrides; use codex_core::config::resolve_oss_provider; use codex_core::config_loader::ConfigLoadError; +use codex_core::config_loader::LoaderOverrides; use codex_core::config_loader::format_config_error_with_source; use codex_core::format_exec_policy_error_with_source; use codex_core::git_info::get_git_repo_root; -use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig; -use codex_core::models_manager::manager::RefreshStrategy; +use codex_feedback::CodexFeedback; use codex_otel::set_parent_from_context; use codex_otel::traceparent_context_from_env; -use codex_protocol::approvals::ElicitationAction; +use codex_protocol::account::PlanType as AccountPlanType; use codex_protocol::config_types::SandboxMode; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::Op; use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::ReviewTarget; +use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionSource; -use codex_protocol::protocol::SubAgentSource; use codex_protocol::user_input::UserInput; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_oss::ensure_oss_provider_ready; @@ -54,14 
+78,13 @@ use event_processor_with_human_output::EventProcessorWithHumanOutput; use event_processor_with_jsonl_output::EventProcessorWithJsonOutput; use serde_json::Value; use std::collections::HashSet; +use std::collections::VecDeque; use std::io::IsTerminal; use std::io::Read; use std::path::PathBuf; -use std::sync::Arc; use supports_color::Stream; -use tokio::sync::Mutex; +use tokio::sync::mpsc; use tracing::Instrument; -use tracing::debug; use tracing::error; use tracing::field; use tracing::info; @@ -91,15 +114,24 @@ enum InitialOperation { }, } -#[derive(Clone)] -struct ThreadEventEnvelope { - thread_id: codex_protocol::ThreadId, - thread: Arc, - event: Event, - suppress_output: bool, +struct RequestIdSequencer { + next: i64, +} + +impl RequestIdSequencer { + fn new() -> Self { + Self { next: 1 } + } + + fn next(&mut self) -> RequestId { + let id = self.next; + self.next += 1; + RequestId::Integer(id) + } } struct ExecRunArgs { + in_process_start_args: InProcessClientStartArgs, command: Option, config: Config, cursor_ansi: bool, @@ -265,6 +297,9 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result // TODO(gt): Make cloud requirements failures blocking once we can fail-closed. 
let cloud_requirements = cloud_requirements_loader(cloud_auth_manager, chatgpt_base_url, codex_home.clone()); + let run_cli_overrides = cli_kv_overrides.clone(); + let run_loader_overrides = LoaderOverrides::default(); + let run_cloud_requirements = cloud_requirements.clone(); let model_provider = if oss { let resolved = resolve_oss_provider( @@ -382,7 +417,34 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result if let Some(context) = traceparent_context_from_env() { set_parent_from_context(&exec_span, context); } + let config_warnings: Vec = config + .startup_warnings + .iter() + .map(|warning| ConfigWarningNotification { + summary: warning.clone(), + details: None, + path: None, + range: None, + }) + .collect(); + let in_process_start_args = InProcessClientStartArgs { + arg0_paths, + config: std::sync::Arc::new(config.clone()), + cli_overrides: run_cli_overrides, + loader_overrides: run_loader_overrides, + cloud_requirements: run_cloud_requirements, + feedback: CodexFeedback::new(), + config_warnings, + session_source: SessionSource::Exec, + enable_codex_api_key_env: true, + client_name: "codex-exec".to_string(), + client_version: env!("CARGO_PKG_VERSION").to_string(), + experimental_api: true, + opt_out_notification_methods: Vec::new(), + channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }; run_exec_session(ExecRunArgs { + in_process_start_args, command, config, cursor_ansi, @@ -404,6 +466,7 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { let ExecRunArgs { + in_process_start_args, command, config, cursor_ansi, @@ -469,49 +532,73 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { std::process::exit(1); } - let auth_manager = AuthManager::shared( - config.codex_home.clone(), - true, - config.cli_auth_credentials_store_mode, - ); - let thread_manager = Arc::new(ThreadManager::new( - 
config.codex_home.clone(), - auth_manager.clone(), - SessionSource::Exec, - config.model_catalog.clone(), - CollaborationModesConfig { - default_mode_request_user_input: config - .features - .enabled(codex_core::features::Feature::DefaultModeRequestUserInput), - }, - )); - let default_model = thread_manager - .get_models_manager() - .get_default_model(&config.model, RefreshStrategy::OnlineIfUncached) - .await; + let mut request_ids = RequestIdSequencer::new(); + let mut client = InProcessAppServerClient::start(in_process_start_args) + .await + .map_err(|err| { + anyhow::anyhow!("failed to initialize in-process app-server client: {err}") + })?; // Handle resume subcommand by resolving a rollout path and using explicit resume API. - let NewThread { - thread_id: primary_thread_id, - thread, - session_configured, - } = if let Some(ExecCommand::Resume(args)) = command.as_ref() { - let resume_path = resolve_resume_path(&config, args).await?; + let (primary_thread_id, fallback_session_configured) = + if let Some(ExecCommand::Resume(args)) = command.as_ref() { + let resume_path = resolve_resume_path(&config, args).await?; - if let Some(path) = resume_path { - thread_manager - .resume_thread_from_rollout(config.clone(), path, auth_manager.clone()) - .await? 
+ if let Some(path) = resume_path { + let response: ThreadResumeResponse = send_request_with_response( + &client, + ClientRequest::ThreadResume { + request_id: request_ids.next(), + params: thread_resume_params_from_config(&config, Some(path)), + }, + "thread/resume", + ) + .await + .map_err(anyhow::Error::msg)?; + let session_configured = session_configured_from_thread_resume_response(&response) + .map_err(anyhow::Error::msg)?; + (session_configured.session_id, session_configured) + } else { + let response: ThreadStartResponse = send_request_with_response( + &client, + ClientRequest::ThreadStart { + request_id: request_ids.next(), + params: thread_start_params_from_config(&config), + }, + "thread/start", + ) + .await + .map_err(anyhow::Error::msg)?; + let session_configured = session_configured_from_thread_start_response(&response) + .map_err(anyhow::Error::msg)?; + (session_configured.session_id, session_configured) + } } else { - thread_manager.start_thread(config.clone()).await? - } - } else { - thread_manager.start_thread(config.clone()).await? - }; + let response: ThreadStartResponse = send_request_with_response( + &client, + ClientRequest::ThreadStart { + request_id: request_ids.next(), + params: thread_start_params_from_config(&config), + }, + "thread/start", + ) + .await + .map_err(anyhow::Error::msg)?; + let session_configured = session_configured_from_thread_start_response(&response) + .map_err(anyhow::Error::msg)?; + (session_configured.session_id, session_configured) + }; + let primary_thread_id_for_span = primary_thread_id.to_string(); + let mut buffered_events = VecDeque::new(); + // Use the start/resume response as the authoritative bootstrap payload. + // Waiting for a later streamed `SessionConfigured` event adds up to 10s of + // avoidable startup latency on the in-process path. 
+ let session_configured = fallback_session_configured; + exec_span.record("thread.id", primary_thread_id_for_span.as_str()); - let (initial_operation, prompt_summary) = match (command, prompt, images) { + let (initial_operation, prompt_summary) = match (command.as_ref(), prompt, images) { (Some(ExecCommand::Review(review_cli)), _, _) => { let review_request = build_review_request(review_cli)?; let summary = codex_core::review_prompts::user_facing_hint(&review_request.target); @@ -532,7 +619,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { let prompt_text = resolve_prompt(prompt_arg); let mut items: Vec = imgs .into_iter() - .chain(args.images.into_iter()) + .chain(args.images.iter().cloned()) .map(|path| UserInput::LocalImage { path }) .collect(); items.push(UserInput::Text { @@ -577,84 +664,62 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { info!("Codex initialized with event: {session_configured:?}"); - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); - let attached_threads = Arc::new(Mutex::new(HashSet::from([primary_thread_id]))); - spawn_thread_listener(primary_thread_id, thread.clone(), tx.clone(), false); - - { - let thread = thread.clone(); - tokio::spawn(async move { - if tokio::signal::ctrl_c().await.is_ok() { - tracing::debug!("Keyboard interrupt"); - // Immediately notify Codex to abort any in-flight task. 
- thread.submit(Op::Interrupt).await.ok(); - } - }); - } - - { - let thread_manager = Arc::clone(&thread_manager); - let attached_threads = Arc::clone(&attached_threads); - let tx = tx.clone(); - let mut thread_created_rx = thread_manager.subscribe_thread_created(); - tokio::spawn(async move { - loop { - match thread_created_rx.recv().await { - Ok(thread_id) => { - if attached_threads.lock().await.contains(&thread_id) { - continue; - } - match thread_manager.get_thread(thread_id).await { - Ok(thread) => { - attached_threads.lock().await.insert(thread_id); - let suppress_output = - is_agent_job_subagent(&thread.config_snapshot().await); - spawn_thread_listener( - thread_id, - thread, - tx.clone(), - suppress_output, - ); - } - Err(err) => { - warn!("failed to attach listener for thread {thread_id}: {err}") - } - } - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { - warn!("thread_created receiver lagged; skipping resync"); - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - }); - } + let (interrupt_tx, mut interrupt_rx) = mpsc::unbounded_channel::<()>(); + tokio::spawn(async move { + if tokio::signal::ctrl_c().await.is_ok() { + tracing::debug!("Keyboard interrupt"); + let _ = interrupt_tx.send(()); + } + }); let task_id = match initial_operation { InitialOperation::UserTurn { items, output_schema, } => { - let task_id = thread - .submit(Op::UserTurn { - items, - cwd: default_cwd, - approval_policy: default_approval_policy, - sandbox_policy: default_sandbox_policy.clone(), - model: default_model, - effort: default_effort, - summary: None, - service_tier: None, - final_output_json_schema: output_schema, - collaboration_mode: None, - personality: None, - }) - .await?; + let response: TurnStartResponse = send_request_with_response( + &client, + ClientRequest::TurnStart { + request_id: request_ids.next(), + params: TurnStartParams { + thread_id: primary_thread_id_for_span.clone(), + input: 
items.into_iter().map(Into::into).collect(), + cwd: Some(default_cwd), + approval_policy: Some(default_approval_policy.into()), + sandbox_policy: Some(default_sandbox_policy.clone().into()), + model: None, + service_tier: None, + effort: default_effort, + summary: None, + personality: None, + output_schema, + collaboration_mode: None, + }, + }, + "turn/start", + ) + .await + .map_err(anyhow::Error::msg)?; + let task_id = response.turn.id; info!("Sent prompt with event ID: {task_id}"); task_id } InitialOperation::Review { review_request } => { - let task_id = thread.submit(Op::Review { review_request }).await?; + let response: ReviewStartResponse = send_request_with_response( + &client, + ClientRequest::ReviewStart { + request_id: request_ids.next(), + params: ReviewStartParams { + thread_id: primary_thread_id_for_span.clone(), + target: review_target_to_api(review_request.target), + delivery: None, + }, + }, + "review/start", + ) + .await + .map_err(anyhow::Error::msg)?; + let task_id = response.turn.id; info!("Sent review request with event ID: {task_id}"); task_id } @@ -665,70 +730,158 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { // Track whether a fatal error was reported by the server so we can // exit with a non-zero status for automation-friendly signaling. let mut error_seen = false; - let mut shutdown_requested = false; - while let Some(envelope) = rx.recv().await { - let ThreadEventEnvelope { - thread_id, - thread, - event, - suppress_output, - } = envelope; - if suppress_output && should_suppress_agent_job_event(&event.msg) { - continue; - } - if matches!(event.msg, EventMsg::Error(_)) { - error_seen = true; - } - if shutdown_requested - && !matches!(&event.msg, EventMsg::ShutdownComplete | EventMsg::Error(_)) - { - continue; - } - if let EventMsg::ElicitationRequest(ev) = &event.msg { - // Automatically cancel elicitation requests in exec mode. 
- thread - .submit(Op::ResolveElicitation { - server_name: ev.server_name.clone(), - request_id: ev.id.clone(), - decision: ElicitationAction::Cancel, - content: None, - meta: None, - }) - .await?; - } - if let EventMsg::McpStartupUpdate(update) = &event.msg - && required_mcp_servers.contains(&update.server) - && let codex_protocol::protocol::McpStartupStatus::Failed { error } = &update.status - { - error_seen = true; - eprintln!( - "Required MCP server '{}' failed to initialize: {error}", - update.server - ); - if !shutdown_requested { - thread.submit(Op::Shutdown).await?; - shutdown_requested = true; + let mut interrupt_channel_open = true; + let primary_thread_id_for_requests = primary_thread_id.to_string(); + loop { + let server_event = if let Some(event) = buffered_events.pop_front() { + Some(event) + } else { + tokio::select! { + maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => { + if maybe_interrupt.is_none() { + interrupt_channel_open = false; + continue; + } + if let Err(err) = send_request_with_response::( + &client, + ClientRequest::TurnInterrupt { + request_id: request_ids.next(), + params: TurnInterruptParams { + thread_id: primary_thread_id_for_requests.clone(), + turn_id: task_id.clone(), + }, + }, + "turn/interrupt", + ) + .await + { + warn!("turn/interrupt failed: {err}"); + } + continue; + } + maybe_event = client.next_event() => maybe_event, } - } - if thread_id != primary_thread_id && matches!(&event.msg, EventMsg::TurnComplete(_)) { - continue; - } - let shutdown = event_processor.process_event(event); - if thread_id != primary_thread_id && matches!(shutdown, CodexStatus::InitiateShutdown) { - continue; - } - match shutdown { - CodexStatus::Running => continue, - CodexStatus::InitiateShutdown => { - if !shutdown_requested { - thread.submit(Op::Shutdown).await?; - shutdown_requested = true; + }; + + let Some(server_event) = server_event else { + break; + }; + + match server_event { + 
InProcessServerEvent::ServerRequest(request) => { + handle_server_request( + &client, + request, + &config, + &primary_thread_id_for_requests, + &mut error_seen, + ) + .await; + } + InProcessServerEvent::ServerNotification(notification) => { + if let ServerNotification::Error(payload) = &notification + && payload.thread_id == primary_thread_id_for_requests + && payload.turn_id == task_id + && !payload.will_retry + { + error_seen = true; } } - CodexStatus::Shutdown if thread_id == primary_thread_id => break, - CodexStatus::Shutdown => continue, + InProcessServerEvent::LegacyNotification(notification) => { + let decoded = match decode_legacy_notification(notification) { + Ok(event) => event, + Err(err) => { + warn!("{err}"); + continue; + } + }; + if decoded.conversation_id.as_deref() + != Some(primary_thread_id_for_requests.as_str()) + && decoded.conversation_id.is_some() + { + continue; + } + let event = decoded.event; + if matches!(event.msg, EventMsg::SessionConfigured(_)) { + continue; + } + if matches!(event.msg, EventMsg::Error(_)) { + // The legacy bridge still carries fatal turn failures for + // exec. Preserve the non-zero exit behavior until this + // path is fully replaced by typed server notifications. 
+ error_seen = true; + } + match &event.msg { + EventMsg::TurnComplete(payload) => { + if payload.turn_id != task_id { + continue; + } + } + EventMsg::TurnAborted(payload) => { + if payload.turn_id.as_deref() != Some(task_id.as_str()) { + continue; + } + } + EventMsg::McpStartupUpdate(update) => { + if required_mcp_servers.contains(&update.server) + && let codex_protocol::protocol::McpStartupStatus::Failed { error } = + &update.status + { + error_seen = true; + eprintln!( + "Required MCP server '{}' failed to initialize: {error}", + update.server + ); + if let Err(err) = request_shutdown( + &client, + &mut request_ids, + &primary_thread_id_for_requests, + ) + .await + { + warn!("thread/unsubscribe failed during shutdown: {err}"); + } + break; + } + } + _ => {} + } + + match event_processor.process_event(event) { + CodexStatus::Running => {} + CodexStatus::InitiateShutdown => { + if let Err(err) = request_shutdown( + &client, + &mut request_ids, + &primary_thread_id_for_requests, + ) + .await + { + warn!("thread/unsubscribe failed during shutdown: {err}"); + } + break; + } + CodexStatus::Shutdown => { + // `ShutdownComplete` does not identify which attached + // thread emitted it, so subagent shutdowns must not end + // the primary exec loop early. 
+ } + } + } + InProcessServerEvent::Lagged { skipped } => { + let message = lagged_event_warning_message(skipped); + warn!("{message}"); + let _ = event_processor.process_event(Event { + id: String::new(), + msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }), + }); + } } } + + if let Err(err) = client.shutdown().await { + warn!("in-process app-server shutdown failed: {err}"); + } event_processor.print_final_output(); if error_seen { std::process::exit(1); @@ -737,68 +890,475 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { Ok(()) } -fn spawn_thread_listener( - thread_id: codex_protocol::ThreadId, - thread: Arc, - tx: tokio::sync::mpsc::UnboundedSender, - suppress_output: bool, -) { - tokio::spawn(async move { - loop { - match thread.next_event().await { - Ok(event) => { - debug!("Received event: {event:?}"); - - let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete); - if let Err(err) = tx.send(ThreadEventEnvelope { - thread_id, - thread: Arc::clone(&thread), - event, - suppress_output, - }) { - error!("Error sending event: {err:?}"); - break; - } - if is_shutdown_complete { - info!( - "Received shutdown event for thread {thread_id}, exiting event loop." - ); - break; - } - } - Err(err) => { - error!("Error receiving event: {err:?}"); - break; - } - } +fn sandbox_mode_from_policy( + sandbox_policy: &codex_protocol::protocol::SandboxPolicy, +) -> Option { + match sandbox_policy { + codex_protocol::protocol::SandboxPolicy::DangerFullAccess => { + Some(codex_app_server_protocol::SandboxMode::DangerFullAccess) } - }); -} - -fn is_agent_job_subagent(config: &codex_core::ThreadConfigSnapshot) -> bool { - match &config.session_source { - SessionSource::SubAgent(SubAgentSource::Other(source)) => source.starts_with("agent_job:"), - _ => false, + codex_protocol::protocol::SandboxPolicy::ReadOnly { .. 
} => { + Some(codex_app_server_protocol::SandboxMode::ReadOnly) + } + codex_protocol::protocol::SandboxPolicy::WorkspaceWrite { .. } => { + Some(codex_app_server_protocol::SandboxMode::WorkspaceWrite) + } + codex_protocol::protocol::SandboxPolicy::ExternalSandbox { .. } => None, } } -fn should_suppress_agent_job_event(msg: &EventMsg) -> bool { - !matches!( - msg, - EventMsg::ExecApprovalRequest(_) - | EventMsg::ApplyPatchApprovalRequest(_) - | EventMsg::RequestUserInput(_) - | EventMsg::DynamicToolCallRequest(_) - | EventMsg::DynamicToolCallResponse(_) - | EventMsg::ElicitationRequest(_) - | EventMsg::Error(_) - | EventMsg::Warning(_) - | EventMsg::DeprecationNotice(_) - | EventMsg::StreamError(_) - | EventMsg::ShutdownComplete +fn thread_start_params_from_config(config: &Config) -> ThreadStartParams { + ThreadStartParams { + model: config.model.clone(), + model_provider: Some(config.model_provider_id.clone()), + cwd: Some(config.cwd.to_string_lossy().to_string()), + approval_policy: Some(config.permissions.approval_policy.value().into()), + sandbox: sandbox_mode_from_policy(config.permissions.sandbox_policy.get()), + ephemeral: Some(config.ephemeral), + ..ThreadStartParams::default() + } +} + +fn thread_resume_params_from_config(config: &Config, path: Option) -> ThreadResumeParams { + ThreadResumeParams { + thread_id: "resume".to_string(), + path, + model: config.model.clone(), + model_provider: Some(config.model_provider_id.clone()), + cwd: Some(config.cwd.to_string_lossy().to_string()), + approval_policy: Some(config.permissions.approval_policy.value().into()), + sandbox: sandbox_mode_from_policy(config.permissions.sandbox_policy.get()), + ..ThreadResumeParams::default() + } +} + +async fn send_request_with_response( + client: &InProcessAppServerClient, + request: ClientRequest, + method: &str, +) -> Result +where + T: serde::de::DeserializeOwned, +{ + client.request_typed(request).await.map_err(|err| { + if method.is_empty() { + err.to_string() + } else { + 
format!("{method}: {err}") + } + }) +} + +fn session_configured_from_thread_start_response( + response: &ThreadStartResponse, +) -> Result<SessionConfiguredEvent, String> { + session_configured_from_thread_response( + &response.thread.id, + response.thread.name.clone(), + response.thread.path.clone(), + response.model.clone(), + response.model_provider.clone(), + response.service_tier, + response.approval_policy.to_core(), + response.sandbox.to_core(), + response.cwd.clone(), + response.reasoning_effort, ) } +fn session_configured_from_thread_resume_response( + response: &ThreadResumeResponse, +) -> Result<SessionConfiguredEvent, String> { + session_configured_from_thread_response( + &response.thread.id, + response.thread.name.clone(), + response.thread.path.clone(), + response.model.clone(), + response.model_provider.clone(), + response.service_tier, + response.approval_policy.to_core(), + response.sandbox.to_core(), + response.cwd.clone(), + response.reasoning_effort, + ) +} + +#[expect( + clippy::too_many_arguments, + reason = "session mapping keeps explicit fields" +)] +/// Synthesizes startup session metadata from `thread/start` or `thread/resume`. +/// +/// This is a compatibility bridge for the current in-process architecture. +/// Some session fields are not available synchronously from the start/resume +/// response, so callers must treat the result as a best-effort fallback until +/// a later `SessionConfigured` event proves otherwise. +/// TODO(architecture): stop synthesizing a partial `SessionConfiguredEvent` +/// here. Either return the authoritative session-configured payload from +/// `thread/start`/`thread/resume`, or introduce a smaller bootstrap type for +/// exec so this path cannot accidentally depend on placeholder fields.
+fn session_configured_from_thread_response( + thread_id: &str, + thread_name: Option, + rollout_path: Option, + model: String, + model_provider_id: String, + service_tier: Option, + approval_policy: AskForApproval, + sandbox_policy: codex_protocol::protocol::SandboxPolicy, + cwd: PathBuf, + reasoning_effort: Option, +) -> Result { + let session_id = codex_protocol::ThreadId::from_string(thread_id) + .map_err(|err| format!("thread id `{thread_id}` is invalid: {err}"))?; + + Ok(SessionConfiguredEvent { + session_id, + forked_from_id: None, + thread_name, + model, + model_provider_id, + service_tier, + approval_policy, + sandbox_policy, + cwd, + reasoning_effort, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path, + }) +} + +fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget { + match target { + ReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges, + ReviewTarget::BaseBranch { branch } => ApiReviewTarget::BaseBranch { branch }, + ReviewTarget::Commit { sha, title } => ApiReviewTarget::Commit { sha, title }, + ReviewTarget::Custom { instructions } => ApiReviewTarget::Custom { instructions }, + } +} + +fn normalize_legacy_notification_method(method: &str) -> &str { + method.strip_prefix("codex/event/").unwrap_or(method) +} + +fn lagged_event_warning_message(skipped: usize) -> String { + format!("in-process app-server event stream lagged; dropped {skipped} events") +} + +struct DecodedLegacyNotification { + conversation_id: Option, + event: Event, +} + +fn decode_legacy_notification( + notification: JSONRPCNotification, +) -> Result { + let value = notification + .params + .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new())); + let method = notification.method; + let normalized_method = normalize_legacy_notification_method(&method).to_string(); + let serde_json::Value::Object(mut object) = value else { + return Err(format!( + "legacy notification `{method}` 
params were not an object" + )); + }; + let conversation_id = object + .get("conversationId") + .and_then(serde_json::Value::as_str) + .map(str::to_owned); + let mut event_payload = if let Some(serde_json::Value::Object(msg_payload)) = object.get("msg") + { + serde_json::Value::Object(msg_payload.clone()) + } else { + object.remove("conversationId"); + serde_json::Value::Object(object) + }; + let serde_json::Value::Object(ref mut object) = event_payload else { + return Err(format!( + "legacy notification `{method}` event payload was not an object" + )); + }; + object.insert( + "type".to_string(), + serde_json::Value::String(normalized_method), + ); + + let msg: EventMsg = serde_json::from_value(event_payload) + .map_err(|err| format!("failed to decode event: {err}"))?; + Ok(DecodedLegacyNotification { + conversation_id, + event: Event { + id: String::new(), + msg, + }, + }) +} + +fn canceled_mcp_server_elicitation_response() -> Result { + serde_json::to_value(McpServerElicitationRequestResponse { + action: McpServerElicitationAction::Cancel, + content: None, + meta: None, + }) + .map_err(|err| format!("failed to encode mcp elicitation response: {err}")) +} + +async fn request_shutdown( + client: &InProcessAppServerClient, + request_ids: &mut RequestIdSequencer, + thread_id: &str, +) -> Result<(), String> { + let request = ClientRequest::ThreadUnsubscribe { + request_id: request_ids.next(), + params: ThreadUnsubscribeParams { + thread_id: thread_id.to_string(), + }, + }; + send_request_with_response::(client, request, "thread/unsubscribe") + .await + .map(|_| ()) +} + +async fn resolve_server_request( + client: &InProcessAppServerClient, + request_id: RequestId, + value: serde_json::Value, + method: &str, +) -> Result<(), String> { + client + .resolve_server_request(request_id, value) + .await + .map_err(|err| format!("failed to resolve `{method}` server request: {err}")) +} + +async fn reject_server_request( + client: &InProcessAppServerClient, + request_id: 
RequestId, + method: &str, + reason: String, +) -> Result<(), String> { + client + .reject_server_request( + request_id, + JSONRPCErrorError { + code: -32000, + message: reason, + data: None, + }, + ) + .await + .map_err(|err| format!("failed to reject `{method}` server request: {err}")) +} + +fn server_request_method_name(request: &ServerRequest) -> String { + serde_json::to_value(request) + .ok() + .and_then(|value| { + value + .get("method") + .and_then(serde_json::Value::as_str) + .map(str::to_owned) + }) + .unwrap_or_else(|| "unknown".to_string()) +} + +async fn handle_server_request( + client: &InProcessAppServerClient, + request: ServerRequest, + config: &Config, + _thread_id: &str, + error_seen: &mut bool, +) { + let method = server_request_method_name(&request); + let handle_result = match request { + ServerRequest::McpServerElicitationRequest { request_id, .. } => { + // Exec auto-cancels elicitation instead of surfacing it + // interactively. Preserve that behavior for attached subagent + // threads too so we do not turn a cancel into a decline/error. 
+ match canceled_mcp_server_elicitation_response() { + Ok(value) => { + resolve_server_request( + client, + request_id, + value, + "mcpServer/elicitation/request", + ) + .await + } + Err(err) => Err(err), + } + } + ServerRequest::ChatgptAuthTokensRefresh { request_id, params } => { + let refresh_result = tokio::task::spawn_blocking({ + let config = config.clone(); + move || local_external_chatgpt_tokens(&config) + }) + .await; + + match refresh_result { + Err(err) => { + reject_server_request( + client, + request_id, + &method, + format!("local chatgpt auth refresh task failed in exec: {err}"), + ) + .await + } + Ok(Err(reason)) => reject_server_request(client, request_id, &method, reason).await, + Ok(Ok(response)) => { + if let Some(previous_account_id) = params.previous_account_id.as_deref() + && previous_account_id != response.chatgpt_account_id + { + warn!( + "local auth refresh account mismatch: expected `{previous_account_id}`, got `{}`", + response.chatgpt_account_id + ); + } + match serde_json::to_value(response) { + Ok(value) => { + resolve_server_request( + client, + request_id, + value, + "account/chatgptAuthTokens/refresh", + ) + .await + } + Err(err) => Err(format!( + "failed to serialize chatgpt auth refresh response: {err}" + )), + } + } + } + } + ServerRequest::CommandExecutionRequestApproval { request_id, params } => { + reject_server_request( + client, + request_id, + &method, + format!( + "command execution approval is not supported in exec mode for thread `{}`", + params.thread_id + ), + ) + .await + } + ServerRequest::FileChangeRequestApproval { request_id, params } => { + reject_server_request( + client, + request_id, + &method, + format!( + "file change approval is not supported in exec mode for thread `{}`", + params.thread_id + ), + ) + .await + } + ServerRequest::ToolRequestUserInput { request_id, params } => { + reject_server_request( + client, + request_id, + &method, + format!( + "request_user_input is not supported in exec mode for 
thread `{}`", + params.thread_id + ), + ) + .await + } + ServerRequest::DynamicToolCall { request_id, params } => { + reject_server_request( + client, + request_id, + &method, + format!( + "dynamic tool calls are not supported in exec mode for thread `{}`", + params.thread_id + ), + ) + .await + } + ServerRequest::ApplyPatchApproval { request_id, params } => { + reject_server_request( + client, + request_id, + &method, + format!( + "apply_patch approval is not supported in exec mode for thread `{}`", + params.conversation_id + ), + ) + .await + } + ServerRequest::ExecCommandApproval { request_id, params } => { + reject_server_request( + client, + request_id, + &method, + format!( + "exec command approval is not supported in exec mode for thread `{}`", + params.conversation_id + ), + ) + .await + } + }; + + if let Err(err) = handle_result { + *error_seen = true; + warn!("{err}"); + } +} + +fn local_external_chatgpt_tokens( + config: &Config, +) -> Result<ChatgptAuthTokensRefreshResponse, String> { + let auth_manager = AuthManager::shared( + config.codex_home.clone(), + false, + config.cli_auth_credentials_store_mode, + ); + auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone()); + auth_manager.reload(); + + let auth = auth_manager + .auth_cached() + .ok_or_else(|| "no cached auth available for local token refresh".to_string())?; + if !auth.is_external_chatgpt_tokens() { + return Err("external ChatGPT token auth is not active".to_string()); + } + + let access_token = auth + .get_token() + .map_err(|err| format!("failed to read external access token: {err}"))?; + let chatgpt_account_id = auth + .get_account_id() + .ok_or_else(|| "external token auth is missing chatgpt account id".to_string())?; + let chatgpt_plan_type = auth.account_plan_type().map(|plan_type| match plan_type { + AccountPlanType::Free => "free".to_string(), + AccountPlanType::Go => "go".to_string(), + AccountPlanType::Plus => "plus".to_string(), + AccountPlanType::Pro => "pro".to_string(), +
AccountPlanType::Team => "team".to_string(), + AccountPlanType::Business => "business".to_string(), + AccountPlanType::Enterprise => "enterprise".to_string(), + AccountPlanType::Edu => "edu".to_string(), + AccountPlanType::Unknown => "unknown".to_string(), + }); + + Ok(ChatgptAuthTokensRefreshResponse { + access_token, + chatgpt_account_id, + chatgpt_plan_type, + }) +} + async fn resolve_resume_path( config: &Config, args: &crate::cli::ResumeArgs, @@ -980,17 +1540,17 @@ fn resolve_prompt(prompt_arg: Option) -> String { } } -fn build_review_request(args: ReviewArgs) -> anyhow::Result { +fn build_review_request(args: &ReviewArgs) -> anyhow::Result { let target = if args.uncommitted { ReviewTarget::UncommittedChanges - } else if let Some(branch) = args.base { + } else if let Some(branch) = args.base.clone() { ReviewTarget::BaseBranch { branch } - } else if let Some(sha) = args.commit { + } else if let Some(sha) = args.commit.clone() { ReviewTarget::Commit { sha, - title: args.commit_title, + title: args.commit_title.clone(), } - } else if let Some(prompt_arg) = args.prompt { + } else if let Some(prompt_arg) = args.prompt.clone() { let prompt = resolve_prompt(Some(prompt_arg)).trim().to_string(); if prompt.is_empty() { anyhow::bail!("Review prompt cannot be empty"); @@ -1053,14 +1613,14 @@ mod tests { #[test] fn builds_uncommitted_review_request() { - let request = build_review_request(ReviewArgs { + let args = ReviewArgs { uncommitted: true, base: None, commit: None, commit_title: None, prompt: None, - }) - .expect("builds uncommitted review request"); + }; + let request = build_review_request(&args).expect("builds uncommitted review request"); let expected = ReviewRequest { target: ReviewTarget::UncommittedChanges, @@ -1072,14 +1632,14 @@ mod tests { #[test] fn builds_commit_review_request_with_title() { - let request = build_review_request(ReviewArgs { + let args = ReviewArgs { uncommitted: false, base: None, commit: Some("123456789".to_string()), commit_title: 
Some("Add review command".to_string()), prompt: None, - }) - .expect("builds commit review request"); + }; + let request = build_review_request(&args).expect("builds commit review request"); let expected = ReviewRequest { target: ReviewTarget::Commit { @@ -1094,14 +1654,14 @@ mod tests { #[test] fn builds_custom_review_request_trims_prompt() { - let request = build_review_request(ReviewArgs { + let args = ReviewArgs { uncommitted: false, base: None, commit: None, commit_title: None, prompt: Some(" custom review instructions ".to_string()), - }) - .expect("builds custom review request"); + }; + let request = build_review_request(&args).expect("builds custom review request"); let expected = ReviewRequest { target: ReviewTarget::Custom { @@ -1187,4 +1747,52 @@ mod tests { assert_eq!(err, PromptDecodeError::InvalidUtf8 { valid_up_to: 0 }); } + + #[test] + fn lagged_event_warning_message_is_explicit() { + assert_eq!( + lagged_event_warning_message(7), + "in-process app-server event stream lagged; dropped 7 events".to_string() + ); + } + + #[test] + fn decode_legacy_notification_preserves_conversation_id() { + let decoded = decode_legacy_notification(JSONRPCNotification { + method: "codex/event/error".to_string(), + params: Some(serde_json::json!({ + "conversationId": "thread-123", + "msg": { + "message": "boom" + } + })), + }) + .expect("legacy notification should decode"); + + assert_eq!(decoded.conversation_id.as_deref(), Some("thread-123")); + assert!(matches!( + decoded.event.msg, + EventMsg::Error(codex_protocol::protocol::ErrorEvent { + message, + codex_error_info: None, + }) if message == "boom" + )); + } + + #[test] + fn canceled_mcp_server_elicitation_response_uses_cancel_action() { + let value = canceled_mcp_server_elicitation_response() + .expect("mcp elicitation cancel response should serialize"); + let response: McpServerElicitationRequestResponse = + serde_json::from_value(value).expect("cancel response should deserialize"); + + assert_eq!( + response, + 
McpServerElicitationRequestResponse { + action: McpServerElicitationAction::Cancel, + content: None, + meta: None, + } + ); + } }