From 58e1e570faf0a2cb888acdb18df720f149b5006a Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Tue, 2 Dec 2025 15:19:27 -0800 Subject: [PATCH] refactor: tui.rs extract several pieces (#7461) Pull FrameRequester out of tui.rs into its own module and make a FrameScheduler struct. This is effectively an Actor/Handler approach (see https://ryhl.io/blog/actors-with-tokio/). Adds tests and docs. Small refactor of pending_viewport_area logic. --- codex-rs/Cargo.lock | 2 + codex-rs/tui/Cargo.toml | 1 + codex-rs/tui/src/tui.rs | 131 +++---------- codex-rs/tui/src/tui/frame_requester.rs | 249 ++++++++++++++++++++++++ 4 files changed, 283 insertions(+), 100 deletions(-) create mode 100644 codex-rs/tui/src/tui/frame_requester.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index e68034eff..b9fcc969b 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1615,6 +1615,7 @@ dependencies = [ "textwrap 0.16.2", "tokio", "tokio-stream", + "tokio-util", "toml", "tracing", "tracing-appender", @@ -6600,6 +6601,7 @@ dependencies = [ "futures-sink", "futures-util", "pin-project-lite", + "slab", "tokio", ] diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index d9906b2f0..be4f5aead 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -91,6 +91,7 @@ unicode-width = { workspace = true } url = { workspace = true } codex-windows-sandbox = { workspace = true } +tokio-util = { workspace = true, features = ["time"] } [target.'cfg(unix)'.dependencies] libc = { workspace = true } diff --git a/codex-rs/tui/src/tui.rs b/codex-rs/tui/src/tui.rs index 7cbf252e7..5502b8335 100644 --- a/codex-rs/tui/src/tui.rs +++ b/codex-rs/tui/src/tui.rs @@ -9,8 +9,6 @@ use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use std::time::Duration; -use std::time::Instant; use crossterm::Command; use crossterm::SynchronizedUpdate; @@ -32,10 +30,13 @@ use ratatui::crossterm::execute; use ratatui::crossterm::terminal::disable_raw_mode; use ratatui::crossterm::terminal::enable_raw_mode; use ratatui::layout::Offset; +use ratatui::layout::Rect; use ratatui::text::Line; use tokio::select; +use tokio::sync::broadcast; use tokio_stream::Stream; +pub use self::frame_requester::FrameRequester; use crate::custom_terminal; use crate::custom_terminal::Terminal as CustomTerminal; #[cfg(unix)] @@ -43,6 +44,7 @@ use crate::tui::job_control::SUSPEND_KEY; #[cfg(unix)] use crate::tui::job_control::SuspendContext; +mod frame_requester; #[cfg(unix)] mod job_control; @@ -159,8 +161,8 @@ pub enum TuiEvent { } pub struct Tui { - frame_schedule_tx: tokio::sync::mpsc::UnboundedSender, - draw_tx: tokio::sync::broadcast::Sender<()>, + frame_requester: FrameRequester, + draw_tx: broadcast::Sender<()>, pub(crate) terminal: Terminal, pending_history_lines: Vec>, alt_saved_viewport: Option, @@ -173,36 +175,10 @@ pub struct Tui { enhanced_keys_supported: bool, } -#[derive(Clone, Debug)] -pub struct FrameRequester { - frame_schedule_tx: tokio::sync::mpsc::UnboundedSender, -} -impl FrameRequester { - pub fn schedule_frame(&self) { - let _ = self.frame_schedule_tx.send(Instant::now()); - } - pub fn schedule_frame_in(&self, dur: Duration) { - let _ = self.frame_schedule_tx.send(Instant::now() + dur); - } -} - -#[cfg(test)] -impl FrameRequester { - /// Create a no-op frame requester for tests. - pub(crate) fn test_dummy() -> Self { - let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); - FrameRequester { - frame_schedule_tx: tx, - } - } -} - impl Tui { pub fn new(terminal: Terminal) -> Self { - let (frame_schedule_tx, frame_schedule_rx) = tokio::sync::mpsc::unbounded_channel(); - let (draw_tx, _) = tokio::sync::broadcast::channel(1); - - spawn_frame_scheduler(frame_schedule_rx, draw_tx.clone()); + let (draw_tx, _) = broadcast::channel(1); + let frame_requester = FrameRequester::new(draw_tx.clone()); // Detect keyboard enhancement support before any EventStream is created so the // crossterm poller can acquire its lock without contention. @@ -212,7 +188,7 @@ impl Tui { let _ = crate::terminal_palette::default_colors(); Self { - frame_schedule_tx, + frame_requester, draw_tx, terminal, pending_history_lines: vec![], @@ -226,9 +202,7 @@ impl Tui { } pub fn frame_requester(&self) -> FrameRequester { - FrameRequester { - frame_schedule_tx: self.frame_schedule_tx.clone(), - } + self.frame_requester.clone() } pub fn enhanced_keys_supported(&self) -> bool { @@ -362,34 +336,14 @@ impl Tui { // Precompute any viewport updates that need a cursor-position query before entering // the synchronized update, to avoid racing with the event reader. - let mut pending_viewport_area: Option = None; - { - let terminal = &mut self.terminal; - let screen_size = terminal.size()?; - let last_known_screen_size = terminal.last_known_screen_size; - if screen_size != last_known_screen_size - && let Ok(cursor_pos) = terminal.get_cursor_position() - { - let last_known_cursor_pos = terminal.last_known_cursor_pos; - // If we resized AND the cursor moved, we adjust the viewport area to keep the - // cursor in the same position. This is a heuristic that seems to work well - // at least in iTerm2. - if cursor_pos.y != last_known_cursor_pos.y { - let cursor_delta = cursor_pos.y as i32 - last_known_cursor_pos.y as i32; - let new_viewport_area = terminal.viewport_area.offset(Offset { - x: 0, - y: cursor_delta, - }); - pending_viewport_area = Some(new_viewport_area); - } - } - } + let mut pending_viewport_area = self.pending_viewport_area()?; stdout().sync_update(|_| { #[cfg(unix)] if let Some(prepared) = prepared_resume.take() { prepared.apply(&mut self.terminal)?; } + let terminal = &mut self.terminal; if let Some(new_area) = pending_viewport_area.take() { terminal.set_viewport_area(new_area); @@ -440,51 +394,28 @@ impl Tui { }) })? } -} -/// Spawn background scheduler to coalesce frame requests and emit draws at deadlines. -fn spawn_frame_scheduler( - frame_schedule_rx: tokio::sync::mpsc::UnboundedReceiver, - draw_tx: tokio::sync::broadcast::Sender<()>, -) { - tokio::spawn(async move { - use tokio::select; - use tokio::time::Instant as TokioInstant; - use tokio::time::sleep_until; - - let mut rx = frame_schedule_rx; - let mut next_deadline: Option = None; - - loop { - let target = next_deadline - .unwrap_or_else(|| Instant::now() + Duration::from_secs(60 * 60 * 24 * 365)); - let sleep_fut = sleep_until(TokioInstant::from_std(target)); - tokio::pin!(sleep_fut); - - select! { - recv = rx.recv() => { - match recv { - Some(at) => { - if next_deadline.is_none_or(|cur| at < cur) { - next_deadline = Some(at); - } - // Do not send a draw immediately here. By continuing the loop, - // we recompute the sleep target so the draw fires once via the - // sleep branch, coalescing multiple requests into a single draw. - continue; - } - None => break, - } - } - _ = &mut sleep_fut => { - if next_deadline.is_some() { - next_deadline = None; - let _ = draw_tx.send(()); - } - } + fn pending_viewport_area(&mut self) -> Result> { + let terminal = &mut self.terminal; + let screen_size = terminal.size()?; + let last_known_screen_size = terminal.last_known_screen_size; + if screen_size != last_known_screen_size + && let Ok(cursor_pos) = terminal.get_cursor_position() + { + let last_known_cursor_pos = terminal.last_known_cursor_pos; + // If we resized AND the cursor moved, we adjust the viewport area to keep the + // cursor in the same position. This is a heuristic that seems to work well + // at least in iTerm2. + if cursor_pos.y != last_known_cursor_pos.y { + let offset = Offset { + x: 0, + y: cursor_pos.y as i32 - last_known_cursor_pos.y as i32, + }; + return Ok(Some(terminal.viewport_area.offset(offset))); } } - }); + Ok(None) + } } /// Command that emits an OSC 9 desktop notification with a message. diff --git a/codex-rs/tui/src/tui/frame_requester.rs b/codex-rs/tui/src/tui/frame_requester.rs new file mode 100644 index 000000000..4f7886aa2 --- /dev/null +++ b/codex-rs/tui/src/tui/frame_requester.rs @@ -0,0 +1,249 @@ +//! Frame draw scheduling utilities for the TUI. +//! +//! This module exposes [`FrameRequester`], a lightweight handle that widgets and +//! background tasks can clone to request future redraws of the TUI. +//! +//! Internally it spawns a [`FrameScheduler`] task that coalesces many requests +//! into a single notification on a broadcast channel used by the main TUI event +//! loop. This keeps animations and status updates smooth without redrawing more +//! often than necessary. +//! +//! This follows the actor-style design from +//! [“Actors with Tokio”](https://ryhl.io/blog/actors-with-tokio/), with a +//! dedicated scheduler task and lightweight request handles. + +use std::time::Duration; +use std::time::Instant; + +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +/// A requester for scheduling future frame draws on the TUI event loop. +/// +/// This is the handler side of an actor/handler pair with `FrameScheduler`, which coalesces +/// multiple frame requests into a single draw operation. +/// +/// Clones of this type can be freely shared across tasks to make it possible to trigger frame draws +/// from anywhere in the TUI code. +#[derive(Clone, Debug)] +pub struct FrameRequester { + frame_schedule_tx: mpsc::UnboundedSender, +} + +impl FrameRequester { + /// Create a new FrameRequester and spawn its associated FrameScheduler task. + /// + /// The provided `draw_tx` is used to notify the TUI event loop of scheduled draws. + pub fn new(draw_tx: broadcast::Sender<()>) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + let scheduler = FrameScheduler::new(rx, draw_tx); + tokio::spawn(scheduler.run()); + Self { + frame_schedule_tx: tx, + } + } + + /// Schedule a frame draw as soon as possible. + pub fn schedule_frame(&self) { + let _ = self.frame_schedule_tx.send(Instant::now()); + } + + /// Schedule a frame draw to occur after the specified duration. + pub fn schedule_frame_in(&self, dur: Duration) { + let _ = self.frame_schedule_tx.send(Instant::now() + dur); + } +} + +#[cfg(test)] +impl FrameRequester { + /// Create a no-op frame requester for tests. + pub(crate) fn test_dummy() -> Self { + let (tx, _rx) = mpsc::unbounded_channel(); + FrameRequester { + frame_schedule_tx: tx, + } + } +} + +/// A scheduler for coalescing frame draw requests and notifying the TUI event loop. +/// +/// This type is internal to `FrameRequester` and is spawned as a task to handle scheduling logic. +struct FrameScheduler { + receiver: mpsc::UnboundedReceiver, + draw_tx: broadcast::Sender<()>, +} + +impl FrameScheduler { + /// Create a new FrameScheduler with the provided receiver and draw notification sender. + fn new(receiver: mpsc::UnboundedReceiver, draw_tx: broadcast::Sender<()>) -> Self { + Self { receiver, draw_tx } + } + + /// Run the scheduling loop, coalescing frame requests and notifying the TUI event loop. + /// + /// This method runs indefinitely until all senders are dropped. A single draw notification + /// is sent for multiple requests scheduled before the next draw deadline. + async fn run(mut self) { + const ONE_YEAR: Duration = Duration::from_secs(60 * 60 * 24 * 365); + let mut next_deadline: Option = None; + loop { + let target = next_deadline.unwrap_or_else(|| Instant::now() + ONE_YEAR); + let deadline = tokio::time::sleep_until(target.into()); + tokio::pin!(deadline); + + tokio::select! { + draw_at = self.receiver.recv() => { + let Some(draw_at) = draw_at else { + // All senders dropped; exit the scheduler. + break + }; + next_deadline = Some(next_deadline.map_or(draw_at, |cur| cur.min(draw_at))); + + // Do not send a draw immediately here. By continuing the loop, + // we recompute the sleep target so the draw fires once via the + // sleep branch, coalescing multiple requests into a single draw. + continue; + } + _ = &mut deadline => { + if next_deadline.is_some() { + next_deadline = None; + let _ = self.draw_tx.send(()); + } + } + } + } + } +} +#[cfg(test)] +mod tests { + use super::*; + use tokio::time; + use tokio_util::time::FutureExt; + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_schedule_frame_immediate_triggers_once() { + let (draw_tx, mut draw_rx) = broadcast::channel(16); + let requester = FrameRequester::new(draw_tx); + + requester.schedule_frame(); + + // Advance time minimally to let the scheduler process and hit the deadline == now. + time::advance(Duration::from_millis(1)).await; + + // First draw should arrive. + let first = draw_rx + .recv() + .timeout(Duration::from_millis(50)) + .await + .expect("timed out waiting for first draw"); + assert!(first.is_ok(), "broadcast closed unexpectedly"); + + // No second draw should arrive. + let second = draw_rx.recv().timeout(Duration::from_millis(20)).await; + assert!(second.is_err(), "unexpected extra draw received"); + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_schedule_frame_in_triggers_at_delay() { + let (draw_tx, mut draw_rx) = broadcast::channel(16); + let requester = FrameRequester::new(draw_tx); + + requester.schedule_frame_in(Duration::from_millis(50)); + + // Advance less than the delay: no draw yet. + time::advance(Duration::from_millis(30)).await; + let early = draw_rx.recv().timeout(Duration::from_millis(10)).await; + assert!(early.is_err(), "draw fired too early"); + + // Advance past the deadline: one draw should fire. + time::advance(Duration::from_millis(25)).await; + let first = draw_rx + .recv() + .timeout(Duration::from_millis(50)) + .await + .expect("timed out waiting for scheduled draw"); + assert!(first.is_ok(), "broadcast closed unexpectedly"); + + // No second draw should arrive. + let second = draw_rx.recv().timeout(Duration::from_millis(20)).await; + assert!(second.is_err(), "unexpected extra draw received"); + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_coalesces_multiple_requests_into_single_draw() { + let (draw_tx, mut draw_rx) = broadcast::channel(16); + let requester = FrameRequester::new(draw_tx); + + // Schedule multiple immediate requests close together. + requester.schedule_frame(); + requester.schedule_frame(); + requester.schedule_frame(); + + // Allow the scheduler to process and hit the coalesced deadline. + time::advance(Duration::from_millis(1)).await; + + // Expect only a single draw notification despite three requests. + let first = draw_rx + .recv() + .timeout(Duration::from_millis(50)) + .await + .expect("timed out waiting for coalesced draw"); + assert!(first.is_ok(), "broadcast closed unexpectedly"); + + // No additional draw should be sent for the same coalesced batch. + let second = draw_rx.recv().timeout(Duration::from_millis(20)).await; + assert!(second.is_err(), "unexpected extra draw received"); + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_coalesces_mixed_immediate_and_delayed_requests() { + let (draw_tx, mut draw_rx) = broadcast::channel(16); + let requester = FrameRequester::new(draw_tx); + + // Schedule a delayed draw and then an immediate one; should coalesce and fire at the earliest (immediate). + requester.schedule_frame_in(Duration::from_millis(100)); + requester.schedule_frame(); + + time::advance(Duration::from_millis(1)).await; + + let first = draw_rx + .recv() + .timeout(Duration::from_millis(50)) + .await + .expect("timed out waiting for coalesced immediate draw"); + assert!(first.is_ok(), "broadcast closed unexpectedly"); + + // The later delayed request should have been coalesced into the earlier one; no second draw. + let second = draw_rx.recv().timeout(Duration::from_millis(120)).await; + assert!(second.is_err(), "unexpected extra draw received"); + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_multiple_delayed_requests_coalesce_to_earliest() { + let (draw_tx, mut draw_rx) = broadcast::channel(16); + let requester = FrameRequester::new(draw_tx); + + // Schedule multiple delayed draws; they should coalesce to the earliest (10ms). + requester.schedule_frame_in(Duration::from_millis(100)); + requester.schedule_frame_in(Duration::from_millis(20)); + requester.schedule_frame_in(Duration::from_millis(120)); + + // Advance to just before the earliest deadline: no draw yet. + time::advance(Duration::from_millis(10)).await; + let early = draw_rx.recv().timeout(Duration::from_millis(10)).await; + assert!(early.is_err(), "draw fired too early"); + + // Advance past the earliest deadline: one draw should fire. + time::advance(Duration::from_millis(20)).await; + let first = draw_rx + .recv() + .timeout(Duration::from_millis(50)) + .await + .expect("timed out waiting for earliest coalesced draw"); + assert!(first.is_ok(), "broadcast closed unexpectedly"); + + // No additional draw should fire for the later delayed requests. + let second = draw_rx.recv().timeout(Duration::from_millis(120)).await; + assert!(second.is_err(), "unexpected extra draw received"); + } +}