chore: add small debug client (#8894)

Small debug client, do not use in production
This commit is contained in:
jif-oai 2026-01-08 13:40:14 +00:00 committed by GitHub
parent be212db0c8
commit 0318f30ed8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 1413 additions and 0 deletions

12
codex-rs/Cargo.lock generated
View file

@ -1355,6 +1355,18 @@ dependencies = [
"zstd",
]
[[package]]
name = "codex-debug-client"
version = "0.0.0"
dependencies = [
"anyhow",
"clap",
"codex-app-server-protocol",
"pretty_assertions",
"serde",
"serde_json",
]
[[package]]
name = "codex-exec"
version = "0.0.0"

View file

@ -6,6 +6,7 @@ members = [
"app-server",
"app-server-protocol",
"app-server-test-client",
"debug-client",
"apply-patch",
"arg0",
"feedback",

View file

@ -0,0 +1,15 @@
[package]
name = "codex-debug-client"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
clap = { workspace = true, features = ["derive"] }
codex-app-server-protocol.workspace = true
serde.workspace = true
serde_json.workspace = true
[dev-dependencies]
pretty_assertions.workspace = true

View file

@ -0,0 +1,57 @@
WARNING: this code is mainly generated by Codex and should not be used in production
# codex-debug-client
A tiny interactive client for `codex app-server` (protocol v2 only). It prints
all JSON-RPC lines from the server and lets you send new turns as you type.
## Usage
Start the app-server client (it will spawn `codex app-server` itself):
```
cargo run -p codex-debug-client -- \
--codex-bin codex \
--approval-policy on-request
```
You can resume a specific thread:
```
cargo run -p codex-debug-client -- --thread-id thr_123
```
### CLI flags
- `--codex-bin <path>`: path to the `codex` binary (default: `codex`).
- `-c, --config key=value`: pass through `--config` overrides to `codex`.
- `--thread-id <id>`: resume a thread instead of starting a new one.
- `--approval-policy <policy>`: `untrusted`, `on-failure`, `on-request`, `never`.
- `--auto-approve`: auto-approve command/file-change approvals (default: decline).
- `--final-only`: only show completed assistant messages and tool items.
- `--model <name>`: optional model override for thread start/resume.
- `--model-provider <name>`: optional provider override.
- `--cwd <path>`: optional working directory override.
## Interactive commands
Type a line to send it as a new turn. Commands are prefixed with `:`:
- `:help` show help
- `:new` start a new thread
- `:resume <thread-id>` resume a thread
- `:use <thread-id>` switch active thread without resuming
- `:refresh-thread` list available threads
- `:quit` exit
The prompt shows the active thread id. Client messages (help, errors, approvals)
print to stderr; raw server JSON prints to stdout so you can pipe/record it
unless `--final-only` is set.
## Notes
- The client performs the required initialize/initialized handshake.
- It prints every server notification and response line as it arrives.
- Approvals for `item/commandExecution/requestApproval` and
`item/fileChange/requestApproval` are auto-responded to with decline unless
`--auto-approve` is set.

View file

@ -0,0 +1,398 @@
use std::io::BufRead;
use std::io::BufReader;
use std::io::Write;
use std::process::Child;
use std::process::ChildStdin;
use std::process::ChildStdout;
use std::process::Command;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Sender;
use anyhow::Context;
use anyhow::Result;
use codex_app_server_protocol::AskForApproval;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput;
use serde::Serialize;
use crate::output::Output;
use crate::reader::start_reader;
use crate::state::PendingRequest;
use crate::state::ReaderEvent;
use crate::state::State;
pub struct AppServerClient {
child: Child,
stdin: Arc<Mutex<Option<ChildStdin>>>,
stdout: Option<BufReader<ChildStdout>>,
next_request_id: AtomicI64,
state: Arc<Mutex<State>>,
output: Output,
filtered_output: bool,
}
impl AppServerClient {
pub fn spawn(
codex_bin: &str,
config_overrides: &[String],
output: Output,
filtered_output: bool,
) -> Result<Self> {
let mut cmd = Command::new(codex_bin);
for override_kv in config_overrides {
cmd.arg("--config").arg(override_kv);
}
let mut child = cmd
.arg("app-server")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.with_context(|| format!("failed to start `{codex_bin}` app-server"))?;
let stdin = child
.stdin
.take()
.context("codex app-server stdin unavailable")?;
let stdout = child
.stdout
.take()
.context("codex app-server stdout unavailable")?;
Ok(Self {
child,
stdin: Arc::new(Mutex::new(Some(stdin))),
stdout: Some(BufReader::new(stdout)),
next_request_id: AtomicI64::new(1),
state: Arc::new(Mutex::new(State::default())),
output,
filtered_output,
})
}
pub fn initialize(&mut self) -> Result<()> {
let request_id = self.next_request_id();
let request = ClientRequest::Initialize {
request_id: request_id.clone(),
params: codex_app_server_protocol::InitializeParams {
client_info: ClientInfo {
name: "debug-client".to_string(),
title: Some("Debug Client".to_string()),
version: env!("CARGO_PKG_VERSION").to_string(),
},
},
};
self.send(&request)?;
let response = self.read_until_response(&request_id)?;
let _parsed: codex_app_server_protocol::InitializeResponse =
serde_json::from_value(response.result).context("decode initialize response")?;
let initialized = ClientNotification::Initialized;
self.send(&initialized)?;
Ok(())
}
pub fn start_thread(&mut self, params: ThreadStartParams) -> Result<String> {
let request_id = self.next_request_id();
let request = ClientRequest::ThreadStart {
request_id: request_id.clone(),
params,
};
self.send(&request)?;
let response = self.read_until_response(&request_id)?;
let parsed: ThreadStartResponse =
serde_json::from_value(response.result).context("decode thread/start response")?;
let thread_id = parsed.thread.id;
self.set_thread_id(thread_id.clone());
Ok(thread_id)
}
pub fn resume_thread(&mut self, params: ThreadResumeParams) -> Result<String> {
let request_id = self.next_request_id();
let request = ClientRequest::ThreadResume {
request_id: request_id.clone(),
params,
};
self.send(&request)?;
let response = self.read_until_response(&request_id)?;
let parsed: ThreadResumeResponse =
serde_json::from_value(response.result).context("decode thread/resume response")?;
let thread_id = parsed.thread.id;
self.set_thread_id(thread_id.clone());
Ok(thread_id)
}
pub fn request_thread_start(&self, params: ThreadStartParams) -> Result<RequestId> {
let request_id = self.next_request_id();
self.track_pending(request_id.clone(), PendingRequest::Start);
let request = ClientRequest::ThreadStart {
request_id: request_id.clone(),
params,
};
self.send(&request)?;
Ok(request_id)
}
pub fn request_thread_resume(&self, params: ThreadResumeParams) -> Result<RequestId> {
let request_id = self.next_request_id();
self.track_pending(request_id.clone(), PendingRequest::Resume);
let request = ClientRequest::ThreadResume {
request_id: request_id.clone(),
params,
};
self.send(&request)?;
Ok(request_id)
}
pub fn request_thread_list(&self, cursor: Option<String>) -> Result<RequestId> {
let request_id = self.next_request_id();
self.track_pending(request_id.clone(), PendingRequest::List);
let request = ClientRequest::ThreadList {
request_id: request_id.clone(),
params: ThreadListParams {
cursor,
limit: None,
model_providers: None,
},
};
self.send(&request)?;
Ok(request_id)
}
pub fn send_turn(&self, thread_id: &str, text: String) -> Result<RequestId> {
let request_id = self.next_request_id();
let request = ClientRequest::TurnStart {
request_id: request_id.clone(),
params: TurnStartParams {
thread_id: thread_id.to_string(),
input: vec![UserInput::Text { text }],
..Default::default()
},
};
self.send(&request)?;
Ok(request_id)
}
pub fn start_reader(
&mut self,
events: Sender<ReaderEvent>,
auto_approve: bool,
filtered_output: bool,
) -> Result<()> {
let stdout = self.stdout.take().context("reader already started")?;
start_reader(
stdout,
Arc::clone(&self.stdin),
Arc::clone(&self.state),
events,
self.output.clone(),
auto_approve,
filtered_output,
);
Ok(())
}
pub fn thread_id(&self) -> Option<String> {
let state = self.state.lock().expect("state lock poisoned");
state.thread_id.clone()
}
pub fn set_thread_id(&self, thread_id: String) {
let mut state = self.state.lock().expect("state lock poisoned");
state.thread_id = Some(thread_id);
self.remember_thread_locked(&mut state);
}
pub fn use_thread(&self, thread_id: String) -> bool {
let mut state = self.state.lock().expect("state lock poisoned");
let known = state.known_threads.iter().any(|id| id == &thread_id);
state.thread_id = Some(thread_id);
self.remember_thread_locked(&mut state);
known
}
pub fn shutdown(&mut self) {
if let Ok(mut stdin) = self.stdin.lock() {
let _ = stdin.take();
}
let _ = self.child.wait();
}
fn track_pending(&self, request_id: RequestId, kind: PendingRequest) {
let mut state = self.state.lock().expect("state lock poisoned");
state.pending.insert(request_id, kind);
}
fn remember_thread_locked(&self, state: &mut State) {
if let Some(thread_id) = state.thread_id.as_ref()
&& !state.known_threads.iter().any(|id| id == thread_id)
{
state.known_threads.push(thread_id.clone());
}
}
fn next_request_id(&self) -> RequestId {
let id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
RequestId::Integer(id)
}
fn send<T: Serialize>(&self, value: &T) -> Result<()> {
let json = serde_json::to_string(value).context("serialize message")?;
let mut line = json;
line.push('\n');
let mut stdin = self.stdin.lock().expect("stdin lock poisoned");
let Some(stdin) = stdin.as_mut() else {
anyhow::bail!("stdin already closed");
};
stdin.write_all(line.as_bytes()).context("write message")?;
stdin.flush().context("flush message")?;
Ok(())
}
fn read_until_response(&mut self, request_id: &RequestId) -> Result<JSONRPCResponse> {
let stdin = Arc::clone(&self.stdin);
let output = self.output.clone();
let reader = self.stdout.as_mut().context("stdout missing")?;
let mut buffer = String::new();
loop {
buffer.clear();
let bytes = reader
.read_line(&mut buffer)
.context("read server output")?;
if bytes == 0 {
anyhow::bail!("server closed stdout while awaiting response {request_id:?}");
}
let line = buffer.trim_end_matches(['\n', '\r']);
if !line.is_empty() && !self.filtered_output {
let _ = output.server_line(line);
}
let message = match serde_json::from_str::<JSONRPCMessage>(line) {
Ok(message) => message,
Err(_) => continue,
};
match message {
JSONRPCMessage::Response(response) => {
if &response.id == request_id {
return Ok(response);
}
}
JSONRPCMessage::Request(request) => {
let _ = handle_server_request(request, &stdin);
}
_ => {}
}
}
}
}
fn handle_server_request(
request: JSONRPCRequest,
stdin: &Arc<Mutex<Option<ChildStdin>>>,
) -> Result<()> {
let Ok(server_request) = codex_app_server_protocol::ServerRequest::try_from(request) else {
return Ok(());
};
match server_request {
codex_app_server_protocol::ServerRequest::CommandExecutionRequestApproval {
request_id,
..
} => {
let response = codex_app_server_protocol::CommandExecutionRequestApprovalResponse {
decision: CommandExecutionApprovalDecision::Decline,
};
send_jsonrpc_response(stdin, request_id, response)
}
codex_app_server_protocol::ServerRequest::FileChangeRequestApproval {
request_id, ..
} => {
let response = codex_app_server_protocol::FileChangeRequestApprovalResponse {
decision: FileChangeApprovalDecision::Decline,
};
send_jsonrpc_response(stdin, request_id, response)
}
_ => Ok(()),
}
}
fn send_jsonrpc_response<T: Serialize>(
stdin: &Arc<Mutex<Option<ChildStdin>>>,
request_id: RequestId,
response: T,
) -> Result<()> {
let result = serde_json::to_value(response).context("serialize response")?;
let message = JSONRPCMessage::Response(JSONRPCResponse {
id: request_id,
result,
});
send_with_stdin(stdin, &message)
}
fn send_with_stdin<T: Serialize>(stdin: &Arc<Mutex<Option<ChildStdin>>>, value: &T) -> Result<()> {
let json = serde_json::to_string(value).context("serialize message")?;
let mut line = json;
line.push('\n');
let mut stdin = stdin.lock().expect("stdin lock poisoned");
let Some(stdin) = stdin.as_mut() else {
anyhow::bail!("stdin already closed");
};
stdin.write_all(line.as_bytes()).context("write message")?;
stdin.flush().context("flush message")?;
Ok(())
}
pub fn build_thread_start_params(
approval_policy: AskForApproval,
model: Option<String>,
model_provider: Option<String>,
cwd: Option<String>,
) -> ThreadStartParams {
ThreadStartParams {
model,
model_provider,
cwd,
approval_policy: Some(approval_policy),
experimental_raw_events: false,
..Default::default()
}
}
pub fn build_thread_resume_params(
thread_id: String,
approval_policy: AskForApproval,
model: Option<String>,
model_provider: Option<String>,
cwd: Option<String>,
) -> ThreadResumeParams {
ThreadResumeParams {
thread_id,
model,
model_provider,
cwd,
approval_policy: Some(approval_policy),
..Default::default()
}
}

View file

@ -0,0 +1,156 @@
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InputAction {
Message(String),
Command(UserCommand),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UserCommand {
Help,
Quit,
NewThread,
Resume(String),
Use(String),
RefreshThread,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParseError {
EmptyCommand,
MissingArgument { name: &'static str },
UnknownCommand { command: String },
}
impl ParseError {
pub fn message(&self) -> String {
match self {
Self::EmptyCommand => "empty command after ':'".to_string(),
Self::MissingArgument { name } => {
format!("missing required argument: {name}")
}
Self::UnknownCommand { command } => format!("unknown command: {command}"),
}
}
}
pub fn parse_input(line: &str) -> Result<Option<InputAction>, ParseError> {
let trimmed = line.trim();
if trimmed.is_empty() {
return Ok(None);
}
let Some(command_line) = trimmed.strip_prefix(':') else {
return Ok(Some(InputAction::Message(trimmed.to_string())));
};
let mut parts = command_line.split_whitespace();
let Some(command) = parts.next() else {
return Err(ParseError::EmptyCommand);
};
match command {
"help" | "h" => Ok(Some(InputAction::Command(UserCommand::Help))),
"quit" | "q" | "exit" => Ok(Some(InputAction::Command(UserCommand::Quit))),
"new" => Ok(Some(InputAction::Command(UserCommand::NewThread))),
"resume" => {
let thread_id = parts
.next()
.ok_or(ParseError::MissingArgument { name: "thread-id" })?;
Ok(Some(InputAction::Command(UserCommand::Resume(
thread_id.to_string(),
))))
}
"use" => {
let thread_id = parts
.next()
.ok_or(ParseError::MissingArgument { name: "thread-id" })?;
Ok(Some(InputAction::Command(UserCommand::Use(
thread_id.to_string(),
))))
}
"refresh-thread" => Ok(Some(InputAction::Command(UserCommand::RefreshThread))),
_ => Err(ParseError::UnknownCommand {
command: command.to_string(),
}),
}
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use super::InputAction;
use super::ParseError;
use super::UserCommand;
use super::parse_input;
#[test]
fn parses_message() {
let result = parse_input("hello there").unwrap();
assert_eq!(
result,
Some(InputAction::Message("hello there".to_string()))
);
}
#[test]
fn parses_help_command() {
let result = parse_input(":help").unwrap();
assert_eq!(result, Some(InputAction::Command(UserCommand::Help)));
}
#[test]
fn parses_new_thread() {
let result = parse_input(":new").unwrap();
assert_eq!(result, Some(InputAction::Command(UserCommand::NewThread)));
}
#[test]
fn parses_resume() {
let result = parse_input(":resume thr_123").unwrap();
assert_eq!(
result,
Some(InputAction::Command(UserCommand::Resume(
"thr_123".to_string()
)))
);
}
#[test]
fn parses_use() {
let result = parse_input(":use thr_456").unwrap();
assert_eq!(
result,
Some(InputAction::Command(UserCommand::Use(
"thr_456".to_string()
)))
);
}
#[test]
fn parses_refresh_thread() {
let result = parse_input(":refresh-thread").unwrap();
assert_eq!(
result,
Some(InputAction::Command(UserCommand::RefreshThread))
);
}
#[test]
fn rejects_missing_resume_arg() {
let result = parse_input(":resume");
assert_eq!(
result,
Err(ParseError::MissingArgument { name: "thread-id" })
);
}
#[test]
fn rejects_missing_use_arg() {
let result = parse_input(":use");
assert_eq!(
result,
Err(ParseError::MissingArgument { name: "thread-id" })
);
}
}

View file

@ -0,0 +1,293 @@
mod client;
mod commands;
mod output;
mod reader;
mod state;
use std::io;
use std::io::BufRead;
use std::sync::mpsc;
use anyhow::Context;
use anyhow::Result;
use clap::ArgAction;
use clap::Parser;
use codex_app_server_protocol::AskForApproval;
use crate::client::AppServerClient;
use crate::client::build_thread_resume_params;
use crate::client::build_thread_start_params;
use crate::commands::InputAction;
use crate::commands::UserCommand;
use crate::commands::parse_input;
use crate::output::Output;
use crate::state::ReaderEvent;
#[derive(Parser)]
#[command(author = "Codex", version, about = "Minimal app-server client")]
struct Cli {
/// Path to the `codex` CLI binary.
#[arg(long, default_value = "codex")]
codex_bin: String,
/// Forwarded to the `codex` CLI as `--config key=value`. Repeatable.
#[arg(short = 'c', long = "config", value_name = "key=value", action = ArgAction::Append)]
config_overrides: Vec<String>,
/// Resume an existing thread instead of starting a new one.
#[arg(long)]
thread_id: Option<String>,
/// Set the approval policy for the thread.
#[arg(long, default_value = "on-request")]
approval_policy: String,
/// Auto-approve command/file-change approvals.
#[arg(long, default_value_t = false)]
auto_approve: bool,
/// Only show final assistant messages and tool calls.
#[arg(long, default_value_t = false)]
final_only: bool,
/// Optional model override when starting/resuming a thread.
#[arg(long)]
model: Option<String>,
/// Optional model provider override when starting/resuming a thread.
#[arg(long)]
model_provider: Option<String>,
/// Optional working directory override when starting/resuming a thread.
#[arg(long)]
cwd: Option<String>,
}
fn main() -> Result<()> {
let cli = Cli::parse();
let output = Output::new();
let approval_policy = parse_approval_policy(&cli.approval_policy)?;
let mut client = AppServerClient::spawn(
&cli.codex_bin,
&cli.config_overrides,
output.clone(),
cli.final_only,
)?;
client.initialize()?;
let thread_id = if let Some(thread_id) = cli.thread_id.as_ref() {
client.resume_thread(build_thread_resume_params(
thread_id.clone(),
approval_policy,
cli.model.clone(),
cli.model_provider.clone(),
cli.cwd.clone(),
))?
} else {
client.start_thread(build_thread_start_params(
approval_policy,
cli.model.clone(),
cli.model_provider.clone(),
cli.cwd.clone(),
))?
};
output
.client_line(&format!("connected to thread {thread_id}"))
.ok();
output.set_prompt(&thread_id);
let (event_tx, event_rx) = mpsc::channel();
client.start_reader(event_tx, cli.auto_approve, cli.final_only)?;
print_help(&output);
let stdin = io::stdin();
let mut lines = stdin.lock().lines();
loop {
drain_events(&event_rx, &output);
let prompt_thread = client
.thread_id()
.unwrap_or_else(|| "no-thread".to_string());
output.prompt(&prompt_thread).ok();
let Some(line) = lines.next() else {
break;
};
let line = line.context("read stdin")?;
match parse_input(&line) {
Ok(None) => continue,
Ok(Some(InputAction::Message(message))) => {
let Some(active_thread) = client.thread_id() else {
output
.client_line("no active thread; use :new or :resume <id>")
.ok();
continue;
};
if let Err(err) = client.send_turn(&active_thread, message) {
output
.client_line(&format!("failed to send turn: {err}"))
.ok();
}
}
Ok(Some(InputAction::Command(command))) => {
if !handle_command(command, &client, &output, approval_policy, &cli) {
break;
}
}
Err(err) => {
output.client_line(&err.message()).ok();
}
}
}
client.shutdown();
Ok(())
}
fn handle_command(
command: UserCommand,
client: &AppServerClient,
output: &Output,
approval_policy: AskForApproval,
cli: &Cli,
) -> bool {
match command {
UserCommand::Help => {
print_help(output);
true
}
UserCommand::Quit => false,
UserCommand::NewThread => {
match client.request_thread_start(build_thread_start_params(
approval_policy,
cli.model.clone(),
cli.model_provider.clone(),
cli.cwd.clone(),
)) {
Ok(request_id) => {
output
.client_line(&format!("requested new thread ({request_id:?})"))
.ok();
}
Err(err) => {
output
.client_line(&format!("failed to start thread: {err}"))
.ok();
}
}
true
}
UserCommand::Resume(thread_id) => {
match client.request_thread_resume(build_thread_resume_params(
thread_id,
approval_policy,
cli.model.clone(),
cli.model_provider.clone(),
cli.cwd.clone(),
)) {
Ok(request_id) => {
output
.client_line(&format!("requested thread resume ({request_id:?})"))
.ok();
}
Err(err) => {
output
.client_line(&format!("failed to resume thread: {err}"))
.ok();
}
}
true
}
UserCommand::Use(thread_id) => {
let known = client.use_thread(thread_id.clone());
output.set_prompt(&thread_id);
if known {
output
.client_line(&format!("switched active thread to {thread_id}"))
.ok();
} else {
output
.client_line(&format!(
"switched active thread to {thread_id} (unknown; use :resume to load)"
))
.ok();
}
true
}
UserCommand::RefreshThread => {
match client.request_thread_list(None) {
Ok(request_id) => {
output
.client_line(&format!("requested thread list ({request_id:?})"))
.ok();
}
Err(err) => {
output
.client_line(&format!("failed to list threads: {err}"))
.ok();
}
}
true
}
}
}
fn parse_approval_policy(value: &str) -> Result<AskForApproval> {
match value {
"untrusted" | "unless-trusted" | "unlessTrusted" => Ok(AskForApproval::UnlessTrusted),
"on-failure" | "onFailure" => Ok(AskForApproval::OnFailure),
"on-request" | "onRequest" => Ok(AskForApproval::OnRequest),
"never" => Ok(AskForApproval::Never),
_ => anyhow::bail!(
"unknown approval policy: {value}. Expected one of: untrusted, on-failure, on-request, never"
),
}
}
fn drain_events(event_rx: &mpsc::Receiver<ReaderEvent>, output: &Output) {
while let Ok(event) = event_rx.try_recv() {
match event {
ReaderEvent::ThreadReady { thread_id } => {
output
.client_line(&format!("active thread is now {thread_id}"))
.ok();
output.set_prompt(&thread_id);
}
ReaderEvent::ThreadList {
thread_ids,
next_cursor,
} => {
if thread_ids.is_empty() {
output.client_line("threads: (none)").ok();
} else {
output.client_line("threads:").ok();
for thread_id in thread_ids {
output.client_line(&format!(" {thread_id}")).ok();
}
}
if let Some(next_cursor) = next_cursor {
output
.client_line(&format!(
"more threads available, next cursor: {next_cursor}"
))
.ok();
}
}
}
}
}
fn print_help(output: &Output) {
let _ = output.client_line("commands:");
let _ = output.client_line(" :help show this help");
let _ = output.client_line(" :new start a new thread");
let _ = output.client_line(" :resume <thread-id> resume an existing thread");
let _ = output.client_line(" :use <thread-id> switch the active thread");
let _ = output.client_line(" :refresh-thread list available threads");
let _ = output.client_line(" :quit exit");
let _ = output.client_line("type a message to send it as a new turn");
}

View file

@ -0,0 +1,121 @@
use std::io;
use std::io::IsTerminal;
use std::io::Write;
use std::sync::Arc;
use std::sync::Mutex;
#[derive(Clone, Copy, Debug)]
pub enum LabelColor {
Assistant,
Tool,
ToolMeta,
Thread,
}
#[derive(Debug, Default)]
struct PromptState {
thread_id: Option<String>,
visible: bool,
}
#[derive(Clone, Debug)]
pub struct Output {
lock: Arc<Mutex<()>>,
prompt: Arc<Mutex<PromptState>>,
color: bool,
}
impl Output {
pub fn new() -> Self {
let no_color = std::env::var_os("NO_COLOR").is_some();
let color = !no_color && io::stdout().is_terminal() && io::stderr().is_terminal();
Self {
lock: Arc::new(Mutex::new(())),
prompt: Arc::new(Mutex::new(PromptState::default())),
color,
}
}
pub fn server_line(&self, line: &str) -> io::Result<()> {
let _guard = self.lock.lock().expect("output lock poisoned");
self.clear_prompt_line_locked()?;
let mut stdout = io::stdout();
writeln!(stdout, "{line}")?;
stdout.flush()?;
self.redraw_prompt_locked()
}
pub fn client_line(&self, line: &str) -> io::Result<()> {
let _guard = self.lock.lock().expect("output lock poisoned");
self.clear_prompt_line_locked()?;
let mut stderr = io::stderr();
writeln!(stderr, "{line}")?;
stderr.flush()
}
pub fn prompt(&self, thread_id: &str) -> io::Result<()> {
let _guard = self.lock.lock().expect("output lock poisoned");
self.set_prompt_locked(thread_id);
self.write_prompt_locked()
}
pub fn set_prompt(&self, thread_id: &str) {
let _guard = self.lock.lock().expect("output lock poisoned");
self.set_prompt_locked(thread_id);
}
pub fn format_label(&self, label: &str, color: LabelColor) -> String {
if !self.color {
return label.to_string();
}
let code = match color {
LabelColor::Assistant => "32",
LabelColor::Tool => "36",
LabelColor::ToolMeta => "33",
LabelColor::Thread => "34",
};
format!("\x1b[{code}m{label}\x1b[0m")
}
fn clear_prompt_line_locked(&self) -> io::Result<()> {
let mut prompt = self.prompt.lock().expect("prompt lock poisoned");
if prompt.visible {
let mut stderr = io::stderr();
writeln!(stderr)?;
stderr.flush()?;
prompt.visible = false;
}
Ok(())
}
fn redraw_prompt_locked(&self) -> io::Result<()> {
if self
.prompt
.lock()
.expect("prompt lock poisoned")
.thread_id
.is_some()
{
self.write_prompt_locked()?;
}
Ok(())
}
fn set_prompt_locked(&self, thread_id: &str) {
let mut prompt = self.prompt.lock().expect("prompt lock poisoned");
prompt.thread_id = Some(thread_id.to_string());
}
fn write_prompt_locked(&self) -> io::Result<()> {
let mut prompt = self.prompt.lock().expect("prompt lock poisoned");
let Some(thread_id) = prompt.thread_id.as_ref() else {
return Ok(());
};
let mut stderr = io::stderr();
write!(stderr, "({thread_id})> ")?;
stderr.flush()?;
prompt.visible = true;
Ok(())
}
}

View file

@ -0,0 +1,332 @@
use std::io::BufRead;
use std::io::BufReader;
use std::process::ChildStdout;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::mpsc::Sender;
use std::thread;
use std::thread::JoinHandle;
use anyhow::Context;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartResponse;
use serde::Serialize;
use std::io::Write;
use crate::output::LabelColor;
use crate::output::Output;
use crate::state::PendingRequest;
use crate::state::ReaderEvent;
use crate::state::State;
pub fn start_reader(
mut stdout: BufReader<ChildStdout>,
stdin: Arc<Mutex<Option<std::process::ChildStdin>>>,
state: Arc<Mutex<State>>,
events: Sender<ReaderEvent>,
output: Output,
auto_approve: bool,
filtered_output: bool,
) -> JoinHandle<()> {
thread::spawn(move || {
let command_decision = if auto_approve {
CommandExecutionApprovalDecision::Accept
} else {
CommandExecutionApprovalDecision::Decline
};
let file_decision = if auto_approve {
FileChangeApprovalDecision::Accept
} else {
FileChangeApprovalDecision::Decline
};
let mut buffer = String::new();
loop {
buffer.clear();
match stdout.read_line(&mut buffer) {
Ok(0) => break,
Ok(_) => {}
Err(err) => {
let _ = output.client_line(&format!("failed to read from server: {err}"));
break;
}
}
let line = buffer.trim_end_matches(['\n', '\r']);
if !line.is_empty() && !filtered_output {
let _ = output.server_line(line);
}
let Ok(message) = serde_json::from_str::<JSONRPCMessage>(line) else {
continue;
};
match message {
JSONRPCMessage::Request(request) => {
if let Err(err) = handle_server_request(
request,
&command_decision,
&file_decision,
&stdin,
&output,
) {
let _ =
output.client_line(&format!("failed to handle server request: {err}"));
}
}
JSONRPCMessage::Response(response) => {
if let Err(err) = handle_response(response, &state, &events) {
let _ = output.client_line(&format!("failed to handle response: {err}"));
}
}
JSONRPCMessage::Notification(notification) => {
if filtered_output
&& let Err(err) = handle_filtered_notification(notification, &output)
{
let _ =
output.client_line(&format!("failed to filter notification: {err}"));
}
}
_ => {}
}
}
})
}
fn handle_server_request(
request: JSONRPCRequest,
command_decision: &CommandExecutionApprovalDecision,
file_decision: &FileChangeApprovalDecision,
stdin: &Arc<Mutex<Option<std::process::ChildStdin>>>,
output: &Output,
) -> anyhow::Result<()> {
let server_request = match ServerRequest::try_from(request.clone()) {
Ok(server_request) => server_request,
Err(_) => return Ok(()),
};
match server_request {
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
let response = CommandExecutionRequestApprovalResponse {
decision: command_decision.clone(),
};
output.client_line(&format!(
"auto-response for command approval {request_id:?}: {command_decision:?} ({params:?})"
))?;
send_response(stdin, request_id, response)
}
ServerRequest::FileChangeRequestApproval { request_id, params } => {
let response = FileChangeRequestApprovalResponse {
decision: file_decision.clone(),
};
output.client_line(&format!(
"auto-response for file change approval {request_id:?}: {file_decision:?} ({params:?})"
))?;
send_response(stdin, request_id, response)
}
_ => Ok(()),
}
}
fn handle_response(
response: JSONRPCResponse,
state: &Arc<Mutex<State>>,
events: &Sender<ReaderEvent>,
) -> anyhow::Result<()> {
let pending = {
let mut state = state.lock().expect("state lock poisoned");
state.pending.remove(&response.id)
};
let Some(pending) = pending else {
return Ok(());
};
match pending {
PendingRequest::Start => {
let parsed = serde_json::from_value::<ThreadStartResponse>(response.result)
.context("decode thread/start response")?;
let thread_id = parsed.thread.id;
{
let mut state = state.lock().expect("state lock poisoned");
state.thread_id = Some(thread_id.clone());
if !state.known_threads.iter().any(|id| id == &thread_id) {
state.known_threads.push(thread_id.clone());
}
}
events.send(ReaderEvent::ThreadReady { thread_id }).ok();
}
PendingRequest::Resume => {
let parsed = serde_json::from_value::<ThreadResumeResponse>(response.result)
.context("decode thread/resume response")?;
let thread_id = parsed.thread.id;
{
let mut state = state.lock().expect("state lock poisoned");
state.thread_id = Some(thread_id.clone());
if !state.known_threads.iter().any(|id| id == &thread_id) {
state.known_threads.push(thread_id.clone());
}
}
events.send(ReaderEvent::ThreadReady { thread_id }).ok();
}
PendingRequest::List => {
let parsed = serde_json::from_value::<ThreadListResponse>(response.result)
.context("decode thread/list response")?;
let thread_ids: Vec<String> = parsed.data.into_iter().map(|thread| thread.id).collect();
{
let mut state = state.lock().expect("state lock poisoned");
for thread_id in &thread_ids {
if !state.known_threads.iter().any(|id| id == thread_id) {
state.known_threads.push(thread_id.clone());
}
}
}
events
.send(ReaderEvent::ThreadList {
thread_ids,
next_cursor: parsed.next_cursor,
})
.ok();
}
}
Ok(())
}
fn handle_filtered_notification(
notification: JSONRPCNotification,
output: &Output,
) -> anyhow::Result<()> {
let Ok(server_notification) = ServerNotification::try_from(notification) else {
return Ok(());
};
match server_notification {
ServerNotification::ItemCompleted(payload) => {
emit_filtered_item(payload.item, &payload.thread_id, output)
}
_ => Ok(()),
}
}
fn emit_filtered_item(item: ThreadItem, thread_id: &str, output: &Output) -> anyhow::Result<()> {
let thread_label = output.format_label(thread_id, LabelColor::Thread);
match item {
ThreadItem::AgentMessage { text, .. } => {
let label = output.format_label("assistant", LabelColor::Assistant);
output.server_line(&format!("{thread_label} {label}: {text}"))?;
}
ThreadItem::CommandExecution {
command,
status,
exit_code,
aggregated_output,
..
} => {
let label = output.format_label("tool", LabelColor::Tool);
output.server_line(&format!(
"{thread_label} {label}: command {command} ({status:?})"
))?;
if let Some(exit_code) = exit_code {
let label = output.format_label("tool exit", LabelColor::ToolMeta);
output.server_line(&format!("{thread_label} {label}: {exit_code}"))?;
}
if let Some(aggregated_output) = aggregated_output {
let label = output.format_label("tool output", LabelColor::ToolMeta);
write_multiline(
output,
&thread_label,
&format!("{label}:"),
&aggregated_output,
)?;
}
}
ThreadItem::FileChange {
changes, status, ..
} => {
let label = output.format_label("tool", LabelColor::Tool);
output.server_line(&format!(
"{thread_label} {label}: file change ({status:?}, {} files)",
changes.len()
))?;
}
ThreadItem::McpToolCall {
server,
tool,
status,
arguments,
result,
error,
..
} => {
let label = output.format_label("tool", LabelColor::Tool);
output.server_line(&format!(
"{thread_label} {label}: {server}.{tool} ({status:?})"
))?;
if !arguments.is_null() {
let label = output.format_label("tool args", LabelColor::ToolMeta);
output.server_line(&format!("{thread_label} {label}: {arguments}"))?;
}
if let Some(result) = result {
let label = output.format_label("tool result", LabelColor::ToolMeta);
output.server_line(&format!("{thread_label} {label}: {result:?}"))?;
}
if let Some(error) = error {
let label = output.format_label("tool error", LabelColor::ToolMeta);
output.server_line(&format!("{thread_label} {label}: {error:?}"))?;
}
}
_ => {}
}
Ok(())
}
fn write_multiline(
output: &Output,
thread_label: &str,
header: &str,
text: &str,
) -> anyhow::Result<()> {
output.server_line(&format!("{thread_label} {header}"))?;
for line in text.lines() {
output.server_line(&format!("{thread_label} {line}"))?;
}
Ok(())
}
fn send_response<T: Serialize>(
stdin: &Arc<Mutex<Option<std::process::ChildStdin>>>,
request_id: codex_app_server_protocol::RequestId,
response: T,
) -> anyhow::Result<()> {
let result = serde_json::to_value(response).context("serialize response")?;
let message = JSONRPCResponse {
id: request_id,
result,
};
let json = serde_json::to_string(&message).context("serialize response message")?;
let mut line = json;
line.push('\n');
let mut stdin = stdin.lock().expect("stdin lock poisoned");
let Some(stdin) = stdin.as_mut() else {
anyhow::bail!("stdin already closed");
};
stdin.write_all(line.as_bytes()).context("write response")?;
stdin.flush().context("flush response")?;
Ok(())
}

View file

@ -0,0 +1,28 @@
use std::collections::HashMap;
use codex_app_server_protocol::RequestId;
#[derive(Debug, Default)]
pub struct State {
pub pending: HashMap<RequestId, PendingRequest>,
pub thread_id: Option<String>,
pub known_threads: Vec<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PendingRequest {
Start,
Resume,
List,
}
#[derive(Debug, Clone)]
pub enum ReaderEvent {
ThreadReady {
thread_id: String,
},
ThreadList {
thread_ids: Vec<String>,
next_cursor: Option<String>,
},
}