From ac5fa6baf8898fba30d1f7ad6bea2da986e3fc93 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Mon, 8 Dec 2025 15:23:02 -0800 Subject: [PATCH] Do not emit start/end events for write stdin (#7561) --- .../core/src/tools/handlers/unified_exec.rs | 1 - codex-rs/core/src/unified_exec/mod.rs | 2 - .../core/src/unified_exec/session_manager.rs | 51 +------ codex-rs/core/tests/suite/unified_exec.rs | 136 +++--------------- 4 files changed, 19 insertions(+), 171 deletions(-) diff --git a/codex-rs/core/src/tools/handlers/unified_exec.rs b/codex-rs/core/src/tools/handlers/unified_exec.rs index f2500a413..66cf624a6 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec.rs @@ -215,7 +215,6 @@ impl ToolHandler for UnifiedExecHandler { })?; manager .write_stdin(WriteStdinRequest { - call_id: &call_id, process_id: &args.session_id.to_string(), input: &args.chars, yield_time_ms: args.yield_time_ms, diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index 34b62df34..02a0f9ead 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -80,7 +80,6 @@ pub(crate) struct ExecCommandRequest { #[derive(Debug)] pub(crate) struct WriteStdinRequest<'a> { - pub call_id: &'a str, pub process_id: &'a str, pub input: &'a str, pub yield_time_ms: u64, @@ -216,7 +215,6 @@ mod tests { .services .unified_exec_manager .write_stdin(WriteStdinRequest { - call_id: "write-stdin", process_id, input, yield_time_ms, diff --git a/codex-rs/core/src/unified_exec/session_manager.rs b/codex-rs/core/src/unified_exec/session_manager.rs index 88d65ca14..af706b4b2 100644 --- a/codex-rs/core/src/unified_exec/session_manager.rs +++ b/codex-rs/core/src/unified_exec/session_manager.rs @@ -24,7 +24,6 @@ use crate::sandboxing::ExecEnv; use crate::sandboxing::SandboxPermissions; use crate::tools::events::ToolEmitter; use crate::tools::events::ToolEventCtx; -use crate::tools::events::ToolEventFailure; use crate::tools::events::ToolEventStage; use crate::tools::orchestrator::ToolOrchestrator; use crate::tools::runtimes::unified_exec::UnifiedExecRequest as UnifiedExecToolRequest; @@ -77,7 +76,6 @@ struct PreparedSessionHandles { session_ref: Arc, turn_ref: Arc, command: Vec, - cwd: PathBuf, process_id: String, } @@ -234,41 +232,12 @@ impl UnifiedExecSessionManager { session_ref, turn_ref, command: session_command, - cwd: session_cwd, process_id, + .. } = self.prepare_session_handles(process_id.as_str()).await?; - let interaction_emitter = ToolEmitter::unified_exec( - &session_command, - session_cwd.clone(), - ExecCommandSource::UnifiedExecInteraction, - (!request.input.is_empty()).then(|| request.input.to_string()), - Some(process_id.clone()), - ); - let make_event_ctx = || { - ToolEventCtx::new( - session_ref.as_ref(), - turn_ref.as_ref(), - request.call_id, - None, - ) - }; - interaction_emitter - .emit(make_event_ctx(), ToolEventStage::Begin) - .await; - if !request.input.is_empty() { - if let Err(err) = Self::send_input(&writer_tx, request.input.as_bytes()).await { - interaction_emitter - .emit( - make_event_ctx(), - ToolEventStage::Failure(ToolEventFailure::Message(format!( - "write_stdin failed: {err:?}" - ))), - ) - .await; - return Err(err); - } + Self::send_input(&writer_tx, request.input.as_bytes()).await?; tokio::time::sleep(Duration::from_millis(100)).await; } @@ -319,21 +288,6 @@ impl UnifiedExecSessionManager { session_command: Some(session_command.clone()), }; - let interaction_output = ExecToolCallOutput { - exit_code: response.exit_code.unwrap_or(0), - stdout: StreamOutput::new(response.output.clone()), - stderr: StreamOutput::new(String::new()), - aggregated_output: StreamOutput::new(response.output.clone()), - duration: response.wall_time, - timed_out: false, - }; - interaction_emitter - .emit( - make_event_ctx(), - ToolEventStage::Success(interaction_output), - ) - .await; - if response.process_id.is_some() { Self::emit_waiting_status(&session_ref, &turn_ref, &session_command).await; } @@ -400,7 +354,6 @@ impl UnifiedExecSessionManager { session_ref: Arc::clone(&entry.session_ref), turn_ref: Arc::clone(&entry.turn_ref), command: entry.command.clone(), - cwd: entry.cwd.clone(), process_id: entry.process_id.clone(), }) } diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 33e469fc1..6a62e35df 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -765,104 +765,7 @@ async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> { - skip_if_no_network!(Ok(())); - skip_if_sandbox!(Ok(())); - skip_if_windows!(Ok(())); - - let server = start_mock_server().await; - - let mut builder = test_codex().with_config(|config| { - config.use_experimental_unified_exec_tool = true; - config.features.enable(Feature::UnifiedExec); - }); - let TestCodex { - codex, - cwd, - session_configured, - .. - } = builder.build(&server).await?; - - let open_call_id = "uexec-open-for-begin"; - let open_args = json!({ - "shell": "bash".to_string(), - "cmd": "bash -i".to_string(), - "yield_time_ms": 200, - }); - - let stdin_call_id = "uexec-stdin-begin"; - let stdin_args = json!({ - "chars": "echo hello", - "session_id": 1000, - "yield_time_ms": 400, - }); - - let responses = vec![ - sse(vec![ - ev_response_created("resp-1"), - ev_function_call( - open_call_id, - "exec_command", - &serde_json::to_string(&open_args)?, - ), - ev_completed("resp-1"), - ]), - sse(vec![ - ev_response_created("resp-2"), - ev_function_call( - stdin_call_id, - "write_stdin", - &serde_json::to_string(&stdin_args)?, - ), - ev_completed("resp-2"), - ]), - sse(vec![ - ev_response_created("resp-3"), - ev_assistant_message("msg-1", "done"), - ev_completed("resp-3"), - ]), - ]; - mount_sse_sequence(&server, responses).await; - - let session_model = session_configured.model.clone(); - - codex - .submit(Op::UserTurn { - items: vec![UserInput::Text { - text: "begin events for stdin".into(), - }], - final_output_json_schema: None, - cwd: cwd.path().to_path_buf(), - approval_policy: AskForApproval::Never, - sandbox_policy: SandboxPolicy::DangerFullAccess, - model: session_model, - effort: None, - summary: ReasoningSummary::Auto, - }) - .await?; - - let begin_event = wait_for_event_match(&codex, |msg| match msg { - EventMsg::ExecCommandBegin(ev) if ev.call_id == stdin_call_id => Some(ev.clone()), - _ => None, - }) - .await; - - assert_command(&begin_event.command, "-lc", "bash -i"); - assert_eq!( - begin_event.interaction_input, - Some("echo hello".to_string()) - ); - assert_eq!( - begin_event.source, - ExecCommandSource::UnifiedExecInteraction - ); - - wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> { +async fn unified_exec_emits_one_begin_and_one_end_event() -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); skip_if_windows!(Ok(())); @@ -883,8 +786,8 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> let open_call_id = "uexec-open-session"; let open_args = json!({ "shell": "bash".to_string(), - "cmd": "bash -i".to_string(), - "yield_time_ms": 250, + "cmd": "sleep 0.1".to_string(), + "yield_time_ms": 10, }); let poll_call_id = "uexec-poll-empty"; @@ -939,10 +842,12 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> .await?; let mut begin_events = Vec::new(); + let mut end_events = Vec::new(); loop { let event_msg = wait_for_event(&codex, |_| true).await; match event_msg { EventMsg::ExecCommandBegin(event) => begin_events.push(event), + EventMsg::ExecCommandEnd(event) => end_events.push(event), EventMsg::TaskComplete(_) => break, _ => {} } @@ -950,16 +855,19 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> assert_eq!( begin_events.len(), - 2, - "expected begin events for the startup command and the write_stdin call" + 1, + "expected begin events for the startup command" ); - let open_event = begin_events - .iter() - .find(|ev| ev.call_id == open_call_id) - .expect("missing exec_command begin"); + assert_eq!( + end_events.len(), + 1, + "expected end event for the write_stdin call" + ); - assert_command(&open_event.command, "-lc", "bash -i"); + let open_event = &begin_events[0]; + + assert_command(&open_event.command, "-lc", "sleep 0.1"); assert!( open_event.interaction_input.is_none(), @@ -967,18 +875,8 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> ); assert_eq!(open_event.source, ExecCommandSource::UnifiedExecStartup); - let poll_event = begin_events - .iter() - .find(|ev| ev.call_id == poll_call_id) - .expect("missing write_stdin begin"); - - assert_command(&poll_event.command, "-lc", "bash -i"); - - assert!( - poll_event.interaction_input.is_none(), - "poll begin events should omit interaction input" - ); - assert_eq!(poll_event.source, ExecCommandSource::UnifiedExecInteraction); + let end_event = &end_events[0]; + assert_eq!(end_event.call_id, open_call_id); Ok(()) }