The idea is that codex-exec exposes an Environment struct with services on it. Each of those is a trait. Depending on construction parameters passed to Environment they are either backed by local or remote server but core doesn't see these differences.
332 lines
10 KiB
Rust
332 lines
10 KiB
Rust
use async_trait::async_trait;
|
|
use codex_utils_absolute_path::AbsolutePathBuf;
|
|
use std::path::Component;
|
|
use std::path::Path;
|
|
use std::path::PathBuf;
|
|
use std::time::SystemTime;
|
|
use std::time::UNIX_EPOCH;
|
|
use tokio::io;
|
|
|
|
const MAX_READ_FILE_BYTES: u64 = 512 * 1024 * 1024;
|
|
|
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
|
pub struct CreateDirectoryOptions {
|
|
pub recursive: bool,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
|
pub struct RemoveOptions {
|
|
pub recursive: bool,
|
|
pub force: bool,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
|
pub struct CopyOptions {
|
|
pub recursive: bool,
|
|
}
|
|
|
|
#[derive(Clone, Debug, Eq, PartialEq)]
|
|
pub struct FileMetadata {
|
|
pub is_directory: bool,
|
|
pub is_file: bool,
|
|
pub created_at_ms: i64,
|
|
pub modified_at_ms: i64,
|
|
}
|
|
|
|
#[derive(Clone, Debug, Eq, PartialEq)]
|
|
pub struct ReadDirectoryEntry {
|
|
pub file_name: String,
|
|
pub is_directory: bool,
|
|
pub is_file: bool,
|
|
}
|
|
|
|
pub type FileSystemResult<T> = io::Result<T>;
|
|
|
|
#[async_trait]
|
|
pub trait ExecutorFileSystem: Send + Sync {
|
|
async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult<Vec<u8>>;
|
|
|
|
async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec<u8>) -> FileSystemResult<()>;
|
|
|
|
async fn create_directory(
|
|
&self,
|
|
path: &AbsolutePathBuf,
|
|
options: CreateDirectoryOptions,
|
|
) -> FileSystemResult<()>;
|
|
|
|
async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult<FileMetadata>;
|
|
|
|
async fn read_directory(
|
|
&self,
|
|
path: &AbsolutePathBuf,
|
|
) -> FileSystemResult<Vec<ReadDirectoryEntry>>;
|
|
|
|
async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()>;
|
|
|
|
async fn copy(
|
|
&self,
|
|
source_path: &AbsolutePathBuf,
|
|
destination_path: &AbsolutePathBuf,
|
|
options: CopyOptions,
|
|
) -> FileSystemResult<()>;
|
|
}
|
|
|
|
#[derive(Clone, Default)]
|
|
pub(crate) struct LocalFileSystem;
|
|
|
|
#[async_trait]
|
|
impl ExecutorFileSystem for LocalFileSystem {
|
|
async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult<Vec<u8>> {
|
|
let metadata = tokio::fs::metadata(path.as_path()).await?;
|
|
if metadata.len() > MAX_READ_FILE_BYTES {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::InvalidInput,
|
|
format!("file is too large to read: limit is {MAX_READ_FILE_BYTES} bytes"),
|
|
));
|
|
}
|
|
tokio::fs::read(path.as_path()).await
|
|
}
|
|
|
|
async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec<u8>) -> FileSystemResult<()> {
|
|
tokio::fs::write(path.as_path(), contents).await
|
|
}
|
|
|
|
async fn create_directory(
|
|
&self,
|
|
path: &AbsolutePathBuf,
|
|
options: CreateDirectoryOptions,
|
|
) -> FileSystemResult<()> {
|
|
if options.recursive {
|
|
tokio::fs::create_dir_all(path.as_path()).await?;
|
|
} else {
|
|
tokio::fs::create_dir(path.as_path()).await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult<FileMetadata> {
|
|
let metadata = tokio::fs::metadata(path.as_path()).await?;
|
|
Ok(FileMetadata {
|
|
is_directory: metadata.is_dir(),
|
|
is_file: metadata.is_file(),
|
|
created_at_ms: metadata.created().ok().map_or(0, system_time_to_unix_ms),
|
|
modified_at_ms: metadata.modified().ok().map_or(0, system_time_to_unix_ms),
|
|
})
|
|
}
|
|
|
|
async fn read_directory(
|
|
&self,
|
|
path: &AbsolutePathBuf,
|
|
) -> FileSystemResult<Vec<ReadDirectoryEntry>> {
|
|
let mut entries = Vec::new();
|
|
let mut read_dir = tokio::fs::read_dir(path.as_path()).await?;
|
|
while let Some(entry) = read_dir.next_entry().await? {
|
|
let metadata = tokio::fs::metadata(entry.path()).await?;
|
|
entries.push(ReadDirectoryEntry {
|
|
file_name: entry.file_name().to_string_lossy().into_owned(),
|
|
is_directory: metadata.is_dir(),
|
|
is_file: metadata.is_file(),
|
|
});
|
|
}
|
|
Ok(entries)
|
|
}
|
|
|
|
async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()> {
|
|
match tokio::fs::symlink_metadata(path.as_path()).await {
|
|
Ok(metadata) => {
|
|
let file_type = metadata.file_type();
|
|
if file_type.is_dir() {
|
|
if options.recursive {
|
|
tokio::fs::remove_dir_all(path.as_path()).await?;
|
|
} else {
|
|
tokio::fs::remove_dir(path.as_path()).await?;
|
|
}
|
|
} else {
|
|
tokio::fs::remove_file(path.as_path()).await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
Err(err) if err.kind() == io::ErrorKind::NotFound && options.force => Ok(()),
|
|
Err(err) => Err(err),
|
|
}
|
|
}
|
|
|
|
async fn copy(
|
|
&self,
|
|
source_path: &AbsolutePathBuf,
|
|
destination_path: &AbsolutePathBuf,
|
|
options: CopyOptions,
|
|
) -> FileSystemResult<()> {
|
|
let source_path = source_path.to_path_buf();
|
|
let destination_path = destination_path.to_path_buf();
|
|
tokio::task::spawn_blocking(move || -> FileSystemResult<()> {
|
|
let metadata = std::fs::symlink_metadata(source_path.as_path())?;
|
|
let file_type = metadata.file_type();
|
|
|
|
if file_type.is_dir() {
|
|
if !options.recursive {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::InvalidInput,
|
|
"fs/copy requires recursive: true when sourcePath is a directory",
|
|
));
|
|
}
|
|
if destination_is_same_or_descendant_of_source(
|
|
source_path.as_path(),
|
|
destination_path.as_path(),
|
|
)? {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::InvalidInput,
|
|
"fs/copy cannot copy a directory to itself or one of its descendants",
|
|
));
|
|
}
|
|
copy_dir_recursive(source_path.as_path(), destination_path.as_path())?;
|
|
return Ok(());
|
|
}
|
|
|
|
if file_type.is_symlink() {
|
|
copy_symlink(source_path.as_path(), destination_path.as_path())?;
|
|
return Ok(());
|
|
}
|
|
|
|
if file_type.is_file() {
|
|
std::fs::copy(source_path.as_path(), destination_path.as_path())?;
|
|
return Ok(());
|
|
}
|
|
|
|
Err(io::Error::new(
|
|
io::ErrorKind::InvalidInput,
|
|
"fs/copy only supports regular files, directories, and symlinks",
|
|
))
|
|
})
|
|
.await
|
|
.map_err(|err| io::Error::other(format!("filesystem task failed: {err}")))?
|
|
}
|
|
}
|
|
|
|
fn copy_dir_recursive(source: &Path, target: &Path) -> io::Result<()> {
|
|
std::fs::create_dir_all(target)?;
|
|
for entry in std::fs::read_dir(source)? {
|
|
let entry = entry?;
|
|
let source_path = entry.path();
|
|
let target_path = target.join(entry.file_name());
|
|
let file_type = entry.file_type()?;
|
|
|
|
if file_type.is_dir() {
|
|
copy_dir_recursive(&source_path, &target_path)?;
|
|
} else if file_type.is_file() {
|
|
std::fs::copy(&source_path, &target_path)?;
|
|
} else if file_type.is_symlink() {
|
|
copy_symlink(&source_path, &target_path)?;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn destination_is_same_or_descendant_of_source(
|
|
source: &Path,
|
|
destination: &Path,
|
|
) -> io::Result<bool> {
|
|
let source = std::fs::canonicalize(source)?;
|
|
let destination = resolve_copy_destination_path(destination)?;
|
|
Ok(destination.starts_with(&source))
|
|
}
|
|
|
|
fn resolve_copy_destination_path(path: &Path) -> io::Result<PathBuf> {
|
|
let mut normalized = PathBuf::new();
|
|
for component in path.components() {
|
|
match component {
|
|
Component::Prefix(prefix) => normalized.push(prefix.as_os_str()),
|
|
Component::RootDir => normalized.push(component.as_os_str()),
|
|
Component::CurDir => {}
|
|
Component::ParentDir => {
|
|
normalized.pop();
|
|
}
|
|
Component::Normal(part) => normalized.push(part),
|
|
}
|
|
}
|
|
|
|
let mut unresolved_suffix = Vec::new();
|
|
let mut existing_path = normalized.as_path();
|
|
while !existing_path.exists() {
|
|
let Some(file_name) = existing_path.file_name() else {
|
|
break;
|
|
};
|
|
unresolved_suffix.push(file_name.to_os_string());
|
|
let Some(parent) = existing_path.parent() else {
|
|
break;
|
|
};
|
|
existing_path = parent;
|
|
}
|
|
|
|
let mut resolved = std::fs::canonicalize(existing_path)?;
|
|
for file_name in unresolved_suffix.iter().rev() {
|
|
resolved.push(file_name);
|
|
}
|
|
Ok(resolved)
|
|
}
|
|
|
|
fn copy_symlink(source: &Path, target: &Path) -> io::Result<()> {
|
|
let link_target = std::fs::read_link(source)?;
|
|
#[cfg(unix)]
|
|
{
|
|
std::os::unix::fs::symlink(&link_target, target)
|
|
}
|
|
#[cfg(windows)]
|
|
{
|
|
if symlink_points_to_directory(source)? {
|
|
std::os::windows::fs::symlink_dir(&link_target, target)
|
|
} else {
|
|
std::os::windows::fs::symlink_file(&link_target, target)
|
|
}
|
|
}
|
|
#[cfg(not(any(unix, windows)))]
|
|
{
|
|
let _ = link_target;
|
|
let _ = target;
|
|
Err(io::Error::new(
|
|
io::ErrorKind::Unsupported,
|
|
"copying symlinks is unsupported on this platform",
|
|
))
|
|
}
|
|
}
|
|
|
|
#[cfg(windows)]
|
|
fn symlink_points_to_directory(source: &Path) -> io::Result<bool> {
|
|
use std::os::windows::fs::FileTypeExt;
|
|
|
|
Ok(std::fs::symlink_metadata(source)?
|
|
.file_type()
|
|
.is_symlink_dir())
|
|
}
|
|
|
|
fn system_time_to_unix_ms(time: SystemTime) -> i64 {
|
|
time.duration_since(UNIX_EPOCH)
|
|
.ok()
|
|
.and_then(|duration| i64::try_from(duration.as_millis()).ok())
|
|
.unwrap_or(0)
|
|
}
|
|
|
|
#[cfg(all(test, windows))]
|
|
mod tests {
|
|
use super::*;
|
|
use pretty_assertions::assert_eq;
|
|
|
|
#[test]
|
|
fn symlink_points_to_directory_handles_dangling_directory_symlinks() -> io::Result<()> {
|
|
use std::os::windows::fs::symlink_dir;
|
|
|
|
let temp_dir = tempfile::TempDir::new()?;
|
|
let source_dir = temp_dir.path().join("source");
|
|
let link_path = temp_dir.path().join("source-link");
|
|
std::fs::create_dir(&source_dir)?;
|
|
|
|
if symlink_dir(&source_dir, &link_path).is_err() {
|
|
return Ok(());
|
|
}
|
|
|
|
std::fs::remove_dir(&source_dir)?;
|
|
|
|
assert_eq!(symlink_points_to_directory(&link_path)?, true);
|
|
Ok(())
|
|
}
|
|
}
|