diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 11138fdc8..ae230e04d 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1701,7 +1701,6 @@ dependencies = [ "anyhow", "arboard", "assert_matches", - "async-stream", "base64", "chrono", "clap", @@ -6910,6 +6909,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index 1d9f31159..06167c973 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -23,7 +23,6 @@ workspace = true [dependencies] anyhow = { workspace = true } -async-stream = { workspace = true } base64 = { workspace = true } chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } @@ -81,7 +80,7 @@ tokio = { workspace = true, features = [ "test-util", "time", ] } -tokio-stream = { workspace = true } +tokio-stream = { workspace = true, features = ["sync"] } toml = { workspace = true } tracing = { workspace = true, features = ["log"] } tracing-appender = { workspace = true } diff --git a/codex-rs/tui/src/tui.rs b/codex-rs/tui/src/tui.rs index f9566da90..0770b7664 100644 --- a/codex-rs/tui/src/tui.rs +++ b/codex-rs/tui/src/tui.rs @@ -16,7 +16,6 @@ use crossterm::event::DisableBracketedPaste; use crossterm::event::DisableFocusChange; use crossterm::event::EnableBracketedPaste; use crossterm::event::EnableFocusChange; -use crossterm::event::Event; use crossterm::event::KeyEvent; use crossterm::event::KeyboardEnhancementFlags; use crossterm::event::PopKeyboardEnhancementFlags; @@ -32,7 +31,6 @@ use ratatui::crossterm::terminal::enable_raw_mode; use ratatui::layout::Offset; use ratatui::layout::Rect; use ratatui::text::Line; -use tokio::select; use tokio::sync::broadcast; use tokio_stream::Stream; @@ -42,11 +40,12 @@ use crate::custom_terminal::Terminal as CustomTerminal; use crate::notifications::DesktopNotificationBackend; use crate::notifications::NotificationBackendKind; use crate::notifications::detect_backend; -#[cfg(unix)] -use crate::tui::job_control::SUSPEND_KEY; +use crate::tui::event_stream::EventBroker; +use crate::tui::event_stream::TuiEventStream; #[cfg(unix)] use crate::tui::job_control::SuspendContext; +mod event_stream; mod frame_requester; #[cfg(unix)] mod job_control; @@ -156,7 +155,7 @@ fn set_panic_hook() { })); } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum TuiEvent { Key(KeyEvent), Paste(String), @@ -166,6 +165,7 @@ pub enum TuiEvent { pub struct Tui { frame_requester: FrameRequester, draw_tx: broadcast::Sender<()>, + event_broker: Arc, pub(crate) terminal: Terminal, pending_history_lines: Vec>, alt_saved_viewport: Option, @@ -194,6 +194,7 @@ impl Tui { Self { frame_requester, draw_tx, + event_broker: Arc::new(EventBroker::new()), terminal, pending_history_lines: vec![], alt_saved_viewport: None, @@ -214,6 +215,18 @@ impl Tui { self.enhanced_keys_supported } + // todo(sayan) unused for now; intend to use to enable opening external editors + #[allow(unused)] + pub fn pause_events(&mut self) { + self.event_broker.pause_events(); + } + + // todo(sayan) unused for now; intend to use to enable opening external editors + #[allow(unused)] + pub fn resume_events(&mut self) { + self.event_broker.resume_events(); + } + /// Emit a desktop notification now if the terminal is unfocused. /// Returns true if a notification was posted. pub fn notify(&mut self, message: impl AsRef) -> bool { @@ -262,79 +275,21 @@ impl Tui { } pub fn event_stream(&self) -> Pin + Send + 'static>> { - use tokio_stream::StreamExt; - - let mut crossterm_events = crossterm::event::EventStream::new(); - let mut draw_rx = self.draw_tx.subscribe(); - - // State for tracking how we should resume from ^Z suspend. #[cfg(unix)] - let suspend_context = self.suspend_context.clone(); - #[cfg(unix)] - let alt_screen_active = self.alt_screen_active.clone(); - - let terminal_focused = self.terminal_focused.clone(); - let event_stream = async_stream::stream! { - loop { - select! { - event_result = crossterm_events.next() => { - match event_result { - Some(Ok(event)) => { - match event { - Event::Key(key_event) => { - #[cfg(unix)] - if SUSPEND_KEY.is_press(key_event) { - let _ = suspend_context.suspend(&alt_screen_active); - // We continue here after resume. - yield TuiEvent::Draw; - continue; - } - yield TuiEvent::Key(key_event); - } - Event::Resize(_, _) => { - yield TuiEvent::Draw; - } - Event::Paste(pasted) => { - yield TuiEvent::Paste(pasted); - } - Event::FocusGained => { - terminal_focused.store(true, Ordering::Relaxed); - crate::terminal_palette::requery_default_colors(); - yield TuiEvent::Draw; - } - Event::FocusLost => { - terminal_focused.store(false, Ordering::Relaxed); - } - _ => {} - } - } - Some(Err(_)) | None => { - // Exit the loop in case of broken pipe as we will never - // recover from it - break; - } - } - } - result = draw_rx.recv() => { - match result { - Ok(_) => { - yield TuiEvent::Draw; - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { - // We dropped one or more draw notifications; coalesce to a single draw. - yield TuiEvent::Draw; - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - // Sender dropped. This stream likely outlived its owning `Tui`; - // exit to avoid spinning on a permanently-closed receiver. - break; - } - } - } - } - } - }; - Box::pin(event_stream) + let stream = TuiEventStream::new( + self.event_broker.clone(), + self.draw_tx.subscribe(), + self.terminal_focused.clone(), + self.suspend_context.clone(), + self.alt_screen_active.clone(), + ); + #[cfg(not(unix))] + let stream = TuiEventStream::new( + self.event_broker.clone(), + self.draw_tx.subscribe(), + self.terminal_focused.clone(), + ); + Box::pin(stream) } /// Enter alternate screen and expand the viewport to full terminal size, saving the current diff --git a/codex-rs/tui/src/tui/event_stream.rs b/codex-rs/tui/src/tui/event_stream.rs new file mode 100644 index 000000000..2ce0aa7d2 --- /dev/null +++ b/codex-rs/tui/src/tui/event_stream.rs @@ -0,0 +1,511 @@ +//! Event stream plumbing for the TUI. +//! +//! - [`EventBroker`] holds the shared crossterm stream so multiple callers reuse the same +//! input source and can drop/recreate it on pause/resume without rebuilding consumers. +//! - [`TuiEventStream`] wraps a draw event subscription plus the shared [`EventBroker`] and maps crossterm +//! events into [`TuiEvent`]. +//! - [`EventSource`] abstracts the underlying event producer; the real implementation is +//! [`CrosstermEventSource`] and tests can swap in [`FakeEventSource`]. +//! +//! The motivation for dropping/recreating the crossterm event stream is to enable the TUI to fully relinquish stdin. +//! If the stream is not dropped, it will continue to read from stdin even if it is not actively being polled +//! (due to how crossterm's EventStream is implemented), potentially stealing input from other processes reading stdin, +//! like terminal text editors. This race can cause missed input or capturing terminal query responses (for example, OSC palette/size queries) +//! that the other process expects to read. Stopping polling, instead of dropping the stream, is only sufficient when the +//! pause happens before the stream enters a pending state; otherwise the crossterm reader thread may keep reading +//! from stdin, so the safer approach is to drop and recreate the event stream when we need to hand off the terminal. +//! +//! See https://ratatui.rs/recipes/apps/spawn-vim/ and https://www.reddit.com/r/rust/comments/1f3o33u/myterious_crossterm_input_after_running_vim for more details. + +use std::pin::Pin; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::task::Context; +use std::task::Poll; + +use crossterm::event::Event; +use tokio::sync::broadcast; +use tokio::sync::watch; +use tokio_stream::Stream; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::wrappers::WatchStream; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; + +use super::TuiEvent; + +/// Result type produced by an event source. +pub type EventResult = std::io::Result; + +/// Abstraction over a source of terminal events. Allows swapping in a fake for tests. +/// Value in production is [`CrosstermEventSource`]. +pub trait EventSource: Send + 'static { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; +} + +/// Shared crossterm input state for all [`TuiEventStream`] instances. A single crossterm EventStream +/// is reused so all streams still see the same input source. +/// +/// This intermediate layer enables dropping/recreating the underlying EventStream (pause/resume) without rebuilding consumers. +pub struct EventBroker { + state: Mutex>, + resume_events_tx: watch::Sender<()>, +} + +/// Tracks state of underlying [`EventSource`]. +enum EventBrokerState { + Paused, // Underlying event source (i.e., crossterm EventStream) dropped + Start, // A new event source will be created on next poll + Running(S), // Event source is currently running +} + +impl EventBrokerState { + /// Return the running event source, starting it if needed; None when paused. + fn active_event_source_mut(&mut self) -> Option<&mut S> { + match self { + EventBrokerState::Paused => None, + EventBrokerState::Start => { + *self = EventBrokerState::Running(S::default()); + match self { + EventBrokerState::Running(events) => Some(events), + EventBrokerState::Paused | EventBrokerState::Start => unreachable!(), + } + } + EventBrokerState::Running(events) => Some(events), + } + } +} + +impl EventBroker { + pub fn new() -> Self { + let (resume_events_tx, _resume_events_rx) = watch::channel(()); + Self { + state: Mutex::new(EventBrokerState::Start), + resume_events_tx, + } + } + + /// Drop the underlying event source + pub fn pause_events(&self) { + let mut state = self + .state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *state = EventBrokerState::Paused; + } + + /// Create a new instance of the underlying event source + pub fn resume_events(&self) { + let mut state = self + .state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *state = EventBrokerState::Start; + let _ = self.resume_events_tx.send(()); + } + + /// Subscribe to a notification that fires whenever [`Self::resume_events`] is called. + /// + /// This is used to wake `poll_crossterm_event` when it is paused and waiting for the + /// underlying crossterm stream to be recreated. + pub fn resume_events_rx(&self) -> watch::Receiver<()> { + self.resume_events_tx.subscribe() + } +} + +/// Real crossterm-backed event source. +pub struct CrosstermEventSource(pub crossterm::event::EventStream); + +impl Default for CrosstermEventSource { + fn default() -> Self { + Self(crossterm::event::EventStream::new()) + } +} + +impl EventSource for CrosstermEventSource { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_next(cx) + } +} + +/// TuiEventStream is a struct for reading TUI events (draws and user input). +/// Each instance has its own draw subscription (the draw channel is broadcast, so +/// multiple receivers are fine), while crossterm input is funneled through a +/// single shared [`EventBroker`] because crossterm uses a global stdin reader and +/// does not support fan-out. Multiple TuiEventStream instances can exist during the app lifetime +/// (for nested or sequential screens), but only one should be polled at a time, +/// otherwise one instance can consume ("steal") input events and the other will miss them. +pub struct TuiEventStream { + broker: Arc>, + draw_stream: BroadcastStream<()>, + resume_stream: WatchStream<()>, + terminal_focused: Arc, + poll_draw_first: bool, + #[cfg(unix)] + suspend_context: crate::tui::job_control::SuspendContext, + #[cfg(unix)] + alt_screen_active: Arc, +} + +impl TuiEventStream { + pub fn new( + broker: Arc>, + draw_rx: broadcast::Receiver<()>, + terminal_focused: Arc, + #[cfg(unix)] suspend_context: crate::tui::job_control::SuspendContext, + #[cfg(unix)] alt_screen_active: Arc, + ) -> Self { + let resume_stream = WatchStream::from_changes(broker.resume_events_rx()); + Self { + broker, + draw_stream: BroadcastStream::new(draw_rx), + resume_stream, + terminal_focused, + poll_draw_first: false, + #[cfg(unix)] + suspend_context, + #[cfg(unix)] + alt_screen_active, + } + } + + /// Poll the shared crossterm stream for the next mapped `TuiEvent`. + /// + /// This skips events we don't use (mouse events, etc.) and keeps polling until it yields + /// a mapped event, hits `Pending`, or sees EOF/error. When the broker is paused, it drops + /// the underlying stream and returns `Pending` to fully release stdin. + pub fn poll_crossterm_event(&mut self, cx: &mut Context<'_>) -> Poll> { + // Some crossterm events map to None (e.g. FocusLost, mouse); loop so we keep polling + // until we return a mapped event, hit Pending, or see EOF/error. + loop { + let poll_result = { + let mut state = self + .broker + .state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let events = match state.active_event_source_mut() { + Some(events) => events, + None => { + drop(state); + // Poll resume_stream so resume_events wakes a stream paused here + match Pin::new(&mut self.resume_stream).poll_next(cx) { + Poll::Ready(Some(())) => continue, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + }; + match Pin::new(events).poll_next(cx) { + Poll::Ready(Some(Ok(event))) => Some(event), + Poll::Ready(Some(Err(_))) | Poll::Ready(None) => { + *state = EventBrokerState::Start; + return Poll::Ready(None); + } + Poll::Pending => { + drop(state); + // Poll resume_stream so resume_events can wake us even while waiting on stdin + match Pin::new(&mut self.resume_stream).poll_next(cx) { + Poll::Ready(Some(())) => continue, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } + }; + + if let Some(mapped) = poll_result.and_then(|event| self.map_crossterm_event(event)) { + return Poll::Ready(Some(mapped)); + } + } + } + + /// Poll the draw broadcast stream for the next draw event. Draw events are used to trigger a redraw of the TUI. + pub fn poll_draw_event(&mut self, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.draw_stream).poll_next(cx) { + Poll::Ready(Some(Ok(()))) => Poll::Ready(Some(TuiEvent::Draw)), + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_)))) => { + Poll::Ready(Some(TuiEvent::Draw)) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + /// Map a crossterm event to a [`TuiEvent`], skipping events we don't use (mouse events, etc.). + fn map_crossterm_event(&mut self, event: Event) -> Option { + match event { + Event::Key(key_event) => { + #[cfg(unix)] + if crate::tui::job_control::SUSPEND_KEY.is_press(key_event) { + let _ = self.suspend_context.suspend(&self.alt_screen_active); + return Some(TuiEvent::Draw); + } + Some(TuiEvent::Key(key_event)) + } + Event::Resize(_, _) => Some(TuiEvent::Draw), + Event::Paste(pasted) => Some(TuiEvent::Paste(pasted)), + Event::FocusGained => { + self.terminal_focused.store(true, Ordering::Relaxed); + crate::terminal_palette::requery_default_colors(); + Some(TuiEvent::Draw) + } + Event::FocusLost => { + self.terminal_focused.store(false, Ordering::Relaxed); + None + } + _ => None, + } + } +} + +impl Unpin for TuiEventStream {} + +impl Stream for TuiEventStream { + type Item = TuiEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // approximate fairness + no starvation via round-robin. + let draw_first = self.poll_draw_first; + self.poll_draw_first = !self.poll_draw_first; + + if draw_first { + if let Poll::Ready(event) = self.poll_draw_event(cx) { + return Poll::Ready(event); + } + if let Poll::Ready(event) = self.poll_crossterm_event(cx) { + return Poll::Ready(event); + } + } else { + if let Poll::Ready(event) = self.poll_crossterm_event(cx) { + return Poll::Ready(event); + } + if let Poll::Ready(event) = self.poll_draw_event(cx) { + return Poll::Ready(event); + } + } + + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crossterm::event::Event; + use crossterm::event::KeyCode; + use crossterm::event::KeyEvent; + use crossterm::event::KeyModifiers; + use pretty_assertions::assert_eq; + use std::task::Context; + use std::task::Poll; + use std::time::Duration; + use tokio::sync::broadcast; + use tokio::sync::mpsc; + use tokio::time::timeout; + use tokio_stream::StreamExt; + + /// Simple fake event source for tests; feed events via the handle. + struct FakeEventSource { + rx: mpsc::UnboundedReceiver, + tx: mpsc::UnboundedSender, + } + + struct FakeEventSourceHandle { + broker: Arc>, + } + + impl FakeEventSource { + fn new() -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + Self { rx, tx } + } + } + + impl Default for FakeEventSource { + fn default() -> Self { + Self::new() + } + } + + impl FakeEventSourceHandle { + fn new(broker: Arc>) -> Self { + Self { broker } + } + + fn send(&self, event: EventResult) { + let mut state = self + .broker + .state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let Some(source) = state.active_event_source_mut() else { + return; + }; + let _ = source.tx.send(event); + } + } + + impl EventSource for FakeEventSource { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().rx).poll_recv(cx) + } + } + + fn make_stream( + broker: Arc>, + draw_rx: broadcast::Receiver<()>, + terminal_focused: Arc, + ) -> TuiEventStream { + TuiEventStream::new( + broker, + draw_rx, + terminal_focused, + #[cfg(unix)] + crate::tui::job_control::SuspendContext::new(), + #[cfg(unix)] + Arc::new(AtomicBool::new(false)), + ) + } + + type SetupState = ( + Arc>, + FakeEventSourceHandle, + broadcast::Sender<()>, + broadcast::Receiver<()>, + Arc, + ); + + fn setup() -> SetupState { + let source = FakeEventSource::new(); + let broker = Arc::new(EventBroker::new()); + *broker.state.lock().unwrap() = EventBrokerState::Running(source); + let handle = FakeEventSourceHandle::new(broker.clone()); + + let (draw_tx, draw_rx) = broadcast::channel(1); + let terminal_focused = Arc::new(AtomicBool::new(true)); + (broker, handle, draw_tx, draw_rx, terminal_focused) + } + + #[tokio::test(flavor = "current_thread")] + async fn key_event_skips_unmapped() { + let (broker, handle, _draw_tx, draw_rx, terminal_focused) = setup(); + let mut stream = make_stream(broker, draw_rx, terminal_focused); + + handle.send(Ok(Event::FocusLost)); + handle.send(Ok(Event::Key(KeyEvent::new( + KeyCode::Char('a'), + KeyModifiers::NONE, + )))); + + let next = stream.next().await.unwrap(); + match next { + TuiEvent::Key(key) => { + assert_eq!(key, KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE)); + } + other => panic!("expected key event, got {other:?}"), + } + } + + #[tokio::test(flavor = "current_thread")] + async fn draw_and_key_events_yield_both() { + let (broker, handle, draw_tx, draw_rx, terminal_focused) = setup(); + let mut stream = make_stream(broker, draw_rx, terminal_focused); + + let expected_key = KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE); + let _ = draw_tx.send(()); + handle.send(Ok(Event::Key(expected_key))); + + let first = stream.next().await.unwrap(); + let second = stream.next().await.unwrap(); + + let mut saw_draw = false; + let mut saw_key = false; + for event in [first, second] { + match event { + TuiEvent::Draw => { + saw_draw = true; + } + TuiEvent::Key(key) => { + assert_eq!(key, expected_key); + saw_key = true; + } + other => panic!("expected draw or key event, got {other:?}"), + } + } + + assert!(saw_draw && saw_key, "expected both draw and key events"); + } + + #[tokio::test(flavor = "current_thread")] + async fn lagged_draw_maps_to_draw() { + let (broker, _handle, draw_tx, draw_rx, terminal_focused) = setup(); + let mut stream = make_stream(broker, draw_rx.resubscribe(), terminal_focused); + + // Fill channel to force Lagged on the receiver. + let _ = draw_tx.send(()); + let _ = draw_tx.send(()); + + let first = stream.next().await; + assert!(matches!(first, Some(TuiEvent::Draw))); + } + + #[tokio::test(flavor = "current_thread")] + async fn error_or_eof_ends_stream() { + let (broker, handle, _draw_tx, draw_rx, terminal_focused) = setup(); + let mut stream = make_stream(broker, draw_rx, terminal_focused); + + handle.send(Err(std::io::Error::other("boom"))); + + let next = stream.next().await; + assert!(next.is_none()); + } + + #[tokio::test(flavor = "current_thread")] + async fn resume_wakes_paused_stream() { + let (broker, handle, _draw_tx, draw_rx, terminal_focused) = setup(); + let mut stream = make_stream(broker.clone(), draw_rx, terminal_focused); + + broker.pause_events(); + + let task = tokio::spawn(async move { stream.next().await }); + tokio::task::yield_now().await; + + broker.resume_events(); + let expected_key = KeyEvent::new(KeyCode::Char('r'), KeyModifiers::NONE); + handle.send(Ok(Event::Key(expected_key))); + + let event = timeout(Duration::from_millis(100), task) + .await + .expect("timed out waiting for resumed event") + .expect("join failed"); + match event { + Some(TuiEvent::Key(key)) => assert_eq!(key, expected_key), + other => panic!("expected key event, got {other:?}"), + } + } + + #[tokio::test(flavor = "current_thread")] + async fn resume_wakes_pending_stream() { + let (broker, handle, _draw_tx, draw_rx, terminal_focused) = setup(); + let mut stream = make_stream(broker.clone(), draw_rx, terminal_focused); + + let task = tokio::spawn(async move { stream.next().await }); + tokio::task::yield_now().await; + + broker.pause_events(); + broker.resume_events(); + let expected_key = KeyEvent::new(KeyCode::Char('p'), KeyModifiers::NONE); + handle.send(Ok(Event::Key(expected_key))); + + let event = timeout(Duration::from_millis(100), task) + .await + .expect("timed out waiting for resumed event") + .expect("join failed"); + match event { + Some(TuiEvent::Key(key)) => assert_eq!(key, expected_key), + other => panic!("expected key event, got {other:?}"), + } + } +}