core-agent-ide/codex-rs/app-server/src/in_process.rs
Eric Traut 9dba7337f2
Start TUI on embedded app server (#14512)
This PR is part of the effort to move the TUI on top of the app server.
In a previous PR, we introduced an in-process app server and moved
`exec` on top of it.

For the TUI, we want to do the migration in stages. The app server
doesn't currently expose all of the functionality required by the TUI,
so we're going to need to support a hybrid approach as we make the
transition.

This PR changes the TUI initialization to instantiate an in-process app
server and access its `AuthManager` and `ThreadManager` rather than
constructing its own copies. It also adds a placeholder TUI event
handler that will eventually translate app server events into TUI
events. App server notifications are accepted but ignored for now. It
also adds proper shutdown of the app server when the TUI terminates.
2026-03-13 12:04:41 -06:00

898 lines
39 KiB
Rust

//! In-process app-server runtime host for local embedders.
//!
//! This module runs the existing [`MessageProcessor`] and outbound routing logic
//! on Tokio tasks, but replaces socket/stdio transports with bounded in-memory
//! channels. The intent is to preserve app-server semantics while avoiding a
//! process boundary for CLI surfaces that run in the same process.
//!
//! # Lifecycle
//!
//! 1. Construct runtime state with [`InProcessStartArgs`].
//! 2. Call [`start`], which performs the `initialize` / `initialized` handshake
//! internally and returns a ready-to-use [`InProcessClientHandle`].
//! 3. Send requests via [`InProcessClientHandle::request`], notifications via
//! [`InProcessClientHandle::notify`], and consume events via
//! [`InProcessClientHandle::next_event`].
//! 4. Terminate with [`InProcessClientHandle::shutdown`].
//!
//! # Transport model
//!
//! The runtime is transport-local but not protocol-free. Incoming requests are
//! typed [`ClientRequest`] values, yet responses still come back through the
//! same JSON-RPC result envelope that `MessageProcessor` uses for stdio and
//! websocket transports. This keeps in-process behavior aligned with
//! app-server rather than creating a second execution contract.
//!
//! # Backpressure
//!
//! Command submission uses `try_send` and can return `WouldBlock`, while event
//! fanout may drop notifications under saturation. Server requests are never
//! silently abandoned: if they cannot be queued they are failed back into
//! `MessageProcessor` with overload or internal errors so approval flows do
//! not hang indefinitely.
//!
//! # Relationship to `codex-app-server-client`
//!
//! This module provides the low-level runtime handle ([`InProcessClientHandle`]).
//! Higher-level callers (TUI, exec) should go through `codex-app-server-client`,
//! which wraps this module behind a worker task with async request/response
//! helpers, surface-specific startup policy, and bounded shutdown.
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::error_code::OVERLOADED_ERROR_CODE;
use crate::message_processor::ConnectionSessionState;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use crate::transport::CHANNEL_CAPACITY;
use crate::transport::OutboundConnectionState;
use crate::transport::route_outgoing_envelope;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use toml::Value as TomlValue;
use tracing::warn;
/// The single logical connection id used for the embedded client; the
/// in-process runtime hosts exactly one outbound connection.
const IN_PROCESS_CONNECTION_ID: ConnectionId = ConnectionId(0);
/// Upper bound applied to each graceful-drain phase during shutdown.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Default bounded channel capacity for in-process runtime queues.
pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY;
/// Outcome delivered to a request caller: either the JSON-RPC success payload
/// or the JSON-RPC error payload.
type PendingClientRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
/// Returns `true` when `notification` must reach the embedded client even
/// under backpressure; the runtime awaits delivery for these instead of using
/// best-effort `try_send`.
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
    match notification {
        ServerNotification::TurnCompleted(_) => true,
        _ => false,
    }
}
/// Returns `true` when a legacy JSON-RPC notification must reach the embedded
/// client even under backpressure (terminal lifecycle events).
///
/// The `codex/event/` prefix is stripped when present so both prefixed and
/// bare method names are recognized.
fn legacy_notification_requires_delivery(notification: &JSONRPCNotification) -> bool {
    let bare = notification
        .method
        .strip_prefix("codex/event/")
        .unwrap_or(&notification.method);
    ["task_complete", "turn_aborted", "shutdown_complete"].contains(&bare)
}
/// Input needed to start an in-process app-server runtime.
///
/// These fields mirror the pieces of ambient process state that stdio and
/// websocket transports normally assemble before `MessageProcessor` starts.
#[derive(Clone)]
pub struct InProcessStartArgs {
    /// Resolved argv0 dispatch paths used by command execution internals.
    pub arg0_paths: Arg0DispatchPaths,
    /// Shared base config used to initialize core components.
    pub config: Arc<Config>,
    /// CLI config overrides that are already parsed into TOML values.
    pub cli_overrides: Vec<(String, TomlValue)>,
    /// Loader override knobs used by config API paths.
    pub loader_overrides: LoaderOverrides,
    /// Preloaded cloud requirements provider.
    pub cloud_requirements: CloudRequirementsLoader,
    /// Optional prebuilt auth manager reused by an embedding caller
    /// (e.g. the TUI); `None` lets `MessageProcessor` construct its own.
    pub auth_manager: Option<Arc<AuthManager>>,
    /// Optional prebuilt thread manager reused by an embedding caller.
    pub thread_manager: Option<Arc<ThreadManager>>,
    /// Feedback sink used by app-server/core telemetry and logs.
    pub feedback: CodexFeedback,
    /// Startup warnings emitted after initialize succeeds.
    pub config_warnings: Vec<ConfigWarningNotification>,
    /// Session source stamped into thread/session metadata.
    pub session_source: SessionSource,
    /// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
    pub enable_codex_api_key_env: bool,
    /// Initialize params used for the initial handshake performed by [`start`].
    pub initialize: InitializeParams,
    /// Capacity used for all runtime queues (clamped to at least 1).
    pub channel_capacity: usize,
}
/// Event emitted from the app-server to the in-process client.
///
/// The stream carries three event families because CLI surfaces are mid-migration
/// from the legacy `codex_protocol::Event` model to the typed app-server
/// notification model. Once all surfaces consume only [`ServerNotification`],
/// [`LegacyNotification`](Self::LegacyNotification) can be removed.
///
/// [`Lagged`](Self::Lagged) is a transport health marker, not an application
/// event — it signals that the consumer fell behind and some events were dropped.
#[derive(Debug, Clone)]
pub enum InProcessServerEvent {
    /// Server request that requires a client response or rejection (see
    /// [`InProcessClientHandle::respond_to_server_request`] /
    /// [`InProcessClientHandle::fail_server_request`]).
    ServerRequest(ServerRequest),
    /// App-server notification directed to the embedded client.
    ServerNotification(ServerNotification),
    /// Legacy JSON-RPC notification from the core event bridge.
    LegacyNotification(JSONRPCNotification),
    /// Indicates one or more events were dropped due to backpressure;
    /// `skipped` is the number of dropped events.
    Lagged { skipped: usize },
}
/// Internal message sent from [`InProcessClientHandle`] methods to the runtime task.
///
/// Requests carry a oneshot sender for the response; notifications and server-request
/// replies are fire-and-forget from the caller's perspective (transport errors are
/// caught by `try_send` on the outer channel).
enum InProcessClientMessage {
    /// Typed client request awaiting a JSON-RPC result or error.
    Request {
        request: Box<ClientRequest>,
        // Resolved by the runtime when the matching response/error comes back
        // through the writer channel.
        response_tx: oneshot::Sender<PendingClientRequestResponse>,
    },
    /// Typed client notification; has no application-level response.
    Notification {
        notification: ClientNotification,
    },
    /// Success reply to a previously received server request.
    ServerRequestResponse {
        request_id: RequestId,
        result: Result,
    },
    /// Error reply to a previously received server request.
    ServerRequestError {
        request_id: RequestId,
        error: JSONRPCErrorError,
    },
    /// Graceful shutdown request; `done_tx` is signaled after teardown.
    Shutdown {
        done_tx: oneshot::Sender<()>,
    },
}
/// Commands forwarded from the runtime loop to the processor task.
enum ProcessorCommand {
    /// Client request to run through `MessageProcessor::process_client_request`.
    Request(Box<ClientRequest>),
    /// Client notification to run through `process_client_notification`.
    Notification(ClientNotification),
}
/// Cloneable submission half of the in-process client: enqueues commands onto
/// the runtime inbox without consuming events.
#[derive(Clone)]
pub struct InProcessClientSender {
    // Bounded inbox shared with the runtime task; all sends are non-blocking
    // (`try_send`), so callers can observe `WouldBlock` under saturation.
    client_tx: mpsc::Sender<InProcessClientMessage>,
}
impl InProcessClientSender {
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
let (response_tx, response_rx) = oneshot::channel();
self.try_send_client_message(InProcessClientMessage::Request {
request: Box::new(request),
response_tx,
})?;
response_rx.await.map_err(|err| {
IoError::new(
ErrorKind::BrokenPipe,
format!("in-process request response channel closed: {err}"),
)
})
}
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
self.try_send_client_message(InProcessClientMessage::Notification { notification })
}
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
self.try_send_client_message(InProcessClientMessage::ServerRequestResponse {
request_id,
result,
})
}
pub fn fail_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
self.try_send_client_message(InProcessClientMessage::ServerRequestError {
request_id,
error,
})
}
fn try_send_client_message(&self, message: InProcessClientMessage) -> IoResult<()> {
match self.client_tx.try_send(message) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(_)) => Err(IoError::new(
ErrorKind::WouldBlock,
"in-process app-server client queue is full",
)),
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server runtime is closed",
)),
}
}
}
/// Handle used by an in-process client to call app-server and consume events.
///
/// This is the low-level runtime handle. Higher-level callers should usually go
/// through `codex-app-server-client`, which adds worker-task buffering,
/// request/response helpers, and surface-specific startup policy.
pub struct InProcessClientHandle {
    // Cloneable command-submission half; also exposed via `sender()`.
    client: InProcessClientSender,
    // Event stream from the runtime task; drained via `next_event()`.
    event_rx: mpsc::Receiver<InProcessServerEvent>,
    // Join handle for the runtime task, awaited/aborted during `shutdown()`.
    runtime_handle: tokio::task::JoinHandle<()>,
}
impl InProcessClientHandle {
    /// Sends a typed client request into the in-process runtime.
    ///
    /// Returns a transport-level `IoResult` wrapping either a JSON-RPC success
    /// payload or a JSON-RPC error payload. Request IDs must stay unique among
    /// in-flight requests: a reused ID is answered with `INVALID_REQUEST` and
    /// can make request routing ambiguous in the caller.
    pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
        self.client.request(request).await
    }
    /// Sends a typed client notification into the in-process runtime.
    ///
    /// Notifications carry no application-level response; transport errors
    /// mean the queue is saturated or the runtime is closed.
    pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
        self.client.notify(notification)
    }
    /// Resolves a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
    ///
    /// Use only request IDs received from this runtime's event stream; sending
    /// arbitrary IDs has no effect on app-server state and can mask a stuck
    /// approval flow in the caller.
    pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
        self.client.respond_to_server_request(request_id, result)
    }
    /// Rejects a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
    ///
    /// Prefer rejecting over leaving a server request unanswered, which can
    /// stall turn progress.
    pub fn fail_server_request(
        &self,
        request_id: RequestId,
        error: JSONRPCErrorError,
    ) -> IoResult<()> {
        self.client.fail_server_request(request_id, error)
    }
    /// Receives the next server event, or `None` once the runtime task has
    /// exited and the event queue is drained.
    pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
        self.event_rx.recv().await
    }
    /// Requests runtime shutdown and waits for worker termination.
    ///
    /// Each wait is bounded by [`SHUTDOWN_TIMEOUT`]; if graceful drain does
    /// not complete in time, the runtime task is aborted.
    pub async fn shutdown(self) -> IoResult<()> {
        // Keep `_event_rx` alive until the end of the function so the runtime
        // observes the same channel lifetimes as before the destructure.
        let Self {
            client,
            event_rx: _event_rx,
            runtime_handle: mut worker,
        } = self;
        let (done_tx, done_rx) = oneshot::channel();
        let shutdown_sent = client
            .client_tx
            .send(InProcessClientMessage::Shutdown { done_tx })
            .await
            .is_ok();
        if shutdown_sent {
            // Bounded wait for the runtime to acknowledge its teardown.
            let _ = timeout(SHUTDOWN_TIMEOUT, done_rx).await;
        }
        if timeout(SHUTDOWN_TIMEOUT, &mut worker).await.is_err() {
            worker.abort();
            let _ = worker.await;
        }
        Ok(())
    }
    /// Returns a cloneable sender for submitting commands from other tasks.
    pub fn sender(&self) -> InProcessClientSender {
        self.client.clone()
    }
}
/// Starts an in-process app-server runtime and performs the initialize handshake.
///
/// `initialize` is sent and, on success, followed by `initialized`, so callers
/// receive a ready-to-use handle. If initialize fails, the runtime is shut
/// down and an `InvalidData` error is returned.
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
    let initialize_params = args.initialize.clone();
    let client = start_uninitialized(args);
    let handshake = client
        .request(ClientRequest::Initialize {
            request_id: RequestId::Integer(0),
            params: initialize_params,
        })
        .await?;
    match handshake {
        Ok(_) => {
            client.notify(ClientNotification::Initialized)?;
            Ok(client)
        }
        Err(error) => {
            // Best-effort teardown; surface the handshake failure, not any
            // secondary shutdown error.
            let _ = client.shutdown().await;
            Err(IoError::new(
                ErrorKind::InvalidData,
                format!("in-process initialize failed: {}", error.message),
            ))
        }
    }
}
/// Spawns the runtime task wired to fresh bounded channels and returns an
/// uninitialized handle; callers must still perform the `initialize` /
/// `initialized` handshake (see [`start`]).
fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
    // Clamp so a zero-capacity request still yields usable queues.
    let channel_capacity = args.channel_capacity.max(1);
    let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
    let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
    let runtime_handle = tokio::spawn(async move {
        // Outbound plumbing mirrors the stdio/websocket transports: the
        // processor writes envelopes, a router task fans them out to the
        // single in-process connection's writer channel.
        let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
        let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
        let (writer_tx, mut writer_rx) = mpsc::channel::<OutgoingMessage>(channel_capacity);
        let outbound_initialized = Arc::new(AtomicBool::new(false));
        let outbound_experimental_api_enabled = Arc::new(AtomicBool::new(false));
        let outbound_opted_out_notification_methods = Arc::new(RwLock::new(HashSet::new()));
        let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
        outbound_connections.insert(
            IN_PROCESS_CONNECTION_ID,
            OutboundConnectionState::new(
                writer_tx,
                Arc::clone(&outbound_initialized),
                Arc::clone(&outbound_experimental_api_enabled),
                Arc::clone(&outbound_opted_out_notification_methods),
                true,
                None,
            ),
        );
        // Router task: exits once every `OutgoingEnvelope` sender is dropped.
        let mut outbound_handle = tokio::spawn(async move {
            while let Some(envelope) = outgoing_rx.recv().await {
                route_outgoing_envelope(&mut outbound_connections, envelope).await;
            }
        });
        let processor_outgoing = Arc::clone(&outgoing_message_sender);
        let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
        // Processor task: owns the `MessageProcessor` plus the single
        // connection's session state, mirroring one stdio connection.
        let mut processor_handle = tokio::spawn(async move {
            let mut processor = MessageProcessor::new(MessageProcessorArgs {
                outgoing: Arc::clone(&processor_outgoing),
                arg0_paths: args.arg0_paths,
                config: args.config,
                cli_overrides: args.cli_overrides,
                loader_overrides: args.loader_overrides,
                cloud_requirements: args.cloud_requirements,
                auth_manager: args.auth_manager,
                thread_manager: args.thread_manager,
                feedback: args.feedback,
                log_db: None,
                config_warnings: args.config_warnings,
                session_source: args.session_source,
                enable_codex_api_key_env: args.enable_codex_api_key_env,
            });
            let mut thread_created_rx = processor.thread_created_receiver();
            let mut session = ConnectionSessionState::default();
            let mut listen_for_threads = true;
            loop {
                tokio::select! {
                    command = processor_rx.recv() => {
                        match command {
                            Some(ProcessorCommand::Request(request)) => {
                                let was_initialized = session.initialized;
                                processor
                                    .process_client_request(
                                        IN_PROCESS_CONNECTION_ID,
                                        *request,
                                        &mut session,
                                        &outbound_initialized,
                                    )
                                    .await;
                                // Propagate session-level opt-outs to the shared
                                // outbound state after every request, since any
                                // request may have changed them.
                                if let Ok(mut opted_out_notification_methods) =
                                    outbound_opted_out_notification_methods.write()
                                {
                                    *opted_out_notification_methods =
                                        session.opted_out_notification_methods.clone();
                                } else {
                                    warn!("failed to update outbound opted-out notifications");
                                }
                                outbound_experimental_api_enabled.store(
                                    session.experimental_api_enabled,
                                    Ordering::Release,
                                );
                                // Fire post-initialize notifications exactly once,
                                // on the transition into the initialized state.
                                if !was_initialized && session.initialized {
                                    processor.send_initialize_notifications().await;
                                }
                            }
                            Some(ProcessorCommand::Notification(notification)) => {
                                processor.process_client_notification(notification).await;
                            }
                            None => {
                                // Command channel closed: runtime loop is gone.
                                break;
                            }
                        }
                    }
                    created = thread_created_rx.recv(), if listen_for_threads => {
                        match created {
                            Ok(thread_id) => {
                                // Attach a listener for newly created threads, but
                                // only deliver to the connection once initialized.
                                let connection_ids = if session.initialized {
                                    vec![IN_PROCESS_CONNECTION_ID]
                                } else {
                                    Vec::<ConnectionId>::new()
                                };
                                processor
                                    .try_attach_thread_listener(thread_id, connection_ids)
                                    .await;
                            }
                            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
                                warn!("thread_created receiver lagged; skipping resync");
                            }
                            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                                // Disable this select arm rather than spinning on
                                // a closed broadcast channel.
                                listen_for_threads = false;
                            }
                        }
                    }
                }
            }
            // Graceful teardown of processor-owned resources.
            processor.clear_runtime_references();
            processor.drain_background_tasks().await;
            processor.shutdown_threads().await;
            processor.connection_closed(IN_PROCESS_CONNECTION_ID).await;
        });
        // Maps in-flight request ids to the oneshot that resolves the caller.
        let mut pending_request_responses =
            HashMap::<RequestId, oneshot::Sender<PendingClientRequestResponse>>::new();
        let mut shutdown_ack = None;
        loop {
            tokio::select! {
                message = client_rx.recv() => {
                    match message {
                        Some(InProcessClientMessage::Request { request, response_tx }) => {
                            let request = *request;
                            let request_id = request.id().clone();
                            // Reject duplicate in-flight ids up front so a
                            // response cannot be routed to the wrong caller.
                            match pending_request_responses.entry(request_id.clone()) {
                                Entry::Vacant(entry) => {
                                    entry.insert(response_tx);
                                }
                                Entry::Occupied(_) => {
                                    let _ = response_tx.send(Err(JSONRPCErrorError {
                                        code: INVALID_REQUEST_ERROR_CODE,
                                        message: format!("duplicate request id: {request_id:?}"),
                                        data: None,
                                    }));
                                    continue;
                                }
                            }
                            // `try_send` keeps the runtime loop non-blocking;
                            // failures resolve the caller's oneshot immediately.
                            match processor_tx.try_send(ProcessorCommand::Request(Box::new(request))) {
                                Ok(()) => {}
                                Err(mpsc::error::TrySendError::Full(_)) => {
                                    if let Some(response_tx) =
                                        pending_request_responses.remove(&request_id)
                                    {
                                        let _ = response_tx.send(Err(JSONRPCErrorError {
                                            code: OVERLOADED_ERROR_CODE,
                                            message: "in-process app-server request queue is full"
                                                .to_string(),
                                            data: None,
                                        }));
                                    }
                                }
                                Err(mpsc::error::TrySendError::Closed(_)) => {
                                    if let Some(response_tx) =
                                        pending_request_responses.remove(&request_id)
                                    {
                                        let _ = response_tx.send(Err(JSONRPCErrorError {
                                            code: INTERNAL_ERROR_CODE,
                                            message:
                                                "in-process app-server request processor is closed"
                                                    .to_string(),
                                            data: None,
                                        }));
                                    }
                                    break;
                                }
                            }
                        }
                        Some(InProcessClientMessage::Notification { notification }) => {
                            match processor_tx.try_send(ProcessorCommand::Notification(notification)) {
                                Ok(()) => {}
                                Err(mpsc::error::TrySendError::Full(_)) => {
                                    // Notifications are best-effort; drop with a log.
                                    warn!("dropping in-process client notification (queue full)");
                                }
                                Err(mpsc::error::TrySendError::Closed(_)) => {
                                    break;
                                }
                            }
                        }
                        Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => {
                            outgoing_message_sender
                                .notify_client_response(request_id, result)
                                .await;
                        }
                        Some(InProcessClientMessage::ServerRequestError { request_id, error }) => {
                            outgoing_message_sender
                                .notify_client_error(request_id, error)
                                .await;
                        }
                        Some(InProcessClientMessage::Shutdown { done_tx }) => {
                            // Remember the ack channel; it is signaled only after
                            // the teardown sequence below completes.
                            shutdown_ack = Some(done_tx);
                            break;
                        }
                        None => {
                            break;
                        }
                    }
                }
                outgoing_message = writer_rx.recv() => {
                    let Some(outgoing_message) = outgoing_message else {
                        break;
                    };
                    match outgoing_message {
                        OutgoingMessage::Response(response) => {
                            if let Some(response_tx) = pending_request_responses.remove(&response.id) {
                                let _ = response_tx.send(Ok(response.result));
                            } else {
                                warn!(
                                    request_id = ?response.id,
                                    "dropping unmatched in-process response"
                                );
                            }
                        }
                        OutgoingMessage::Error(error) => {
                            if let Some(response_tx) = pending_request_responses.remove(&error.id) {
                                let _ = response_tx.send(Err(error.error));
                            } else {
                                warn!(
                                    request_id = ?error.id,
                                    "dropping unmatched in-process error response"
                                );
                            }
                        }
                        OutgoingMessage::Request(request) => {
                            // Send directly to avoid cloning; on failure the
                            // original value is returned inside the error.
                            if let Err(send_error) = event_tx
                                .try_send(InProcessServerEvent::ServerRequest(request))
                            {
                                let (code, message, inner) = match send_error {
                                    mpsc::error::TrySendError::Full(inner) => (
                                        OVERLOADED_ERROR_CODE,
                                        "in-process server request queue is full",
                                        inner,
                                    ),
                                    mpsc::error::TrySendError::Closed(inner) => (
                                        INTERNAL_ERROR_CODE,
                                        "in-process server request consumer is closed",
                                        inner,
                                    ),
                                };
                                let request_id = match inner {
                                    InProcessServerEvent::ServerRequest(req) => req.id().clone(),
                                    _ => unreachable!("we just sent a ServerRequest variant"),
                                };
                                // Server requests are never silently abandoned:
                                // fail them back so approval flows cannot hang.
                                outgoing_message_sender
                                    .notify_client_error(
                                        request_id,
                                        JSONRPCErrorError {
                                            code,
                                            message: message.to_string(),
                                            data: None,
                                        },
                                    )
                                    .await;
                            }
                        }
                        OutgoingMessage::AppServerNotification(notification) => {
                            // Terminal notifications are awaited (guaranteed
                            // delivery); all others are best-effort `try_send`.
                            if server_notification_requires_delivery(&notification) {
                                if event_tx
                                    .send(InProcessServerEvent::ServerNotification(notification))
                                    .await
                                    .is_err()
                                {
                                    break;
                                }
                            } else if let Err(send_error) =
                                event_tx.try_send(InProcessServerEvent::ServerNotification(notification))
                            {
                                match send_error {
                                    mpsc::error::TrySendError::Full(_) => {
                                        warn!("dropping in-process server notification (queue full)");
                                    }
                                    mpsc::error::TrySendError::Closed(_) => {
                                        break;
                                    }
                                }
                            }
                        }
                        OutgoingMessage::Notification(notification) => {
                            let notification = JSONRPCNotification {
                                method: notification.method,
                                params: notification.params,
                            };
                            // Same delivery policy as typed notifications above.
                            if legacy_notification_requires_delivery(&notification) {
                                if event_tx
                                    .send(InProcessServerEvent::LegacyNotification(notification))
                                    .await
                                    .is_err()
                                {
                                    break;
                                }
                            } else if let Err(send_error) =
                                event_tx.try_send(InProcessServerEvent::LegacyNotification(notification))
                            {
                                match send_error {
                                    mpsc::error::TrySendError::Full(_) => {
                                        warn!("dropping in-process legacy notification (queue full)");
                                    }
                                    mpsc::error::TrySendError::Closed(_) => {
                                        break;
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        // Teardown. Close inbound channels first so helper tasks observe EOF.
        drop(writer_rx);
        drop(processor_tx);
        outgoing_message_sender
            .cancel_all_requests(Some(JSONRPCErrorError {
                code: INTERNAL_ERROR_CODE,
                message: "in-process app-server runtime is shutting down".to_string(),
                data: None,
            }))
            .await;
        // Drop the runtime's last sender before awaiting the router task so
        // `outgoing_rx.recv()` can observe channel closure and exit cleanly.
        drop(outgoing_message_sender);
        // Fail any callers still waiting on in-flight requests.
        for (_, response_tx) in pending_request_responses {
            let _ = response_tx.send(Err(JSONRPCErrorError {
                code: INTERNAL_ERROR_CODE,
                message: "in-process app-server runtime is shutting down".to_string(),
                data: None,
            }));
        }
        // Bounded joins: abort helper tasks that fail to drain in time.
        if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut processor_handle).await {
            processor_handle.abort();
            let _ = processor_handle.await;
        }
        if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut outbound_handle).await {
            outbound_handle.abort();
            let _ = outbound_handle.await;
        }
        // Acknowledge a graceful shutdown request, if one initiated this exit.
        if let Some(done_tx) = shutdown_ack {
            let _ = done_tx.send(());
        }
    });
    InProcessClientHandle {
        client: InProcessClientSender { client_tx },
        event_rx,
        runtime_handle,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use codex_app_server_protocol::ClientInfo;
    use codex_app_server_protocol::ConfigRequirementsReadResponse;
    use codex_app_server_protocol::SessionSource as ApiSessionSource;
    use codex_app_server_protocol::ThreadStartParams;
    use codex_app_server_protocol::ThreadStartResponse;
    use codex_app_server_protocol::Turn;
    use codex_app_server_protocol::TurnCompletedNotification;
    use codex_app_server_protocol::TurnStatus;
    use codex_core::config::ConfigBuilder;
    use pretty_assertions::assert_eq;
    /// Builds a config for tests, falling back to loading defaults with empty
    /// CLI overrides if the builder path fails in this environment.
    async fn build_test_config() -> Config {
        match ConfigBuilder::default().build().await {
            Ok(config) => config,
            Err(_) => Config::load_default_with_cli_overrides(Vec::new())
                .expect("default config should load"),
        }
    }
    /// Starts a fully initialized in-process client with the given session
    /// source and runtime queue capacity.
    async fn start_test_client_with_capacity(
        session_source: SessionSource,
        channel_capacity: usize,
    ) -> InProcessClientHandle {
        let args = InProcessStartArgs {
            arg0_paths: Arg0DispatchPaths::default(),
            config: Arc::new(build_test_config().await),
            cli_overrides: Vec::new(),
            loader_overrides: LoaderOverrides::default(),
            cloud_requirements: CloudRequirementsLoader::default(),
            auth_manager: None,
            thread_manager: None,
            feedback: CodexFeedback::new(),
            config_warnings: Vec::new(),
            session_source,
            enable_codex_api_key_env: false,
            initialize: InitializeParams {
                client_info: ClientInfo {
                    name: "codex-in-process-test".to_string(),
                    title: None,
                    version: "0.0.0".to_string(),
                },
                capabilities: None,
            },
            channel_capacity,
        };
        start(args).await.expect("in-process runtime should start")
    }
    /// Convenience wrapper using the default channel capacity.
    async fn start_test_client(session_source: SessionSource) -> InProcessClientHandle {
        start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
    }
    /// `start` should leave the runtime fully initialized and able to serve a
    /// typed v2 request end to end.
    #[tokio::test]
    async fn in_process_start_initializes_and_handles_typed_v2_request() {
        let client = start_test_client(SessionSource::Cli).await;
        let response = client
            .request(ClientRequest::ConfigRequirementsRead {
                request_id: RequestId::Integer(1),
                params: None,
            })
            .await
            .expect("request transport should work")
            .expect("request should succeed");
        assert!(response.is_object());
        let _parsed: ConfigRequirementsReadResponse =
            serde_json::from_value(response).expect("response should match v2 schema");
        client
            .shutdown()
            .await
            .expect("in-process runtime should shutdown cleanly");
    }
    /// The session source passed in `InProcessStartArgs` should be stamped
    /// into threads started through the runtime.
    #[tokio::test]
    async fn in_process_start_uses_requested_session_source_for_thread_start() {
        for (requested_source, expected_source) in [
            (SessionSource::Cli, ApiSessionSource::Cli),
            (SessionSource::Exec, ApiSessionSource::Exec),
        ] {
            let client = start_test_client(requested_source).await;
            let response = client
                .request(ClientRequest::ThreadStart {
                    request_id: RequestId::Integer(2),
                    params: ThreadStartParams {
                        ephemeral: Some(true),
                        ..ThreadStartParams::default()
                    },
                })
                .await
                .expect("request transport should work")
                .expect("thread/start should succeed");
            let parsed: ThreadStartResponse =
                serde_json::from_value(response).expect("thread/start response should parse");
            assert_eq!(parsed.thread.source, expected_source);
            client
                .shutdown()
                .await
                .expect("in-process runtime should shutdown cleanly");
        }
    }
    /// A requested capacity of 0 must be clamped to 1; requests may transiently
    /// hit `WouldBlock` on the tiny queue, so retry until one goes through.
    #[tokio::test]
    async fn in_process_start_clamps_zero_channel_capacity() {
        let client = start_test_client_with_capacity(SessionSource::Cli, 0).await;
        let response = loop {
            match client
                .request(ClientRequest::ConfigRequirementsRead {
                    request_id: RequestId::Integer(4),
                    params: None,
                })
                .await
            {
                Ok(response) => break response.expect("request should succeed"),
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    tokio::task::yield_now().await;
                }
                Err(err) => panic!("request transport should work: {err}"),
            }
        };
        let _parsed: ConfigRequirementsReadResponse =
            serde_json::from_value(response).expect("response should match v2 schema");
        client
            .shutdown()
            .await
            .expect("in-process runtime should shutdown cleanly");
    }
    /// Pins the set of notifications that get guaranteed (awaited) delivery,
    /// including prefix handling for legacy method names.
    #[test]
    fn guaranteed_delivery_helpers_cover_terminal_notifications() {
        assert!(server_notification_requires_delivery(
            &ServerNotification::TurnCompleted(TurnCompletedNotification {
                thread_id: "thread-1".to_string(),
                turn: Turn {
                    id: "turn-1".to_string(),
                    items: Vec::new(),
                    status: TurnStatus::Completed,
                    error: None,
                },
            })
        ));
        assert!(legacy_notification_requires_delivery(
            &JSONRPCNotification {
                method: "codex/event/task_complete".to_string(),
                params: None,
            }
        ));
        assert!(legacy_notification_requires_delivery(
            &JSONRPCNotification {
                method: "codex/event/turn_aborted".to_string(),
                params: None,
            }
        ));
        assert!(legacy_notification_requires_delivery(
            &JSONRPCNotification {
                method: "codex/event/shutdown_complete".to_string(),
                params: None,
            }
        ));
        assert!(!legacy_notification_requires_delivery(
            &JSONRPCNotification {
                method: "codex/event/item_started".to_string(),
                params: None,
            }
        ));
    }
}