This PR replicates the `tui` code directory and creates a temporary parallel `tui_app_server` directory. It also implements a new feature flag `tui_app_server` to select between the two tui implementations. Once the new app-server-based TUI is stabilized, we'll delete the old `tui` directory and feature flag.
911 lines
37 KiB
Rust
911 lines
37 KiB
Rust
/*
This module implements the websocket-backed app-server client transport.

It owns the remote connection lifecycle, including the initialize/initialized
handshake, JSON-RPC request/response routing, server-request resolution, and
notification streaming. The rest of the crate uses the same `AppServerEvent`
surface for both in-process and remote transports, so callers such as
`tui_app_server` can switch between them without changing their higher-level
session logic.
*/
|
|
|
|
use std::collections::HashMap;
|
|
use std::collections::VecDeque;
|
|
use std::io::Error as IoError;
|
|
use std::io::ErrorKind;
|
|
use std::io::Result as IoResult;
|
|
use std::time::Duration;
|
|
|
|
use crate::AppServerEvent;
|
|
use crate::RequestResult;
|
|
use crate::SHUTDOWN_TIMEOUT;
|
|
use crate::TypedRequestError;
|
|
use crate::request_method_name;
|
|
use codex_app_server_protocol::ClientInfo;
|
|
use codex_app_server_protocol::ClientNotification;
|
|
use codex_app_server_protocol::ClientRequest;
|
|
use codex_app_server_protocol::InitializeCapabilities;
|
|
use codex_app_server_protocol::InitializeParams;
|
|
use codex_app_server_protocol::JSONRPCError;
|
|
use codex_app_server_protocol::JSONRPCErrorError;
|
|
use codex_app_server_protocol::JSONRPCMessage;
|
|
use codex_app_server_protocol::JSONRPCNotification;
|
|
use codex_app_server_protocol::JSONRPCRequest;
|
|
use codex_app_server_protocol::JSONRPCResponse;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::Result as JsonRpcResult;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::ServerRequest;
|
|
use futures::SinkExt;
|
|
use futures::StreamExt;
|
|
use serde::de::DeserializeOwned;
|
|
use tokio::net::TcpStream;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::oneshot;
|
|
use tokio::time::timeout;
|
|
use tokio_tungstenite::MaybeTlsStream;
|
|
use tokio_tungstenite::WebSocketStream;
|
|
use tokio_tungstenite::connect_async;
|
|
use tokio_tungstenite::tungstenite::Message;
|
|
use tracing::warn;
|
|
use url::Url;
|
|
|
|
/// Upper bound on how long `RemoteAppServerClient::connect` waits for the
/// websocket connection to be established.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound on how long the handshake waits for the server's answer to the
/// `initialize` request.
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
|
|
|
|
/// Settings used by `RemoteAppServerClient::connect` to reach a remote app
/// server and perform the initialize handshake.
#[derive(Debug, Clone)]
pub struct RemoteAppServerConnectArgs {
    /// Websocket endpoint of the remote app server; validated with
    /// `Url::parse` when connecting.
    pub websocket_url: String,
    /// Client name reported in the initialize request's `ClientInfo`.
    pub client_name: String,
    /// Client version reported in the initialize request's `ClientInfo`.
    pub client_version: String,
    /// Forwarded as `InitializeCapabilities::experimental_api`.
    pub experimental_api: bool,
    /// Notification methods to opt out of; an empty list is sent as `None`.
    pub opt_out_notification_methods: Vec<String>,
    /// Capacity of both the command and the event channel. Values below 1
    /// are raised to 1 when connecting.
    pub channel_capacity: usize,
}
|
|
|
|
impl RemoteAppServerConnectArgs {
|
|
fn initialize_params(&self) -> InitializeParams {
|
|
let capabilities = InitializeCapabilities {
|
|
experimental_api: self.experimental_api,
|
|
opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() {
|
|
None
|
|
} else {
|
|
Some(self.opt_out_notification_methods.clone())
|
|
},
|
|
};
|
|
|
|
InitializeParams {
|
|
client_info: ClientInfo {
|
|
name: self.client_name.clone(),
|
|
title: None,
|
|
version: self.client_version.clone(),
|
|
},
|
|
capabilities: Some(capabilities),
|
|
}
|
|
}
|
|
}
|
|
|
|
enum RemoteClientCommand {
|
|
Request {
|
|
request: Box<ClientRequest>,
|
|
response_tx: oneshot::Sender<IoResult<RequestResult>>,
|
|
},
|
|
Notify {
|
|
notification: ClientNotification,
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
ResolveServerRequest {
|
|
request_id: RequestId,
|
|
result: JsonRpcResult,
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
RejectServerRequest {
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
Shutdown {
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
}
|
|
|
|
pub struct RemoteAppServerClient {
|
|
command_tx: mpsc::Sender<RemoteClientCommand>,
|
|
event_rx: mpsc::Receiver<AppServerEvent>,
|
|
pending_events: VecDeque<AppServerEvent>,
|
|
worker_handle: tokio::task::JoinHandle<()>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct RemoteAppServerRequestHandle {
|
|
command_tx: mpsc::Sender<RemoteClientCommand>,
|
|
}
|
|
|
|
impl RemoteAppServerClient {
|
|
pub async fn connect(args: RemoteAppServerConnectArgs) -> IoResult<Self> {
|
|
let channel_capacity = args.channel_capacity.max(1);
|
|
let websocket_url = args.websocket_url.clone();
|
|
let url = Url::parse(&websocket_url).map_err(|err| {
|
|
IoError::new(
|
|
ErrorKind::InvalidInput,
|
|
format!("invalid websocket URL `{websocket_url}`: {err}"),
|
|
)
|
|
})?;
|
|
let stream = timeout(CONNECT_TIMEOUT, connect_async(url.as_str()))
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::TimedOut,
|
|
format!("timed out connecting to remote app server at `{websocket_url}`"),
|
|
)
|
|
})?
|
|
.map(|(stream, _response)| stream)
|
|
.map_err(|err| {
|
|
IoError::other(format!(
|
|
"failed to connect to remote app server at `{websocket_url}`: {err}"
|
|
))
|
|
})?;
|
|
let mut stream = stream;
|
|
let pending_events = initialize_remote_connection(
|
|
&mut stream,
|
|
&websocket_url,
|
|
args.initialize_params(),
|
|
INITIALIZE_TIMEOUT,
|
|
)
|
|
.await?;
|
|
|
|
let (command_tx, mut command_rx) = mpsc::channel::<RemoteClientCommand>(channel_capacity);
|
|
let (event_tx, event_rx) = mpsc::channel::<AppServerEvent>(channel_capacity);
|
|
let worker_handle = tokio::spawn(async move {
|
|
let mut pending_requests =
|
|
HashMap::<RequestId, oneshot::Sender<IoResult<RequestResult>>>::new();
|
|
let mut skipped_events = 0usize;
|
|
loop {
|
|
tokio::select! {
|
|
command = command_rx.recv() => {
|
|
let Some(command) = command else {
|
|
let _ = stream.close(None).await;
|
|
break;
|
|
};
|
|
match command {
|
|
RemoteClientCommand::Request { request, response_tx } => {
|
|
let request_id = request_id_from_client_request(&request);
|
|
if pending_requests.contains_key(&request_id) {
|
|
let _ = response_tx.send(Err(IoError::new(
|
|
ErrorKind::InvalidInput,
|
|
format!("duplicate remote app-server request id `{request_id}`"),
|
|
)));
|
|
continue;
|
|
}
|
|
pending_requests.insert(request_id.clone(), response_tx);
|
|
if let Err(err) = write_jsonrpc_message(
|
|
&mut stream,
|
|
JSONRPCMessage::Request(jsonrpc_request_from_client_request(*request)),
|
|
&websocket_url,
|
|
)
|
|
.await
|
|
{
|
|
let err_message = err.to_string();
|
|
if let Some(response_tx) = pending_requests.remove(&request_id) {
|
|
let _ = response_tx.send(Err(err));
|
|
}
|
|
let _ = deliver_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
AppServerEvent::Disconnected {
|
|
message: format!(
|
|
"remote app server at `{websocket_url}` write failed: {err_message}"
|
|
),
|
|
},
|
|
&mut stream,
|
|
)
|
|
.await;
|
|
break;
|
|
}
|
|
}
|
|
RemoteClientCommand::Notify { notification, response_tx } => {
|
|
let result = write_jsonrpc_message(
|
|
&mut stream,
|
|
JSONRPCMessage::Notification(
|
|
jsonrpc_notification_from_client_notification(notification),
|
|
),
|
|
&websocket_url,
|
|
)
|
|
.await;
|
|
let _ = response_tx.send(result);
|
|
}
|
|
RemoteClientCommand::ResolveServerRequest {
|
|
request_id,
|
|
result,
|
|
response_tx,
|
|
} => {
|
|
let result = write_jsonrpc_message(
|
|
&mut stream,
|
|
JSONRPCMessage::Response(JSONRPCResponse {
|
|
id: request_id,
|
|
result,
|
|
}),
|
|
&websocket_url,
|
|
)
|
|
.await;
|
|
let _ = response_tx.send(result);
|
|
}
|
|
RemoteClientCommand::RejectServerRequest {
|
|
request_id,
|
|
error,
|
|
response_tx,
|
|
} => {
|
|
let result = write_jsonrpc_message(
|
|
&mut stream,
|
|
JSONRPCMessage::Error(JSONRPCError {
|
|
error,
|
|
id: request_id,
|
|
}),
|
|
&websocket_url,
|
|
)
|
|
.await;
|
|
let _ = response_tx.send(result);
|
|
}
|
|
RemoteClientCommand::Shutdown { response_tx } => {
|
|
let close_result = stream.close(None).await.map_err(|err| {
|
|
IoError::other(format!(
|
|
"failed to close websocket app server `{websocket_url}`: {err}"
|
|
))
|
|
});
|
|
let _ = response_tx.send(close_result);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
message = stream.next() => {
|
|
match message {
|
|
Some(Ok(Message::Text(text))) => {
|
|
match serde_json::from_str::<JSONRPCMessage>(&text) {
|
|
Ok(JSONRPCMessage::Response(response)) => {
|
|
if let Some(response_tx) = pending_requests.remove(&response.id) {
|
|
let _ = response_tx.send(Ok(Ok(response.result)));
|
|
}
|
|
}
|
|
Ok(JSONRPCMessage::Error(error)) => {
|
|
if let Some(response_tx) = pending_requests.remove(&error.id) {
|
|
let _ = response_tx.send(Ok(Err(error.error)));
|
|
}
|
|
}
|
|
Ok(JSONRPCMessage::Notification(notification)) => {
|
|
let event = app_server_event_from_notification(notification);
|
|
if let Err(err) = deliver_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
event,
|
|
&mut stream,
|
|
)
|
|
.await
|
|
{
|
|
warn!(%err, "failed to deliver remote app-server event");
|
|
break;
|
|
}
|
|
}
|
|
Ok(JSONRPCMessage::Request(request)) => {
|
|
let request_id = request.id.clone();
|
|
let method = request.method.clone();
|
|
match ServerRequest::try_from(request) {
|
|
Ok(request) => {
|
|
if let Err(err) = deliver_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
AppServerEvent::ServerRequest(request),
|
|
&mut stream,
|
|
)
|
|
.await
|
|
{
|
|
warn!(%err, "failed to deliver remote app-server server request");
|
|
break;
|
|
}
|
|
}
|
|
Err(err) => {
|
|
warn!(%err, method, "rejecting unknown remote app-server request");
|
|
if let Err(reject_err) = write_jsonrpc_message(
|
|
&mut stream,
|
|
JSONRPCMessage::Error(JSONRPCError {
|
|
error: JSONRPCErrorError {
|
|
code: -32601,
|
|
message: format!(
|
|
"unsupported remote app-server request `{method}`"
|
|
),
|
|
data: None,
|
|
},
|
|
id: request_id,
|
|
}),
|
|
&websocket_url,
|
|
)
|
|
.await
|
|
{
|
|
let err_message = reject_err.to_string();
|
|
let _ = deliver_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
AppServerEvent::Disconnected {
|
|
message: format!(
|
|
"remote app server at `{websocket_url}` write failed: {err_message}"
|
|
),
|
|
},
|
|
&mut stream,
|
|
)
|
|
.await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Err(err) => {
|
|
let _ = deliver_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
AppServerEvent::Disconnected {
|
|
message: format!(
|
|
"remote app server at `{websocket_url}` sent invalid JSON-RPC: {err}"
|
|
),
|
|
},
|
|
&mut stream,
|
|
)
|
|
.await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
Some(Ok(Message::Close(frame))) => {
|
|
let reason = frame
|
|
.as_ref()
|
|
.map(|frame| frame.reason.to_string())
|
|
.filter(|reason| !reason.is_empty())
|
|
.unwrap_or_else(|| "connection closed".to_string());
|
|
let _ = deliver_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
AppServerEvent::Disconnected {
|
|
message: format!(
|
|
"remote app server at `{websocket_url}` disconnected: {reason}"
|
|
),
|
|
},
|
|
&mut stream,
|
|
)
|
|
.await;
|
|
break;
|
|
}
|
|
Some(Ok(Message::Binary(_)))
|
|
| Some(Ok(Message::Ping(_)))
|
|
| Some(Ok(Message::Pong(_)))
|
|
| Some(Ok(Message::Frame(_))) => {}
|
|
Some(Err(err)) => {
|
|
let _ = deliver_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
AppServerEvent::Disconnected {
|
|
message: format!(
|
|
"remote app server at `{websocket_url}` transport failed: {err}"
|
|
),
|
|
},
|
|
&mut stream,
|
|
)
|
|
.await;
|
|
break;
|
|
}
|
|
None => {
|
|
let _ = deliver_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
AppServerEvent::Disconnected {
|
|
message: format!(
|
|
"remote app server at `{websocket_url}` closed the connection"
|
|
),
|
|
},
|
|
&mut stream,
|
|
)
|
|
.await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let err = IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server worker channel is closed",
|
|
);
|
|
for (_, response_tx) in pending_requests {
|
|
let _ = response_tx.send(Err(IoError::new(err.kind(), err.to_string())));
|
|
}
|
|
});
|
|
|
|
Ok(Self {
|
|
command_tx,
|
|
event_rx,
|
|
pending_events: pending_events.into(),
|
|
worker_handle,
|
|
})
|
|
}
|
|
|
|
pub fn request_handle(&self) -> RemoteAppServerRequestHandle {
|
|
RemoteAppServerRequestHandle {
|
|
command_tx: self.command_tx.clone(),
|
|
}
|
|
}
|
|
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(RemoteClientCommand::Request {
|
|
request: Box::new(request),
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server request channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
let method = request_method_name(&request);
|
|
let response =
|
|
self.request(request)
|
|
.await
|
|
.map_err(|source| TypedRequestError::Transport {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
let result = response.map_err(|source| TypedRequestError::Server {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
serde_json::from_value(result)
|
|
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
|
}
|
|
|
|
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(RemoteClientCommand::Notify {
|
|
notification,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server notify channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
pub async fn resolve_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
result: JsonRpcResult,
|
|
) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(RemoteClientCommand::ResolveServerRequest {
|
|
request_id,
|
|
result,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server resolve channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
pub async fn reject_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(RemoteClientCommand::RejectServerRequest {
|
|
request_id,
|
|
error,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server reject channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
pub async fn next_event(&mut self) -> Option<AppServerEvent> {
|
|
if let Some(event) = self.pending_events.pop_front() {
|
|
return Some(event);
|
|
}
|
|
self.event_rx.recv().await
|
|
}
|
|
|
|
pub async fn shutdown(self) -> IoResult<()> {
|
|
let Self {
|
|
command_tx,
|
|
event_rx,
|
|
pending_events: _pending_events,
|
|
worker_handle,
|
|
} = self;
|
|
let mut worker_handle = worker_handle;
|
|
drop(event_rx);
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
if command_tx
|
|
.send(RemoteClientCommand::Shutdown { response_tx })
|
|
.await
|
|
.is_ok()
|
|
&& let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await
|
|
{
|
|
command_result.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server shutdown channel is closed",
|
|
)
|
|
})??;
|
|
}
|
|
|
|
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await {
|
|
worker_handle.abort();
|
|
let _ = worker_handle.await;
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl RemoteAppServerRequestHandle {
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(RemoteClientCommand::Request {
|
|
request: Box::new(request),
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server request channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
let method = request_method_name(&request);
|
|
let response =
|
|
self.request(request)
|
|
.await
|
|
.map_err(|source| TypedRequestError::Transport {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
let result = response.map_err(|source| TypedRequestError::Server {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
serde_json::from_value(result)
|
|
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
|
}
|
|
}
|
|
|
|
async fn initialize_remote_connection(
|
|
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
|
websocket_url: &str,
|
|
params: InitializeParams,
|
|
initialize_timeout: Duration,
|
|
) -> IoResult<Vec<AppServerEvent>> {
|
|
let initialize_request_id = RequestId::String("initialize".to_string());
|
|
let mut pending_events = Vec::new();
|
|
write_jsonrpc_message(
|
|
stream,
|
|
JSONRPCMessage::Request(jsonrpc_request_from_client_request(
|
|
ClientRequest::Initialize {
|
|
request_id: initialize_request_id.clone(),
|
|
params,
|
|
},
|
|
)),
|
|
websocket_url,
|
|
)
|
|
.await?;
|
|
|
|
timeout(initialize_timeout, async {
|
|
loop {
|
|
match stream.next().await {
|
|
Some(Ok(Message::Text(text))) => {
|
|
let message = serde_json::from_str::<JSONRPCMessage>(&text).map_err(|err| {
|
|
IoError::other(format!(
|
|
"remote app server at `{websocket_url}` sent invalid initialize response: {err}"
|
|
))
|
|
})?;
|
|
match message {
|
|
JSONRPCMessage::Response(response) if response.id == initialize_request_id => {
|
|
break Ok(());
|
|
}
|
|
JSONRPCMessage::Error(error) if error.id == initialize_request_id => {
|
|
break Err(IoError::other(format!(
|
|
"remote app server at `{websocket_url}` rejected initialize: {}",
|
|
error.error.message
|
|
)));
|
|
}
|
|
JSONRPCMessage::Notification(notification) => {
|
|
pending_events.push(app_server_event_from_notification(notification));
|
|
}
|
|
JSONRPCMessage::Request(request) => {
|
|
let request_id = request.id.clone();
|
|
let method = request.method.clone();
|
|
match ServerRequest::try_from(request) {
|
|
Ok(request) => {
|
|
pending_events.push(AppServerEvent::ServerRequest(request));
|
|
}
|
|
Err(err) => {
|
|
warn!(%err, method, "rejecting unknown remote app-server request during initialize");
|
|
write_jsonrpc_message(
|
|
stream,
|
|
JSONRPCMessage::Error(JSONRPCError {
|
|
error: JSONRPCErrorError {
|
|
code: -32601,
|
|
message: format!(
|
|
"unsupported remote app-server request `{method}`"
|
|
),
|
|
data: None,
|
|
},
|
|
id: request_id,
|
|
}),
|
|
websocket_url,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
JSONRPCMessage::Response(_) | JSONRPCMessage::Error(_) => {}
|
|
}
|
|
}
|
|
Some(Ok(Message::Binary(_)))
|
|
| Some(Ok(Message::Ping(_)))
|
|
| Some(Ok(Message::Pong(_)))
|
|
| Some(Ok(Message::Frame(_))) => {}
|
|
Some(Ok(Message::Close(frame))) => {
|
|
let reason = frame
|
|
.as_ref()
|
|
.map(|frame| frame.reason.to_string())
|
|
.filter(|reason| !reason.is_empty())
|
|
.unwrap_or_else(|| "connection closed during initialize".to_string());
|
|
break Err(IoError::new(
|
|
ErrorKind::ConnectionAborted,
|
|
format!(
|
|
"remote app server at `{websocket_url}` closed during initialize: {reason}"
|
|
),
|
|
));
|
|
}
|
|
Some(Err(err)) => {
|
|
break Err(IoError::other(format!(
|
|
"remote app server at `{websocket_url}` transport failed during initialize: {err}"
|
|
)));
|
|
}
|
|
None => {
|
|
break Err(IoError::new(
|
|
ErrorKind::UnexpectedEof,
|
|
format!("remote app server at `{websocket_url}` closed during initialize"),
|
|
));
|
|
}
|
|
}
|
|
}
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::TimedOut,
|
|
format!("timed out waiting for initialize response from `{websocket_url}`"),
|
|
)
|
|
})??;
|
|
|
|
write_jsonrpc_message(
|
|
stream,
|
|
JSONRPCMessage::Notification(jsonrpc_notification_from_client_notification(
|
|
ClientNotification::Initialized,
|
|
)),
|
|
websocket_url,
|
|
)
|
|
.await?;
|
|
|
|
Ok(pending_events)
|
|
}
|
|
|
|
fn app_server_event_from_notification(notification: JSONRPCNotification) -> AppServerEvent {
|
|
match ServerNotification::try_from(notification.clone()) {
|
|
Ok(notification) => AppServerEvent::ServerNotification(notification),
|
|
Err(_) => AppServerEvent::LegacyNotification(notification),
|
|
}
|
|
}
|
|
|
|
async fn deliver_event(
|
|
event_tx: &mpsc::Sender<AppServerEvent>,
|
|
skipped_events: &mut usize,
|
|
event: AppServerEvent,
|
|
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
|
) -> IoResult<()> {
|
|
if *skipped_events > 0 {
|
|
if event_requires_delivery(&event) {
|
|
if event_tx
|
|
.send(AppServerEvent::Lagged {
|
|
skipped: *skipped_events,
|
|
})
|
|
.await
|
|
.is_err()
|
|
{
|
|
return Err(IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server event consumer channel is closed",
|
|
));
|
|
}
|
|
*skipped_events = 0;
|
|
} else {
|
|
match event_tx.try_send(AppServerEvent::Lagged {
|
|
skipped: *skipped_events,
|
|
}) {
|
|
Ok(()) => *skipped_events = 0,
|
|
Err(mpsc::error::TrySendError::Full(_)) => {
|
|
*skipped_events = (*skipped_events).saturating_add(1);
|
|
reject_if_server_request_dropped(stream, &event).await?;
|
|
return Ok(());
|
|
}
|
|
Err(mpsc::error::TrySendError::Closed(_)) => {
|
|
return Err(IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server event consumer channel is closed",
|
|
));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if event_requires_delivery(&event) {
|
|
event_tx.send(event).await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server event consumer channel is closed",
|
|
)
|
|
})?;
|
|
return Ok(());
|
|
}
|
|
|
|
match event_tx.try_send(event) {
|
|
Ok(()) => Ok(()),
|
|
Err(mpsc::error::TrySendError::Full(event)) => {
|
|
*skipped_events = (*skipped_events).saturating_add(1);
|
|
reject_if_server_request_dropped(stream, &event).await
|
|
}
|
|
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"remote app-server event consumer channel is closed",
|
|
)),
|
|
}
|
|
}
|
|
|
|
async fn reject_if_server_request_dropped(
|
|
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
|
event: &AppServerEvent,
|
|
) -> IoResult<()> {
|
|
let AppServerEvent::ServerRequest(request) = event else {
|
|
return Ok(());
|
|
};
|
|
write_jsonrpc_message(
|
|
stream,
|
|
JSONRPCMessage::Error(JSONRPCError {
|
|
error: JSONRPCErrorError {
|
|
code: -32001,
|
|
message: "remote app-server event queue is full".to_string(),
|
|
data: None,
|
|
},
|
|
id: request.id().clone(),
|
|
}),
|
|
"<remote-app-server>",
|
|
)
|
|
.await
|
|
}
|
|
|
|
fn event_requires_delivery(event: &AppServerEvent) -> bool {
|
|
match event {
|
|
AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => true,
|
|
AppServerEvent::LegacyNotification(notification) => matches!(
|
|
notification
|
|
.method
|
|
.strip_prefix("codex/event/")
|
|
.unwrap_or(¬ification.method),
|
|
"task_complete" | "turn_aborted" | "shutdown_complete"
|
|
),
|
|
AppServerEvent::Disconnected { .. } => true,
|
|
AppServerEvent::Lagged { .. }
|
|
| AppServerEvent::ServerNotification(_)
|
|
| AppServerEvent::ServerRequest(_) => false,
|
|
}
|
|
}
|
|
|
|
fn request_id_from_client_request(request: &ClientRequest) -> RequestId {
|
|
jsonrpc_request_from_client_request(request.clone()).id
|
|
}
|
|
|
|
fn jsonrpc_request_from_client_request(request: ClientRequest) -> JSONRPCRequest {
|
|
let value = match serde_json::to_value(request) {
|
|
Ok(value) => value,
|
|
Err(err) => panic!("client request should serialize: {err}"),
|
|
};
|
|
match serde_json::from_value(value) {
|
|
Ok(request) => request,
|
|
Err(err) => panic!("client request should encode as JSON-RPC request: {err}"),
|
|
}
|
|
}
|
|
|
|
fn jsonrpc_notification_from_client_notification(
|
|
notification: ClientNotification,
|
|
) -> JSONRPCNotification {
|
|
let value = match serde_json::to_value(notification) {
|
|
Ok(value) => value,
|
|
Err(err) => panic!("client notification should serialize: {err}"),
|
|
};
|
|
match serde_json::from_value(value) {
|
|
Ok(notification) => notification,
|
|
Err(err) => panic!("client notification should encode as JSON-RPC notification: {err}"),
|
|
}
|
|
}
|
|
|
|
async fn write_jsonrpc_message(
|
|
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
|
message: JSONRPCMessage,
|
|
websocket_url: &str,
|
|
) -> IoResult<()> {
|
|
let payload = serde_json::to_string(&message).map_err(IoError::other)?;
|
|
stream
|
|
.send(Message::Text(payload.into()))
|
|
.await
|
|
.map_err(|err| {
|
|
IoError::other(format!(
|
|
"failed to write websocket message to `{websocket_url}`: {err}"
|
|
))
|
|
})
|
|
}
|