From b8156706e65a1bce600dcb07d7eca0f8df9b1955 Mon Sep 17 00:00:00 2001 From: Jeremy Rose <172423086+nornagon-openai@users.noreply.github.com> Date: Wed, 28 Jan 2026 10:54:43 -0800 Subject: [PATCH] file-search: improve file query perf (#9939) switch nucleo-matcher for nucleo and use a "file search session" w/ live updating query instead of a single hermetic run per query. --- codex-rs/Cargo.lock | 37 +- codex-rs/Cargo.toml | 3 +- .../tests/suite/fuzzy_file_search.rs | 19 +- codex-rs/core/src/rollout/list.rs | 3 +- codex-rs/file-search/Cargo.toml | 4 +- codex-rs/file-search/src/lib.rs | 945 +++++++++++++----- codex-rs/tui/src/app.rs | 4 +- codex-rs/tui/src/bottom_pane/chat_composer.rs | 31 +- .../tui/src/bottom_pane/file_search_popup.rs | 8 - codex-rs/tui/src/file_search.rs | 233 ++--- 10 files changed, 865 insertions(+), 422 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 38a86f154..5d17acab9 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1572,11 +1572,13 @@ version = "0.0.0" dependencies = [ "anyhow", "clap", + "crossbeam-channel", "ignore", - "nucleo-matcher", + "nucleo", "pretty_assertions", "serde", "serde_json", + "tempfile", "tokio", ] @@ -4899,11 +4901,20 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "nucleo" +version = "0.5.0" +source = "git+https://github.com/helix-editor/nucleo.git?rev=4253de9faabb4e5c6d81d946a5e35a90f87347ee#4253de9faabb4e5c6d81d946a5e35a90f87347ee" +dependencies = [ + "nucleo-matcher", + "parking_lot", + "rayon", +] + [[package]] name = "nucleo-matcher" version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf33f538733d1a5a3494b836ba913207f14d9d4a1d3cd67030c5061bdd2cac85" +source = "git+https://github.com/helix-editor/nucleo.git?rev=4253de9faabb4e5c6d81d946a5e35a90f87347ee#4253de9faabb4e5c6d81d946a5e35a90f87347ee" dependencies = [ "memchr", "unicode-segmentation", @@ -6331,6 +6342,26 @@ dependencies = [ "ratatui", ] +[[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.15" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index ae4eb768b..9b0ce2e16 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -128,6 +128,7 @@ clap = "4" clap_complete = "4" color-eyre = "0.6.3" crossterm = "0.28.1" +crossbeam-channel = "0.5.15" ctor = "0.6.3" derive_more = "2" diffy = "0.4.2" @@ -161,7 +162,7 @@ maplit = "1.0.2" mime_guess = "2.0.5" multimap = "0.10.0" notify = "8.2.0" -nucleo-matcher = "0.3.1" +nucleo = { git = "https://github.com/helix-editor/nucleo.git", rev = "4253de9faabb4e5c6d81d946a5e35a90f87347ee" } once_cell = "1.20.2" openssl-sys = "*" opentelemetry = "0.31.0" diff --git a/codex-rs/app-server/tests/suite/fuzzy_file_search.rs b/codex-rs/app-server/tests/suite/fuzzy_file_search.rs index 9c95e3de3..87fdf3911 100644 --- a/codex-rs/app-server/tests/suite/fuzzy_file_search.rs +++ b/codex-rs/app-server/tests/suite/fuzzy_file_search.rs @@ -48,8 +48,7 @@ async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> { .await??; let value = resp.result; - // The path separator on Windows affects the score. - let expected_score = if cfg!(windows) { 69 } else { 72 }; + let expected_score = 72; assert_eq!( value, @@ -59,16 +58,9 @@ async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> { "root": root_path.clone(), "path": "abexy", "file_name": "abexy", - "score": 88, + "score": 84, "indices": [0, 1, 2], }, - { - "root": root_path.clone(), - "path": "abcde", - "file_name": "abcde", - "score": 74, - "indices": [0, 1, 4], - }, { "root": root_path.clone(), "path": sub_abce_rel, @@ -76,6 +68,13 @@ async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> { "score": expected_score, "indices": [4, 5, 7], }, + { + "root": root_path.clone(), + "path": "abcde", + "file_name": "abcde", + "score": 71, + "indices": [0, 1, 4], + }, ] }) ); diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index 7469bbeb0..edff86606 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use std::cmp::Reverse; use std::ffi::OsStr; use std::io::{self}; @@ -7,8 +8,6 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicBool; - -use async_trait::async_trait; use time::OffsetDateTime; use time::PrimitiveDateTime; use time::format_description::FormatItem; diff --git a/codex-rs/file-search/Cargo.toml b/codex-rs/file-search/Cargo.toml index 70ddcf2bb..3802ed5fe 100644 --- a/codex-rs/file-search/Cargo.toml +++ b/codex-rs/file-search/Cargo.toml @@ -15,11 +15,13 @@ path = "src/lib.rs" [dependencies] anyhow = { workspace = true } clap = { workspace = true, features = ["derive"] } +crossbeam-channel = { workspace = true } ignore = { workspace = true } -nucleo-matcher = { workspace = true } +nucleo = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } [dev-dependencies] pretty_assertions = { workspace = true } +tempfile = { workspace = true } diff --git a/codex-rs/file-search/src/lib.rs b/codex-rs/file-search/src/lib.rs index d55eb929f..39255fa36 100644 --- a/codex-rs/file-search/src/lib.rs +++ b/codex-rs/file-search/src/lib.rs @@ -1,38 +1,54 @@ +use crossbeam_channel::Receiver; +use crossbeam_channel::Sender; +use crossbeam_channel::after; +use crossbeam_channel::never; +use crossbeam_channel::select; +use crossbeam_channel::unbounded; use ignore::WalkBuilder; use ignore::overrides::OverrideBuilder; -use nucleo_matcher::Matcher; -use nucleo_matcher::Utf32Str; -use nucleo_matcher::pattern::AtomKind; -use nucleo_matcher::pattern::CaseMatching; -use nucleo_matcher::pattern::Normalization; -use nucleo_matcher::pattern::Pattern; +use nucleo::Config; +use nucleo::Injector; +use nucleo::Matcher; +use nucleo::Nucleo; +use nucleo::Utf32String; +use nucleo::pattern::CaseMatching; +use nucleo::pattern::Normalization; use serde::Serialize; -use std::cell::UnsafeCell; -use std::cmp::Reverse; -use std::collections::BinaryHeap; use std::num::NonZero; use std::path::Path; +use std::path::PathBuf; use std::sync::Arc; +use std::sync::Condvar; +use std::sync::Mutex; +use std::sync::RwLock; use std::sync::atomic::AtomicBool; -use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; +use std::thread; +use std::time::Duration; use tokio::process::Command; +#[cfg(test)] +use nucleo::Utf32Str; +#[cfg(test)] +use nucleo::pattern::AtomKind; +#[cfg(test)] +use nucleo::pattern::Pattern; + mod cli; pub use cli::Cli; /// A single match result returned from the search. /// -/// * `score` – Relevance score returned by `nucleo_matcher`. +/// * `score` – Relevance score returned by `nucleo`. /// * `path` – Path to the matched file (relative to the search directory). /// * `indices` – Optional list of character indices that matched the query. /// These are only filled when the caller of [`run`] sets /// `compute_indices` to `true`. The indices vector follows the -/// guidance from `nucleo_matcher::Pattern::indices`: they are +/// guidance from `nucleo::pattern::Pattern::indices`: they are /// unique and sorted in ascending order so that callers can use /// them directly for highlighting. -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] pub struct FileMatch { pub score: u32, pub path: String, @@ -54,6 +70,127 @@ pub struct FileSearchResults { pub total_match_count: usize, } +#[derive(Debug, Clone, Serialize, PartialEq, Eq, Default)] +pub struct FileSearchSnapshot { + pub query: String, + pub matches: Vec, + pub total_match_count: usize, + pub scanned_file_count: usize, + pub walk_complete: bool, +} + +#[derive(Debug, Clone)] +pub struct SessionOptions { + pub limit: NonZero, + pub exclude: Vec, + pub threads: NonZero, + pub compute_indices: bool, + pub respect_gitignore: bool, +} + +impl Default for SessionOptions { + fn default() -> Self { + Self { + #[expect(clippy::unwrap_used)] + limit: NonZero::new(20).unwrap(), + exclude: Vec::new(), + #[expect(clippy::unwrap_used)] + threads: NonZero::new(2).unwrap(), + compute_indices: false, + respect_gitignore: true, + } + } +} + +pub trait SessionReporter: Send + Sync + 'static { + /// Called when the debounced top-N changes. + fn on_update(&self, snapshot: &FileSearchSnapshot); + + /// Called when the session becomes idle or is cancelled. Guaranteed to be called at least once per update_query. + fn on_complete(&self); +} + +pub struct FileSearchSession { + inner: Arc, +} + +impl FileSearchSession { + /// Update the query. This should be cheap relative to re-walking. + pub fn update_query(&self, pattern_text: &str) { + let _ = self + .inner + .work_tx + .send(WorkSignal::QueryUpdated(pattern_text.to_string())); + } +} + +impl Drop for FileSearchSession { + fn drop(&mut self) { + self.inner.shutdown.store(true, Ordering::Relaxed); + let _ = self.inner.work_tx.send(WorkSignal::Shutdown); + } +} + +pub fn create_session( + search_directory: &Path, + options: SessionOptions, + reporter: Arc, +) -> anyhow::Result { + create_session_inner(search_directory, options, reporter, None) +} + +fn create_session_inner( + search_directory: &Path, + options: SessionOptions, + reporter: Arc, + cancel_flag: Option>, +) -> anyhow::Result { + let SessionOptions { + limit, + exclude, + threads, + compute_indices, + respect_gitignore, + } = options; + + let override_matcher = build_override_matcher(search_directory, &exclude)?; + let (work_tx, work_rx) = unbounded(); + + let notify_tx = work_tx.clone(); + let notify = Arc::new(move || { + let _ = notify_tx.send(WorkSignal::NucleoNotify); + }); + let nucleo = Nucleo::new( + Config::DEFAULT.match_paths(), + notify, + Some(threads.get()), + 1, + ); + let injector = nucleo.injector(); + + let cancelled = cancel_flag.unwrap_or_else(|| Arc::new(AtomicBool::new(false))); + + let inner = Arc::new(SessionInner { + search_directory: search_directory.to_path_buf(), + limit: limit.get(), + threads: threads.get(), + compute_indices, + respect_gitignore, + cancelled: cancelled.clone(), + shutdown: Arc::new(AtomicBool::new(false)), + reporter, + work_tx: work_tx.clone(), + }); + + let matcher_inner = inner.clone(); + thread::spawn(move || matcher_worker(matcher_inner, work_rx, nucleo)); + + let walker_inner = inner.clone(); + thread::spawn(move || walker_worker(walker_inner, override_matcher, injector)); + + Ok(FileSearchSession { inner }) +} + pub trait Reporter { fn report_match(&self, file_match: &FileMatch); fn warn_matches_truncated(&self, total_match_count: usize, shown_match_count: usize); @@ -142,172 +279,31 @@ pub fn run( compute_indices: bool, respect_gitignore: bool, ) -> anyhow::Result { - let pattern = create_pattern(pattern_text); - // Create one BestMatchesList per worker thread so that each worker can - // operate independently. The results across threads will be merged when - // the traversal is complete. - let WorkerCount { - num_walk_builder_threads, - num_best_matches_lists, - } = create_worker_count(threads); - let best_matchers_per_worker: Vec> = (0..num_best_matches_lists) - .map(|_| { - UnsafeCell::new(BestMatchesList::new( - limit.get(), - pattern.clone(), - Matcher::new(nucleo_matcher::Config::DEFAULT), - )) - }) - .collect(); + let reporter = Arc::new(RunReporter::default()); + let session = create_session_inner( + search_directory, + SessionOptions { + limit, + exclude, + threads, + compute_indices, + respect_gitignore, + }, + reporter.clone(), + Some(cancel_flag), + )?; - // Use the same tree-walker library that ripgrep uses. We use it directly so - // that we can leverage the parallelism it provides. - let mut walk_builder = WalkBuilder::new(search_directory); - walk_builder - .threads(num_walk_builder_threads) - // Allow hidden entries. - .hidden(false) - // Follow symlinks to search their contents. - .follow_links(true) - // Don't require git to be present to apply to apply git-related ignore rules. - .require_git(false); - if !respect_gitignore { - walk_builder - .git_ignore(false) - .git_global(false) - .git_exclude(false) - .ignore(false) - .parents(false); - } - - if !exclude.is_empty() { - let mut override_builder = OverrideBuilder::new(search_directory); - for exclude in exclude { - // The `!` prefix is used to indicate an exclude pattern. - let exclude_pattern = format!("!{exclude}"); - override_builder.add(&exclude_pattern)?; - } - let override_matcher = override_builder.build()?; - walk_builder.overrides(override_matcher); - } - let walker = walk_builder.build_parallel(); - - // Each worker created by `WalkParallel::run()` will have its own - // `BestMatchesList` to update. - let index_counter = AtomicUsize::new(0); - walker.run(|| { - let index = index_counter.fetch_add(1, Ordering::Relaxed); - let best_list_ptr = best_matchers_per_worker[index].get(); - let best_list = unsafe { &mut *best_list_ptr }; - - // Each worker keeps a local counter so we only read the atomic flag - // every N entries which is cheaper than checking on every file. - const CHECK_INTERVAL: usize = 1024; - let mut processed = 0; - - let cancel = cancel_flag.clone(); - - Box::new(move |entry| { - if let Some(path) = get_file_path(&entry, search_directory) { - best_list.insert(path); - } - - processed += 1; - if processed % CHECK_INTERVAL == 0 && cancel.load(Ordering::Relaxed) { - ignore::WalkState::Quit - } else { - ignore::WalkState::Continue - } - }) - }); - - fn get_file_path<'a>( - entry_result: &'a Result, - search_directory: &std::path::Path, - ) -> Option<&'a str> { - let entry = match entry_result { - Ok(e) => e, - Err(_) => return None, - }; - if entry.file_type().is_some_and(|ft| ft.is_dir()) { - return None; - } - let path = entry.path(); - match path.strip_prefix(search_directory) { - Ok(rel_path) => rel_path.to_str(), - Err(_) => None, - } - } - - // If the cancel flag is set, we return early with an empty result. - if cancel_flag.load(Ordering::Relaxed) { - return Ok(FileSearchResults { - matches: Vec::new(), - total_match_count: 0, - }); - } - - // Merge results across best_matchers_per_worker. - let mut global_heap: BinaryHeap> = BinaryHeap::new(); - let mut total_match_count = 0; - for best_list_cell in best_matchers_per_worker.iter() { - let best_list = unsafe { &*best_list_cell.get() }; - total_match_count += best_list.num_matches; - for &Reverse((score, ref line)) in best_list.binary_heap.iter() { - if global_heap.len() < limit.get() { - global_heap.push(Reverse((score, line.clone()))); - } else if let Some(min_element) = global_heap.peek() - && score > min_element.0.0 - { - global_heap.pop(); - global_heap.push(Reverse((score, line.clone()))); - } - } - } - - let mut raw_matches: Vec<(u32, String)> = global_heap.into_iter().map(|r| r.0).collect(); - sort_matches(&mut raw_matches); - - // Transform into `FileMatch`, optionally computing indices. - let mut matcher = if compute_indices { - Some(Matcher::new(nucleo_matcher::Config::DEFAULT)) - } else { - None - }; - - let matches: Vec = raw_matches - .into_iter() - .map(|(score, path)| { - let indices = if compute_indices { - let mut buf = Vec::::new(); - let haystack: Utf32Str<'_> = Utf32Str::new(&path, &mut buf); - let mut idx_vec: Vec = Vec::new(); - if let Some(ref mut m) = matcher { - // Ignore the score returned from indices – we already have `score`. - pattern.indices(haystack, m, &mut idx_vec); - } - idx_vec.sort_unstable(); - idx_vec.dedup(); - Some(idx_vec) - } else { - None - }; - - FileMatch { - score, - path, - indices, - } - }) - .collect(); + session.update_query(pattern_text); + let snapshot = reporter.wait_for_complete(); Ok(FileSearchResults { - matches, - total_match_count, + matches: snapshot.matches, + total_match_count: snapshot.total_match_count, }) } /// Sort matches in-place by descending score, then ascending path. +#[cfg(test)] fn sort_matches(matches: &mut [(u32, String)]) { matches.sort_by(cmp_by_score_desc_then_path_asc::<(u32, String), _, _>( |t| t.0, @@ -332,73 +328,7 @@ where } } -/// Maintains the `max_count` best matches for a given pattern. -struct BestMatchesList { - max_count: usize, - num_matches: usize, - pattern: Pattern, - matcher: Matcher, - binary_heap: BinaryHeap>, - - /// Internal buffer for converting strings to UTF-32. - utf32buf: Vec, -} - -impl BestMatchesList { - fn new(max_count: usize, pattern: Pattern, matcher: Matcher) -> Self { - Self { - max_count, - num_matches: 0, - pattern, - matcher, - binary_heap: BinaryHeap::new(), - utf32buf: Vec::::new(), - } - } - - fn insert(&mut self, line: &str) { - let haystack: Utf32Str<'_> = Utf32Str::new(line, &mut self.utf32buf); - if let Some(score) = self.pattern.score(haystack, &mut self.matcher) { - // In the tests below, we verify that score() returns None for a - // non-match, so we can categorically increment the count here. - self.num_matches += 1; - - if self.binary_heap.len() < self.max_count { - self.binary_heap.push(Reverse((score, line.to_string()))); - } else if let Some(min_element) = self.binary_heap.peek() - && score > min_element.0.0 - { - self.binary_heap.pop(); - self.binary_heap.push(Reverse((score, line.to_string()))); - } - } - } -} - -struct WorkerCount { - num_walk_builder_threads: usize, - num_best_matches_lists: usize, -} - -fn create_worker_count(num_workers: NonZero) -> WorkerCount { - // It appears that the number of times the function passed to - // `WalkParallel::run()` is called is: the number of threads specified to - // the builder PLUS ONE. - // - // In `WalkParallel::visit()`, the builder function gets called once here: - // https://github.com/BurntSushi/ripgrep/blob/79cbe89deb1151e703f4d91b19af9cdcc128b765/crates/ignore/src/walk.rs#L1233 - // - // And then once for every worker here: - // https://github.com/BurntSushi/ripgrep/blob/79cbe89deb1151e703f4d91b19af9cdcc128b765/crates/ignore/src/walk.rs#L1288 - let num_walk_builder_threads = num_workers.get(); - let num_best_matches_lists = num_walk_builder_threads + 1; - - WorkerCount { - num_walk_builder_threads, - num_best_matches_lists, - } -} - +#[cfg(test)] fn create_pattern(pattern: &str) -> Pattern { Pattern::new( pattern, @@ -408,16 +338,281 @@ fn create_pattern(pattern: &str) -> Pattern { ) } +struct SessionInner { + search_directory: PathBuf, + limit: usize, + threads: usize, + compute_indices: bool, + respect_gitignore: bool, + cancelled: Arc, + shutdown: Arc, + reporter: Arc, + work_tx: Sender, +} + +enum WorkSignal { + QueryUpdated(String), + NucleoNotify, + WalkComplete, + Shutdown, +} + +fn build_override_matcher( + search_directory: &Path, + exclude: &[String], +) -> anyhow::Result> { + if exclude.is_empty() { + return Ok(None); + } + let mut override_builder = OverrideBuilder::new(search_directory); + for exclude in exclude { + let exclude_pattern = format!("!{exclude}"); + override_builder.add(&exclude_pattern)?; + } + let matcher = override_builder.build()?; + Ok(Some(matcher)) +} + +fn walker_worker( + inner: Arc, + override_matcher: Option, + injector: Injector>, +) { + let mut walk_builder = WalkBuilder::new(&inner.search_directory); + walk_builder + .threads(inner.threads) + // Allow hidden entries. + .hidden(false) + // Follow symlinks to search their contents. + .follow_links(true) + // Don't require git to be present to apply to apply git-related ignore rules. + .require_git(false); + if !inner.respect_gitignore { + walk_builder + .git_ignore(false) + .git_global(false) + .git_exclude(false) + .ignore(false) + .parents(false); + } + if let Some(override_matcher) = override_matcher { + walk_builder.overrides(override_matcher); + } + + let walker = walk_builder.build_parallel(); + + fn get_file_path<'a>( + entry_result: &'a Result, + search_directory: &Path, + ) -> Option<&'a str> { + let entry = match entry_result { + Ok(entry) => entry, + Err(_) => return None, + }; + if entry.file_type().is_some_and(|ft| ft.is_dir()) { + return None; + } + let path = entry.path(); + match path.strip_prefix(search_directory) { + Ok(rel_path) => rel_path.to_str(), + Err(_) => None, + } + } + + walker.run(|| { + const CHECK_INTERVAL: usize = 1024; + let mut n = 0; + let search_directory = inner.search_directory.clone(); + let injector = injector.clone(); + let cancelled = inner.cancelled.clone(); + let shutdown = inner.shutdown.clone(); + + Box::new(move |entry| { + if let Some(path) = get_file_path(&entry, &search_directory) { + injector.push(Arc::from(path), |path, cols| { + cols[0] = Utf32String::from(path.as_ref()); + }); + } + n += 1; + if n >= CHECK_INTERVAL { + if cancelled.load(Ordering::Relaxed) || shutdown.load(Ordering::Relaxed) { + return ignore::WalkState::Quit; + } + n = 0; + } + ignore::WalkState::Continue + }) + }); + let _ = inner.work_tx.send(WorkSignal::WalkComplete); +} + +fn matcher_worker( + inner: Arc, + work_rx: Receiver, + mut nucleo: Nucleo>, +) -> anyhow::Result<()> { + const TICK_TIMEOUT_MS: u64 = 10; + let config = Config::DEFAULT.match_paths(); + let mut indices_matcher = inner.compute_indices.then(|| Matcher::new(config.clone())); + let cancel_requested = || inner.cancelled.load(Ordering::Relaxed); + let shutdown_requested = || inner.shutdown.load(Ordering::Relaxed); + + let mut last_query = String::new(); + let mut next_notify = never(); + let mut will_notify = false; + let mut walk_complete = false; + + loop { + select! { + recv(work_rx) -> signal => { + let Ok(signal) = signal else { + break; + }; + match signal { + WorkSignal::QueryUpdated(query) => { + let append = query.starts_with(&last_query); + nucleo.pattern.reparse( + 0, + &query, + CaseMatching::Smart, + Normalization::Smart, + append, + ); + last_query = query; + will_notify = true; + next_notify = after(Duration::from_millis(0)); + } + WorkSignal::NucleoNotify => { + if !will_notify { + will_notify = true; + next_notify = after(Duration::from_millis(TICK_TIMEOUT_MS)); + } + } + WorkSignal::WalkComplete => { + walk_complete = true; + if !will_notify { + will_notify = true; + next_notify = after(Duration::from_millis(0)); + } + } + WorkSignal::Shutdown => { + break; + } + } + } + recv(next_notify) -> _ => { + will_notify = false; + let status = nucleo.tick(TICK_TIMEOUT_MS); + if status.changed { + let snapshot = nucleo.snapshot(); + let limit = inner.limit.min(snapshot.matched_item_count() as usize); + let pattern = snapshot.pattern().column_pattern(0); + let matches: Vec<_> = snapshot + .matches() + .iter() + .take(limit) + .filter_map(|match_| { + let item = snapshot.get_item(match_.idx)?; + let indices = if let Some(indices_matcher) = indices_matcher.as_mut() { + let mut idx_vec = Vec::::new(); + let haystack = item.matcher_columns[0].slice(..); + let _ = pattern.indices(haystack, indices_matcher, &mut idx_vec); + idx_vec.sort_unstable(); + idx_vec.dedup(); + Some(idx_vec) + } else { + None + }; + Some(FileMatch { + score: match_.score, + path: item.data.as_ref().to_string(), + indices, + }) + }) + .collect(); + + let snapshot = FileSearchSnapshot { + query: last_query.clone(), + matches, + total_match_count: snapshot.matched_item_count() as usize, + scanned_file_count: snapshot.item_count() as usize, + walk_complete, + }; + inner.reporter.on_update(&snapshot); + } + if !status.running && walk_complete { + inner.reporter.on_complete(); + } + } + default(Duration::from_millis(100)) => { + // Occasionally check the cancel flag. + } + } + + if cancel_requested() || shutdown_requested() { + break; + } + } + + // If we cancelled or otherwise exited the loop, make sure the reporter is notified. + inner.reporter.on_complete(); + + Ok(()) +} + +#[derive(Default)] +struct RunReporter { + snapshot: RwLock, + completed: (Condvar, Mutex), +} + +impl SessionReporter for RunReporter { + fn on_update(&self, snapshot: &FileSearchSnapshot) { + #[expect(clippy::unwrap_used)] + let mut guard = self.snapshot.write().unwrap(); + *guard = snapshot.clone(); + } + + fn on_complete(&self) { + let (cv, mutex) = &self.completed; + let mut completed = mutex.lock().unwrap(); + *completed = true; + cv.notify_all(); + } +} + +impl RunReporter { + fn wait_for_complete(&self) -> FileSearchSnapshot { + let (cv, mutex) = &self.completed; + let mut completed = mutex.lock().unwrap(); + while !*completed { + completed = cv.wait(completed).unwrap(); + } + self.snapshot.read().unwrap().clone() + } +} + #[cfg(test)] mod tests { + #![allow(clippy::unwrap_used)] + use super::*; use pretty_assertions::assert_eq; + use std::fs; + use std::sync::Arc; + use std::sync::Condvar; + use std::sync::Mutex; + use std::sync::atomic::AtomicBool; + use std::thread; + use std::time::Duration; + use std::time::Instant; + use tempfile::TempDir; #[test] fn verify_score_is_none_for_non_match() { let mut utf32buf = Vec::::new(); let line = "hello"; - let mut matcher = Matcher::new(nucleo_matcher::Config::DEFAULT); + let mut matcher = Matcher::new(Config::DEFAULT); let haystack: Utf32Str<'_> = Utf32Str::new(line, &mut utf32buf); let pattern = create_pattern("zzz"); let score = pattern.score(haystack, &mut matcher); @@ -453,4 +648,274 @@ mod tests { fn file_name_from_path_falls_back_to_full_path() { assert_eq!(file_name_from_path(""), ""); } + + #[derive(Default)] + struct RecordingReporter { + updates: Mutex>, + complete_times: Mutex>, + complete_cv: Condvar, + update_cv: Condvar, + } + + impl RecordingReporter { + fn wait_for_complete(&self, timeout: Duration) -> bool { + let completes = self.complete_times.lock().unwrap(); + if !completes.is_empty() { + return true; + } + let (completes, _) = self.complete_cv.wait_timeout(completes, timeout).unwrap(); + !completes.is_empty() + } + fn clear(&self) { + self.updates.lock().unwrap().clear(); + self.complete_times.lock().unwrap().clear(); + } + + fn updates(&self) -> Vec { + self.updates.lock().unwrap().clone() + } + + fn wait_for_updates_at_least(&self, min_len: usize, timeout: Duration) -> bool { + let updates = self.updates.lock().unwrap(); + if updates.len() >= min_len { + return true; + } + let (updates, _) = self.update_cv.wait_timeout(updates, timeout).unwrap(); + updates.len() >= min_len + } + + fn snapshot(&self) -> FileSearchSnapshot { + self.updates + .lock() + .unwrap() + .last() + .cloned() + .unwrap_or_default() + } + } + + impl SessionReporter for RecordingReporter { + fn on_update(&self, snapshot: &FileSearchSnapshot) { + let mut updates = self.updates.lock().unwrap(); + updates.push(snapshot.clone()); + self.update_cv.notify_all(); + } + + fn on_complete(&self) { + { + let mut complete_times = self.complete_times.lock().unwrap(); + complete_times.push(Instant::now()); + } + self.complete_cv.notify_all(); + } + } + + fn create_temp_tree(file_count: usize) -> TempDir { + let dir = tempfile::tempdir().unwrap(); + for i in 0..file_count { + let path = dir.path().join(format!("file-{i:04}.txt")); + fs::write(path, format!("contents {i}")).unwrap(); + } + dir + } + + #[test] + fn session_scanned_file_count_is_monotonic_across_queries() { + let dir = create_temp_tree(200); + let reporter = Arc::new(RecordingReporter::default()); + let session = create_session(dir.path(), SessionOptions::default(), reporter.clone()) + .expect("session"); + + session.update_query("file-00"); + thread::sleep(Duration::from_millis(20)); + let first_snapshot = reporter.snapshot(); + session.update_query("file-01"); + thread::sleep(Duration::from_millis(20)); + let second_snapshot = reporter.snapshot(); + let _ = reporter.wait_for_complete(Duration::from_secs(5)); + let completed_snapshot = reporter.snapshot(); + + assert!(second_snapshot.scanned_file_count >= first_snapshot.scanned_file_count); + assert!(completed_snapshot.scanned_file_count >= second_snapshot.scanned_file_count); + } + + #[test] + fn session_streams_updates_before_walk_complete() { + let dir = create_temp_tree(600); + let reporter = Arc::new(RecordingReporter::default()); + let session = create_session(dir.path(), SessionOptions::default(), reporter.clone()) + .expect("session"); + + session.update_query("file-0"); + let completed = reporter.wait_for_complete(Duration::from_secs(5)); + + assert!(completed); + let updates = reporter.updates(); + assert!(updates.iter().any(|snapshot| !snapshot.walk_complete)); + } + + #[test] + fn session_accepts_query_updates_after_walk_complete() { + let dir = tempfile::tempdir().unwrap(); + fs::write(dir.path().join("alpha.txt"), "alpha").unwrap(); + fs::write(dir.path().join("beta.txt"), "beta").unwrap(); + let reporter = Arc::new(RecordingReporter::default()); + let session = create_session(dir.path(), SessionOptions::default(), reporter.clone()) + .expect("session"); + + session.update_query("alpha"); + assert!(reporter.wait_for_complete(Duration::from_secs(5))); + let updates_before = reporter.updates().len(); + + session.update_query("beta"); + assert!(reporter.wait_for_updates_at_least(updates_before + 1, Duration::from_secs(5),)); + + let updates = reporter.updates(); + let last_update = updates.last().cloned().expect("update"); + assert!( + last_update + .matches + .iter() + .any(|file_match| file_match.path.contains("beta.txt")) + ); + } + + #[test] + fn session_emits_complete_when_query_changes_with_no_matches() { + let dir = tempfile::tempdir().unwrap(); + fs::write(dir.path().join("alpha.txt"), "alpha").unwrap(); + fs::write(dir.path().join("beta.txt"), "beta").unwrap(); + let reporter = Arc::new(RecordingReporter::default()); + let session = create_session_inner( + dir.path(), + SessionOptions::default(), + reporter.clone(), + None, + ) + .expect("session"); + + session.update_query("asdf"); + assert!(reporter.wait_for_complete(Duration::from_secs(5))); + + let completed_snapshot = reporter.snapshot(); + assert_eq!(completed_snapshot.matches, Vec::new()); + assert_eq!(completed_snapshot.total_match_count, 0); + + reporter.clear(); + + session.update_query("asdfa"); + assert!(reporter.wait_for_complete(Duration::from_secs(5))); + assert!(!reporter.updates().is_empty()); + } + + #[test] + fn dropping_session_does_not_cancel_siblings_with_shared_cancel_flag() { + let root_a = create_temp_tree(200); + let root_b = create_temp_tree(4_000); + let cancel_flag = Arc::new(AtomicBool::new(false)); + + let reporter_a = Arc::new(RecordingReporter::default()); + let session_a = create_session_inner( + root_a.path(), + SessionOptions::default(), + reporter_a, + Some(cancel_flag.clone()), + ) + .expect("session_a"); + + let reporter_b = Arc::new(RecordingReporter::default()); + let session_b = create_session_inner( + root_b.path(), + SessionOptions::default(), + reporter_b.clone(), + Some(cancel_flag), + ) + .expect("session_b"); + + session_a.update_query("file-0"); + session_b.update_query("file-1"); + + thread::sleep(Duration::from_millis(5)); + drop(session_a); + + let completed = reporter_b.wait_for_complete(Duration::from_secs(5)); + assert_eq!(completed, true); + } + + #[test] + fn session_emits_updates_when_query_changes() { + let dir = create_temp_tree(200); + let reporter = Arc::new(RecordingReporter::default()); + let session = create_session(dir.path(), SessionOptions::default(), reporter.clone()) + .expect("session"); + + session.update_query("zzzzzzzz"); + let completed = reporter.wait_for_complete(Duration::from_secs(5)); + assert!(completed); + + reporter.clear(); + + session.update_query("zzzzzzzzq"); + let completed = reporter.wait_for_complete(Duration::from_secs(5)); + assert!(completed); + + let updates = reporter.updates(); + assert_eq!(updates.len(), 1); + } + + #[test] + fn run_returns_matches_for_query() { + let dir = create_temp_tree(40); + let results = run( + "file-000", + NonZero::new(20).unwrap(), + dir.path(), + Vec::new(), + NonZero::new(2).unwrap(), + Arc::new(AtomicBool::new(false)), + false, + true, + ) + .expect("run ok"); + + assert!(!results.matches.is_empty()); + assert!(results.total_match_count >= results.matches.len()); + assert!( + results + .matches + .iter() + .any(|m| m.path.contains("file-0000.txt")) + ); + } + + #[test] + fn cancel_exits_run() { + let dir = create_temp_tree(200); + let cancel_flag = Arc::new(AtomicBool::new(true)); + let search_dir = dir.path().to_path_buf(); + let (tx, rx) = std::sync::mpsc::channel(); + + let handle = thread::spawn(move || { + let result = run( + "file-", + NonZero::new(20).unwrap(), + &search_dir, + Vec::new(), + NonZero::new(2).unwrap(), + cancel_flag, + false, + true, + ); + let _ = tx.send(result); + }); + + let result = rx + .recv_timeout(Duration::from_secs(2)) + .expect("run should exit after cancellation"); + handle.join().unwrap(); + + let results = result.expect("run ok"); + assert_eq!(results.matches, Vec::new()); + assert_eq!(results.total_match_count, 0); + } } diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 4e9cfb3fc..bf4ab6798 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -1507,9 +1507,7 @@ impl App { tui.frame_requester().schedule_frame(); } AppEvent::StartFileSearch(query) => { - if !query.is_empty() { - self.file_search.on_user_query(query); - } + self.file_search.on_user_query(query); } AppEvent::FileSearchResult { query, matches } => { self.chat_widget.apply_file_search_result(query, matches); diff --git a/codex-rs/tui/src/bottom_pane/chat_composer.rs b/codex-rs/tui/src/bottom_pane/chat_composer.rs index 4c1228cb3..3a3f5d407 100644 --- a/codex-rs/tui/src/bottom_pane/chat_composer.rs +++ b/codex-rs/tui/src/bottom_pane/chat_composer.rs @@ -2380,6 +2380,11 @@ impl ChatComposer { // When browsing input history (shell-style Up/Down recall), skip all popup // synchronization so nothing steals focus from continued history navigation. if browsing_history { + if self.current_file_query.is_some() { + self.app_event_tx + .send(AppEvent::StartFileSearch(String::new())); + self.current_file_query = None; + } self.active_popup = ActivePopup::None; return; } @@ -2390,12 +2395,22 @@ impl ChatComposer { self.sync_command_popup(allow_command_popup); if matches!(self.active_popup, ActivePopup::Command(_)) { + if self.current_file_query.is_some() { + self.app_event_tx + .send(AppEvent::StartFileSearch(String::new())); + self.current_file_query = None; + } self.dismissed_file_popup_token = None; self.dismissed_skill_popup_token = None; return; } if let Some(token) = skill_token { + if self.current_file_query.is_some() { + self.app_event_tx + .send(AppEvent::StartFileSearch(String::new())); + self.current_file_query = None; + } self.sync_skill_popup(token); return; } @@ -2406,6 +2421,11 @@ impl ChatComposer { return; } + if self.current_file_query.is_some() { + self.app_event_tx + .send(AppEvent::StartFileSearch(String::new())); + self.current_file_query = None; + } self.dismissed_file_popup_token = None; if matches!( self.active_popup, @@ -2539,7 +2559,10 @@ impl ChatComposer { return; } - if !query.is_empty() { + if query.is_empty() { + self.app_event_tx + .send(AppEvent::StartFileSearch(String::new())); + } else { self.app_event_tx .send(AppEvent::StartFileSearch(query.clone())); } @@ -2563,7 +2586,11 @@ impl ChatComposer { } } - self.current_file_query = Some(query); + if query.is_empty() { + self.current_file_query = None; + } else { + self.current_file_query = Some(query); + } self.dismissed_file_popup_token = None; } diff --git a/codex-rs/tui/src/bottom_pane/file_search_popup.rs b/codex-rs/tui/src/bottom_pane/file_search_popup.rs index e0a0cc2f4..e018c7ff8 100644 --- a/codex-rs/tui/src/bottom_pane/file_search_popup.rs +++ b/codex-rs/tui/src/bottom_pane/file_search_popup.rs @@ -43,18 +43,10 @@ impl FileSearchPopup { return; } - // Determine if current matches are still relevant. - let keep_existing = query.starts_with(&self.display_query); - self.pending_query.clear(); self.pending_query.push_str(query); self.waiting = true; // waiting for new results - - if !keep_existing { - self.matches.clear(); - self.state.reset(); - } } /// Put the popup into an "idle" state used for an empty query (just "@"). diff --git a/codex-rs/tui/src/file_search.rs b/codex-rs/tui/src/file_search.rs index af4651264..88c56f231 100644 --- a/codex-rs/tui/src/file_search.rs +++ b/codex-rs/tui/src/file_search.rs @@ -1,32 +1,15 @@ -//! Helper that owns the debounce/cancellation logic for `@` file searches. +//! Session-based orchestration for `@` file searches. //! -//! `ChatComposer` publishes *every* change of the `@token` as -//! `AppEvent::StartFileSearch(query)`. -//! This struct receives those events and decides when to actually spawn the -//! expensive search (handled in the main `App` thread). It tries to ensure: -//! -//! - Even when the user types long text quickly, they will start seeing results -//! after a short delay using an early version of what they typed. -//! - At most one search is in-flight at any time. -//! -//! It works as follows: -//! -//! 1. First query starts a debounce timer. -//! 2. While the timer is pending, the latest query from the user is stored. -//! 3. When the timer fires, it is cleared, and a search is done for the most -//! recent query. -//! 4. If there is a in-flight search that is not a prefix of the latest thing -//! the user typed, it is cancelled. +//! `ChatComposer` publishes every change of the `@token` as +//! `AppEvent::StartFileSearch(query)`. This manager owns a single +//! `codex-file-search` session for the current search root, updates the query +//! on every keystroke, and drops the session when the query becomes empty. use codex_file_search as file_search; use std::num::NonZeroUsize; use std::path::PathBuf; use std::sync::Arc; use std::sync::Mutex; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use std::thread; -use std::time::Duration; use crate::app_event::AppEvent; use crate::app_event_sender::AppEventSender; @@ -34,35 +17,16 @@ use crate::app_event_sender::AppEventSender; const MAX_FILE_SEARCH_RESULTS: NonZeroUsize = NonZeroUsize::new(20).unwrap(); const NUM_FILE_SEARCH_THREADS: NonZeroUsize = NonZeroUsize::new(2).unwrap(); -/// How long to wait after a keystroke before firing the first search when none -/// is currently running. Keeps early queries more meaningful. -const FILE_SEARCH_DEBOUNCE: Duration = Duration::from_millis(100); - -const ACTIVE_SEARCH_COMPLETE_POLL_INTERVAL: Duration = Duration::from_millis(20); - -/// State machine for file-search orchestration. pub(crate) struct FileSearchManager { - /// Unified state guarded by one mutex. state: Arc>, - search_dir: PathBuf, app_tx: AppEventSender, } struct SearchState { - /// Latest query typed by user (updated every keystroke). latest_query: String, - - /// true if a search is currently scheduled. - is_search_scheduled: bool, - - /// If there is an active search, this will be the query being searched. - active_search: Option, -} - -struct ActiveSearch { - query: String, - cancellation_token: Arc, + session: Option, + session_token: usize, } impl FileSearchManager { @@ -70,8 +34,8 @@ impl FileSearchManager { Self { state: Arc::new(Mutex::new(SearchState { latest_query: String::new(), - is_search_scheduled: false, - active_search: None, + session: None, + session_token: 0, })), search_dir, app_tx: tx, @@ -80,120 +44,85 @@ impl FileSearchManager { /// Call whenever the user edits the `@` token. pub fn on_user_query(&self, query: String) { - { - #[expect(clippy::unwrap_used)] - let mut st = self.state.lock().unwrap(); - if query == st.latest_query { - // No change, nothing to do. - return; - } + #[expect(clippy::unwrap_used)] + let mut st = self.state.lock().unwrap(); + if query == st.latest_query { + return; + } + st.latest_query.clear(); + st.latest_query.push_str(&query); - // Update latest query. - st.latest_query.clear(); - st.latest_query.push_str(&query); - - // If there is an in-flight search that is definitely obsolete, - // cancel it now. - if let Some(active_search) = &st.active_search - && !query.starts_with(&active_search.query) - { - active_search - .cancellation_token - .store(true, Ordering::Relaxed); - st.active_search = None; - } - - // Schedule a search to run after debounce. - if !st.is_search_scheduled { - st.is_search_scheduled = true; - } else { - return; - } + if query.is_empty() { + st.session.take(); + return; } - // If we are here, we set `st.is_search_scheduled = true` before - // dropping the lock. This means we are the only thread that can spawn a - // debounce timer. - let state = self.state.clone(); - let search_dir = self.search_dir.clone(); - let tx_clone = self.app_tx.clone(); - thread::spawn(move || { - // Always do a minimum debounce, but then poll until the - // `active_search` is cleared. - thread::sleep(FILE_SEARCH_DEBOUNCE); - loop { - #[expect(clippy::unwrap_used)] - if state.lock().unwrap().active_search.is_none() { - break; - } - thread::sleep(ACTIVE_SEARCH_COMPLETE_POLL_INTERVAL); - } - - // The debounce timer has expired, so start a search using the - // latest query. - let cancellation_token = Arc::new(AtomicBool::new(false)); - let token = cancellation_token.clone(); - let query = { - #[expect(clippy::unwrap_used)] - let mut st = state.lock().unwrap(); - let query = st.latest_query.clone(); - st.is_search_scheduled = false; - st.active_search = Some(ActiveSearch { - query: query.clone(), - cancellation_token: token, - }); - query - }; - - FileSearchManager::spawn_file_search( - query, - search_dir, - tx_clone, - cancellation_token, - state, - ); - }); + if st.session.is_none() { + self.start_session_locked(&mut st); + } + if let Some(session) = st.session.as_ref() { + session.update_query(&query); + } } - fn spawn_file_search( - query: String, - search_dir: PathBuf, - tx: AppEventSender, - cancellation_token: Arc, - search_state: Arc>, - ) { - let compute_indices = true; - std::thread::spawn(move || { - let matches = file_search::run( - &query, - MAX_FILE_SEARCH_RESULTS, - &search_dir, - Vec::new(), - NUM_FILE_SEARCH_THREADS, - cancellation_token.clone(), - compute_indices, - true, - ) - .map(|res| res.matches) - .unwrap_or_default(); - - let is_cancelled = cancellation_token.load(Ordering::Relaxed); - if !is_cancelled { - tx.send(AppEvent::FileSearchResult { query, matches }); + fn start_session_locked(&self, st: &mut SearchState) { + st.session_token = st.session_token.wrapping_add(1); + let session_token = st.session_token; + let reporter = Arc::new(TuiSessionReporter { + state: self.state.clone(), + app_tx: self.app_tx.clone(), + session_token, + }); + let session = file_search::create_session( + &self.search_dir, + file_search::SessionOptions { + limit: MAX_FILE_SEARCH_RESULTS, + exclude: Vec::new(), + threads: NUM_FILE_SEARCH_THREADS, + compute_indices: true, + respect_gitignore: true, + }, + reporter, + ); + match session { + Ok(session) => st.session = Some(session), + Err(err) => { + tracing::warn!("file search session failed to start: {err}"); + st.session = None; } + } + } +} - // Reset the active search state. Do a pointer comparison to verify - // that we are clearing the ActiveSearch that corresponds to the - // cancellation token we were given. - { - #[expect(clippy::unwrap_used)] - let mut st = search_state.lock().unwrap(); - if let Some(active_search) = &st.active_search - && Arc::ptr_eq(&active_search.cancellation_token, &cancellation_token) - { - st.active_search = None; - } - } +struct TuiSessionReporter { + state: Arc>, + app_tx: AppEventSender, + session_token: usize, +} + +impl TuiSessionReporter { + fn send_snapshot(&self, snapshot: &file_search::FileSearchSnapshot) { + #[expect(clippy::unwrap_used)] + let st = self.state.lock().unwrap(); + if st.session_token != self.session_token + || st.latest_query.is_empty() + || snapshot.query.is_empty() + { + return; + } + let query = snapshot.query.clone(); + drop(st); + self.app_tx.send(AppEvent::FileSearchResult { + query, + matches: snapshot.matches.clone(), }); } } + +impl file_search::SessionReporter for TuiSessionReporter { + fn on_update(&self, snapshot: &file_search::FileSearchSnapshot) { + self.send_snapshot(snapshot); + } + + fn on_complete(&self) {} +}