diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 5965204ce..a039c60d9 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2008,11 +2008,9 @@ name = "codex-exec-server" version = "0.0.0" dependencies = [ "anyhow", - "base64 0.22.1", "clap", "codex-app-server-protocol", "codex-utils-cargo-bin", - "codex-utils-pty", "futures", "pretty_assertions", "serde", diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index c4194fda4..3c71dfa19 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -24,12 +24,10 @@ the wire. The standalone binary supports: - `ws://IP:PORT` (default) -- `stdio://` Wire framing: - websocket: one JSON-RPC message per websocket text frame -- stdio: one newline-delimited JSON-RPC message per line on stdin/stdout ## Lifecycle @@ -43,8 +41,8 @@ Each connection follows this sequence: If the server receives any notification other than `initialized`, it replies with an error using request id `-1`. -If the stdio connection closes, the server terminates any remaining managed -processes before exiting. +If the websocket connection closes, the server terminates any remaining managed +processes for that client connection. ## API @@ -239,13 +237,13 @@ Typical error cases: The crate exports: - `ExecServerClient` -- `ExecServerLaunchCommand` -- `ExecServerProcess` - `ExecServerError` -- protocol structs such as `ExecParams`, `ExecResponse`, - `WriteParams`, `TerminateParams`, `ExecOutputDeltaNotification`, and - `ExecExitedNotification` -- `run_main()` for embedding the stdio server in a binary +- `ExecServerClientConnectOptions` +- `RemoteExecServerConnectArgs` +- protocol structs `InitializeParams` and `InitializeResponse` +- `DEFAULT_LISTEN_URL` and `ExecServerListenUrlParseError` +- `run_main_with_listen_url()` +- `run_main()` for embedding the websocket server in a binary ## Example session diff --git a/codex-rs/exec-server/src/bin/codex-exec-server.rs b/codex-rs/exec-server/src/bin/codex-exec-server.rs index 7bcb14190..82fa9ec00 100644 --- a/codex-rs/exec-server/src/bin/codex-exec-server.rs +++ b/codex-rs/exec-server/src/bin/codex-exec-server.rs @@ -1,20 +1,18 @@ use clap::Parser; -use codex_exec_server::ExecServerTransport; #[derive(Debug, Parser)] struct ExecServerArgs { - /// Transport endpoint URL. Supported values: `ws://IP:PORT` (default), - /// `stdio://`. + /// Transport endpoint URL. Supported values: `ws://IP:PORT` (default). #[arg( long = "listen", value_name = "URL", - default_value = ExecServerTransport::DEFAULT_LISTEN_URL + default_value = codex_exec_server::DEFAULT_LISTEN_URL )] - listen: ExecServerTransport, + listen: String, } #[tokio::main] async fn main() -> Result<(), Box> { let args = ExecServerArgs::parse(); - codex_exec_server::run_main_with_transport(args.listen).await + codex_exec_server::run_main_with_listen_url(&args.listen).await } diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 9830771a0..4b4e69f24 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1,8 +1,6 @@ use std::sync::Arc; use std::time::Duration; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; use tokio::time::timeout; use tokio_tungstenite::connect_async; use tracing::warn; @@ -136,22 +134,6 @@ impl ExecServerClient { Ok(client) } - pub async fn connect_stdio( - stdin: W, - stdout: R, - options: ExecServerClientConnectOptions, - ) -> Result - where - R: AsyncRead + Unpin + Send + 'static, - W: AsyncWrite + Unpin + Send + 'static, - { - Self::connect( - JsonRpcConnection::from_stdio(stdout, stdin, "exec-server stdio".to_string()), - options, - ) - .await - } - pub async fn connect_websocket( args: RemoteExecServerConnectArgs, ) -> Result { diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index af03fc068..89f19560c 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -1,16 +1,21 @@ use codex_app_server_protocol::JSONRPCMessage; use futures::SinkExt; use futures::StreamExt; -use tokio::io::AsyncBufReadExt; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; -use tokio::io::AsyncWriteExt; -use tokio::io::BufReader; -use tokio::io::BufWriter; use tokio::sync::mpsc; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::tungstenite::Message; +#[cfg(test)] +use tokio::io::AsyncBufReadExt; +#[cfg(test)] +use tokio::io::AsyncWriteExt; +#[cfg(test)] +use tokio::io::BufReader; +#[cfg(test)] +use tokio::io::BufWriter; + pub(crate) const CHANNEL_CAPACITY: usize = 128; #[derive(Debug)] @@ -27,6 +32,7 @@ pub(crate) struct JsonRpcConnection { } impl JsonRpcConnection { + #[cfg(test)] pub(crate) fn from_stdio(reader: R, writer: W, connection_label: String) -> Self where R: AsyncRead + Unpin + Send + 'static, @@ -256,6 +262,7 @@ async fn send_malformed_message( .await; } +#[cfg(test)] async fn write_jsonrpc_line_message( writer: &mut BufWriter, message: &JSONRPCMessage, diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index e204d6e08..b6b9c4137 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -1,7 +1,6 @@ mod client; mod client_api; mod connection; -mod local; mod protocol; mod rpc; mod server; @@ -10,12 +9,9 @@ pub use client::ExecServerClient; pub use client::ExecServerError; pub use client_api::ExecServerClientConnectOptions; pub use client_api::RemoteExecServerConnectArgs; -pub use local::ExecServerLaunchCommand; -pub use local::SpawnedExecServer; -pub use local::spawn_local_exec_server; pub use protocol::InitializeParams; pub use protocol::InitializeResponse; -pub use server::ExecServerTransport; -pub use server::ExecServerTransportParseError; +pub use server::DEFAULT_LISTEN_URL; +pub use server::ExecServerListenUrlParseError; pub use server::run_main; -pub use server::run_main_with_transport; +pub use server::run_main_with_listen_url; diff --git a/codex-rs/exec-server/src/local.rs b/codex-rs/exec-server/src/local.rs deleted file mode 100644 index e51c94394..000000000 --- a/codex-rs/exec-server/src/local.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::path::PathBuf; -use std::process::Stdio; -use std::sync::Mutex as StdMutex; - -use tokio::process::Child; -use tokio::process::Command; - -use crate::client::ExecServerClient; -use crate::client::ExecServerError; -use crate::client_api::ExecServerClientConnectOptions; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ExecServerLaunchCommand { - pub program: PathBuf, - pub args: Vec, -} - -pub struct SpawnedExecServer { - client: ExecServerClient, - child: StdMutex>, -} - -impl SpawnedExecServer { - pub fn client(&self) -> &ExecServerClient { - &self.client - } -} - -impl Drop for SpawnedExecServer { - fn drop(&mut self) { - if let Ok(mut child_guard) = self.child.lock() - && let Some(child) = child_guard.as_mut() - { - let _ = child.start_kill(); - } - } -} - -pub async fn spawn_local_exec_server( - command: ExecServerLaunchCommand, - options: ExecServerClientConnectOptions, -) -> Result { - let mut child = Command::new(&command.program); - child.args(&command.args); - child.args(["--listen", "stdio://"]); - child.stdin(Stdio::piped()); - child.stdout(Stdio::piped()); - child.stderr(Stdio::inherit()); - child.kill_on_drop(true); - - let mut child = child.spawn().map_err(ExecServerError::Spawn)?; - let stdin = child.stdin.take().ok_or_else(|| { - ExecServerError::Protocol("exec-server stdin was not captured".to_string()) - })?; - let stdout = child.stdout.take().ok_or_else(|| { - ExecServerError::Protocol("exec-server stdout was not captured".to_string()) - })?; - - let client = match ExecServerClient::connect_stdio(stdin, stdout, options).await { - Ok(client) => client, - Err(err) => { - let _ = child.start_kill(); - return Err(err); - } - }; - - Ok(SpawnedExecServer { - client, - child: StdMutex::new(Some(child)), - }) -} diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index 15ce8650f..af1e929cf 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -4,15 +4,15 @@ mod processor; mod transport; pub(crate) use handler::ExecServerHandler; -pub use transport::ExecServerTransport; -pub use transport::ExecServerTransportParseError; +pub use transport::DEFAULT_LISTEN_URL; +pub use transport::ExecServerListenUrlParseError; pub async fn run_main() -> Result<(), Box> { - run_main_with_transport(ExecServerTransport::Stdio).await + run_main_with_listen_url(DEFAULT_LISTEN_URL).await } -pub async fn run_main_with_transport( - transport: ExecServerTransport, +pub async fn run_main_with_listen_url( + listen_url: &str, ) -> Result<(), Box> { - transport::run_transport(transport).await + transport::run_transport(listen_url).await } diff --git a/codex-rs/exec-server/src/server/transport.rs b/codex-rs/exec-server/src/server/transport.rs index edbec7fa9..22b57a0b1 100644 --- a/codex-rs/exec-server/src/server/transport.rs +++ b/codex-rs/exec-server/src/server/transport.rs @@ -1,5 +1,4 @@ use std::net::SocketAddr; -use std::str::FromStr; use tokio::net::TcpListener; use tokio_tungstenite::accept_async; @@ -8,26 +7,22 @@ use tracing::warn; use crate::connection::JsonRpcConnection; use crate::server::processor::run_connection; -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum ExecServerTransport { - Stdio, - WebSocket { bind_address: SocketAddr }, -} +pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0"; #[derive(Debug, Clone, Eq, PartialEq)] -pub enum ExecServerTransportParseError { +pub enum ExecServerListenUrlParseError { UnsupportedListenUrl(String), InvalidWebSocketListenUrl(String), } -impl std::fmt::Display for ExecServerTransportParseError { +impl std::fmt::Display for ExecServerListenUrlParseError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ExecServerTransportParseError::UnsupportedListenUrl(listen_url) => write!( + ExecServerListenUrlParseError::UnsupportedListenUrl(listen_url) => write!( f, - "unsupported --listen URL `{listen_url}`; expected `stdio://` or `ws://IP:PORT`" + "unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`" ), - ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url) => write!( + ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url) => write!( f, "invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`" ), @@ -35,54 +30,27 @@ impl std::fmt::Display for ExecServerTransportParseError { } } -impl std::error::Error for ExecServerTransportParseError {} +impl std::error::Error for ExecServerListenUrlParseError {} -impl ExecServerTransport { - pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0"; - - pub fn from_listen_url(listen_url: &str) -> Result { - if listen_url == "stdio://" { - return Ok(Self::Stdio); - } - - if let Some(socket_addr) = listen_url.strip_prefix("ws://") { - let bind_address = socket_addr.parse::().map_err(|_| { - ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url.to_string()) - })?; - return Ok(Self::WebSocket { bind_address }); - } - - Err(ExecServerTransportParseError::UnsupportedListenUrl( - listen_url.to_string(), - )) +pub(crate) fn parse_listen_url( + listen_url: &str, +) -> Result { + if let Some(socket_addr) = listen_url.strip_prefix("ws://") { + return socket_addr.parse::().map_err(|_| { + ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string()) + }); } -} -impl FromStr for ExecServerTransport { - type Err = ExecServerTransportParseError; - - fn from_str(s: &str) -> Result { - Self::from_listen_url(s) - } + Err(ExecServerListenUrlParseError::UnsupportedListenUrl( + listen_url.to_string(), + )) } pub(crate) async fn run_transport( - transport: ExecServerTransport, + listen_url: &str, ) -> Result<(), Box> { - match transport { - ExecServerTransport::Stdio => { - run_connection(JsonRpcConnection::from_stdio( - tokio::io::stdin(), - tokio::io::stdout(), - "exec-server stdio".to_string(), - )) - .await; - Ok(()) - } - ExecServerTransport::WebSocket { bind_address } => { - run_websocket_listener(bind_address).await - } - } + let bind_address = parse_listen_url(listen_url)?; + run_websocket_listener(bind_address).await } async fn run_websocket_listener( diff --git a/codex-rs/exec-server/src/server/transport_tests.rs b/codex-rs/exec-server/src/server/transport_tests.rs index bc440e2aa..b81e82727 100644 --- a/codex-rs/exec-server/src/server/transport_tests.rs +++ b/codex-rs/exec-server/src/server/transport_tests.rs @@ -1,41 +1,31 @@ use pretty_assertions::assert_eq; -use super::ExecServerTransport; +use super::DEFAULT_LISTEN_URL; +use super::parse_listen_url; #[test] -fn exec_server_transport_parses_default_websocket_listen_url() { - let transport = ExecServerTransport::from_listen_url(ExecServerTransport::DEFAULT_LISTEN_URL) - .expect("default listen URL should parse"); +fn parse_listen_url_accepts_default_websocket_url() { + let bind_address = + parse_listen_url(DEFAULT_LISTEN_URL).expect("default listen URL should parse"); assert_eq!( - transport, - ExecServerTransport::WebSocket { - bind_address: "127.0.0.1:0".parse().expect("valid socket address"), - } + bind_address, + "127.0.0.1:0".parse().expect("valid socket address") ); } #[test] -fn exec_server_transport_parses_stdio_listen_url() { - let transport = - ExecServerTransport::from_listen_url("stdio://").expect("stdio listen URL should parse"); - assert_eq!(transport, ExecServerTransport::Stdio); -} - -#[test] -fn exec_server_transport_parses_websocket_listen_url() { - let transport = ExecServerTransport::from_listen_url("ws://127.0.0.1:1234") - .expect("websocket listen URL should parse"); +fn parse_listen_url_accepts_websocket_url() { + let bind_address = + parse_listen_url("ws://127.0.0.1:1234").expect("websocket listen URL should parse"); assert_eq!( - transport, - ExecServerTransport::WebSocket { - bind_address: "127.0.0.1:1234".parse().expect("valid socket address"), - } + bind_address, + "127.0.0.1:1234".parse().expect("valid socket address") ); } #[test] -fn exec_server_transport_rejects_invalid_websocket_listen_url() { - let err = ExecServerTransport::from_listen_url("ws://localhost:1234") +fn parse_listen_url_rejects_invalid_websocket_url() { + let err = parse_listen_url("ws://localhost:1234") .expect_err("hostname bind address should be rejected"); assert_eq!( err.to_string(), @@ -44,11 +34,11 @@ fn exec_server_transport_rejects_invalid_websocket_listen_url() { } #[test] -fn exec_server_transport_rejects_unsupported_listen_url() { - let err = ExecServerTransport::from_listen_url("http://127.0.0.1:1234") - .expect_err("unsupported scheme should fail"); +fn parse_listen_url_rejects_unsupported_url() { + let err = + parse_listen_url("http://127.0.0.1:1234").expect_err("unsupported scheme should fail"); assert_eq!( err.to_string(), - "unsupported --listen URL `http://127.0.0.1:1234`; expected `stdio://` or `ws://IP:PORT`" + "unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`" ); } diff --git a/codex-rs/exec-server/tests/common/exec_server.rs b/codex-rs/exec-server/tests/common/exec_server.rs new file mode 100644 index 000000000..225e4e485 --- /dev/null +++ b/codex-rs/exec-server/tests/common/exec_server.rs @@ -0,0 +1,188 @@ +#![allow(dead_code)] + +use std::process::Stdio; +use std::time::Duration; + +use anyhow::anyhow; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::RequestId; +use codex_utils_cargo_bin::cargo_bin; +use futures::SinkExt; +use futures::StreamExt; +use tokio::process::Child; +use tokio::process::Command; +use tokio::time::Instant; +use tokio::time::sleep; +use tokio::time::timeout; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); +const CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(25); +const EVENT_TIMEOUT: Duration = Duration::from_secs(5); + +pub(crate) struct ExecServerHarness { + child: Child, + websocket: tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + next_request_id: i64, +} + +impl Drop for ExecServerHarness { + fn drop(&mut self) { + let _ = self.child.start_kill(); + } +} + +pub(crate) async fn exec_server() -> anyhow::Result { + let binary = cargo_bin("codex-exec-server")?; + let websocket_url = reserve_websocket_url()?; + let mut child = Command::new(binary); + child.args(["--listen", &websocket_url]); + child.stdin(Stdio::null()); + child.stdout(Stdio::null()); + child.stderr(Stdio::inherit()); + let child = child.spawn()?; + + let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?; + Ok(ExecServerHarness { + child, + websocket, + next_request_id: 1, + }) +} + +impl ExecServerHarness { + pub(crate) async fn send_request( + &mut self, + method: &str, + params: serde_json::Value, + ) -> anyhow::Result { + let id = RequestId::Integer(self.next_request_id); + self.next_request_id += 1; + self.send_message(JSONRPCMessage::Request(JSONRPCRequest { + id: id.clone(), + method: method.to_string(), + params: Some(params), + trace: None, + })) + .await?; + Ok(id) + } + + pub(crate) async fn send_notification( + &mut self, + method: &str, + params: serde_json::Value, + ) -> anyhow::Result<()> { + self.send_message(JSONRPCMessage::Notification(JSONRPCNotification { + method: method.to_string(), + params: Some(params), + })) + .await + } + + pub(crate) async fn send_raw_text(&mut self, text: &str) -> anyhow::Result<()> { + self.websocket + .send(Message::Text(text.to_string().into())) + .await?; + Ok(()) + } + + pub(crate) async fn next_event(&mut self) -> anyhow::Result { + self.next_event_with_timeout(EVENT_TIMEOUT).await + } + + pub(crate) async fn wait_for_event( + &mut self, + mut predicate: F, + ) -> anyhow::Result + where + F: FnMut(&JSONRPCMessage) -> bool, + { + let deadline = Instant::now() + EVENT_TIMEOUT; + loop { + let now = Instant::now(); + if now >= deadline { + return Err(anyhow!( + "timed out waiting for matching exec-server event after {EVENT_TIMEOUT:?}" + )); + } + let remaining = deadline.duration_since(now); + let event = self.next_event_with_timeout(remaining).await?; + if predicate(&event) { + return Ok(event); + } + } + } + + pub(crate) async fn shutdown(&mut self) -> anyhow::Result<()> { + self.child.start_kill()?; + Ok(()) + } + + async fn send_message(&mut self, message: JSONRPCMessage) -> anyhow::Result<()> { + let encoded = serde_json::to_string(&message)?; + self.websocket.send(Message::Text(encoded.into())).await?; + Ok(()) + } + + async fn next_event_with_timeout( + &mut self, + timeout_duration: Duration, + ) -> anyhow::Result { + loop { + let frame = timeout(timeout_duration, self.websocket.next()) + .await + .map_err(|_| anyhow!("timed out waiting for exec-server websocket event"))? + .ok_or_else(|| anyhow!("exec-server websocket closed"))??; + + match frame { + Message::Text(text) => { + return Ok(serde_json::from_str(text.as_ref())?); + } + Message::Binary(bytes) => { + return Ok(serde_json::from_slice(bytes.as_ref())?); + } + Message::Close(_) => return Err(anyhow!("exec-server websocket closed")), + Message::Ping(_) | Message::Pong(_) => {} + _ => {} + } + } + } +} + +fn reserve_websocket_url() -> anyhow::Result { + let listener = std::net::TcpListener::bind("127.0.0.1:0")?; + let addr = listener.local_addr()?; + drop(listener); + Ok(format!("ws://{addr}")) +} + +async fn connect_websocket_when_ready( + websocket_url: &str, +) -> anyhow::Result<( + tokio_tungstenite::WebSocketStream>, + tokio_tungstenite::tungstenite::handshake::client::Response, +)> { + let deadline = Instant::now() + CONNECT_TIMEOUT; + loop { + match connect_async(websocket_url).await { + Ok(websocket) => return Ok(websocket), + Err(err) + if Instant::now() < deadline + && matches!( + err, + tokio_tungstenite::tungstenite::Error::Io(ref io_err) + if io_err.kind() == std::io::ErrorKind::ConnectionRefused + ) => + { + sleep(CONNECT_RETRY_INTERVAL).await; + } + Err(err) => return Err(err.into()), + } + } +} diff --git a/codex-rs/exec-server/tests/common/mod.rs b/codex-rs/exec-server/tests/common/mod.rs new file mode 100644 index 000000000..81f5f7c1d --- /dev/null +++ b/codex-rs/exec-server/tests/common/mod.rs @@ -0,0 +1 @@ +pub(crate) mod exec_server; diff --git a/codex-rs/exec-server/tests/initialize.rs b/codex-rs/exec-server/tests/initialize.rs new file mode 100644 index 000000000..0e95c9f9a --- /dev/null +++ b/codex-rs/exec-server/tests/initialize.rs @@ -0,0 +1,34 @@ +#![cfg(unix)] + +mod common; + +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_exec_server::InitializeParams; +use codex_exec_server::InitializeResponse; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_accepts_initialize() -> anyhow::Result<()> { + let mut server = exec_server().await?; + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + })?, + ) + .await?; + + let response = server.next_event().await?; + let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { + panic!("expected initialize response"); + }; + assert_eq!(id, initialize_id); + let initialize_response: InitializeResponse = serde_json::from_value(result)?; + assert_eq!(initialize_response, InitializeResponse {}); + + server.shutdown().await?; + Ok(()) +} diff --git a/codex-rs/exec-server/tests/process.rs b/codex-rs/exec-server/tests/process.rs new file mode 100644 index 000000000..a99a889ed --- /dev/null +++ b/codex-rs/exec-server/tests/process.rs @@ -0,0 +1,65 @@ +#![cfg(unix)] + +mod common; + +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_exec_server::InitializeParams; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()> { + let mut server = exec_server().await?; + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + })?, + ) + .await?; + let _ = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id + ) + }) + .await?; + + let process_start_id = server + .send_request( + "process/start", + serde_json::json!({ + "processId": "proc-1", + "argv": ["true"], + "cwd": std::env::current_dir()?, + "env": {}, + "tty": false, + "arg0": null + }), + ) + .await?; + let response = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Error(JSONRPCError { id, .. }) if id == &process_start_id + ) + }) + .await?; + let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else { + panic!("expected process/start stub error"); + }; + assert_eq!(id, process_start_id); + assert_eq!(error.code, -32601); + assert_eq!( + error.message, + "exec-server stub does not implement `process/start` yet" + ); + + server.shutdown().await?; + Ok(()) +} diff --git a/codex-rs/exec-server/tests/stdio_smoke.rs b/codex-rs/exec-server/tests/stdio_smoke.rs deleted file mode 100644 index 240180efd..000000000 --- a/codex-rs/exec-server/tests/stdio_smoke.rs +++ /dev/null @@ -1,129 +0,0 @@ -#![cfg(unix)] - -use std::process::Stdio; -use std::time::Duration; - -use codex_app_server_protocol::JSONRPCMessage; -use codex_app_server_protocol::JSONRPCNotification; -use codex_app_server_protocol::JSONRPCRequest; -use codex_app_server_protocol::JSONRPCResponse; -use codex_app_server_protocol::RequestId; -use codex_exec_server::InitializeParams; -use codex_exec_server::InitializeResponse; -use codex_utils_cargo_bin::cargo_bin; -use pretty_assertions::assert_eq; -use tokio::io::AsyncBufReadExt; -use tokio::io::AsyncWriteExt; -use tokio::io::BufReader; -use tokio::process::Command; -use tokio::time::timeout; - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn exec_server_accepts_initialize_over_stdio() -> anyhow::Result<()> { - let binary = cargo_bin("codex-exec-server")?; - let mut child = Command::new(binary); - child.args(["--listen", "stdio://"]); - child.stdin(Stdio::piped()); - child.stdout(Stdio::piped()); - child.stderr(Stdio::inherit()); - let mut child = child.spawn()?; - - let mut stdin = child.stdin.take().expect("stdin"); - let stdout = child.stdout.take().expect("stdout"); - let mut stdout = BufReader::new(stdout).lines(); - - let initialize = JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(1), - method: "initialize".to_string(), - params: Some(serde_json::to_value(InitializeParams { - client_name: "exec-server-test".to_string(), - })?), - trace: None, - }); - stdin - .write_all(format!("{}\n", serde_json::to_string(&initialize)?).as_bytes()) - .await?; - - let response_line = timeout(Duration::from_secs(5), stdout.next_line()).await??; - let response_line = response_line.expect("response line"); - let response: JSONRPCMessage = serde_json::from_str(&response_line)?; - let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { - panic!("expected initialize response"); - }; - assert_eq!(id, RequestId::Integer(1)); - let initialize_response: InitializeResponse = serde_json::from_value(result)?; - assert_eq!(initialize_response, InitializeResponse {}); - - let initialized = JSONRPCMessage::Notification(JSONRPCNotification { - method: "initialized".to_string(), - params: Some(serde_json::json!({})), - }); - stdin - .write_all(format!("{}\n", serde_json::to_string(&initialized)?).as_bytes()) - .await?; - - child.start_kill()?; - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn exec_server_stubs_process_start_over_stdio() -> anyhow::Result<()> { - let binary = cargo_bin("codex-exec-server")?; - let mut child = Command::new(binary); - child.args(["--listen", "stdio://"]); - child.stdin(Stdio::piped()); - child.stdout(Stdio::piped()); - child.stderr(Stdio::inherit()); - let mut child = child.spawn()?; - - let mut stdin = child.stdin.take().expect("stdin"); - let stdout = child.stdout.take().expect("stdout"); - let mut stdout = BufReader::new(stdout).lines(); - - let initialize = JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(1), - method: "initialize".to_string(), - params: Some(serde_json::to_value(InitializeParams { - client_name: "exec-server-test".to_string(), - })?), - trace: None, - }); - stdin - .write_all(format!("{}\n", serde_json::to_string(&initialize)?).as_bytes()) - .await?; - let _ = timeout(Duration::from_secs(5), stdout.next_line()).await??; - - let exec = JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(2), - method: "process/start".to_string(), - params: Some(serde_json::json!({ - "processId": "proc-1", - "argv": ["true"], - "cwd": std::env::current_dir()?, - "env": {}, - "tty": false, - "arg0": null - })), - trace: None, - }); - stdin - .write_all(format!("{}\n", serde_json::to_string(&exec)?).as_bytes()) - .await?; - - let response_line = timeout(Duration::from_secs(5), stdout.next_line()).await??; - let response_line = response_line.expect("exec response line"); - let response: JSONRPCMessage = serde_json::from_str(&response_line)?; - let JSONRPCMessage::Error(codex_app_server_protocol::JSONRPCError { id, error }) = response - else { - panic!("expected process/start stub error"); - }; - assert_eq!(id, RequestId::Integer(2)); - assert_eq!(error.code, -32601); - assert_eq!( - error.message, - "exec-server stub does not implement `process/start` yet" - ); - - child.start_kill()?; - Ok(()) -} diff --git a/codex-rs/exec-server/tests/websocket.rs b/codex-rs/exec-server/tests/websocket.rs new file mode 100644 index 000000000..f26efa520 --- /dev/null +++ b/codex-rs/exec-server/tests/websocket.rs @@ -0,0 +1,60 @@ +#![cfg(unix)] + +mod common; + +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_exec_server::InitializeParams; +use codex_exec_server::InitializeResponse; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> anyhow::Result<()> { + let mut server = exec_server().await?; + server.send_raw_text("not-json").await?; + + let response = server + .wait_for_event(|event| matches!(event, JSONRPCMessage::Error(_))) + .await?; + let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else { + panic!("expected malformed-message error response"); + }; + assert_eq!(id, codex_app_server_protocol::RequestId::Integer(-1)); + assert_eq!(error.code, -32600); + assert!( + error + .message + .starts_with("failed to parse websocket JSON-RPC message from exec-server websocket"), + "unexpected malformed-message error: {}", + error.message + ); + + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + })?, + ) + .await?; + + let response = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id + ) + }) + .await?; + let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { + panic!("expected initialize response after malformed input"); + }; + assert_eq!(id, initialize_id); + let initialize_response: InitializeResponse = serde_json::from_value(result)?; + assert_eq!(initialize_response, InitializeResponse {}); + + server.shutdown().await?; + Ok(()) +} diff --git a/codex-rs/exec-server/tests/websocket_smoke.rs b/codex-rs/exec-server/tests/websocket_smoke.rs deleted file mode 100644 index 2a51a4d3a..000000000 --- a/codex-rs/exec-server/tests/websocket_smoke.rs +++ /dev/null @@ -1,229 +0,0 @@ -#![cfg(unix)] - -use std::process::Stdio; -use std::time::Duration; - -use codex_app_server_protocol::JSONRPCError; -use codex_app_server_protocol::JSONRPCMessage; -use codex_app_server_protocol::JSONRPCNotification; -use codex_app_server_protocol::JSONRPCRequest; -use codex_app_server_protocol::JSONRPCResponse; -use codex_app_server_protocol::RequestId; -use codex_exec_server::InitializeParams; -use codex_exec_server::InitializeResponse; -use codex_utils_cargo_bin::cargo_bin; -use pretty_assertions::assert_eq; -use tokio::process::Command; -use tokio_tungstenite::connect_async; -use tokio_tungstenite::tungstenite::Message; - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn exec_server_accepts_initialize_over_websocket() -> anyhow::Result<()> { - let binary = cargo_bin("codex-exec-server")?; - let websocket_url = reserve_websocket_url()?; - let mut child = Command::new(binary); - child.args(["--listen", &websocket_url]); - child.stdin(Stdio::null()); - child.stdout(Stdio::null()); - child.stderr(Stdio::inherit()); - let mut child = child.spawn()?; - - let (mut websocket, _) = connect_websocket_when_ready(&websocket_url).await?; - let initialize = JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(1), - method: "initialize".to_string(), - params: Some(serde_json::to_value(InitializeParams { - client_name: "exec-server-test".to_string(), - })?), - trace: None, - }); - futures::SinkExt::send( - &mut websocket, - Message::Text(serde_json::to_string(&initialize)?.into()), - ) - .await?; - - let Some(Ok(Message::Text(response_text))) = futures::StreamExt::next(&mut websocket).await - else { - panic!("expected initialize response"); - }; - let response: JSONRPCMessage = serde_json::from_str(response_text.as_ref())?; - let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { - panic!("expected initialize response"); - }; - assert_eq!(id, RequestId::Integer(1)); - let initialize_response: InitializeResponse = serde_json::from_value(result)?; - assert_eq!(initialize_response, InitializeResponse {}); - - let initialized = JSONRPCMessage::Notification(JSONRPCNotification { - method: "initialized".to_string(), - params: Some(serde_json::json!({})), - }); - futures::SinkExt::send( - &mut websocket, - Message::Text(serde_json::to_string(&initialized)?.into()), - ) - .await?; - - child.start_kill()?; - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> anyhow::Result<()> { - let binary = cargo_bin("codex-exec-server")?; - let websocket_url = reserve_websocket_url()?; - let mut child = Command::new(binary); - child.args(["--listen", &websocket_url]); - child.stdin(Stdio::null()); - child.stdout(Stdio::null()); - child.stderr(Stdio::inherit()); - let mut child = child.spawn()?; - - let (mut websocket, _) = connect_websocket_when_ready(&websocket_url).await?; - futures::SinkExt::send(&mut websocket, Message::Text("not-json".to_string().into())).await?; - - let Some(Ok(Message::Text(response_text))) = futures::StreamExt::next(&mut websocket).await - else { - panic!("expected malformed-message error response"); - }; - let response: JSONRPCMessage = serde_json::from_str(response_text.as_ref())?; - let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else { - panic!("expected malformed-message error response"); - }; - assert_eq!(id, RequestId::Integer(-1)); - assert_eq!(error.code, -32600); - assert!( - error - .message - .starts_with("failed to parse websocket JSON-RPC message from exec-server websocket"), - "unexpected malformed-message error: {}", - error.message - ); - - let initialize = JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(1), - method: "initialize".to_string(), - params: Some(serde_json::to_value(InitializeParams { - client_name: "exec-server-test".to_string(), - })?), - trace: None, - }); - futures::SinkExt::send( - &mut websocket, - Message::Text(serde_json::to_string(&initialize)?.into()), - ) - .await?; - - let Some(Ok(Message::Text(response_text))) = futures::StreamExt::next(&mut websocket).await - else { - panic!("expected initialize response after malformed input"); - }; - let response: JSONRPCMessage = serde_json::from_str(response_text.as_ref())?; - let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { - panic!("expected initialize response after malformed input"); - }; - assert_eq!(id, RequestId::Integer(1)); - let initialize_response: InitializeResponse = serde_json::from_value(result)?; - assert_eq!(initialize_response, InitializeResponse {}); - - child.start_kill()?; - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()> { - let binary = cargo_bin("codex-exec-server")?; - let websocket_url = reserve_websocket_url()?; - let mut child = Command::new(binary); - child.args(["--listen", &websocket_url]); - child.stdin(Stdio::null()); - child.stdout(Stdio::null()); - child.stderr(Stdio::inherit()); - let mut child = child.spawn()?; - - let (mut websocket, _) = connect_websocket_when_ready(&websocket_url).await?; - let initialize = JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(1), - method: "initialize".to_string(), - params: Some(serde_json::to_value(InitializeParams { - client_name: "exec-server-test".to_string(), - })?), - trace: None, - }); - futures::SinkExt::send( - &mut websocket, - Message::Text(serde_json::to_string(&initialize)?.into()), - ) - .await?; - let _ = futures::StreamExt::next(&mut websocket).await; - - let exec = JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(2), - method: "process/start".to_string(), - params: Some(serde_json::json!({ - "processId": "proc-1", - "argv": ["true"], - "cwd": std::env::current_dir()?, - "env": {}, - "tty": false, - "arg0": null - })), - trace: None, - }); - futures::SinkExt::send( - &mut websocket, - Message::Text(serde_json::to_string(&exec)?.into()), - ) - .await?; - - let Some(Ok(Message::Text(response_text))) = futures::StreamExt::next(&mut websocket).await - else { - panic!("expected process/start error"); - }; - let response: JSONRPCMessage = serde_json::from_str(response_text.as_ref())?; - let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else { - panic!("expected process/start stub error"); - }; - assert_eq!(id, RequestId::Integer(2)); - assert_eq!(error.code, -32601); - assert_eq!( - error.message, - "exec-server stub does not implement `process/start` yet" - ); - - child.start_kill()?; - Ok(()) -} - -fn reserve_websocket_url() -> anyhow::Result { - let listener = std::net::TcpListener::bind("127.0.0.1:0")?; - let addr = listener.local_addr()?; - drop(listener); - Ok(format!("ws://{addr}")) -} - -async fn connect_websocket_when_ready( - websocket_url: &str, -) -> anyhow::Result<( - tokio_tungstenite::WebSocketStream>, - tokio_tungstenite::tungstenite::handshake::client::Response, -)> { - let deadline = tokio::time::Instant::now() + Duration::from_secs(5); - loop { - match connect_async(websocket_url).await { - Ok(websocket) => return Ok(websocket), - Err(err) - if tokio::time::Instant::now() < deadline - && matches!( - err, - tokio_tungstenite::tungstenite::Error::Io(ref io_err) - if io_err.kind() == std::io::ErrorKind::ConnectionRefused - ) => - { - tokio::time::sleep(Duration::from_millis(25)).await; - } - Err(err) => return Err(err.into()), - } - } -}