Fix: update parallel tool call exec approval to approve on request id (#11162)
### Summary In parallel tool call, exec command approvals were not approved at request level but at a turn level. i.e. when a single request is approved, the system currently treats all requests in turn as approved. ### Before https://github.com/user-attachments/assets/d50ed129-b3d2-4b2f-97fa-8601eb11f6a8 ### After https://github.com/user-attachments/assets/36528a43-a4aa-4775-9e12-f13287ef19fc
This commit is contained in:
parent
47356ff83c
commit
c4b771a16f
15 changed files with 190 additions and 135 deletions
|
|
@ -97,3 +97,5 @@ This folder is the root of a Cargo workspace. It contains quite a bit of experim
|
|||
- [`exec/`](./exec) "headless" CLI for use in automation.
|
||||
- [`tui/`](./tui) CLI that launches a fullscreen TUI built with [Ratatui](https://ratatui.rs/).
|
||||
- [`cli/`](./cli) CLI multitool that provides the aforementioned CLIs via subcommands.
|
||||
|
||||
If you want to contribute or inspect behavior in detail, start by reading the module-level `README.md` files under each crate and run the project workspace from the top-level `codex-rs` directory so shared config, features, and build scripts stay aligned.
|
||||
|
|
|
|||
|
|
@ -140,7 +140,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
|||
ApiVersion::V1 => {
|
||||
let params = ApplyPatchApprovalParams {
|
||||
conversation_id,
|
||||
call_id,
|
||||
call_id: call_id.clone(),
|
||||
file_changes: changes.clone(),
|
||||
reason,
|
||||
grant_root,
|
||||
|
|
@ -149,7 +149,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
|||
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
|
||||
.await;
|
||||
tokio::spawn(async move {
|
||||
on_patch_approval_response(event_turn_id, rx, conversation).await;
|
||||
on_patch_approval_response(call_id, rx, conversation).await;
|
||||
});
|
||||
}
|
||||
ApiVersion::V2 => {
|
||||
|
|
@ -216,7 +216,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
|||
ApiVersion::V1 => {
|
||||
let params = ExecCommandApprovalParams {
|
||||
conversation_id,
|
||||
call_id,
|
||||
call_id: call_id.clone(),
|
||||
command,
|
||||
cwd,
|
||||
reason,
|
||||
|
|
@ -226,7 +226,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
|||
.send_request(ServerRequestPayload::ExecCommandApproval(params))
|
||||
.await;
|
||||
tokio::spawn(async move {
|
||||
on_exec_approval_response(event_turn_id, rx, conversation).await;
|
||||
on_exec_approval_response(call_id, event_turn_id, rx, conversation).await;
|
||||
});
|
||||
}
|
||||
ApiVersion::V2 => {
|
||||
|
|
@ -1428,7 +1428,7 @@ async fn handle_error(
|
|||
}
|
||||
|
||||
async fn on_patch_approval_response(
|
||||
event_turn_id: String,
|
||||
call_id: String,
|
||||
receiver: oneshot::Receiver<JsonValue>,
|
||||
codex: Arc<CodexThread>,
|
||||
) {
|
||||
|
|
@ -1439,7 +1439,7 @@ async fn on_patch_approval_response(
|
|||
error!("request failed: {err:?}");
|
||||
if let Err(submit_err) = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: event_turn_id.clone(),
|
||||
id: call_id.clone(),
|
||||
decision: ReviewDecision::Denied,
|
||||
})
|
||||
.await
|
||||
|
|
@ -1460,7 +1460,7 @@ async fn on_patch_approval_response(
|
|||
|
||||
if let Err(err) = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: event_turn_id,
|
||||
id: call_id,
|
||||
decision: response.decision,
|
||||
})
|
||||
.await
|
||||
|
|
@ -1470,7 +1470,8 @@ async fn on_patch_approval_response(
|
|||
}
|
||||
|
||||
async fn on_exec_approval_response(
|
||||
event_turn_id: String,
|
||||
call_id: String,
|
||||
turn_id: String,
|
||||
receiver: oneshot::Receiver<JsonValue>,
|
||||
conversation: Arc<CodexThread>,
|
||||
) {
|
||||
|
|
@ -1496,7 +1497,8 @@ async fn on_exec_approval_response(
|
|||
|
||||
if let Err(err) = conversation
|
||||
.submit(Op::ExecApproval {
|
||||
id: event_turn_id,
|
||||
id: call_id,
|
||||
turn_id: Some(turn_id),
|
||||
decision: response.decision,
|
||||
})
|
||||
.await
|
||||
|
|
@ -1678,7 +1680,7 @@ async fn on_file_change_request_approval_response(
|
|||
if let Some(status) = completion_status {
|
||||
complete_file_change_item(
|
||||
conversation_id,
|
||||
item_id,
|
||||
item_id.clone(),
|
||||
changes,
|
||||
status,
|
||||
event_turn_id.clone(),
|
||||
|
|
@ -1690,7 +1692,7 @@ async fn on_file_change_request_approval_response(
|
|||
|
||||
if let Err(err) = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: event_turn_id,
|
||||
id: item_id,
|
||||
decision,
|
||||
})
|
||||
.await
|
||||
|
|
@ -1771,7 +1773,8 @@ async fn on_command_execution_request_approval_response(
|
|||
|
||||
if let Err(err) = conversation
|
||||
.submit(Op::ExecApproval {
|
||||
id: event_turn_id,
|
||||
id: item_id,
|
||||
turn_id: Some(event_turn_id),
|
||||
decision,
|
||||
})
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -1946,7 +1946,7 @@ impl Session {
|
|||
|
||||
/// Emit an exec approval request event and await the user's decision.
|
||||
///
|
||||
/// The request is keyed by `sub_id`/`call_id` so matching responses are delivered
|
||||
/// The request is keyed by `call_id` so matching responses are delivered
|
||||
/// to the correct in-flight turn. If the task is aborted, this returns the
|
||||
/// default `ReviewDecision` (`Denied`).
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
|
@ -1959,22 +1959,21 @@ impl Session {
|
|||
reason: Option<String>,
|
||||
proposed_execpolicy_amendment: Option<ExecPolicyAmendment>,
|
||||
) -> ReviewDecision {
|
||||
let sub_id = turn_context.sub_id.clone();
|
||||
// Add the tx_approve callback to the map before sending the request.
|
||||
let (tx_approve, rx_approve) = oneshot::channel();
|
||||
let event_id = sub_id.clone();
|
||||
let approval_id = call_id.clone();
|
||||
let prev_entry = {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.insert_pending_approval(sub_id, tx_approve)
|
||||
ts.insert_pending_approval(approval_id.clone(), tx_approve)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
};
|
||||
if prev_entry.is_some() {
|
||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||
warn!("Overwriting existing pending approval for call_id: {approval_id}");
|
||||
}
|
||||
|
||||
let parsed_cmd = parse_command(&command);
|
||||
|
|
@ -1999,22 +1998,21 @@ impl Session {
|
|||
reason: Option<String>,
|
||||
grant_root: Option<PathBuf>,
|
||||
) -> oneshot::Receiver<ReviewDecision> {
|
||||
let sub_id = turn_context.sub_id.clone();
|
||||
// Add the tx_approve callback to the map before sending the request.
|
||||
let (tx_approve, rx_approve) = oneshot::channel();
|
||||
let event_id = sub_id.clone();
|
||||
let approval_id = call_id.clone();
|
||||
let prev_entry = {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.insert_pending_approval(sub_id, tx_approve)
|
||||
ts.insert_pending_approval(approval_id.clone(), tx_approve)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
};
|
||||
if prev_entry.is_some() {
|
||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||
warn!("Overwriting existing pending approval for call_id: {approval_id}");
|
||||
}
|
||||
|
||||
let event = EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
|
|
@ -2106,13 +2104,13 @@ impl Session {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
|
||||
pub async fn notify_approval(&self, approval_id: &str, decision: ReviewDecision) {
|
||||
let entry = {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.remove_pending_approval(sub_id)
|
||||
ts.remove_pending_approval(approval_id)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
|
|
@ -2122,7 +2120,7 @@ impl Session {
|
|||
tx_approve.send(decision).ok();
|
||||
}
|
||||
None => {
|
||||
warn!("No pending approval found for sub_id: {sub_id}");
|
||||
warn!("No pending approval found for call_id: {approval_id}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2881,8 +2879,12 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
|||
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op, &mut previous_context)
|
||||
.await;
|
||||
}
|
||||
Op::ExecApproval { id, decision } => {
|
||||
handlers::exec_approval(&sess, id, decision).await;
|
||||
Op::ExecApproval {
|
||||
id: approval_id,
|
||||
turn_id,
|
||||
decision,
|
||||
} => {
|
||||
handlers::exec_approval(&sess, approval_id, turn_id, decision).await;
|
||||
}
|
||||
Op::PatchApproval { id, decision } => {
|
||||
handlers::patch_approval(&sess, id, decision).await;
|
||||
|
|
@ -3205,7 +3207,13 @@ mod handlers {
|
|||
|
||||
/// Propagate a user's exec approval decision to the session.
|
||||
/// Also optionally applies an execpolicy amendment.
|
||||
pub async fn exec_approval(sess: &Arc<Session>, id: String, decision: ReviewDecision) {
|
||||
pub async fn exec_approval(
|
||||
sess: &Arc<Session>,
|
||||
approval_id: String,
|
||||
turn_id: Option<String>,
|
||||
decision: ReviewDecision,
|
||||
) {
|
||||
let event_turn_id = turn_id.unwrap_or_else(|| approval_id.clone());
|
||||
if let ReviewDecision::ApprovedExecpolicyAmendment {
|
||||
proposed_execpolicy_amendment,
|
||||
} = &decision
|
||||
|
|
@ -3215,15 +3223,18 @@ mod handlers {
|
|||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
sess.record_execpolicy_amendment_message(&id, proposed_execpolicy_amendment)
|
||||
.await;
|
||||
sess.record_execpolicy_amendment_message(
|
||||
&event_turn_id,
|
||||
proposed_execpolicy_amendment,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let message = format!("Failed to apply execpolicy amendment: {err}");
|
||||
tracing::warn!("{message}");
|
||||
let warning = EventMsg::Warning(WarningEvent { message });
|
||||
sess.send_event_raw(Event {
|
||||
id: id.clone(),
|
||||
id: event_turn_id.clone(),
|
||||
msg: warning,
|
||||
})
|
||||
.await;
|
||||
|
|
@ -3234,7 +3245,7 @@ mod handlers {
|
|||
ReviewDecision::Abort => {
|
||||
sess.interrupt_task().await;
|
||||
}
|
||||
other => sess.notify_approval(&id, other).await,
|
||||
other => sess.notify_approval(&approval_id, other).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -307,7 +307,7 @@ async fn forward_ops(
|
|||
/// Handle an ExecApprovalRequest by consulting the parent session and replying.
|
||||
async fn handle_exec_approval(
|
||||
codex: &Codex,
|
||||
id: String,
|
||||
turn_id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: ExecApprovalRequestEvent,
|
||||
|
|
@ -321,6 +321,7 @@ async fn handle_exec_approval(
|
|||
proposed_execpolicy_amendment,
|
||||
..
|
||||
} = event;
|
||||
let approval_id = call_id.clone();
|
||||
// Race approval with cancellation and timeout to avoid hangs.
|
||||
let approval_fut = parent_session.request_command_approval(
|
||||
parent_ctx,
|
||||
|
|
@ -330,21 +331,22 @@ async fn handle_exec_approval(
|
|||
reason,
|
||||
proposed_execpolicy_amendment,
|
||||
);
|
||||
let decision = await_approval_with_cancel(
|
||||
approval_fut,
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
let decision =
|
||||
await_approval_with_cancel(approval_fut, parent_session, &approval_id, cancel_token).await;
|
||||
|
||||
let _ = codex.submit(Op::ExecApproval { id, decision }).await;
|
||||
let _ = codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: approval_id,
|
||||
turn_id: Some(turn_id),
|
||||
decision,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Handle an ApplyPatchApprovalRequest by consulting the parent session and replying.
|
||||
async fn handle_patch_approval(
|
||||
codex: &Codex,
|
||||
id: String,
|
||||
_id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: ApplyPatchApprovalRequestEvent,
|
||||
|
|
@ -357,17 +359,23 @@ async fn handle_patch_approval(
|
|||
grant_root,
|
||||
..
|
||||
} = event;
|
||||
let approval_id = call_id.clone();
|
||||
let decision_rx = parent_session
|
||||
.request_patch_approval(parent_ctx, call_id, changes, reason, grant_root)
|
||||
.await;
|
||||
let decision = await_approval_with_cancel(
|
||||
async move { decision_rx.await.unwrap_or_default() },
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
&approval_id,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
let _ = codex.submit(Op::PatchApproval { id, decision }).await;
|
||||
let _ = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: approval_id,
|
||||
decision,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn handle_request_user_input(
|
||||
|
|
@ -423,7 +431,7 @@ where
|
|||
async fn await_approval_with_cancel<F>(
|
||||
fut: F,
|
||||
parent_session: &Session,
|
||||
sub_id: &str,
|
||||
approval_id: &str,
|
||||
cancel_token: &CancellationToken,
|
||||
) -> codex_protocol::protocol::ReviewDecision
|
||||
where
|
||||
|
|
@ -433,7 +441,7 @@ where
|
|||
biased;
|
||||
_ = cancel_token.cancelled() => {
|
||||
parent_session
|
||||
.notify_approval(sub_id, codex_protocol::protocol::ReviewDecision::Abort)
|
||||
.notify_approval(approval_id, codex_protocol::protocol::ReviewDecision::Abort)
|
||||
.await;
|
||||
codex_protocol::protocol::ReviewDecision::Abort
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ use schemars::JsonSchema;
|
|||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::env::VarError;
|
||||
use std::time::Duration;
|
||||
|
||||
const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000;
|
||||
|
|
@ -116,7 +115,9 @@ pub struct ModelProviderInfo {
|
|||
|
||||
impl ModelProviderInfo {
|
||||
fn build_header_map(&self) -> crate::error::Result<HeaderMap> {
|
||||
let mut headers = HeaderMap::new();
|
||||
let capacity = self.http_headers.as_ref().map_or(0, HashMap::len)
|
||||
+ self.env_http_headers.as_ref().map_or(0, HashMap::len);
|
||||
let mut headers = HeaderMap::with_capacity(capacity);
|
||||
if let Some(extra) = &self.http_headers {
|
||||
for (k, v) in extra {
|
||||
if let (Ok(name), Ok(value)) = (HeaderName::try_from(k), HeaderValue::try_from(v)) {
|
||||
|
|
@ -179,21 +180,16 @@ impl ModelProviderInfo {
|
|||
pub fn api_key(&self) -> crate::error::Result<Option<String>> {
|
||||
match &self.env_key {
|
||||
Some(env_key) => {
|
||||
let env_value = std::env::var(env_key);
|
||||
env_value
|
||||
.and_then(|v| {
|
||||
if v.trim().is_empty() {
|
||||
Err(VarError::NotPresent)
|
||||
} else {
|
||||
Ok(Some(v))
|
||||
}
|
||||
})
|
||||
.map_err(|_| {
|
||||
let api_key = std::env::var(env_key)
|
||||
.ok()
|
||||
.filter(|v| !v.trim().is_empty())
|
||||
.ok_or_else(|| {
|
||||
crate::error::CodexErr::EnvVar(EnvVarError {
|
||||
var: env_key.clone(),
|
||||
instructions: self.env_key_instructions.clone(),
|
||||
})
|
||||
})
|
||||
})?;
|
||||
Ok(Some(api_key))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
|
|
@ -298,20 +294,19 @@ pub fn built_in_model_providers() -> HashMap<String, ModelProviderInfo> {
|
|||
pub fn create_oss_provider(default_provider_port: u16, wire_api: WireApi) -> ModelProviderInfo {
|
||||
// These CODEX_OSS_ environment variables are experimental: we may
|
||||
// switch to reading values from config.toml instead.
|
||||
let codex_oss_base_url = match std::env::var("CODEX_OSS_BASE_URL")
|
||||
let default_codex_oss_base_url = format!(
|
||||
"http://localhost:{codex_oss_port}/v1",
|
||||
codex_oss_port = std::env::var("CODEX_OSS_PORT")
|
||||
.ok()
|
||||
.filter(|value| !value.trim().is_empty())
|
||||
.and_then(|value| value.parse::<u16>().ok())
|
||||
.unwrap_or(default_provider_port)
|
||||
);
|
||||
|
||||
let codex_oss_base_url = std::env::var("CODEX_OSS_BASE_URL")
|
||||
.ok()
|
||||
.filter(|v| !v.trim().is_empty())
|
||||
{
|
||||
Some(url) => url,
|
||||
None => format!(
|
||||
"http://localhost:{port}/v1",
|
||||
port = std::env::var("CODEX_OSS_PORT")
|
||||
.ok()
|
||||
.filter(|v| !v.trim().is_empty())
|
||||
.and_then(|v| v.parse::<u16>().ok())
|
||||
.unwrap_or(default_provider_port)
|
||||
),
|
||||
};
|
||||
.unwrap_or(default_codex_oss_base_url);
|
||||
create_oss_provider_with_base_url(&codex_oss_base_url, wire_api)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1528,7 +1528,8 @@ async fn run_scenario(scenario: &ScenarioSpec) -> Result<()> {
|
|||
}
|
||||
test.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
turn_id: None,
|
||||
decision: decision.clone(),
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -1549,7 +1550,7 @@ async fn run_scenario(scenario: &ScenarioSpec) -> Result<()> {
|
|||
}
|
||||
test.codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
decision: decision.clone(),
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -1624,10 +1625,10 @@ async fn approving_apply_patch_for_session_skips_future_prompts_for_same_file()
|
|||
sandbox_policy.clone(),
|
||||
)
|
||||
.await?;
|
||||
let _ = expect_patch_approval(&test, call_id_1).await;
|
||||
let approval = expect_patch_approval(&test, call_id_1).await;
|
||||
test.codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
decision: ReviewDecision::ApprovedForSession,
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -1746,7 +1747,8 @@ async fn approving_execpolicy_amendment_persists_policy_and_skips_future_prompts
|
|||
|
||||
test.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::ApprovedExecpolicyAmendment {
|
||||
proposed_execpolicy_amendment: expected_execpolicy_amendment.clone(),
|
||||
},
|
||||
|
|
|
|||
|
|
@ -87,15 +87,19 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
|
|||
.await;
|
||||
|
||||
// Expect parent-side approval request (forwarded by delegate).
|
||||
wait_for_event(&test.codex, |ev| {
|
||||
let approval_event = wait_for_event(&test.codex, |ev| {
|
||||
matches!(ev, EventMsg::ExecApprovalRequest(_))
|
||||
})
|
||||
.await;
|
||||
let EventMsg::ExecApprovalRequest(approval) = approval_event else {
|
||||
panic!("expected ExecApprovalRequest event");
|
||||
};
|
||||
|
||||
// Approve via parent; id "0" is the active sub_id in tests.
|
||||
// Approve via parent using the emitted approval call ID.
|
||||
test.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Approved,
|
||||
})
|
||||
.await
|
||||
|
|
@ -161,15 +165,18 @@ async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
|
|||
matches!(ev, EventMsg::EnteredReviewMode(_))
|
||||
})
|
||||
.await;
|
||||
wait_for_event(&test.codex, |ev| {
|
||||
let approval_event = wait_for_event(&test.codex, |ev| {
|
||||
matches!(ev, EventMsg::ApplyPatchApprovalRequest(_))
|
||||
})
|
||||
.await;
|
||||
let EventMsg::ApplyPatchApprovalRequest(approval) = approval_event else {
|
||||
panic!("expected ApplyPatchApprovalRequest event");
|
||||
};
|
||||
|
||||
// Deny via parent so delegate can continue; id "0" is the active sub_id in tests.
|
||||
// Deny via parent so delegate can continue, using the emitted approval call ID.
|
||||
test.codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
decision: ReviewDecision::Denied,
|
||||
})
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -1032,11 +1032,16 @@ async fn handle_container_exec_user_approved_records_tool_decision() {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let approval_event =
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let EventMsg::ExecApprovalRequest(approval) = approval_event else {
|
||||
panic!("expected ExecApprovalRequest event");
|
||||
};
|
||||
|
||||
codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Approved,
|
||||
})
|
||||
.await
|
||||
|
|
@ -1092,11 +1097,16 @@ async fn handle_container_exec_user_approved_for_session_records_tool_decision()
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let approval_event =
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let EventMsg::ExecApprovalRequest(approval) = approval_event else {
|
||||
panic!("expected ExecApprovalRequest event");
|
||||
};
|
||||
|
||||
codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::ApprovedForSession,
|
||||
})
|
||||
.await
|
||||
|
|
@ -1152,11 +1162,16 @@ async fn handle_sandbox_error_user_approves_retry_records_tool_decision() {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let approval_event =
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let EventMsg::ExecApprovalRequest(approval) = approval_event else {
|
||||
panic!("expected ExecApprovalRequest event");
|
||||
};
|
||||
|
||||
codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Approved,
|
||||
})
|
||||
.await
|
||||
|
|
@ -1212,11 +1227,16 @@ async fn handle_container_exec_user_denies_records_tool_decision() {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let approval_event =
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let EventMsg::ExecApprovalRequest(approval) = approval_event else {
|
||||
panic!("expected ExecApprovalRequest event");
|
||||
};
|
||||
|
||||
codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Denied,
|
||||
})
|
||||
.await
|
||||
|
|
@ -1272,11 +1292,16 @@ async fn handle_sandbox_error_user_approves_for_session_records_tool_decision()
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let approval_event =
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let EventMsg::ExecApprovalRequest(approval) = approval_event else {
|
||||
panic!("expected ExecApprovalRequest event");
|
||||
};
|
||||
|
||||
codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::ApprovedForSession,
|
||||
})
|
||||
.await
|
||||
|
|
@ -1333,11 +1358,16 @@ async fn handle_sandbox_error_user_denies_records_tool_decision() {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let approval_event =
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
|
||||
let EventMsg::ExecApprovalRequest(approval) = approval_event else {
|
||||
panic!("expected ExecApprovalRequest event");
|
||||
};
|
||||
|
||||
codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
id: approval.call_id,
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Denied,
|
||||
})
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -60,6 +60,7 @@ pub(crate) async fn handle_exec_approval_request(
|
|||
codex_parsed_cmd: Vec<ParsedCommand>,
|
||||
thread_id: ThreadId,
|
||||
) {
|
||||
let approval_id = call_id.clone();
|
||||
let escaped_command =
|
||||
shlex::try_join(command.iter().map(String::as_str)).unwrap_or_else(|_| command.join(" "));
|
||||
let message = format!(
|
||||
|
|
@ -100,14 +101,16 @@ pub(crate) async fn handle_exec_approval_request(
|
|||
// Listen for the response on a separate task so we don't block the main agent loop.
|
||||
{
|
||||
let codex = codex.clone();
|
||||
let approval_id = approval_id.clone();
|
||||
let event_id = event_id.clone();
|
||||
tokio::spawn(async move {
|
||||
on_exec_approval_response(event_id, on_response, codex).await;
|
||||
on_exec_approval_response(approval_id, event_id, on_response, codex).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_exec_approval_response(
|
||||
approval_id: String,
|
||||
event_id: String,
|
||||
receiver: tokio::sync::oneshot::Receiver<serde_json::Value>,
|
||||
codex: Arc<CodexThread>,
|
||||
|
|
@ -133,7 +136,8 @@ async fn on_exec_approval_response(
|
|||
|
||||
if let Err(err) = codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: event_id,
|
||||
id: approval_id,
|
||||
turn_id: Some(event_id),
|
||||
decision: response.decision,
|
||||
})
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ pub(crate) async fn handle_patch_approval_request(
|
|||
event_id: String,
|
||||
thread_id: ThreadId,
|
||||
) {
|
||||
let approval_id = call_id.clone();
|
||||
let mut message_lines = Vec::new();
|
||||
if let Some(r) = &reason {
|
||||
message_lines.push(r.clone());
|
||||
|
|
@ -92,15 +93,15 @@ pub(crate) async fn handle_patch_approval_request(
|
|||
// Listen for the response on a separate task so we don't block the main agent loop.
|
||||
{
|
||||
let codex = codex.clone();
|
||||
let event_id = event_id.clone();
|
||||
let approval_id = approval_id.clone();
|
||||
tokio::spawn(async move {
|
||||
on_patch_approval_response(event_id, on_response, codex).await;
|
||||
on_patch_approval_response(approval_id, on_response, codex).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn on_patch_approval_response(
|
||||
event_id: String,
|
||||
approval_id: String,
|
||||
receiver: tokio::sync::oneshot::Receiver<serde_json::Value>,
|
||||
codex: Arc<CodexThread>,
|
||||
) {
|
||||
|
|
@ -111,7 +112,7 @@ pub(crate) async fn on_patch_approval_response(
|
|||
error!("request failed: {err:?}");
|
||||
if let Err(submit_err) = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: event_id.clone(),
|
||||
id: approval_id.clone(),
|
||||
decision: ReviewDecision::Denied,
|
||||
})
|
||||
.await
|
||||
|
|
@ -131,7 +132,7 @@ pub(crate) async fn on_patch_approval_response(
|
|||
|
||||
if let Err(err) = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: event_id,
|
||||
id: approval_id,
|
||||
decision: response.decision,
|
||||
})
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -198,6 +198,9 @@ pub enum Op {
|
|||
ExecApproval {
|
||||
/// The id of the submission we are approving
|
||||
id: String,
|
||||
/// Turn id associated with the approval event, when available.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
turn_id: Option<String>,
|
||||
/// The user's decision in response to the request.
|
||||
decision: ReviewDecision,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -195,6 +195,7 @@ impl ApprovalOverlay {
|
|||
self.app_event_tx.send(AppEvent::InsertHistoryCell(cell));
|
||||
self.app_event_tx.send(AppEvent::CodexOp(Op::ExecApproval {
|
||||
id: id.to_string(),
|
||||
turn_id: None,
|
||||
decision,
|
||||
}));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1786,21 +1786,19 @@ impl ChatWidget {
|
|||
self.add_to_history(history_cell::new_plan_update(update));
|
||||
}
|
||||
|
||||
fn on_exec_approval_request(&mut self, id: String, ev: ExecApprovalRequestEvent) {
|
||||
let id2 = id.clone();
|
||||
fn on_exec_approval_request(&mut self, _id: String, ev: ExecApprovalRequestEvent) {
|
||||
let ev2 = ev.clone();
|
||||
self.defer_or_handle(
|
||||
|q| q.push_exec_approval(id, ev),
|
||||
|s| s.handle_exec_approval_now(id2, ev2),
|
||||
|q| q.push_exec_approval(ev),
|
||||
|s| s.handle_exec_approval_now(ev2),
|
||||
);
|
||||
}
|
||||
|
||||
fn on_apply_patch_approval_request(&mut self, id: String, ev: ApplyPatchApprovalRequestEvent) {
|
||||
let id2 = id.clone();
|
||||
fn on_apply_patch_approval_request(&mut self, _id: String, ev: ApplyPatchApprovalRequestEvent) {
|
||||
let ev2 = ev.clone();
|
||||
self.defer_or_handle(
|
||||
|q| q.push_apply_patch_approval(id, ev),
|
||||
|s| s.handle_apply_patch_approval_now(id2, ev2),
|
||||
|q| q.push_apply_patch_approval(ev),
|
||||
|s| s.handle_apply_patch_approval_now(ev2),
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -2365,14 +2363,14 @@ impl ChatWidget {
|
|||
self.had_work_activity = true;
|
||||
}
|
||||
|
||||
pub(crate) fn handle_exec_approval_now(&mut self, id: String, ev: ExecApprovalRequestEvent) {
|
||||
pub(crate) fn handle_exec_approval_now(&mut self, ev: ExecApprovalRequestEvent) {
|
||||
self.flush_answer_stream_with_separator();
|
||||
let command = shlex::try_join(ev.command.iter().map(String::as_str))
|
||||
.unwrap_or_else(|_| ev.command.join(" "));
|
||||
self.notify(Notification::ExecApprovalRequested { command });
|
||||
|
||||
let request = ApprovalRequest::Exec {
|
||||
id,
|
||||
id: ev.call_id,
|
||||
command: ev.command,
|
||||
reason: ev.reason,
|
||||
proposed_execpolicy_amendment: ev.proposed_execpolicy_amendment,
|
||||
|
|
@ -2382,15 +2380,11 @@ impl ChatWidget {
|
|||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(crate) fn handle_apply_patch_approval_now(
|
||||
&mut self,
|
||||
id: String,
|
||||
ev: ApplyPatchApprovalRequestEvent,
|
||||
) {
|
||||
pub(crate) fn handle_apply_patch_approval_now(&mut self, ev: ApplyPatchApprovalRequestEvent) {
|
||||
self.flush_answer_stream_with_separator();
|
||||
|
||||
let request = ApprovalRequest::ApplyPatch {
|
||||
id,
|
||||
id: ev.call_id,
|
||||
reason: ev.reason,
|
||||
changes: ev.changes.clone(),
|
||||
cwd: self.config.cwd.clone(),
|
||||
|
|
|
|||
|
|
@ -14,8 +14,8 @@ use super::ChatWidget;
|
|||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum QueuedInterrupt {
|
||||
ExecApproval(String, ExecApprovalRequestEvent),
|
||||
ApplyPatchApproval(String, ApplyPatchApprovalRequestEvent),
|
||||
ExecApproval(ExecApprovalRequestEvent),
|
||||
ApplyPatchApproval(ApplyPatchApprovalRequestEvent),
|
||||
Elicitation(ElicitationRequestEvent),
|
||||
RequestUserInput(RequestUserInputEvent),
|
||||
ExecBegin(ExecCommandBeginEvent),
|
||||
|
|
@ -42,17 +42,13 @@ impl InterruptManager {
|
|||
self.queue.is_empty()
|
||||
}
|
||||
|
||||
pub(crate) fn push_exec_approval(&mut self, id: String, ev: ExecApprovalRequestEvent) {
|
||||
self.queue.push_back(QueuedInterrupt::ExecApproval(id, ev));
|
||||
pub(crate) fn push_exec_approval(&mut self, ev: ExecApprovalRequestEvent) {
|
||||
self.queue.push_back(QueuedInterrupt::ExecApproval(ev));
|
||||
}
|
||||
|
||||
pub(crate) fn push_apply_patch_approval(
|
||||
&mut self,
|
||||
id: String,
|
||||
ev: ApplyPatchApprovalRequestEvent,
|
||||
) {
|
||||
pub(crate) fn push_apply_patch_approval(&mut self, ev: ApplyPatchApprovalRequestEvent) {
|
||||
self.queue
|
||||
.push_back(QueuedInterrupt::ApplyPatchApproval(id, ev));
|
||||
.push_back(QueuedInterrupt::ApplyPatchApproval(ev));
|
||||
}
|
||||
|
||||
pub(crate) fn push_elicitation(&mut self, ev: ElicitationRequestEvent) {
|
||||
|
|
@ -86,10 +82,8 @@ impl InterruptManager {
|
|||
pub(crate) fn flush_all(&mut self, chat: &mut ChatWidget) {
|
||||
while let Some(q) = self.queue.pop_front() {
|
||||
match q {
|
||||
QueuedInterrupt::ExecApproval(id, ev) => chat.handle_exec_approval_now(id, ev),
|
||||
QueuedInterrupt::ApplyPatchApproval(id, ev) => {
|
||||
chat.handle_apply_patch_approval_now(id, ev)
|
||||
}
|
||||
QueuedInterrupt::ExecApproval(ev) => chat.handle_exec_approval_now(ev),
|
||||
QueuedInterrupt::ApplyPatchApproval(ev) => chat.handle_apply_patch_approval_now(ev),
|
||||
QueuedInterrupt::Elicitation(ev) => chat.handle_elicitation_request_now(ev),
|
||||
QueuedInterrupt::RequestUserInput(ev) => chat.handle_request_user_input_now(ev),
|
||||
QueuedInterrupt::ExecBegin(ev) => chat.handle_exec_begin_now(ev),
|
||||
|
|
|
|||
|
|
@ -5130,9 +5130,9 @@ async fn apply_patch_manual_flow_snapshot() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_patch_approval_sends_op_with_submission_id() {
|
||||
async fn apply_patch_approval_sends_op_with_call_id() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
// Simulate receiving an approval request with a distinct submission id and call id
|
||||
// Simulate receiving an approval request with a distinct event id and call id.
|
||||
let mut changes = HashMap::new();
|
||||
changes.insert(
|
||||
PathBuf::from("file.rs"),
|
||||
|
|
@ -5155,11 +5155,11 @@ async fn apply_patch_approval_sends_op_with_submission_id() {
|
|||
// Approve via key press 'y'
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Char('y'), KeyModifiers::NONE));
|
||||
|
||||
// Expect a CodexOp with PatchApproval carrying the submission id, not call id
|
||||
// Expect a CodexOp with PatchApproval carrying the call id.
|
||||
let mut found = false;
|
||||
while let Ok(app_ev) = rx.try_recv() {
|
||||
if let AppEvent::CodexOp(Op::PatchApproval { id, decision }) = app_ev {
|
||||
assert_eq!(id, "sub-123");
|
||||
assert_eq!(id, "call-999");
|
||||
assert_matches!(decision, codex_core::protocol::ReviewDecision::Approved);
|
||||
found = true;
|
||||
break;
|
||||
|
|
@ -5207,7 +5207,7 @@ async fn apply_patch_full_flow_integration_like() {
|
|||
.expect("expected op forwarded to codex channel");
|
||||
match forwarded {
|
||||
Op::PatchApproval { id, decision } => {
|
||||
assert_eq!(id, "sub-xyz");
|
||||
assert_eq!(id, "call-1");
|
||||
assert_matches!(decision, codex_core::protocol::ReviewDecision::Approved);
|
||||
}
|
||||
other => panic!("unexpected op forwarded: {other:?}"),
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue