app-server-test-client websocket client and thread tools (#11755)

- add websocket endpoint mode with default ws://127.0.0.1:4222 while
keeping stdio codex-bin path compatibility
- add thread-resume (follow stream) and thread-list commands for manual
thread lifecycle testing
- quickstart docs
This commit is contained in:
Max Johnson 2026-02-13 09:34:35 -08:00 committed by GitHub
parent 38c442ca7f
commit f687b074ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 452 additions and 89 deletions

2
codex-rs/Cargo.lock generated
View file

@ -1420,6 +1420,8 @@ dependencies = [
"codex-protocol",
"serde",
"serde_json",
"tungstenite",
"url",
"uuid",
]

View file

@ -14,4 +14,6 @@ codex-app-server-protocol = { workspace = true }
codex-protocol = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tungstenite = { workspace = true }
url = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

View file

@ -1,2 +1,19 @@
# App Server Test Client
Exercises simple `codex app-server` flows end-to-end, logging JSON-RPC messages sent between client and server to stdout.
Quickstart for running and hitting `codex app-server`.
## Quickstart
Run from `<reporoot>/codex-rs`.
```bash
# 1) Build debug codex binary
cargo build -p codex-cli --bin codex
# 2) Start websocket app-server in background
cargo run -p codex-app-server-test-client -- \
--codex-bin ./target/debug/codex \
serve --listen ws://127.0.0.1:4222 --kill
# 3) Call app-server (defaults to ws://127.0.0.1:4222)
cargo run -p codex-app-server-test-client -- model-list
```

View file

@ -1,8 +1,10 @@
use std::collections::VecDeque;
use std::fs;
use std::fs::OpenOptions;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Write;
use std::net::TcpStream;
use std::path::Path;
use std::path::PathBuf;
use std::process::Child;
@ -53,6 +55,8 @@ use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
@ -67,15 +71,28 @@ use codex_protocol::protocol::EventMsg;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
use tungstenite::Message;
use tungstenite::WebSocket;
use tungstenite::connect;
use tungstenite::stream::MaybeTlsStream;
use url::Url;
use uuid::Uuid;
/// Minimal launcher that initializes the Codex app-server and logs the handshake.
#[derive(Parser)]
#[command(author = "Codex", version, about = "Bootstrap Codex app-server", long_about = None)]
struct Cli {
/// Path to the `codex` CLI binary.
#[arg(long, env = "CODEX_BIN", default_value = "codex")]
codex_bin: PathBuf,
/// Path to the `codex` CLI binary. When set, requests use stdio by
/// spawning `codex app-server` as a child process.
#[arg(long, env = "CODEX_BIN", global = true)]
codex_bin: Option<PathBuf>,
/// Existing websocket server URL to connect to.
///
/// If neither `--codex-bin` nor `--url` is provided, defaults to
/// `ws://127.0.0.1:4222`.
#[arg(long, env = "CODEX_APP_SERVER_URL", global = true)]
url: Option<String>,
/// Forwarded to the `codex` CLI as `--config key=value`. Repeatable.
///
@ -105,6 +122,18 @@ struct Cli {
#[derive(Subcommand)]
enum CliCommand {
/// Start `codex app-server` on a websocket endpoint in the background.
///
/// Logs are written to:
/// `/tmp/codex-app-server-test-client/`
Serve {
/// WebSocket listen URL passed to `codex app-server --listen`.
#[arg(long, default_value = "ws://127.0.0.1:4222")]
listen: String,
/// Kill any process listening on the same port before starting.
#[arg(long, default_value_t = false)]
kill: bool,
},
/// Send a user message through the Codex app-server.
SendMessage {
/// User message to send to Codex.
@ -122,6 +151,13 @@ enum CliCommand {
/// User message to send to Codex.
user_message: String,
},
/// Resume a V2 thread and continuously stream notifications/events.
///
/// This command does not auto-exit; stop it with SIGINT/SIGTERM/SIGKILL.
ThreadResume {
/// Existing thread id to resume.
thread_id: String,
},
/// Start a V2 turn that elicits an ExecCommand approval.
#[command(name = "trigger-cmd-approval")]
TriggerCmdApproval {
@ -151,11 +187,19 @@ enum CliCommand {
/// List the available models from the Codex app-server.
#[command(name = "model-list")]
ModelList,
/// List stored threads from the Codex app-server.
#[command(name = "thread-list")]
ThreadList {
/// Number of threads to return.
#[arg(long, default_value_t = 20)]
limit: u32,
},
}
pub fn run() -> Result<()> {
let Cli {
codex_bin,
url,
config_overrides,
dynamic_tools,
command,
@ -164,59 +208,222 @@ pub fn run() -> Result<()> {
let dynamic_tools = parse_dynamic_tools_arg(&dynamic_tools)?;
match command {
CliCommand::Serve { listen, kill } => {
ensure_dynamic_tools_unused(&dynamic_tools, "serve")?;
let codex_bin = codex_bin.unwrap_or_else(|| PathBuf::from("codex"));
serve(&codex_bin, &config_overrides, &listen, kill)
}
CliCommand::SendMessage { user_message } => {
ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?;
send_message(&codex_bin, &config_overrides, user_message)
let endpoint = resolve_endpoint(codex_bin, url)?;
send_message(&endpoint, &config_overrides, user_message)
}
CliCommand::SendMessageV2 { user_message } => {
send_message_v2(&codex_bin, &config_overrides, user_message, &dynamic_tools)
let endpoint = resolve_endpoint(codex_bin, url)?;
send_message_v2_endpoint(&endpoint, &config_overrides, user_message, &dynamic_tools)
}
CliCommand::ResumeMessageV2 {
thread_id,
user_message,
} => resume_message_v2(
&codex_bin,
&config_overrides,
thread_id,
user_message,
&dynamic_tools,
),
} => {
let endpoint = resolve_endpoint(codex_bin, url)?;
resume_message_v2(
&endpoint,
&config_overrides,
thread_id,
user_message,
&dynamic_tools,
)
}
CliCommand::ThreadResume { thread_id } => {
ensure_dynamic_tools_unused(&dynamic_tools, "thread-resume")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
thread_resume_follow(&endpoint, &config_overrides, thread_id)
}
CliCommand::TriggerCmdApproval { user_message } => {
trigger_cmd_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools)
let endpoint = resolve_endpoint(codex_bin, url)?;
trigger_cmd_approval(&endpoint, &config_overrides, user_message, &dynamic_tools)
}
CliCommand::TriggerPatchApproval { user_message } => {
trigger_patch_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools)
let endpoint = resolve_endpoint(codex_bin, url)?;
trigger_patch_approval(&endpoint, &config_overrides, user_message, &dynamic_tools)
}
CliCommand::NoTriggerCmdApproval => {
no_trigger_cmd_approval(&codex_bin, &config_overrides, &dynamic_tools)
let endpoint = resolve_endpoint(codex_bin, url)?;
no_trigger_cmd_approval(&endpoint, &config_overrides, &dynamic_tools)
}
CliCommand::SendFollowUpV2 {
first_message,
follow_up_message,
} => send_follow_up_v2(
&codex_bin,
&config_overrides,
first_message,
follow_up_message,
&dynamic_tools,
),
} => {
let endpoint = resolve_endpoint(codex_bin, url)?;
send_follow_up_v2(
&endpoint,
&config_overrides,
first_message,
follow_up_message,
&dynamic_tools,
)
}
CliCommand::TestLogin => {
ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?;
test_login(&codex_bin, &config_overrides)
let endpoint = resolve_endpoint(codex_bin, url)?;
test_login(&endpoint, &config_overrides)
}
CliCommand::GetAccountRateLimits => {
ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?;
get_account_rate_limits(&codex_bin, &config_overrides)
let endpoint = resolve_endpoint(codex_bin, url)?;
get_account_rate_limits(&endpoint, &config_overrides)
}
CliCommand::ModelList => {
ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?;
model_list(&codex_bin, &config_overrides)
let endpoint = resolve_endpoint(codex_bin, url)?;
model_list(&endpoint, &config_overrides)
}
CliCommand::ThreadList { limit } => {
ensure_dynamic_tools_unused(&dynamic_tools, "thread-list")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
thread_list(&endpoint, &config_overrides, limit)
}
}
}
fn send_message(codex_bin: &Path, config_overrides: &[String], user_message: String) -> Result<()> {
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
enum Endpoint {
SpawnCodex(PathBuf),
ConnectWs(String),
}
fn resolve_endpoint(codex_bin: Option<PathBuf>, url: Option<String>) -> Result<Endpoint> {
if codex_bin.is_some() && url.is_some() {
bail!("--codex-bin and --url are mutually exclusive");
}
if let Some(codex_bin) = codex_bin {
return Ok(Endpoint::SpawnCodex(codex_bin));
}
if let Some(url) = url {
return Ok(Endpoint::ConnectWs(url));
}
Ok(Endpoint::ConnectWs("ws://127.0.0.1:4222".to_string()))
}
fn serve(codex_bin: &Path, config_overrides: &[String], listen: &str, kill: bool) -> Result<()> {
let runtime_dir = PathBuf::from("/tmp/codex-app-server-test-client");
fs::create_dir_all(&runtime_dir)
.with_context(|| format!("failed to create runtime dir {}", runtime_dir.display()))?;
let log_path = runtime_dir.join("app-server.log");
if kill {
kill_listeners_on_same_port(listen)?;
}
let log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.with_context(|| format!("failed to open log file {}", log_path.display()))?;
let log_file_stderr = log_file
.try_clone()
.with_context(|| format!("failed to clone log file handle {}", log_path.display()))?;
let mut cmdline = format!(
"tail -f /dev/null | RUST_BACKTRACE=full RUST_LOG=warn,codex_=trace {}",
shell_quote(&codex_bin.display().to_string())
);
for override_kv in config_overrides {
cmdline.push_str(&format!(" --config {}", shell_quote(override_kv)));
}
cmdline.push_str(&format!(" app-server --listen {}", shell_quote(listen)));
let child = Command::new("nohup")
.arg("sh")
.arg("-c")
.arg(cmdline)
.stdin(Stdio::null())
.stdout(Stdio::from(log_file))
.stderr(Stdio::from(log_file_stderr))
.spawn()
.with_context(|| format!("failed to start `{}` app-server", codex_bin.display()))?;
let pid = child.id();
println!("started codex app-server");
println!("listen: {listen}");
println!("pid: {pid} (launcher process)");
println!("log: {}", log_path.display());
Ok(())
}
fn kill_listeners_on_same_port(listen: &str) -> Result<()> {
let url = Url::parse(listen).with_context(|| format!("invalid --listen URL `{listen}`"))?;
let port = url
.port_or_known_default()
.with_context(|| format!("unable to infer port from --listen URL `{listen}`"))?;
let output = Command::new("lsof")
.arg("-nP")
.arg(format!("-tiTCP:{port}"))
.arg("-sTCP:LISTEN")
.output()
.with_context(|| format!("failed to run lsof for port {port}"))?;
if !output.status.success() {
return Ok(());
}
let pids: Vec<u32> = String::from_utf8_lossy(&output.stdout)
.lines()
.filter_map(|line| line.trim().parse::<u32>().ok())
.collect();
if pids.is_empty() {
return Ok(());
}
for pid in pids {
println!("killing listener pid {pid} on port {port}");
let pid_str = pid.to_string();
let term_status = Command::new("kill")
.arg(&pid_str)
.status()
.with_context(|| format!("failed to send SIGTERM to pid {pid}"))?;
if !term_status.success() {
continue;
}
}
thread::sleep(Duration::from_millis(300));
let output = Command::new("lsof")
.arg("-nP")
.arg(format!("-tiTCP:{port}"))
.arg("-sTCP:LISTEN")
.output()
.with_context(|| format!("failed to re-check listeners on port {port}"))?;
if !output.status.success() {
return Ok(());
}
let remaining: Vec<u32> = String::from_utf8_lossy(&output.stdout)
.lines()
.filter_map(|line| line.trim().parse::<u32>().ok())
.collect();
for pid in remaining {
println!("force killing remaining listener pid {pid} on port {port}");
let _ = Command::new("kill").arg("-9").arg(pid.to_string()).status();
}
Ok(())
}
fn shell_quote(input: &str) -> String {
format!("'{}'", input.replace('\'', "'\\''"))
}
fn send_message(
endpoint: &Endpoint,
config_overrides: &[String],
user_message: String,
) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -242,9 +449,19 @@ pub fn send_message_v2(
config_overrides: &[String],
user_message: String,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let endpoint = Endpoint::SpawnCodex(codex_bin.to_path_buf());
send_message_v2_endpoint(&endpoint, config_overrides, user_message, dynamic_tools)
}
fn send_message_v2_endpoint(
endpoint: &Endpoint,
config_overrides: &[String],
user_message: String,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
send_message_v2_with_policies(
codex_bin,
endpoint,
config_overrides,
user_message,
None,
@ -254,7 +471,7 @@ pub fn send_message_v2(
}
fn resume_message_v2(
codex_bin: &Path,
endpoint: &Endpoint,
config_overrides: &[String],
thread_id: String,
user_message: String,
@ -262,7 +479,7 @@ fn resume_message_v2(
) -> Result<()> {
ensure_dynamic_tools_unused(dynamic_tools, "resume-message-v2")?;
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -288,8 +505,28 @@ fn resume_message_v2(
Ok(())
}
fn thread_resume_follow(
endpoint: &Endpoint,
config_overrides: &[String],
thread_id: String,
) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let resume_response = client.thread_resume(ThreadResumeParams {
thread_id,
..Default::default()
})?;
println!("< thread/resume response: {resume_response:?}");
println!("< streaming notifications until process is terminated");
client.stream_notifications_forever()
}
fn trigger_cmd_approval(
codex_bin: &Path,
endpoint: &Endpoint,
config_overrides: &[String],
user_message: Option<String>,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
@ -298,7 +535,7 @@ fn trigger_cmd_approval(
"Run `touch /tmp/should-trigger-approval` so I can confirm the file exists.";
let message = user_message.unwrap_or_else(|| default_prompt.to_string());
send_message_v2_with_policies(
codex_bin,
endpoint,
config_overrides,
message,
Some(AskForApproval::OnRequest),
@ -310,7 +547,7 @@ fn trigger_cmd_approval(
}
fn trigger_patch_approval(
codex_bin: &Path,
endpoint: &Endpoint,
config_overrides: &[String],
user_message: Option<String>,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
@ -319,7 +556,7 @@ fn trigger_patch_approval(
"Create a file named APPROVAL_DEMO.txt containing a short hello message using apply_patch.";
let message = user_message.unwrap_or_else(|| default_prompt.to_string());
send_message_v2_with_policies(
codex_bin,
endpoint,
config_overrides,
message,
Some(AskForApproval::OnRequest),
@ -331,13 +568,13 @@ fn trigger_patch_approval(
}
fn no_trigger_cmd_approval(
codex_bin: &Path,
endpoint: &Endpoint,
config_overrides: &[String],
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let prompt = "Run `touch should_not_trigger_approval.txt`";
send_message_v2_with_policies(
codex_bin,
endpoint,
config_overrides,
prompt.to_string(),
None,
@ -347,14 +584,14 @@ fn no_trigger_cmd_approval(
}
fn send_message_v2_with_policies(
codex_bin: &Path,
endpoint: &Endpoint,
config_overrides: &[String],
user_message: String,
approval_policy: Option<AskForApproval>,
sandbox_policy: Option<SandboxPolicy>,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -385,13 +622,13 @@ fn send_message_v2_with_policies(
}
fn send_follow_up_v2(
codex_bin: &Path,
endpoint: &Endpoint,
config_overrides: &[String],
first_message: String,
follow_up_message: String,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -431,8 +668,8 @@ fn send_follow_up_v2(
Ok(())
}
fn test_login(codex_bin: &Path, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -461,8 +698,8 @@ fn test_login(codex_bin: &Path, config_overrides: &[String]) -> Result<()> {
}
}
fn get_account_rate_limits(codex_bin: &Path, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
fn get_account_rate_limits(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -473,8 +710,8 @@ fn get_account_rate_limits(codex_bin: &Path, config_overrides: &[String]) -> Res
Ok(())
}
fn model_list(codex_bin: &Path, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
@ -485,6 +722,26 @@ fn model_list(codex_bin: &Path, config_overrides: &[String]) -> Result<()> {
Ok(())
}
fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let response = client.thread_list(ThreadListParams {
cursor: None,
limit: Some(limit),
sort_key: None,
model_providers: None,
source_kinds: None,
archived: None,
cwd: None,
})?;
println!("< thread/list response: {response:?}");
Ok(())
}
fn ensure_dynamic_tools_unused(
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
command: &str,
@ -519,15 +776,32 @@ fn parse_dynamic_tools_arg(dynamic_tools: &Option<String>) -> Result<Option<Vec<
Ok(Some(tools))
}
enum ClientTransport {
Stdio {
child: Child,
stdin: Option<ChildStdin>,
stdout: BufReader<ChildStdout>,
},
WebSocket {
url: String,
socket: Box<WebSocket<MaybeTlsStream<TcpStream>>>,
},
}
struct CodexClient {
child: Child,
stdin: Option<ChildStdin>,
stdout: BufReader<ChildStdout>,
transport: ClientTransport,
pending_notifications: VecDeque<JSONRPCNotification>,
}
impl CodexClient {
fn spawn(codex_bin: &Path, config_overrides: &[String]) -> Result<Self> {
fn connect(endpoint: &Endpoint, config_overrides: &[String]) -> Result<Self> {
match endpoint {
Endpoint::SpawnCodex(codex_bin) => Self::spawn_stdio(codex_bin, config_overrides),
Endpoint::ConnectWs(url) => Self::connect_websocket(url),
}
}
fn spawn_stdio(codex_bin: &Path, config_overrides: &[String]) -> Result<Self> {
let codex_bin_display = codex_bin.display();
let mut cmd = Command::new(codex_bin);
for override_kv in config_overrides {
@ -551,9 +825,27 @@ impl CodexClient {
.context("codex app-server stdout unavailable")?;
Ok(Self {
child: codex_app_server,
stdin: Some(stdin),
stdout: BufReader::new(stdout),
transport: ClientTransport::Stdio {
child: codex_app_server,
stdin: Some(stdin),
stdout: BufReader::new(stdout),
},
pending_notifications: VecDeque::new(),
})
}
fn connect_websocket(url: &str) -> Result<Self> {
let parsed = Url::parse(url).with_context(|| format!("invalid websocket URL `{url}`"))?;
let (socket, _response) = connect(parsed.as_str()).with_context(|| {
format!(
"failed to connect to websocket app-server at `{url}`; if no server is running, start one with `codex-app-server-test-client serve --listen {url}`"
)
})?;
Ok(Self {
transport: ClientTransport::WebSocket {
url: url.to_string(),
socket: Box::new(socket),
},
pending_notifications: VecDeque::new(),
})
}
@ -575,7 +867,16 @@ impl CodexClient {
},
};
self.send_request(request, request_id, "initialize")
let response: InitializeResponse = self.send_request(request, request_id, "initialize")?;
// Complete the initialize handshake.
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
method: "initialized".to_string(),
params: None,
});
self.write_jsonrpc_message(initialized)?;
Ok(response)
}
fn start_thread(&mut self) -> Result<NewConversationResponse> {
@ -701,6 +1002,16 @@ impl CodexClient {
self.send_request(request, request_id, "model/list")
}
fn thread_list(&mut self, params: ThreadListParams) -> Result<ThreadListResponse> {
let request_id = self.request_id();
let request = ClientRequest::ThreadList {
request_id: request_id.clone(),
params,
};
self.send_request(request, request_id, "thread/list")
}
fn stream_conversation(&mut self, conversation_id: &ThreadId) -> Result<()> {
loop {
let notification = self.next_notification()?;
@ -835,6 +1146,12 @@ impl CodexClient {
Ok(())
}
fn stream_notifications_forever(&mut self) -> Result<()> {
loop {
let _ = self.next_notification()?;
}
}
fn extract_event(
&self,
notification: JSONRPCNotification,
@ -882,17 +1199,7 @@ impl CodexClient {
let request_json = serde_json::to_string(request)?;
let request_pretty = serde_json::to_string_pretty(request)?;
print_multiline_with_prefix("> ", &request_pretty);
if let Some(stdin) = self.stdin.as_mut() {
writeln!(stdin, "{request_json}")?;
stdin
.flush()
.context("failed to flush request to codex app-server")?;
} else {
bail!("codex app-server stdin closed");
}
Ok(())
self.write_payload(&request_json)
}
fn wait_for_response<T>(&mut self, request_id: RequestId, method: &str) -> Result<T>
@ -947,17 +1254,8 @@ impl CodexClient {
fn read_jsonrpc_message(&mut self) -> Result<JSONRPCMessage> {
loop {
let mut response_line = String::new();
let bytes = self
.stdout
.read_line(&mut response_line)
.context("failed to read from codex app-server")?;
if bytes == 0 {
bail!("codex app-server closed stdout");
}
let trimmed = response_line.trim();
let raw = self.read_payload()?;
let trimmed = raw.trim();
if trimmed.is_empty() {
continue;
}
@ -1086,16 +1384,56 @@ impl CodexClient {
let payload = serde_json::to_string(&message)?;
let pretty = serde_json::to_string_pretty(&message)?;
print_multiline_with_prefix("> ", &pretty);
self.write_payload(&payload)
}
if let Some(stdin) = self.stdin.as_mut() {
writeln!(stdin, "{payload}")?;
stdin
.flush()
.context("failed to flush response to codex app-server")?;
return Ok(());
fn write_payload(&mut self, payload: &str) -> Result<()> {
match &mut self.transport {
ClientTransport::Stdio { stdin, .. } => {
if let Some(stdin) = stdin.as_mut() {
writeln!(stdin, "{payload}")?;
stdin
.flush()
.context("failed to flush payload to codex app-server")?;
return Ok(());
}
bail!("codex app-server stdin closed")
}
ClientTransport::WebSocket { socket, url } => {
socket
.send(Message::Text(payload.to_string().into()))
.with_context(|| format!("failed to write websocket message to `{url}`"))?;
Ok(())
}
}
}
bail!("codex app-server stdin closed")
fn read_payload(&mut self) -> Result<String> {
match &mut self.transport {
ClientTransport::Stdio { stdout, .. } => {
let mut response_line = String::new();
let bytes = stdout
.read_line(&mut response_line)
.context("failed to read from codex app-server")?;
if bytes == 0 {
bail!("codex app-server closed stdout");
}
Ok(response_line)
}
ClientTransport::WebSocket { socket, url } => loop {
let frame = socket
.read()
.with_context(|| format!("failed to read websocket message from `{url}`"))?;
match frame {
Message::Text(text) => return Ok(text.to_string()),
Message::Binary(_) | Message::Ping(_) | Message::Pong(_) => continue,
Message::Close(_) => {
bail!("websocket app-server at `{url}` closed the connection")
}
Message::Frame(_) => continue,
}
},
}
}
}
@ -1107,21 +1445,25 @@ fn print_multiline_with_prefix(prefix: &str, payload: &str) {
impl Drop for CodexClient {
fn drop(&mut self) {
let _ = self.stdin.take();
let ClientTransport::Stdio { child, stdin, .. } = &mut self.transport else {
return;
};
if let Ok(Some(status)) = self.child.try_wait() {
let _ = stdin.take();
if let Ok(Some(status)) = child.try_wait() {
println!("[codex app-server exited: {status}]");
return;
}
thread::sleep(Duration::from_millis(100));
if let Ok(Some(status)) = self.child.try_wait() {
if let Ok(Some(status)) = child.try_wait() {
println!("[codex app-server exited: {status}]");
return;
}
let _ = self.child.kill();
let _ = self.child.wait();
let _ = child.kill();
let _ = child.wait();
}
}