refactor TUI event loop to enable dropping + recreating crossterm event stream (#7961)
Introduces an `EventBroker` between the crossterm `EventStream` source and the consumers in the TUI. This enables dropping + recreating the `crossterm_events` without invalidating the consumer. Dropping and recreating the crossterm event stream enables us to fully relinquish `stdin` while the app keeps running. If the stream is not dropped, it will continue to read from `stdin` even when it is not actively being polled, potentially stealing input from other processes. See [here](https://www.reddit.com/r/rust/comments/1f3o33u/myterious_crossterm_input_after_running_vim/?utm_source=chatgpt.com) and [here](https://ratatui.rs/recipes/apps/spawn-vim/) for details. ### Tests Added tests for new `EventBroker` setup, existing tests pass, tested locally.
This commit is contained in:
parent
bef36f4ae7
commit
cf44511e77
4 changed files with 545 additions and 80 deletions
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
|
|
@ -1701,7 +1701,6 @@ dependencies = [
|
|||
"anyhow",
|
||||
"arboard",
|
||||
"assert_matches",
|
||||
"async-stream",
|
||||
"base64",
|
||||
"chrono",
|
||||
"clap",
|
||||
|
|
@ -6910,6 +6909,7 @@ dependencies = [
|
|||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ workspace = true
|
|||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
async-stream = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
|
|
@ -81,7 +80,7 @@ tokio = { workspace = true, features = [
|
|||
"test-util",
|
||||
"time",
|
||||
] }
|
||||
tokio-stream = { workspace = true }
|
||||
tokio-stream = { workspace = true, features = ["sync"] }
|
||||
toml = { workspace = true }
|
||||
tracing = { workspace = true, features = ["log"] }
|
||||
tracing-appender = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ use crossterm::event::DisableBracketedPaste;
|
|||
use crossterm::event::DisableFocusChange;
|
||||
use crossterm::event::EnableBracketedPaste;
|
||||
use crossterm::event::EnableFocusChange;
|
||||
use crossterm::event::Event;
|
||||
use crossterm::event::KeyEvent;
|
||||
use crossterm::event::KeyboardEnhancementFlags;
|
||||
use crossterm::event::PopKeyboardEnhancementFlags;
|
||||
|
|
@ -32,7 +31,6 @@ use ratatui::crossterm::terminal::enable_raw_mode;
|
|||
use ratatui::layout::Offset;
|
||||
use ratatui::layout::Rect;
|
||||
use ratatui::text::Line;
|
||||
use tokio::select;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio_stream::Stream;
|
||||
|
||||
|
|
@ -42,11 +40,12 @@ use crate::custom_terminal::Terminal as CustomTerminal;
|
|||
use crate::notifications::DesktopNotificationBackend;
|
||||
use crate::notifications::NotificationBackendKind;
|
||||
use crate::notifications::detect_backend;
|
||||
#[cfg(unix)]
|
||||
use crate::tui::job_control::SUSPEND_KEY;
|
||||
use crate::tui::event_stream::EventBroker;
|
||||
use crate::tui::event_stream::TuiEventStream;
|
||||
#[cfg(unix)]
|
||||
use crate::tui::job_control::SuspendContext;
|
||||
|
||||
mod event_stream;
|
||||
mod frame_requester;
|
||||
#[cfg(unix)]
|
||||
mod job_control;
|
||||
|
|
@ -156,7 +155,7 @@ fn set_panic_hook() {
|
|||
}));
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum TuiEvent {
|
||||
Key(KeyEvent),
|
||||
Paste(String),
|
||||
|
|
@ -166,6 +165,7 @@ pub enum TuiEvent {
|
|||
pub struct Tui {
|
||||
frame_requester: FrameRequester,
|
||||
draw_tx: broadcast::Sender<()>,
|
||||
event_broker: Arc<EventBroker>,
|
||||
pub(crate) terminal: Terminal,
|
||||
pending_history_lines: Vec<Line<'static>>,
|
||||
alt_saved_viewport: Option<ratatui::layout::Rect>,
|
||||
|
|
@ -194,6 +194,7 @@ impl Tui {
|
|||
Self {
|
||||
frame_requester,
|
||||
draw_tx,
|
||||
event_broker: Arc::new(EventBroker::new()),
|
||||
terminal,
|
||||
pending_history_lines: vec![],
|
||||
alt_saved_viewport: None,
|
||||
|
|
@ -214,6 +215,18 @@ impl Tui {
|
|||
self.enhanced_keys_supported
|
||||
}
|
||||
|
||||
// todo(sayan) unused for now; intend to use to enable opening external editors
|
||||
#[allow(unused)]
|
||||
pub fn pause_events(&mut self) {
|
||||
self.event_broker.pause_events();
|
||||
}
|
||||
|
||||
// todo(sayan) unused for now; intend to use to enable opening external editors
|
||||
#[allow(unused)]
|
||||
pub fn resume_events(&mut self) {
|
||||
self.event_broker.resume_events();
|
||||
}
|
||||
|
||||
/// Emit a desktop notification now if the terminal is unfocused.
|
||||
/// Returns true if a notification was posted.
|
||||
pub fn notify(&mut self, message: impl AsRef<str>) -> bool {
|
||||
|
|
@ -262,79 +275,21 @@ impl Tui {
|
|||
}
|
||||
|
||||
pub fn event_stream(&self) -> Pin<Box<dyn Stream<Item = TuiEvent> + Send + 'static>> {
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
let mut crossterm_events = crossterm::event::EventStream::new();
|
||||
let mut draw_rx = self.draw_tx.subscribe();
|
||||
|
||||
// State for tracking how we should resume from ^Z suspend.
|
||||
#[cfg(unix)]
|
||||
let suspend_context = self.suspend_context.clone();
|
||||
#[cfg(unix)]
|
||||
let alt_screen_active = self.alt_screen_active.clone();
|
||||
|
||||
let terminal_focused = self.terminal_focused.clone();
|
||||
let event_stream = async_stream::stream! {
|
||||
loop {
|
||||
select! {
|
||||
event_result = crossterm_events.next() => {
|
||||
match event_result {
|
||||
Some(Ok(event)) => {
|
||||
match event {
|
||||
Event::Key(key_event) => {
|
||||
#[cfg(unix)]
|
||||
if SUSPEND_KEY.is_press(key_event) {
|
||||
let _ = suspend_context.suspend(&alt_screen_active);
|
||||
// We continue here after resume.
|
||||
yield TuiEvent::Draw;
|
||||
continue;
|
||||
}
|
||||
yield TuiEvent::Key(key_event);
|
||||
}
|
||||
Event::Resize(_, _) => {
|
||||
yield TuiEvent::Draw;
|
||||
}
|
||||
Event::Paste(pasted) => {
|
||||
yield TuiEvent::Paste(pasted);
|
||||
}
|
||||
Event::FocusGained => {
|
||||
terminal_focused.store(true, Ordering::Relaxed);
|
||||
crate::terminal_palette::requery_default_colors();
|
||||
yield TuiEvent::Draw;
|
||||
}
|
||||
Event::FocusLost => {
|
||||
terminal_focused.store(false, Ordering::Relaxed);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Some(Err(_)) | None => {
|
||||
// Exit the loop in case of broken pipe as we will never
|
||||
// recover from it
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
result = draw_rx.recv() => {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
yield TuiEvent::Draw;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
||||
// We dropped one or more draw notifications; coalesce to a single draw.
|
||||
yield TuiEvent::Draw;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
// Sender dropped. This stream likely outlived its owning `Tui`;
|
||||
// exit to avoid spinning on a permanently-closed receiver.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
Box::pin(event_stream)
|
||||
let stream = TuiEventStream::new(
|
||||
self.event_broker.clone(),
|
||||
self.draw_tx.subscribe(),
|
||||
self.terminal_focused.clone(),
|
||||
self.suspend_context.clone(),
|
||||
self.alt_screen_active.clone(),
|
||||
);
|
||||
#[cfg(not(unix))]
|
||||
let stream = TuiEventStream::new(
|
||||
self.event_broker.clone(),
|
||||
self.draw_tx.subscribe(),
|
||||
self.terminal_focused.clone(),
|
||||
);
|
||||
Box::pin(stream)
|
||||
}
|
||||
|
||||
/// Enter alternate screen and expand the viewport to full terminal size, saving the current
|
||||
|
|
|
|||
511
codex-rs/tui/src/tui/event_stream.rs
Normal file
511
codex-rs/tui/src/tui/event_stream.rs
Normal file
|
|
@ -0,0 +1,511 @@
|
|||
//! Event stream plumbing for the TUI.
|
||||
//!
|
||||
//! - [`EventBroker`] holds the shared crossterm stream so multiple callers reuse the same
|
||||
//! input source and can drop/recreate it on pause/resume without rebuilding consumers.
|
||||
//! - [`TuiEventStream`] wraps a draw event subscription plus the shared [`EventBroker`] and maps crossterm
|
||||
//! events into [`TuiEvent`].
|
||||
//! - [`EventSource`] abstracts the underlying event producer; the real implementation is
|
||||
//! [`CrosstermEventSource`] and tests can swap in [`FakeEventSource`].
|
||||
//!
|
||||
//! The motivation for dropping/recreating the crossterm event stream is to enable the TUI to fully relinquish stdin.
|
||||
//! If the stream is not dropped, it will continue to read from stdin even if it is not actively being polled
|
||||
//! (due to how crossterm's EventStream is implemented), potentially stealing input from other processes reading stdin,
|
||||
//! like terminal text editors. This race can cause missed input or capturing terminal query responses (for example, OSC palette/size queries)
|
||||
//! that the other process expects to read. Stopping polling, instead of dropping the stream, is only sufficient when the
|
||||
//! pause happens before the stream enters a pending state; otherwise the crossterm reader thread may keep reading
|
||||
//! from stdin, so the safer approach is to drop and recreate the event stream when we need to hand off the terminal.
|
||||
//!
|
||||
//! See https://ratatui.rs/recipes/apps/spawn-vim/ and https://www.reddit.com/r/rust/comments/1f3o33u/myterious_crossterm_input_after_running_vim for more details.
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
use crossterm::event::Event;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::watch;
|
||||
use tokio_stream::Stream;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
use tokio_stream::wrappers::WatchStream;
|
||||
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
|
||||
|
||||
use super::TuiEvent;
|
||||
|
||||
/// Result type produced by an event source.
|
||||
pub type EventResult = std::io::Result<Event>;
|
||||
|
||||
/// Abstraction over a source of terminal events. Allows swapping in a fake for tests.
|
||||
/// Value in production is [`CrosstermEventSource`].
|
||||
pub trait EventSource: Send + 'static {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<EventResult>>;
|
||||
}
|
||||
|
||||
/// Shared crossterm input state for all [`TuiEventStream`] instances. A single crossterm EventStream
|
||||
/// is reused so all streams still see the same input source.
|
||||
///
|
||||
/// This intermediate layer enables dropping/recreating the underlying EventStream (pause/resume) without rebuilding consumers.
|
||||
pub struct EventBroker<S: EventSource = CrosstermEventSource> {
|
||||
state: Mutex<EventBrokerState<S>>,
|
||||
resume_events_tx: watch::Sender<()>,
|
||||
}
|
||||
|
||||
/// Tracks state of underlying [`EventSource`].
|
||||
enum EventBrokerState<S: EventSource> {
|
||||
Paused, // Underlying event source (i.e., crossterm EventStream) dropped
|
||||
Start, // A new event source will be created on next poll
|
||||
Running(S), // Event source is currently running
|
||||
}
|
||||
|
||||
impl<S: EventSource + Default> EventBrokerState<S> {
|
||||
/// Return the running event source, starting it if needed; None when paused.
|
||||
fn active_event_source_mut(&mut self) -> Option<&mut S> {
|
||||
match self {
|
||||
EventBrokerState::Paused => None,
|
||||
EventBrokerState::Start => {
|
||||
*self = EventBrokerState::Running(S::default());
|
||||
match self {
|
||||
EventBrokerState::Running(events) => Some(events),
|
||||
EventBrokerState::Paused | EventBrokerState::Start => unreachable!(),
|
||||
}
|
||||
}
|
||||
EventBrokerState::Running(events) => Some(events),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: EventSource + Default> EventBroker<S> {
|
||||
pub fn new() -> Self {
|
||||
let (resume_events_tx, _resume_events_rx) = watch::channel(());
|
||||
Self {
|
||||
state: Mutex::new(EventBrokerState::Start),
|
||||
resume_events_tx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop the underlying event source
|
||||
pub fn pause_events(&self) {
|
||||
let mut state = self
|
||||
.state
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*state = EventBrokerState::Paused;
|
||||
}
|
||||
|
||||
/// Create a new instance of the underlying event source
|
||||
pub fn resume_events(&self) {
|
||||
let mut state = self
|
||||
.state
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*state = EventBrokerState::Start;
|
||||
let _ = self.resume_events_tx.send(());
|
||||
}
|
||||
|
||||
/// Subscribe to a notification that fires whenever [`Self::resume_events`] is called.
|
||||
///
|
||||
/// This is used to wake `poll_crossterm_event` when it is paused and waiting for the
|
||||
/// underlying crossterm stream to be recreated.
|
||||
pub fn resume_events_rx(&self) -> watch::Receiver<()> {
|
||||
self.resume_events_tx.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
/// Real crossterm-backed event source.
|
||||
pub struct CrosstermEventSource(pub crossterm::event::EventStream);
|
||||
|
||||
impl Default for CrosstermEventSource {
|
||||
fn default() -> Self {
|
||||
Self(crossterm::event::EventStream::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl EventSource for CrosstermEventSource {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<EventResult>> {
|
||||
Pin::new(&mut self.get_mut().0).poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// TuiEventStream is a struct for reading TUI events (draws and user input).
|
||||
/// Each instance has its own draw subscription (the draw channel is broadcast, so
|
||||
/// multiple receivers are fine), while crossterm input is funneled through a
|
||||
/// single shared [`EventBroker`] because crossterm uses a global stdin reader and
|
||||
/// does not support fan-out. Multiple TuiEventStream instances can exist during the app lifetime
|
||||
/// (for nested or sequential screens), but only one should be polled at a time,
|
||||
/// otherwise one instance can consume ("steal") input events and the other will miss them.
|
||||
pub struct TuiEventStream<S: EventSource + Default + Unpin = CrosstermEventSource> {
|
||||
broker: Arc<EventBroker<S>>,
|
||||
draw_stream: BroadcastStream<()>,
|
||||
resume_stream: WatchStream<()>,
|
||||
terminal_focused: Arc<AtomicBool>,
|
||||
poll_draw_first: bool,
|
||||
#[cfg(unix)]
|
||||
suspend_context: crate::tui::job_control::SuspendContext,
|
||||
#[cfg(unix)]
|
||||
alt_screen_active: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<S: EventSource + Default + Unpin> TuiEventStream<S> {
|
||||
pub fn new(
|
||||
broker: Arc<EventBroker<S>>,
|
||||
draw_rx: broadcast::Receiver<()>,
|
||||
terminal_focused: Arc<AtomicBool>,
|
||||
#[cfg(unix)] suspend_context: crate::tui::job_control::SuspendContext,
|
||||
#[cfg(unix)] alt_screen_active: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
let resume_stream = WatchStream::from_changes(broker.resume_events_rx());
|
||||
Self {
|
||||
broker,
|
||||
draw_stream: BroadcastStream::new(draw_rx),
|
||||
resume_stream,
|
||||
terminal_focused,
|
||||
poll_draw_first: false,
|
||||
#[cfg(unix)]
|
||||
suspend_context,
|
||||
#[cfg(unix)]
|
||||
alt_screen_active,
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll the shared crossterm stream for the next mapped `TuiEvent`.
|
||||
///
|
||||
/// This skips events we don't use (mouse events, etc.) and keeps polling until it yields
|
||||
/// a mapped event, hits `Pending`, or sees EOF/error. When the broker is paused, it drops
|
||||
/// the underlying stream and returns `Pending` to fully release stdin.
|
||||
pub fn poll_crossterm_event(&mut self, cx: &mut Context<'_>) -> Poll<Option<TuiEvent>> {
|
||||
// Some crossterm events map to None (e.g. FocusLost, mouse); loop so we keep polling
|
||||
// until we return a mapped event, hit Pending, or see EOF/error.
|
||||
loop {
|
||||
let poll_result = {
|
||||
let mut state = self
|
||||
.broker
|
||||
.state
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let events = match state.active_event_source_mut() {
|
||||
Some(events) => events,
|
||||
None => {
|
||||
drop(state);
|
||||
// Poll resume_stream so resume_events wakes a stream paused here
|
||||
match Pin::new(&mut self.resume_stream).poll_next(cx) {
|
||||
Poll::Ready(Some(())) => continue,
|
||||
Poll::Ready(None) => return Poll::Ready(None),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
};
|
||||
match Pin::new(events).poll_next(cx) {
|
||||
Poll::Ready(Some(Ok(event))) => Some(event),
|
||||
Poll::Ready(Some(Err(_))) | Poll::Ready(None) => {
|
||||
*state = EventBrokerState::Start;
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
Poll::Pending => {
|
||||
drop(state);
|
||||
// Poll resume_stream so resume_events can wake us even while waiting on stdin
|
||||
match Pin::new(&mut self.resume_stream).poll_next(cx) {
|
||||
Poll::Ready(Some(())) => continue,
|
||||
Poll::Ready(None) => return Poll::Ready(None),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(mapped) = poll_result.and_then(|event| self.map_crossterm_event(event)) {
|
||||
return Poll::Ready(Some(mapped));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll the draw broadcast stream for the next draw event. Draw events are used to trigger a redraw of the TUI.
|
||||
pub fn poll_draw_event(&mut self, cx: &mut Context<'_>) -> Poll<Option<TuiEvent>> {
|
||||
match Pin::new(&mut self.draw_stream).poll_next(cx) {
|
||||
Poll::Ready(Some(Ok(()))) => Poll::Ready(Some(TuiEvent::Draw)),
|
||||
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_)))) => {
|
||||
Poll::Ready(Some(TuiEvent::Draw))
|
||||
}
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
/// Map a crossterm event to a [`TuiEvent`], skipping events we don't use (mouse events, etc.).
|
||||
fn map_crossterm_event(&mut self, event: Event) -> Option<TuiEvent> {
|
||||
match event {
|
||||
Event::Key(key_event) => {
|
||||
#[cfg(unix)]
|
||||
if crate::tui::job_control::SUSPEND_KEY.is_press(key_event) {
|
||||
let _ = self.suspend_context.suspend(&self.alt_screen_active);
|
||||
return Some(TuiEvent::Draw);
|
||||
}
|
||||
Some(TuiEvent::Key(key_event))
|
||||
}
|
||||
Event::Resize(_, _) => Some(TuiEvent::Draw),
|
||||
Event::Paste(pasted) => Some(TuiEvent::Paste(pasted)),
|
||||
Event::FocusGained => {
|
||||
self.terminal_focused.store(true, Ordering::Relaxed);
|
||||
crate::terminal_palette::requery_default_colors();
|
||||
Some(TuiEvent::Draw)
|
||||
}
|
||||
Event::FocusLost => {
|
||||
self.terminal_focused.store(false, Ordering::Relaxed);
|
||||
None
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: EventSource + Default + Unpin> Unpin for TuiEventStream<S> {}
|
||||
|
||||
impl<S: EventSource + Default + Unpin> Stream for TuiEventStream<S> {
|
||||
type Item = TuiEvent;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
// approximate fairness + no starvation via round-robin.
|
||||
let draw_first = self.poll_draw_first;
|
||||
self.poll_draw_first = !self.poll_draw_first;
|
||||
|
||||
if draw_first {
|
||||
if let Poll::Ready(event) = self.poll_draw_event(cx) {
|
||||
return Poll::Ready(event);
|
||||
}
|
||||
if let Poll::Ready(event) = self.poll_crossterm_event(cx) {
|
||||
return Poll::Ready(event);
|
||||
}
|
||||
} else {
|
||||
if let Poll::Ready(event) = self.poll_crossterm_event(cx) {
|
||||
return Poll::Ready(event);
|
||||
}
|
||||
if let Poll::Ready(event) = self.poll_draw_event(cx) {
|
||||
return Poll::Ready(event);
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crossterm::event::Event;
|
||||
use crossterm::event::KeyCode;
|
||||
use crossterm::event::KeyEvent;
|
||||
use crossterm::event::KeyModifiers;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::timeout;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
/// Simple fake event source for tests; feed events via the handle.
|
||||
struct FakeEventSource {
|
||||
rx: mpsc::UnboundedReceiver<EventResult>,
|
||||
tx: mpsc::UnboundedSender<EventResult>,
|
||||
}
|
||||
|
||||
struct FakeEventSourceHandle {
|
||||
broker: Arc<EventBroker<FakeEventSource>>,
|
||||
}
|
||||
|
||||
impl FakeEventSource {
|
||||
fn new() -> Self {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
Self { rx, tx }
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for FakeEventSource {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl FakeEventSourceHandle {
|
||||
fn new(broker: Arc<EventBroker<FakeEventSource>>) -> Self {
|
||||
Self { broker }
|
||||
}
|
||||
|
||||
fn send(&self, event: EventResult) {
|
||||
let mut state = self
|
||||
.broker
|
||||
.state
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let Some(source) = state.active_event_source_mut() else {
|
||||
return;
|
||||
};
|
||||
let _ = source.tx.send(event);
|
||||
}
|
||||
}
|
||||
|
||||
impl EventSource for FakeEventSource {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<EventResult>> {
|
||||
Pin::new(&mut self.get_mut().rx).poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
fn make_stream(
|
||||
broker: Arc<EventBroker<FakeEventSource>>,
|
||||
draw_rx: broadcast::Receiver<()>,
|
||||
terminal_focused: Arc<AtomicBool>,
|
||||
) -> TuiEventStream<FakeEventSource> {
|
||||
TuiEventStream::new(
|
||||
broker,
|
||||
draw_rx,
|
||||
terminal_focused,
|
||||
#[cfg(unix)]
|
||||
crate::tui::job_control::SuspendContext::new(),
|
||||
#[cfg(unix)]
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
)
|
||||
}
|
||||
|
||||
type SetupState = (
|
||||
Arc<EventBroker<FakeEventSource>>,
|
||||
FakeEventSourceHandle,
|
||||
broadcast::Sender<()>,
|
||||
broadcast::Receiver<()>,
|
||||
Arc<AtomicBool>,
|
||||
);
|
||||
|
||||
fn setup() -> SetupState {
|
||||
let source = FakeEventSource::new();
|
||||
let broker = Arc::new(EventBroker::new());
|
||||
*broker.state.lock().unwrap() = EventBrokerState::Running(source);
|
||||
let handle = FakeEventSourceHandle::new(broker.clone());
|
||||
|
||||
let (draw_tx, draw_rx) = broadcast::channel(1);
|
||||
let terminal_focused = Arc::new(AtomicBool::new(true));
|
||||
(broker, handle, draw_tx, draw_rx, terminal_focused)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn key_event_skips_unmapped() {
|
||||
let (broker, handle, _draw_tx, draw_rx, terminal_focused) = setup();
|
||||
let mut stream = make_stream(broker, draw_rx, terminal_focused);
|
||||
|
||||
handle.send(Ok(Event::FocusLost));
|
||||
handle.send(Ok(Event::Key(KeyEvent::new(
|
||||
KeyCode::Char('a'),
|
||||
KeyModifiers::NONE,
|
||||
))));
|
||||
|
||||
let next = stream.next().await.unwrap();
|
||||
match next {
|
||||
TuiEvent::Key(key) => {
|
||||
assert_eq!(key, KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE));
|
||||
}
|
||||
other => panic!("expected key event, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn draw_and_key_events_yield_both() {
|
||||
let (broker, handle, draw_tx, draw_rx, terminal_focused) = setup();
|
||||
let mut stream = make_stream(broker, draw_rx, terminal_focused);
|
||||
|
||||
let expected_key = KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE);
|
||||
let _ = draw_tx.send(());
|
||||
handle.send(Ok(Event::Key(expected_key)));
|
||||
|
||||
let first = stream.next().await.unwrap();
|
||||
let second = stream.next().await.unwrap();
|
||||
|
||||
let mut saw_draw = false;
|
||||
let mut saw_key = false;
|
||||
for event in [first, second] {
|
||||
match event {
|
||||
TuiEvent::Draw => {
|
||||
saw_draw = true;
|
||||
}
|
||||
TuiEvent::Key(key) => {
|
||||
assert_eq!(key, expected_key);
|
||||
saw_key = true;
|
||||
}
|
||||
other => panic!("expected draw or key event, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
assert!(saw_draw && saw_key, "expected both draw and key events");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn lagged_draw_maps_to_draw() {
|
||||
let (broker, _handle, draw_tx, draw_rx, terminal_focused) = setup();
|
||||
let mut stream = make_stream(broker, draw_rx.resubscribe(), terminal_focused);
|
||||
|
||||
// Fill channel to force Lagged on the receiver.
|
||||
let _ = draw_tx.send(());
|
||||
let _ = draw_tx.send(());
|
||||
|
||||
let first = stream.next().await;
|
||||
assert!(matches!(first, Some(TuiEvent::Draw)));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn error_or_eof_ends_stream() {
|
||||
let (broker, handle, _draw_tx, draw_rx, terminal_focused) = setup();
|
||||
let mut stream = make_stream(broker, draw_rx, terminal_focused);
|
||||
|
||||
handle.send(Err(std::io::Error::other("boom")));
|
||||
|
||||
let next = stream.next().await;
|
||||
assert!(next.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn resume_wakes_paused_stream() {
|
||||
let (broker, handle, _draw_tx, draw_rx, terminal_focused) = setup();
|
||||
let mut stream = make_stream(broker.clone(), draw_rx, terminal_focused);
|
||||
|
||||
broker.pause_events();
|
||||
|
||||
let task = tokio::spawn(async move { stream.next().await });
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
broker.resume_events();
|
||||
let expected_key = KeyEvent::new(KeyCode::Char('r'), KeyModifiers::NONE);
|
||||
handle.send(Ok(Event::Key(expected_key)));
|
||||
|
||||
let event = timeout(Duration::from_millis(100), task)
|
||||
.await
|
||||
.expect("timed out waiting for resumed event")
|
||||
.expect("join failed");
|
||||
match event {
|
||||
Some(TuiEvent::Key(key)) => assert_eq!(key, expected_key),
|
||||
other => panic!("expected key event, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn resume_wakes_pending_stream() {
|
||||
let (broker, handle, _draw_tx, draw_rx, terminal_focused) = setup();
|
||||
let mut stream = make_stream(broker.clone(), draw_rx, terminal_focused);
|
||||
|
||||
let task = tokio::spawn(async move { stream.next().await });
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
broker.pause_events();
|
||||
broker.resume_events();
|
||||
let expected_key = KeyEvent::new(KeyCode::Char('p'), KeyModifiers::NONE);
|
||||
handle.send(Ok(Event::Key(expected_key)));
|
||||
|
||||
let event = timeout(Duration::from_millis(100), task)
|
||||
.await
|
||||
.expect("timed out waiting for resumed event")
|
||||
.expect("join failed");
|
||||
match event {
|
||||
Some(TuiEvent::Key(key)) => assert_eq!(key, expected_key),
|
||||
other => panic!("expected key event, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue