diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 535396feb..b8c431de1 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1420,6 +1420,8 @@ dependencies = [ "codex-protocol", "serde", "serde_json", + "tungstenite", + "url", "uuid", ] diff --git a/codex-rs/app-server-test-client/Cargo.toml b/codex-rs/app-server-test-client/Cargo.toml index 25a881364..05c8938f0 100644 --- a/codex-rs/app-server-test-client/Cargo.toml +++ b/codex-rs/app-server-test-client/Cargo.toml @@ -14,4 +14,6 @@ codex-app-server-protocol = { workspace = true } codex-protocol = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +tungstenite = { workspace = true } +url = { workspace = true } uuid = { workspace = true, features = ["v4"] } diff --git a/codex-rs/app-server-test-client/README.md b/codex-rs/app-server-test-client/README.md index ee75348c9..5ef6e2344 100644 --- a/codex-rs/app-server-test-client/README.md +++ b/codex-rs/app-server-test-client/README.md @@ -1,2 +1,19 @@ # App Server Test Client -Exercises simple `codex app-server` flows end-to-end, logging JSON-RPC messages sent between client and server to stdout. +Quickstart for running and hitting `codex app-server`. + +## Quickstart + +Run from `/codex-rs`. + +```bash +# 1) Build debug codex binary +cargo build -p codex-cli --bin codex + +# 2) Start websocket app-server in background +cargo run -p codex-app-server-test-client -- \ + --codex-bin ./target/debug/codex \ + serve --listen ws://127.0.0.1:4222 --kill + +# 3) Call app-server (defaults to ws://127.0.0.1:4222) +cargo run -p codex-app-server-test-client -- model-list +``` diff --git a/codex-rs/app-server-test-client/src/lib.rs b/codex-rs/app-server-test-client/src/lib.rs index 91012572a..7b55c30a3 100644 --- a/codex-rs/app-server-test-client/src/lib.rs +++ b/codex-rs/app-server-test-client/src/lib.rs @@ -1,8 +1,10 @@ use std::collections::VecDeque; use std::fs; +use std::fs::OpenOptions; use std::io::BufRead; use std::io::BufReader; use std::io::Write; +use std::net::TcpStream; use std::path::Path; use std::path::PathBuf; use std::process::Child; @@ -53,6 +55,8 @@ use codex_app_server_protocol::SendUserMessageParams; use codex_app_server_protocol::SendUserMessageResponse; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadListParams; +use codex_app_server_protocol::ThreadListResponse; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; @@ -67,15 +71,28 @@ use codex_protocol::protocol::EventMsg; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; +use tungstenite::Message; +use tungstenite::WebSocket; +use tungstenite::connect; +use tungstenite::stream::MaybeTlsStream; +use url::Url; use uuid::Uuid; /// Minimal launcher that initializes the Codex app-server and logs the handshake. #[derive(Parser)] #[command(author = "Codex", version, about = "Bootstrap Codex app-server", long_about = None)] struct Cli { - /// Path to the `codex` CLI binary. - #[arg(long, env = "CODEX_BIN", default_value = "codex")] - codex_bin: PathBuf, + /// Path to the `codex` CLI binary. When set, requests use stdio by + /// spawning `codex app-server` as a child process. + #[arg(long, env = "CODEX_BIN", global = true)] + codex_bin: Option, + + /// Existing websocket server URL to connect to. + /// + /// If neither `--codex-bin` nor `--url` is provided, defaults to + /// `ws://127.0.0.1:4222`. + #[arg(long, env = "CODEX_APP_SERVER_URL", global = true)] + url: Option, /// Forwarded to the `codex` CLI as `--config key=value`. Repeatable. /// @@ -105,6 +122,18 @@ struct Cli { #[derive(Subcommand)] enum CliCommand { + /// Start `codex app-server` on a websocket endpoint in the background. + /// + /// Logs are written to: + /// `/tmp/codex-app-server-test-client/` + Serve { + /// WebSocket listen URL passed to `codex app-server --listen`. + #[arg(long, default_value = "ws://127.0.0.1:4222")] + listen: String, + /// Kill any process listening on the same port before starting. + #[arg(long, default_value_t = false)] + kill: bool, + }, /// Send a user message through the Codex app-server. SendMessage { /// User message to send to Codex. @@ -122,6 +151,13 @@ enum CliCommand { /// User message to send to Codex. user_message: String, }, + /// Resume a V2 thread and continuously stream notifications/events. + /// + /// This command does not auto-exit; stop it with SIGINT/SIGTERM/SIGKILL. + ThreadResume { + /// Existing thread id to resume. + thread_id: String, + }, /// Start a V2 turn that elicits an ExecCommand approval. #[command(name = "trigger-cmd-approval")] TriggerCmdApproval { @@ -151,11 +187,19 @@ enum CliCommand { /// List the available models from the Codex app-server. #[command(name = "model-list")] ModelList, + /// List stored threads from the Codex app-server. + #[command(name = "thread-list")] + ThreadList { + /// Number of threads to return. + #[arg(long, default_value_t = 20)] + limit: u32, + }, } pub fn run() -> Result<()> { let Cli { codex_bin, + url, config_overrides, dynamic_tools, command, @@ -164,59 +208,222 @@ pub fn run() -> Result<()> { let dynamic_tools = parse_dynamic_tools_arg(&dynamic_tools)?; match command { + CliCommand::Serve { listen, kill } => { + ensure_dynamic_tools_unused(&dynamic_tools, "serve")?; + let codex_bin = codex_bin.unwrap_or_else(|| PathBuf::from("codex")); + serve(&codex_bin, &config_overrides, &listen, kill) + } CliCommand::SendMessage { user_message } => { ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?; - send_message(&codex_bin, &config_overrides, user_message) + let endpoint = resolve_endpoint(codex_bin, url)?; + send_message(&endpoint, &config_overrides, user_message) } CliCommand::SendMessageV2 { user_message } => { - send_message_v2(&codex_bin, &config_overrides, user_message, &dynamic_tools) + let endpoint = resolve_endpoint(codex_bin, url)?; + send_message_v2_endpoint(&endpoint, &config_overrides, user_message, &dynamic_tools) } CliCommand::ResumeMessageV2 { thread_id, user_message, - } => resume_message_v2( - &codex_bin, - &config_overrides, - thread_id, - user_message, - &dynamic_tools, - ), + } => { + let endpoint = resolve_endpoint(codex_bin, url)?; + resume_message_v2( + &endpoint, + &config_overrides, + thread_id, + user_message, + &dynamic_tools, + ) + } + CliCommand::ThreadResume { thread_id } => { + ensure_dynamic_tools_unused(&dynamic_tools, "thread-resume")?; + let endpoint = resolve_endpoint(codex_bin, url)?; + thread_resume_follow(&endpoint, &config_overrides, thread_id) + } CliCommand::TriggerCmdApproval { user_message } => { - trigger_cmd_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) + let endpoint = resolve_endpoint(codex_bin, url)?; + trigger_cmd_approval(&endpoint, &config_overrides, user_message, &dynamic_tools) } CliCommand::TriggerPatchApproval { user_message } => { - trigger_patch_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) + let endpoint = resolve_endpoint(codex_bin, url)?; + trigger_patch_approval(&endpoint, &config_overrides, user_message, &dynamic_tools) } CliCommand::NoTriggerCmdApproval => { - no_trigger_cmd_approval(&codex_bin, &config_overrides, &dynamic_tools) + let endpoint = resolve_endpoint(codex_bin, url)?; + no_trigger_cmd_approval(&endpoint, &config_overrides, &dynamic_tools) } CliCommand::SendFollowUpV2 { first_message, follow_up_message, - } => send_follow_up_v2( - &codex_bin, - &config_overrides, - first_message, - follow_up_message, - &dynamic_tools, - ), + } => { + let endpoint = resolve_endpoint(codex_bin, url)?; + send_follow_up_v2( + &endpoint, + &config_overrides, + first_message, + follow_up_message, + &dynamic_tools, + ) + } CliCommand::TestLogin => { ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?; - test_login(&codex_bin, &config_overrides) + let endpoint = resolve_endpoint(codex_bin, url)?; + test_login(&endpoint, &config_overrides) } CliCommand::GetAccountRateLimits => { ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?; - get_account_rate_limits(&codex_bin, &config_overrides) + let endpoint = resolve_endpoint(codex_bin, url)?; + get_account_rate_limits(&endpoint, &config_overrides) } CliCommand::ModelList => { ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?; - model_list(&codex_bin, &config_overrides) + let endpoint = resolve_endpoint(codex_bin, url)?; + model_list(&endpoint, &config_overrides) + } + CliCommand::ThreadList { limit } => { + ensure_dynamic_tools_unused(&dynamic_tools, "thread-list")?; + let endpoint = resolve_endpoint(codex_bin, url)?; + thread_list(&endpoint, &config_overrides, limit) } } } -fn send_message(codex_bin: &Path, config_overrides: &[String], user_message: String) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; +enum Endpoint { + SpawnCodex(PathBuf), + ConnectWs(String), +} + +fn resolve_endpoint(codex_bin: Option, url: Option) -> Result { + if codex_bin.is_some() && url.is_some() { + bail!("--codex-bin and --url are mutually exclusive"); + } + if let Some(codex_bin) = codex_bin { + return Ok(Endpoint::SpawnCodex(codex_bin)); + } + if let Some(url) = url { + return Ok(Endpoint::ConnectWs(url)); + } + Ok(Endpoint::ConnectWs("ws://127.0.0.1:4222".to_string())) +} + +fn serve(codex_bin: &Path, config_overrides: &[String], listen: &str, kill: bool) -> Result<()> { + let runtime_dir = PathBuf::from("/tmp/codex-app-server-test-client"); + fs::create_dir_all(&runtime_dir) + .with_context(|| format!("failed to create runtime dir {}", runtime_dir.display()))?; + let log_path = runtime_dir.join("app-server.log"); + if kill { + kill_listeners_on_same_port(listen)?; + } + + let log_file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path) + .with_context(|| format!("failed to open log file {}", log_path.display()))?; + let log_file_stderr = log_file + .try_clone() + .with_context(|| format!("failed to clone log file handle {}", log_path.display()))?; + + let mut cmdline = format!( + "tail -f /dev/null | RUST_BACKTRACE=full RUST_LOG=warn,codex_=trace {}", + shell_quote(&codex_bin.display().to_string()) + ); + for override_kv in config_overrides { + cmdline.push_str(&format!(" --config {}", shell_quote(override_kv))); + } + cmdline.push_str(&format!(" app-server --listen {}", shell_quote(listen))); + + let child = Command::new("nohup") + .arg("sh") + .arg("-c") + .arg(cmdline) + .stdin(Stdio::null()) + .stdout(Stdio::from(log_file)) + .stderr(Stdio::from(log_file_stderr)) + .spawn() + .with_context(|| format!("failed to start `{}` app-server", codex_bin.display()))?; + + let pid = child.id(); + + println!("started codex app-server"); + println!("listen: {listen}"); + println!("pid: {pid} (launcher process)"); + println!("log: {}", log_path.display()); + + Ok(()) +} + +fn kill_listeners_on_same_port(listen: &str) -> Result<()> { + let url = Url::parse(listen).with_context(|| format!("invalid --listen URL `{listen}`"))?; + let port = url + .port_or_known_default() + .with_context(|| format!("unable to infer port from --listen URL `{listen}`"))?; + + let output = Command::new("lsof") + .arg("-nP") + .arg(format!("-tiTCP:{port}")) + .arg("-sTCP:LISTEN") + .output() + .with_context(|| format!("failed to run lsof for port {port}"))?; + + if !output.status.success() { + return Ok(()); + } + + let pids: Vec = String::from_utf8_lossy(&output.stdout) + .lines() + .filter_map(|line| line.trim().parse::().ok()) + .collect(); + + if pids.is_empty() { + return Ok(()); + } + + for pid in pids { + println!("killing listener pid {pid} on port {port}"); + let pid_str = pid.to_string(); + let term_status = Command::new("kill") + .arg(&pid_str) + .status() + .with_context(|| format!("failed to send SIGTERM to pid {pid}"))?; + if !term_status.success() { + continue; + } + } + + thread::sleep(Duration::from_millis(300)); + + let output = Command::new("lsof") + .arg("-nP") + .arg(format!("-tiTCP:{port}")) + .arg("-sTCP:LISTEN") + .output() + .with_context(|| format!("failed to re-check listeners on port {port}"))?; + if !output.status.success() { + return Ok(()); + } + let remaining: Vec = String::from_utf8_lossy(&output.stdout) + .lines() + .filter_map(|line| line.trim().parse::().ok()) + .collect(); + for pid in remaining { + println!("force killing remaining listener pid {pid} on port {port}"); + let _ = Command::new("kill").arg("-9").arg(pid.to_string()).status(); + } + + Ok(()) +} + +fn shell_quote(input: &str) -> String { + format!("'{}'", input.replace('\'', "'\\''")) +} + +fn send_message( + endpoint: &Endpoint, + config_overrides: &[String], + user_message: String, +) -> Result<()> { + let mut client = CodexClient::connect(endpoint, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -242,9 +449,19 @@ pub fn send_message_v2( config_overrides: &[String], user_message: String, dynamic_tools: &Option>, +) -> Result<()> { + let endpoint = Endpoint::SpawnCodex(codex_bin.to_path_buf()); + send_message_v2_endpoint(&endpoint, config_overrides, user_message, dynamic_tools) +} + +fn send_message_v2_endpoint( + endpoint: &Endpoint, + config_overrides: &[String], + user_message: String, + dynamic_tools: &Option>, ) -> Result<()> { send_message_v2_with_policies( - codex_bin, + endpoint, config_overrides, user_message, None, @@ -254,7 +471,7 @@ pub fn send_message_v2( } fn resume_message_v2( - codex_bin: &Path, + endpoint: &Endpoint, config_overrides: &[String], thread_id: String, user_message: String, @@ -262,7 +479,7 @@ fn resume_message_v2( ) -> Result<()> { ensure_dynamic_tools_unused(dynamic_tools, "resume-message-v2")?; - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + let mut client = CodexClient::connect(endpoint, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -288,8 +505,28 @@ fn resume_message_v2( Ok(()) } +fn thread_resume_follow( + endpoint: &Endpoint, + config_overrides: &[String], + thread_id: String, +) -> Result<()> { + let mut client = CodexClient::connect(endpoint, config_overrides)?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let resume_response = client.thread_resume(ThreadResumeParams { + thread_id, + ..Default::default() + })?; + println!("< thread/resume response: {resume_response:?}"); + println!("< streaming notifications until process is terminated"); + + client.stream_notifications_forever() +} + fn trigger_cmd_approval( - codex_bin: &Path, + endpoint: &Endpoint, config_overrides: &[String], user_message: Option, dynamic_tools: &Option>, @@ -298,7 +535,7 @@ fn trigger_cmd_approval( "Run `touch /tmp/should-trigger-approval` so I can confirm the file exists."; let message = user_message.unwrap_or_else(|| default_prompt.to_string()); send_message_v2_with_policies( - codex_bin, + endpoint, config_overrides, message, Some(AskForApproval::OnRequest), @@ -310,7 +547,7 @@ fn trigger_cmd_approval( } fn trigger_patch_approval( - codex_bin: &Path, + endpoint: &Endpoint, config_overrides: &[String], user_message: Option, dynamic_tools: &Option>, @@ -319,7 +556,7 @@ fn trigger_patch_approval( "Create a file named APPROVAL_DEMO.txt containing a short hello message using apply_patch."; let message = user_message.unwrap_or_else(|| default_prompt.to_string()); send_message_v2_with_policies( - codex_bin, + endpoint, config_overrides, message, Some(AskForApproval::OnRequest), @@ -331,13 +568,13 @@ fn trigger_patch_approval( } fn no_trigger_cmd_approval( - codex_bin: &Path, + endpoint: &Endpoint, config_overrides: &[String], dynamic_tools: &Option>, ) -> Result<()> { let prompt = "Run `touch should_not_trigger_approval.txt`"; send_message_v2_with_policies( - codex_bin, + endpoint, config_overrides, prompt.to_string(), None, @@ -347,14 +584,14 @@ fn no_trigger_cmd_approval( } fn send_message_v2_with_policies( - codex_bin: &Path, + endpoint: &Endpoint, config_overrides: &[String], user_message: String, approval_policy: Option, sandbox_policy: Option, dynamic_tools: &Option>, ) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + let mut client = CodexClient::connect(endpoint, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -385,13 +622,13 @@ fn send_message_v2_with_policies( } fn send_follow_up_v2( - codex_bin: &Path, + endpoint: &Endpoint, config_overrides: &[String], first_message: String, follow_up_message: String, dynamic_tools: &Option>, ) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + let mut client = CodexClient::connect(endpoint, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -431,8 +668,8 @@ fn send_follow_up_v2( Ok(()) } -fn test_login(codex_bin: &Path, config_overrides: &[String]) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; +fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { + let mut client = CodexClient::connect(endpoint, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -461,8 +698,8 @@ fn test_login(codex_bin: &Path, config_overrides: &[String]) -> Result<()> { } } -fn get_account_rate_limits(codex_bin: &Path, config_overrides: &[String]) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; +fn get_account_rate_limits(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { + let mut client = CodexClient::connect(endpoint, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -473,8 +710,8 @@ fn get_account_rate_limits(codex_bin: &Path, config_overrides: &[String]) -> Res Ok(()) } -fn model_list(codex_bin: &Path, config_overrides: &[String]) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; +fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> { + let mut client = CodexClient::connect(endpoint, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); @@ -485,6 +722,26 @@ fn model_list(codex_bin: &Path, config_overrides: &[String]) -> Result<()> { Ok(()) } +fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) -> Result<()> { + let mut client = CodexClient::connect(endpoint, config_overrides)?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let response = client.thread_list(ThreadListParams { + cursor: None, + limit: Some(limit), + sort_key: None, + model_providers: None, + source_kinds: None, + archived: None, + cwd: None, + })?; + println!("< thread/list response: {response:?}"); + + Ok(()) +} + fn ensure_dynamic_tools_unused( dynamic_tools: &Option>, command: &str, @@ -519,15 +776,32 @@ fn parse_dynamic_tools_arg(dynamic_tools: &Option) -> Result, + stdout: BufReader, + }, + WebSocket { + url: String, + socket: Box>>, + }, +} + struct CodexClient { - child: Child, - stdin: Option, - stdout: BufReader, + transport: ClientTransport, pending_notifications: VecDeque, } impl CodexClient { - fn spawn(codex_bin: &Path, config_overrides: &[String]) -> Result { + fn connect(endpoint: &Endpoint, config_overrides: &[String]) -> Result { + match endpoint { + Endpoint::SpawnCodex(codex_bin) => Self::spawn_stdio(codex_bin, config_overrides), + Endpoint::ConnectWs(url) => Self::connect_websocket(url), + } + } + + fn spawn_stdio(codex_bin: &Path, config_overrides: &[String]) -> Result { let codex_bin_display = codex_bin.display(); let mut cmd = Command::new(codex_bin); for override_kv in config_overrides { @@ -551,9 +825,27 @@ impl CodexClient { .context("codex app-server stdout unavailable")?; Ok(Self { - child: codex_app_server, - stdin: Some(stdin), - stdout: BufReader::new(stdout), + transport: ClientTransport::Stdio { + child: codex_app_server, + stdin: Some(stdin), + stdout: BufReader::new(stdout), + }, + pending_notifications: VecDeque::new(), + }) + } + + fn connect_websocket(url: &str) -> Result { + let parsed = Url::parse(url).with_context(|| format!("invalid websocket URL `{url}`"))?; + let (socket, _response) = connect(parsed.as_str()).with_context(|| { + format!( + "failed to connect to websocket app-server at `{url}`; if no server is running, start one with `codex-app-server-test-client serve --listen {url}`" + ) + })?; + Ok(Self { + transport: ClientTransport::WebSocket { + url: url.to_string(), + socket: Box::new(socket), + }, pending_notifications: VecDeque::new(), }) } @@ -575,7 +867,16 @@ impl CodexClient { }, }; - self.send_request(request, request_id, "initialize") + let response: InitializeResponse = self.send_request(request, request_id, "initialize")?; + + // Complete the initialize handshake. + let initialized = JSONRPCMessage::Notification(JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }); + self.write_jsonrpc_message(initialized)?; + + Ok(response) } fn start_thread(&mut self) -> Result { @@ -701,6 +1002,16 @@ impl CodexClient { self.send_request(request, request_id, "model/list") } + fn thread_list(&mut self, params: ThreadListParams) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::ThreadList { + request_id: request_id.clone(), + params, + }; + + self.send_request(request, request_id, "thread/list") + } + fn stream_conversation(&mut self, conversation_id: &ThreadId) -> Result<()> { loop { let notification = self.next_notification()?; @@ -835,6 +1146,12 @@ impl CodexClient { Ok(()) } + fn stream_notifications_forever(&mut self) -> Result<()> { + loop { + let _ = self.next_notification()?; + } + } + fn extract_event( &self, notification: JSONRPCNotification, @@ -882,17 +1199,7 @@ impl CodexClient { let request_json = serde_json::to_string(request)?; let request_pretty = serde_json::to_string_pretty(request)?; print_multiline_with_prefix("> ", &request_pretty); - - if let Some(stdin) = self.stdin.as_mut() { - writeln!(stdin, "{request_json}")?; - stdin - .flush() - .context("failed to flush request to codex app-server")?; - } else { - bail!("codex app-server stdin closed"); - } - - Ok(()) + self.write_payload(&request_json) } fn wait_for_response(&mut self, request_id: RequestId, method: &str) -> Result @@ -947,17 +1254,8 @@ impl CodexClient { fn read_jsonrpc_message(&mut self) -> Result { loop { - let mut response_line = String::new(); - let bytes = self - .stdout - .read_line(&mut response_line) - .context("failed to read from codex app-server")?; - - if bytes == 0 { - bail!("codex app-server closed stdout"); - } - - let trimmed = response_line.trim(); + let raw = self.read_payload()?; + let trimmed = raw.trim(); if trimmed.is_empty() { continue; } @@ -1086,16 +1384,56 @@ impl CodexClient { let payload = serde_json::to_string(&message)?; let pretty = serde_json::to_string_pretty(&message)?; print_multiline_with_prefix("> ", &pretty); + self.write_payload(&payload) + } - if let Some(stdin) = self.stdin.as_mut() { - writeln!(stdin, "{payload}")?; - stdin - .flush() - .context("failed to flush response to codex app-server")?; - return Ok(()); + fn write_payload(&mut self, payload: &str) -> Result<()> { + match &mut self.transport { + ClientTransport::Stdio { stdin, .. } => { + if let Some(stdin) = stdin.as_mut() { + writeln!(stdin, "{payload}")?; + stdin + .flush() + .context("failed to flush payload to codex app-server")?; + return Ok(()); + } + bail!("codex app-server stdin closed") + } + ClientTransport::WebSocket { socket, url } => { + socket + .send(Message::Text(payload.to_string().into())) + .with_context(|| format!("failed to write websocket message to `{url}`"))?; + Ok(()) + } } + } - bail!("codex app-server stdin closed") + fn read_payload(&mut self) -> Result { + match &mut self.transport { + ClientTransport::Stdio { stdout, .. } => { + let mut response_line = String::new(); + let bytes = stdout + .read_line(&mut response_line) + .context("failed to read from codex app-server")?; + if bytes == 0 { + bail!("codex app-server closed stdout"); + } + Ok(response_line) + } + ClientTransport::WebSocket { socket, url } => loop { + let frame = socket + .read() + .with_context(|| format!("failed to read websocket message from `{url}`"))?; + match frame { + Message::Text(text) => return Ok(text.to_string()), + Message::Binary(_) | Message::Ping(_) | Message::Pong(_) => continue, + Message::Close(_) => { + bail!("websocket app-server at `{url}` closed the connection") + } + Message::Frame(_) => continue, + } + }, + } } } @@ -1107,21 +1445,25 @@ fn print_multiline_with_prefix(prefix: &str, payload: &str) { impl Drop for CodexClient { fn drop(&mut self) { - let _ = self.stdin.take(); + let ClientTransport::Stdio { child, stdin, .. } = &mut self.transport else { + return; + }; - if let Ok(Some(status)) = self.child.try_wait() { + let _ = stdin.take(); + + if let Ok(Some(status)) = child.try_wait() { println!("[codex app-server exited: {status}]"); return; } thread::sleep(Duration::from_millis(100)); - if let Ok(Some(status)) = self.child.try_wait() { + if let Ok(Some(status)) = child.try_wait() { println!("[codex app-server exited: {status}]"); return; } - let _ = self.child.kill(); - let _ = self.child.wait(); + let _ = child.kill(); + let _ = child.wait(); } }