diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index abb878ae0..52aefb472 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -106,6 +106,7 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::unbounded_channel; +use tokio::task::JoinHandle; use toml::Value as TomlValue; mod pending_interactive_replay; @@ -684,6 +685,7 @@ pub(crate) struct App { windows_sandbox: WindowsSandboxState, thread_event_channels: HashMap, + thread_event_listener_tasks: HashMap>, agent_picker_threads: HashMap, active_thread_id: Option, active_thread_rx: Option>, @@ -863,6 +865,23 @@ impl App { self.suppress_shutdown_complete = true; self.chat_widget.submit_op(Op::Shutdown); self.server.remove_thread(&thread_id).await; + self.abort_thread_event_listener(thread_id); + } + } + + fn abort_thread_event_listener(&mut self, thread_id: ThreadId) { + if let Some(handle) = self.thread_event_listener_tasks.remove(&thread_id) { + handle.abort(); + } + } + + fn abort_all_thread_event_listeners(&mut self) { + for handle in self + .thread_event_listener_tasks + .drain() + .map(|(_, handle)| handle) + { + handle.abort(); } } @@ -928,6 +947,14 @@ impl App { self.refresh_pending_thread_approvals().await; } + async fn note_thread_outbound_op(&mut self, thread_id: ThreadId, op: &Op) { + let Some(channel) = self.thread_event_channels.get(&thread_id) else { + return; + }; + let mut store = channel.store.lock().await; + store.note_outbound_op(op); + } + async fn note_active_thread_outbound_op(&mut self, op: &Op) { if !ThreadEventStore::op_can_change_pending_replay_state(op) { return; @@ -935,11 +962,113 @@ impl App { let Some(thread_id) = self.active_thread_id else { return; }; - let Some(channel) = self.thread_event_channels.get(&thread_id) else { - return; + self.note_thread_outbound_op(thread_id, op).await; + } + + fn thread_label(&self, thread_id: ThreadId) -> String { + let is_primary = self.primary_thread_id == Some(thread_id); + let fallback_label = if is_primary { + "Main [default]".to_string() + } else { + let thread_id = thread_id.to_string(); + let short_id: String = thread_id.chars().take(8).collect(); + format!("Agent ({short_id})") }; - let mut store = channel.store.lock().await; - store.note_outbound_op(op); + if let Some(entry) = self.agent_picker_threads.get(&thread_id) { + let label = format_agent_picker_item_name( + entry.agent_nickname.as_deref(), + entry.agent_role.as_deref(), + is_primary, + ); + if label == "Agent" { + let thread_id = thread_id.to_string(); + let short_id: String = thread_id.chars().take(8).collect(); + format!("{label} ({short_id})") + } else { + label + } + } else { + fallback_label + } + } + + async fn thread_cwd(&self, thread_id: ThreadId) -> Option { + let channel = self.thread_event_channels.get(&thread_id)?; + let store = channel.store.lock().await; + match store.session_configured.as_ref().map(|event| &event.msg) { + Some(EventMsg::SessionConfigured(session)) => Some(session.cwd.clone()), + _ => None, + } + } + + async fn approval_request_for_thread_event( + &self, + thread_id: ThreadId, + event: &Event, + ) -> Option { + let thread_label = Some(self.thread_label(thread_id)); + match &event.msg { + EventMsg::ExecApprovalRequest(ev) => Some(ApprovalRequest::Exec { + thread_id, + thread_label, + id: ev.effective_approval_id(), + command: ev.command.clone(), + reason: ev.reason.clone(), + available_decisions: ev.effective_available_decisions(), + network_approval_context: ev.network_approval_context.clone(), + additional_permissions: ev.additional_permissions.clone(), + }), + EventMsg::ApplyPatchApprovalRequest(ev) => Some(ApprovalRequest::ApplyPatch { + thread_id, + thread_label, + id: ev.call_id.clone(), + reason: ev.reason.clone(), + cwd: self + .thread_cwd(thread_id) + .await + .unwrap_or_else(|| self.config.cwd.clone()), + changes: ev.changes.clone(), + }), + EventMsg::ElicitationRequest(ev) => Some(ApprovalRequest::McpElicitation { + thread_id, + thread_label, + server_name: ev.server_name.clone(), + request_id: ev.id.clone(), + message: ev.message.clone(), + }), + _ => None, + } + } + + async fn submit_op_to_thread(&mut self, thread_id: ThreadId, op: Op) { + let replay_state_op = + ThreadEventStore::op_can_change_pending_replay_state(&op).then(|| op.clone()); + let submitted = if self.active_thread_id == Some(thread_id) { + self.chat_widget.submit_op(op) + } else { + crate::session_log::log_outbound_op(&op); + match self.server.get_thread(thread_id).await { + Ok(thread) => match thread.submit(op).await { + Ok(_) => true, + Err(err) => { + self.chat_widget.add_error_message(format!( + "Failed to submit op to thread {thread_id}: {err}" + )); + false + } + }, + Err(err) => { + self.chat_widget.add_error_message(format!( + "Failed to find thread {thread_id} for approval response: {err}" + )); + false + } + } + }; + if submitted && let Some(op) = replay_state_op.as_ref() { + self.note_thread_outbound_op(thread_id, op).await; + self.refresh_pending_thread_approvals().await; + } } async fn refresh_pending_thread_approvals(&mut self) { @@ -965,32 +1094,7 @@ impl App { let threads = pending_thread_ids .into_iter() - .map(|thread_id| { - let is_primary = self.primary_thread_id == Some(thread_id); - let fallback_label = if is_primary { - "Main [default]".to_string() - } else { - let thread_id = thread_id.to_string(); - let short_id: String = thread_id.chars().take(8).collect(); - format!("Agent ({short_id})") - }; - if let Some(entry) = self.agent_picker_threads.get(&thread_id) { - let label = format_agent_picker_item_name( - entry.agent_nickname.as_deref(), - entry.agent_role.as_deref(), - is_primary, - ); - if label == "Agent" { - let thread_id = thread_id.to_string(); - let short_id: String = thread_id.chars().take(8).collect(); - format!("{label} ({short_id})") - } else { - label - } - } else { - fallback_label - } - }) + .map(|thread_id| self.thread_label(thread_id)) .collect(); self.chat_widget.set_pending_thread_approvals(threads); @@ -999,6 +1103,12 @@ impl App { async fn enqueue_thread_event(&mut self, thread_id: ThreadId, event: Event) -> Result<()> { let refresh_pending_thread_approvals = ThreadEventStore::event_can_change_pending_thread_approvals(&event); + let inactive_approval_request = if self.active_thread_id != Some(thread_id) { + self.approval_request_for_thread_event(thread_id, &event) + .await + } else { + None + }; let (sender, store) = { let channel = self.ensure_thread_channel(thread_id); (channel.sender.clone(), Arc::clone(&channel.store)) @@ -1027,6 +1137,8 @@ impl App { tracing::warn!("thread {thread_id} event channel closed"); } } + } else if let Some(request) = inactive_approval_request { + self.chat_widget.push_approval_request(request); } if refresh_pending_thread_approvals { self.refresh_pending_thread_approvals().await; @@ -1034,6 +1146,19 @@ impl App { Ok(()) } + async fn handle_routed_thread_event( + &mut self, + thread_id: ThreadId, + event: Event, + ) -> Result<()> { + if !self.thread_event_channels.contains_key(&thread_id) { + tracing::debug!("dropping stale event for untracked thread {thread_id}"); + return Ok(()); + } + + self.enqueue_thread_event(thread_id, event).await + } + async fn enqueue_primary_event(&mut self, event: Event) -> Result<()> { if let Some(thread_id) = self.primary_thread_id { return self.enqueue_thread_event(thread_id, event).await; @@ -1045,12 +1170,12 @@ impl App { self.primary_session_configured = Some(session.clone()); self.ensure_thread_channel(thread_id); self.activate_thread_channel(thread_id).await; + self.enqueue_thread_event(thread_id, event).await?; let pending = std::mem::take(&mut self.pending_primary_events); for pending_event in pending { self.enqueue_thread_event(thread_id, pending_event).await?; } - self.enqueue_thread_event(thread_id, event).await?; } else { self.pending_primary_events.push_back(event); } @@ -1227,6 +1352,7 @@ impl App { } fn reset_thread_event_state(&mut self) { + self.abort_all_thread_event_listeners(); self.thread_event_channels.clear(); self.agent_picker_threads.clear(); self.active_thread_id = None; @@ -1597,6 +1723,7 @@ impl App { pending_shutdown_exit_thread_id: None, windows_sandbox: WindowsSandboxState::default(), thread_event_channels: HashMap::new(), + thread_event_listener_tasks: HashMap::new(), agent_picker_threads: HashMap::new(), active_thread_id: None, active_thread_rx: None, @@ -2011,6 +2138,9 @@ impl App { AppEvent::CodexEvent(event) => { self.enqueue_primary_event(event).await?; } + AppEvent::ThreadEvent { thread_id, event } => { + self.handle_routed_thread_event(thread_id, event).await?; + } AppEvent::Exit(mode) => { return Ok(self.handle_exit_mode(mode)); } @@ -2026,6 +2156,9 @@ impl App { self.refresh_pending_thread_approvals().await; } } + AppEvent::SubmitThreadOp { thread_id, op } => { + self.submit_op_to_thread(thread_id, op).await; + } AppEvent::DiffResult(text) => { // Clear the in-progress state in the bottom pane self.chat_widget.on_diff_complete(); @@ -2823,9 +2956,6 @@ impl App { AppEvent::OpenAgentPicker => { self.open_agent_picker().await; } - AppEvent::RefreshPendingThreadApprovals => { - self.refresh_pending_thread_approvals().await; - } AppEvent::SelectAgentThread(thread_id) => { self.select_agent_thread(tui, thread_id).await?; } @@ -3184,11 +3314,9 @@ impl App { }; let channel = ThreadEventChannel::new_with_session_configured(THREAD_EVENT_CHANNEL_CAPACITY, event); - let sender = channel.sender.clone(); - let store = Arc::clone(&channel.store); let app_event_tx = self.app_event_tx.clone(); self.thread_event_channels.insert(thread_id, channel); - tokio::spawn(async move { + let listener_handle = tokio::spawn(async move { loop { let event = match thread.next_event().await { Ok(event) => event, @@ -3197,22 +3325,11 @@ impl App { break; } }; - let refresh_pending_thread_approvals = - ThreadEventStore::event_can_change_pending_thread_approvals(&event); - let should_send = { - let mut guard = store.lock().await; - guard.push_event(event.clone()); - guard.active - }; - if refresh_pending_thread_approvals { - app_event_tx.send(AppEvent::RefreshPendingThreadApprovals); - } - if should_send && let Err(err) = sender.send(event).await { - tracing::debug!("external thread {thread_id} channel closed: {err}"); - break; - } + app_event_tx.send(AppEvent::ThreadEvent { thread_id, event }); } }); + self.thread_event_listener_tasks + .insert(thread_id, listener_handle); Ok(()) } @@ -3612,6 +3729,152 @@ mod tests { ); } + #[tokio::test] + async fn enqueue_primary_event_delivers_session_configured_before_buffered_approval() + -> Result<()> { + let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + let approval_event = Event { + id: "approval-event".to_string(), + msg: EventMsg::ExecApprovalRequest( + codex_protocol::protocol::ExecApprovalRequestEvent { + call_id: "call-1".to_string(), + approval_id: None, + turn_id: "turn-1".to_string(), + command: vec!["echo".to_string(), "hello".to_string()], + cwd: PathBuf::from("/tmp/project"), + reason: Some("needs approval".to_string()), + network_approval_context: None, + proposed_execpolicy_amendment: None, + proposed_network_policy_amendments: None, + additional_permissions: None, + available_decisions: None, + parsed_cmd: Vec::new(), + }, + ), + }; + let session_configured_event = Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }; + + app.enqueue_primary_event(approval_event.clone()).await?; + app.enqueue_primary_event(session_configured_event.clone()) + .await?; + + let rx = app + .active_thread_rx + .as_mut() + .expect("primary thread receiver should be active"); + let first_event = time::timeout(Duration::from_millis(50), rx.recv()) + .await + .expect("timed out waiting for session configured event") + .expect("channel closed unexpectedly"); + let second_event = time::timeout(Duration::from_millis(50), rx.recv()) + .await + .expect("timed out waiting for buffered approval event") + .expect("channel closed unexpectedly"); + + assert!(matches!(first_event.msg, EventMsg::SessionConfigured(_))); + assert!(matches!(second_event.msg, EventMsg::ExecApprovalRequest(_))); + + app.handle_codex_event_now(first_event); + app.handle_codex_event_now(second_event); + app.chat_widget + .handle_key_event(KeyEvent::new(KeyCode::Char('y'), KeyModifiers::NONE)); + + while let Ok(app_event) = app_event_rx.try_recv() { + if let AppEvent::SubmitThreadOp { + thread_id: op_thread_id, + .. + } = app_event + { + assert_eq!(op_thread_id, thread_id); + return Ok(()); + } + } + + panic!("expected approval action to submit a thread-scoped op"); + } + + #[tokio::test] + async fn routed_thread_event_does_not_recreate_channel_after_reset() -> Result<()> { + let mut app = make_test_app().await; + let thread_id = ThreadId::new(); + app.thread_event_channels.insert( + thread_id, + ThreadEventChannel::new(THREAD_EVENT_CHANNEL_CAPACITY), + ); + + app.reset_thread_event_state(); + app.handle_routed_thread_event( + thread_id, + Event { + id: "stale-event".to_string(), + msg: EventMsg::ShutdownComplete, + }, + ) + .await?; + + assert!( + !app.thread_event_channels.contains_key(&thread_id), + "stale routed events should not recreate cleared thread channels" + ); + assert_eq!(app.active_thread_id, None); + assert_eq!(app.primary_thread_id, None); + Ok(()) + } + + #[tokio::test] + async fn reset_thread_event_state_aborts_listener_tasks() { + struct NotifyOnDrop(Option>); + + impl Drop for NotifyOnDrop { + fn drop(&mut self) { + if let Some(tx) = self.0.take() { + let _ = tx.send(()); + } + } + } + + let mut app = make_test_app().await; + let thread_id = ThreadId::new(); + let (started_tx, started_rx) = tokio::sync::oneshot::channel(); + let (dropped_tx, dropped_rx) = tokio::sync::oneshot::channel(); + let handle = tokio::spawn(async move { + let _notify_on_drop = NotifyOnDrop(Some(dropped_tx)); + let _ = started_tx.send(()); + std::future::pending::<()>().await; + }); + app.thread_event_listener_tasks.insert(thread_id, handle); + started_rx + .await + .expect("listener task should report it started"); + + app.reset_thread_event_state(); + + assert_eq!(app.thread_event_listener_tasks.is_empty(), true); + time::timeout(Duration::from_millis(50), dropped_rx) + .await + .expect("timed out waiting for listener task abort") + .expect("listener task drop notification should succeed"); + } + #[tokio::test] async fn enqueue_thread_event_does_not_block_when_channel_full() -> Result<()> { let mut app = make_test_app().await; @@ -3761,6 +4024,85 @@ mod tests { assert!(app.chat_widget.pending_thread_approvals().is_empty()); } + #[tokio::test] + async fn inactive_thread_approval_bubbles_into_active_view() -> Result<()> { + let mut app = make_test_app().await; + let main_thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000011").expect("valid thread"); + let agent_thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000022").expect("valid thread"); + + app.primary_thread_id = Some(main_thread_id); + app.active_thread_id = Some(main_thread_id); + app.thread_event_channels + .insert(main_thread_id, ThreadEventChannel::new(1)); + app.thread_event_channels.insert( + agent_thread_id, + ThreadEventChannel::new_with_session_configured( + 1, + Event { + id: String::new(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: agent_thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-5".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::OnRequest, + sandbox_policy: SandboxPolicy::new_workspace_write_policy(), + cwd: PathBuf::from("/tmp/agent"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::from("/tmp/agent-rollout.jsonl")), + }), + }, + ), + ); + app.agent_picker_threads.insert( + agent_thread_id, + AgentPickerThreadEntry { + agent_nickname: Some("Robie".to_string()), + agent_role: Some("explorer".to_string()), + is_closed: false, + }, + ); + + app.enqueue_thread_event( + agent_thread_id, + Event { + id: "ev-approval".to_string(), + msg: EventMsg::ExecApprovalRequest( + codex_protocol::protocol::ExecApprovalRequestEvent { + call_id: "call-approval".to_string(), + approval_id: None, + turn_id: "turn-approval".to_string(), + command: vec!["echo".to_string(), "hi".to_string()], + cwd: PathBuf::from("/tmp/agent"), + reason: Some("need approval".to_string()), + network_approval_context: None, + proposed_execpolicy_amendment: None, + proposed_network_policy_amendments: None, + additional_permissions: None, + available_decisions: None, + parsed_cmd: Vec::new(), + }, + ), + }, + ) + .await?; + + assert_eq!(app.chat_widget.has_active_view(), true); + assert_eq!( + app.chat_widget.pending_thread_approvals(), + &["Robie [explorer]".to_string()] + ); + + Ok(()) + } + #[test] fn agent_picker_item_name_snapshot() { let thread_id = @@ -4041,6 +4383,7 @@ mod tests { pending_shutdown_exit_thread_id: None, windows_sandbox: WindowsSandboxState::default(), thread_event_channels: HashMap::new(), + thread_event_listener_tasks: HashMap::new(), agent_picker_threads: HashMap::new(), active_thread_id: None, active_thread_rx: None, @@ -4100,6 +4443,7 @@ mod tests { pending_shutdown_exit_thread_id: None, windows_sandbox: WindowsSandboxState::default(), thread_event_channels: HashMap::new(), + thread_event_listener_tasks: HashMap::new(), agent_picker_threads: HashMap::new(), active_thread_id: None, active_thread_rx: None, diff --git a/codex-rs/tui/src/app_event.rs b/codex-rs/tui/src/app_event.rs index 104895ad9..e769764c9 100644 --- a/codex-rs/tui/src/app_event.rs +++ b/codex-rs/tui/src/app_event.rs @@ -73,8 +73,17 @@ pub(crate) enum AppEvent { /// Switch the active thread to the selected agent. SelectAgentThread(ThreadId), - /// Recompute the list of inactive threads that still need approval. - RefreshPendingThreadApprovals, + /// Submit an op to the specified thread, regardless of current focus. + SubmitThreadOp { + thread_id: ThreadId, + op: codex_protocol::protocol::Op, + }, + + /// Forward an event from a non-primary thread into the app-level thread router. + ThreadEvent { + thread_id: ThreadId, + event: Event, + }, /// Start a new session. NewSession, diff --git a/codex-rs/tui/src/bottom_pane/approval_overlay.rs b/codex-rs/tui/src/bottom_pane/approval_overlay.rs index b4a2de333..a9ef2cd38 100644 --- a/codex-rs/tui/src/bottom_pane/approval_overlay.rs +++ b/codex-rs/tui/src/bottom_pane/approval_overlay.rs @@ -17,6 +17,7 @@ use crate::render::highlight::highlight_bash_to_lines; use crate::render::renderable::ColumnRenderable; use crate::render::renderable::Renderable; use codex_core::features::Features; +use codex_protocol::ThreadId; use codex_protocol::mcp::RequestId; use codex_protocol::models::PermissionProfile; use codex_protocol::protocol::ElicitationAction; @@ -41,6 +42,8 @@ use ratatui::widgets::Wrap; #[derive(Clone, Debug)] pub(crate) enum ApprovalRequest { Exec { + thread_id: ThreadId, + thread_label: Option, id: String, command: Vec, reason: Option, @@ -49,18 +52,40 @@ pub(crate) enum ApprovalRequest { additional_permissions: Option, }, ApplyPatch { + thread_id: ThreadId, + thread_label: Option, id: String, reason: Option, cwd: PathBuf, changes: HashMap, }, McpElicitation { + thread_id: ThreadId, + thread_label: Option, server_name: String, request_id: RequestId, message: String, }, } +impl ApprovalRequest { + fn thread_id(&self) -> ThreadId { + match self { + ApprovalRequest::Exec { thread_id, .. } + | ApprovalRequest::ApplyPatch { thread_id, .. } + | ApprovalRequest::McpElicitation { thread_id, .. } => *thread_id, + } + } + + fn thread_label(&self) -> Option<&str> { + match self { + ApprovalRequest::Exec { thread_label, .. } + | ApprovalRequest::ApplyPatch { thread_label, .. } + | ApprovalRequest::McpElicitation { thread_label, .. } => thread_label.as_deref(), + } + } +} + /// Modal overlay asking the user to approve or deny one or more requests. pub(crate) struct ApprovalOverlay { current_request: Option, @@ -158,13 +183,7 @@ impl ApprovalOverlay { .collect(); let params = SelectionViewParams { - footer_hint: Some(Line::from(vec![ - "Press ".into(), - key_hint::plain(KeyCode::Enter).into(), - " to confirm or ".into(), - key_hint::plain(KeyCode::Esc).into(), - " to cancel".into(), - ])), + footer_hint: Some(approval_footer_hint(request)), items, header, ..Default::default() @@ -207,20 +226,39 @@ impl ApprovalOverlay { } fn handle_exec_decision(&self, id: &str, command: &[String], decision: ReviewDecision) { - let cell = history_cell::new_approval_decision_cell(command.to_vec(), decision.clone()); - self.app_event_tx.send(AppEvent::InsertHistoryCell(cell)); - self.app_event_tx.send(AppEvent::CodexOp(Op::ExecApproval { - id: id.to_string(), - turn_id: None, - decision, - })); + let Some(request) = self.current_request.as_ref() else { + return; + }; + if request.thread_label().is_none() { + let cell = history_cell::new_approval_decision_cell(command.to_vec(), decision.clone()); + self.app_event_tx.send(AppEvent::InsertHistoryCell(cell)); + } + let thread_id = request.thread_id(); + self.app_event_tx.send(AppEvent::SubmitThreadOp { + thread_id, + op: Op::ExecApproval { + id: id.to_string(), + turn_id: None, + decision, + }, + }); } fn handle_patch_decision(&self, id: &str, decision: ReviewDecision) { - self.app_event_tx.send(AppEvent::CodexOp(Op::PatchApproval { - id: id.to_string(), - decision, - })); + let Some(thread_id) = self + .current_request + .as_ref() + .map(ApprovalRequest::thread_id) + else { + return; + }; + self.app_event_tx.send(AppEvent::SubmitThreadOp { + thread_id, + op: Op::PatchApproval { + id: id.to_string(), + decision, + }, + }); } fn handle_elicitation_decision( @@ -229,12 +267,21 @@ impl ApprovalOverlay { request_id: &RequestId, decision: ElicitationAction, ) { - self.app_event_tx - .send(AppEvent::CodexOp(Op::ResolveElicitation { + let Some(thread_id) = self + .current_request + .as_ref() + .map(ApprovalRequest::thread_id) + else { + return; + }; + self.app_event_tx.send(AppEvent::SubmitThreadOp { + thread_id, + op: Op::ResolveElicitation { server_name: server_name.to_string(), request_id: request_id.clone(), decision, - })); + }, + }); } fn advance_queue(&mut self) { @@ -261,6 +308,23 @@ impl ApprovalOverlay { false } } + KeyEvent { + kind: KeyEventKind::Press, + code: KeyCode::Char('o'), + .. + } => { + if let Some(request) = self.current_request.as_ref() { + if request.thread_label().is_some() { + self.app_event_tx + .send(AppEvent::SelectAgentThread(request.thread_id())); + true + } else { + false + } + } else { + false + } + } e => { if let Some(idx) = self .options @@ -347,9 +411,28 @@ impl Renderable for ApprovalOverlay { } } +fn approval_footer_hint(request: &ApprovalRequest) -> Line<'static> { + let mut spans = vec![ + "Press ".into(), + key_hint::plain(KeyCode::Enter).into(), + " to confirm or ".into(), + key_hint::plain(KeyCode::Esc).into(), + " to cancel".into(), + ]; + if request.thread_label().is_some() { + spans.extend([ + " or ".into(), + key_hint::plain(KeyCode::Char('o')).into(), + " to open thread".into(), + ]); + } + Line::from(spans) +} + fn build_header(request: &ApprovalRequest) -> Box { match request { ApprovalRequest::Exec { + thread_label, reason, command, network_approval_context, @@ -357,6 +440,13 @@ fn build_header(request: &ApprovalRequest) -> Box { .. } => { let mut header: Vec> = Vec::new(); + if let Some(thread_label) = thread_label { + header.push(Line::from(vec![ + "Thread: ".into(), + thread_label.clone().bold(), + ])); + header.push(Line::from("")); + } if let Some(reason) = reason { header.push(Line::from(vec!["Reason: ".into(), reason.clone().italic()])); header.push(Line::from("")); @@ -381,12 +471,20 @@ fn build_header(request: &ApprovalRequest) -> Box { Box::new(Paragraph::new(header).wrap(Wrap { trim: false })) } ApprovalRequest::ApplyPatch { + thread_label, reason, cwd, changes, .. } => { let mut header: Vec> = Vec::new(); + if let Some(thread_label) = thread_label { + header.push(Box::new(Line::from(vec![ + "Thread: ".into(), + thread_label.clone().bold(), + ]))); + header.push(Box::new(Line::from(""))); + } if let Some(reason) = reason && !reason.is_empty() { @@ -403,16 +501,25 @@ fn build_header(request: &ApprovalRequest) -> Box { Box::new(ColumnRenderable::with(header)) } ApprovalRequest::McpElicitation { + thread_label, server_name, message, .. } => { - let header = Paragraph::new(vec![ + let mut lines = Vec::new(); + if let Some(thread_label) = thread_label { + lines.push(Line::from(vec![ + "Thread: ".into(), + thread_label.clone().bold(), + ])); + lines.push(Line::from("")); + } + lines.extend([ Line::from(vec!["Server: ".into(), server_name.clone().bold()]), Line::from(""), Line::from(message.clone()), - ]) - .wrap(Wrap { trim: false }); + ]); + let header = Paragraph::new(lines).wrap(Wrap { trim: false }); Box::new(header) } } @@ -652,6 +759,8 @@ mod tests { fn make_exec_request() -> ApprovalRequest { ApprovalRequest::Exec { + thread_id: ThreadId::new(), + thread_label: None, id: "test".to_string(), command: vec!["echo".to_string(), "hi".to_string()], reason: Some("reason".to_string()), @@ -679,10 +788,10 @@ mod tests { let mut view = ApprovalOverlay::new(make_exec_request(), tx, Features::with_defaults()); assert!(!view.is_complete()); view.handle_key_event(KeyEvent::new(KeyCode::Char('y'), KeyModifiers::NONE)); - // We expect at least one CodexOp message in the queue. + // We expect at least one thread-scoped approval op message in the queue. let mut saw_op = false; while let Ok(ev) = rx.try_recv() { - if matches!(ev, AppEvent::CodexOp(_)) { + if matches!(ev, AppEvent::SubmitThreadOp { .. }) { saw_op = true; break; } @@ -690,12 +799,68 @@ mod tests { assert!(saw_op, "expected approval decision to emit an op"); } + #[test] + fn o_opens_source_thread_for_cross_thread_approval() { + let (tx, mut rx) = unbounded_channel::(); + let tx = AppEventSender::new(tx); + let thread_id = ThreadId::new(); + let mut view = ApprovalOverlay::new( + ApprovalRequest::Exec { + thread_id, + thread_label: Some("Robie [explorer]".to_string()), + id: "test".to_string(), + command: vec!["echo".to_string(), "hi".to_string()], + reason: None, + available_decisions: vec![ReviewDecision::Approved, ReviewDecision::Abort], + network_approval_context: None, + additional_permissions: None, + }, + tx, + Features::with_defaults(), + ); + + view.handle_key_event(KeyEvent::new(KeyCode::Char('o'), KeyModifiers::NONE)); + + let event = rx.try_recv().expect("expected select-agent-thread event"); + assert_eq!( + matches!(event, AppEvent::SelectAgentThread(id) if id == thread_id), + true + ); + } + + #[test] + fn cross_thread_footer_hint_mentions_o_shortcut() { + let (tx, _rx) = unbounded_channel::(); + let tx = AppEventSender::new(tx); + let view = ApprovalOverlay::new( + ApprovalRequest::Exec { + thread_id: ThreadId::new(), + thread_label: Some("Robie [explorer]".to_string()), + id: "test".to_string(), + command: vec!["echo".to_string(), "hi".to_string()], + reason: None, + available_decisions: vec![ReviewDecision::Approved, ReviewDecision::Abort], + network_approval_context: None, + additional_permissions: None, + }, + tx, + Features::with_defaults(), + ); + + assert_snapshot!( + "approval_overlay_cross_thread_prompt", + render_overlay_lines(&view, 80) + ); + } + #[test] fn exec_prefix_option_emits_execpolicy_amendment() { let (tx, mut rx) = unbounded_channel::(); let tx = AppEventSender::new(tx); let mut view = ApprovalOverlay::new( ApprovalRequest::Exec { + thread_id: ThreadId::new(), + thread_label: None, id: "test".to_string(), command: vec!["echo".to_string()], reason: None, @@ -717,7 +882,11 @@ mod tests { view.handle_key_event(KeyEvent::new(KeyCode::Char('p'), KeyModifiers::NONE)); let mut saw_op = false; while let Ok(ev) = rx.try_recv() { - if let AppEvent::CodexOp(Op::ExecApproval { decision, .. }) = ev { + if let AppEvent::SubmitThreadOp { + op: Op::ExecApproval { decision, .. }, + .. + } = ev + { assert_eq!( decision, ReviewDecision::ApprovedExecpolicyAmendment { @@ -742,6 +911,8 @@ mod tests { let tx = AppEventSender::new(tx); let mut view = ApprovalOverlay::new( ApprovalRequest::Exec { + thread_id: ThreadId::new(), + thread_label: None, id: "test".to_string(), command: vec!["curl".to_string(), "https://example.com".to_string()], reason: None, @@ -779,6 +950,8 @@ mod tests { let tx = AppEventSender::new(tx); let command = vec!["echo".into(), "hello".into(), "world".into()]; let exec_request = ApprovalRequest::Exec { + thread_id: ThreadId::new(), + thread_label: None, id: "test".into(), command, reason: None, @@ -893,6 +1066,8 @@ mod tests { let (tx, _rx) = unbounded_channel::(); let tx = AppEventSender::new(tx); let exec_request = ApprovalRequest::Exec { + thread_id: ThreadId::new(), + thread_label: None, id: "test".into(), command: vec!["cat".into(), "/tmp/readme.txt".into()], reason: None, @@ -932,6 +1107,8 @@ mod tests { let (tx, _rx) = unbounded_channel::(); let tx = AppEventSender::new(tx); let exec_request = ApprovalRequest::Exec { + thread_id: ThreadId::new(), + thread_label: None, id: "test".into(), command: vec!["cat".into(), "/tmp/readme.txt".into()], reason: Some("need filesystem access".into()), @@ -958,6 +1135,8 @@ mod tests { let (tx, _rx) = unbounded_channel::(); let tx = AppEventSender::new(tx); let exec_request = ApprovalRequest::Exec { + thread_id: ThreadId::new(), + thread_label: None, id: "test".into(), command: vec!["curl".into(), "https://example.com".into()], reason: Some("network request blocked".into()), @@ -1049,7 +1228,11 @@ mod tests { let mut decision = None; while let Ok(ev) = rx.try_recv() { - if let AppEvent::CodexOp(Op::ExecApproval { decision: d, .. }) = ev { + if let AppEvent::SubmitThreadOp { + op: Op::ExecApproval { decision: d, .. }, + .. + } = ev + { decision = Some(d); break; } diff --git a/codex-rs/tui/src/bottom_pane/mod.rs b/codex-rs/tui/src/bottom_pane/mod.rs index dac006988..72ffeace9 100644 --- a/codex-rs/tui/src/bottom_pane/mod.rs +++ b/codex-rs/tui/src/bottom_pane/mod.rs @@ -812,6 +812,11 @@ impl BottomPane { self.is_task_running } + #[cfg(test)] + pub(crate) fn has_active_view(&self) -> bool { + !self.view_stack.is_empty() + } + /// Return true when the pane is in the regular composer state without any /// overlays or popups and not running a task. This is the safe context to /// use Esc-Esc for backtracking from the main view. @@ -1115,6 +1120,8 @@ mod tests { fn exec_request() -> ApprovalRequest { ApprovalRequest::Exec { + thread_id: codex_protocol::ThreadId::new(), + thread_label: None, id: "1".to_string(), command: vec!["echo".into(), "ok".into()], reason: None, diff --git a/codex-rs/tui/src/bottom_pane/snapshots/codex_tui__bottom_pane__approval_overlay__tests__approval_overlay_cross_thread_prompt.snap b/codex-rs/tui/src/bottom_pane/snapshots/codex_tui__bottom_pane__approval_overlay__tests__approval_overlay_cross_thread_prompt.snap new file mode 100644 index 000000000..ddd106c63 --- /dev/null +++ b/codex-rs/tui/src/bottom_pane/snapshots/codex_tui__bottom_pane__approval_overlay__tests__approval_overlay_cross_thread_prompt.snap @@ -0,0 +1,14 @@ +--- +source: tui/src/bottom_pane/approval_overlay.rs +expression: "render_overlay_lines(&view, 80)" +--- + Would you like to run the following command? + + Thread: Robie [explorer] + + $ echo hi + +› 1. Yes, proceed (y) + 2. No, and tell Codex what to do differently (esc) + + Press enter to confirm or esc to cancel or o to open thread diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index d727a1712..bc140fdf6 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -2582,6 +2582,8 @@ impl ChatWidget { let available_decisions = ev.effective_available_decisions(); let request = ApprovalRequest::Exec { + thread_id: self.thread_id.unwrap_or_default(), + thread_label: None, id: ev.effective_approval_id(), command: ev.command, reason: ev.reason, @@ -2598,6 +2600,8 @@ impl ChatWidget { self.flush_answer_stream_with_separator(); let request = ApprovalRequest::ApplyPatch { + thread_id: self.thread_id.unwrap_or_default(), + thread_label: None, id: ev.call_id, reason: ev.reason, changes: ev.changes.clone(), @@ -2620,6 +2624,8 @@ impl ChatWidget { }); let request = ApprovalRequest::McpElicitation { + thread_id: self.thread_id.unwrap_or_default(), + thread_label: None, server_name: ev.server_name, request_id: ev.id, message: ev.message, @@ -2629,6 +2635,12 @@ impl ChatWidget { self.request_redraw(); } + pub(crate) fn push_approval_request(&mut self, request: ApprovalRequest) { + self.bottom_pane + .push_approval_request(request, &self.config.features); + self.request_redraw(); + } + pub(crate) fn handle_request_user_input_now(&mut self, ev: RequestUserInputEvent) { self.flush_answer_stream_with_separator(); self.bottom_pane.push_user_input_request(ev); @@ -7543,6 +7555,11 @@ impl ChatWidget { self.bottom_pane.pending_thread_approvals() } + #[cfg(test)] + pub(crate) fn has_active_view(&self) -> bool { + self.bottom_pane.has_active_view() + } + pub(crate) fn show_esc_backtrack_hint(&mut self) { self.bottom_pane.show_esc_backtrack_hint(); } diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index b3b1a4922..d89ed4bf5 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -2854,7 +2854,11 @@ async fn exec_approval_uses_approval_id_when_present() { let mut found = false; while let Ok(app_ev) = rx.try_recv() { - if let AppEvent::CodexOp(Op::ExecApproval { id, decision, .. }) = app_ev { + if let AppEvent::SubmitThreadOp { + op: Op::ExecApproval { id, decision, .. }, + .. + } = app_ev + { assert_eq!(id, "approval-subcommand"); assert_matches!(decision, codex_protocol::protocol::ReviewDecision::Approved); found = true; @@ -7637,10 +7641,14 @@ async fn apply_patch_approval_sends_op_with_call_id() { // Approve via key press 'y' chat.handle_key_event(KeyEvent::new(KeyCode::Char('y'), KeyModifiers::NONE)); - // Expect a CodexOp with PatchApproval carrying the call id. + // Expect a thread-scoped PatchApproval op carrying the call id. let mut found = false; while let Ok(app_ev) = rx.try_recv() { - if let AppEvent::CodexOp(Op::PatchApproval { id, decision }) = app_ev { + if let AppEvent::SubmitThreadOp { + op: Op::PatchApproval { id, decision }, + .. + } = app_ev + { assert_eq!(id, "call-999"); assert_matches!(decision, codex_protocol::protocol::ReviewDecision::Approved); found = true; @@ -7671,16 +7679,16 @@ async fn apply_patch_full_flow_integration_like() { }), }); - // 2) User approves via 'y' and App receives a CodexOp + // 2) User approves via 'y' and App receives a thread-scoped op chat.handle_key_event(KeyEvent::new(KeyCode::Char('y'), KeyModifiers::NONE)); let mut maybe_op: Option = None; while let Ok(app_ev) = rx.try_recv() { - if let AppEvent::CodexOp(op) = app_ev { + if let AppEvent::SubmitThreadOp { op, .. } = app_ev { maybe_op = Some(op); break; } } - let op = maybe_op.expect("expected CodexOp after key press"); + let op = maybe_op.expect("expected thread-scoped op after key press"); // 3) App forwards to widget.submit_op, which pushes onto codex_op_tx chat.submit_op(op);