Do not emit start/end events for write stdin (#7561)
This commit is contained in:
parent
badda736c6
commit
ac5fa6baf8
4 changed files with 19 additions and 171 deletions
|
|
@ -215,7 +215,6 @@ impl ToolHandler for UnifiedExecHandler {
|
|||
})?;
|
||||
manager
|
||||
.write_stdin(WriteStdinRequest {
|
||||
call_id: &call_id,
|
||||
process_id: &args.session_id.to_string(),
|
||||
input: &args.chars,
|
||||
yield_time_ms: args.yield_time_ms,
|
||||
|
|
|
|||
|
|
@ -80,7 +80,6 @@ pub(crate) struct ExecCommandRequest {
|
|||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct WriteStdinRequest<'a> {
|
||||
pub call_id: &'a str,
|
||||
pub process_id: &'a str,
|
||||
pub input: &'a str,
|
||||
pub yield_time_ms: u64,
|
||||
|
|
@ -216,7 +215,6 @@ mod tests {
|
|||
.services
|
||||
.unified_exec_manager
|
||||
.write_stdin(WriteStdinRequest {
|
||||
call_id: "write-stdin",
|
||||
process_id,
|
||||
input,
|
||||
yield_time_ms,
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ use crate::sandboxing::ExecEnv;
|
|||
use crate::sandboxing::SandboxPermissions;
|
||||
use crate::tools::events::ToolEmitter;
|
||||
use crate::tools::events::ToolEventCtx;
|
||||
use crate::tools::events::ToolEventFailure;
|
||||
use crate::tools::events::ToolEventStage;
|
||||
use crate::tools::orchestrator::ToolOrchestrator;
|
||||
use crate::tools::runtimes::unified_exec::UnifiedExecRequest as UnifiedExecToolRequest;
|
||||
|
|
@ -77,7 +76,6 @@ struct PreparedSessionHandles {
|
|||
session_ref: Arc<Session>,
|
||||
turn_ref: Arc<TurnContext>,
|
||||
command: Vec<String>,
|
||||
cwd: PathBuf,
|
||||
process_id: String,
|
||||
}
|
||||
|
||||
|
|
@ -234,41 +232,12 @@ impl UnifiedExecSessionManager {
|
|||
session_ref,
|
||||
turn_ref,
|
||||
command: session_command,
|
||||
cwd: session_cwd,
|
||||
process_id,
|
||||
..
|
||||
} = self.prepare_session_handles(process_id.as_str()).await?;
|
||||
|
||||
let interaction_emitter = ToolEmitter::unified_exec(
|
||||
&session_command,
|
||||
session_cwd.clone(),
|
||||
ExecCommandSource::UnifiedExecInteraction,
|
||||
(!request.input.is_empty()).then(|| request.input.to_string()),
|
||||
Some(process_id.clone()),
|
||||
);
|
||||
let make_event_ctx = || {
|
||||
ToolEventCtx::new(
|
||||
session_ref.as_ref(),
|
||||
turn_ref.as_ref(),
|
||||
request.call_id,
|
||||
None,
|
||||
)
|
||||
};
|
||||
interaction_emitter
|
||||
.emit(make_event_ctx(), ToolEventStage::Begin)
|
||||
.await;
|
||||
|
||||
if !request.input.is_empty() {
|
||||
if let Err(err) = Self::send_input(&writer_tx, request.input.as_bytes()).await {
|
||||
interaction_emitter
|
||||
.emit(
|
||||
make_event_ctx(),
|
||||
ToolEventStage::Failure(ToolEventFailure::Message(format!(
|
||||
"write_stdin failed: {err:?}"
|
||||
))),
|
||||
)
|
||||
.await;
|
||||
return Err(err);
|
||||
}
|
||||
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
|
|
@ -319,21 +288,6 @@ impl UnifiedExecSessionManager {
|
|||
session_command: Some(session_command.clone()),
|
||||
};
|
||||
|
||||
let interaction_output = ExecToolCallOutput {
|
||||
exit_code: response.exit_code.unwrap_or(0),
|
||||
stdout: StreamOutput::new(response.output.clone()),
|
||||
stderr: StreamOutput::new(String::new()),
|
||||
aggregated_output: StreamOutput::new(response.output.clone()),
|
||||
duration: response.wall_time,
|
||||
timed_out: false,
|
||||
};
|
||||
interaction_emitter
|
||||
.emit(
|
||||
make_event_ctx(),
|
||||
ToolEventStage::Success(interaction_output),
|
||||
)
|
||||
.await;
|
||||
|
||||
if response.process_id.is_some() {
|
||||
Self::emit_waiting_status(&session_ref, &turn_ref, &session_command).await;
|
||||
}
|
||||
|
|
@ -400,7 +354,6 @@ impl UnifiedExecSessionManager {
|
|||
session_ref: Arc::clone(&entry.session_ref),
|
||||
turn_ref: Arc::clone(&entry.turn_ref),
|
||||
command: entry.command.clone(),
|
||||
cwd: entry.cwd.clone(),
|
||||
process_id: entry.process_id.clone(),
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -765,104 +765,7 @@ async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
|
|||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
skip_if_windows!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.use_experimental_unified_exec_tool = true;
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let open_call_id = "uexec-open-for-begin";
|
||||
let open_args = json!({
|
||||
"shell": "bash".to_string(),
|
||||
"cmd": "bash -i".to_string(),
|
||||
"yield_time_ms": 200,
|
||||
});
|
||||
|
||||
let stdin_call_id = "uexec-stdin-begin";
|
||||
let stdin_args = json!({
|
||||
"chars": "echo hello",
|
||||
"session_id": 1000,
|
||||
"yield_time_ms": 400,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
open_call_id,
|
||||
"exec_command",
|
||||
&serde_json::to_string(&open_args)?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
stdin_call_id,
|
||||
"write_stdin",
|
||||
&serde_json::to_string(&stdin_args)?,
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
];
|
||||
mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "begin events for stdin".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let begin_event = wait_for_event_match(&codex, |msg| match msg {
|
||||
EventMsg::ExecCommandBegin(ev) if ev.call_id == stdin_call_id => Some(ev.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert_command(&begin_event.command, "-lc", "bash -i");
|
||||
assert_eq!(
|
||||
begin_event.interaction_input,
|
||||
Some("echo hello".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
begin_event.source,
|
||||
ExecCommandSource::UnifiedExecInteraction
|
||||
);
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> {
|
||||
async fn unified_exec_emits_one_begin_and_one_end_event() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
skip_if_windows!(Ok(()));
|
||||
|
|
@ -883,8 +786,8 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()>
|
|||
let open_call_id = "uexec-open-session";
|
||||
let open_args = json!({
|
||||
"shell": "bash".to_string(),
|
||||
"cmd": "bash -i".to_string(),
|
||||
"yield_time_ms": 250,
|
||||
"cmd": "sleep 0.1".to_string(),
|
||||
"yield_time_ms": 10,
|
||||
});
|
||||
|
||||
let poll_call_id = "uexec-poll-empty";
|
||||
|
|
@ -939,10 +842,12 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()>
|
|||
.await?;
|
||||
|
||||
let mut begin_events = Vec::new();
|
||||
let mut end_events = Vec::new();
|
||||
loop {
|
||||
let event_msg = wait_for_event(&codex, |_| true).await;
|
||||
match event_msg {
|
||||
EventMsg::ExecCommandBegin(event) => begin_events.push(event),
|
||||
EventMsg::ExecCommandEnd(event) => end_events.push(event),
|
||||
EventMsg::TaskComplete(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
|
|
@ -950,16 +855,19 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()>
|
|||
|
||||
assert_eq!(
|
||||
begin_events.len(),
|
||||
2,
|
||||
"expected begin events for the startup command and the write_stdin call"
|
||||
1,
|
||||
"expected begin events for the startup command"
|
||||
);
|
||||
|
||||
let open_event = begin_events
|
||||
.iter()
|
||||
.find(|ev| ev.call_id == open_call_id)
|
||||
.expect("missing exec_command begin");
|
||||
assert_eq!(
|
||||
end_events.len(),
|
||||
1,
|
||||
"expected end event for the write_stdin call"
|
||||
);
|
||||
|
||||
assert_command(&open_event.command, "-lc", "bash -i");
|
||||
let open_event = &begin_events[0];
|
||||
|
||||
assert_command(&open_event.command, "-lc", "sleep 0.1");
|
||||
|
||||
assert!(
|
||||
open_event.interaction_input.is_none(),
|
||||
|
|
@ -967,18 +875,8 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()>
|
|||
);
|
||||
assert_eq!(open_event.source, ExecCommandSource::UnifiedExecStartup);
|
||||
|
||||
let poll_event = begin_events
|
||||
.iter()
|
||||
.find(|ev| ev.call_id == poll_call_id)
|
||||
.expect("missing write_stdin begin");
|
||||
|
||||
assert_command(&poll_event.command, "-lc", "bash -i");
|
||||
|
||||
assert!(
|
||||
poll_event.interaction_input.is_none(),
|
||||
"poll begin events should omit interaction input"
|
||||
);
|
||||
assert_eq!(poll_event.source, ExecCommandSource::UnifiedExecInteraction);
|
||||
let end_event = &end_events[0];
|
||||
assert_eq!(end_event.call_id, open_call_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue