Make unified exec session_id numeric (#14279)

It's a number on the write_stdin input, make it a number on the output
and also internally.
This commit is contained in:
pakrym-oai 2026-03-10 18:38:39 -07:00 committed by Michael Bolin
parent 01792a4c61
commit 31bf1dbe63
7 changed files with 99 additions and 127 deletions

View file

@ -159,7 +159,7 @@ pub struct ExecCommandToolOutput {
/// Raw bytes returned for this unified exec call before any truncation.
pub raw_output: Vec<u8>,
pub max_output_tokens: Option<usize>,
pub process_id: Option<String>,
pub process_id: Option<i32>,
pub exit_code: Option<i32>,
pub original_token_count: Option<usize>,
pub session_command: Option<Vec<String>>,
@ -194,7 +194,7 @@ impl ToolOutput for ExecCommandToolOutput {
#[serde(skip_serializing_if = "Option::is_none")]
exit_code: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<String>,
session_id: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
original_token_count: Option<usize>,
output: String,
@ -204,7 +204,7 @@ impl ToolOutput for ExecCommandToolOutput {
chunk_id: (!self.chunk_id.is_empty()).then(|| self.chunk_id.clone()),
wall_time_seconds: self.wall_time.as_secs_f64(),
exit_code: self.exit_code,
session_id: self.process_id.clone(),
session_id: self.process_id,
original_token_count: self.original_token_count,
output: self.truncated_output(),
};

View file

@ -190,7 +190,7 @@ impl ToolHandler for UnifiedExecHandler {
)
{
let approval_policy = context.turn.approval_policy.value();
manager.release_process_id(&process_id).await;
manager.release_process_id(process_id).await;
return Err(FunctionCallError::RespondToModel(format!(
"approval policy is {approval_policy:?}; reject command — you cannot ask for escalated permissions if the approval policy is {approval_policy:?}"
)));
@ -211,7 +211,7 @@ impl ToolHandler for UnifiedExecHandler {
) {
Ok(normalized) => normalized,
Err(err) => {
manager.release_process_id(&process_id).await;
manager.release_process_id(process_id).await;
return Err(FunctionCallError::RespondToModel(err));
}
};
@ -228,7 +228,7 @@ impl ToolHandler for UnifiedExecHandler {
)
.await?
{
manager.release_process_id(&process_id).await;
manager.release_process_id(process_id).await;
return Ok(ExecCommandToolOutput {
event_call_id: String::new(),
chunk_id: String::new(),
@ -271,7 +271,7 @@ impl ToolHandler for UnifiedExecHandler {
let args: WriteStdinArgs = parse_arguments(&arguments)?;
let response = manager
.write_stdin(WriteStdinRequest {
process_id: &args.session_id.to_string(),
process_id: args.session_id,
input: &args.chars,
yield_time_ms: args.yield_time_ms,
max_output_tokens: args.max_output_tokens,

View file

@ -59,7 +59,7 @@ fn unified_exec_output_schema() -> JsonValue {
"description": "Process exit code when the command finished during this call."
},
"session_id": {
"type": "string",
"type": "number",
"description": "Session identifier to pass to write_stdin when the process is still running."
},
"original_token_count": {

View file

@ -110,7 +110,7 @@ pub(crate) fn spawn_exit_watcher(
call_id: String,
command: Vec<String>,
cwd: PathBuf,
process_id: String,
process_id: i32,
transcript: Arc<Mutex<HeadTailBuffer>>,
started_at: Instant,
) {
@ -129,7 +129,7 @@ pub(crate) fn spawn_exit_watcher(
call_id,
command,
cwd,
Some(process_id),
Some(process_id.to_string()),
transcript,
String::new(),
exit_code,

View file

@ -7,7 +7,7 @@ pub(crate) enum UnifiedExecError {
CreateProcess { message: String },
// The model is trained on `session_id`, but internally we track a `process_id`.
#[error("Unknown process id {process_id}")]
UnknownProcessId { process_id: String },
UnknownProcessId { process_id: i32 },
#[error("failed to write to stdin")]
WriteToStdin,
#[error(

View file

@ -86,7 +86,7 @@ impl UnifiedExecContext {
#[derive(Debug)]
pub(crate) struct ExecCommandRequest {
pub command: Vec<String>,
pub process_id: String,
pub process_id: i32,
pub yield_time_ms: u64,
pub max_output_tokens: Option<usize>,
pub workdir: Option<PathBuf>,
@ -101,7 +101,7 @@ pub(crate) struct ExecCommandRequest {
#[derive(Debug)]
pub(crate) struct WriteStdinRequest<'a> {
pub process_id: &'a str,
pub process_id: i32,
pub input: &'a str,
pub yield_time_ms: u64,
pub max_output_tokens: Option<usize>,
@ -109,14 +109,14 @@ pub(crate) struct WriteStdinRequest<'a> {
#[derive(Default)]
pub(crate) struct ProcessStore {
processes: HashMap<String, ProcessEntry>,
reserved_process_ids: HashSet<String>,
processes: HashMap<i32, ProcessEntry>,
reserved_process_ids: HashSet<i32>,
}
impl ProcessStore {
fn remove(&mut self, process_id: &str) -> Option<ProcessEntry> {
self.reserved_process_ids.remove(process_id);
self.processes.remove(process_id)
fn remove(&mut self, process_id: i32) -> Option<ProcessEntry> {
self.reserved_process_ids.remove(&process_id);
self.processes.remove(&process_id)
}
}
@ -144,7 +144,7 @@ impl Default for UnifiedExecProcessManager {
struct ProcessEntry {
process: Arc<UnifiedExecProcess>,
call_id: String,
process_id: String,
process_id: i32,
command: Vec<String>,
tty: bool,
network_approval_id: Option<String>,
@ -238,7 +238,7 @@ mod tests {
async fn write_stdin(
session: &Arc<Session>,
process_id: &str,
process_id: i32,
input: &str,
yield_time_ms: u64,
) -> Result<ExecCommandToolOutput, UnifiedExecError> {
@ -294,11 +294,7 @@ mod tests {
let (session, turn) = test_session_and_turn().await;
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
let process_id = open_shell
.process_id
.as_ref()
.expect("expected process_id")
.as_str();
let process_id = open_shell.process_id.expect("expected process_id");
write_stdin(
&session,
@ -330,15 +326,11 @@ mod tests {
let (session, turn) = test_session_and_turn().await;
let shell_a = exec_command(&session, &turn, "bash -i", 2_500).await?;
let session_a = shell_a
.process_id
.as_ref()
.expect("expected process id")
.clone();
let session_a = shell_a.process_id.expect("expected process id");
write_stdin(
&session,
session_a.as_str(),
session_a,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
2_500,
)
@ -358,11 +350,7 @@ mod tests {
let out_3 = write_stdin(
&session,
shell_a
.process_id
.as_ref()
.expect("expected process id")
.as_str(),
shell_a.process_id.expect("expected process id"),
"echo $CODEX_INTERACTIVE_SHELL_VAR\n",
2_500,
)
@ -384,11 +372,7 @@ mod tests {
let (session, turn) = test_session_and_turn().await;
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
let process_id = open_shell
.process_id
.as_ref()
.expect("expected process id")
.as_str();
let process_id = open_shell.process_id.expect("expected process id");
write_stdin(
&session,
@ -501,11 +485,7 @@ mod tests {
let (session, turn) = test_session_and_turn().await;
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
let process_id = open_shell
.process_id
.as_ref()
.expect("expected process id")
.as_str();
let process_id = open_shell.process_id.expect("expected process id");
write_stdin(&session, process_id, "exit\n", 2_500).await?;

View file

@ -98,41 +98,39 @@ struct PreparedProcessHandles {
cancellation_token: CancellationToken,
pause_state: Option<watch::Receiver<bool>>,
command: Vec<String>,
process_id: String,
process_id: i32,
tty: bool,
}
impl UnifiedExecProcessManager {
pub(crate) async fn allocate_process_id(&self) -> String {
pub(crate) async fn allocate_process_id(&self) -> i32 {
loop {
let mut store = self.process_store.lock().await;
let process_id = if should_use_deterministic_process_ids() {
// test or deterministic mode
let next = store
store
.reserved_process_ids
.iter()
.filter_map(|s| s.parse::<i32>().ok())
.copied()
.max()
.map(|m| std::cmp::max(m, 999) + 1)
.unwrap_or(1000);
next.to_string()
.unwrap_or(1000)
} else {
// production mode → random
rand::rng().random_range(1_000..100_000).to_string()
rand::rng().random_range(1_000..100_000)
};
if store.reserved_process_ids.contains(&process_id) {
continue;
}
store.reserved_process_ids.insert(process_id.clone());
store.reserved_process_ids.insert(process_id);
return process_id;
}
}
pub(crate) async fn release_process_id(&self, process_id: &str) {
pub(crate) async fn release_process_id(&self, process_id: i32) {
let removed = {
let mut store = self.process_store.lock().await;
store.remove(process_id)
@ -172,7 +170,7 @@ impl UnifiedExecProcessManager {
(Arc::new(process), deferred_network_approval)
}
Err(err) => {
self.release_process_id(&request.process_id).await;
self.release_process_id(request.process_id).await;
return Err(err);
}
};
@ -188,7 +186,7 @@ impl UnifiedExecProcessManager {
&request.command,
cwd.clone(),
ExecCommandSource::UnifiedExecStartup,
Some(request.process_id.clone()),
Some(request.process_id.to_string()),
);
emitter.emit(event_ctx, ToolEventStage::Begin).await;
@ -227,7 +225,7 @@ impl UnifiedExecProcessManager {
let exit_code = process.exit_code();
let has_exited = process.has_exited() || exit_code.is_some();
let chunk_id = generate_chunk_id();
let process_id = request.process_id.clone();
let process_id = request.process_id;
if has_exited {
// Shortlived command: emit ExecCommandEnd immediately using the
@ -240,7 +238,7 @@ impl UnifiedExecProcessManager {
context.call_id.clone(),
request.command.clone(),
cwd.clone(),
Some(process_id),
Some(process_id.to_string()),
Arc::clone(&transcript),
text.clone(),
exit,
@ -248,7 +246,7 @@ impl UnifiedExecProcessManager {
)
.await;
self.release_process_id(&request.process_id).await;
self.release_process_id(request.process_id).await;
finish_deferred_network_approval(
context.session.as_ref(),
deferred_network_approval.take(),
@ -287,7 +285,7 @@ impl UnifiedExecProcessManager {
process_id: if has_exited {
None
} else {
Some(request.process_id.clone())
Some(request.process_id)
},
exit_code,
original_token_count: Some(original_token_count),
@ -301,7 +299,7 @@ impl UnifiedExecProcessManager {
&self,
request: WriteStdinRequest<'_>,
) -> Result<ExecCommandToolOutput, UnifiedExecError> {
let process_id = request.process_id.to_string();
let process_id = request.process_id;
let PreparedProcessHandles {
writer_tx,
@ -315,7 +313,7 @@ impl UnifiedExecProcessManager {
process_id,
tty,
..
} = self.prepare_process_handles(process_id.as_str()).await?;
} = self.prepare_process_handles(process_id).await?;
if !request.input.is_empty() {
if !tty {
@ -359,7 +357,7 @@ impl UnifiedExecProcessManager {
// still alive or has exited and been removed from the store; we thread
// that through so the handler can tag TerminalInteraction with an
// appropriate process_id and exit_code.
let status = self.refresh_process_state(process_id.as_str()).await;
let status = self.refresh_process_state(process_id).await;
let (process_id, exit_code, event_call_id) = match status {
ProcessStatus::Alive {
exit_code,
@ -372,7 +370,7 @@ impl UnifiedExecProcessManager {
}
ProcessStatus::Unknown => {
return Err(UnifiedExecError::UnknownProcessId {
process_id: request.process_id.to_string(),
process_id: request.process_id,
});
}
};
@ -392,18 +390,18 @@ impl UnifiedExecProcessManager {
Ok(response)
}
async fn refresh_process_state(&self, process_id: &str) -> ProcessStatus {
async fn refresh_process_state(&self, process_id: i32) -> ProcessStatus {
let status = {
let mut store = self.process_store.lock().await;
let Some(entry) = store.processes.get(process_id) else {
let Some(entry) = store.processes.get(&process_id) else {
return ProcessStatus::Unknown;
};
let exit_code = entry.process.exit_code();
let process_id = entry.process_id.clone();
let process_id = entry.process_id;
if entry.process.has_exited() {
let Some(entry) = store.remove(&process_id) else {
let Some(entry) = store.remove(process_id) else {
return ProcessStatus::Unknown;
};
ProcessStatus::Exited {
@ -426,16 +424,13 @@ impl UnifiedExecProcessManager {
async fn prepare_process_handles(
&self,
process_id: &str,
process_id: i32,
) -> Result<PreparedProcessHandles, UnifiedExecError> {
let mut store = self.process_store.lock().await;
let entry =
store
.processes
.get_mut(process_id)
.ok_or(UnifiedExecError::UnknownProcessId {
process_id: process_id.to_string(),
})?;
let entry = store
.processes
.get_mut(&process_id)
.ok_or(UnifiedExecError::UnknownProcessId { process_id })?;
entry.last_used = Instant::now();
let OutputHandles {
output_buffer,
@ -458,7 +453,7 @@ impl UnifiedExecProcessManager {
cancellation_token,
pause_state,
command: entry.command.clone(),
process_id: entry.process_id.clone(),
process_id: entry.process_id,
tty: entry.tty,
})
}
@ -481,7 +476,7 @@ impl UnifiedExecProcessManager {
command: &[String],
cwd: PathBuf,
started_at: Instant,
process_id: String,
process_id: i32,
tty: bool,
network_approval_id: Option<String>,
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
@ -489,7 +484,7 @@ impl UnifiedExecProcessManager {
let entry = ProcessEntry {
process: Arc::clone(&process),
call_id: context.call_id.clone(),
process_id: process_id.clone(),
process_id,
command: command.to_vec(),
tty,
network_approval_id,
@ -499,7 +494,7 @@ impl UnifiedExecProcessManager {
let (number_processes, pruned_entry) = {
let mut store = self.process_store.lock().await;
let pruned_entry = Self::prune_processes_if_needed(&mut store);
store.processes.insert(process_id.clone(), entry);
store.processes.insert(process_id, entry);
(store.processes.len(), pruned_entry)
};
// prune_processes_if_needed runs while holding process_store; do async
@ -526,7 +521,7 @@ impl UnifiedExecProcessManager {
context.call_id.clone(),
command.to_vec(),
cwd,
process_id.clone(),
process_id,
transcript,
started_at,
);
@ -759,31 +754,31 @@ impl UnifiedExecProcessManager {
return None;
}
let meta: Vec<(String, Instant, bool)> = store
let meta: Vec<(i32, Instant, bool)> = store
.processes
.iter()
.map(|(id, entry)| (id.clone(), entry.last_used, entry.process.has_exited()))
.map(|(id, entry)| (*id, entry.last_used, entry.process.has_exited()))
.collect();
if let Some(process_id) = Self::process_id_to_prune_from_meta(&meta) {
return store.remove(&process_id);
return store.remove(process_id);
}
None
}
// Centralized pruning policy so we can easily swap strategies later.
fn process_id_to_prune_from_meta(meta: &[(String, Instant, bool)]) -> Option<String> {
fn process_id_to_prune_from_meta(meta: &[(i32, Instant, bool)]) -> Option<i32> {
if meta.is_empty() {
return None;
}
let mut by_recency = meta.to_vec();
by_recency.sort_by_key(|(_, last_used, _)| Reverse(*last_used));
let protected: HashSet<String> = by_recency
let protected: HashSet<i32> = by_recency
.iter()
.take(8)
.map(|(process_id, _, _)| process_id.clone())
.map(|(process_id, _, _)| *process_id)
.collect();
let mut lru = meta.to_vec();
@ -793,7 +788,7 @@ impl UnifiedExecProcessManager {
.iter()
.find(|(process_id, _, exited)| !protected.contains(process_id) && *exited)
{
return Some(process_id.clone());
return Some(*process_id);
}
lru.into_iter()
@ -824,7 +819,7 @@ enum ProcessStatus {
Alive {
exit_code: Option<i32>,
call_id: String,
process_id: String,
process_id: i32,
},
Exited {
exit_code: Option<i32>,
@ -874,67 +869,64 @@ mod tests {
#[test]
fn pruning_prefers_exited_processes_outside_recently_used() {
let now = Instant::now();
let id = |n: i32| n.to_string();
let meta = vec![
(id(1), now - Duration::from_secs(40), false),
(id(2), now - Duration::from_secs(30), true),
(id(3), now - Duration::from_secs(20), false),
(id(4), now - Duration::from_secs(19), false),
(id(5), now - Duration::from_secs(18), false),
(id(6), now - Duration::from_secs(17), false),
(id(7), now - Duration::from_secs(16), false),
(id(8), now - Duration::from_secs(15), false),
(id(9), now - Duration::from_secs(14), false),
(id(10), now - Duration::from_secs(13), false),
(1, now - Duration::from_secs(40), false),
(2, now - Duration::from_secs(30), true),
(3, now - Duration::from_secs(20), false),
(4, now - Duration::from_secs(19), false),
(5, now - Duration::from_secs(18), false),
(6, now - Duration::from_secs(17), false),
(7, now - Duration::from_secs(16), false),
(8, now - Duration::from_secs(15), false),
(9, now - Duration::from_secs(14), false),
(10, now - Duration::from_secs(13), false),
];
let candidate = UnifiedExecProcessManager::process_id_to_prune_from_meta(&meta);
assert_eq!(candidate, Some(id(2)));
assert_eq!(candidate, Some(2));
}
#[test]
fn pruning_falls_back_to_lru_when_no_exited() {
let now = Instant::now();
let id = |n: i32| n.to_string();
let meta = vec![
(id(1), now - Duration::from_secs(40), false),
(id(2), now - Duration::from_secs(30), false),
(id(3), now - Duration::from_secs(20), false),
(id(4), now - Duration::from_secs(19), false),
(id(5), now - Duration::from_secs(18), false),
(id(6), now - Duration::from_secs(17), false),
(id(7), now - Duration::from_secs(16), false),
(id(8), now - Duration::from_secs(15), false),
(id(9), now - Duration::from_secs(14), false),
(id(10), now - Duration::from_secs(13), false),
(1, now - Duration::from_secs(40), false),
(2, now - Duration::from_secs(30), false),
(3, now - Duration::from_secs(20), false),
(4, now - Duration::from_secs(19), false),
(5, now - Duration::from_secs(18), false),
(6, now - Duration::from_secs(17), false),
(7, now - Duration::from_secs(16), false),
(8, now - Duration::from_secs(15), false),
(9, now - Duration::from_secs(14), false),
(10, now - Duration::from_secs(13), false),
];
let candidate = UnifiedExecProcessManager::process_id_to_prune_from_meta(&meta);
assert_eq!(candidate, Some(id(1)));
assert_eq!(candidate, Some(1));
}
#[test]
fn pruning_protects_recent_processes_even_if_exited() {
let now = Instant::now();
let id = |n: i32| n.to_string();
let meta = vec![
(id(1), now - Duration::from_secs(40), false),
(id(2), now - Duration::from_secs(30), false),
(id(3), now - Duration::from_secs(20), true),
(id(4), now - Duration::from_secs(19), false),
(id(5), now - Duration::from_secs(18), false),
(id(6), now - Duration::from_secs(17), false),
(id(7), now - Duration::from_secs(16), false),
(id(8), now - Duration::from_secs(15), false),
(id(9), now - Duration::from_secs(14), false),
(id(10), now - Duration::from_secs(13), true),
(1, now - Duration::from_secs(40), false),
(2, now - Duration::from_secs(30), false),
(3, now - Duration::from_secs(20), true),
(4, now - Duration::from_secs(19), false),
(5, now - Duration::from_secs(18), false),
(6, now - Duration::from_secs(17), false),
(7, now - Duration::from_secs(16), false),
(8, now - Duration::from_secs(15), false),
(9, now - Duration::from_secs(14), false),
(10, now - Duration::from_secs(13), true),
];
let candidate = UnifiedExecProcessManager::process_id_to_prune_from_meta(&meta);
// (10) is exited but among the last 8; we should drop the LRU outside that set.
assert_eq!(candidate, Some(id(1)));
assert_eq!(candidate, Some(1));
}
}