refactor: tui.rs extract several pieces (#7461)

Pull FrameRequester out of tui.rs into its own module and make a
FrameScheduler struct. This is effectively an Actor/Handler approach
(see https://ryhl.io/blog/actors-with-tokio/). Adds tests and docs.

Small refactor of pending_viewport_area logic.
This commit is contained in:
Josh McKinney 2025-12-02 15:19:27 -08:00 committed by GitHub
parent ec93b6daf3
commit 58e1e570fa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 283 additions and 100 deletions

2
codex-rs/Cargo.lock generated
View file

@ -1615,6 +1615,7 @@ dependencies = [
"textwrap 0.16.2",
"tokio",
"tokio-stream",
"tokio-util",
"toml",
"tracing",
"tracing-appender",
@ -6600,6 +6601,7 @@ dependencies = [
"futures-sink",
"futures-util",
"pin-project-lite",
"slab",
"tokio",
]

View file

@ -91,6 +91,7 @@ unicode-width = { workspace = true }
url = { workspace = true }
codex-windows-sandbox = { workspace = true }
tokio-util = { workspace = true, features = ["time"] }
[target.'cfg(unix)'.dependencies]
libc = { workspace = true }

View file

@ -9,8 +9,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use crossterm::Command;
use crossterm::SynchronizedUpdate;
@ -32,10 +30,13 @@ use ratatui::crossterm::execute;
use ratatui::crossterm::terminal::disable_raw_mode;
use ratatui::crossterm::terminal::enable_raw_mode;
use ratatui::layout::Offset;
use ratatui::layout::Rect;
use ratatui::text::Line;
use tokio::select;
use tokio::sync::broadcast;
use tokio_stream::Stream;
pub use self::frame_requester::FrameRequester;
use crate::custom_terminal;
use crate::custom_terminal::Terminal as CustomTerminal;
#[cfg(unix)]
@ -43,6 +44,7 @@ use crate::tui::job_control::SUSPEND_KEY;
#[cfg(unix)]
use crate::tui::job_control::SuspendContext;
mod frame_requester;
#[cfg(unix)]
mod job_control;
@ -159,8 +161,8 @@ pub enum TuiEvent {
}
pub struct Tui {
frame_schedule_tx: tokio::sync::mpsc::UnboundedSender<Instant>,
draw_tx: tokio::sync::broadcast::Sender<()>,
frame_requester: FrameRequester,
draw_tx: broadcast::Sender<()>,
pub(crate) terminal: Terminal,
pending_history_lines: Vec<Line<'static>>,
alt_saved_viewport: Option<ratatui::layout::Rect>,
@ -173,36 +175,10 @@ pub struct Tui {
enhanced_keys_supported: bool,
}
#[derive(Clone, Debug)]
pub struct FrameRequester {
frame_schedule_tx: tokio::sync::mpsc::UnboundedSender<Instant>,
}
impl FrameRequester {
pub fn schedule_frame(&self) {
let _ = self.frame_schedule_tx.send(Instant::now());
}
pub fn schedule_frame_in(&self, dur: Duration) {
let _ = self.frame_schedule_tx.send(Instant::now() + dur);
}
}
#[cfg(test)]
impl FrameRequester {
/// Create a no-op frame requester for tests.
pub(crate) fn test_dummy() -> Self {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
FrameRequester {
frame_schedule_tx: tx,
}
}
}
impl Tui {
pub fn new(terminal: Terminal) -> Self {
let (frame_schedule_tx, frame_schedule_rx) = tokio::sync::mpsc::unbounded_channel();
let (draw_tx, _) = tokio::sync::broadcast::channel(1);
spawn_frame_scheduler(frame_schedule_rx, draw_tx.clone());
let (draw_tx, _) = broadcast::channel(1);
let frame_requester = FrameRequester::new(draw_tx.clone());
// Detect keyboard enhancement support before any EventStream is created so the
// crossterm poller can acquire its lock without contention.
@ -212,7 +188,7 @@ impl Tui {
let _ = crate::terminal_palette::default_colors();
Self {
frame_schedule_tx,
frame_requester,
draw_tx,
terminal,
pending_history_lines: vec![],
@ -226,9 +202,7 @@ impl Tui {
}
pub fn frame_requester(&self) -> FrameRequester {
FrameRequester {
frame_schedule_tx: self.frame_schedule_tx.clone(),
}
self.frame_requester.clone()
}
pub fn enhanced_keys_supported(&self) -> bool {
@ -362,34 +336,14 @@ impl Tui {
// Precompute any viewport updates that need a cursor-position query before entering
// the synchronized update, to avoid racing with the event reader.
let mut pending_viewport_area: Option<ratatui::layout::Rect> = None;
{
let terminal = &mut self.terminal;
let screen_size = terminal.size()?;
let last_known_screen_size = terminal.last_known_screen_size;
if screen_size != last_known_screen_size
&& let Ok(cursor_pos) = terminal.get_cursor_position()
{
let last_known_cursor_pos = terminal.last_known_cursor_pos;
// If we resized AND the cursor moved, we adjust the viewport area to keep the
// cursor in the same position. This is a heuristic that seems to work well
// at least in iTerm2.
if cursor_pos.y != last_known_cursor_pos.y {
let cursor_delta = cursor_pos.y as i32 - last_known_cursor_pos.y as i32;
let new_viewport_area = terminal.viewport_area.offset(Offset {
x: 0,
y: cursor_delta,
});
pending_viewport_area = Some(new_viewport_area);
}
}
}
let mut pending_viewport_area = self.pending_viewport_area()?;
stdout().sync_update(|_| {
#[cfg(unix)]
if let Some(prepared) = prepared_resume.take() {
prepared.apply(&mut self.terminal)?;
}
let terminal = &mut self.terminal;
if let Some(new_area) = pending_viewport_area.take() {
terminal.set_viewport_area(new_area);
@ -440,51 +394,28 @@ impl Tui {
})
})?
}
}
/// Spawn background scheduler to coalesce frame requests and emit draws at deadlines.
fn spawn_frame_scheduler(
frame_schedule_rx: tokio::sync::mpsc::UnboundedReceiver<Instant>,
draw_tx: tokio::sync::broadcast::Sender<()>,
) {
tokio::spawn(async move {
use tokio::select;
use tokio::time::Instant as TokioInstant;
use tokio::time::sleep_until;
let mut rx = frame_schedule_rx;
let mut next_deadline: Option<Instant> = None;
loop {
let target = next_deadline
.unwrap_or_else(|| Instant::now() + Duration::from_secs(60 * 60 * 24 * 365));
let sleep_fut = sleep_until(TokioInstant::from_std(target));
tokio::pin!(sleep_fut);
select! {
recv = rx.recv() => {
match recv {
Some(at) => {
if next_deadline.is_none_or(|cur| at < cur) {
next_deadline = Some(at);
}
// Do not send a draw immediately here. By continuing the loop,
// we recompute the sleep target so the draw fires once via the
// sleep branch, coalescing multiple requests into a single draw.
continue;
}
None => break,
}
}
_ = &mut sleep_fut => {
if next_deadline.is_some() {
next_deadline = None;
let _ = draw_tx.send(());
}
}
fn pending_viewport_area(&mut self) -> Result<Option<Rect>> {
let terminal = &mut self.terminal;
let screen_size = terminal.size()?;
let last_known_screen_size = terminal.last_known_screen_size;
if screen_size != last_known_screen_size
&& let Ok(cursor_pos) = terminal.get_cursor_position()
{
let last_known_cursor_pos = terminal.last_known_cursor_pos;
// If we resized AND the cursor moved, we adjust the viewport area to keep the
// cursor in the same position. This is a heuristic that seems to work well
// at least in iTerm2.
if cursor_pos.y != last_known_cursor_pos.y {
let offset = Offset {
x: 0,
y: cursor_pos.y as i32 - last_known_cursor_pos.y as i32,
};
return Ok(Some(terminal.viewport_area.offset(offset)));
}
}
});
Ok(None)
}
}
/// Command that emits an OSC 9 desktop notification with a message.

View file

@ -0,0 +1,249 @@
//! Frame draw scheduling utilities for the TUI.
//!
//! This module exposes [`FrameRequester`], a lightweight handle that widgets and
//! background tasks can clone to request future redraws of the TUI.
//!
//! Internally it spawns a [`FrameScheduler`] task that coalesces many requests
//! into a single notification on a broadcast channel used by the main TUI event
//! loop. This keeps animations and status updates smooth without redrawing more
//! often than necessary.
//!
//! This follows the actor-style design from
//! [“Actors with Tokio”](https://ryhl.io/blog/actors-with-tokio/), with a
//! dedicated scheduler task and lightweight request handles.
use std::time::Duration;
use std::time::Instant;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
/// A requester for scheduling future frame draws on the TUI event loop.
///
/// This is the handler side of an actor/handler pair with `FrameScheduler`, which coalesces
/// multiple frame requests into a single draw operation.
///
/// Clones of this type can be freely shared across tasks to make it possible to trigger frame draws
/// from anywhere in the TUI code.
#[derive(Clone, Debug)]
pub struct FrameRequester {
frame_schedule_tx: mpsc::UnboundedSender<Instant>,
}
impl FrameRequester {
/// Create a new FrameRequester and spawn its associated FrameScheduler task.
///
/// The provided `draw_tx` is used to notify the TUI event loop of scheduled draws.
pub fn new(draw_tx: broadcast::Sender<()>) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
let scheduler = FrameScheduler::new(rx, draw_tx);
tokio::spawn(scheduler.run());
Self {
frame_schedule_tx: tx,
}
}
/// Schedule a frame draw as soon as possible.
pub fn schedule_frame(&self) {
let _ = self.frame_schedule_tx.send(Instant::now());
}
/// Schedule a frame draw to occur after the specified duration.
pub fn schedule_frame_in(&self, dur: Duration) {
let _ = self.frame_schedule_tx.send(Instant::now() + dur);
}
}
#[cfg(test)]
impl FrameRequester {
/// Create a no-op frame requester for tests.
pub(crate) fn test_dummy() -> Self {
let (tx, _rx) = mpsc::unbounded_channel();
FrameRequester {
frame_schedule_tx: tx,
}
}
}
/// A scheduler for coalescing frame draw requests and notifying the TUI event loop.
///
/// This type is internal to `FrameRequester` and is spawned as a task to handle scheduling logic.
struct FrameScheduler {
receiver: mpsc::UnboundedReceiver<Instant>,
draw_tx: broadcast::Sender<()>,
}
impl FrameScheduler {
/// Create a new FrameScheduler with the provided receiver and draw notification sender.
fn new(receiver: mpsc::UnboundedReceiver<Instant>, draw_tx: broadcast::Sender<()>) -> Self {
Self { receiver, draw_tx }
}
/// Run the scheduling loop, coalescing frame requests and notifying the TUI event loop.
///
/// This method runs indefinitely until all senders are dropped. A single draw notification
/// is sent for multiple requests scheduled before the next draw deadline.
async fn run(mut self) {
const ONE_YEAR: Duration = Duration::from_secs(60 * 60 * 24 * 365);
let mut next_deadline: Option<Instant> = None;
loop {
let target = next_deadline.unwrap_or_else(|| Instant::now() + ONE_YEAR);
let deadline = tokio::time::sleep_until(target.into());
tokio::pin!(deadline);
tokio::select! {
draw_at = self.receiver.recv() => {
let Some(draw_at) = draw_at else {
// All senders dropped; exit the scheduler.
break
};
next_deadline = Some(next_deadline.map_or(draw_at, |cur| cur.min(draw_at)));
// Do not send a draw immediately here. By continuing the loop,
// we recompute the sleep target so the draw fires once via the
// sleep branch, coalescing multiple requests into a single draw.
continue;
}
_ = &mut deadline => {
if next_deadline.is_some() {
next_deadline = None;
let _ = self.draw_tx.send(());
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time;
use tokio_util::time::FutureExt;
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_schedule_frame_immediate_triggers_once() {
let (draw_tx, mut draw_rx) = broadcast::channel(16);
let requester = FrameRequester::new(draw_tx);
requester.schedule_frame();
// Advance time minimally to let the scheduler process and hit the deadline == now.
time::advance(Duration::from_millis(1)).await;
// First draw should arrive.
let first = draw_rx
.recv()
.timeout(Duration::from_millis(50))
.await
.expect("timed out waiting for first draw");
assert!(first.is_ok(), "broadcast closed unexpectedly");
// No second draw should arrive.
let second = draw_rx.recv().timeout(Duration::from_millis(20)).await;
assert!(second.is_err(), "unexpected extra draw received");
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_schedule_frame_in_triggers_at_delay() {
let (draw_tx, mut draw_rx) = broadcast::channel(16);
let requester = FrameRequester::new(draw_tx);
requester.schedule_frame_in(Duration::from_millis(50));
// Advance less than the delay: no draw yet.
time::advance(Duration::from_millis(30)).await;
let early = draw_rx.recv().timeout(Duration::from_millis(10)).await;
assert!(early.is_err(), "draw fired too early");
// Advance past the deadline: one draw should fire.
time::advance(Duration::from_millis(25)).await;
let first = draw_rx
.recv()
.timeout(Duration::from_millis(50))
.await
.expect("timed out waiting for scheduled draw");
assert!(first.is_ok(), "broadcast closed unexpectedly");
// No second draw should arrive.
let second = draw_rx.recv().timeout(Duration::from_millis(20)).await;
assert!(second.is_err(), "unexpected extra draw received");
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_coalesces_multiple_requests_into_single_draw() {
let (draw_tx, mut draw_rx) = broadcast::channel(16);
let requester = FrameRequester::new(draw_tx);
// Schedule multiple immediate requests close together.
requester.schedule_frame();
requester.schedule_frame();
requester.schedule_frame();
// Allow the scheduler to process and hit the coalesced deadline.
time::advance(Duration::from_millis(1)).await;
// Expect only a single draw notification despite three requests.
let first = draw_rx
.recv()
.timeout(Duration::from_millis(50))
.await
.expect("timed out waiting for coalesced draw");
assert!(first.is_ok(), "broadcast closed unexpectedly");
// No additional draw should be sent for the same coalesced batch.
let second = draw_rx.recv().timeout(Duration::from_millis(20)).await;
assert!(second.is_err(), "unexpected extra draw received");
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_coalesces_mixed_immediate_and_delayed_requests() {
let (draw_tx, mut draw_rx) = broadcast::channel(16);
let requester = FrameRequester::new(draw_tx);
// Schedule a delayed draw and then an immediate one; should coalesce and fire at the earliest (immediate).
requester.schedule_frame_in(Duration::from_millis(100));
requester.schedule_frame();
time::advance(Duration::from_millis(1)).await;
let first = draw_rx
.recv()
.timeout(Duration::from_millis(50))
.await
.expect("timed out waiting for coalesced immediate draw");
assert!(first.is_ok(), "broadcast closed unexpectedly");
// The later delayed request should have been coalesced into the earlier one; no second draw.
let second = draw_rx.recv().timeout(Duration::from_millis(120)).await;
assert!(second.is_err(), "unexpected extra draw received");
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_multiple_delayed_requests_coalesce_to_earliest() {
let (draw_tx, mut draw_rx) = broadcast::channel(16);
let requester = FrameRequester::new(draw_tx);
// Schedule multiple delayed draws; they should coalesce to the earliest (10ms).
requester.schedule_frame_in(Duration::from_millis(100));
requester.schedule_frame_in(Duration::from_millis(20));
requester.schedule_frame_in(Duration::from_millis(120));
// Advance to just before the earliest deadline: no draw yet.
time::advance(Duration::from_millis(10)).await;
let early = draw_rx.recv().timeout(Duration::from_millis(10)).await;
assert!(early.is_err(), "draw fired too early");
// Advance past the earliest deadline: one draw should fire.
time::advance(Duration::from_millis(20)).await;
let first = draw_rx
.recv()
.timeout(Duration::from_millis(50))
.await
.expect("timed out waiting for earliest coalesced draw");
assert!(first.is_ok(), "broadcast closed unexpectedly");
// No additional draw should fire for the later delayed requests.
let second = draw_rx.recv().timeout(Duration::from_millis(120)).await;
assert!(second.is_err(), "unexpected extra draw received");
}
}