From f48d88067efae4a1cf5df6d7ec5210c728afa4e1 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Fri, 5 Dec 2025 12:09:43 -0800 Subject: [PATCH] Fix unified_exec on windows (#7620) Fix unified_exec on windows Requires removal of PSUEDOCONSOLE_INHERIT_CURSOR flag so child processed don't attempt to wait for cursor position response (and timeout). https://github.com/wezterm/wezterm/compare/main...pakrym:wezterm:PSUEDOCONSOLE_INHERIT_CURSOR?expand=1 --------- Co-authored-by: pakrym-oai --- codex-rs/Cargo.lock | 8 +- codex-rs/Cargo.toml | 3 +- codex-rs/core/tests/common/lib.rs | 4 +- codex-rs/core/tests/suite/unified_exec.rs | 92 ++++++++++++++++++++++- codex-rs/utils/pty/Cargo.toml | 2 +- codex-rs/utils/pty/src/lib.rs | 30 ++++++++ 6 files changed, 128 insertions(+), 11 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 48f87efc2..d809adfad 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2557,8 +2557,7 @@ dependencies = [ [[package]] name = "filedescriptor" version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e40758ed24c9b2eeb76c35fb0aebc66c626084edd827e07e1552279814c6682d" +source = "git+https://github.com/pakrym/wezterm?branch=PSUEDOCONSOLE_INHERIT_CURSOR#fe38df8409545a696909aa9a09e63438630f217d" dependencies = [ "libc", "thiserror 1.0.69", @@ -4632,8 +4631,7 @@ dependencies = [ [[package]] name = "portable-pty" version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4a596a2b3d2752d94f51fac2d4a96737b8705dddd311a32b9af47211f08671e" +source = "git+https://github.com/pakrym/wezterm?branch=PSUEDOCONSOLE_INHERIT_CURSOR#fe38df8409545a696909aa9a09e63438630f217d" dependencies = [ "anyhow", "bitflags 1.3.2", @@ -4642,7 +4640,7 @@ dependencies = [ "lazy_static", "libc", "log", - "nix 0.28.0", + "nix 0.29.0", "serial2", "shared_library", "shell-words", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 2339cd4e6..d3d3c36c3 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -178,8 +178,8 @@ seccompiler = "0.5.0" sentry = "0.34.0" serde = "1" serde_json = "1" -serde_yaml = "0.9" serde_with = "3.16" +serde_yaml = "0.9" serial_test = "3.2.0" sha1 = "0.10.6" sha2 = "0.10" @@ -288,6 +288,7 @@ opt-level = 0 # Uncomment to debug local changes. # ratatui = { path = "../../ratatui" } crossterm = { git = "https://github.com/nornagon/crossterm", branch = "nornagon/color-query" } +portable-pty = { git = "https://github.com/pakrym/wezterm", branch = "PSUEDOCONSOLE_INHERIT_CURSOR" } ratatui = { git = "https://github.com/nornagon/ratatui", branch = "nornagon-v0.29.0-patch" } # Uncomment to debug local changes. diff --git a/codex-rs/core/tests/common/lib.rs b/codex-rs/core/tests/common/lib.rs index 2c8c28d71..d643fb77f 100644 --- a/codex-rs/core/tests/common/lib.rs +++ b/codex-rs/core/tests/common/lib.rs @@ -374,9 +374,7 @@ macro_rules! skip_if_no_network { macro_rules! skip_if_windows { ($return_value:expr $(,)?) => {{ if cfg!(target_os = "windows") { - println!( - "Skipping test because it cannot execute when network is disabled in a Codex sandbox." - ); + println!("Skipping test because it cannot execute on Windows."); return $return_value; } }}; diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 5e8f5a8cd..33e469fc1 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -1,4 +1,3 @@ -#![cfg(not(target_os = "windows"))] use std::collections::HashMap; use std::ffi::OsStr; use std::fs; @@ -24,6 +23,7 @@ use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::skip_if_sandbox; +use core_test_support::skip_if_windows; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::TestCodexHarness; use core_test_support::test_codex::test_codex; @@ -155,6 +155,7 @@ fn collect_tool_outputs(bodies: &[Value]) -> Result Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let builder = test_codex().with_config(|config| { config.include_apply_patch_tool = true; @@ -279,6 +280,7 @@ async fn unified_exec_intercepts_apply_patch_exec_command() -> Result<()> { async fn unified_exec_emits_exec_command_begin_event() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -350,6 +352,7 @@ async fn unified_exec_emits_exec_command_begin_event() -> Result<()> { async fn unified_exec_resolves_relative_workdir() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -427,6 +430,7 @@ async fn unified_exec_resolves_relative_workdir() -> Result<()> { async fn unified_exec_respects_workdir_override() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -505,6 +509,7 @@ async fn unified_exec_respects_workdir_override() -> Result<()> { async fn unified_exec_emits_exec_command_end_event() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -591,6 +596,7 @@ async fn unified_exec_emits_exec_command_end_event() -> Result<()> { async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -662,6 +668,7 @@ async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> { async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -761,6 +768,7 @@ async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> { async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -857,6 +865,7 @@ async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> { async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -978,6 +987,7 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -1085,6 +1095,7 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> { async fn unified_exec_respects_early_exit_notifications() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -1177,6 +1188,7 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> { async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -1338,6 +1350,7 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -1442,6 +1455,7 @@ async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<() async fn unified_exec_reuses_session_via_stdin() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -1553,6 +1567,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { async fn unified_exec_streams_after_lagged_output() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -1684,6 +1699,7 @@ PY async fn unified_exec_timeout_and_followup_poll() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -1790,6 +1806,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { async fn unified_exec_formats_large_output_summary() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -1875,6 +1892,7 @@ PY async fn unified_exec_runs_under_sandbox() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; @@ -2067,11 +2085,83 @@ async fn unified_exec_python_prompt_under_seatbelt() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_runs_on_all_platforms() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config.features.enable(Feature::UnifiedExec); + }); + let TestCodex { + codex, + cwd, + session_configured, + .. + } = builder.build(&server).await?; + + let call_id = "uexec"; + let args = serde_json::json!({ + "cmd": "echo 'hello crossplat'", + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]), + ]; + mount_sse_sequence(&server, responses).await; + + let session_model = session_configured.model.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "summarize large output".into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + + let requests = server.received_requests().await.expect("recorded requests"); + assert!(!requests.is_empty(), "expected at least one POST request"); + + let bodies = requests + .iter() + .map(|req| req.body_json::().expect("request json")) + .collect::>(); + + let outputs = collect_tool_outputs(&bodies)?; + let output = outputs.get(call_id).expect("missing output"); + + // TODO: Weaker match because windows produces control characters + assert_regex_match(".*hello crossplat.*", &output.output); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore] async fn unified_exec_prunes_exited_sessions_first() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); let server = start_mock_server().await; diff --git a/codex-rs/utils/pty/Cargo.toml b/codex-rs/utils/pty/Cargo.toml index d640c71aa..2b3de5aa1 100644 --- a/codex-rs/utils/pty/Cargo.toml +++ b/codex-rs/utils/pty/Cargo.toml @@ -10,4 +10,4 @@ workspace = true [dependencies] anyhow = { workspace = true } portable-pty = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time"] } diff --git a/codex-rs/utils/pty/src/lib.rs b/codex-rs/utils/pty/src/lib.rs index 14cc43076..23d69b6f6 100644 --- a/codex-rs/utils/pty/src/lib.rs +++ b/codex-rs/utils/pty/src/lib.rs @@ -1,3 +1,4 @@ +use core::fmt; use std::collections::HashMap; use std::io::ErrorKind; use std::path::Path; @@ -9,13 +10,20 @@ use std::time::Duration; use anyhow::Result; use portable_pty::native_pty_system; use portable_pty::CommandBuilder; +use portable_pty::MasterPty; use portable_pty::PtySize; +use portable_pty::SlavePty; use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::Mutex as TokioMutex; use tokio::task::JoinHandle; +pub struct PtyPairWrapper { + pub _slave: Option>, + pub _master: Box, +} + #[derive(Debug)] pub struct ExecCommandSession { writer_tx: mpsc::Sender>, @@ -26,6 +34,15 @@ pub struct ExecCommandSession { wait_handle: StdMutex>>, exit_status: Arc, exit_code: Arc>>, + // PtyPair must be preserved because the process will receive Control+C if the + // slave is closed + _pair: StdMutex, +} + +impl fmt::Debug for PtyPairWrapper { + fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result { + Ok(()) + } } impl ExecCommandSession { @@ -39,6 +56,7 @@ impl ExecCommandSession { wait_handle: JoinHandle<()>, exit_status: Arc, exit_code: Arc>>, + pair: PtyPairWrapper, ) -> (Self, broadcast::Receiver>) { let initial_output_rx = output_tx.subscribe(); ( @@ -51,6 +69,7 @@ impl ExecCommandSession { wait_handle: StdMutex::new(Some(wait_handle)), exit_status, exit_code, + _pair: StdMutex::new(pair), }, initial_output_rx, ) @@ -192,6 +211,16 @@ pub async fn spawn_pty_process( let _ = exit_tx.send(code); }); + let pair = PtyPairWrapper { + _slave: if cfg!(windows) { + // Keep the slave handle alive on Windows to prevent the process from receiving Control+C + Some(pair.slave) + } else { + None + }, + _master: pair.master, + }; + let (session, output_rx) = ExecCommandSession::new( writer_tx, output_tx, @@ -201,6 +230,7 @@ pub async fn spawn_pty_process( wait_handle, exit_status, exit_code, + pair, ); Ok(SpawnedPty {