parent
124a09e577
commit
4cef89a122
17 changed files with 250 additions and 246 deletions
|
|
@ -146,7 +146,7 @@ use crate::tools::sandboxing::ApprovalStore;
|
|||
use crate::tools::spec::ToolsConfig;
|
||||
use crate::tools::spec::ToolsConfigParams;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use crate::unified_exec::UnifiedExecSessionManager;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use crate::user_instructions::DeveloperInstructions;
|
||||
use crate::user_instructions::UserInstructions;
|
||||
use crate::user_notification::UserNotification;
|
||||
|
|
@ -677,7 +677,7 @@ impl Session {
|
|||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_startup_cancellation_token: CancellationToken::new(),
|
||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||
unified_exec_manager: UnifiedExecProcessManager::default(),
|
||||
notifier: UserNotifier::new(config.notify.clone()),
|
||||
rollout: Mutex::new(Some(rollout_recorder)),
|
||||
user_shell: Arc::new(default_shell),
|
||||
|
|
@ -2130,7 +2130,7 @@ mod handlers {
|
|||
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
sess.services
|
||||
.unified_exec_manager
|
||||
.terminate_all_sessions()
|
||||
.terminate_all_processes()
|
||||
.await;
|
||||
info!("Shutting down Codex instance");
|
||||
|
||||
|
|
@ -3517,7 +3517,7 @@ mod tests {
|
|||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_startup_cancellation_token: CancellationToken::new(),
|
||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||
unified_exec_manager: UnifiedExecProcessManager::default(),
|
||||
notifier: UserNotifier::new(None),
|
||||
rollout: Mutex::new(None),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
|
|
@ -3608,7 +3608,7 @@ mod tests {
|
|||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_startup_cancellation_token: CancellationToken::new(),
|
||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||
unified_exec_manager: UnifiedExecProcessManager::default(),
|
||||
notifier: UserNotifier::new(None),
|
||||
rollout: Mutex::new(None),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
|
|
@ -3655,7 +3655,7 @@ mod tests {
|
|||
session.features = features;
|
||||
|
||||
session
|
||||
.record_model_warning("too many unified exec sessions", &turn_context)
|
||||
.record_model_warning("too many unified exec processes", &turn_context)
|
||||
.await;
|
||||
|
||||
let mut history = session.clone_history().await;
|
||||
|
|
@ -3668,7 +3668,7 @@ mod tests {
|
|||
assert_eq!(
|
||||
content,
|
||||
&vec![ContentItem::InputText {
|
||||
text: "Warning: too many unified exec sessions".to_string(),
|
||||
text: "Warning: too many unified exec processes".to_string(),
|
||||
}]
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -346,7 +346,7 @@ pub const FEATURES: &[FeatureSpec] = &[
|
|||
stage: Stage::Beta {
|
||||
name: "Background terminal",
|
||||
menu_description: "Run long-running terminal commands in the background.",
|
||||
announcement: "NEW! Try Background terminals for long running processes. Enable in /experimental!",
|
||||
announcement: "NEW! Try Background terminals for long-running commands. Enable in /experimental!",
|
||||
},
|
||||
default_enabled: false,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ use crate::mcp_connection_manager::McpConnectionManager;
|
|||
use crate::models_manager::manager::ModelsManager;
|
||||
use crate::skills::SkillsManager;
|
||||
use crate::tools::sandboxing::ApprovalStore;
|
||||
use crate::unified_exec::UnifiedExecSessionManager;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use crate::user_notification::UserNotifier;
|
||||
use codex_otel::otel_manager::OtelManager;
|
||||
use tokio::sync::Mutex;
|
||||
|
|
@ -18,7 +18,7 @@ use tokio_util::sync::CancellationToken;
|
|||
pub(crate) struct SessionServices {
|
||||
pub(crate) mcp_connection_manager: Arc<RwLock<McpConnectionManager>>,
|
||||
pub(crate) mcp_startup_cancellation_token: CancellationToken,
|
||||
pub(crate) unified_exec_manager: UnifiedExecSessionManager,
|
||||
pub(crate) unified_exec_manager: UnifiedExecProcessManager,
|
||||
pub(crate) notifier: UserNotifier,
|
||||
pub(crate) rollout: Mutex<Option<RolloutRecorder>>,
|
||||
pub(crate) user_shell: Arc<crate::shell::Shell>,
|
||||
|
|
|
|||
|
|
@ -159,7 +159,7 @@ impl Session {
|
|||
for task in self.take_all_running_tasks().await {
|
||||
self.handle_task_abort(task, reason.clone()).await;
|
||||
}
|
||||
self.close_unified_exec_sessions().await;
|
||||
self.close_unified_exec_processes().await;
|
||||
}
|
||||
|
||||
pub async fn on_task_finished(
|
||||
|
|
@ -168,7 +168,7 @@ impl Session {
|
|||
last_agent_message: Option<String>,
|
||||
) {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
let should_close_sessions = if let Some(at) = active.as_mut()
|
||||
let should_close_processes = if let Some(at) = active.as_mut()
|
||||
&& at.remove_task(&turn_context.sub_id)
|
||||
{
|
||||
*active = None;
|
||||
|
|
@ -177,8 +177,8 @@ impl Session {
|
|||
false
|
||||
};
|
||||
drop(active);
|
||||
if should_close_sessions {
|
||||
self.close_unified_exec_sessions().await;
|
||||
if should_close_processes {
|
||||
self.close_unified_exec_processes().await;
|
||||
}
|
||||
let event = EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message });
|
||||
self.send_event(turn_context.as_ref(), event).await;
|
||||
|
|
@ -203,10 +203,10 @@ impl Session {
|
|||
}
|
||||
}
|
||||
|
||||
async fn close_unified_exec_sessions(&self) {
|
||||
async fn close_unified_exec_processes(&self) {
|
||||
self.services
|
||||
.unified_exec_manager
|
||||
.terminate_all_sessions()
|
||||
.terminate_all_processes()
|
||||
.await;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ use crate::tools::registry::ToolHandler;
|
|||
use crate::tools::registry::ToolKind;
|
||||
use crate::unified_exec::ExecCommandRequest;
|
||||
use crate::unified_exec::UnifiedExecContext;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use crate::unified_exec::UnifiedExecResponse;
|
||||
use crate::unified_exec::UnifiedExecSessionManager;
|
||||
use crate::unified_exec::WriteStdinRequest;
|
||||
use async_trait::async_trait;
|
||||
use serde::Deserialize;
|
||||
|
|
@ -112,7 +112,7 @@ impl ToolHandler for UnifiedExecHandler {
|
|||
}
|
||||
};
|
||||
|
||||
let manager: &UnifiedExecSessionManager = &session.services.unified_exec_manager;
|
||||
let manager: &UnifiedExecProcessManager = &session.services.unified_exec_manager;
|
||||
let context = UnifiedExecContext::new(session.clone(), turn.clone(), call_id.clone());
|
||||
|
||||
let response = match tool_name.as_str() {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
Runtime: unified exec
|
||||
|
||||
Handles approval + sandbox orchestration for unified exec requests, delegating to
|
||||
the session manager to spawn PTYs once an ExecEnv is prepared.
|
||||
the process manager to spawn PTYs once an ExecEnv is prepared.
|
||||
*/
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::SandboxErr;
|
||||
|
|
@ -25,8 +25,8 @@ use crate::tools::sandboxing::ToolError;
|
|||
use crate::tools::sandboxing::ToolRuntime;
|
||||
use crate::tools::sandboxing::with_cached_approval;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
use crate::unified_exec::UnifiedExecSession;
|
||||
use crate::unified_exec::UnifiedExecSessionManager;
|
||||
use crate::unified_exec::UnifiedExecProcess;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
use futures::future::BoxFuture;
|
||||
use std::collections::HashMap;
|
||||
|
|
@ -50,7 +50,7 @@ pub struct UnifiedExecApprovalKey {
|
|||
}
|
||||
|
||||
pub struct UnifiedExecRuntime<'a> {
|
||||
manager: &'a UnifiedExecSessionManager,
|
||||
manager: &'a UnifiedExecProcessManager,
|
||||
}
|
||||
|
||||
impl UnifiedExecRequest {
|
||||
|
|
@ -74,7 +74,7 @@ impl UnifiedExecRequest {
|
|||
}
|
||||
|
||||
impl<'a> UnifiedExecRuntime<'a> {
|
||||
pub fn new(manager: &'a UnifiedExecSessionManager) -> Self {
|
||||
pub fn new(manager: &'a UnifiedExecProcessManager) -> Self {
|
||||
Self { manager }
|
||||
}
|
||||
}
|
||||
|
|
@ -158,13 +158,13 @@ impl Approvable<UnifiedExecRequest> for UnifiedExecRuntime<'_> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecSession> for UnifiedExecRuntime<'a> {
|
||||
impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRuntime<'a> {
|
||||
async fn run(
|
||||
&mut self,
|
||||
req: &UnifiedExecRequest,
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
ctx: &ToolCtx<'_>,
|
||||
) -> Result<UnifiedExecSession, ToolError> {
|
||||
) -> Result<UnifiedExecProcess, ToolError> {
|
||||
let base_command = &req.command;
|
||||
let session_shell = ctx.session.user_shell();
|
||||
let command = maybe_wrap_shell_lc_with_snapshot(base_command, session_shell.as_ref());
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ use tokio::time::Instant;
|
|||
use tokio::time::Sleep;
|
||||
|
||||
use super::UnifiedExecContext;
|
||||
use super::session::UnifiedExecSession;
|
||||
use super::process::UnifiedExecProcess;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
|
|
@ -37,13 +37,13 @@ const UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES: usize = 8192;
|
|||
/// shared transcript, and emits ExecCommandOutputDelta events on UTF‑8
|
||||
/// boundaries.
|
||||
pub(crate) fn start_streaming_output(
|
||||
session: &UnifiedExecSession,
|
||||
process: &UnifiedExecProcess,
|
||||
context: &UnifiedExecContext,
|
||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||
) {
|
||||
let mut receiver = session.output_receiver();
|
||||
let output_drained = session.output_drained_notify();
|
||||
let exit_token = session.cancellation_token();
|
||||
let mut receiver = process.output_receiver();
|
||||
let output_drained = process.output_drained_notify();
|
||||
let exit_token = process.cancellation_token();
|
||||
|
||||
let session_ref = Arc::clone(&context.session);
|
||||
let turn_ref = Arc::clone(&context.turn);
|
||||
|
|
@ -104,7 +104,7 @@ pub(crate) fn start_streaming_output(
|
|||
/// single ExecCommandEnd event with the aggregated transcript.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn spawn_exit_watcher(
|
||||
session: Arc<UnifiedExecSession>,
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
session_ref: Arc<Session>,
|
||||
turn_ref: Arc<TurnContext>,
|
||||
call_id: String,
|
||||
|
|
@ -114,14 +114,14 @@ pub(crate) fn spawn_exit_watcher(
|
|||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||
started_at: Instant,
|
||||
) {
|
||||
let exit_token = session.cancellation_token();
|
||||
let output_drained = session.output_drained_notify();
|
||||
let exit_token = process.cancellation_token();
|
||||
let output_drained = process.output_drained_notify();
|
||||
|
||||
tokio::spawn(async move {
|
||||
exit_token.cancelled().await;
|
||||
output_drained.notified().await;
|
||||
|
||||
let exit_code = session.exit_code().unwrap_or(-1);
|
||||
let exit_code = process.exit_code().unwrap_or(-1);
|
||||
let duration = Instant::now().saturating_duration_since(started_at);
|
||||
emit_exec_end_for_unified_exec(
|
||||
session_ref,
|
||||
|
|
|
|||
|
|
@ -3,11 +3,11 @@ use thiserror::Error;
|
|||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum UnifiedExecError {
|
||||
#[error("Failed to create unified exec session: {message}")]
|
||||
CreateSession { message: String },
|
||||
// Called "session" in the model's training.
|
||||
#[error("Unknown session id {process_id}")]
|
||||
UnknownSessionId { process_id: String },
|
||||
#[error("Failed to create unified exec process: {message}")]
|
||||
CreateProcess { message: String },
|
||||
// The model is trained on `session_id`, but internally we track a `process_id`.
|
||||
#[error("Unknown process id {process_id}")]
|
||||
UnknownProcessId { process_id: String },
|
||||
#[error("failed to write to stdin")]
|
||||
WriteToStdin,
|
||||
#[error("missing command line for unified exec request")]
|
||||
|
|
@ -20,8 +20,8 @@ pub(crate) enum UnifiedExecError {
|
|||
}
|
||||
|
||||
impl UnifiedExecError {
|
||||
pub(crate) fn create_session(message: String) -> Self {
|
||||
Self::CreateSession { message }
|
||||
pub(crate) fn create_process(message: String) -> Self {
|
||||
Self::CreateProcess { message }
|
||||
}
|
||||
|
||||
pub(crate) fn sandbox_denied(message: String, output: ExecToolCallOutput) -> Self {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
//! Unified Exec: interactive PTY execution orchestrated with approvals + sandboxing.
|
||||
//! Unified Exec: interactive process execution orchestrated with approvals + sandboxing.
|
||||
//!
|
||||
//! Responsibilities
|
||||
//! - Manages interactive PTY sessions (create, reuse, buffer output with caps).
|
||||
//! - Manages interactive processes (create, reuse, buffer output with caps).
|
||||
//! - Uses the shared ToolOrchestrator to handle approval, sandbox selection, and
|
||||
//! retry semantics in a single, descriptive flow.
|
||||
//! - Spawns the PTY from a sandbox‑transformed `ExecEnv`; on sandbox denial,
|
||||
|
|
@ -9,17 +9,17 @@
|
|||
//! - Uses the shared `is_likely_sandbox_denied` heuristic to keep denial messages
|
||||
//! consistent with other exec paths.
|
||||
//!
|
||||
//! Flow at a glance (open session)
|
||||
//! Flow at a glance (open process)
|
||||
//! 1) Build a small request `{ command, cwd }`.
|
||||
//! 2) Orchestrator: approval (bypass/cache/prompt) → select sandbox → run.
|
||||
//! 3) Runtime: transform `CommandSpec` → `ExecEnv` → spawn PTY.
|
||||
//! 4) If denial, orchestrator retries with `SandboxType::None`.
|
||||
//! 5) Session is returned with streaming output + metadata.
|
||||
//! 5) Process handle is returned with streaming output + metadata.
|
||||
//!
|
||||
//! This keeps policy logic and user interaction centralized while the PTY/session
|
||||
//! This keeps policy logic and user interaction centralized while the PTY/process
|
||||
//! concerns remain isolated here. The implementation is split between:
|
||||
//! - `session.rs`: PTY session lifecycle + output buffering.
|
||||
//! - `session_manager.rs`: orchestration (approvals, sandboxing, reuse) and request handling.
|
||||
//! - `process.rs`: PTY process lifecycle + output buffering.
|
||||
//! - `process_manager.rs`: orchestration (approvals, sandboxing, reuse) and request handling.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
|
|
@ -38,21 +38,21 @@ use crate::sandboxing::SandboxPermissions;
|
|||
mod async_watcher;
|
||||
mod errors;
|
||||
mod head_tail_buffer;
|
||||
mod session;
|
||||
mod session_manager;
|
||||
mod process;
|
||||
mod process_manager;
|
||||
|
||||
pub(crate) use errors::UnifiedExecError;
|
||||
pub(crate) use session::UnifiedExecSession;
|
||||
pub(crate) use process::UnifiedExecProcess;
|
||||
|
||||
pub(crate) const MIN_YIELD_TIME_MS: u64 = 250;
|
||||
pub(crate) const MAX_YIELD_TIME_MS: u64 = 30_000;
|
||||
pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000;
|
||||
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
|
||||
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_TOKENS: usize = UNIFIED_EXEC_OUTPUT_MAX_BYTES / 4;
|
||||
pub(crate) const MAX_UNIFIED_EXEC_SESSIONS: usize = 64;
|
||||
pub(crate) const MAX_UNIFIED_EXEC_PROCESSES: usize = 64;
|
||||
|
||||
// Send a warning message to the models when it reaches this number of sessions.
|
||||
pub(crate) const WARNING_UNIFIED_EXEC_SESSIONS: usize = 60;
|
||||
// Send a warning message to the models when it reaches this number of processes.
|
||||
pub(crate) const WARNING_UNIFIED_EXEC_PROCESSES: usize = 60;
|
||||
|
||||
pub(crate) struct UnifiedExecContext {
|
||||
pub session: Arc<Session>,
|
||||
|
|
@ -104,32 +104,32 @@ pub(crate) struct UnifiedExecResponse {
|
|||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct SessionStore {
|
||||
sessions: HashMap<String, SessionEntry>,
|
||||
reserved_sessions_id: HashSet<String>,
|
||||
pub(crate) struct ProcessStore {
|
||||
processes: HashMap<String, ProcessEntry>,
|
||||
reserved_process_ids: HashSet<String>,
|
||||
}
|
||||
|
||||
impl SessionStore {
|
||||
fn remove(&mut self, session_id: &str) -> Option<SessionEntry> {
|
||||
self.reserved_sessions_id.remove(session_id);
|
||||
self.sessions.remove(session_id)
|
||||
impl ProcessStore {
|
||||
fn remove(&mut self, process_id: &str) -> Option<ProcessEntry> {
|
||||
self.reserved_process_ids.remove(process_id);
|
||||
self.processes.remove(process_id)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct UnifiedExecSessionManager {
|
||||
session_store: Mutex<SessionStore>,
|
||||
pub(crate) struct UnifiedExecProcessManager {
|
||||
process_store: Mutex<ProcessStore>,
|
||||
}
|
||||
|
||||
impl Default for UnifiedExecSessionManager {
|
||||
impl Default for UnifiedExecProcessManager {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
session_store: Mutex::new(SessionStore::default()),
|
||||
process_store: Mutex::new(ProcessStore::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct SessionEntry {
|
||||
session: Arc<UnifiedExecSession>,
|
||||
struct ProcessEntry {
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
session_ref: Arc<Session>,
|
||||
turn_ref: Arc<TurnContext>,
|
||||
call_id: String,
|
||||
|
|
@ -421,10 +421,10 @@ mod tests {
|
|||
session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.session_store
|
||||
.process_store
|
||||
.lock()
|
||||
.await
|
||||
.sessions
|
||||
.processes
|
||||
.is_empty()
|
||||
);
|
||||
|
||||
|
|
@ -432,7 +432,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn reusing_completed_session_returns_unknown_session() -> anyhow::Result<()> {
|
||||
async fn reusing_completed_process_returns_unknown_process() -> anyhow::Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let (session, turn) = test_session_and_turn().await;
|
||||
|
|
@ -450,23 +450,23 @@ mod tests {
|
|||
|
||||
let err = write_stdin(&session, process_id, "", 100)
|
||||
.await
|
||||
.expect_err("expected unknown session error");
|
||||
.expect_err("expected unknown process error");
|
||||
|
||||
match err {
|
||||
UnifiedExecError::UnknownSessionId { process_id: err_id } => {
|
||||
UnifiedExecError::UnknownProcessId { process_id: err_id } => {
|
||||
assert_eq!(err_id, process_id, "process id should match request");
|
||||
}
|
||||
other => panic!("expected UnknownSessionId, got {other:?}"),
|
||||
other => panic!("expected UnknownProcessId, got {other:?}"),
|
||||
}
|
||||
|
||||
assert!(
|
||||
session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.session_store
|
||||
.process_store
|
||||
.lock()
|
||||
.await
|
||||
.sessions
|
||||
.processes
|
||||
.is_empty()
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -30,8 +30,8 @@ pub(crate) struct OutputHandles {
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct UnifiedExecSession {
|
||||
session: ExecCommandSession,
|
||||
pub(crate) struct UnifiedExecProcess {
|
||||
process_handle: ExecCommandSession,
|
||||
output_buffer: OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
cancellation_token: CancellationToken,
|
||||
|
|
@ -40,9 +40,9 @@ pub(crate) struct UnifiedExecSession {
|
|||
sandbox_type: SandboxType,
|
||||
}
|
||||
|
||||
impl UnifiedExecSession {
|
||||
impl UnifiedExecProcess {
|
||||
pub(super) fn new(
|
||||
session: ExecCommandSession,
|
||||
process_handle: ExecCommandSession,
|
||||
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
sandbox_type: SandboxType,
|
||||
) -> Self {
|
||||
|
|
@ -69,7 +69,7 @@ impl UnifiedExecSession {
|
|||
});
|
||||
|
||||
Self {
|
||||
session,
|
||||
process_handle,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
cancellation_token,
|
||||
|
|
@ -80,7 +80,7 @@ impl UnifiedExecSession {
|
|||
}
|
||||
|
||||
pub(super) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
|
||||
self.session.writer_sender()
|
||||
self.process_handle.writer_sender()
|
||||
}
|
||||
|
||||
pub(super) fn output_handles(&self) -> OutputHandles {
|
||||
|
|
@ -92,7 +92,7 @@ impl UnifiedExecSession {
|
|||
}
|
||||
|
||||
pub(super) fn output_receiver(&self) -> tokio::sync::broadcast::Receiver<Vec<u8>> {
|
||||
self.session.output_receiver()
|
||||
self.process_handle.output_receiver()
|
||||
}
|
||||
|
||||
pub(super) fn cancellation_token(&self) -> CancellationToken {
|
||||
|
|
@ -104,15 +104,15 @@ impl UnifiedExecSession {
|
|||
}
|
||||
|
||||
pub(super) fn has_exited(&self) -> bool {
|
||||
self.session.has_exited()
|
||||
self.process_handle.has_exited()
|
||||
}
|
||||
|
||||
pub(super) fn exit_code(&self) -> Option<i32> {
|
||||
self.session.exit_code()
|
||||
self.process_handle.exit_code()
|
||||
}
|
||||
|
||||
pub(super) fn terminate(&self) {
|
||||
self.session.terminate();
|
||||
self.process_handle.terminate();
|
||||
self.cancellation_token.cancel();
|
||||
self.output_task.abort();
|
||||
}
|
||||
|
|
@ -164,7 +164,7 @@ impl UnifiedExecSession {
|
|||
TruncationPolicy::Tokens(UNIFIED_EXEC_OUTPUT_MAX_TOKENS),
|
||||
);
|
||||
let message = if snippet.is_empty() {
|
||||
format!("Session exited with code {exit_code}")
|
||||
format!("Process exited with code {exit_code}")
|
||||
} else {
|
||||
snippet
|
||||
};
|
||||
|
|
@ -178,11 +178,11 @@ impl UnifiedExecSession {
|
|||
sandbox_type: SandboxType,
|
||||
) -> Result<Self, UnifiedExecError> {
|
||||
let SpawnedPty {
|
||||
session,
|
||||
session: process_handle,
|
||||
output_rx,
|
||||
mut exit_rx,
|
||||
} = spawned;
|
||||
let managed = Self::new(session, output_rx, sandbox_type);
|
||||
let managed = Self::new(process_handle, output_rx, sandbox_type);
|
||||
|
||||
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));
|
||||
|
||||
|
|
@ -217,7 +217,7 @@ impl UnifiedExecSession {
|
|||
}
|
||||
}
|
||||
|
||||
impl Drop for UnifiedExecSession {
|
||||
impl Drop for UnifiedExecProcess {
|
||||
fn drop(&mut self) {
|
||||
self.terminate();
|
||||
}
|
||||
|
|
@ -30,14 +30,14 @@ use crate::truncate::TruncationPolicy;
|
|||
use crate::truncate::approx_token_count;
|
||||
use crate::truncate::formatted_truncate_text;
|
||||
use crate::unified_exec::ExecCommandRequest;
|
||||
use crate::unified_exec::MAX_UNIFIED_EXEC_SESSIONS;
|
||||
use crate::unified_exec::SessionEntry;
|
||||
use crate::unified_exec::SessionStore;
|
||||
use crate::unified_exec::MAX_UNIFIED_EXEC_PROCESSES;
|
||||
use crate::unified_exec::ProcessEntry;
|
||||
use crate::unified_exec::ProcessStore;
|
||||
use crate::unified_exec::UnifiedExecContext;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use crate::unified_exec::UnifiedExecResponse;
|
||||
use crate::unified_exec::UnifiedExecSessionManager;
|
||||
use crate::unified_exec::WARNING_UNIFIED_EXEC_SESSIONS;
|
||||
use crate::unified_exec::WARNING_UNIFIED_EXEC_PROCESSES;
|
||||
use crate::unified_exec::WriteStdinRequest;
|
||||
use crate::unified_exec::async_watcher::emit_exec_end_for_unified_exec;
|
||||
use crate::unified_exec::async_watcher::spawn_exit_watcher;
|
||||
|
|
@ -45,10 +45,10 @@ use crate::unified_exec::async_watcher::start_streaming_output;
|
|||
use crate::unified_exec::clamp_yield_time;
|
||||
use crate::unified_exec::generate_chunk_id;
|
||||
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
||||
use crate::unified_exec::process::OutputBuffer;
|
||||
use crate::unified_exec::process::OutputHandles;
|
||||
use crate::unified_exec::process::UnifiedExecProcess;
|
||||
use crate::unified_exec::resolve_max_tokens;
|
||||
use crate::unified_exec::session::OutputBuffer;
|
||||
use crate::unified_exec::session::OutputHandles;
|
||||
use crate::unified_exec::session::UnifiedExecSession;
|
||||
|
||||
const UNIFIED_EXEC_ENV: [(&str, &str); 9] = [
|
||||
("NO_COLOR", "1"),
|
||||
|
|
@ -69,7 +69,7 @@ fn apply_unified_exec_env(mut env: HashMap<String, String>) -> HashMap<String, S
|
|||
env
|
||||
}
|
||||
|
||||
struct PreparedSessionHandles {
|
||||
struct PreparedProcessHandles {
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
output_buffer: OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
|
|
@ -80,10 +80,10 @@ struct PreparedSessionHandles {
|
|||
process_id: String,
|
||||
}
|
||||
|
||||
impl UnifiedExecSessionManager {
|
||||
impl UnifiedExecProcessManager {
|
||||
pub(crate) async fn allocate_process_id(&self) -> String {
|
||||
loop {
|
||||
let mut store = self.session_store.lock().await;
|
||||
let mut store = self.process_store.lock().await;
|
||||
|
||||
let process_id = if !cfg!(test) && !cfg!(feature = "deterministic_process_ids") {
|
||||
// production mode → random
|
||||
|
|
@ -91,7 +91,7 @@ impl UnifiedExecSessionManager {
|
|||
} else {
|
||||
// test or deterministic mode
|
||||
let next = store
|
||||
.reserved_sessions_id
|
||||
.reserved_process_ids
|
||||
.iter()
|
||||
.filter_map(|s| s.parse::<i32>().ok())
|
||||
.max()
|
||||
|
|
@ -101,17 +101,17 @@ impl UnifiedExecSessionManager {
|
|||
next.to_string()
|
||||
};
|
||||
|
||||
if store.reserved_sessions_id.contains(&process_id) {
|
||||
if store.reserved_process_ids.contains(&process_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
store.reserved_sessions_id.insert(process_id.clone());
|
||||
store.reserved_process_ids.insert(process_id.clone());
|
||||
return process_id;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn release_process_id(&self, process_id: &str) {
|
||||
let mut store = self.session_store.lock().await;
|
||||
let mut store = self.process_store.lock().await;
|
||||
store.remove(process_id);
|
||||
}
|
||||
|
||||
|
|
@ -125,7 +125,7 @@ impl UnifiedExecSessionManager {
|
|||
.clone()
|
||||
.unwrap_or_else(|| context.turn.cwd.clone());
|
||||
|
||||
let session = self
|
||||
let process = self
|
||||
.open_session_with_sandbox(
|
||||
&request.command,
|
||||
cwd.clone(),
|
||||
|
|
@ -135,8 +135,8 @@ impl UnifiedExecSessionManager {
|
|||
)
|
||||
.await;
|
||||
|
||||
let session = match session {
|
||||
Ok(session) => Arc::new(session),
|
||||
let process = match process {
|
||||
Ok(process) => Arc::new(process),
|
||||
Err(err) => {
|
||||
self.release_process_id(&request.process_id).await;
|
||||
return Err(err);
|
||||
|
|
@ -158,7 +158,7 @@ impl UnifiedExecSessionManager {
|
|||
);
|
||||
emitter.emit(event_ctx, ToolEventStage::Begin).await;
|
||||
|
||||
start_streaming_output(&session, context, Arc::clone(&transcript));
|
||||
start_streaming_output(&process, context, Arc::clone(&transcript));
|
||||
|
||||
let max_tokens = resolve_max_tokens(request.max_output_tokens);
|
||||
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
|
||||
|
|
@ -171,7 +171,7 @@ impl UnifiedExecSessionManager {
|
|||
output_buffer,
|
||||
output_notify,
|
||||
cancellation_token,
|
||||
} = session.output_handles();
|
||||
} = process.output_handles();
|
||||
let deadline = start + Duration::from_millis(yield_time_ms);
|
||||
let collected = Self::collect_output_until_deadline(
|
||||
&output_buffer,
|
||||
|
|
@ -184,8 +184,8 @@ impl UnifiedExecSessionManager {
|
|||
|
||||
let text = String::from_utf8_lossy(&collected).to_string();
|
||||
let output = formatted_truncate_text(&text, TruncationPolicy::Tokens(max_tokens));
|
||||
let exit_code = session.exit_code();
|
||||
let has_exited = session.has_exited() || exit_code.is_some();
|
||||
let exit_code = process.exit_code();
|
||||
let has_exited = process.has_exited() || exit_code.is_some();
|
||||
let chunk_id = generate_chunk_id();
|
||||
let process_id = request.process_id.clone();
|
||||
if has_exited {
|
||||
|
|
@ -208,14 +208,14 @@ impl UnifiedExecSessionManager {
|
|||
.await;
|
||||
|
||||
self.release_process_id(&request.process_id).await;
|
||||
session.check_for_sandbox_denial_with_text(&text).await?;
|
||||
process.check_for_sandbox_denial_with_text(&text).await?;
|
||||
} else {
|
||||
// Long‑lived command: persist the session so write_stdin can reuse
|
||||
// Long‑lived command: persist the process so write_stdin can reuse
|
||||
// it, and register a background watcher that will emit
|
||||
// ExecCommandEnd when the PTY eventually exits (even if no further
|
||||
// tool calls are made).
|
||||
self.store_session(
|
||||
Arc::clone(&session),
|
||||
self.store_process(
|
||||
Arc::clone(&process),
|
||||
context,
|
||||
&request.command,
|
||||
cwd.clone(),
|
||||
|
|
@ -254,7 +254,7 @@ impl UnifiedExecSessionManager {
|
|||
) -> Result<UnifiedExecResponse, UnifiedExecError> {
|
||||
let process_id = request.process_id.to_string();
|
||||
|
||||
let PreparedSessionHandles {
|
||||
let PreparedProcessHandles {
|
||||
writer_tx,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
|
|
@ -264,7 +264,7 @@ impl UnifiedExecSessionManager {
|
|||
command: session_command,
|
||||
process_id,
|
||||
..
|
||||
} = self.prepare_session_handles(process_id.as_str()).await?;
|
||||
} = self.prepare_process_handles(process_id.as_str()).await?;
|
||||
|
||||
if !request.input.is_empty() {
|
||||
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
|
||||
|
|
@ -291,23 +291,23 @@ impl UnifiedExecSessionManager {
|
|||
let original_token_count = approx_token_count(&text);
|
||||
let chunk_id = generate_chunk_id();
|
||||
|
||||
// After polling, refresh_session_state tells us whether the PTY is
|
||||
// After polling, refresh_process_state tells us whether the PTY is
|
||||
// still alive or has exited and been removed from the store; we thread
|
||||
// that through so the handler can tag TerminalInteraction with an
|
||||
// appropriate process_id and exit_code.
|
||||
let status = self.refresh_session_state(process_id.as_str()).await;
|
||||
let status = self.refresh_process_state(process_id.as_str()).await;
|
||||
let (process_id, exit_code, event_call_id) = match status {
|
||||
SessionStatus::Alive {
|
||||
ProcessStatus::Alive {
|
||||
exit_code,
|
||||
call_id,
|
||||
process_id,
|
||||
} => (Some(process_id), exit_code, call_id),
|
||||
SessionStatus::Exited { exit_code, entry } => {
|
||||
ProcessStatus::Exited { exit_code, entry } => {
|
||||
let call_id = entry.call_id.clone();
|
||||
(None, exit_code, call_id)
|
||||
}
|
||||
SessionStatus::Unknown => {
|
||||
return Err(UnifiedExecError::UnknownSessionId {
|
||||
ProcessStatus::Unknown => {
|
||||
return Err(UnifiedExecError::UnknownProcessId {
|
||||
process_id: request.process_id.to_string(),
|
||||
});
|
||||
}
|
||||
|
|
@ -332,25 +332,25 @@ impl UnifiedExecSessionManager {
|
|||
Ok(response)
|
||||
}
|
||||
|
||||
async fn refresh_session_state(&self, process_id: &str) -> SessionStatus {
|
||||
let mut store = self.session_store.lock().await;
|
||||
let Some(entry) = store.sessions.get(process_id) else {
|
||||
return SessionStatus::Unknown;
|
||||
async fn refresh_process_state(&self, process_id: &str) -> ProcessStatus {
|
||||
let mut store = self.process_store.lock().await;
|
||||
let Some(entry) = store.processes.get(process_id) else {
|
||||
return ProcessStatus::Unknown;
|
||||
};
|
||||
|
||||
let exit_code = entry.session.exit_code();
|
||||
let exit_code = entry.process.exit_code();
|
||||
let process_id = entry.process_id.clone();
|
||||
|
||||
if entry.session.has_exited() {
|
||||
if entry.process.has_exited() {
|
||||
let Some(entry) = store.remove(&process_id) else {
|
||||
return SessionStatus::Unknown;
|
||||
return ProcessStatus::Unknown;
|
||||
};
|
||||
SessionStatus::Exited {
|
||||
ProcessStatus::Exited {
|
||||
exit_code,
|
||||
entry: Box::new(entry),
|
||||
}
|
||||
} else {
|
||||
SessionStatus::Alive {
|
||||
ProcessStatus::Alive {
|
||||
exit_code,
|
||||
call_id: entry.call_id.clone(),
|
||||
process_id,
|
||||
|
|
@ -358,16 +358,16 @@ impl UnifiedExecSessionManager {
|
|||
}
|
||||
}
|
||||
|
||||
async fn prepare_session_handles(
|
||||
async fn prepare_process_handles(
|
||||
&self,
|
||||
process_id: &str,
|
||||
) -> Result<PreparedSessionHandles, UnifiedExecError> {
|
||||
let mut store = self.session_store.lock().await;
|
||||
) -> Result<PreparedProcessHandles, UnifiedExecError> {
|
||||
let mut store = self.process_store.lock().await;
|
||||
let entry =
|
||||
store
|
||||
.sessions
|
||||
.processes
|
||||
.get_mut(process_id)
|
||||
.ok_or(UnifiedExecError::UnknownSessionId {
|
||||
.ok_or(UnifiedExecError::UnknownProcessId {
|
||||
process_id: process_id.to_string(),
|
||||
})?;
|
||||
entry.last_used = Instant::now();
|
||||
|
|
@ -375,10 +375,10 @@ impl UnifiedExecSessionManager {
|
|||
output_buffer,
|
||||
output_notify,
|
||||
cancellation_token,
|
||||
} = entry.session.output_handles();
|
||||
} = entry.process.output_handles();
|
||||
|
||||
Ok(PreparedSessionHandles {
|
||||
writer_tx: entry.session.writer_sender(),
|
||||
Ok(PreparedProcessHandles {
|
||||
writer_tx: entry.process.writer_sender(),
|
||||
output_buffer,
|
||||
output_notify,
|
||||
cancellation_token,
|
||||
|
|
@ -400,9 +400,9 @@ impl UnifiedExecSessionManager {
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn store_session(
|
||||
async fn store_process(
|
||||
&self,
|
||||
session: Arc<UnifiedExecSession>,
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
context: &UnifiedExecContext,
|
||||
command: &[String],
|
||||
cwd: PathBuf,
|
||||
|
|
@ -410,8 +410,8 @@ impl UnifiedExecSessionManager {
|
|||
process_id: String,
|
||||
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
|
||||
) {
|
||||
let entry = SessionEntry {
|
||||
session: Arc::clone(&session),
|
||||
let entry = ProcessEntry {
|
||||
process: Arc::clone(&process),
|
||||
session_ref: Arc::clone(&context.session),
|
||||
turn_ref: Arc::clone(&context.turn),
|
||||
call_id: context.call_id.clone(),
|
||||
|
|
@ -419,25 +419,25 @@ impl UnifiedExecSessionManager {
|
|||
command: command.to_vec(),
|
||||
last_used: started_at,
|
||||
};
|
||||
let number_sessions = {
|
||||
let mut store = self.session_store.lock().await;
|
||||
Self::prune_sessions_if_needed(&mut store);
|
||||
store.sessions.insert(process_id.clone(), entry);
|
||||
store.sessions.len()
|
||||
let number_processes = {
|
||||
let mut store = self.process_store.lock().await;
|
||||
Self::prune_processes_if_needed(&mut store);
|
||||
store.processes.insert(process_id.clone(), entry);
|
||||
store.processes.len()
|
||||
};
|
||||
|
||||
if number_sessions >= WARNING_UNIFIED_EXEC_SESSIONS {
|
||||
if number_processes >= WARNING_UNIFIED_EXEC_PROCESSES {
|
||||
context
|
||||
.session
|
||||
.record_model_warning(
|
||||
format!("The maximum number of unified exec sessions you can keep open is {WARNING_UNIFIED_EXEC_SESSIONS} and you currently have {number_sessions} sessions open. Reuse older sessions or close them to prevent automatic pruning of old session"),
|
||||
format!("The maximum number of unified exec processes you can keep open is {WARNING_UNIFIED_EXEC_PROCESSES} and you currently have {number_processes} processes open. Reuse older processes or close them to prevent automatic pruning of old processes"),
|
||||
&context.turn
|
||||
)
|
||||
.await;
|
||||
};
|
||||
|
||||
spawn_exit_watcher(
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&process),
|
||||
Arc::clone(&context.session),
|
||||
Arc::clone(&context.turn),
|
||||
context.call_id.clone(),
|
||||
|
|
@ -471,7 +471,7 @@ impl UnifiedExecSessionManager {
|
|||
pub(crate) async fn open_session_with_exec_env(
|
||||
&self,
|
||||
env: &ExecEnv,
|
||||
) -> Result<UnifiedExecSession, UnifiedExecError> {
|
||||
) -> Result<UnifiedExecProcess, UnifiedExecError> {
|
||||
let (program, args) = env
|
||||
.command
|
||||
.split_first()
|
||||
|
|
@ -485,8 +485,8 @@ impl UnifiedExecSessionManager {
|
|||
&env.arg0,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| UnifiedExecError::create_session(err.to_string()))?;
|
||||
UnifiedExecSession::from_spawned(spawned, env.sandbox).await
|
||||
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
UnifiedExecProcess::from_spawned(spawned, env.sandbox).await
|
||||
}
|
||||
|
||||
pub(super) async fn open_session_with_sandbox(
|
||||
|
|
@ -496,7 +496,7 @@ impl UnifiedExecSessionManager {
|
|||
sandbox_permissions: SandboxPermissions,
|
||||
justification: Option<String>,
|
||||
context: &UnifiedExecContext,
|
||||
) -> Result<UnifiedExecSession, UnifiedExecError> {
|
||||
) -> Result<UnifiedExecProcess, UnifiedExecError> {
|
||||
let env = apply_unified_exec_env(create_env(&context.turn.shell_environment_policy));
|
||||
let features = context.session.features();
|
||||
let mut orchestrator = ToolOrchestrator::new();
|
||||
|
|
@ -536,7 +536,7 @@ impl UnifiedExecSessionManager {
|
|||
context.turn.approval_policy,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| UnifiedExecError::create_session(format!("{e:?}")))
|
||||
.map_err(|e| UnifiedExecError::create_process(format!("{e:?}")))
|
||||
}
|
||||
|
||||
pub(super) async fn collect_output_until_deadline(
|
||||
|
|
@ -600,20 +600,20 @@ impl UnifiedExecSessionManager {
|
|||
collected
|
||||
}
|
||||
|
||||
fn prune_sessions_if_needed(store: &mut SessionStore) -> bool {
|
||||
if store.sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
|
||||
fn prune_processes_if_needed(store: &mut ProcessStore) -> bool {
|
||||
if store.processes.len() < MAX_UNIFIED_EXEC_PROCESSES {
|
||||
return false;
|
||||
}
|
||||
|
||||
let meta: Vec<(String, Instant, bool)> = store
|
||||
.sessions
|
||||
.processes
|
||||
.iter()
|
||||
.map(|(id, entry)| (id.clone(), entry.last_used, entry.session.has_exited()))
|
||||
.map(|(id, entry)| (id.clone(), entry.last_used, entry.process.has_exited()))
|
||||
.collect();
|
||||
|
||||
if let Some(session_id) = Self::session_id_to_prune_from_meta(&meta) {
|
||||
if let Some(entry) = store.remove(&session_id) {
|
||||
entry.session.terminate();
|
||||
if let Some(process_id) = Self::process_id_to_prune_from_meta(&meta) {
|
||||
if let Some(entry) = store.remove(&process_id) {
|
||||
entry.process.terminate();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
@ -622,7 +622,7 @@ impl UnifiedExecSessionManager {
|
|||
}
|
||||
|
||||
// Centralized pruning policy so we can easily swap strategies later.
|
||||
fn session_id_to_prune_from_meta(meta: &[(String, Instant, bool)]) -> Option<String> {
|
||||
fn process_id_to_prune_from_meta(meta: &[(String, Instant, bool)]) -> Option<String> {
|
||||
if meta.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
|
@ -650,22 +650,25 @@ impl UnifiedExecSessionManager {
|
|||
.map(|(process_id, _, _)| process_id)
|
||||
}
|
||||
|
||||
pub(crate) async fn terminate_all_sessions(&self) {
|
||||
let entries: Vec<SessionEntry> = {
|
||||
let mut sessions = self.session_store.lock().await;
|
||||
let entries: Vec<SessionEntry> =
|
||||
sessions.sessions.drain().map(|(_, entry)| entry).collect();
|
||||
sessions.reserved_sessions_id.clear();
|
||||
pub(crate) async fn terminate_all_processes(&self) {
|
||||
let entries: Vec<ProcessEntry> = {
|
||||
let mut processes = self.process_store.lock().await;
|
||||
let entries: Vec<ProcessEntry> = processes
|
||||
.processes
|
||||
.drain()
|
||||
.map(|(_, entry)| entry)
|
||||
.collect();
|
||||
processes.reserved_process_ids.clear();
|
||||
entries
|
||||
};
|
||||
|
||||
for entry in entries {
|
||||
entry.session.terminate();
|
||||
entry.process.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum SessionStatus {
|
||||
enum ProcessStatus {
|
||||
Alive {
|
||||
exit_code: Option<i32>,
|
||||
call_id: String,
|
||||
|
|
@ -673,7 +676,7 @@ enum SessionStatus {
|
|||
},
|
||||
Exited {
|
||||
exit_code: Option<i32>,
|
||||
entry: Box<SessionEntry>,
|
||||
entry: Box<ProcessEntry>,
|
||||
},
|
||||
Unknown,
|
||||
}
|
||||
|
|
@ -716,7 +719,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn pruning_prefers_exited_sessions_outside_recently_used() {
|
||||
fn pruning_prefers_exited_processes_outside_recently_used() {
|
||||
let now = Instant::now();
|
||||
let id = |n: i32| n.to_string();
|
||||
let meta = vec![
|
||||
|
|
@ -732,7 +735,7 @@ mod tests {
|
|||
(id(10), now - Duration::from_secs(13), false),
|
||||
];
|
||||
|
||||
let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta);
|
||||
let candidate = UnifiedExecProcessManager::process_id_to_prune_from_meta(&meta);
|
||||
|
||||
assert_eq!(candidate, Some(id(2)));
|
||||
}
|
||||
|
|
@ -754,13 +757,13 @@ mod tests {
|
|||
(id(10), now - Duration::from_secs(13), false),
|
||||
];
|
||||
|
||||
let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta);
|
||||
let candidate = UnifiedExecProcessManager::process_id_to_prune_from_meta(&meta);
|
||||
|
||||
assert_eq!(candidate, Some(id(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pruning_protects_recent_sessions_even_if_exited() {
|
||||
fn pruning_protects_recent_processes_even_if_exited() {
|
||||
let now = Instant::now();
|
||||
let id = |n: i32| n.to_string();
|
||||
let meta = vec![
|
||||
|
|
@ -776,7 +779,7 @@ mod tests {
|
|||
(id(10), now - Duration::from_secs(13), true),
|
||||
];
|
||||
|
||||
let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta);
|
||||
let candidate = UnifiedExecProcessManager::process_id_to_prune_from_meta(&meta);
|
||||
|
||||
// (10) is exited but among the last 8; we should drop the LRU outside that set.
|
||||
assert_eq!(candidate, Some(id(1)));
|
||||
|
|
@ -1689,7 +1689,7 @@ async fn unified_exec_closes_long_running_session_at_turn_end() -> Result<()> {
|
|||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "close unified exec sessions on turn end".into(),
|
||||
text: "close unified exec processes on turn end".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
|
|
@ -1710,7 +1710,7 @@ async fn unified_exec_closes_long_running_session_at_turn_end() -> Result<()> {
|
|||
let begin_process_id = begin_event
|
||||
.process_id
|
||||
.clone()
|
||||
.expect("expected process_id for long-running unified exec session");
|
||||
.expect("expected process_id for long-running unified exec process");
|
||||
|
||||
let pid = wait_for_pid_file(&pid_path).await?;
|
||||
assert!(
|
||||
|
|
@ -2560,7 +2560,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
|
|||
let prune_start = requests
|
||||
.iter()
|
||||
.find_map(|req| req.function_call_output_text(prune_call_id))
|
||||
.expect("missing initial prune session output");
|
||||
.expect("missing initial prune process output");
|
||||
let prune_start_output = parse_unified_exec_output(&prune_start)?;
|
||||
assert!(prune_start_output.process_id.is_some());
|
||||
assert!(prune_start_output.exit_code.is_none());
|
||||
|
|
@ -2573,7 +2573,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
|
|||
assert!(keep_write_output.process_id.is_some());
|
||||
assert!(
|
||||
keep_write_output.output.contains("still alive"),
|
||||
"expected cat session to echo input, got {:?}",
|
||||
"expected cat process to echo input, got {:?}",
|
||||
keep_write_output.output
|
||||
);
|
||||
|
||||
|
|
@ -2582,7 +2582,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
|
|||
.find_map(|req| req.function_call_output_text(probe_call_id))
|
||||
.expect("missing probe output");
|
||||
assert!(
|
||||
pruned_probe.contains("UnknownSessionId") || pruned_probe.contains("Unknown process id"),
|
||||
pruned_probe.contains("UnknownProcessId") || pruned_probe.contains("Unknown process id"),
|
||||
"expected probe to fail after pruning, got {pruned_probe:?}"
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -416,8 +416,8 @@ impl BottomPane {
|
|||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(crate) fn set_unified_exec_sessions(&mut self, sessions: Vec<String>) {
|
||||
if self.unified_exec_footer.set_sessions(sessions) {
|
||||
pub(crate) fn set_unified_exec_processes(&mut self, processes: Vec<String>) {
|
||||
if self.unified_exec_footer.set_processes(processes) {
|
||||
self.request_redraw();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,34 +8,34 @@ use crate::live_wrap::take_prefix_by_width;
|
|||
use crate::render::renderable::Renderable;
|
||||
|
||||
pub(crate) struct UnifiedExecFooter {
|
||||
sessions: Vec<String>,
|
||||
processes: Vec<String>,
|
||||
}
|
||||
|
||||
impl UnifiedExecFooter {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
sessions: Vec::new(),
|
||||
processes: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_sessions(&mut self, sessions: Vec<String>) -> bool {
|
||||
if self.sessions == sessions {
|
||||
pub(crate) fn set_processes(&mut self, processes: Vec<String>) -> bool {
|
||||
if self.processes == processes {
|
||||
return false;
|
||||
}
|
||||
self.sessions = sessions;
|
||||
self.processes = processes;
|
||||
true
|
||||
}
|
||||
|
||||
pub(crate) fn is_empty(&self) -> bool {
|
||||
self.sessions.is_empty()
|
||||
self.processes.is_empty()
|
||||
}
|
||||
|
||||
fn render_lines(&self, width: u16) -> Vec<Line<'static>> {
|
||||
if self.sessions.is_empty() || width < 4 {
|
||||
if self.processes.is_empty() || width < 4 {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let count = self.sessions.len();
|
||||
let count = self.processes.len();
|
||||
let plural = if count == 1 { "" } else { "s" };
|
||||
let message = format!(" {count} background terminal{plural} running · /ps to view");
|
||||
let (truncated, _, _) = take_prefix_by_width(&message, width as usize);
|
||||
|
|
@ -72,7 +72,7 @@ mod tests {
|
|||
#[test]
|
||||
fn render_more_sessions() {
|
||||
let mut footer = UnifiedExecFooter::new();
|
||||
footer.set_sessions(vec!["rg \"foo\" src".to_string()]);
|
||||
footer.set_processes(vec!["rg \"foo\" src".to_string()]);
|
||||
let width = 50;
|
||||
let height = footer.desired_height(width);
|
||||
let mut buf = Buffer::empty(Rect::new(0, 0, width, height));
|
||||
|
|
@ -83,7 +83,7 @@ mod tests {
|
|||
#[test]
|
||||
fn render_many_sessions() {
|
||||
let mut footer = UnifiedExecFooter::new();
|
||||
footer.set_sessions((0..123).map(|idx| format!("cmd {idx}")).collect());
|
||||
footer.set_processes((0..123).map(|idx| format!("cmd {idx}")).collect());
|
||||
let width = 50;
|
||||
let height = footer.desired_height(width);
|
||||
let mut buf = Buffer::empty(Rect::new(0, 0, width, height));
|
||||
|
|
|
|||
|
|
@ -154,7 +154,7 @@ struct RunningCommand {
|
|||
source: ExecCommandSource,
|
||||
}
|
||||
|
||||
struct UnifiedExecSessionSummary {
|
||||
struct UnifiedExecProcessSummary {
|
||||
key: String,
|
||||
command_display: String,
|
||||
}
|
||||
|
|
@ -332,7 +332,7 @@ pub(crate) struct ChatWidget {
|
|||
suppressed_exec_calls: HashSet<String>,
|
||||
last_unified_wait: Option<UnifiedExecWaitState>,
|
||||
task_complete_pending: bool,
|
||||
unified_exec_sessions: Vec<UnifiedExecSessionSummary>,
|
||||
unified_exec_processes: Vec<UnifiedExecProcessSummary>,
|
||||
mcp_startup_status: Option<HashMap<String, McpStartupStatus>>,
|
||||
// Queue of interruptive UI events deferred during an active write cycle
|
||||
interrupts: InterruptManager,
|
||||
|
|
@ -801,7 +801,7 @@ impl ChatWidget {
|
|||
fn on_interrupted_turn(&mut self, reason: TurnAbortReason) {
|
||||
// Finalize, log a gentle prompt, and clear running state.
|
||||
self.finalize_turn();
|
||||
self.unified_exec_sessions.clear();
|
||||
self.unified_exec_processes.clear();
|
||||
self.sync_unified_exec_footer();
|
||||
|
||||
if reason != TurnAbortReason::ReviewEnded {
|
||||
|
|
@ -868,7 +868,7 @@ impl ChatWidget {
|
|||
fn on_exec_command_begin(&mut self, ev: ExecCommandBeginEvent) {
|
||||
self.flush_answer_stream_with_separator();
|
||||
if is_unified_exec_source(ev.source) {
|
||||
self.track_unified_exec_session_begin(&ev);
|
||||
self.track_unified_exec_process_begin(&ev);
|
||||
if !is_standard_tool_call(&ev.parsed_cmd) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -887,10 +887,10 @@ impl ChatWidget {
|
|||
fn on_terminal_interaction(&mut self, ev: TerminalInteractionEvent) {
|
||||
self.flush_answer_stream_with_separator();
|
||||
let command_display = self
|
||||
.unified_exec_sessions
|
||||
.unified_exec_processes
|
||||
.iter()
|
||||
.find(|session| session.key == ev.process_id)
|
||||
.map(|session| session.command_display.clone());
|
||||
.find(|process| process.key == ev.process_id)
|
||||
.map(|process| process.command_display.clone());
|
||||
if ev.stdin.is_empty() {
|
||||
// Empty stdin means we are still waiting on background output; keep a live shimmer cell.
|
||||
if let Some(wait_cell) = self.active_cell.as_mut().and_then(|cell| {
|
||||
|
|
@ -898,7 +898,7 @@ impl ChatWidget {
|
|||
.downcast_mut::<history_cell::UnifiedExecWaitCell>()
|
||||
}) && wait_cell.matches(command_display.as_deref())
|
||||
{
|
||||
// Same session still waiting; update command display if it shows up late.
|
||||
// Same process still waiting; update command display if it shows up late.
|
||||
wait_cell.update_command_display(command_display);
|
||||
self.request_redraw();
|
||||
return;
|
||||
|
|
@ -967,7 +967,7 @@ impl ChatWidget {
|
|||
|
||||
fn on_exec_command_end(&mut self, ev: ExecCommandEndEvent) {
|
||||
if is_unified_exec_source(ev.source) {
|
||||
self.track_unified_exec_session_end(&ev);
|
||||
self.track_unified_exec_process_end(&ev);
|
||||
if !self.bottom_pane.is_task_running() {
|
||||
return;
|
||||
}
|
||||
|
|
@ -976,20 +976,20 @@ impl ChatWidget {
|
|||
self.defer_or_handle(|q| q.push_exec_end(ev), |s| s.handle_exec_end_now(ev2));
|
||||
}
|
||||
|
||||
fn track_unified_exec_session_begin(&mut self, ev: &ExecCommandBeginEvent) {
|
||||
fn track_unified_exec_process_begin(&mut self, ev: &ExecCommandBeginEvent) {
|
||||
if ev.source != ExecCommandSource::UnifiedExecStartup {
|
||||
return;
|
||||
}
|
||||
let key = ev.process_id.clone().unwrap_or(ev.call_id.to_string());
|
||||
let command_display = strip_bash_lc_and_escape(&ev.command);
|
||||
if let Some(existing) = self
|
||||
.unified_exec_sessions
|
||||
.unified_exec_processes
|
||||
.iter_mut()
|
||||
.find(|session| session.key == key)
|
||||
.find(|process| process.key == key)
|
||||
{
|
||||
existing.command_display = command_display;
|
||||
} else {
|
||||
self.unified_exec_sessions.push(UnifiedExecSessionSummary {
|
||||
self.unified_exec_processes.push(UnifiedExecProcessSummary {
|
||||
key,
|
||||
command_display,
|
||||
});
|
||||
|
|
@ -997,23 +997,23 @@ impl ChatWidget {
|
|||
self.sync_unified_exec_footer();
|
||||
}
|
||||
|
||||
fn track_unified_exec_session_end(&mut self, ev: &ExecCommandEndEvent) {
|
||||
fn track_unified_exec_process_end(&mut self, ev: &ExecCommandEndEvent) {
|
||||
let key = ev.process_id.clone().unwrap_or(ev.call_id.to_string());
|
||||
let before = self.unified_exec_sessions.len();
|
||||
self.unified_exec_sessions
|
||||
.retain(|session| session.key != key);
|
||||
if self.unified_exec_sessions.len() != before {
|
||||
let before = self.unified_exec_processes.len();
|
||||
self.unified_exec_processes
|
||||
.retain(|process| process.key != key);
|
||||
if self.unified_exec_processes.len() != before {
|
||||
self.sync_unified_exec_footer();
|
||||
}
|
||||
}
|
||||
|
||||
fn sync_unified_exec_footer(&mut self) {
|
||||
let sessions = self
|
||||
.unified_exec_sessions
|
||||
let processes = self
|
||||
.unified_exec_processes
|
||||
.iter()
|
||||
.map(|session| session.command_display.clone())
|
||||
.map(|process| process.command_display.clone())
|
||||
.collect();
|
||||
self.bottom_pane.set_unified_exec_sessions(sessions);
|
||||
self.bottom_pane.set_unified_exec_processes(processes);
|
||||
}
|
||||
|
||||
fn on_mcp_tool_call_begin(&mut self, ev: McpToolCallBeginEvent) {
|
||||
|
|
@ -1459,7 +1459,7 @@ impl ChatWidget {
|
|||
suppressed_exec_calls: HashSet::new(),
|
||||
last_unified_wait: None,
|
||||
task_complete_pending: false,
|
||||
unified_exec_sessions: Vec::new(),
|
||||
unified_exec_processes: Vec::new(),
|
||||
mcp_startup_status: None,
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
|
|
@ -1545,7 +1545,7 @@ impl ChatWidget {
|
|||
suppressed_exec_calls: HashSet::new(),
|
||||
last_unified_wait: None,
|
||||
task_complete_pending: false,
|
||||
unified_exec_sessions: Vec::new(),
|
||||
unified_exec_processes: Vec::new(),
|
||||
mcp_startup_status: None,
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
|
|
@ -2295,12 +2295,12 @@ impl ChatWidget {
|
|||
}
|
||||
|
||||
pub(crate) fn add_ps_output(&mut self) {
|
||||
let sessions = self
|
||||
.unified_exec_sessions
|
||||
let processes = self
|
||||
.unified_exec_processes
|
||||
.iter()
|
||||
.map(|session| session.command_display.clone())
|
||||
.map(|process| process.command_display.clone())
|
||||
.collect();
|
||||
self.add_to_history(history_cell::new_unified_exec_sessions_output(sessions));
|
||||
self.add_to_history(history_cell::new_unified_exec_processes_output(processes));
|
||||
}
|
||||
|
||||
fn stop_rate_limit_poller(&mut self) {
|
||||
|
|
|
|||
|
|
@ -388,7 +388,7 @@ async fn make_chatwidget_manual(
|
|||
suppressed_exec_calls: HashSet::new(),
|
||||
last_unified_wait: None,
|
||||
task_complete_pending: false,
|
||||
unified_exec_sessions: Vec::new(),
|
||||
unified_exec_processes: Vec::new(),
|
||||
mcp_startup_status: None,
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
|
|
@ -2659,12 +2659,12 @@ async fn interrupt_prepends_queued_messages_before_existing_composer_text() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn interrupt_clears_unified_exec_sessions() {
|
||||
async fn interrupt_clears_unified_exec_processes() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
|
||||
|
||||
begin_unified_exec_startup(&mut chat, "call-1", "process-1", "sleep 5");
|
||||
begin_unified_exec_startup(&mut chat, "call-2", "process-2", "sleep 6");
|
||||
assert_eq!(chat.unified_exec_sessions.len(), 2);
|
||||
assert_eq!(chat.unified_exec_processes.len(), 2);
|
||||
|
||||
chat.handle_codex_event(Event {
|
||||
id: "turn-1".into(),
|
||||
|
|
@ -2673,7 +2673,7 @@ async fn interrupt_clears_unified_exec_sessions() {
|
|||
}),
|
||||
});
|
||||
|
||||
assert!(chat.unified_exec_sessions.is_empty());
|
||||
assert!(chat.unified_exec_processes.is_empty());
|
||||
|
||||
let _ = drain_insert_history(&mut rx);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -517,29 +517,29 @@ pub(crate) fn new_unified_exec_wait_live(
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct UnifiedExecSessionsCell {
|
||||
sessions: Vec<String>,
|
||||
struct UnifiedExecProcessesCell {
|
||||
processes: Vec<String>,
|
||||
}
|
||||
|
||||
impl UnifiedExecSessionsCell {
|
||||
fn new(sessions: Vec<String>) -> Self {
|
||||
Self { sessions }
|
||||
impl UnifiedExecProcessesCell {
|
||||
fn new(processes: Vec<String>) -> Self {
|
||||
Self { processes }
|
||||
}
|
||||
}
|
||||
|
||||
impl HistoryCell for UnifiedExecSessionsCell {
|
||||
impl HistoryCell for UnifiedExecProcessesCell {
|
||||
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
|
||||
if width == 0 {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let wrap_width = width as usize;
|
||||
let max_sessions = 16usize;
|
||||
let max_processes = 16usize;
|
||||
let mut out: Vec<Line<'static>> = Vec::new();
|
||||
out.push(vec!["Background terminals".bold()].into());
|
||||
out.push("".into());
|
||||
|
||||
if self.sessions.is_empty() {
|
||||
if self.processes.is_empty() {
|
||||
out.push(" • No background terminals running.".italic().into());
|
||||
return out;
|
||||
}
|
||||
|
|
@ -549,8 +549,8 @@ impl HistoryCell for UnifiedExecSessionsCell {
|
|||
let truncation_suffix = " [...]";
|
||||
let truncation_suffix_width = UnicodeWidthStr::width(truncation_suffix);
|
||||
let mut shown = 0usize;
|
||||
for command in &self.sessions {
|
||||
if shown >= max_sessions {
|
||||
for command in &self.processes {
|
||||
if shown >= max_processes {
|
||||
break;
|
||||
}
|
||||
let (snippet, snippet_truncated) = {
|
||||
|
|
@ -590,7 +590,7 @@ impl HistoryCell for UnifiedExecSessionsCell {
|
|||
shown += 1;
|
||||
}
|
||||
|
||||
let remaining = self.sessions.len().saturating_sub(shown);
|
||||
let remaining = self.processes.len().saturating_sub(shown);
|
||||
if remaining > 0 {
|
||||
let more_text = format!("... and {remaining} more running");
|
||||
if wrap_width <= prefix_width {
|
||||
|
|
@ -610,9 +610,9 @@ impl HistoryCell for UnifiedExecSessionsCell {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_unified_exec_sessions_output(sessions: Vec<String>) -> CompositeHistoryCell {
|
||||
pub(crate) fn new_unified_exec_processes_output(processes: Vec<String>) -> CompositeHistoryCell {
|
||||
let command = PlainHistoryCell::new(vec!["/ps".magenta().into()]);
|
||||
let summary = UnifiedExecSessionsCell::new(sessions);
|
||||
let summary = UnifiedExecProcessesCell::new(processes);
|
||||
CompositeHistoryCell::new(vec![Box::new(command), Box::new(summary)])
|
||||
}
|
||||
|
||||
|
|
@ -1819,14 +1819,14 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn ps_output_empty_snapshot() {
|
||||
let cell = new_unified_exec_sessions_output(Vec::new());
|
||||
let cell = new_unified_exec_processes_output(Vec::new());
|
||||
let rendered = render_lines(&cell.display_lines(60)).join("\n");
|
||||
insta::assert_snapshot!(rendered);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ps_output_multiline_snapshot() {
|
||||
let cell = new_unified_exec_sessions_output(vec![
|
||||
let cell = new_unified_exec_processes_output(vec![
|
||||
"echo hello\nand then some extra text".to_string(),
|
||||
"rg \"foo\" src".to_string(),
|
||||
]);
|
||||
|
|
@ -1836,7 +1836,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn ps_output_long_command_snapshot() {
|
||||
let cell = new_unified_exec_sessions_output(vec![String::from(
|
||||
let cell = new_unified_exec_processes_output(vec![String::from(
|
||||
"rg \"foo\" src --glob '**/*.rs' --max-count 1000 --no-ignore --hidden --follow --glob '!target/**'",
|
||||
)]);
|
||||
let rendered = render_lines(&cell.display_lines(36)).join("\n");
|
||||
|
|
@ -1845,8 +1845,9 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn ps_output_many_sessions_snapshot() {
|
||||
let cell =
|
||||
new_unified_exec_sessions_output((0..20).map(|idx| format!("command {idx}")).collect());
|
||||
let cell = new_unified_exec_processes_output(
|
||||
(0..20).map(|idx| format!("command {idx}")).collect(),
|
||||
);
|
||||
let rendered = render_lines(&cell.display_lines(32)).join("\n");
|
||||
insta::assert_snapshot!(rendered);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue