diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index d00aa4545..1b448db6f 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2011,6 +2011,7 @@ dependencies = [ "serde", "serde_json", "tempfile", + "test-case", "thiserror 2.0.18", "tokio", "tokio-tungstenite", diff --git a/codex-rs/app-server/src/fs_api.rs b/codex-rs/app-server/src/fs_api.rs index 9baa2b1dc..1f8a32362 100644 --- a/codex-rs/app-server/src/fs_api.rs +++ b/codex-rs/app-server/src/fs_api.rs @@ -34,7 +34,7 @@ pub(crate) struct FsApi { impl Default for FsApi { fn default() -> Self { Self { - file_system: Arc::new(Environment::default().get_filesystem()), + file_system: Environment::default().get_filesystem(), } } } diff --git a/codex-rs/core/src/tools/handlers/view_image.rs b/codex-rs/core/src/tools/handlers/view_image.rs index 9cbe9bbc7..f4015a762 100644 --- a/codex-rs/core/src/tools/handlers/view_image.rs +++ b/codex-rs/core/src/tools/handlers/view_image.rs @@ -1,5 +1,4 @@ use async_trait::async_trait; -use codex_exec_server::ExecutorFileSystem; use codex_protocol::models::FunctionCallOutputBody; use codex_protocol::models::FunctionCallOutputContentItem; use codex_protocol::models::FunctionCallOutputPayload; diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml index fac7649e4..3ec6b6b94 100644 --- a/codex-rs/exec-server/Cargo.toml +++ b/codex-rs/exec-server/Cargo.toml @@ -44,3 +44,4 @@ anyhow = { workspace = true } codex-utils-cargo-bin = { workspace = true } pretty_assertions = { workspace = true } tempfile = { workspace = true } +test-case = "3.3.1" diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index c8635ec03..3ca1cfe90 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,8 +1,10 @@ use crate::ExecServerClient; use crate::ExecServerError; use crate::RemoteExecServerConnectArgs; -use crate::fs; -use crate::fs::ExecutorFileSystem; +use crate::file_system::ExecutorFileSystem; +use crate::local_file_system::LocalFileSystem; +use crate::remote_file_system::RemoteFileSystem; +use std::sync::Arc; #[derive(Clone, Default)] pub struct Environment { @@ -56,8 +58,12 @@ impl Environment { self.remote_exec_server_client.as_ref() } - pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> { - fs::LocalFileSystem + pub fn get_filesystem(&self) -> Arc { + if let Some(client) = self.remote_exec_server_client.clone() { + Arc::new(RemoteFileSystem::new(client)) + } else { + Arc::new(LocalFileSystem) + } } } diff --git a/codex-rs/exec-server/src/file_system.rs b/codex-rs/exec-server/src/file_system.rs new file mode 100644 index 000000000..35c2243f8 --- /dev/null +++ b/codex-rs/exec-server/src/file_system.rs @@ -0,0 +1,65 @@ +use async_trait::async_trait; +use codex_utils_absolute_path::AbsolutePathBuf; +use tokio::io; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct CreateDirectoryOptions { + pub recursive: bool, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct RemoveOptions { + pub recursive: bool, + pub force: bool, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct CopyOptions { + pub recursive: bool, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct FileMetadata { + pub is_directory: bool, + pub is_file: bool, + pub created_at_ms: i64, + pub modified_at_ms: i64, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ReadDirectoryEntry { + pub file_name: String, + pub is_directory: bool, + pub is_file: bool, +} + +pub type FileSystemResult = io::Result; + +#[async_trait] +pub trait ExecutorFileSystem: Send + Sync { + async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult>; + + async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec) -> FileSystemResult<()>; + + async fn create_directory( + &self, + path: &AbsolutePathBuf, + options: CreateDirectoryOptions, + ) -> FileSystemResult<()>; + + async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult; + + async fn read_directory( + &self, + path: &AbsolutePathBuf, + ) -> FileSystemResult>; + + async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()>; + + async fn copy( + &self, + source_path: &AbsolutePathBuf, + destination_path: &AbsolutePathBuf, + options: CopyOptions, + ) -> FileSystemResult<()>; +} diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 3c50d0ec5..55c42ebb9 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -2,8 +2,10 @@ mod client; mod client_api; mod connection; mod environment; -mod fs; +mod file_system; +mod local_file_system; mod protocol; +mod remote_file_system; mod rpc; mod server; @@ -28,13 +30,13 @@ pub use codex_app_server_protocol::FsRemoveResponse; pub use codex_app_server_protocol::FsWriteFileParams; pub use codex_app_server_protocol::FsWriteFileResponse; pub use environment::Environment; -pub use fs::CopyOptions; -pub use fs::CreateDirectoryOptions; -pub use fs::ExecutorFileSystem; -pub use fs::FileMetadata; -pub use fs::FileSystemResult; -pub use fs::ReadDirectoryEntry; -pub use fs::RemoveOptions; +pub use file_system::CopyOptions; +pub use file_system::CreateDirectoryOptions; +pub use file_system::ExecutorFileSystem; +pub use file_system::FileMetadata; +pub use file_system::FileSystemResult; +pub use file_system::ReadDirectoryEntry; +pub use file_system::RemoveOptions; pub use protocol::ExecExitedNotification; pub use protocol::ExecOutputDeltaNotification; pub use protocol::ExecOutputStream; diff --git a/codex-rs/exec-server/src/fs.rs b/codex-rs/exec-server/src/local_file_system.rs similarity index 85% rename from codex-rs/exec-server/src/fs.rs rename to codex-rs/exec-server/src/local_file_system.rs index 82e0b8e6e..fba7efa30 100644 --- a/codex-rs/exec-server/src/fs.rs +++ b/codex-rs/exec-server/src/local_file_system.rs @@ -7,70 +7,16 @@ use std::time::SystemTime; use std::time::UNIX_EPOCH; use tokio::io; +use crate::CopyOptions; +use crate::CreateDirectoryOptions; +use crate::ExecutorFileSystem; +use crate::FileMetadata; +use crate::FileSystemResult; +use crate::ReadDirectoryEntry; +use crate::RemoveOptions; + const MAX_READ_FILE_BYTES: u64 = 512 * 1024 * 1024; -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub struct CreateDirectoryOptions { - pub recursive: bool, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub struct RemoveOptions { - pub recursive: bool, - pub force: bool, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub struct CopyOptions { - pub recursive: bool, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct FileMetadata { - pub is_directory: bool, - pub is_file: bool, - pub created_at_ms: i64, - pub modified_at_ms: i64, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct ReadDirectoryEntry { - pub file_name: String, - pub is_directory: bool, - pub is_file: bool, -} - -pub type FileSystemResult = io::Result; - -#[async_trait] -pub trait ExecutorFileSystem: Send + Sync { - async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult>; - - async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec) -> FileSystemResult<()>; - - async fn create_directory( - &self, - path: &AbsolutePathBuf, - options: CreateDirectoryOptions, - ) -> FileSystemResult<()>; - - async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult; - - async fn read_directory( - &self, - path: &AbsolutePathBuf, - ) -> FileSystemResult>; - - async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()>; - - async fn copy( - &self, - source_path: &AbsolutePathBuf, - destination_path: &AbsolutePathBuf, - options: CopyOptions, - ) -> FileSystemResult<()>; -} - #[derive(Clone, Default)] pub(crate) struct LocalFileSystem; diff --git a/codex-rs/exec-server/src/remote_file_system.rs b/codex-rs/exec-server/src/remote_file_system.rs new file mode 100644 index 000000000..9711f43e5 --- /dev/null +++ b/codex-rs/exec-server/src/remote_file_system.rs @@ -0,0 +1,154 @@ +use async_trait::async_trait; +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsWriteFileParams; +use codex_utils_absolute_path::AbsolutePathBuf; +use tokio::io; + +use crate::CopyOptions; +use crate::CreateDirectoryOptions; +use crate::ExecServerClient; +use crate::ExecServerError; +use crate::ExecutorFileSystem; +use crate::FileMetadata; +use crate::FileSystemResult; +use crate::ReadDirectoryEntry; +use crate::RemoveOptions; + +const INVALID_REQUEST_ERROR_CODE: i64 = -32600; + +#[derive(Clone)] +pub(crate) struct RemoteFileSystem { + client: ExecServerClient, +} + +impl RemoteFileSystem { + pub(crate) fn new(client: ExecServerClient) -> Self { + Self { client } + } +} + +#[async_trait] +impl ExecutorFileSystem for RemoteFileSystem { + async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult> { + let response = self + .client + .fs_read_file(FsReadFileParams { path: path.clone() }) + .await + .map_err(map_remote_error)?; + STANDARD.decode(response.data_base64).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("remote fs/readFile returned invalid base64 dataBase64: {err}"), + ) + }) + } + + async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec) -> FileSystemResult<()> { + self.client + .fs_write_file(FsWriteFileParams { + path: path.clone(), + data_base64: STANDARD.encode(contents), + }) + .await + .map_err(map_remote_error)?; + Ok(()) + } + + async fn create_directory( + &self, + path: &AbsolutePathBuf, + options: CreateDirectoryOptions, + ) -> FileSystemResult<()> { + self.client + .fs_create_directory(FsCreateDirectoryParams { + path: path.clone(), + recursive: Some(options.recursive), + }) + .await + .map_err(map_remote_error)?; + Ok(()) + } + + async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult { + let response = self + .client + .fs_get_metadata(FsGetMetadataParams { path: path.clone() }) + .await + .map_err(map_remote_error)?; + Ok(FileMetadata { + is_directory: response.is_directory, + is_file: response.is_file, + created_at_ms: response.created_at_ms, + modified_at_ms: response.modified_at_ms, + }) + } + + async fn read_directory( + &self, + path: &AbsolutePathBuf, + ) -> FileSystemResult> { + let response = self + .client + .fs_read_directory(FsReadDirectoryParams { path: path.clone() }) + .await + .map_err(map_remote_error)?; + Ok(response + .entries + .into_iter() + .map(|entry| ReadDirectoryEntry { + file_name: entry.file_name, + is_directory: entry.is_directory, + is_file: entry.is_file, + }) + .collect()) + } + + async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()> { + self.client + .fs_remove(FsRemoveParams { + path: path.clone(), + recursive: Some(options.recursive), + force: Some(options.force), + }) + .await + .map_err(map_remote_error)?; + Ok(()) + } + + async fn copy( + &self, + source_path: &AbsolutePathBuf, + destination_path: &AbsolutePathBuf, + options: CopyOptions, + ) -> FileSystemResult<()> { + self.client + .fs_copy(FsCopyParams { + source_path: source_path.clone(), + destination_path: destination_path.clone(), + recursive: options.recursive, + }) + .await + .map_err(map_remote_error)?; + Ok(()) + } +} + +fn map_remote_error(error: ExecServerError) -> io::Error { + match error { + ExecServerError::Server { code, message } if code == INVALID_REQUEST_ERROR_CODE => { + io::Error::new(io::ErrorKind::InvalidInput, message) + } + ExecServerError::Server { message, .. } => io::Error::other(message), + ExecServerError::Closed => { + io::Error::new(io::ErrorKind::BrokenPipe, "exec-server transport closed") + } + _ => io::Error::other(error.to_string()), + } +} diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index c403b029d..4bd90dd9a 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -1,4 +1,4 @@ -mod filesystem; +mod file_system_handler; mod handler; mod processor; mod registry; diff --git a/codex-rs/exec-server/src/server/filesystem.rs b/codex-rs/exec-server/src/server/file_system_handler.rs similarity index 93% rename from codex-rs/exec-server/src/server/filesystem.rs rename to codex-rs/exec-server/src/server/file_system_handler.rs index a263bb1fe..2e4e1592d 100644 --- a/codex-rs/exec-server/src/server/filesystem.rs +++ b/codex-rs/exec-server/src/server/file_system_handler.rs @@ -1,5 +1,4 @@ use std::io; -use std::sync::Arc; use base64::Engine as _; use base64::engine::general_purpose::STANDARD; @@ -22,26 +21,18 @@ use codex_app_server_protocol::JSONRPCErrorError; use crate::CopyOptions; use crate::CreateDirectoryOptions; -use crate::Environment; use crate::ExecutorFileSystem; use crate::RemoveOptions; +use crate::local_file_system::LocalFileSystem; use crate::rpc::internal_error; use crate::rpc::invalid_request; -#[derive(Clone)] -pub(crate) struct ExecServerFileSystem { - file_system: Arc, +#[derive(Clone, Default)] +pub(crate) struct FileSystemHandler { + file_system: LocalFileSystem, } -impl Default for ExecServerFileSystem { - fn default() -> Self { - Self { - file_system: Arc::new(Environment::default().get_filesystem()), - } - } -} - -impl ExecServerFileSystem { +impl FileSystemHandler { pub(crate) async fn read_file( &self, params: FsReadFileParams, diff --git a/codex-rs/exec-server/src/server/handler.rs b/codex-rs/exec-server/src/server/handler.rs index c21aeecb5..0ddd7ee50 100644 --- a/codex-rs/exec-server/src/server/handler.rs +++ b/codex-rs/exec-server/src/server/handler.rs @@ -43,7 +43,7 @@ use crate::rpc::RpcNotificationSender; use crate::rpc::internal_error; use crate::rpc::invalid_params; use crate::rpc::invalid_request; -use crate::server::filesystem::ExecServerFileSystem; +use crate::server::file_system_handler::FileSystemHandler; const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024; #[cfg(test)] @@ -75,7 +75,7 @@ enum ProcessEntry { pub(crate) struct ExecServerHandler { notifications: RpcNotificationSender, - file_system: ExecServerFileSystem, + file_system: FileSystemHandler, processes: Arc>>, initialize_requested: AtomicBool, initialized: AtomicBool, @@ -85,7 +85,7 @@ impl ExecServerHandler { pub(crate) fn new(notifications: RpcNotificationSender) -> Self { Self { notifications, - file_system: ExecServerFileSystem::default(), + file_system: FileSystemHandler::default(), processes: Arc::new(Mutex::new(HashMap::new())), initialize_requested: AtomicBool::new(false), initialized: AtomicBool::new(false), diff --git a/codex-rs/exec-server/src/server/transport.rs b/codex-rs/exec-server/src/server/transport.rs index 22b57a0b1..4726465cc 100644 --- a/codex-rs/exec-server/src/server/transport.rs +++ b/codex-rs/exec-server/src/server/transport.rs @@ -59,6 +59,7 @@ async fn run_websocket_listener( let listener = TcpListener::bind(bind_address).await?; let local_addr = listener.local_addr()?; tracing::info!("codex-exec-server listening on ws://{local_addr}"); + println!("ws://{local_addr}"); loop { let (stream, peer_addr) = listener.accept().await?; diff --git a/codex-rs/exec-server/tests/common/exec_server.rs b/codex-rs/exec-server/tests/common/exec_server.rs index 225e4e485..c7c120ee1 100644 --- a/codex-rs/exec-server/tests/common/exec_server.rs +++ b/codex-rs/exec-server/tests/common/exec_server.rs @@ -11,6 +11,8 @@ use codex_app_server_protocol::RequestId; use codex_utils_cargo_bin::cargo_bin; use futures::SinkExt; use futures::StreamExt; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; use tokio::process::Child; use tokio::process::Command; use tokio::time::Instant; @@ -25,6 +27,7 @@ const EVENT_TIMEOUT: Duration = Duration::from_secs(5); pub(crate) struct ExecServerHarness { child: Child, + websocket_url: String, websocket: tokio_tungstenite::WebSocketStream< tokio_tungstenite::MaybeTlsStream, >, @@ -39,23 +42,28 @@ impl Drop for ExecServerHarness { pub(crate) async fn exec_server() -> anyhow::Result { let binary = cargo_bin("codex-exec-server")?; - let websocket_url = reserve_websocket_url()?; let mut child = Command::new(binary); - child.args(["--listen", &websocket_url]); + child.args(["--listen", "ws://127.0.0.1:0"]); child.stdin(Stdio::null()); - child.stdout(Stdio::null()); + child.stdout(Stdio::piped()); child.stderr(Stdio::inherit()); - let child = child.spawn()?; + let mut child = child.spawn()?; + let websocket_url = read_listen_url_from_stdout(&mut child).await?; let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?; Ok(ExecServerHarness { child, + websocket_url, websocket, next_request_id: 1, }) } impl ExecServerHarness { + pub(crate) fn websocket_url(&self) -> &str { + &self.websocket_url + } + pub(crate) async fn send_request( &mut self, method: &str, @@ -155,13 +163,6 @@ impl ExecServerHarness { } } -fn reserve_websocket_url() -> anyhow::Result { - let listener = std::net::TcpListener::bind("127.0.0.1:0")?; - let addr = listener.local_addr()?; - drop(listener); - Ok(format!("ws://{addr}")) -} - async fn connect_websocket_when_ready( websocket_url: &str, ) -> anyhow::Result<( @@ -186,3 +187,30 @@ async fn connect_websocket_when_ready( } } } + +async fn read_listen_url_from_stdout(child: &mut Child) -> anyhow::Result { + let stdout = child + .stdout + .take() + .ok_or_else(|| anyhow!("failed to capture exec-server stdout"))?; + let mut lines = BufReader::new(stdout).lines(); + let deadline = Instant::now() + CONNECT_TIMEOUT; + + loop { + let now = Instant::now(); + if now >= deadline { + return Err(anyhow!( + "timed out waiting for exec-server listen URL on stdout after {CONNECT_TIMEOUT:?}" + )); + } + let remaining = deadline.duration_since(now); + let line = timeout(remaining, lines.next_line()) + .await + .map_err(|_| anyhow!("timed out waiting for exec-server stdout"))?? + .ok_or_else(|| anyhow!("exec-server stdout closed before emitting listen URL"))?; + let listen_url = line.trim(); + if listen_url.starts_with("ws://") { + return Ok(listen_url.to_string()); + } + } +} diff --git a/codex-rs/exec-server/tests/file_system.rs b/codex-rs/exec-server/tests/file_system.rs new file mode 100644 index 000000000..ed90d7aa9 --- /dev/null +++ b/codex-rs/exec-server/tests/file_system.rs @@ -0,0 +1,361 @@ +#![cfg(unix)] + +mod common; + +use std::os::unix::fs::symlink; +use std::process::Command; +use std::sync::Arc; + +use anyhow::Context; +use anyhow::Result; +use codex_exec_server::CopyOptions; +use codex_exec_server::CreateDirectoryOptions; +use codex_exec_server::Environment; +use codex_exec_server::ExecutorFileSystem; +use codex_exec_server::ReadDirectoryEntry; +use codex_exec_server::RemoveOptions; +use codex_utils_absolute_path::AbsolutePathBuf; +use pretty_assertions::assert_eq; +use tempfile::TempDir; +use test_case::test_case; + +use common::exec_server::ExecServerHarness; +use common::exec_server::exec_server; + +struct FileSystemContext { + file_system: Arc, + _server: Option, +} + +async fn create_file_system_context(use_remote: bool) -> Result { + if use_remote { + let server = exec_server().await?; + let environment = Environment::create(Some(server.websocket_url().to_string())).await?; + Ok(FileSystemContext { + file_system: environment.get_filesystem(), + _server: Some(server), + }) + } else { + let environment = Environment::create(None).await?; + Ok(FileSystemContext { + file_system: environment.get_filesystem(), + _server: None, + }) + } +} + +fn absolute_path(path: std::path::PathBuf) -> AbsolutePathBuf { + assert!( + path.is_absolute(), + "path must be absolute: {}", + path.display() + ); + match AbsolutePathBuf::try_from(path) { + Ok(path) => path, + Err(err) => panic!("path should be absolute: {err}"), + } +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn file_system_get_metadata_returns_expected_fields(use_remote: bool) -> Result<()> { + let context = create_file_system_context(use_remote).await?; + let file_system = context.file_system; + + let tmp = TempDir::new()?; + let file_path = tmp.path().join("note.txt"); + std::fs::write(&file_path, "hello")?; + + let metadata = file_system + .get_metadata(&absolute_path(file_path)) + .await + .with_context(|| format!("mode={use_remote}"))?; + assert_eq!(metadata.is_directory, false); + assert_eq!(metadata.is_file, true); + assert!(metadata.modified_at_ms > 0); + + Ok(()) +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn file_system_methods_cover_surface_area(use_remote: bool) -> Result<()> { + let context = create_file_system_context(use_remote).await?; + let file_system = context.file_system; + + let tmp = TempDir::new()?; + let source_dir = tmp.path().join("source"); + let nested_dir = source_dir.join("nested"); + let source_file = source_dir.join("root.txt"); + let nested_file = nested_dir.join("note.txt"); + let copied_dir = tmp.path().join("copied"); + let copied_file = tmp.path().join("copy.txt"); + + file_system + .create_directory( + &absolute_path(nested_dir.clone()), + CreateDirectoryOptions { recursive: true }, + ) + .await + .with_context(|| format!("mode={use_remote}"))?; + + file_system + .write_file( + &absolute_path(nested_file.clone()), + b"hello from trait".to_vec(), + ) + .await + .with_context(|| format!("mode={use_remote}"))?; + file_system + .write_file( + &absolute_path(source_file.clone()), + b"hello from source root".to_vec(), + ) + .await + .with_context(|| format!("mode={use_remote}"))?; + + let nested_file_contents = file_system + .read_file(&absolute_path(nested_file.clone())) + .await + .with_context(|| format!("mode={use_remote}"))?; + assert_eq!(nested_file_contents, b"hello from trait"); + + file_system + .copy( + &absolute_path(nested_file), + &absolute_path(copied_file.clone()), + CopyOptions { recursive: false }, + ) + .await + .with_context(|| format!("mode={use_remote}"))?; + assert_eq!(std::fs::read_to_string(copied_file)?, "hello from trait"); + + file_system + .copy( + &absolute_path(source_dir.clone()), + &absolute_path(copied_dir.clone()), + CopyOptions { recursive: true }, + ) + .await + .with_context(|| format!("mode={use_remote}"))?; + assert_eq!( + std::fs::read_to_string(copied_dir.join("nested").join("note.txt"))?, + "hello from trait" + ); + + let mut entries = file_system + .read_directory(&absolute_path(source_dir)) + .await + .with_context(|| format!("mode={use_remote}"))?; + entries.sort_by(|left, right| left.file_name.cmp(&right.file_name)); + assert_eq!( + entries, + vec![ + ReadDirectoryEntry { + file_name: "nested".to_string(), + is_directory: true, + is_file: false, + }, + ReadDirectoryEntry { + file_name: "root.txt".to_string(), + is_directory: false, + is_file: true, + }, + ] + ); + + file_system + .remove( + &absolute_path(copied_dir.clone()), + RemoveOptions { + recursive: true, + force: true, + }, + ) + .await + .with_context(|| format!("mode={use_remote}"))?; + assert!(!copied_dir.exists()); + + Ok(()) +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn file_system_copy_rejects_directory_without_recursive(use_remote: bool) -> Result<()> { + let context = create_file_system_context(use_remote).await?; + let file_system = context.file_system; + + let tmp = TempDir::new()?; + let source_dir = tmp.path().join("source"); + std::fs::create_dir_all(&source_dir)?; + + let error = file_system + .copy( + &absolute_path(source_dir), + &absolute_path(tmp.path().join("dest")), + CopyOptions { recursive: false }, + ) + .await; + let error = match error { + Ok(()) => panic!("copy should fail"), + Err(error) => error, + }; + assert_eq!(error.kind(), std::io::ErrorKind::InvalidInput); + assert_eq!( + error.to_string(), + "fs/copy requires recursive: true when sourcePath is a directory" + ); + + Ok(()) +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn file_system_copy_rejects_copying_directory_into_descendant( + use_remote: bool, +) -> Result<()> { + let context = create_file_system_context(use_remote).await?; + let file_system = context.file_system; + + let tmp = TempDir::new()?; + let source_dir = tmp.path().join("source"); + std::fs::create_dir_all(source_dir.join("nested"))?; + + let error = file_system + .copy( + &absolute_path(source_dir.clone()), + &absolute_path(source_dir.join("nested").join("copy")), + CopyOptions { recursive: true }, + ) + .await; + let error = match error { + Ok(()) => panic!("copy should fail"), + Err(error) => error, + }; + assert_eq!(error.kind(), std::io::ErrorKind::InvalidInput); + assert_eq!( + error.to_string(), + "fs/copy cannot copy a directory to itself or one of its descendants" + ); + + Ok(()) +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn file_system_copy_preserves_symlinks_in_recursive_copy(use_remote: bool) -> Result<()> { + let context = create_file_system_context(use_remote).await?; + let file_system = context.file_system; + + let tmp = TempDir::new()?; + let source_dir = tmp.path().join("source"); + let nested_dir = source_dir.join("nested"); + let copied_dir = tmp.path().join("copied"); + std::fs::create_dir_all(&nested_dir)?; + symlink("nested", source_dir.join("nested-link"))?; + + file_system + .copy( + &absolute_path(source_dir), + &absolute_path(copied_dir.clone()), + CopyOptions { recursive: true }, + ) + .await + .with_context(|| format!("mode={use_remote}"))?; + + let copied_link = copied_dir.join("nested-link"); + let metadata = std::fs::symlink_metadata(&copied_link)?; + assert!(metadata.file_type().is_symlink()); + assert_eq!( + std::fs::read_link(copied_link)?, + std::path::PathBuf::from("nested") + ); + + Ok(()) +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn file_system_copy_ignores_unknown_special_files_in_recursive_copy( + use_remote: bool, +) -> Result<()> { + let context = create_file_system_context(use_remote).await?; + let file_system = context.file_system; + + let tmp = TempDir::new()?; + let source_dir = tmp.path().join("source"); + let copied_dir = tmp.path().join("copied"); + std::fs::create_dir_all(&source_dir)?; + std::fs::write(source_dir.join("note.txt"), "hello")?; + + let fifo_path = source_dir.join("named-pipe"); + let output = Command::new("mkfifo").arg(&fifo_path).output()?; + if !output.status.success() { + anyhow::bail!( + "mkfifo failed: stdout={} stderr={}", + String::from_utf8_lossy(&output.stdout).trim(), + String::from_utf8_lossy(&output.stderr).trim() + ); + } + + file_system + .copy( + &absolute_path(source_dir), + &absolute_path(copied_dir.clone()), + CopyOptions { recursive: true }, + ) + .await + .with_context(|| format!("mode={use_remote}"))?; + + assert_eq!( + std::fs::read_to_string(copied_dir.join("note.txt"))?, + "hello" + ); + assert!(!copied_dir.join("named-pipe").exists()); + + Ok(()) +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn file_system_copy_rejects_standalone_fifo_source(use_remote: bool) -> Result<()> { + let context = create_file_system_context(use_remote).await?; + let file_system = context.file_system; + + let tmp = TempDir::new()?; + let fifo_path = tmp.path().join("named-pipe"); + let output = Command::new("mkfifo").arg(&fifo_path).output()?; + if !output.status.success() { + anyhow::bail!( + "mkfifo failed: stdout={} stderr={}", + String::from_utf8_lossy(&output.stdout).trim(), + String::from_utf8_lossy(&output.stderr).trim() + ); + } + + let error = file_system + .copy( + &absolute_path(fifo_path), + &absolute_path(tmp.path().join("copied")), + CopyOptions { recursive: false }, + ) + .await; + let error = match error { + Ok(()) => panic!("copy should fail"), + Err(error) => error, + }; + assert_eq!(error.kind(), std::io::ErrorKind::InvalidInput); + assert_eq!( + error.to_string(), + "fs/copy only supports regular files, directories, and symlinks" + ); + + Ok(()) +}