From 31bf1dbe63d06a45de78a0701cf3593d343a4d9b Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 10 Mar 2026 18:38:39 -0700 Subject: [PATCH] Make unified exec session_id numeric (#14279) It's a number on the write_stdin input, make it a number on the output and also internally. --- codex-rs/core/src/tools/context.rs | 6 +- .../core/src/tools/handlers/unified_exec.rs | 8 +- codex-rs/core/src/tools/spec.rs | 2 +- .../core/src/unified_exec/async_watcher.rs | 4 +- codex-rs/core/src/unified_exec/errors.rs | 2 +- codex-rs/core/src/unified_exec/mod.rs | 50 ++---- .../core/src/unified_exec/process_manager.rs | 154 +++++++++--------- 7 files changed, 99 insertions(+), 127 deletions(-) diff --git a/codex-rs/core/src/tools/context.rs b/codex-rs/core/src/tools/context.rs index 36a328b37..041de50f5 100644 --- a/codex-rs/core/src/tools/context.rs +++ b/codex-rs/core/src/tools/context.rs @@ -159,7 +159,7 @@ pub struct ExecCommandToolOutput { /// Raw bytes returned for this unified exec call before any truncation. pub raw_output: Vec, pub max_output_tokens: Option, - pub process_id: Option, + pub process_id: Option, pub exit_code: Option, pub original_token_count: Option, pub session_command: Option>, @@ -194,7 +194,7 @@ impl ToolOutput for ExecCommandToolOutput { #[serde(skip_serializing_if = "Option::is_none")] exit_code: Option, #[serde(skip_serializing_if = "Option::is_none")] - session_id: Option, + session_id: Option, #[serde(skip_serializing_if = "Option::is_none")] original_token_count: Option, output: String, @@ -204,7 +204,7 @@ impl ToolOutput for ExecCommandToolOutput { chunk_id: (!self.chunk_id.is_empty()).then(|| self.chunk_id.clone()), wall_time_seconds: self.wall_time.as_secs_f64(), exit_code: self.exit_code, - session_id: self.process_id.clone(), + session_id: self.process_id, original_token_count: self.original_token_count, output: self.truncated_output(), }; diff --git a/codex-rs/core/src/tools/handlers/unified_exec.rs b/codex-rs/core/src/tools/handlers/unified_exec.rs index 4eb7125e9..edc6763ef 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec.rs @@ -190,7 +190,7 @@ impl ToolHandler for UnifiedExecHandler { ) { let approval_policy = context.turn.approval_policy.value(); - manager.release_process_id(&process_id).await; + manager.release_process_id(process_id).await; return Err(FunctionCallError::RespondToModel(format!( "approval policy is {approval_policy:?}; reject command — you cannot ask for escalated permissions if the approval policy is {approval_policy:?}" ))); @@ -211,7 +211,7 @@ impl ToolHandler for UnifiedExecHandler { ) { Ok(normalized) => normalized, Err(err) => { - manager.release_process_id(&process_id).await; + manager.release_process_id(process_id).await; return Err(FunctionCallError::RespondToModel(err)); } }; @@ -228,7 +228,7 @@ impl ToolHandler for UnifiedExecHandler { ) .await? { - manager.release_process_id(&process_id).await; + manager.release_process_id(process_id).await; return Ok(ExecCommandToolOutput { event_call_id: String::new(), chunk_id: String::new(), @@ -271,7 +271,7 @@ impl ToolHandler for UnifiedExecHandler { let args: WriteStdinArgs = parse_arguments(&arguments)?; let response = manager .write_stdin(WriteStdinRequest { - process_id: &args.session_id.to_string(), + process_id: args.session_id, input: &args.chars, yield_time_ms: args.yield_time_ms, max_output_tokens: args.max_output_tokens, diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index 321a5377f..8f7a25076 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -59,7 +59,7 @@ fn unified_exec_output_schema() -> JsonValue { "description": "Process exit code when the command finished during this call." }, "session_id": { - "type": "string", + "type": "number", "description": "Session identifier to pass to write_stdin when the process is still running." }, "original_token_count": { diff --git a/codex-rs/core/src/unified_exec/async_watcher.rs b/codex-rs/core/src/unified_exec/async_watcher.rs index 1fbb1e8f6..47543a00f 100644 --- a/codex-rs/core/src/unified_exec/async_watcher.rs +++ b/codex-rs/core/src/unified_exec/async_watcher.rs @@ -110,7 +110,7 @@ pub(crate) fn spawn_exit_watcher( call_id: String, command: Vec, cwd: PathBuf, - process_id: String, + process_id: i32, transcript: Arc>, started_at: Instant, ) { @@ -129,7 +129,7 @@ pub(crate) fn spawn_exit_watcher( call_id, command, cwd, - Some(process_id), + Some(process_id.to_string()), transcript, String::new(), exit_code, diff --git a/codex-rs/core/src/unified_exec/errors.rs b/codex-rs/core/src/unified_exec/errors.rs index 284c7bca6..966775eee 100644 --- a/codex-rs/core/src/unified_exec/errors.rs +++ b/codex-rs/core/src/unified_exec/errors.rs @@ -7,7 +7,7 @@ pub(crate) enum UnifiedExecError { CreateProcess { message: String }, // The model is trained on `session_id`, but internally we track a `process_id`. #[error("Unknown process id {process_id}")] - UnknownProcessId { process_id: String }, + UnknownProcessId { process_id: i32 }, #[error("failed to write to stdin")] WriteToStdin, #[error( diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index c090346a2..91af47acc 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -86,7 +86,7 @@ impl UnifiedExecContext { #[derive(Debug)] pub(crate) struct ExecCommandRequest { pub command: Vec, - pub process_id: String, + pub process_id: i32, pub yield_time_ms: u64, pub max_output_tokens: Option, pub workdir: Option, @@ -101,7 +101,7 @@ pub(crate) struct ExecCommandRequest { #[derive(Debug)] pub(crate) struct WriteStdinRequest<'a> { - pub process_id: &'a str, + pub process_id: i32, pub input: &'a str, pub yield_time_ms: u64, pub max_output_tokens: Option, @@ -109,14 +109,14 @@ pub(crate) struct WriteStdinRequest<'a> { #[derive(Default)] pub(crate) struct ProcessStore { - processes: HashMap, - reserved_process_ids: HashSet, + processes: HashMap, + reserved_process_ids: HashSet, } impl ProcessStore { - fn remove(&mut self, process_id: &str) -> Option { - self.reserved_process_ids.remove(process_id); - self.processes.remove(process_id) + fn remove(&mut self, process_id: i32) -> Option { + self.reserved_process_ids.remove(&process_id); + self.processes.remove(&process_id) } } @@ -144,7 +144,7 @@ impl Default for UnifiedExecProcessManager { struct ProcessEntry { process: Arc, call_id: String, - process_id: String, + process_id: i32, command: Vec, tty: bool, network_approval_id: Option, @@ -238,7 +238,7 @@ mod tests { async fn write_stdin( session: &Arc, - process_id: &str, + process_id: i32, input: &str, yield_time_ms: u64, ) -> Result { @@ -294,11 +294,7 @@ mod tests { let (session, turn) = test_session_and_turn().await; let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?; - let process_id = open_shell - .process_id - .as_ref() - .expect("expected process_id") - .as_str(); + let process_id = open_shell.process_id.expect("expected process_id"); write_stdin( &session, @@ -330,15 +326,11 @@ mod tests { let (session, turn) = test_session_and_turn().await; let shell_a = exec_command(&session, &turn, "bash -i", 2_500).await?; - let session_a = shell_a - .process_id - .as_ref() - .expect("expected process id") - .clone(); + let session_a = shell_a.process_id.expect("expected process id"); write_stdin( &session, - session_a.as_str(), + session_a, "export CODEX_INTERACTIVE_SHELL_VAR=codex\n", 2_500, ) @@ -358,11 +350,7 @@ mod tests { let out_3 = write_stdin( &session, - shell_a - .process_id - .as_ref() - .expect("expected process id") - .as_str(), + shell_a.process_id.expect("expected process id"), "echo $CODEX_INTERACTIVE_SHELL_VAR\n", 2_500, ) @@ -384,11 +372,7 @@ mod tests { let (session, turn) = test_session_and_turn().await; let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?; - let process_id = open_shell - .process_id - .as_ref() - .expect("expected process id") - .as_str(); + let process_id = open_shell.process_id.expect("expected process id"); write_stdin( &session, @@ -501,11 +485,7 @@ mod tests { let (session, turn) = test_session_and_turn().await; let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?; - let process_id = open_shell - .process_id - .as_ref() - .expect("expected process id") - .as_str(); + let process_id = open_shell.process_id.expect("expected process id"); write_stdin(&session, process_id, "exit\n", 2_500).await?; diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index f50da1f71..29311b1ff 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -98,41 +98,39 @@ struct PreparedProcessHandles { cancellation_token: CancellationToken, pause_state: Option>, command: Vec, - process_id: String, + process_id: i32, tty: bool, } impl UnifiedExecProcessManager { - pub(crate) async fn allocate_process_id(&self) -> String { + pub(crate) async fn allocate_process_id(&self) -> i32 { loop { let mut store = self.process_store.lock().await; let process_id = if should_use_deterministic_process_ids() { // test or deterministic mode - let next = store + store .reserved_process_ids .iter() - .filter_map(|s| s.parse::().ok()) + .copied() .max() .map(|m| std::cmp::max(m, 999) + 1) - .unwrap_or(1000); - - next.to_string() + .unwrap_or(1000) } else { // production mode → random - rand::rng().random_range(1_000..100_000).to_string() + rand::rng().random_range(1_000..100_000) }; if store.reserved_process_ids.contains(&process_id) { continue; } - store.reserved_process_ids.insert(process_id.clone()); + store.reserved_process_ids.insert(process_id); return process_id; } } - pub(crate) async fn release_process_id(&self, process_id: &str) { + pub(crate) async fn release_process_id(&self, process_id: i32) { let removed = { let mut store = self.process_store.lock().await; store.remove(process_id) @@ -172,7 +170,7 @@ impl UnifiedExecProcessManager { (Arc::new(process), deferred_network_approval) } Err(err) => { - self.release_process_id(&request.process_id).await; + self.release_process_id(request.process_id).await; return Err(err); } }; @@ -188,7 +186,7 @@ impl UnifiedExecProcessManager { &request.command, cwd.clone(), ExecCommandSource::UnifiedExecStartup, - Some(request.process_id.clone()), + Some(request.process_id.to_string()), ); emitter.emit(event_ctx, ToolEventStage::Begin).await; @@ -227,7 +225,7 @@ impl UnifiedExecProcessManager { let exit_code = process.exit_code(); let has_exited = process.has_exited() || exit_code.is_some(); let chunk_id = generate_chunk_id(); - let process_id = request.process_id.clone(); + let process_id = request.process_id; if has_exited { // Short‑lived command: emit ExecCommandEnd immediately using the @@ -240,7 +238,7 @@ impl UnifiedExecProcessManager { context.call_id.clone(), request.command.clone(), cwd.clone(), - Some(process_id), + Some(process_id.to_string()), Arc::clone(&transcript), text.clone(), exit, @@ -248,7 +246,7 @@ impl UnifiedExecProcessManager { ) .await; - self.release_process_id(&request.process_id).await; + self.release_process_id(request.process_id).await; finish_deferred_network_approval( context.session.as_ref(), deferred_network_approval.take(), @@ -287,7 +285,7 @@ impl UnifiedExecProcessManager { process_id: if has_exited { None } else { - Some(request.process_id.clone()) + Some(request.process_id) }, exit_code, original_token_count: Some(original_token_count), @@ -301,7 +299,7 @@ impl UnifiedExecProcessManager { &self, request: WriteStdinRequest<'_>, ) -> Result { - let process_id = request.process_id.to_string(); + let process_id = request.process_id; let PreparedProcessHandles { writer_tx, @@ -315,7 +313,7 @@ impl UnifiedExecProcessManager { process_id, tty, .. - } = self.prepare_process_handles(process_id.as_str()).await?; + } = self.prepare_process_handles(process_id).await?; if !request.input.is_empty() { if !tty { @@ -359,7 +357,7 @@ impl UnifiedExecProcessManager { // still alive or has exited and been removed from the store; we thread // that through so the handler can tag TerminalInteraction with an // appropriate process_id and exit_code. - let status = self.refresh_process_state(process_id.as_str()).await; + let status = self.refresh_process_state(process_id).await; let (process_id, exit_code, event_call_id) = match status { ProcessStatus::Alive { exit_code, @@ -372,7 +370,7 @@ impl UnifiedExecProcessManager { } ProcessStatus::Unknown => { return Err(UnifiedExecError::UnknownProcessId { - process_id: request.process_id.to_string(), + process_id: request.process_id, }); } }; @@ -392,18 +390,18 @@ impl UnifiedExecProcessManager { Ok(response) } - async fn refresh_process_state(&self, process_id: &str) -> ProcessStatus { + async fn refresh_process_state(&self, process_id: i32) -> ProcessStatus { let status = { let mut store = self.process_store.lock().await; - let Some(entry) = store.processes.get(process_id) else { + let Some(entry) = store.processes.get(&process_id) else { return ProcessStatus::Unknown; }; let exit_code = entry.process.exit_code(); - let process_id = entry.process_id.clone(); + let process_id = entry.process_id; if entry.process.has_exited() { - let Some(entry) = store.remove(&process_id) else { + let Some(entry) = store.remove(process_id) else { return ProcessStatus::Unknown; }; ProcessStatus::Exited { @@ -426,16 +424,13 @@ impl UnifiedExecProcessManager { async fn prepare_process_handles( &self, - process_id: &str, + process_id: i32, ) -> Result { let mut store = self.process_store.lock().await; - let entry = - store - .processes - .get_mut(process_id) - .ok_or(UnifiedExecError::UnknownProcessId { - process_id: process_id.to_string(), - })?; + let entry = store + .processes + .get_mut(&process_id) + .ok_or(UnifiedExecError::UnknownProcessId { process_id })?; entry.last_used = Instant::now(); let OutputHandles { output_buffer, @@ -458,7 +453,7 @@ impl UnifiedExecProcessManager { cancellation_token, pause_state, command: entry.command.clone(), - process_id: entry.process_id.clone(), + process_id: entry.process_id, tty: entry.tty, }) } @@ -481,7 +476,7 @@ impl UnifiedExecProcessManager { command: &[String], cwd: PathBuf, started_at: Instant, - process_id: String, + process_id: i32, tty: bool, network_approval_id: Option, transcript: Arc>, @@ -489,7 +484,7 @@ impl UnifiedExecProcessManager { let entry = ProcessEntry { process: Arc::clone(&process), call_id: context.call_id.clone(), - process_id: process_id.clone(), + process_id, command: command.to_vec(), tty, network_approval_id, @@ -499,7 +494,7 @@ impl UnifiedExecProcessManager { let (number_processes, pruned_entry) = { let mut store = self.process_store.lock().await; let pruned_entry = Self::prune_processes_if_needed(&mut store); - store.processes.insert(process_id.clone(), entry); + store.processes.insert(process_id, entry); (store.processes.len(), pruned_entry) }; // prune_processes_if_needed runs while holding process_store; do async @@ -526,7 +521,7 @@ impl UnifiedExecProcessManager { context.call_id.clone(), command.to_vec(), cwd, - process_id.clone(), + process_id, transcript, started_at, ); @@ -759,31 +754,31 @@ impl UnifiedExecProcessManager { return None; } - let meta: Vec<(String, Instant, bool)> = store + let meta: Vec<(i32, Instant, bool)> = store .processes .iter() - .map(|(id, entry)| (id.clone(), entry.last_used, entry.process.has_exited())) + .map(|(id, entry)| (*id, entry.last_used, entry.process.has_exited())) .collect(); if let Some(process_id) = Self::process_id_to_prune_from_meta(&meta) { - return store.remove(&process_id); + return store.remove(process_id); } None } // Centralized pruning policy so we can easily swap strategies later. - fn process_id_to_prune_from_meta(meta: &[(String, Instant, bool)]) -> Option { + fn process_id_to_prune_from_meta(meta: &[(i32, Instant, bool)]) -> Option { if meta.is_empty() { return None; } let mut by_recency = meta.to_vec(); by_recency.sort_by_key(|(_, last_used, _)| Reverse(*last_used)); - let protected: HashSet = by_recency + let protected: HashSet = by_recency .iter() .take(8) - .map(|(process_id, _, _)| process_id.clone()) + .map(|(process_id, _, _)| *process_id) .collect(); let mut lru = meta.to_vec(); @@ -793,7 +788,7 @@ impl UnifiedExecProcessManager { .iter() .find(|(process_id, _, exited)| !protected.contains(process_id) && *exited) { - return Some(process_id.clone()); + return Some(*process_id); } lru.into_iter() @@ -824,7 +819,7 @@ enum ProcessStatus { Alive { exit_code: Option, call_id: String, - process_id: String, + process_id: i32, }, Exited { exit_code: Option, @@ -874,67 +869,64 @@ mod tests { #[test] fn pruning_prefers_exited_processes_outside_recently_used() { let now = Instant::now(); - let id = |n: i32| n.to_string(); let meta = vec![ - (id(1), now - Duration::from_secs(40), false), - (id(2), now - Duration::from_secs(30), true), - (id(3), now - Duration::from_secs(20), false), - (id(4), now - Duration::from_secs(19), false), - (id(5), now - Duration::from_secs(18), false), - (id(6), now - Duration::from_secs(17), false), - (id(7), now - Duration::from_secs(16), false), - (id(8), now - Duration::from_secs(15), false), - (id(9), now - Duration::from_secs(14), false), - (id(10), now - Duration::from_secs(13), false), + (1, now - Duration::from_secs(40), false), + (2, now - Duration::from_secs(30), true), + (3, now - Duration::from_secs(20), false), + (4, now - Duration::from_secs(19), false), + (5, now - Duration::from_secs(18), false), + (6, now - Duration::from_secs(17), false), + (7, now - Duration::from_secs(16), false), + (8, now - Duration::from_secs(15), false), + (9, now - Duration::from_secs(14), false), + (10, now - Duration::from_secs(13), false), ]; let candidate = UnifiedExecProcessManager::process_id_to_prune_from_meta(&meta); - assert_eq!(candidate, Some(id(2))); + assert_eq!(candidate, Some(2)); } #[test] fn pruning_falls_back_to_lru_when_no_exited() { let now = Instant::now(); - let id = |n: i32| n.to_string(); let meta = vec![ - (id(1), now - Duration::from_secs(40), false), - (id(2), now - Duration::from_secs(30), false), - (id(3), now - Duration::from_secs(20), false), - (id(4), now - Duration::from_secs(19), false), - (id(5), now - Duration::from_secs(18), false), - (id(6), now - Duration::from_secs(17), false), - (id(7), now - Duration::from_secs(16), false), - (id(8), now - Duration::from_secs(15), false), - (id(9), now - Duration::from_secs(14), false), - (id(10), now - Duration::from_secs(13), false), + (1, now - Duration::from_secs(40), false), + (2, now - Duration::from_secs(30), false), + (3, now - Duration::from_secs(20), false), + (4, now - Duration::from_secs(19), false), + (5, now - Duration::from_secs(18), false), + (6, now - Duration::from_secs(17), false), + (7, now - Duration::from_secs(16), false), + (8, now - Duration::from_secs(15), false), + (9, now - Duration::from_secs(14), false), + (10, now - Duration::from_secs(13), false), ]; let candidate = UnifiedExecProcessManager::process_id_to_prune_from_meta(&meta); - assert_eq!(candidate, Some(id(1))); + assert_eq!(candidate, Some(1)); } #[test] fn pruning_protects_recent_processes_even_if_exited() { let now = Instant::now(); - let id = |n: i32| n.to_string(); let meta = vec![ - (id(1), now - Duration::from_secs(40), false), - (id(2), now - Duration::from_secs(30), false), - (id(3), now - Duration::from_secs(20), true), - (id(4), now - Duration::from_secs(19), false), - (id(5), now - Duration::from_secs(18), false), - (id(6), now - Duration::from_secs(17), false), - (id(7), now - Duration::from_secs(16), false), - (id(8), now - Duration::from_secs(15), false), - (id(9), now - Duration::from_secs(14), false), - (id(10), now - Duration::from_secs(13), true), + (1, now - Duration::from_secs(40), false), + (2, now - Duration::from_secs(30), false), + (3, now - Duration::from_secs(20), true), + (4, now - Duration::from_secs(19), false), + (5, now - Duration::from_secs(18), false), + (6, now - Duration::from_secs(17), false), + (7, now - Duration::from_secs(16), false), + (8, now - Duration::from_secs(15), false), + (9, now - Duration::from_secs(14), false), + (10, now - Duration::from_secs(13), true), ]; let candidate = UnifiedExecProcessManager::process_id_to_prune_from_meta(&meta); // (10) is exited but among the last 8; we should drop the LRU outside that set. - assert_eq!(candidate, Some(id(1))); + assert_eq!(candidate, Some(1)); } }