core-agent-ide/codex-rs/app-server-client/src/lib.rs
Ahmed Ibrahim 2e22885e79
Split features into codex-features crate (#15253)
- Split the feature system into a new `codex-features` crate.
- Cut `codex-core` and workspace consumers over to the new config and
warning APIs.

Co-authored-by: Ahmed Ibrahim <219906144+aibrahim-oai@users.noreply.github.com>
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 20:12:07 -07:00

1571 lines
60 KiB
Rust

//! Shared in-process app-server client facade for CLI surfaces.
//!
//! This crate wraps [`codex_app_server::in_process`] behind a single async API
//! used by surfaces like TUI and exec. It centralizes:
//!
//! - Runtime startup and initialize-capabilities handshake.
//! - Typed caller-provided startup identity (`SessionSource` + client name).
//! - Typed and raw request/notification dispatch.
//! - Server request resolution and rejection.
//! - Event consumption with backpressure signaling ([`InProcessServerEvent::Lagged`]).
//! - Bounded graceful shutdown with abort fallback.
//!
//! The facade interposes a worker task between the caller and the underlying
//! [`InProcessClientHandle`](codex_app_server::in_process::InProcessClientHandle),
//! bridging async `mpsc` channels on both sides. Queues are bounded so overload
//! surfaces as channel-full errors rather than unbounded memory growth.
mod remote;
use std::error::Error;
use std::fmt;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::sync::Arc;
use std::time::Duration;
pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
pub use codex_app_server::in_process::InProcessServerEvent;
use codex_app_server::in_process::InProcessStartArgs;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use codex_features::Feature;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use serde::de::DeserializeOwned;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use toml::Value as TomlValue;
use tracing::warn;
pub use crate::remote::RemoteAppServerClient;
pub use crate::remote::RemoteAppServerConnectArgs;
/// Upper bound for each bounded wait performed by
/// [`InProcessAppServerClient::shutdown`]: once while waiting for the worker's
/// shutdown acknowledgement and once while waiting for the worker task to exit.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Raw app-server request result for typed in-process requests.
///
/// Even on the in-process path, successful responses still travel back through
/// the same JSON-RPC result envelope used by socket/stdio transports because
/// `MessageProcessor` continues to produce that shape internally.
///
/// `Ok` carries the JSON-RPC result payload; `Err` carries the JSON-RPC error
/// reported by the server. Transport failures are not represented here: the
/// request methods wrap this type in an outer [`IoResult`] for those.
pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
/// Transport-agnostic event returned by [`AppServerClient::next_event`].
///
/// Every [`InProcessServerEvent`] variant converts into the variant of the same
/// name through the `From` impl in this file.
#[derive(Debug, Clone)]
pub enum AppServerEvent {
/// Marker reporting that `skipped` events were dropped because the consumer
/// fell behind.
Lagged { skipped: usize },
/// Typed notification emitted by the server.
ServerNotification(ServerNotification),
/// Raw JSON-RPC notification forwarded without a typed wrapper.
LegacyNotification(JSONRPCNotification),
/// Request from the server that the client is expected to resolve or reject.
ServerRequest(ServerRequest),
/// The connection ended. This variant is never produced by the in-process
/// conversion in this file.
// NOTE(review): presumably emitted only by the remote transport — confirm in `remote`.
Disconnected { message: String },
}
impl From<InProcessServerEvent> for AppServerEvent {
fn from(value: InProcessServerEvent) -> Self {
match value {
InProcessServerEvent::Lagged { skipped } => Self::Lagged { skipped },
InProcessServerEvent::ServerNotification(notification) => {
Self::ServerNotification(notification)
}
InProcessServerEvent::LegacyNotification(notification) => {
Self::LegacyNotification(notification)
}
InProcessServerEvent::ServerRequest(request) => Self::ServerRequest(request),
}
}
}
/// Reports whether `event` is a terminal event that must reach the consumer
/// even when the event queue is saturated.
///
/// Terminal events drive surface shutdown/completion state; dropping one under
/// backpressure can leave exec/TUI waiting forever even though the underlying
/// turn has already ended.
fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
    // Legacy event names, compared after removing the optional
    // `codex/event/` method prefix.
    const TERMINAL_LEGACY_EVENTS: [&str; 3] =
        ["task_complete", "turn_aborted", "shutdown_complete"];

    match event {
        InProcessServerEvent::LegacyNotification(notification) => {
            let method = notification.method.as_str();
            let event_name = method.strip_prefix("codex/event/").unwrap_or(method);
            TERMINAL_LEGACY_EVENTS.contains(&event_name)
        }
        InProcessServerEvent::ServerNotification(
            codex_app_server_protocol::ServerNotification::TurnCompleted(_),
        ) => true,
        _ => false,
    }
}
/// Layered error for [`InProcessAppServerClient::request_typed`].
///
/// This keeps transport failures, server-side JSON-RPC failures, and response
/// decode failures distinct so callers can decide whether to retry, surface a
/// server error, or treat the response as an internal request/response mismatch.
///
/// Every variant records the JSON-RPC `method` of the failed request so the
/// `Display` output is method-qualified.
#[derive(Debug)]
pub enum TypedRequestError {
/// The underlying `request` call returned an I/O error, so no JSON-RPC
/// result envelope was obtained.
Transport {
method: String,
source: IoError,
},
/// The server answered with a JSON-RPC error.
Server {
method: String,
source: JSONRPCErrorError,
},
/// The server answered successfully, but the result payload could not be
/// deserialized into the type the caller asked for.
Deserialize {
method: String,
source: serde_json::Error,
},
}
/// Renders the error as `<method> <failure kind>: <detail>`.
impl fmt::Display for TypedRequestError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Split each variant into its label and detail first so the final
        // layout is written in exactly one place.
        let (method, label, detail) = match self {
            Self::Transport { method, source } => (
                method.as_str(),
                "transport error",
                source as &dyn fmt::Display,
            ),
            Self::Server { method, source } => (
                method.as_str(),
                "failed",
                &source.message as &dyn fmt::Display,
            ),
            Self::Deserialize { method, source } => (
                method.as_str(),
                "response decode error",
                source as &dyn fmt::Display,
            ),
        };
        write!(f, "{method} {label}: {detail}")
    }
}
/// Exposes the wrapped I/O or decode error as the error source. Server-side
/// JSON-RPC errors report no source.
impl Error for TypedRequestError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let cause: &(dyn Error + 'static) = match self {
            Self::Server { .. } => return None,
            Self::Transport { source, .. } => source,
            Self::Deserialize { source, .. } => source,
        };
        Some(cause)
    }
}
/// Core manager handles created once at startup and shared between the
/// embedded runtime and the client facade.
#[derive(Clone)]
struct SharedCoreManagers {
// Temporary bootstrap escape hatch for embedders that still need direct
// core handles during the in-process app-server migration. Once TUI/exec
// stop depending on direct manager access, remove this wrapper and keep
// manager ownership entirely inside the app-server runtime.
// Passed to the runtime start args and also retained by the client.
auth_manager: Arc<AuthManager>,
// Passed to the runtime start args and also retained by the client.
thread_manager: Arc<ThreadManager>,
}
/// Caller-provided inputs for [`InProcessAppServerClient::start`].
///
/// Most fields are forwarded into the runtime's start args. The client
/// name/version, `experimental_api`, and the opt-out list are folded into the
/// initialize params instead.
#[derive(Clone)]
pub struct InProcessClientStartArgs {
/// Resolved argv0 dispatch paths used by command execution internals.
pub arg0_paths: Arg0DispatchPaths,
/// Shared config used to initialize app-server runtime.
pub config: Arc<Config>,
/// CLI config overrides that are already parsed into TOML values.
pub cli_overrides: Vec<(String, TomlValue)>,
/// Loader override knobs used by config API paths.
pub loader_overrides: LoaderOverrides,
/// Preloaded cloud requirements provider.
pub cloud_requirements: CloudRequirementsLoader,
/// Feedback sink used by app-server/core telemetry and logs.
pub feedback: CodexFeedback,
/// Startup warnings emitted after initialize succeeds.
pub config_warnings: Vec<ConfigWarningNotification>,
/// Session source recorded in app-server thread metadata.
pub session_source: SessionSource,
/// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
pub enable_codex_api_key_env: bool,
/// Client name reported during initialize.
pub client_name: String,
/// Client version reported during initialize.
pub client_version: String,
/// Whether experimental APIs are requested at initialize time.
pub experimental_api: bool,
/// Notification methods this client opts out of receiving.
/// An empty list is sent as "no opt-outs" (`None`) in the initialize params.
pub opt_out_notification_methods: Vec<String>,
/// Queue capacity for command/event channels (clamped to at least 1).
pub channel_capacity: usize,
}
impl InProcessClientStartArgs {
    /// Creates the auth and thread managers that the runtime and the client
    /// facade will share.
    fn shared_core_managers(&self) -> SharedCoreManagers {
        let config = self.config.as_ref();
        let auth_manager = AuthManager::shared(
            config.codex_home.clone(),
            self.enable_codex_api_key_env,
            config.cli_auth_credentials_store_mode,
        );
        let managed_auth = auth_manager.clone();
        let session_source = self.session_source.clone();
        let collaboration_modes = CollaborationModesConfig {
            default_mode_request_user_input: config
                .features
                .enabled(Feature::DefaultModeRequestUserInput),
        };
        let thread_manager = Arc::new(ThreadManager::new(
            config,
            managed_auth,
            session_source,
            collaboration_modes,
        ));
        SharedCoreManagers {
            auth_manager,
            thread_manager,
        }
    }

    /// Builds initialize params from caller-provided metadata.
    ///
    /// An empty opt-out list is reported as `None` rather than as an empty
    /// vector.
    pub fn initialize_params(&self) -> InitializeParams {
        let opted_out = &self.opt_out_notification_methods;
        let opt_out_notification_methods = (!opted_out.is_empty()).then(|| opted_out.clone());
        let client_info = ClientInfo {
            name: self.client_name.clone(),
            title: None,
            version: self.client_version.clone(),
        };
        InitializeParams {
            client_info,
            capabilities: Some(InitializeCapabilities {
                experimental_api: self.experimental_api,
                opt_out_notification_methods,
            }),
        }
    }

    /// Consumes the args and produces the runtime's start args, wiring in
    /// clones of the shared managers.
    fn into_runtime_start_args(self, shared_core: &SharedCoreManagers) -> InProcessStartArgs {
        // Must be computed before `self` is taken apart below.
        let initialize = self.initialize_params();
        let Self {
            arg0_paths,
            config,
            cli_overrides,
            loader_overrides,
            cloud_requirements,
            feedback,
            config_warnings,
            session_source,
            enable_codex_api_key_env,
            channel_capacity,
            ..
        } = self;
        InProcessStartArgs {
            arg0_paths,
            config,
            cli_overrides,
            loader_overrides,
            cloud_requirements,
            auth_manager: Some(shared_core.auth_manager.clone()),
            thread_manager: Some(shared_core.thread_manager.clone()),
            feedback,
            config_warnings,
            session_source,
            enable_codex_api_key_env,
            initialize,
            channel_capacity,
        }
    }
}
/// Internal command sent from public facade methods to the worker task.
///
/// Each variant carries a oneshot sender so the caller can `await` the
/// result without holding a mutable reference to the client.
enum ClientCommand {
/// Issue a client request. The worker runs it on a detached task so the
/// worker loop keeps draining runtime events while the request is pending.
// NOTE(review): the request is boxed, presumably to keep this enum small — confirm.
Request {
request: Box<ClientRequest>,
response_tx: oneshot::Sender<IoResult<RequestResult>>,
},
/// Forward a client notification to the runtime.
Notify {
notification: ClientNotification,
response_tx: oneshot::Sender<IoResult<()>>,
},
/// Answer a pending server request with a successful result.
ResolveServerRequest {
request_id: RequestId,
result: JsonRpcResult,
response_tx: oneshot::Sender<IoResult<()>>,
},
/// Answer a pending server request with a JSON-RPC error.
RejectServerRequest {
request_id: RequestId,
error: JSONRPCErrorError,
response_tx: oneshot::Sender<IoResult<()>>,
},
/// Shut the runtime down, report the outcome, and stop the worker loop.
Shutdown {
response_tx: oneshot::Sender<IoResult<()>>,
},
}
/// Async facade over the in-process app-server runtime.
///
/// This type owns a worker task that bridges between:
/// - caller-facing async `mpsc` channels used by TUI/exec
/// - [`codex_app_server::in_process::InProcessClientHandle`], which speaks to
/// the embedded `MessageProcessor`
///
/// The facade intentionally preserves the server's request/notification/event
/// model instead of exposing direct core runtime handles. That keeps in-process
/// callers aligned with app-server behavior while still avoiding a process
/// boundary.
pub struct InProcessAppServerClient {
// Sends commands to the worker task; cloned into request handles.
command_tx: mpsc::Sender<ClientCommand>,
// Receives events forwarded by the worker task.
event_rx: mpsc::Receiver<InProcessServerEvent>,
// Join handle for the worker task, awaited (or aborted) during shutdown.
worker_handle: tokio::task::JoinHandle<()>,
// Same manager instances that were handed to the runtime at startup.
auth_manager: Arc<AuthManager>,
thread_manager: Arc<ThreadManager>,
}
/// Cloneable handle that can issue requests to the in-process worker without
/// access to the client's event stream.
///
/// It holds only a clone of the client's command sender.
#[derive(Clone)]
pub struct InProcessAppServerRequestHandle {
command_tx: mpsc::Sender<ClientCommand>,
}
/// Request-only handle that works against either transport.
#[derive(Clone)]
pub enum AppServerRequestHandle {
/// Handle backed by the in-process worker.
InProcess(InProcessAppServerRequestHandle),
/// Handle backed by the remote client.
Remote(crate::remote::RemoteAppServerRequestHandle),
}
/// App-server client over either transport. Every method dispatches to the
/// wrapped client.
pub enum AppServerClient {
/// Client for the runtime embedded in this process.
InProcess(InProcessAppServerClient),
/// Client for a remote app-server.
Remote(RemoteAppServerClient),
}
impl InProcessAppServerClient {
/// Starts the in-process runtime and facade worker task.
///
/// The returned client is ready for requests and event consumption. If the
/// internal event queue is saturated later, server requests are rejected
/// with overload error instead of being silently dropped.
///
/// The auth and thread managers are created here, passed to the runtime, and
/// also retained on the returned client.
///
/// # Errors
///
/// Returns the error from `codex_app_server::in_process::start` if the
/// runtime fails to start.
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
// `tokio::sync::mpsc::channel` panics on a zero capacity, so clamp it.
let channel_capacity = args.channel_capacity.max(1);
let shared_core = args.shared_core_managers();
let mut handle =
codex_app_server::in_process::start(args.into_runtime_start_args(&shared_core)).await?;
let request_sender = handle.sender();
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
let worker_handle = tokio::spawn(async move {
// Cleared once the consumer side of `event_tx` is closed. After that the
// runtime-event branch below is disabled and only commands are served.
let mut event_stream_enabled = true;
// Number of events dropped since the last `Lagged` marker was delivered.
let mut skipped_events = 0usize;
loop {
tokio::select! {
command = command_rx.recv() => {
match command {
Some(ClientCommand::Request { request, response_tx }) => {
let request_sender = request_sender.clone();
// Request waits happen on a detached task so
// this loop can keep draining runtime events
// while the request is blocked on client input.
tokio::spawn(async move {
let result = request_sender.request(*request).await;
// The caller may have stopped waiting; ignore a closed reply channel.
let _ = response_tx.send(result);
});
}
Some(ClientCommand::Notify {
notification,
response_tx,
}) => {
let result = request_sender.notify(notification);
let _ = response_tx.send(result);
}
Some(ClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
}) => {
let send_result =
request_sender.respond_to_server_request(request_id, result);
let _ = response_tx.send(send_result);
}
Some(ClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
}) => {
let send_result = request_sender.fail_server_request(request_id, error);
let _ = response_tx.send(send_result);
}
Some(ClientCommand::Shutdown { response_tx }) => {
let shutdown_result = handle.shutdown().await;
let _ = response_tx.send(shutdown_result);
break;
}
// Every command sender is gone (client and all request handles
// dropped): shut the runtime down on a best-effort basis.
None => {
let _ = handle.shutdown().await;
break;
}
}
}
event = handle.next_event(), if event_stream_enabled => {
// `None` means the runtime produced its last event.
let Some(event) = event else {
break;
};
// Events were dropped earlier: a `Lagged` marker must go out before
// anything else is forwarded.
if skipped_events > 0 {
if event_requires_delivery(&event) {
// Surface lag before the terminal event, but
// do not let the lag marker itself cause us to
// drop the completion/abort notification that
// the caller is blocked on.
if event_tx
.send(InProcessServerEvent::Lagged {
skipped: skipped_events,
})
.await
.is_err()
{
event_stream_enabled = false;
continue;
}
skipped_events = 0;
} else {
match event_tx.try_send(InProcessServerEvent::Lagged {
skipped: skipped_events,
}) {
Ok(()) => {
skipped_events = 0;
}
// No room even for the marker: drop the current event too and
// count it towards the next marker.
Err(mpsc::error::TrySendError::Full(_)) => {
skipped_events = skipped_events.saturating_add(1);
warn!(
"dropping in-process app-server event because consumer queue is full"
);
// A dropped server request can never be answered by the consumer,
// so fail it immediately.
// NOTE(review): -32001 looks like the server's overload error code — confirm.
if let InProcessServerEvent::ServerRequest(request) = event {
let _ = request_sender.fail_server_request(
request.id().clone(),
JSONRPCErrorError {
code: -32001,
message: "in-process app-server event queue is full".to_string(),
data: None,
},
);
}
continue;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
event_stream_enabled = false;
continue;
}
}
}
}
if event_requires_delivery(&event) {
// Block until the consumer catches up for
// terminal notifications; this preserves the
// completion signal even when the queue is
// otherwise saturated.
if event_tx.send(event).await.is_err() {
event_stream_enabled = false;
}
continue;
}
// Ordinary events are forwarded without blocking; when the queue is full
// they are dropped and counted.
match event_tx.try_send(event) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(event)) => {
skipped_events = skipped_events.saturating_add(1);
warn!("dropping in-process app-server event because consumer queue is full");
// Same overload rejection as in the lag-marker path above.
if let InProcessServerEvent::ServerRequest(request) = event {
let _ = request_sender.fail_server_request(
request.id().clone(),
JSONRPCErrorError {
code: -32001,
message: "in-process app-server event queue is full".to_string(),
data: None,
},
);
}
}
Err(mpsc::error::TrySendError::Closed(_)) => {
event_stream_enabled = false;
}
}
}
}
}
});
Ok(Self {
command_tx,
event_rx,
worker_handle,
auth_manager: shared_core.auth_manager,
thread_manager: shared_core.thread_manager,
})
}
/// Returns a new handle to the auth manager shared with the runtime.
///
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
pub fn auth_manager(&self) -> Arc<AuthManager> {
    Arc::clone(&self.auth_manager)
}
/// Returns a new handle to the thread manager shared with the runtime.
///
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
pub fn thread_manager(&self) -> Arc<ThreadManager> {
    Arc::clone(&self.thread_manager)
}
/// Creates a cloneable request-only handle that talks to the same worker as
/// this client.
pub fn request_handle(&self) -> InProcessAppServerRequestHandle {
    let command_tx = self.command_tx.clone();
    InProcessAppServerRequestHandle { command_tx }
}
/// Sends a typed client request and returns the raw JSON-RPC result.
///
/// Callers that expect a concrete response type should usually prefer
/// [`request_typed`](Self::request_typed).
///
/// # Errors
///
/// Returns `BrokenPipe` when the worker no longer accepts commands or drops
/// the reply channel without answering. Otherwise returns whatever transport
/// error the worker reported.
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
    let (response_tx, response_rx) = oneshot::channel();
    let command = ClientCommand::Request {
        request: Box::new(request),
        response_tx,
    };
    if self.command_tx.send(command).await.is_err() {
        return Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server worker channel is closed",
        ));
    }
    match response_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server request channel is closed",
        )),
    }
}
/// Sends a typed client request and decodes the successful response body.
///
/// This still deserializes from a JSON value produced by app-server's
/// JSON-RPC result envelope. Because the caller chooses `T`, `Deserialize`
/// failures indicate an internal request/response mismatch at the call site
/// (or an in-process bug), not transport skew from an external client.
///
/// # Errors
///
/// Returns [`TypedRequestError::Transport`], [`TypedRequestError::Server`], or
/// [`TypedRequestError::Deserialize`] depending on which layer failed. Each
/// carries the request's method name.
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
where
    T: DeserializeOwned,
{
    // Capture the method name before the request is moved into the channel.
    let method = request_method_name(&request);
    let envelope = match self.request(request).await {
        Ok(envelope) => envelope,
        Err(source) => return Err(TypedRequestError::Transport { method, source }),
    };
    let payload = match envelope {
        Ok(payload) => payload,
        Err(source) => return Err(TypedRequestError::Server { method, source }),
    };
    match serde_json::from_value(payload) {
        Ok(decoded) => Ok(decoded),
        Err(source) => Err(TypedRequestError::Deserialize { method, source }),
    }
}
/// Sends a typed client notification and waits for the worker to report
/// whether it was handed to the runtime.
///
/// # Errors
///
/// Returns `BrokenPipe` when the worker no longer accepts commands or drops
/// the reply channel. Otherwise returns the error reported by the worker.
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
    let (response_tx, response_rx) = oneshot::channel();
    let command = ClientCommand::Notify {
        notification,
        response_tx,
    };
    if self.command_tx.send(command).await.is_err() {
        return Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server worker channel is closed",
        ));
    }
    match response_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server notify channel is closed",
        )),
    }
}
/// Resolves a pending server request with a successful result.
///
/// This should only be called with request IDs obtained from the current
/// client's event stream.
///
/// # Errors
///
/// Returns `BrokenPipe` when the worker no longer accepts commands or drops
/// the reply channel. Otherwise returns the error reported by the worker.
pub async fn resolve_server_request(
    &self,
    request_id: RequestId,
    result: JsonRpcResult,
) -> IoResult<()> {
    let (response_tx, response_rx) = oneshot::channel();
    let command = ClientCommand::ResolveServerRequest {
        request_id,
        result,
        response_tx,
    };
    if self.command_tx.send(command).await.is_err() {
        return Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server worker channel is closed",
        ));
    }
    match response_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server resolve channel is closed",
        )),
    }
}
/// Rejects a pending server request with a JSON-RPC error payload.
///
/// # Errors
///
/// Returns `BrokenPipe` when the worker no longer accepts commands or drops
/// the reply channel. Otherwise returns the error reported by the worker.
pub async fn reject_server_request(
    &self,
    request_id: RequestId,
    error: JSONRPCErrorError,
) -> IoResult<()> {
    let (response_tx, response_rx) = oneshot::channel();
    let command = ClientCommand::RejectServerRequest {
        request_id,
        error,
        response_tx,
    };
    if self.command_tx.send(command).await.is_err() {
        return Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server worker channel is closed",
        ));
    }
    match response_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server reject channel is closed",
        )),
    }
}
/// Returns the next in-process event, or `None` when worker exits.
///
/// Callers are expected to drain this stream promptly. If they fall behind,
/// the worker emits [`InProcessServerEvent::Lagged`] markers and may reject
/// pending server requests rather than letting approval flows hang.
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
self.event_rx.recv().await
}
/// Shuts down worker and in-process runtime with bounded wait.
///
/// The sequence is:
/// 1. drop the caller-facing event receiver,
/// 2. ask the worker to shut the runtime down and wait up to
///    `SHUTDOWN_TIMEOUT` for its answer,
/// 3. wait up to `SHUTDOWN_TIMEOUT` for the worker task to finish, aborting it
///    if it does not.
///
/// Step 3 always runs, including when step 2 reports an error, so a failed
/// shutdown never leaves the worker task detached in the embedding caller.
///
/// # Errors
///
/// Returns the runtime's shutdown error as reported by the worker, or
/// `BrokenPipe` if the worker dropped the reply channel without answering.
/// A worker that is already gone, or that does not answer in time, is not an
/// error.
pub async fn shutdown(self) -> IoResult<()> {
    let Self {
        command_tx,
        event_rx,
        worker_handle,
        auth_manager: _,
        thread_manager: _,
    } = self;
    let mut worker_handle = worker_handle;
    // Drop the caller-facing receiver before asking the worker to shut
    // down. That unblocks any pending must-deliver `event_tx.send(..)`
    // so the worker can reach `handle.shutdown()` instead of timing out
    // and getting aborted with the runtime still attached.
    drop(event_rx);
    let (response_tx, response_rx) = oneshot::channel();
    let shutdown_result = if command_tx
        .send(ClientCommand::Shutdown { response_tx })
        .await
        .is_ok()
    {
        match timeout(SHUTDOWN_TIMEOUT, response_rx).await {
            Ok(Ok(worker_result)) => worker_result,
            Ok(Err(_closed)) => Err(IoError::new(
                ErrorKind::BrokenPipe,
                "in-process app-server shutdown channel is closed",
            )),
            // No answer in time: fall through to the join/abort below.
            Err(_elapsed) => Ok(()),
        }
    } else {
        // The worker already stopped accepting commands.
        Ok(())
    };
    // Reap the worker on every path before reporting the outcome, so an error
    // from the runtime cannot skip the abort fallback.
    if timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await.is_err() {
        worker_handle.abort();
        let _ = worker_handle.await;
    }
    shutdown_result
}
}
impl InProcessAppServerRequestHandle {
    /// Sends a client request through the worker and returns the raw JSON-RPC
    /// result.
    ///
    /// # Errors
    ///
    /// Returns `BrokenPipe` when the worker no longer accepts commands or
    /// drops the reply channel. Otherwise returns the transport error
    /// reported by the worker.
    pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
        let (response_tx, response_rx) = oneshot::channel();
        let command = ClientCommand::Request {
            request: Box::new(request),
            response_tx,
        };
        if self.command_tx.send(command).await.is_err() {
            return Err(IoError::new(
                ErrorKind::BrokenPipe,
                "in-process app-server worker channel is closed",
            ));
        }
        let Ok(outcome) = response_rx.await else {
            return Err(IoError::new(
                ErrorKind::BrokenPipe,
                "in-process app-server request channel is closed",
            ));
        };
        outcome
    }

    /// Sends a client request and decodes the successful response body into
    /// `T`.
    ///
    /// # Errors
    ///
    /// Returns a [`TypedRequestError`] naming the failed layer (transport,
    /// server, or decode) together with the request's method name.
    pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
    where
        T: DeserializeOwned,
    {
        // Capture the method name before the request is moved into the channel.
        let method = request_method_name(&request);
        let envelope = match self.request(request).await {
            Ok(envelope) => envelope,
            Err(source) => return Err(TypedRequestError::Transport { method, source }),
        };
        let payload = match envelope {
            Ok(payload) => payload,
            Err(source) => return Err(TypedRequestError::Server { method, source }),
        };
        match serde_json::from_value(payload) {
            Ok(decoded) => Ok(decoded),
            Err(source) => Err(TypedRequestError::Deserialize { method, source }),
        }
    }
}
impl AppServerRequestHandle {
    /// Sends a request through whichever transport backs this handle and
    /// returns the raw JSON-RPC result.
    pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
        match self {
            Self::Remote(remote) => remote.request(request).await,
            Self::InProcess(local) => local.request(request).await,
        }
    }

    /// Sends a request through whichever transport backs this handle and
    /// decodes the successful response body into `T`.
    pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
    where
        T: DeserializeOwned,
    {
        match self {
            Self::Remote(remote) => remote.request_typed(request).await,
            Self::InProcess(local) => local.request_typed(request).await,
        }
    }
}
impl AppServerClient {
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
match self {
Self::InProcess(client) => client.request(request).await,
Self::Remote(client) => client.request(request).await,
}
}
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
where
T: DeserializeOwned,
{
match self {
Self::InProcess(client) => client.request_typed(request).await,
Self::Remote(client) => client.request_typed(request).await,
}
}
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
match self {
Self::InProcess(client) => client.notify(notification).await,
Self::Remote(client) => client.notify(notification).await,
}
}
pub async fn resolve_server_request(
&self,
request_id: RequestId,
result: JsonRpcResult,
) -> IoResult<()> {
match self {
Self::InProcess(client) => client.resolve_server_request(request_id, result).await,
Self::Remote(client) => client.resolve_server_request(request_id, result).await,
}
}
pub async fn reject_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
match self {
Self::InProcess(client) => client.reject_server_request(request_id, error).await,
Self::Remote(client) => client.reject_server_request(request_id, error).await,
}
}
pub async fn next_event(&mut self) -> Option<AppServerEvent> {
match self {
Self::InProcess(client) => client.next_event().await.map(Into::into),
Self::Remote(client) => client.next_event().await,
}
}
pub async fn shutdown(self) -> IoResult<()> {
match self {
Self::InProcess(client) => client.shutdown().await,
Self::Remote(client) => client.shutdown().await,
}
}
pub fn request_handle(&self) -> AppServerRequestHandle {
match self {
Self::InProcess(client) => AppServerRequestHandle::InProcess(client.request_handle()),
Self::Remote(client) => AppServerRequestHandle::Remote(client.request_handle()),
}
}
}
/// Extracts the JSON-RPC method name for diagnostics without extending the
/// protocol crate with in-process-only helpers.
///
/// The request is serialized to JSON and its top-level `"method"` string is
/// read back. If serialization fails or the field is missing or not a string,
/// the placeholder `"<unknown>"` is returned.
pub(crate) fn request_method_name(request: &ClientRequest) -> String {
    let Ok(encoded) = serde_json::to_value(request) else {
        return "<unknown>".to_string();
    };
    match encoded.get("method").and_then(serde_json::Value::as_str) {
        Some(method) => method.to_owned(),
        None => "<unknown>".to_string(),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::ConfigRequirementsReadResponse;
use codex_app_server_protocol::GetAccountResponse;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::SessionSource as ApiSessionSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_app_server_protocol::ToolRequestUserInputQuestion;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::ConfigBuilder;
use futures::SinkExt;
use futures::StreamExt;
use pretty_assertions::assert_eq;
use tokio::net::TcpListener;
use tokio::time::Duration;
use tokio::time::timeout;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
/// Builds the config used by the tests, falling back to the default config
/// when the builder fails.
async fn build_test_config() -> Config {
    if let Ok(config) = ConfigBuilder::default().build().await {
        return config;
    }
    Config::load_default_with_cli_overrides(Vec::new()).expect("default config should load")
}
/// Starts an in-process client for tests with the given session source and
/// channel capacity. Panics if the client fails to start.
async fn start_test_client_with_capacity(
    session_source: SessionSource,
    channel_capacity: usize,
) -> InProcessAppServerClient {
    let config = Arc::new(build_test_config().await);
    let args = InProcessClientStartArgs {
        arg0_paths: Arg0DispatchPaths::default(),
        config,
        cli_overrides: Vec::new(),
        loader_overrides: LoaderOverrides::default(),
        cloud_requirements: CloudRequirementsLoader::default(),
        feedback: CodexFeedback::new(),
        config_warnings: Vec::new(),
        session_source,
        enable_codex_api_key_env: false,
        client_name: "codex-app-server-client-test".to_string(),
        client_version: "0.0.0-test".to_string(),
        experimental_api: true,
        opt_out_notification_methods: Vec::new(),
        channel_capacity,
    };
    let started = InProcessAppServerClient::start(args).await;
    started.expect("in-process app-server client should start")
}
/// Starts an in-process test client with the default channel capacity.
async fn start_test_client(session_source: SessionSource) -> InProcessAppServerClient {
    let capacity = DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
    start_test_client_with_capacity(session_source, capacity).await
}
/// Binds a loopback listener on an ephemeral port and spawns a task that
/// accepts one connection, upgrades it to a websocket, and runs `handler` on
/// it. Returns the `ws://` URL of the listener.
async fn start_test_remote_server<F, Fut>(handler: F) -> String
where
    F: FnOnce(tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>) -> Fut
        + Send
        + 'static,
    Fut: std::future::Future<Output = ()> + Send + 'static,
{
    let listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("listener should bind");
    // Resolve the URL before the listener moves into the server task.
    let websocket_url = format!("ws://{}", listener.local_addr().expect("listener address"));
    tokio::spawn(async move {
        let (stream, _peer) = listener.accept().await.expect("accept should succeed");
        let upgraded = accept_async(stream).await;
        handler(upgraded.expect("websocket upgrade should succeed")).await;
    });
    websocket_url
}
/// Plays the server side of the initialize handshake: expects an `initialize`
/// request, answers it with an empty result, then expects the `initialized`
/// notification. Panics on any other message.
async fn expect_remote_initialize(
    websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
) {
    let request = match read_websocket_message(websocket).await {
        JSONRPCMessage::Request(request) => request,
        _ => panic!("expected initialize request"),
    };
    assert_eq!(request.method, "initialize");

    let reply = JSONRPCMessage::Response(JSONRPCResponse {
        id: request.id,
        result: serde_json::json!({}),
    });
    write_websocket_message(websocket, reply).await;

    let notification = match read_websocket_message(websocket).await {
        JSONRPCMessage::Notification(notification) => notification,
        _ => panic!("expected initialized notification"),
    };
    assert_eq!(notification.method, "initialized");
}
/// Reads frames until a text frame arrives and parses it as a JSON-RPC
/// message. Binary, ping, pong, and raw frames are skipped. A close frame, a
/// finished stream, or a decode failure panics.
async fn read_websocket_message(
    websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
) -> JSONRPCMessage {
    loop {
        let next = websocket.next().await.expect("frame should be available");
        match next.expect("frame should decode") {
            Message::Text(text) => {
                let parsed = serde_json::from_str::<JSONRPCMessage>(&text);
                return parsed.expect("text frame should be valid JSON-RPC");
            }
            Message::Close(_) => panic!("unexpected close frame"),
            Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {}
        }
    }
}
/// Serializes `message` to JSON and sends it as a single text frame. Panics
/// if serialization or sending fails.
async fn write_websocket_message(
    websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
    message: JSONRPCMessage,
) {
    let payload = serde_json::to_string(&message).expect("message should serialize");
    let frame = Message::Text(payload.into());
    websocket.send(frame).await.expect("message should send");
}
/// Builds remote connect args for tests: fixed client identity, experimental
/// API enabled, no notification opt-outs, and a channel capacity of 8.
fn test_remote_connect_args(websocket_url: String) -> RemoteAppServerConnectArgs {
    let client_name = "codex-app-server-client-test".to_string();
    let client_version = "0.0.0-test".to_string();
    RemoteAppServerConnectArgs {
        websocket_url,
        client_name,
        client_version,
        experimental_api: true,
        opt_out_notification_methods: Vec::new(),
        channel_capacity: 8,
    }
}
// A typed request against the in-process runtime must decode into the
// requested response type, and the client must then shut down cleanly.
#[tokio::test]
async fn typed_request_roundtrip_works() {
let client = start_test_client(SessionSource::Exec).await;
let _response: ConfigRequirementsReadResponse = client
.request_typed(ClientRequest::ConfigRequirementsRead {
request_id: RequestId::Integer(1),
params: None,
})
.await
.expect("typed request should succeed");
client.shutdown().await.expect("shutdown should complete");
}
// A server-side JSON-RPC failure must surface as a method-qualified error.
#[tokio::test]
async fn typed_request_reports_json_rpc_errors() {
let client = start_test_client(SessionSource::Exec).await;
// The response type does not match `thread/read` on purpose: `request_typed`
// reports the server error before it attempts any decoding.
let err = client
.request_typed::<ConfigRequirementsReadResponse>(ClientRequest::ThreadRead {
request_id: RequestId::Integer(99),
params: codex_app_server_protocol::ThreadReadParams {
thread_id: "missing-thread".to_string(),
include_turns: false,
},
})
.await
.expect_err("missing thread should return a JSON-RPC error");
assert!(
err.to_string().starts_with("thread/read failed:"),
"expected method-qualified JSON-RPC failure message"
);
client.shutdown().await.expect("shutdown should complete");
}
// The session source passed at startup must be the source reported on threads
// started through the client.
#[tokio::test]
async fn caller_provided_session_source_is_applied() {
// Each case starts its own client so the source under test is the only one in play.
for (session_source, expected_source) in [
(SessionSource::Exec, ApiSessionSource::Exec),
(SessionSource::Cli, ApiSessionSource::Cli),
] {
let client = start_test_client(session_source).await;
let parsed: ThreadStartResponse = client
.request_typed(ClientRequest::ThreadStart {
request_id: RequestId::Integer(2),
params: ThreadStartParams {
ephemeral: Some(true),
..ThreadStartParams::default()
},
})
.await
.expect("thread/start should succeed");
assert_eq!(parsed.thread.source, expected_source);
client.shutdown().await.expect("shutdown should complete");
}
}
// A thread started through the app-server API must be visible through the
// thread manager handle exposed by the client.
#[tokio::test]
async fn shared_thread_manager_tracks_threads_started_via_app_server() {
let client = start_test_client(SessionSource::Cli).await;
let response: ThreadStartResponse = client
.request_typed(ClientRequest::ThreadStart {
request_id: RequestId::Integer(3),
params: ThreadStartParams {
ephemeral: Some(true),
..ThreadStartParams::default()
},
})
.await
.expect("thread/start should succeed");
let created_thread_id = codex_protocol::ThreadId::from_string(&response.thread.id)
.expect("thread id should parse");
// Bounded wait so a missing thread fails the test instead of hanging it.
timeout(
Duration::from_secs(2),
client.thread_manager().get_thread(created_thread_id),
)
.await
.expect("timed out waiting for retained thread manager to observe started thread")
.expect("started thread should be visible through the shared thread manager");
let thread_ids = client.thread_manager().list_thread_ids().await;
assert!(thread_ids.contains(&created_thread_id));
client.shutdown().await.expect("shutdown should complete");
}
// The smallest channel capacity (1) must still allow a full request roundtrip
// and a clean shutdown.
#[tokio::test]
async fn tiny_channel_capacity_still_supports_request_roundtrip() {
let client = start_test_client_with_capacity(SessionSource::Exec, 1).await;
let _response: ConfigRequirementsReadResponse = client
.request_typed(ClientRequest::ConfigRequirementsRead {
request_id: RequestId::Integer(1),
params: None,
})
.await
.expect("typed request should succeed");
client.shutdown().await.expect("shutdown should complete");
}
/// Runs one typed request/response cycle against a scripted websocket
/// server: the server answers a single `account/read` request and the client
/// decodes the reply into `GetAccountResponse`.
#[tokio::test]
async fn remote_typed_request_roundtrip_works() {
    let websocket_url = start_test_remote_server(|mut websocket| async move {
        expect_remote_initialize(&mut websocket).await;

        let request = match read_websocket_message(&mut websocket).await {
            JSONRPCMessage::Request(request) => request,
            _ => panic!("expected account/read request"),
        };
        assert_eq!(request.method, "account/read");

        // Reply with an empty account, echoing the request id back.
        let payload = serde_json::to_value(GetAccountResponse {
            account: None,
            requires_openai_auth: false,
        })
        .expect("response should serialize");
        let reply = JSONRPCMessage::Response(JSONRPCResponse {
            id: request.id,
            result: payload,
        });
        write_websocket_message(&mut websocket, reply).await;
    })
    .await;

    let connect_args = test_remote_connect_args(websocket_url);
    let client = RemoteAppServerClient::connect(connect_args)
        .await
        .expect("remote client should connect");

    let account_request = ClientRequest::GetAccount {
        request_id: RequestId::Integer(1),
        params: codex_app_server_protocol::GetAccountParams {
            refresh_token: false,
        },
    };
    let response: GetAccountResponse = client
        .request_typed(account_request)
        .await
        .expect("typed request should succeed");
    assert_eq!(response.account, None);

    client.shutdown().await.expect("shutdown should complete");
}
/// Issues two `account/read` requests with the same request id (`1`) while
/// the first is still outstanding. The second must fail with a "duplicate
/// remote app-server request id" transport error, and the first must still
/// receive the server's response.
#[tokio::test]
async fn remote_duplicate_request_id_keeps_original_waiter() {
// Lets the scripted server tell the test body that the first request has
// reached it, so the duplicate is only sent while the first is in flight.
let (first_request_seen_tx, first_request_seen_rx) = tokio::sync::oneshot::channel();
let websocket_url = start_test_remote_server(|mut websocket| async move {
expect_remote_initialize(&mut websocket).await;
let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
else {
panic!("expected account/read request");
};
assert_eq!(request.method, "account/read");
first_request_seen_tx
.send(request.id.clone())
.expect("request id should send");
// No further message may arrive within 100 ms: a read that times out is
// the success case for this assertion.
assert!(
timeout(
Duration::from_millis(100),
read_websocket_message(&mut websocket)
)
.await
.is_err(),
"duplicate request should not be forwarded to the server"
);
// Only now answer the original request.
write_websocket_message(
&mut websocket,
JSONRPCMessage::Response(JSONRPCResponse {
id: request.id,
result: serde_json::to_value(GetAccountResponse {
account: None,
requires_openai_auth: false,
})
.expect("response should serialize"),
}),
)
.await;
// Wait for one more websocket item and ignore it.
// NOTE(review): presumably this keeps the server side open until the
// client disconnects — confirm against `start_test_remote_server`.
let _ = websocket.next().await;
})
.await;
let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
.await
.expect("remote client should connect");
// Two handles onto the same client: one is moved into the spawned task,
// the other is used inline for the duplicate request.
let first_request_handle = client.request_handle();
let second_request_handle = first_request_handle.clone();
// The first request runs on its own task so it can stay pending while the
// duplicate is issued below.
let first_request = tokio::spawn(async move {
first_request_handle
.request_typed::<GetAccountResponse>(ClientRequest::GetAccount {
request_id: RequestId::Integer(1),
params: codex_app_server_protocol::GetAccountParams {
refresh_token: false,
},
})
.await
});
// Block until the server confirms it has seen the first request.
let first_request_id = first_request_seen_rx
.await
.expect("server should observe the first request");
assert_eq!(first_request_id, RequestId::Integer(1));
// Same request id as the in-flight request: must be rejected.
let second_err = second_request_handle
.request_typed::<GetAccountResponse>(ClientRequest::GetAccount {
request_id: RequestId::Integer(1),
params: codex_app_server_protocol::GetAccountParams {
refresh_token: false,
},
})
.await
.expect_err("duplicate request id should be rejected");
assert_eq!(
second_err.to_string(),
"account/read transport error: duplicate remote app-server request id `1`"
);
// The original waiter still resolves with the server's reply.
let first_response = first_request
.await
.expect("first request task should join")
.expect("first request should succeed");
assert_eq!(
first_response,
GetAccountResponse {
account: None,
requires_openai_auth: false,
}
);
client.shutdown().await.expect("shutdown should complete");
}
/// Has the scripted server push an `AccountUpdated` notification right after
/// initialization and checks the client surfaces it as a
/// `ServerNotification` event.
#[tokio::test]
async fn remote_notifications_arrive_over_websocket() {
    let websocket_url = start_test_remote_server(|mut websocket| async move {
        expect_remote_initialize(&mut websocket).await;

        // Round-trip the typed notification through JSON to obtain the
        // JSON-RPC notification payload that goes on the wire.
        let account_updated = ServerNotification::AccountUpdated(AccountUpdatedNotification {
            auth_mode: None,
            plan_type: None,
        });
        let as_json =
            serde_json::to_value(account_updated).expect("notification should serialize");
        let notification =
            serde_json::from_value(as_json).expect("notification should convert to JSON-RPC");
        write_websocket_message(&mut websocket, JSONRPCMessage::Notification(notification))
            .await;
    })
    .await;

    let connect_args = test_remote_connect_args(websocket_url);
    let mut client = RemoteAppServerClient::connect(connect_args)
        .await
        .expect("remote client should connect");

    let event = client.next_event().await.expect("event should arrive");
    assert!(matches!(
        event,
        AppServerEvent::ServerNotification(ServerNotification::AccountUpdated(_))
    ));

    client.shutdown().await.expect("shutdown should complete");
}
/// The scripted server sends an `item/tool/requestUserInput` request; the
/// client must surface it as a `ServerRequest` event, and resolving it must
/// deliver a response carrying the same request id back to the server.
#[tokio::test]
async fn remote_server_request_resolution_roundtrip_works() {
    let websocket_url = start_test_remote_server(|mut websocket| async move {
        expect_remote_initialize(&mut websocket).await;

        let request_id = RequestId::String("srv-1".to_string());
        let question = ToolRequestUserInputQuestion {
            id: "question-1".to_string(),
            header: "Mode".to_string(),
            question: "Pick one".to_string(),
            is_other: false,
            is_secret: false,
            options: Some(vec![]),
        };
        let params = serde_json::to_value(ToolRequestUserInputParams {
            thread_id: "thread-1".to_string(),
            turn_id: "turn-1".to_string(),
            item_id: "call-1".to_string(),
            questions: vec![question],
        })
        .expect("params should serialize");
        let outbound = JSONRPCMessage::Request(JSONRPCRequest {
            id: request_id.clone(),
            method: "item/tool/requestUserInput".to_string(),
            params: Some(params),
            trace: None,
        });
        write_websocket_message(&mut websocket, outbound).await;

        // The client's resolution must come back tagged with our id.
        let response = match read_websocket_message(&mut websocket).await {
            JSONRPCMessage::Response(response) => response,
            _ => panic!("expected server request response"),
        };
        assert_eq!(response.id, request_id);
    })
    .await;

    let connect_args = test_remote_connect_args(websocket_url);
    let mut client = RemoteAppServerClient::connect(connect_args)
        .await
        .expect("remote client should connect");

    let event = client
        .next_event()
        .await
        .expect("request event should arrive");
    let AppServerEvent::ServerRequest(pending) = event else {
        panic!("expected server request event");
    };
    client
        .resolve_server_request(pending.id().clone(), serde_json::json!({}))
        .await
        .expect("server request should resolve");

    client.shutdown().await.expect("shutdown should complete");
}
/// Covers a server request that arrives before the `initialize` response:
/// the scripted server sends `item/tool/requestUserInput` first and only
/// then answers `initialize`. The client must still surface the request as
/// an event after connecting, and its resolution must reach the server.
#[tokio::test]
async fn remote_server_request_received_during_initialize_is_delivered() {
let websocket_url = start_test_remote_server(|mut websocket| async move {
// The handshake is driven by hand here (instead of via
// `expect_remote_initialize`) so a message can be injected mid-handshake.
let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
else {
panic!("expected initialize request");
};
assert_eq!(request.method, "initialize");
let request_id = RequestId::String("srv-init".to_string());
// Server-initiated request, sent BEFORE the initialize response below.
write_websocket_message(
&mut websocket,
JSONRPCMessage::Request(JSONRPCRequest {
id: request_id.clone(),
method: "item/tool/requestUserInput".to_string(),
params: Some(
serde_json::to_value(ToolRequestUserInputParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "call-1".to_string(),
questions: vec![ToolRequestUserInputQuestion {
id: "question-1".to_string(),
header: "Mode".to_string(),
question: "Pick one".to_string(),
is_other: false,
is_secret: false,
options: Some(vec![]),
}],
})
.expect("params should serialize"),
),
trace: None,
}),
)
.await;
// Now complete the handshake by answering the initialize request.
write_websocket_message(
&mut websocket,
JSONRPCMessage::Response(JSONRPCResponse {
id: request.id,
result: serde_json::json!({}),
}),
)
.await;
// The client is expected to follow up with the `initialized` notification.
let JSONRPCMessage::Notification(notification) =
read_websocket_message(&mut websocket).await
else {
panic!("expected initialized notification");
};
assert_eq!(notification.method, "initialized");
// Finally, the resolution of the early server request must arrive,
// carrying the id the server chose.
let JSONRPCMessage::Response(response) = read_websocket_message(&mut websocket).await
else {
panic!("expected server request response");
};
assert_eq!(response.id, request_id);
})
.await;
let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
.await
.expect("remote client should connect");
// The request sent during the handshake must be the next event.
let AppServerEvent::ServerRequest(request) = client
.next_event()
.await
.expect("request event should arrive")
else {
panic!("expected server request event");
};
// Resolve with an empty JSON object; the server side asserts on the id.
client
.resolve_server_request(request.id().clone(), serde_json::json!({}))
.await
.expect("server request should resolve");
client.shutdown().await.expect("shutdown should complete");
}
#[tokio::test]
async fn remote_unknown_server_request_is_rejected() {
let websocket_url = start_test_remote_server(|mut websocket| async move {
expect_remote_initialize(&mut websocket).await;
let request_id = RequestId::String("srv-unknown".to_string());
write_websocket_message(
&mut websocket,
JSONRPCMessage::Request(JSONRPCRequest {
id: request_id.clone(),
method: "thread/unknown".to_string(),
params: None,
trace: None,
}),
)
.await;
let JSONRPCMessage::Error(response) = read_websocket_message(&mut websocket).await
else {
panic!("expected JSON-RPC error response");
};
assert_eq!(response.id, request_id);
assert_eq!(response.error.code, -32601);
assert_eq!(
response.error.message,
"unsupported remote app-server request `thread/unknown`"
);
})
.await;
let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
.await
.expect("remote client should connect");
client.shutdown().await.expect("shutdown should complete");
}
/// The scripted server closes the websocket right after initialization; the
/// client's next event must be `AppServerEvent::Disconnected`.
#[tokio::test]
async fn remote_disconnect_surfaces_as_event() {
    let websocket_url = start_test_remote_server(|mut websocket| async move {
        expect_remote_initialize(&mut websocket).await;
        websocket.close(None).await.expect("close should succeed");
    })
    .await;

    let connect_args = test_remote_connect_args(websocket_url);
    let mut client = RemoteAppServerClient::connect(connect_args)
        .await
        .expect("remote client should connect");

    let observed = client
        .next_event()
        .await
        .expect("disconnect event should arrive");
    assert!(matches!(observed, AppServerEvent::Disconnected { .. }));
}
/// Checks which `TypedRequestError` variants report an underlying cause
/// through `std::error::Error::source`: `Transport` and `Deserialize` do,
/// `Server` does not.
#[test]
fn typed_request_error_exposes_sources() {
    let transport = TypedRequestError::Transport {
        method: "config/read".to_string(),
        source: IoError::new(ErrorKind::BrokenPipe, "closed"),
    };
    assert!(
        std::error::Error::source(&transport).is_some(),
        "transport errors should expose their source"
    );

    let server = TypedRequestError::Server {
        method: "thread/read".to_string(),
        source: JSONRPCErrorError {
            code: -32603,
            data: None,
            message: "internal".to_string(),
        },
    };
    assert!(
        std::error::Error::source(&server).is_none(),
        "server errors should not expose a source"
    );

    let deserialize = TypedRequestError::Deserialize {
        method: "thread/start".to_string(),
        // Parsing a JSON string as `u32` is a convenient way to obtain a
        // real `serde_json::Error`.
        source: serde_json::from_str::<u32>("\"nope\"")
            .expect_err("invalid integer should return deserialize error"),
    };
    assert!(
        std::error::Error::source(&deserialize).is_some(),
        "deserialize errors should expose their source"
    );
}
/// Builds an `InProcessAppServerClient` by hand around pre-filled channels
/// and checks that a queued `Lagged { skipped: 3 }` marker is returned by
/// `next_event` unchanged.
#[tokio::test]
async fn next_event_surfaces_lagged_markers() {
    // The receiving half of the command channel stays bound for the whole
    // test, so the channel remains open.
    let (commands, _idle_command_rx) = mpsc::channel(1);
    let (events_in, events_out) = mpsc::channel(1);
    // A worker task that finishes immediately.
    let worker = tokio::spawn(async {});

    let config = build_test_config().await;
    let auth_manager = AuthManager::shared(
        config.codex_home.clone(),
        false,
        config.cli_auth_credentials_store_mode,
    );
    let collaboration_modes = CollaborationModesConfig {
        default_mode_request_user_input: config
            .features
            .enabled(Feature::DefaultModeRequestUserInput),
    };
    let thread_manager = Arc::new(ThreadManager::new(
        &config,
        auth_manager.clone(),
        SessionSource::Exec,
        collaboration_modes,
    ));

    // Queue exactly one marker, then close the sending side.
    let marker = InProcessServerEvent::Lagged { skipped: 3 };
    events_in
        .send(marker)
        .await
        .expect("lagged marker should enqueue");
    drop(events_in);

    let mut client = InProcessAppServerClient {
        command_tx: commands,
        event_rx: events_out,
        worker_handle: worker,
        auth_manager,
        thread_manager,
    };

    let received = timeout(Duration::from_secs(2), client.next_event())
        .await
        .expect("lagged marker should arrive before timeout");
    assert!(matches!(
        received,
        Some(InProcessServerEvent::Lagged { skipped: 3 })
    ));

    client.shutdown().await.expect("shutdown should complete");
}
/// `event_requires_delivery` must return true for a `TurnCompleted` server
/// notification and for the legacy `codex/event/turn_aborted` notification,
/// and false for a `Lagged` marker.
#[test]
fn event_requires_delivery_marks_terminal_events() {
    let turn_completed = InProcessServerEvent::ServerNotification(
        codex_app_server_protocol::ServerNotification::TurnCompleted(
            codex_app_server_protocol::TurnCompletedNotification {
                thread_id: "thread".to_string(),
                turn: codex_app_server_protocol::Turn {
                    id: "turn".to_string(),
                    items: Vec::new(),
                    status: codex_app_server_protocol::TurnStatus::Completed,
                    error: None,
                },
            },
        ),
    );
    assert!(event_requires_delivery(&turn_completed));

    let legacy_turn_aborted =
        InProcessServerEvent::LegacyNotification(codex_app_server_protocol::JSONRPCNotification {
            method: "codex/event/turn_aborted".to_string(),
            params: None,
        });
    assert!(event_requires_delivery(&legacy_turn_aborted));

    let lagged = InProcessServerEvent::Lagged { skipped: 1 };
    assert!(!event_requires_delivery(&lagged));
}
/// Two successive calls to each manager accessor must return `Arc`s that
/// point at the same allocation.
#[tokio::test]
async fn accessors_expose_retained_shared_managers() {
    let client = start_test_client(SessionSource::Cli).await;

    // Each comparison is evaluated in its own statement so the `Arc`s the
    // accessors return are dropped again before the next step.
    let same_auth_manager = Arc::ptr_eq(&client.auth_manager(), &client.auth_manager());
    assert!(
        same_auth_manager,
        "auth_manager accessor should clone the retained shared manager"
    );

    let same_thread_manager = Arc::ptr_eq(&client.thread_manager(), &client.thread_manager());
    assert!(
        same_thread_manager,
        "thread_manager accessor should clone the retained shared manager"
    );

    client.shutdown().await.expect("shutdown should complete");
}
/// Shutdown of a freshly started client must finish within one second.
#[tokio::test]
async fn shutdown_completes_promptly_with_retained_shared_managers() {
    let client = start_test_client(SessionSource::Cli).await;

    // Outer `Result`: did the 1 s budget elapse? Inner: did shutdown succeed?
    let outcome = timeout(Duration::from_secs(1), client.shutdown()).await;
    outcome
        .expect("shutdown should not wait for the 5s fallback timeout")
        .expect("shutdown should complete");
}
}