feat: add head-tail buffer for unified_exec (#8735)

This commit is contained in:
jif-oai 2026-01-06 15:48:44 +00:00 committed by GitHub
parent 06e21c7a65
commit 32db8ea5ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 338 additions and 114 deletions

View file

@ -7,6 +7,8 @@ use tokio::time::Duration;
use tokio::time::Instant;
use tokio::time::Sleep;
use super::UnifiedExecContext;
use super::session::UnifiedExecSession;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::exec::ExecToolCallOutput;
@ -19,10 +21,7 @@ use crate::protocol::ExecOutputStream;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventStage;
use super::CommandTranscript;
use super::UnifiedExecContext;
use super::session::UnifiedExecSession;
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
pub(crate) const TRAILING_OUTPUT_GRACE: Duration = Duration::from_millis(100);
@ -40,7 +39,7 @@ const UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES: usize = 8192;
pub(crate) fn start_streaming_output(
session: &UnifiedExecSession,
context: &UnifiedExecContext,
transcript: Arc<Mutex<CommandTranscript>>,
transcript: Arc<Mutex<HeadTailBuffer>>,
) {
let mut receiver = session.output_receiver();
let output_drained = session.output_drained_notify();
@ -112,7 +111,7 @@ pub(crate) fn spawn_exit_watcher(
command: Vec<String>,
cwd: PathBuf,
process_id: String,
transcript: Arc<Mutex<CommandTranscript>>,
transcript: Arc<Mutex<HeadTailBuffer>>,
started_at: Instant,
) {
let exit_token = session.cancellation_token();
@ -142,7 +141,7 @@ pub(crate) fn spawn_exit_watcher(
async fn process_chunk(
pending: &mut Vec<u8>,
transcript: &Arc<Mutex<CommandTranscript>>,
transcript: &Arc<Mutex<HeadTailBuffer>>,
call_id: &str,
session_ref: &Arc<Session>,
turn_ref: &Arc<TurnContext>,
@ -153,7 +152,7 @@ async fn process_chunk(
while let Some(prefix) = split_valid_utf8_prefix(pending) {
{
let mut guard = transcript.lock().await;
guard.append(&prefix);
guard.push_chunk(prefix.to_vec());
}
if *emitted_deltas >= MAX_EXEC_OUTPUT_DELTAS_PER_CALL {
@ -183,7 +182,7 @@ pub(crate) async fn emit_exec_end_for_unified_exec(
command: Vec<String>,
cwd: PathBuf,
process_id: Option<String>,
transcript: Arc<Mutex<CommandTranscript>>,
transcript: Arc<Mutex<HeadTailBuffer>>,
fallback_output: String,
exit_code: i32,
duration: Duration,
@ -240,15 +239,15 @@ fn split_valid_utf8_prefix_with_max(buffer: &mut Vec<u8>, max_bytes: usize) -> O
}
async fn resolve_aggregated_output(
transcript: &Arc<Mutex<CommandTranscript>>,
transcript: &Arc<Mutex<HeadTailBuffer>>,
fallback: String,
) -> String {
let guard = transcript.lock().await;
if guard.data.is_empty() {
if guard.retained_bytes() == 0 {
return fallback;
}
String::from_utf8_lossy(&guard.data).to_string()
String::from_utf8_lossy(&guard.to_bytes()).to_string()
}
#[cfg(test)]

View file

@ -0,0 +1,272 @@
use crate::unified_exec::UNIFIED_EXEC_OUTPUT_MAX_BYTES;
use std::collections::VecDeque;
/// A capped buffer that preserves a stable prefix ("head") and suffix ("tail"),
/// dropping the middle once it exceeds the configured maximum. The buffer is
/// symmetric, meaning 50% of the capacity is allocated to the head and 50% is
/// allocated to the tail.
///
/// Content is stored as a sequence of byte chunks rather than one contiguous
/// allocation; the `*_bytes` counters track the summed chunk lengths.
#[derive(Debug)]
pub(crate) struct HeadTailBuffer {
// Total number of bytes the buffer may retain (head + tail).
max_bytes: usize,
// Portion of `max_bytes` reserved for the prefix.
head_budget: usize,
// Portion of `max_bytes` reserved for the suffix.
tail_budget: usize,
// Chunks forming the retained prefix, oldest first.
head: VecDeque<Vec<u8>>,
// Chunks forming the retained suffix, oldest first.
tail: VecDeque<Vec<u8>>,
// Sum of the lengths of all chunks in `head`.
head_bytes: usize,
// Sum of the lengths of all chunks in `tail`.
tail_bytes: usize,
// Number of bytes dropped because they did not fit in either budget.
omitted_bytes: usize,
}
impl Default for HeadTailBuffer {
fn default() -> Self {
Self::new(UNIFIED_EXEC_OUTPUT_MAX_BYTES)
}
}
impl HeadTailBuffer {
    /// Create a new buffer that retains at most `max_bytes` of output.
    ///
    /// The retained output is split across a prefix ("head") and suffix ("tail")
    /// budget, dropping bytes from the middle once the limit is exceeded. The
    /// head receives `max_bytes / 2`; the tail receives the remainder, so an odd
    /// `max_bytes` gives the extra byte to the tail.
    pub(crate) fn new(max_bytes: usize) -> Self {
        let head_budget = max_bytes / 2;
        let tail_budget = max_bytes.saturating_sub(head_budget);
        Self {
            max_bytes,
            head_budget,
            tail_budget,
            head: VecDeque::new(),
            tail: VecDeque::new(),
            head_bytes: 0,
            tail_bytes: 0,
            omitted_bytes: 0,
        }
    }

    /// Total bytes currently retained by the buffer (head + tail).
    // Kept for tests and diagnostics; silence the lint in builds without a caller.
    #[allow(dead_code)]
    pub(crate) fn retained_bytes(&self) -> usize {
        self.head_bytes.saturating_add(self.tail_bytes)
    }

    /// Total bytes that were dropped from the middle due to the size cap.
    // Kept for tests and diagnostics; silence the lint in builds without a caller.
    #[allow(dead_code)]
    pub(crate) fn omitted_bytes(&self) -> usize {
        self.omitted_bytes
    }

    /// Append a chunk of bytes to the buffer.
    ///
    /// Bytes are first added to the head until the head budget is full; any
    /// remaining bytes are added to the tail, with older tail bytes being
    /// dropped to preserve the tail budget. Empty chunks are ignored so that
    /// zero-length entries never accumulate in the buffer or show up in
    /// snapshots and drains.
    pub(crate) fn push_chunk(&mut self, chunk: Vec<u8>) {
        // A zero-length chunk carries no data and costs no budget, so storing it
        // would only grow the deques without ever being eligible for trimming.
        if chunk.is_empty() {
            return;
        }

        if self.max_bytes == 0 {
            self.omitted_bytes = self.omitted_bytes.saturating_add(chunk.len());
            return;
        }

        // Fill the head budget first, then keep a capped tail.
        let remaining_head = self.head_budget.saturating_sub(self.head_bytes);
        if remaining_head == 0 {
            self.push_to_tail(chunk);
            return;
        }

        if chunk.len() <= remaining_head {
            self.head_bytes = self.head_bytes.saturating_add(chunk.len());
            self.head.push_back(chunk);
            return;
        }

        // Split the chunk: part goes to head, remainder goes to tail. Both parts
        // are non-empty here because `0 < remaining_head < chunk.len()`.
        let (head_part, tail_part) = chunk.split_at(remaining_head);
        self.head_bytes = self.head_bytes.saturating_add(head_part.len());
        self.head.push_back(head_part.to_vec());
        self.push_to_tail(tail_part.to_vec());
    }

    /// Snapshot the retained output as a list of chunks.
    ///
    /// The returned chunks are ordered as: head chunks first, then tail chunks.
    /// Omitted bytes are not represented in the snapshot.
    pub(crate) fn snapshot_chunks(&self) -> Vec<Vec<u8>> {
        self.head.iter().chain(self.tail.iter()).cloned().collect()
    }

    /// Return the retained output as a single byte vector.
    ///
    /// The output is formed by concatenating head chunks, then tail chunks.
    /// Omitted bytes are not represented in the returned value.
    pub(crate) fn to_bytes(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(self.retained_bytes());
        for chunk in self.head.iter().chain(self.tail.iter()) {
            out.extend_from_slice(chunk);
        }
        out
    }

    /// Drain all retained chunks from the buffer and reset its state.
    ///
    /// The drained chunks are returned in head-then-tail order. The omitted-byte
    /// counter is reset to zero as well, so the head budget is available again
    /// for subsequent pushes.
    pub(crate) fn drain_chunks(&mut self) -> Vec<Vec<u8>> {
        let mut out: Vec<Vec<u8>> = Vec::with_capacity(self.head.len() + self.tail.len());
        out.extend(self.head.drain(..));
        out.extend(self.tail.drain(..));
        self.head_bytes = 0;
        self.tail_bytes = 0;
        self.omitted_bytes = 0;
        out
    }

    /// Append `chunk` to the tail, evicting the oldest tail bytes as needed so
    /// the tail never exceeds `tail_budget`. Every evicted or never-stored byte
    /// is added to `omitted_bytes`.
    fn push_to_tail(&mut self, chunk: Vec<u8>) {
        if self.tail_budget == 0 {
            self.omitted_bytes = self.omitted_bytes.saturating_add(chunk.len());
            return;
        }

        if chunk.len() >= self.tail_budget {
            // This single chunk fills (or exceeds) the whole tail budget. Keep only
            // its last `tail_budget` bytes and drop everything else, including the
            // previous tail contents.
            let start = chunk.len().saturating_sub(self.tail_budget);
            let kept = chunk[start..].to_vec();
            let dropped = chunk.len().saturating_sub(kept.len());
            self.omitted_bytes = self
                .omitted_bytes
                .saturating_add(self.tail_bytes)
                .saturating_add(dropped);
            self.tail.clear();
            self.tail_bytes = kept.len();
            self.tail.push_back(kept);
            return;
        }

        self.tail_bytes = self.tail_bytes.saturating_add(chunk.len());
        self.tail.push_back(chunk);
        self.trim_tail_to_budget();
    }

    /// Drop bytes from the front of the tail until it fits within `tail_budget`.
    ///
    /// Whole chunks are popped while they fit inside the excess; the first chunk
    /// that is larger than the remaining excess is shortened in place.
    fn trim_tail_to_budget(&mut self) {
        let mut excess = self.tail_bytes.saturating_sub(self.tail_budget);
        while excess > 0 {
            match self.tail.front_mut() {
                Some(front) if excess >= front.len() => {
                    excess -= front.len();
                    self.tail_bytes = self.tail_bytes.saturating_sub(front.len());
                    self.omitted_bytes = self.omitted_bytes.saturating_add(front.len());
                    self.tail.pop_front();
                }
                Some(front) => {
                    front.drain(..excess);
                    self.tail_bytes = self.tail_bytes.saturating_sub(excess);
                    self.omitted_bytes = self.omitted_bytes.saturating_add(excess);
                    break;
                }
                None => break,
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::HeadTailBuffer;
    use pretty_assertions::assert_eq;

    /// Lossily decode everything the buffer currently retains.
    fn rendered(buffer: &HeadTailBuffer) -> String {
        String::from_utf8_lossy(&buffer.to_bytes()).into_owned()
    }

    #[test]
    fn keeps_prefix_and_suffix_when_over_budget() {
        let mut buffer = HeadTailBuffer::new(10);

        buffer.push_chunk(b"0123456789".to_vec());
        assert_eq!(buffer.omitted_bytes(), 0);

        // Two bytes past the cap: the head stays put and the middle is dropped.
        buffer.push_chunk(b"ab".to_vec());
        assert!(buffer.omitted_bytes() > 0);

        let text = rendered(&buffer);
        assert!(text.starts_with("01234"));
        assert!(text.ends_with("89ab"));
    }

    #[test]
    fn max_bytes_zero_drops_everything() {
        let mut buffer = HeadTailBuffer::new(0);
        buffer.push_chunk(b"abc".to_vec());

        assert_eq!(buffer.retained_bytes(), 0);
        assert_eq!(buffer.omitted_bytes(), 3);
        assert_eq!(buffer.to_bytes(), b"".to_vec());
        assert_eq!(buffer.snapshot_chunks(), Vec::<Vec<u8>>::new());
    }

    #[test]
    fn head_budget_zero_keeps_only_last_byte_in_tail() {
        let mut buffer = HeadTailBuffer::new(1);
        buffer.push_chunk(b"abc".to_vec());

        assert_eq!(buffer.retained_bytes(), 1);
        assert_eq!(buffer.omitted_bytes(), 2);
        assert_eq!(buffer.to_bytes(), b"c".to_vec());
    }

    #[test]
    fn draining_resets_state() {
        let mut buffer = HeadTailBuffer::new(10);
        buffer.push_chunk(b"0123456789".to_vec());
        buffer.push_chunk(b"ab".to_vec());

        let chunks = buffer.drain_chunks();
        assert!(!chunks.is_empty());

        assert_eq!(buffer.retained_bytes(), 0);
        assert_eq!(buffer.omitted_bytes(), 0);
        assert_eq!(buffer.to_bytes(), b"".to_vec());
    }

    #[test]
    fn chunk_larger_than_tail_budget_keeps_only_tail_end() {
        let mut buffer = HeadTailBuffer::new(10);
        buffer.push_chunk(b"0123456789".to_vec());

        // With a 5-byte tail budget, an 11-byte chunk replaces the tail outright
        // and only its final 5 bytes survive.
        buffer.push_chunk(b"ABCDEFGHIJK".to_vec());

        let text = rendered(&buffer);
        assert!(text.starts_with("01234"));
        assert!(text.ends_with("GHIJK"));
        assert!(buffer.omitted_bytes() > 0);
    }

    #[test]
    fn fills_head_then_tail_across_multiple_chunks() {
        let mut buffer = HeadTailBuffer::new(10);

        // The 5-byte head budget is filled by two separate pushes.
        buffer.push_chunk(b"01".to_vec());
        buffer.push_chunk(b"234".to_vec());
        assert_eq!(buffer.to_bytes(), b"01234".to_vec());

        // The next pushes land in the 5-byte tail budget.
        buffer.push_chunk(b"567".to_vec());
        buffer.push_chunk(b"89".to_vec());
        assert_eq!(buffer.to_bytes(), b"0123456789".to_vec());
        assert_eq!(buffer.omitted_bytes(), 0);

        // A further byte evicts the oldest tail byte.
        buffer.push_chunk(b"a".to_vec());
        assert_eq!(buffer.to_bytes(), b"012346789a".to_vec());
        assert_eq!(buffer.omitted_bytes(), 1);
    }
}

View file

@ -37,6 +37,7 @@ use crate::sandboxing::SandboxPermissions;
mod async_watcher;
mod errors;
mod head_tail_buffer;
mod session;
mod session_manager;
@ -53,24 +54,6 @@ pub(crate) const MAX_UNIFIED_EXEC_SESSIONS: usize = 64;
// Send a warning message to the models when it reaches this number of sessions.
pub(crate) const WARNING_UNIFIED_EXEC_SESSIONS: usize = 60;
#[derive(Debug, Default)]
pub(crate) struct CommandTranscript {
pub data: Vec<u8>,
}
impl CommandTranscript {
pub fn append(&mut self, bytes: &[u8]) {
self.data.extend_from_slice(bytes);
if self.data.len() > UNIFIED_EXEC_OUTPUT_MAX_BYTES {
let excess = self
.data
.len()
.saturating_sub(UNIFIED_EXEC_OUTPUT_MAX_BYTES);
self.data.drain(..excess);
}
}
}
pub(crate) struct UnifiedExecContext {
pub session: Arc<Session>,
pub turn: Arc<TurnContext>,
@ -173,6 +156,7 @@ pub(crate) fn generate_chunk_id() -> String {
#[cfg(test)]
#[cfg(unix)]
mod tests {
use super::head_tail_buffer::HeadTailBuffer;
use super::*;
use crate::codex::Session;
use crate::codex::TurnContext;
@ -185,8 +169,6 @@ mod tests {
use std::sync::Arc;
use tokio::time::Duration;
use super::session::OutputBufferState;
async fn test_session_and_turn() -> (Arc<Session>, Arc<TurnContext>) {
let (session, mut turn) = make_session_and_context().await;
turn.approval_policy = AskForApproval::Never;
@ -245,21 +227,36 @@ mod tests {
}
#[test]
fn push_chunk_trims_only_excess_bytes() {
let mut buffer = OutputBufferState::default();
fn push_chunk_preserves_prefix_and_suffix() {
let mut buffer = HeadTailBuffer::default();
buffer.push_chunk(vec![b'a'; UNIFIED_EXEC_OUTPUT_MAX_BYTES]);
buffer.push_chunk(vec![b'b']);
buffer.push_chunk(vec![b'c']);
assert_eq!(buffer.total_bytes, UNIFIED_EXEC_OUTPUT_MAX_BYTES);
let snapshot = buffer.snapshot();
assert_eq!(snapshot.len(), 3);
assert_eq!(buffer.retained_bytes(), UNIFIED_EXEC_OUTPUT_MAX_BYTES);
let snapshot = buffer.snapshot_chunks();
let first = snapshot.first().expect("expected at least one chunk");
assert_eq!(first.first(), Some(&b'a'));
assert!(snapshot.iter().any(|chunk| chunk.as_slice() == b"b"));
assert_eq!(
snapshot.first().unwrap().len(),
UNIFIED_EXEC_OUTPUT_MAX_BYTES - 2
snapshot
.last()
.expect("expected at least one chunk")
.as_slice(),
b"c"
);
assert_eq!(snapshot.get(2).unwrap(), &vec![b'c']);
assert_eq!(snapshot.get(1).unwrap(), &vec![b'b']);
}
#[test]
fn head_tail_buffer_default_preserves_prefix_and_suffix() {
    // Fill the default buffer to its cap, then push two more bytes.
    let filler = vec![b'a'; UNIFIED_EXEC_OUTPUT_MAX_BYTES];
    let mut transcript = HeadTailBuffer::default();
    transcript.push_chunk(filler);
    transcript.push_chunk(vec![b'b', b'c']);

    // The earliest byte and the latest bytes must both still be present.
    let bytes = transcript.to_bytes();
    assert_eq!(bytes.first(), Some(&b'a'));
    assert!(bytes.ends_with(b"bc"));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]

View file

@ -1,6 +1,5 @@
#![allow(clippy::module_inception)]
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::Notify;
@ -19,54 +18,11 @@ use crate::truncate::formatted_truncate_text;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::SpawnedPty;
use super::UNIFIED_EXEC_OUTPUT_MAX_BYTES;
use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS;
use super::UnifiedExecError;
use super::head_tail_buffer::HeadTailBuffer;
/// Chunked output buffer that keeps only the most recent
/// `UNIFIED_EXEC_OUTPUT_MAX_BYTES` bytes.
#[derive(Debug, Default)]
pub(crate) struct OutputBufferState {
    chunks: VecDeque<Vec<u8>>,
    pub(crate) total_bytes: usize,
}

impl OutputBufferState {
    /// Append `chunk`, then evict the oldest bytes until the total fits within
    /// `UNIFIED_EXEC_OUTPUT_MAX_BYTES`.
    pub(super) fn push_chunk(&mut self, chunk: Vec<u8>) {
        self.total_bytes = self.total_bytes.saturating_add(chunk.len());
        self.chunks.push_back(chunk);

        let mut overflow = self
            .total_bytes
            .saturating_sub(UNIFIED_EXEC_OUTPUT_MAX_BYTES);
        while overflow > 0 {
            let Some(oldest) = self.chunks.front_mut() else {
                break;
            };
            let oldest_len = oldest.len();
            if overflow < oldest_len {
                // Only part of the oldest chunk has to go; shorten it in place.
                oldest.drain(..overflow);
                self.total_bytes = self.total_bytes.saturating_sub(overflow);
                break;
            }
            // The whole oldest chunk fits inside the overflow; drop it entirely.
            overflow -= oldest_len;
            self.total_bytes = self.total_bytes.saturating_sub(oldest_len);
            self.chunks.pop_front();
        }
    }

    /// Remove and return every buffered chunk, resetting the byte counter.
    pub(super) fn drain(&mut self) -> Vec<Vec<u8>> {
        let mut taken = Vec::with_capacity(self.chunks.len());
        taken.extend(self.chunks.drain(..));
        self.total_bytes = 0;
        taken
    }

    /// Return a copy of every buffered chunk, oldest first.
    pub(super) fn snapshot(&self) -> Vec<Vec<u8>> {
        let mut copy = Vec::with_capacity(self.chunks.len());
        copy.extend(self.chunks.iter().cloned());
        copy
    }
}
pub(crate) type OutputBuffer = Arc<Mutex<OutputBufferState>>;
pub(crate) type OutputBuffer = Arc<Mutex<HeadTailBuffer>>;
pub(crate) struct OutputHandles {
pub(crate) output_buffer: OutputBuffer,
pub(crate) output_notify: Arc<Notify>,
@ -90,7 +46,7 @@ impl UnifiedExecSession {
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
sandbox_type: SandboxType,
) -> Self {
let output_buffer = Arc::new(Mutex::new(OutputBufferState::default()));
let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default()));
let output_notify = Arc::new(Notify::new());
let cancellation_token = CancellationToken::new();
let output_drained = Arc::new(Notify::new());
@ -163,7 +119,7 @@ impl UnifiedExecSession {
async fn snapshot_output(&self) -> Vec<Vec<u8>> {
let guard = self.output_buffer.lock().await;
guard.snapshot()
guard.snapshot_chunks()
}
pub(crate) fn sandbox_type(&self) -> SandboxType {

View file

@ -29,27 +29,26 @@ use crate::tools::sandboxing::ToolCtx;
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::formatted_truncate_text;
use super::CommandTranscript;
use super::ExecCommandRequest;
use super::MAX_UNIFIED_EXEC_SESSIONS;
use super::SessionEntry;
use super::SessionStore;
use super::UnifiedExecContext;
use super::UnifiedExecError;
use super::UnifiedExecResponse;
use super::UnifiedExecSessionManager;
use super::WARNING_UNIFIED_EXEC_SESSIONS;
use super::WriteStdinRequest;
use super::async_watcher::emit_exec_end_for_unified_exec;
use super::async_watcher::spawn_exit_watcher;
use super::async_watcher::start_streaming_output;
use super::clamp_yield_time;
use super::generate_chunk_id;
use super::resolve_max_tokens;
use super::session::OutputBuffer;
use super::session::OutputHandles;
use super::session::UnifiedExecSession;
use crate::unified_exec::ExecCommandRequest;
use crate::unified_exec::MAX_UNIFIED_EXEC_SESSIONS;
use crate::unified_exec::SessionEntry;
use crate::unified_exec::SessionStore;
use crate::unified_exec::UnifiedExecContext;
use crate::unified_exec::UnifiedExecError;
use crate::unified_exec::UnifiedExecResponse;
use crate::unified_exec::UnifiedExecSessionManager;
use crate::unified_exec::WARNING_UNIFIED_EXEC_SESSIONS;
use crate::unified_exec::WriteStdinRequest;
use crate::unified_exec::async_watcher::emit_exec_end_for_unified_exec;
use crate::unified_exec::async_watcher::spawn_exit_watcher;
use crate::unified_exec::async_watcher::start_streaming_output;
use crate::unified_exec::clamp_yield_time;
use crate::unified_exec::generate_chunk_id;
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
use crate::unified_exec::resolve_max_tokens;
use crate::unified_exec::session::OutputBuffer;
use crate::unified_exec::session::OutputHandles;
use crate::unified_exec::session::UnifiedExecSession;
const UNIFIED_EXEC_ENV: [(&str, &str); 9] = [
("NO_COLOR", "1"),
@ -144,7 +143,7 @@ impl UnifiedExecSessionManager {
}
};
let transcript = Arc::new(tokio::sync::Mutex::new(CommandTranscript::default()));
let transcript = Arc::new(tokio::sync::Mutex::new(HeadTailBuffer::default()));
let event_ctx = ToolEventCtx::new(
context.session.as_ref(),
context.turn.as_ref(),
@ -158,6 +157,7 @@ impl UnifiedExecSessionManager {
Some(request.process_id.clone()),
);
emitter.emit(event_ctx, ToolEventStage::Begin).await;
start_streaming_output(&session, context, Arc::clone(&transcript));
let max_tokens = resolve_max_tokens(request.max_output_tokens);
@ -408,7 +408,7 @@ impl UnifiedExecSessionManager {
cwd: PathBuf,
started_at: Instant,
process_id: String,
transcript: Arc<tokio::sync::Mutex<CommandTranscript>>,
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
) {
let entry = SessionEntry {
session: Arc::clone(&session),
@ -550,11 +550,11 @@ impl UnifiedExecSessionManager {
let mut collected: Vec<u8> = Vec::with_capacity(4096);
let mut exit_signal_received = cancellation_token.is_cancelled();
loop {
let drained_chunks;
let drained_chunks: Vec<Vec<u8>>;
let mut wait_for_output = None;
{
let mut guard = output_buffer.lock().await;
drained_chunks = guard.drain();
drained_chunks = guard.drain_chunks();
if drained_chunks.is_empty() {
wait_for_output = Some(output_notify.notified());
}