Reverts openai/codex#15020 I messed up the commit in my PR and accidentally merged changes that were still under review.
899 lines
39 KiB
Rust
899 lines
39 KiB
Rust
//! In-process app-server runtime host for local embedders.
//!
//! This module runs the existing [`MessageProcessor`] and outbound routing logic
//! on Tokio tasks, but replaces socket/stdio transports with bounded in-memory
//! channels. The intent is to preserve app-server semantics while avoiding a
//! process boundary for CLI surfaces that run in the same process.
//!
//! # Lifecycle
//!
//! 1. Construct runtime state with [`InProcessStartArgs`].
//! 2. Call [`start`], which performs the `initialize` / `initialized` handshake
//!    internally and returns a ready-to-use [`InProcessClientHandle`].
//! 3. Send requests via [`InProcessClientHandle::request`], notifications via
//!    [`InProcessClientHandle::notify`], and consume events via
//!    [`InProcessClientHandle::next_event`].
//! 4. Terminate with [`InProcessClientHandle::shutdown`].
//!
//! # Transport model
//!
//! The runtime is transport-local but not protocol-free. Incoming requests are
//! typed [`ClientRequest`] values, yet responses still come back through the
//! same JSON-RPC result envelope that `MessageProcessor` uses for stdio and
//! websocket transports. This keeps in-process behavior aligned with
//! app-server rather than creating a second execution contract.
//!
//! # Backpressure
//!
//! Command submission uses `try_send` and can return `WouldBlock`, while event
//! fanout may drop notifications under saturation. Server requests are never
//! silently abandoned: if they cannot be queued they are failed back into
//! `MessageProcessor` with overload or internal errors so approval flows do
//! not hang indefinitely.
//!
//! # Relationship to `codex-app-server-client`
//!
//! This module provides the low-level runtime handle ([`InProcessClientHandle`]).
//! Higher-level callers (TUI, exec) should go through `codex-app-server-client`,
//! which wraps this module behind a worker task with async request/response
//! helpers, surface-specific startup policy, and bounded shutdown.
|
|
|
|
use std::collections::HashMap;
|
|
use std::collections::HashSet;
|
|
use std::collections::hash_map::Entry;
|
|
use std::io::Error as IoError;
|
|
use std::io::ErrorKind;
|
|
use std::io::Result as IoResult;
|
|
use std::sync::Arc;
|
|
use std::sync::RwLock;
|
|
use std::sync::atomic::AtomicBool;
|
|
use std::sync::atomic::Ordering;
|
|
use std::time::Duration;
|
|
|
|
use crate::error_code::INTERNAL_ERROR_CODE;
|
|
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
|
use crate::error_code::OVERLOADED_ERROR_CODE;
|
|
use crate::message_processor::ConnectionSessionState;
|
|
use crate::message_processor::MessageProcessor;
|
|
use crate::message_processor::MessageProcessorArgs;
|
|
use crate::outgoing_message::ConnectionId;
|
|
use crate::outgoing_message::OutgoingEnvelope;
|
|
use crate::outgoing_message::OutgoingMessage;
|
|
use crate::outgoing_message::OutgoingMessageSender;
|
|
use crate::transport::CHANNEL_CAPACITY;
|
|
use crate::transport::OutboundConnectionState;
|
|
use crate::transport::route_outgoing_envelope;
|
|
use codex_app_server_protocol::ClientNotification;
|
|
use codex_app_server_protocol::ClientRequest;
|
|
use codex_app_server_protocol::ConfigWarningNotification;
|
|
use codex_app_server_protocol::InitializeParams;
|
|
use codex_app_server_protocol::JSONRPCErrorError;
|
|
use codex_app_server_protocol::JSONRPCNotification;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::Result;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::ServerRequest;
|
|
use codex_arg0::Arg0DispatchPaths;
|
|
use codex_core::AuthManager;
|
|
use codex_core::ThreadManager;
|
|
use codex_core::config::Config;
|
|
use codex_core::config_loader::CloudRequirementsLoader;
|
|
use codex_core::config_loader::LoaderOverrides;
|
|
use codex_feedback::CodexFeedback;
|
|
use codex_protocol::protocol::SessionSource;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::oneshot;
|
|
use tokio::time::timeout;
|
|
use toml::Value as TomlValue;
|
|
use tracing::warn;
|
|
|
|
/// Connection id under which the single embedded client is registered with the
/// outbound router and the message processor.
const IN_PROCESS_CONNECTION_ID: ConnectionId = ConnectionId(0);
|
|
/// Upper bound applied to each individual wait performed while shutting down
/// (shutdown acknowledgement and task joins); tasks that overrun it are aborted.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
|
/// Default bounded channel capacity for in-process runtime queues.
|
|
/// Equal to the transport layer's `CHANNEL_CAPACITY`.
pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY;
|
|
|
|
/// Application-level outcome of a client request: the JSON-RPC success payload
/// on `Ok`, or the JSON-RPC error object on `Err`.
type PendingClientRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
|
|
|
|
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
|
|
matches!(notification, ServerNotification::TurnCompleted(_))
|
|
}
|
|
|
|
fn legacy_notification_requires_delivery(notification: &JSONRPCNotification) -> bool {
|
|
matches!(
|
|
notification
|
|
.method
|
|
.strip_prefix("codex/event/")
|
|
.unwrap_or(¬ification.method),
|
|
"task_complete" | "turn_aborted" | "shutdown_complete"
|
|
)
|
|
}
|
|
|
|
/// Input needed to start an in-process app-server runtime.
|
|
///
|
|
/// These fields mirror the pieces of ambient process state that stdio and
|
|
/// websocket transports normally assemble before `MessageProcessor` starts.
|
|
#[derive(Clone)]
|
|
pub struct InProcessStartArgs {
|
|
/// Resolved argv0 dispatch paths used by command execution internals.
|
|
pub arg0_paths: Arg0DispatchPaths,
|
|
/// Shared base config used to initialize core components.
|
|
pub config: Arc<Config>,
|
|
/// CLI config overrides that are already parsed into TOML values.
|
|
pub cli_overrides: Vec<(String, TomlValue)>,
|
|
/// Loader override knobs used by config API paths.
|
|
pub loader_overrides: LoaderOverrides,
|
|
/// Preloaded cloud requirements provider.
|
|
pub cloud_requirements: CloudRequirementsLoader,
|
|
/// Optional prebuilt auth manager reused by an embedding caller.
|
|
pub auth_manager: Option<Arc<AuthManager>>,
|
|
/// Optional prebuilt thread manager reused by an embedding caller.
|
|
pub thread_manager: Option<Arc<ThreadManager>>,
|
|
/// Feedback sink used by app-server/core telemetry and logs.
|
|
pub feedback: CodexFeedback,
|
|
/// Startup warnings emitted after initialize succeeds.
|
|
pub config_warnings: Vec<ConfigWarningNotification>,
|
|
/// Session source stamped into thread/session metadata.
|
|
pub session_source: SessionSource,
|
|
/// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
|
|
pub enable_codex_api_key_env: bool,
|
|
/// Initialize params used for initial handshake.
|
|
pub initialize: InitializeParams,
|
|
/// Capacity used for all runtime queues (clamped to at least 1).
|
|
pub channel_capacity: usize,
|
|
}
|
|
|
|
/// Event emitted from the app-server to the in-process client.
|
|
///
|
|
/// The stream carries three event families because CLI surfaces are mid-migration
|
|
/// from the legacy `codex_protocol::Event` model to the typed app-server
|
|
/// notification model. Once all surfaces consume only [`ServerNotification`],
|
|
/// [`LegacyNotification`](Self::LegacyNotification) can be removed.
|
|
///
|
|
/// [`Lagged`](Self::Lagged) is a transport health marker, not an application
|
|
/// event — it signals that the consumer fell behind and some events were dropped.
|
|
#[derive(Debug, Clone)]
|
|
pub enum InProcessServerEvent {
|
|
/// Server request that requires client response/rejection.
|
|
ServerRequest(ServerRequest),
|
|
/// App-server notification directed to the embedded client.
|
|
ServerNotification(ServerNotification),
|
|
/// Legacy JSON-RPC notification from core event bridge.
|
|
LegacyNotification(JSONRPCNotification),
|
|
/// Indicates one or more events were dropped due to backpressure.
|
|
Lagged { skipped: usize },
|
|
}
|
|
|
|
/// Internal message sent from [`InProcessClientHandle`] methods to the runtime task.
|
|
///
|
|
/// Requests carry a oneshot sender for the response; notifications and server-request
|
|
/// replies are fire-and-forget from the caller's perspective (transport errors are
|
|
/// caught by `try_send` on the outer channel).
|
|
enum InProcessClientMessage {
|
|
Request {
|
|
request: Box<ClientRequest>,
|
|
response_tx: oneshot::Sender<PendingClientRequestResponse>,
|
|
},
|
|
Notification {
|
|
notification: ClientNotification,
|
|
},
|
|
ServerRequestResponse {
|
|
request_id: RequestId,
|
|
result: Result,
|
|
},
|
|
ServerRequestError {
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
},
|
|
Shutdown {
|
|
done_tx: oneshot::Sender<()>,
|
|
},
|
|
}
|
|
|
|
enum ProcessorCommand {
|
|
Request(Box<ClientRequest>),
|
|
Notification(ClientNotification),
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct InProcessClientSender {
|
|
client_tx: mpsc::Sender<InProcessClientMessage>,
|
|
}
|
|
|
|
impl InProcessClientSender {
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.try_send_client_message(InProcessClientMessage::Request {
|
|
request: Box::new(request),
|
|
response_tx,
|
|
})?;
|
|
response_rx.await.map_err(|err| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
format!("in-process request response channel closed: {err}"),
|
|
)
|
|
})
|
|
}
|
|
|
|
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
|
self.try_send_client_message(InProcessClientMessage::Notification { notification })
|
|
}
|
|
|
|
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
|
|
self.try_send_client_message(InProcessClientMessage::ServerRequestResponse {
|
|
request_id,
|
|
result,
|
|
})
|
|
}
|
|
|
|
pub fn fail_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
) -> IoResult<()> {
|
|
self.try_send_client_message(InProcessClientMessage::ServerRequestError {
|
|
request_id,
|
|
error,
|
|
})
|
|
}
|
|
|
|
fn try_send_client_message(&self, message: InProcessClientMessage) -> IoResult<()> {
|
|
match self.client_tx.try_send(message) {
|
|
Ok(()) => Ok(()),
|
|
Err(mpsc::error::TrySendError::Full(_)) => Err(IoError::new(
|
|
ErrorKind::WouldBlock,
|
|
"in-process app-server client queue is full",
|
|
)),
|
|
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server runtime is closed",
|
|
)),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Handle used by an in-process client to call app-server and consume events.
|
|
///
|
|
/// This is the low-level runtime handle. Higher-level callers should usually go
|
|
/// through `codex-app-server-client`, which adds worker-task buffering,
|
|
/// request/response helpers, and surface-specific startup policy.
|
|
pub struct InProcessClientHandle {
|
|
client: InProcessClientSender,
|
|
event_rx: mpsc::Receiver<InProcessServerEvent>,
|
|
runtime_handle: tokio::task::JoinHandle<()>,
|
|
}
|
|
|
|
impl InProcessClientHandle {
|
|
/// Sends a typed client request into the in-process runtime.
|
|
///
|
|
/// The returned value is a transport-level `IoResult` containing either a
|
|
/// JSON-RPC success payload or JSON-RPC error payload. Callers must keep
|
|
/// request IDs unique among concurrent requests; reusing an in-flight ID
|
|
/// produces an `INVALID_REQUEST` response and can make request routing
|
|
/// ambiguous in the caller.
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
|
|
self.client.request(request).await
|
|
}
|
|
|
|
/// Sends a typed client notification into the in-process runtime.
|
|
///
|
|
/// Notifications do not have an application-level response. Transport
|
|
/// errors indicate queue saturation or closed runtime.
|
|
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
|
self.client.notify(notification)
|
|
}
|
|
|
|
/// Resolves a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
|
|
///
|
|
/// This should be used only with request IDs received from the current
|
|
/// runtime event stream; sending arbitrary IDs has no effect on app-server
|
|
/// state and can mask a stuck approval flow in the caller.
|
|
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
|
|
self.client.respond_to_server_request(request_id, result)
|
|
}
|
|
|
|
/// Rejects a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
|
|
///
|
|
/// Use this when the embedder cannot satisfy a server request; leaving
|
|
/// requests unanswered can stall turn progress.
|
|
pub fn fail_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
) -> IoResult<()> {
|
|
self.client.fail_server_request(request_id, error)
|
|
}
|
|
|
|
/// Receives the next server event from the in-process runtime.
|
|
///
|
|
/// Returns `None` when the runtime task exits and no more events are
|
|
/// available.
|
|
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
|
|
self.event_rx.recv().await
|
|
}
|
|
|
|
/// Requests runtime shutdown and waits for worker termination.
|
|
///
|
|
/// Shutdown is bounded by internal timeouts and may abort background tasks
|
|
/// if graceful drain does not complete in time.
|
|
pub async fn shutdown(self) -> IoResult<()> {
|
|
let mut runtime_handle = self.runtime_handle;
|
|
let (done_tx, done_rx) = oneshot::channel();
|
|
|
|
if self
|
|
.client
|
|
.client_tx
|
|
.send(InProcessClientMessage::Shutdown { done_tx })
|
|
.await
|
|
.is_ok()
|
|
{
|
|
let _ = timeout(SHUTDOWN_TIMEOUT, done_rx).await;
|
|
}
|
|
|
|
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut runtime_handle).await {
|
|
runtime_handle.abort();
|
|
let _ = runtime_handle.await;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub fn sender(&self) -> InProcessClientSender {
|
|
self.client.clone()
|
|
}
|
|
}
|
|
|
|
/// Starts an in-process app-server runtime and performs initialize handshake.
|
|
///
|
|
/// This function sends `initialize` followed by `initialized` before returning
|
|
/// the handle, so callers receive a ready-to-use runtime. If initialize fails,
|
|
/// the runtime is shut down and an `InvalidData` error is returned.
|
|
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
|
|
let initialize = args.initialize.clone();
|
|
let client = start_uninitialized(args);
|
|
|
|
let initialize_response = client
|
|
.request(ClientRequest::Initialize {
|
|
request_id: RequestId::Integer(0),
|
|
params: initialize,
|
|
})
|
|
.await?;
|
|
if let Err(error) = initialize_response {
|
|
let _ = client.shutdown().await;
|
|
return Err(IoError::new(
|
|
ErrorKind::InvalidData,
|
|
format!("in-process initialize failed: {}", error.message),
|
|
));
|
|
}
|
|
client.notify(ClientNotification::Initialized)?;
|
|
|
|
Ok(client)
|
|
}
|
|
|
|
fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
|
let channel_capacity = args.channel_capacity.max(1);
|
|
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
|
|
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
|
|
|
let runtime_handle = tokio::spawn(async move {
|
|
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
|
|
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
|
|
|
let (writer_tx, mut writer_rx) = mpsc::channel::<OutgoingMessage>(channel_capacity);
|
|
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
|
let outbound_experimental_api_enabled = Arc::new(AtomicBool::new(false));
|
|
let outbound_opted_out_notification_methods = Arc::new(RwLock::new(HashSet::new()));
|
|
|
|
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
|
|
outbound_connections.insert(
|
|
IN_PROCESS_CONNECTION_ID,
|
|
OutboundConnectionState::new(
|
|
writer_tx,
|
|
Arc::clone(&outbound_initialized),
|
|
Arc::clone(&outbound_experimental_api_enabled),
|
|
Arc::clone(&outbound_opted_out_notification_methods),
|
|
/*allow_legacy_notifications*/ true,
|
|
/*disconnect_sender*/ None,
|
|
),
|
|
);
|
|
let mut outbound_handle = tokio::spawn(async move {
|
|
while let Some(envelope) = outgoing_rx.recv().await {
|
|
route_outgoing_envelope(&mut outbound_connections, envelope).await;
|
|
}
|
|
});
|
|
|
|
let processor_outgoing = Arc::clone(&outgoing_message_sender);
|
|
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
|
|
let mut processor_handle = tokio::spawn(async move {
|
|
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
|
outgoing: Arc::clone(&processor_outgoing),
|
|
arg0_paths: args.arg0_paths,
|
|
config: args.config,
|
|
cli_overrides: args.cli_overrides,
|
|
loader_overrides: args.loader_overrides,
|
|
cloud_requirements: args.cloud_requirements,
|
|
auth_manager: args.auth_manager,
|
|
thread_manager: args.thread_manager,
|
|
feedback: args.feedback,
|
|
log_db: None,
|
|
config_warnings: args.config_warnings,
|
|
session_source: args.session_source,
|
|
enable_codex_api_key_env: args.enable_codex_api_key_env,
|
|
});
|
|
let mut thread_created_rx = processor.thread_created_receiver();
|
|
let mut session = ConnectionSessionState::default();
|
|
let mut listen_for_threads = true;
|
|
|
|
loop {
|
|
tokio::select! {
|
|
command = processor_rx.recv() => {
|
|
match command {
|
|
Some(ProcessorCommand::Request(request)) => {
|
|
let was_initialized = session.initialized;
|
|
processor
|
|
.process_client_request(
|
|
IN_PROCESS_CONNECTION_ID,
|
|
*request,
|
|
&mut session,
|
|
&outbound_initialized,
|
|
)
|
|
.await;
|
|
if let Ok(mut opted_out_notification_methods) =
|
|
outbound_opted_out_notification_methods.write()
|
|
{
|
|
*opted_out_notification_methods =
|
|
session.opted_out_notification_methods.clone();
|
|
} else {
|
|
warn!("failed to update outbound opted-out notifications");
|
|
}
|
|
outbound_experimental_api_enabled.store(
|
|
session.experimental_api_enabled,
|
|
Ordering::Release,
|
|
);
|
|
if !was_initialized && session.initialized {
|
|
processor.send_initialize_notifications().await;
|
|
}
|
|
}
|
|
Some(ProcessorCommand::Notification(notification)) => {
|
|
processor.process_client_notification(notification).await;
|
|
}
|
|
None => {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
created = thread_created_rx.recv(), if listen_for_threads => {
|
|
match created {
|
|
Ok(thread_id) => {
|
|
let connection_ids = if session.initialized {
|
|
vec![IN_PROCESS_CONNECTION_ID]
|
|
} else {
|
|
Vec::<ConnectionId>::new()
|
|
};
|
|
processor
|
|
.try_attach_thread_listener(thread_id, connection_ids)
|
|
.await;
|
|
}
|
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
|
warn!("thread_created receiver lagged; skipping resync");
|
|
}
|
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
|
listen_for_threads = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
processor.clear_runtime_references();
|
|
processor.connection_closed(IN_PROCESS_CONNECTION_ID).await;
|
|
processor.clear_all_thread_listeners().await;
|
|
processor.drain_background_tasks().await;
|
|
processor.shutdown_threads().await;
|
|
});
|
|
let mut pending_request_responses =
|
|
HashMap::<RequestId, oneshot::Sender<PendingClientRequestResponse>>::new();
|
|
let mut shutdown_ack = None;
|
|
|
|
loop {
|
|
tokio::select! {
|
|
message = client_rx.recv() => {
|
|
match message {
|
|
Some(InProcessClientMessage::Request { request, response_tx }) => {
|
|
let request = *request;
|
|
let request_id = request.id().clone();
|
|
match pending_request_responses.entry(request_id.clone()) {
|
|
Entry::Vacant(entry) => {
|
|
entry.insert(response_tx);
|
|
}
|
|
Entry::Occupied(_) => {
|
|
let _ = response_tx.send(Err(JSONRPCErrorError {
|
|
code: INVALID_REQUEST_ERROR_CODE,
|
|
message: format!("duplicate request id: {request_id:?}"),
|
|
data: None,
|
|
}));
|
|
continue;
|
|
}
|
|
}
|
|
|
|
match processor_tx.try_send(ProcessorCommand::Request(Box::new(request))) {
|
|
Ok(()) => {}
|
|
Err(mpsc::error::TrySendError::Full(_)) => {
|
|
if let Some(response_tx) =
|
|
pending_request_responses.remove(&request_id)
|
|
{
|
|
let _ = response_tx.send(Err(JSONRPCErrorError {
|
|
code: OVERLOADED_ERROR_CODE,
|
|
message: "in-process app-server request queue is full"
|
|
.to_string(),
|
|
data: None,
|
|
}));
|
|
}
|
|
}
|
|
Err(mpsc::error::TrySendError::Closed(_)) => {
|
|
if let Some(response_tx) =
|
|
pending_request_responses.remove(&request_id)
|
|
{
|
|
let _ = response_tx.send(Err(JSONRPCErrorError {
|
|
code: INTERNAL_ERROR_CODE,
|
|
message:
|
|
"in-process app-server request processor is closed"
|
|
.to_string(),
|
|
data: None,
|
|
}));
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
Some(InProcessClientMessage::Notification { notification }) => {
|
|
match processor_tx.try_send(ProcessorCommand::Notification(notification)) {
|
|
Ok(()) => {}
|
|
Err(mpsc::error::TrySendError::Full(_)) => {
|
|
warn!("dropping in-process client notification (queue full)");
|
|
}
|
|
Err(mpsc::error::TrySendError::Closed(_)) => {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => {
|
|
outgoing_message_sender
|
|
.notify_client_response(request_id, result)
|
|
.await;
|
|
}
|
|
Some(InProcessClientMessage::ServerRequestError { request_id, error }) => {
|
|
outgoing_message_sender
|
|
.notify_client_error(request_id, error)
|
|
.await;
|
|
}
|
|
Some(InProcessClientMessage::Shutdown { done_tx }) => {
|
|
shutdown_ack = Some(done_tx);
|
|
break;
|
|
}
|
|
None => {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
outgoing_message = writer_rx.recv() => {
|
|
let Some(outgoing_message) = outgoing_message else {
|
|
break;
|
|
};
|
|
match outgoing_message {
|
|
OutgoingMessage::Response(response) => {
|
|
if let Some(response_tx) = pending_request_responses.remove(&response.id) {
|
|
let _ = response_tx.send(Ok(response.result));
|
|
} else {
|
|
warn!(
|
|
request_id = ?response.id,
|
|
"dropping unmatched in-process response"
|
|
);
|
|
}
|
|
}
|
|
OutgoingMessage::Error(error) => {
|
|
if let Some(response_tx) = pending_request_responses.remove(&error.id) {
|
|
let _ = response_tx.send(Err(error.error));
|
|
} else {
|
|
warn!(
|
|
request_id = ?error.id,
|
|
"dropping unmatched in-process error response"
|
|
);
|
|
}
|
|
}
|
|
OutgoingMessage::Request(request) => {
|
|
// Send directly to avoid cloning; on failure the
|
|
// original value is returned inside the error.
|
|
if let Err(send_error) = event_tx
|
|
.try_send(InProcessServerEvent::ServerRequest(request))
|
|
{
|
|
let (code, message, inner) = match send_error {
|
|
mpsc::error::TrySendError::Full(inner) => (
|
|
OVERLOADED_ERROR_CODE,
|
|
"in-process server request queue is full",
|
|
inner,
|
|
),
|
|
mpsc::error::TrySendError::Closed(inner) => (
|
|
INTERNAL_ERROR_CODE,
|
|
"in-process server request consumer is closed",
|
|
inner,
|
|
),
|
|
};
|
|
let request_id = match inner {
|
|
InProcessServerEvent::ServerRequest(req) => req.id().clone(),
|
|
_ => unreachable!("we just sent a ServerRequest variant"),
|
|
};
|
|
outgoing_message_sender
|
|
.notify_client_error(
|
|
request_id,
|
|
JSONRPCErrorError {
|
|
code,
|
|
message: message.to_string(),
|
|
data: None,
|
|
},
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
OutgoingMessage::AppServerNotification(notification) => {
|
|
if server_notification_requires_delivery(¬ification) {
|
|
if event_tx
|
|
.send(InProcessServerEvent::ServerNotification(notification))
|
|
.await
|
|
.is_err()
|
|
{
|
|
break;
|
|
}
|
|
} else if let Err(send_error) =
|
|
event_tx.try_send(InProcessServerEvent::ServerNotification(notification))
|
|
{
|
|
match send_error {
|
|
mpsc::error::TrySendError::Full(_) => {
|
|
warn!("dropping in-process server notification (queue full)");
|
|
}
|
|
mpsc::error::TrySendError::Closed(_) => {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
OutgoingMessage::Notification(notification) => {
|
|
let notification = JSONRPCNotification {
|
|
method: notification.method,
|
|
params: notification.params,
|
|
};
|
|
if legacy_notification_requires_delivery(¬ification) {
|
|
if event_tx
|
|
.send(InProcessServerEvent::LegacyNotification(notification))
|
|
.await
|
|
.is_err()
|
|
{
|
|
break;
|
|
}
|
|
} else if let Err(send_error) =
|
|
event_tx.try_send(InProcessServerEvent::LegacyNotification(notification))
|
|
{
|
|
match send_error {
|
|
mpsc::error::TrySendError::Full(_) => {
|
|
warn!("dropping in-process legacy notification (queue full)");
|
|
}
|
|
mpsc::error::TrySendError::Closed(_) => {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
drop(writer_rx);
|
|
drop(processor_tx);
|
|
outgoing_message_sender
|
|
.cancel_all_requests(Some(JSONRPCErrorError {
|
|
code: INTERNAL_ERROR_CODE,
|
|
message: "in-process app-server runtime is shutting down".to_string(),
|
|
data: None,
|
|
}))
|
|
.await;
|
|
// Drop the runtime's last sender before awaiting the router task so
|
|
// `outgoing_rx.recv()` can observe channel closure and exit cleanly.
|
|
drop(outgoing_message_sender);
|
|
for (_, response_tx) in pending_request_responses {
|
|
let _ = response_tx.send(Err(JSONRPCErrorError {
|
|
code: INTERNAL_ERROR_CODE,
|
|
message: "in-process app-server runtime is shutting down".to_string(),
|
|
data: None,
|
|
}));
|
|
}
|
|
|
|
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut processor_handle).await {
|
|
processor_handle.abort();
|
|
let _ = processor_handle.await;
|
|
}
|
|
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut outbound_handle).await {
|
|
outbound_handle.abort();
|
|
let _ = outbound_handle.await;
|
|
}
|
|
|
|
if let Some(done_tx) = shutdown_ack {
|
|
let _ = done_tx.send(());
|
|
}
|
|
});
|
|
|
|
InProcessClientHandle {
|
|
client: InProcessClientSender { client_tx },
|
|
event_rx,
|
|
runtime_handle,
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use codex_app_server_protocol::ClientInfo;
    use codex_app_server_protocol::ConfigRequirementsReadResponse;
    use codex_app_server_protocol::SessionSource as ApiSessionSource;
    use codex_app_server_protocol::ThreadStartParams;
    use codex_app_server_protocol::ThreadStartResponse;
    use codex_app_server_protocol::Turn;
    use codex_app_server_protocol::TurnCompletedNotification;
    use codex_app_server_protocol::TurnStatus;
    use codex_core::config::ConfigBuilder;
    use pretty_assertions::assert_eq;

    /// Builds a config through `ConfigBuilder`, falling back to the default
    /// config without CLI overrides when the builder fails.
    async fn build_test_config() -> Config {
        ConfigBuilder::default().build().await.unwrap_or_else(|_| {
            Config::load_default_with_cli_overrides(Vec::new())
                .expect("default config should load")
        })
    }

    /// Starts an initialized runtime with the given session source and queue
    /// capacity; all other inputs use defaults.
    async fn start_test_client_with_capacity(
        session_source: SessionSource,
        channel_capacity: usize,
    ) -> InProcessClientHandle {
        let initialize = InitializeParams {
            client_info: ClientInfo {
                name: "codex-in-process-test".to_string(),
                title: None,
                version: "0.0.0".to_string(),
            },
            capabilities: None,
        };
        let config = Arc::new(build_test_config().await);

        start(InProcessStartArgs {
            arg0_paths: Arg0DispatchPaths::default(),
            config,
            cli_overrides: Vec::new(),
            loader_overrides: LoaderOverrides::default(),
            cloud_requirements: CloudRequirementsLoader::default(),
            auth_manager: None,
            thread_manager: None,
            feedback: CodexFeedback::new(),
            config_warnings: Vec::new(),
            session_source,
            enable_codex_api_key_env: false,
            initialize,
            channel_capacity,
        })
        .await
        .expect("in-process runtime should start")
    }

    /// Starts an initialized runtime with the default queue capacity.
    async fn start_test_client(session_source: SessionSource) -> InProcessClientHandle {
        start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
    }

    /// Builds a parameterless legacy notification for the given method name.
    fn legacy_notification(method: &str) -> JSONRPCNotification {
        JSONRPCNotification {
            method: method.to_string(),
            params: None,
        }
    }

    #[tokio::test]
    async fn in_process_start_initializes_and_handles_typed_v2_request() {
        let client = start_test_client(SessionSource::Cli).await;

        let request = ClientRequest::ConfigRequirementsRead {
            request_id: RequestId::Integer(1),
            params: None,
        };
        let response = client
            .request(request)
            .await
            .expect("request transport should work")
            .expect("request should succeed");
        assert!(response.is_object());

        let _parsed: ConfigRequirementsReadResponse =
            serde_json::from_value(response).expect("response should match v2 schema");
        client
            .shutdown()
            .await
            .expect("in-process runtime should shutdown cleanly");
    }

    #[tokio::test]
    async fn in_process_start_uses_requested_session_source_for_thread_start() {
        let cases = [
            (SessionSource::Cli, ApiSessionSource::Cli),
            (SessionSource::Exec, ApiSessionSource::Exec),
        ];

        for (requested_source, expected_source) in cases {
            let client = start_test_client(requested_source).await;

            let request = ClientRequest::ThreadStart {
                request_id: RequestId::Integer(2),
                params: ThreadStartParams {
                    ephemeral: Some(true),
                    ..ThreadStartParams::default()
                },
            };
            let response = client
                .request(request)
                .await
                .expect("request transport should work")
                .expect("thread/start should succeed");
            let parsed: ThreadStartResponse =
                serde_json::from_value(response).expect("thread/start response should parse");
            assert_eq!(parsed.thread.source, expected_source);

            client
                .shutdown()
                .await
                .expect("in-process runtime should shutdown cleanly");
        }
    }

    #[tokio::test]
    async fn in_process_start_clamps_zero_channel_capacity() {
        let client = start_test_client_with_capacity(SessionSource::Cli, 0).await;

        // With a single-slot queue the request may be refused while the
        // runtime is still draining; retry until it is accepted.
        let response = loop {
            let attempt = client
                .request(ClientRequest::ConfigRequirementsRead {
                    request_id: RequestId::Integer(4),
                    params: None,
                })
                .await;
            match attempt {
                Ok(response) => break response.expect("request should succeed"),
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    tokio::task::yield_now().await;
                }
                Err(err) => panic!("request transport should work: {err}"),
            }
        };

        let _parsed: ConfigRequirementsReadResponse =
            serde_json::from_value(response).expect("response should match v2 schema");
        client
            .shutdown()
            .await
            .expect("in-process runtime should shutdown cleanly");
    }

    #[test]
    fn guaranteed_delivery_helpers_cover_terminal_notifications() {
        let turn_completed = ServerNotification::TurnCompleted(TurnCompletedNotification {
            thread_id: "thread-1".to_string(),
            turn: Turn {
                id: "turn-1".to_string(),
                items: Vec::new(),
                status: TurnStatus::Completed,
                error: None,
            },
        });
        assert!(server_notification_requires_delivery(&turn_completed));

        for terminal_method in [
            "codex/event/task_complete",
            "codex/event/turn_aborted",
            "codex/event/shutdown_complete",
        ] {
            assert!(legacy_notification_requires_delivery(
                &legacy_notification(terminal_method)
            ));
        }

        assert!(!legacy_notification_requires_delivery(
            &legacy_notification("codex/event/item_started")
        ));
    }
}
|