Add FS abstraction and use in view_image (#14960)

Adds an environment crate and environment + file system abstraction.

Environment is a combination of attributes and services specific to the
environment the agent is connected to:
the file system, process management, OS, and default shell.

The goal is to move most of the agent logic that makes assumptions about its
environment so that it works through the environment abstraction.
This commit is contained in:
pakrym-oai 2026-03-17 17:36:23 -07:00 committed by GitHub
parent 19b887128e
commit 83a60fdb94
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 597 additions and 352 deletions

15
codex-rs/Cargo.lock generated
View file

@ -1427,6 +1427,7 @@ dependencies = [
"codex-chatgpt",
"codex-cloud-requirements",
"codex-core",
"codex-environment",
"codex-feedback",
"codex-file-search",
"codex-login",
@ -1462,7 +1463,6 @@ dependencies = [
"tracing-opentelemetry",
"tracing-subscriber",
"uuid",
"walkdir",
"wiremock",
]
@ -1841,6 +1841,7 @@ dependencies = [
"codex-client",
"codex-config",
"codex-connectors",
"codex-environment",
"codex-execpolicy",
"codex-file-search",
"codex-git",
@ -1944,6 +1945,17 @@ dependencies = [
"serde_json",
]
[[package]]
name = "codex-environment"
version = "0.0.0"
dependencies = [
"async-trait",
"codex-utils-absolute-path",
"pretty_assertions",
"tempfile",
"tokio",
]
[[package]]
name = "codex-exec"
version = "0.0.0"
@ -2744,7 +2756,6 @@ dependencies = [
"base64 0.22.1",
"codex-utils-cache",
"image",
"tempfile",
"thiserror 2.0.18",
"tokio",
]

View file

@ -22,6 +22,7 @@ members = [
"shell-escalation",
"skills",
"core",
"environment",
"hooks",
"secrets",
"exec",
@ -103,6 +104,7 @@ codex-cloud-requirements = { path = "cloud-requirements" }
codex-connectors = { path = "connectors" }
codex-config = { path = "config" }
codex-core = { path = "core" }
codex-environment = { path = "environment" }
codex-exec = { path = "exec" }
codex-execpolicy = { path = "execpolicy" }
codex-experimental-api-macros = { path = "codex-experimental-api-macros" }

View file

@ -32,6 +32,7 @@ axum = { workspace = true, default-features = false, features = [
codex-arg0 = { workspace = true }
codex-cloud-requirements = { workspace = true }
codex-core = { workspace = true }
codex-environment = { workspace = true }
codex-otel = { workspace = true }
codex-shell-command = { workspace = true }
codex-utils-cli = { workspace = true }
@ -68,7 +69,6 @@ tokio-tungstenite = { workspace = true }
tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "json"] }
uuid = { workspace = true, features = ["serde", "v7"] }
walkdir = { workspace = true }
[dev-dependencies]
app_test_support = { workspace = true }

View file

@ -18,23 +18,37 @@ use codex_app_server_protocol::FsRemoveResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_environment::CopyOptions;
use codex_environment::CreateDirectoryOptions;
use codex_environment::Environment;
use codex_environment::ExecutorFileSystem;
use codex_environment::RemoveOptions;
use std::io;
use std::path::Component;
use std::path::Path;
use std::path::PathBuf;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use walkdir::WalkDir;
use std::sync::Arc;
#[derive(Clone, Default)]
pub(crate) struct FsApi;
#[derive(Clone)]
pub(crate) struct FsApi {
file_system: Arc<dyn ExecutorFileSystem>,
}
impl Default for FsApi {
fn default() -> Self {
Self {
file_system: Arc::new(Environment.get_filesystem()),
}
}
}
impl FsApi {
pub(crate) async fn read_file(
&self,
params: FsReadFileParams,
) -> Result<FsReadFileResponse, JSONRPCErrorError> {
let bytes = tokio::fs::read(params.path).await.map_err(map_io_error)?;
let bytes = self
.file_system
.read_file(&params.path)
.await
.map_err(map_fs_error)?;
Ok(FsReadFileResponse {
data_base64: STANDARD.encode(bytes),
})
@ -49,9 +63,10 @@ impl FsApi {
"fs/writeFile requires valid base64 dataBase64: {err}"
))
})?;
tokio::fs::write(params.path, bytes)
self.file_system
.write_file(&params.path, bytes)
.await
.map_err(map_io_error)?;
.map_err(map_fs_error)?;
Ok(FsWriteFileResponse {})
}
@ -59,15 +74,15 @@ impl FsApi {
&self,
params: FsCreateDirectoryParams,
) -> Result<FsCreateDirectoryResponse, JSONRPCErrorError> {
if params.recursive.unwrap_or(true) {
tokio::fs::create_dir_all(params.path)
.await
.map_err(map_io_error)?;
} else {
tokio::fs::create_dir(params.path)
.await
.map_err(map_io_error)?;
}
self.file_system
.create_directory(
&params.path,
CreateDirectoryOptions {
recursive: params.recursive.unwrap_or(true),
},
)
.await
.map_err(map_fs_error)?;
Ok(FsCreateDirectoryResponse {})
}
@ -75,14 +90,16 @@ impl FsApi {
&self,
params: FsGetMetadataParams,
) -> Result<FsGetMetadataResponse, JSONRPCErrorError> {
let metadata = tokio::fs::metadata(params.path)
let metadata = self
.file_system
.get_metadata(&params.path)
.await
.map_err(map_io_error)?;
.map_err(map_fs_error)?;
Ok(FsGetMetadataResponse {
is_directory: metadata.is_dir(),
is_file: metadata.is_file(),
created_at_ms: metadata.created().ok().map_or(0, system_time_to_unix_ms),
modified_at_ms: metadata.modified().ok().map_or(0, system_time_to_unix_ms),
is_directory: metadata.is_directory,
is_file: metadata.is_file,
created_at_ms: metadata.created_at_ms,
modified_at_ms: metadata.modified_at_ms,
})
}
@ -90,232 +107,59 @@ impl FsApi {
&self,
params: FsReadDirectoryParams,
) -> Result<FsReadDirectoryResponse, JSONRPCErrorError> {
let mut entries = Vec::new();
let mut read_dir = tokio::fs::read_dir(params.path)
let entries = self
.file_system
.read_directory(&params.path)
.await
.map_err(map_io_error)?;
while let Some(entry) = read_dir.next_entry().await.map_err(map_io_error)? {
let metadata = tokio::fs::metadata(entry.path())
.await
.map_err(map_io_error)?;
entries.push(FsReadDirectoryEntry {
file_name: entry.file_name().to_string_lossy().into_owned(),
is_directory: metadata.is_dir(),
is_file: metadata.is_file(),
});
}
Ok(FsReadDirectoryResponse { entries })
.map_err(map_fs_error)?;
Ok(FsReadDirectoryResponse {
entries: entries
.into_iter()
.map(|entry| FsReadDirectoryEntry {
file_name: entry.file_name,
is_directory: entry.is_directory,
is_file: entry.is_file,
})
.collect(),
})
}
pub(crate) async fn remove(
&self,
params: FsRemoveParams,
) -> Result<FsRemoveResponse, JSONRPCErrorError> {
let path = params.path.as_path();
let recursive = params.recursive.unwrap_or(true);
let force = params.force.unwrap_or(true);
match tokio::fs::symlink_metadata(path).await {
Ok(metadata) => {
let file_type = metadata.file_type();
if file_type.is_dir() {
if recursive {
tokio::fs::remove_dir_all(path)
.await
.map_err(map_io_error)?;
} else {
tokio::fs::remove_dir(path).await.map_err(map_io_error)?;
}
} else {
tokio::fs::remove_file(path).await.map_err(map_io_error)?;
}
Ok(FsRemoveResponse {})
}
Err(err) if err.kind() == io::ErrorKind::NotFound && force => Ok(FsRemoveResponse {}),
Err(err) => Err(map_io_error(err)),
}
self.file_system
.remove(
&params.path,
RemoveOptions {
recursive: params.recursive.unwrap_or(true),
force: params.force.unwrap_or(true),
},
)
.await
.map_err(map_fs_error)?;
Ok(FsRemoveResponse {})
}
pub(crate) async fn copy(
&self,
params: FsCopyParams,
) -> Result<FsCopyResponse, JSONRPCErrorError> {
let FsCopyParams {
source_path,
destination_path,
recursive,
} = params;
tokio::task::spawn_blocking(move || -> Result<(), JSONRPCErrorError> {
let metadata =
std::fs::symlink_metadata(source_path.as_path()).map_err(map_io_error)?;
let file_type = metadata.file_type();
if file_type.is_dir() {
if !recursive {
return Err(invalid_request(
"fs/copy requires recursive: true when sourcePath is a directory",
));
}
if destination_is_same_or_descendant_of_source(
source_path.as_path(),
destination_path.as_path(),
)
.map_err(map_io_error)?
{
return Err(invalid_request(
"fs/copy cannot copy a directory to itself or one of its descendants",
));
}
copy_dir_recursive(source_path.as_path(), destination_path.as_path())
.map_err(map_io_error)?;
return Ok(());
}
if file_type.is_symlink() {
copy_symlink(source_path.as_path(), destination_path.as_path())
.map_err(map_io_error)?;
return Ok(());
}
if file_type.is_file() {
std::fs::copy(source_path.as_path(), destination_path.as_path())
.map_err(map_io_error)?;
return Ok(());
}
Err(invalid_request(
"fs/copy only supports regular files, directories, and symlinks",
))
})
.await
.map_err(map_join_error)??;
self.file_system
.copy(
&params.source_path,
&params.destination_path,
CopyOptions {
recursive: params.recursive,
},
)
.await
.map_err(map_fs_error)?;
Ok(FsCopyResponse {})
}
}
fn copy_dir_recursive(source: &Path, target: &Path) -> io::Result<()> {
for entry in WalkDir::new(source) {
let entry = entry.map_err(|err| {
if let Some(io_err) = err.io_error() {
io::Error::new(io_err.kind(), io_err.to_string())
} else {
io::Error::other(err.to_string())
}
})?;
let relative_path = entry.path().strip_prefix(source).map_err(|err| {
io::Error::other(format!(
"failed to compute relative path for {} under {}: {err}",
entry.path().display(),
source.display()
))
})?;
let target_path = target.join(relative_path);
let file_type = entry.file_type();
if file_type.is_dir() {
std::fs::create_dir_all(&target_path)?;
continue;
}
if file_type.is_file() {
std::fs::copy(entry.path(), &target_path)?;
continue;
}
if file_type.is_symlink() {
copy_symlink(entry.path(), &target_path)?;
continue;
}
// For now ignore special files such as FIFOs, sockets, and device nodes during recursive copies.
}
Ok(())
}
fn destination_is_same_or_descendant_of_source(
source: &Path,
destination: &Path,
) -> io::Result<bool> {
let source = std::fs::canonicalize(source)?;
let destination = resolve_copy_destination_path(destination)?;
Ok(destination.starts_with(&source))
}
fn resolve_copy_destination_path(path: &Path) -> io::Result<PathBuf> {
let mut normalized = PathBuf::new();
for component in path.components() {
match component {
Component::Prefix(prefix) => normalized.push(prefix.as_os_str()),
Component::RootDir => normalized.push(component.as_os_str()),
Component::CurDir => {}
Component::ParentDir => {
normalized.pop();
}
Component::Normal(part) => normalized.push(part),
}
}
let mut unresolved_suffix = Vec::new();
let mut existing_path = normalized.as_path();
while !existing_path.exists() {
let Some(file_name) = existing_path.file_name() else {
break;
};
unresolved_suffix.push(file_name.to_os_string());
let Some(parent) = existing_path.parent() else {
break;
};
existing_path = parent;
}
let mut resolved = std::fs::canonicalize(existing_path)?;
for file_name in unresolved_suffix.iter().rev() {
resolved.push(file_name);
}
Ok(resolved)
}
fn copy_symlink(source: &Path, target: &Path) -> io::Result<()> {
let link_target = std::fs::read_link(source)?;
#[cfg(unix)]
{
std::os::unix::fs::symlink(&link_target, target)
}
#[cfg(windows)]
{
if symlink_points_to_directory(source)? {
std::os::windows::fs::symlink_dir(&link_target, target)
} else {
std::os::windows::fs::symlink_file(&link_target, target)
}
}
#[cfg(not(any(unix, windows)))]
{
let _ = link_target;
let _ = target;
Err(io::Error::new(
io::ErrorKind::Unsupported,
"copying symlinks is unsupported on this platform",
))
}
}
#[cfg(windows)]
fn symlink_points_to_directory(source: &Path) -> io::Result<bool> {
use std::os::windows::fs::FileTypeExt;
Ok(std::fs::symlink_metadata(source)?
.file_type()
.is_symlink_dir())
}
fn system_time_to_unix_ms(time: SystemTime) -> i64 {
time.duration_since(UNIX_EPOCH)
.ok()
.and_then(|duration| i64::try_from(duration.as_millis()).ok())
.unwrap_or(0)
}
pub(crate) fn invalid_request(message: impl Into<String>) -> JSONRPCErrorError {
fn invalid_request(message: impl Into<String>) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: message.into(),
@ -323,43 +167,14 @@ pub(crate) fn invalid_request(message: impl Into<String>) -> JSONRPCErrorError {
}
}
fn map_join_error(err: tokio::task::JoinError) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("filesystem task failed: {err}"),
data: None,
}
}
pub(crate) fn map_io_error(err: io::Error) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: err.to_string(),
data: None,
}
}
#[cfg(all(test, windows))]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn symlink_points_to_directory_handles_dangling_directory_symlinks() -> io::Result<()> {
use std::os::windows::fs::symlink_dir;
let temp_dir = tempfile::TempDir::new()?;
let source_dir = temp_dir.path().join("source");
let link_path = temp_dir.path().join("source-link");
std::fs::create_dir(&source_dir)?;
if symlink_dir(&source_dir, &link_path).is_err() {
return Ok(());
fn map_fs_error(err: io::Error) -> JSONRPCErrorError {
if err.kind() == io::ErrorKind::InvalidInput {
invalid_request(err.to_string())
} else {
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: err.to_string(),
data: None,
}
std::fs::remove_dir(&source_dir)?;
assert_eq!(symlink_points_to_directory(&link_path)?, true);
Ok(())
}
}

View file

@ -253,7 +253,7 @@ impl MessageProcessor {
analytics_events_client,
);
let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.clone());
let fs_api = FsApi;
let fs_api = FsApi::default();
Self {
outgoing,

View file

@ -34,6 +34,7 @@ codex-async-utils = { workspace = true }
codex-client = { workspace = true }
codex-connectors = { workspace = true }
codex-config = { workspace = true }
codex-environment = { workspace = true }
codex-shell-command = { workspace = true }
codex-skills = { workspace = true }
codex-execpolicy = { workspace = true }

View file

@ -60,6 +60,7 @@ use chrono::Local;
use chrono::Utc;
use codex_app_server_protocol::McpServerElicitationRequest;
use codex_app_server_protocol::McpServerElicitationRequestParams;
use codex_environment::Environment;
use codex_hooks::HookEvent;
use codex_hooks::HookEventAfterAgent;
use codex_hooks::HookPayload;
@ -785,6 +786,7 @@ pub(crate) struct TurnContext {
pub(crate) reasoning_effort: Option<ReasoningEffortConfig>,
pub(crate) reasoning_summary: ReasoningSummaryConfig,
pub(crate) session_source: SessionSource,
pub(crate) environment: Arc<Environment>,
/// The session's current working directory. All relative paths provided by
/// the model as well as sandbox policies are resolved against this path
/// instead of `std::env::current_dir()`.
@ -894,6 +896,7 @@ impl TurnContext {
reasoning_effort,
reasoning_summary: self.reasoning_summary,
session_source: self.session_source.clone(),
environment: Arc::clone(&self.environment),
cwd: self.cwd.clone(),
current_date: self.current_date.clone(),
timezone: self.timezone.clone(),
@ -1282,6 +1285,7 @@ impl Session {
model_info: ModelInfo,
models_manager: &ModelsManager,
network: Option<NetworkProxy>,
environment: Arc<Environment>,
sub_id: String,
js_repl: Arc<JsReplHandle>,
skills_outcome: Arc<SkillLoadOutcome>,
@ -1338,6 +1342,7 @@ impl Session {
reasoning_effort,
reasoning_summary,
session_source,
environment,
cwd,
current_date: Some(current_date),
timezone: Some(timezone),
@ -1810,6 +1815,7 @@ impl Session {
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),
environment: Arc::new(Environment),
};
let js_repl = Arc::new(JsReplHandle::with_node_path(
config.js_repl_node_path.clone(),
@ -2389,6 +2395,7 @@ impl Session {
.network_proxy
.as_ref()
.map(StartedNetworkProxy::proxy),
Arc::clone(&self.services.environment),
sub_id,
Arc::clone(&self.js_repl),
skills_outcome,
@ -5198,6 +5205,7 @@ async fn spawn_review_thread(
reasoning_effort,
reasoning_summary,
session_source,
environment: Arc::clone(&parent_turn_context.environment),
tools_config,
features: parent_turn_context.features.clone(),
ghost_snapshot: parent_turn_context.ghost_snapshot.clone(),

View file

@ -2466,6 +2466,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
true,
));
let network_approval = Arc::new(NetworkApprovalService::default());
let environment = Arc::new(codex_environment::Environment);
let file_watcher = Arc::new(FileWatcher::noop());
let services = SessionServices {
@ -2520,6 +2521,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),
environment: Arc::clone(&environment),
};
let js_repl = Arc::new(JsReplHandle::with_node_path(
config.js_repl_node_path.clone(),
@ -2539,6 +2541,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
model_info,
&models_manager,
None,
environment,
"turn_id".to_string(),
Arc::clone(&js_repl),
skills_outcome,
@ -3258,6 +3261,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
true,
));
let network_approval = Arc::new(NetworkApprovalService::default());
let environment = Arc::new(codex_environment::Environment);
let file_watcher = Arc::new(FileWatcher::noop());
let services = SessionServices {
@ -3312,6 +3316,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),
environment: Arc::clone(&environment),
};
let js_repl = Arc::new(JsReplHandle::with_node_path(
config.js_repl_node_path.clone(),
@ -3331,6 +3336,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
model_info,
&models_manager,
None,
environment,
"turn_id".to_string(),
Arc::clone(&js_repl),
skills_outcome,

View file

@ -20,6 +20,7 @@ use crate::tools::network_approval::NetworkApprovalService;
use crate::tools::runtimes::ExecveSessionApproval;
use crate::tools::sandboxing::ApprovalStore;
use crate::unified_exec::UnifiedExecProcessManager;
use codex_environment::Environment;
use codex_hooks::Hooks;
use codex_otel::SessionTelemetry;
use codex_utils_absolute_path::AbsolutePathBuf;
@ -61,4 +62,5 @@ pub(crate) struct SessionServices {
/// Session-scoped model client shared across turns.
pub(crate) model_client: ModelClient,
pub(crate) code_mode_service: CodeModeService,
pub(crate) environment: Arc<Environment>,
}

View file

@ -1,12 +1,13 @@
use async_trait::async_trait;
use codex_environment::ExecutorFileSystem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::ImageDetail;
use codex_protocol::models::local_image_content_items_with_label_number;
use codex_protocol::openai_models::InputModality;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_image::PromptImageMode;
use serde::Deserialize;
use tokio::fs;
use crate::function_tool::FunctionCallError;
use crate::original_image_detail::can_request_original_image_detail;
@ -87,22 +88,41 @@ impl ToolHandler for ViewImageHandler {
}
};
let abs_path = turn.resolve_path(Some(args.path));
let abs_path =
AbsolutePathBuf::try_from(turn.resolve_path(Some(args.path))).map_err(|error| {
FunctionCallError::RespondToModel(format!("unable to resolve image path: {error}"))
})?;
let metadata = fs::metadata(&abs_path).await.map_err(|error| {
FunctionCallError::RespondToModel(format!(
"unable to locate image at `{}`: {error}",
abs_path.display()
))
})?;
let metadata = turn
.environment
.get_filesystem()
.get_metadata(&abs_path)
.await
.map_err(|error| {
FunctionCallError::RespondToModel(format!(
"unable to locate image at `{}`: {error}",
abs_path.display()
))
})?;
if !metadata.is_file() {
if !metadata.is_file {
return Err(FunctionCallError::RespondToModel(format!(
"image path `{}` is not a file",
abs_path.display()
)));
}
let event_path = abs_path.clone();
let file_bytes = turn
.environment
.get_filesystem()
.read_file(&abs_path)
.await
.map_err(|error| {
FunctionCallError::RespondToModel(format!(
"unable to read image at `{}`: {error}",
abs_path.display()
))
})?;
let event_path = abs_path.to_path_buf();
let can_request_original_detail =
can_request_original_image_detail(turn.features.get(), &turn.model_info);
@ -116,7 +136,10 @@ impl ToolHandler for ViewImageHandler {
let image_detail = use_original_detail.then_some(ImageDetail::Original);
let content = local_image_content_items_with_label_number(
&abs_path, /*label_number*/ None, image_mode,
abs_path.as_path(),
file_bytes,
/*label_number*/ None,
image_mode,
)
.into_iter()
.map(|item| match item {

View file

@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "environment",
crate_name = "codex_environment",
)

View file

@ -0,0 +1,21 @@
[package]
name = "codex-environment"
version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
name = "codex_environment"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
async-trait = { workspace = true }
codex-utils-absolute-path = { workspace = true }
tokio = { workspace = true, features = ["fs", "io-util", "rt"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = { workspace = true }

View file

@ -0,0 +1,332 @@
use async_trait::async_trait;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::path::Component;
use std::path::Path;
use std::path::PathBuf;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::io;
/// Upper bound on file size accepted by `read_file` (512 MiB), so a huge file
/// fails fast instead of exhausting memory.
const MAX_READ_FILE_BYTES: u64 = 512 * 1024 * 1024;

/// Options for `ExecutorFileSystem::create_directory`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CreateDirectoryOptions {
    // When true, also create missing parent directories (`mkdir -p` semantics).
    pub recursive: bool,
}

/// Options for `ExecutorFileSystem::remove`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RemoveOptions {
    // When true, remove non-empty directories and their contents.
    pub recursive: bool,
    // When true, a missing path counts as success instead of an error.
    pub force: bool,
}

/// Options for `ExecutorFileSystem::copy`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CopyOptions {
    // Must be true when the source is a directory tree.
    pub recursive: bool,
}

/// Metadata reported for a single path.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FileMetadata {
    pub is_directory: bool,
    pub is_file: bool,
    // Milliseconds since the Unix epoch; 0 when the platform cannot provide it.
    pub created_at_ms: i64,
    pub modified_at_ms: i64,
}

/// One entry returned by `ExecutorFileSystem::read_directory`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReadDirectoryEntry {
    // File name only (not a full path); non-UTF-8 names are lossily converted.
    pub file_name: String,
    pub is_directory: bool,
    pub is_file: bool,
}
/// Result alias for file-system operations; errors are plain `io::Error`s.
pub type FileSystemResult<T> = io::Result<T>;

/// Async file-system facade used by the executor.
///
/// All paths are absolute; implementations decide how they map to actual
/// storage (local disk today — see `LocalFileSystem`).
#[async_trait]
pub trait ExecutorFileSystem: Send + Sync {
    /// Reads the entire file at `path` into memory.
    async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult<Vec<u8>>;
    /// Writes `contents` to `path`, creating or replacing the file.
    async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec<u8>) -> FileSystemResult<()>;
    /// Creates a directory at `path`; see `CreateDirectoryOptions`.
    async fn create_directory(
        &self,
        path: &AbsolutePathBuf,
        options: CreateDirectoryOptions,
    ) -> FileSystemResult<()>;
    /// Returns metadata for `path`.
    async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult<FileMetadata>;
    /// Lists the entries directly contained in the directory at `path`.
    async fn read_directory(
        &self,
        path: &AbsolutePathBuf,
    ) -> FileSystemResult<Vec<ReadDirectoryEntry>>;
    /// Removes the file or directory at `path`; see `RemoveOptions`.
    async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()>;
    /// Copies `source_path` to `destination_path`; see `CopyOptions`.
    async fn copy(
        &self,
        source_path: &AbsolutePathBuf,
        destination_path: &AbsolutePathBuf,
        options: CopyOptions,
    ) -> FileSystemResult<()>;
}
/// `ExecutorFileSystem` implementation backed by the local disk via `tokio::fs`.
#[derive(Clone, Default)]
pub(crate) struct LocalFileSystem;

#[async_trait]
impl ExecutorFileSystem for LocalFileSystem {
    async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult<Vec<u8>> {
        // Check the size first so an oversized file fails fast instead of
        // being loaded into memory.
        let metadata = tokio::fs::metadata(path.as_path()).await?;
        if metadata.len() > MAX_READ_FILE_BYTES {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("file is too large to read: limit is {MAX_READ_FILE_BYTES} bytes"),
            ));
        }
        tokio::fs::read(path.as_path()).await
    }

    async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec<u8>) -> FileSystemResult<()> {
        tokio::fs::write(path.as_path(), contents).await
    }

    async fn create_directory(
        &self,
        path: &AbsolutePathBuf,
        options: CreateDirectoryOptions,
    ) -> FileSystemResult<()> {
        if options.recursive {
            tokio::fs::create_dir_all(path.as_path()).await?;
        } else {
            tokio::fs::create_dir(path.as_path()).await?;
        }
        Ok(())
    }

    async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult<FileMetadata> {
        // `tokio::fs::metadata` follows symlinks (stat, not lstat).
        let metadata = tokio::fs::metadata(path.as_path()).await?;
        Ok(FileMetadata {
            is_directory: metadata.is_dir(),
            is_file: metadata.is_file(),
            // Timestamps fall back to 0 when the platform cannot provide them.
            created_at_ms: metadata.created().ok().map_or(0, system_time_to_unix_ms),
            modified_at_ms: metadata.modified().ok().map_or(0, system_time_to_unix_ms),
        })
    }

    async fn read_directory(
        &self,
        path: &AbsolutePathBuf,
    ) -> FileSystemResult<Vec<ReadDirectoryEntry>> {
        let mut entries = Vec::new();
        let mut read_dir = tokio::fs::read_dir(path.as_path()).await?;
        while let Some(entry) = read_dir.next_entry().await? {
            // Stat each entry (following symlinks) to classify it.
            let metadata = tokio::fs::metadata(entry.path()).await?;
            entries.push(ReadDirectoryEntry {
                // Non-UTF-8 file names are lossily converted.
                file_name: entry.file_name().to_string_lossy().into_owned(),
                is_directory: metadata.is_dir(),
                is_file: metadata.is_file(),
            });
        }
        Ok(entries)
    }

    async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()> {
        // Use symlink_metadata so a symlink is removed itself rather than
        // resolving it and deleting its target.
        match tokio::fs::symlink_metadata(path.as_path()).await {
            Ok(metadata) => {
                let file_type = metadata.file_type();
                if file_type.is_dir() {
                    if options.recursive {
                        tokio::fs::remove_dir_all(path.as_path()).await?;
                    } else {
                        tokio::fs::remove_dir(path.as_path()).await?;
                    }
                } else {
                    tokio::fs::remove_file(path.as_path()).await?;
                }
                Ok(())
            }
            // With `force`, removing a missing path is a successful no-op.
            Err(err) if err.kind() == io::ErrorKind::NotFound && options.force => Ok(()),
            Err(err) => Err(err),
        }
    }

    async fn copy(
        &self,
        source_path: &AbsolutePathBuf,
        destination_path: &AbsolutePathBuf,
        options: CopyOptions,
    ) -> FileSystemResult<()> {
        let source_path = source_path.to_path_buf();
        let destination_path = destination_path.to_path_buf();
        // The copy is implemented with blocking std::fs calls, so run it on
        // the blocking thread pool to keep the async executor responsive.
        tokio::task::spawn_blocking(move || -> FileSystemResult<()> {
            let metadata = std::fs::symlink_metadata(source_path.as_path())?;
            let file_type = metadata.file_type();
            if file_type.is_dir() {
                if !options.recursive {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "fs/copy requires recursive: true when sourcePath is a directory",
                    ));
                }
                // Guard against infinite recursion when copying a tree into itself.
                if destination_is_same_or_descendant_of_source(
                    source_path.as_path(),
                    destination_path.as_path(),
                )? {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "fs/copy cannot copy a directory to itself or one of its descendants",
                    ));
                }
                copy_dir_recursive(source_path.as_path(), destination_path.as_path())?;
                return Ok(());
            }
            if file_type.is_symlink() {
                copy_symlink(source_path.as_path(), destination_path.as_path())?;
                return Ok(());
            }
            if file_type.is_file() {
                std::fs::copy(source_path.as_path(), destination_path.as_path())?;
                return Ok(());
            }
            Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "fs/copy only supports regular files, directories, and symlinks",
            ))
        })
        .await
        .map_err(|err| io::Error::other(format!("filesystem task failed: {err}")))?
    }
}
/// Recursively copies the directory tree rooted at `source` into `target`,
/// creating `target` (and any missing parents) first.
///
/// Regular files are copied byte-for-byte, symlinks are recreated as links,
/// and special files (FIFOs, sockets, device nodes) are silently skipped.
fn copy_dir_recursive(source: &Path, target: &Path) -> io::Result<()> {
    std::fs::create_dir_all(target)?;
    for dir_entry in std::fs::read_dir(source)? {
        let dir_entry = dir_entry?;
        let from = dir_entry.path();
        let to = target.join(dir_entry.file_name());
        let kind = dir_entry.file_type()?;
        if kind.is_dir() {
            copy_dir_recursive(&from, &to)?;
        } else if kind.is_file() {
            std::fs::copy(&from, &to)?;
        } else if kind.is_symlink() {
            copy_symlink(&from, &to)?;
        }
        // Anything else (special files) is intentionally ignored.
    }
    Ok(())
}
/// Returns true when `destination` resolves to `source` itself or to a path
/// nested anywhere beneath it. Used to reject self-referential directory copies.
fn destination_is_same_or_descendant_of_source(
    source: &Path,
    destination: &Path,
) -> io::Result<bool> {
    let canonical_source = std::fs::canonicalize(source)?;
    let resolved_destination = resolve_copy_destination_path(destination)?;
    Ok(resolved_destination.starts_with(&canonical_source))
}
/// Resolves `path` as far as the file system allows: the deepest existing
/// ancestor is canonicalized (symlinks resolved), and the not-yet-existing
/// tail components are appended back lexically.
///
/// This lets descendant checks work for destinations that do not exist yet.
fn resolve_copy_destination_path(path: &Path) -> io::Result<PathBuf> {
    // Lexically normalize first: drop `.` and fold `..` onto the previous component.
    let mut lexical = PathBuf::new();
    for part in path.components() {
        match part {
            Component::CurDir => {}
            Component::ParentDir => {
                lexical.pop();
            }
            // Prefix, RootDir, and Normal components are kept as-is.
            other => lexical.push(other.as_os_str()),
        }
    }
    // Peel components off the end until we reach something that exists on disk,
    // remembering the missing tail.
    let mut missing_tail = Vec::new();
    let mut probe = lexical.as_path();
    while !probe.exists() {
        let (Some(name), Some(parent)) = (probe.file_name(), probe.parent()) else {
            break;
        };
        missing_tail.push(name.to_os_string());
        probe = parent;
    }
    // Canonicalize the existing ancestor, then re-attach the missing tail in order.
    let mut resolved = std::fs::canonicalize(probe)?;
    resolved.extend(missing_tail.iter().rev());
    Ok(resolved)
}
/// Recreates the symlink at `source` as a new symlink at `target`, preserving
/// the link target verbatim (the pointed-to path is not required to exist).
fn copy_symlink(source: &Path, target: &Path) -> io::Result<()> {
    let link_target = std::fs::read_link(source)?;
    #[cfg(unix)]
    {
        std::os::unix::fs::symlink(&link_target, target)
    }
    #[cfg(windows)]
    {
        // Windows distinguishes file and directory symlinks at creation time,
        // so classify the original link before recreating it.
        if symlink_points_to_directory(source)? {
            std::os::windows::fs::symlink_dir(&link_target, target)
        } else {
            std::os::windows::fs::symlink_file(&link_target, target)
        }
    }
    #[cfg(not(any(unix, windows)))]
    {
        // Suppress unused-variable warnings on platforms without symlink support.
        let _ = link_target;
        let _ = target;
        Err(io::Error::new(
            io::ErrorKind::Unsupported,
            "copying symlinks is unsupported on this platform",
        ))
    }
}
/// Reports whether the symlink at `source` was created as a directory symlink.
///
/// Inspects the link itself via `symlink_metadata`, so this works even when
/// the link target no longer exists (a dangling link).
#[cfg(windows)]
fn symlink_points_to_directory(source: &Path) -> io::Result<bool> {
    use std::os::windows::fs::FileTypeExt;
    Ok(std::fs::symlink_metadata(source)?
        .file_type()
        .is_symlink_dir())
}
/// Converts a `SystemTime` to milliseconds since the Unix epoch.
///
/// Times before the epoch, or values whose millisecond count overflows `i64`,
/// collapse to 0.
fn system_time_to_unix_ms(time: SystemTime) -> i64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(0),
        Err(_) => 0,
    }
}
// Windows-only tests: exercises directory-symlink classification.
#[cfg(all(test, windows))]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    #[test]
    fn symlink_points_to_directory_handles_dangling_directory_symlinks() -> io::Result<()> {
        use std::os::windows::fs::symlink_dir;
        let temp_dir = tempfile::TempDir::new()?;
        let source_dir = temp_dir.path().join("source");
        let link_path = temp_dir.path().join("source-link");
        std::fs::create_dir(&source_dir)?;
        // Creating symlinks on Windows can require elevated privileges;
        // treat failure as "environment can't run this test" and skip.
        if symlink_dir(&source_dir, &link_path).is_err() {
            return Ok(());
        }
        // Delete the target so the link dangles; classification must still work.
        std::fs::remove_dir(&source_dir)?;
        assert_eq!(symlink_points_to_directory(&link_path)?, true);
        Ok(())
    }
}

View file

@ -0,0 +1,18 @@
pub mod fs;
pub use fs::CopyOptions;
pub use fs::CreateDirectoryOptions;
pub use fs::ExecutorFileSystem;
pub use fs::FileMetadata;
pub use fs::FileSystemResult;
pub use fs::ReadDirectoryEntry;
pub use fs::RemoveOptions;
/// Handle describing the environment the agent executes in.
///
/// Currently a unit type that only vends the local file system; per the
/// commit description it is intended to grow additional environment services
/// (process management, OS, default shell) over time.
#[derive(Clone, Debug, Default)]
pub struct Environment;

impl Environment {
    /// Returns the file system implementation for this environment.
    ///
    /// `use<>` opts the opaque return type out of capturing `&self`.
    pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> {
        fs::LocalFileSystem
    }
}

View file

@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::path::Path;
use codex_utils_image::PromptImageMode;
use codex_utils_image::load_for_prompt;
use codex_utils_image::load_for_prompt_bytes;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
@ -944,10 +944,11 @@ fn unsupported_image_error_placeholder(path: &std::path::Path, mime: &str) -> Co
pub fn local_image_content_items_with_label_number(
path: &std::path::Path,
file_bytes: Vec<u8>,
label_number: Option<usize>,
mode: PromptImageMode,
) -> Vec<ContentItem> {
match load_for_prompt(path, mode) {
match load_for_prompt_bytes(path, file_bytes, mode) {
Ok(image) => {
let mut items = Vec::with_capacity(3);
if let Some(label_number) = label_number {
@ -1114,11 +1115,15 @@ impl From<Vec<UserInput>> for ResponseInputItem {
}
UserInput::LocalImage { path } => {
image_index += 1;
local_image_content_items_with_label_number(
&path,
Some(image_index),
PromptImageMode::ResizeToFit,
)
match std::fs::read(&path) {
Ok(file_bytes) => local_image_content_items_with_label_number(
&path,
file_bytes,
Some(image_index),
PromptImageMode::ResizeToFit,
),
Err(err) => vec![local_image_error_placeholder(&path, err)],
}
}
UserInput::Skill { .. } | UserInput::Mention { .. } => Vec::new(), // Tool bodies are injected later in core
})

View file

@ -16,4 +16,3 @@ tokio = { workspace = true, features = ["fs", "rt", "rt-multi-thread", "macros"]
[dev-dependencies]
image = { workspace = true, features = ["jpeg", "png", "gif", "webp"] }
tempfile = { workspace = true }

View file

@ -53,18 +53,13 @@ struct ImageCacheKey {
static IMAGE_CACHE: LazyLock<BlockingLruCache<ImageCacheKey, EncodedImage>> =
LazyLock::new(|| BlockingLruCache::new(NonZeroUsize::new(32).unwrap_or(NonZeroUsize::MIN)));
pub fn load_and_resize_to_fit(path: &Path) -> Result<EncodedImage, ImageProcessingError> {
load_for_prompt(path, PromptImageMode::ResizeToFit)
}
pub fn load_for_prompt(
pub fn load_for_prompt_bytes(
path: &Path,
file_bytes: Vec<u8>,
mode: PromptImageMode,
) -> Result<EncodedImage, ImageProcessingError> {
let path_buf = path.to_path_buf();
let file_bytes = read_file_bytes(path, &path_buf)?;
let key = ImageCacheKey {
digest: sha1_digest(&file_bytes),
mode,
@ -136,24 +131,6 @@ fn can_preserve_source_bytes(format: ImageFormat) -> bool {
)
}
/// Read the file at `path` into memory, using `path_for_error` as the path
/// reported inside any [`ImageProcessingError::Read`] produced on failure.
///
/// The read strategy depends on the calling context:
/// - Inside a Tokio runtime, the blocking read is wrapped in
///   `tokio::task::block_in_place` so the worker thread is not starved and we
///   avoid `block_on` (which panics on worker threads).
/// - Outside any runtime, a plain synchronous read is performed.
///
/// NOTE(review): `block_in_place` itself panics when invoked on a
/// current-thread (single-threaded) Tokio runtime — this assumes a
/// multi-threaded runtime; confirm callers never use `current_thread`.
fn read_file_bytes(path: &Path, path_for_error: &Path) -> Result<Vec<u8>, ImageProcessingError> {
    let read_result = if tokio::runtime::Handle::try_current().is_ok() {
        // Within a runtime: do the blocking read safely via block_in_place.
        tokio::task::block_in_place(|| std::fs::read(path))
    } else {
        // No runtime active: a straightforward synchronous read is fine.
        std::fs::read(path)
    };
    read_result.map_err(|source| ImageProcessingError::Read {
        path: path_for_error.to_path_buf(),
        source,
    })
}
fn encode_image(
image: &DynamicImage,
preferred_format: ImageFormat,
@ -223,11 +200,20 @@ fn format_to_mime(format: ImageFormat) -> String {
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
use image::GenericImageView;
use image::ImageBuffer;
use image::Rgba;
use tempfile::NamedTempFile;
/// Test helper: encode `image` into an in-memory byte buffer using `format`.
///
/// Panics if encoding fails, which indicates a broken test fixture rather
/// than a condition the tests should handle.
fn image_bytes(image: &ImageBuffer<Rgba<u8>, Vec<u8>>, format: ImageFormat) -> Vec<u8> {
    let mut bytes = Vec::new();
    DynamicImage::ImageRgba8(image.clone())
        .write_to(&mut Cursor::new(&mut bytes), format)
        .expect("encode image to bytes");
    bytes
}
#[tokio::test(flavor = "multi_thread")]
async fn returns_original_image_when_within_bounds() {
@ -235,14 +221,15 @@ mod tests {
(ImageFormat::Png, "image/png"),
(ImageFormat::WebP, "image/webp"),
] {
let temp_file = NamedTempFile::new().expect("temp file");
let image = ImageBuffer::from_pixel(64, 32, Rgba([10u8, 20, 30, 255]));
image
.save_with_format(temp_file.path(), format)
.expect("write image to temp file");
let original_bytes = image_bytes(&image, format);
let original_bytes = std::fs::read(temp_file.path()).expect("read written image");
let encoded = load_and_resize_to_fit(temp_file.path()).expect("process image");
let encoded = load_for_prompt_bytes(
Path::new("in-memory-image"),
original_bytes.clone(),
PromptImageMode::ResizeToFit,
)
.expect("process image");
assert_eq!(encoded.width, 64);
assert_eq!(encoded.height, 32);
@ -257,13 +244,15 @@ mod tests {
(ImageFormat::Png, "image/png"),
(ImageFormat::WebP, "image/webp"),
] {
let temp_file = NamedTempFile::new().expect("temp file");
let image = ImageBuffer::from_pixel(4096, 2048, Rgba([200u8, 10, 10, 255]));
image
.save_with_format(temp_file.path(), format)
.expect("write image to temp file");
let original_bytes = image_bytes(&image, format);
let processed = load_and_resize_to_fit(temp_file.path()).expect("process image");
let processed = load_for_prompt_bytes(
Path::new("in-memory-image"),
original_bytes,
PromptImageMode::ResizeToFit,
)
.expect("process image");
assert!(processed.width <= MAX_WIDTH);
assert!(processed.height <= MAX_HEIGHT);
@ -281,15 +270,15 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn preserves_large_image_in_original_mode() {
let temp_file = NamedTempFile::new().expect("temp file");
let image = ImageBuffer::from_pixel(4096, 2048, Rgba([180u8, 30, 30, 255]));
image
.save_with_format(temp_file.path(), ImageFormat::Png)
.expect("write png to temp file");
let original_bytes = image_bytes(&image, ImageFormat::Png);
let original_bytes = std::fs::read(temp_file.path()).expect("read written image");
let processed =
load_for_prompt(temp_file.path(), PromptImageMode::Original).expect("process image");
let processed = load_for_prompt_bytes(
Path::new("in-memory-image"),
original_bytes.clone(),
PromptImageMode::Original,
)
.expect("process image");
assert_eq!(processed.width, 4096);
assert_eq!(processed.height, 2048);
@ -299,10 +288,12 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn fails_cleanly_for_invalid_images() {
let temp_file = NamedTempFile::new().expect("temp file");
std::fs::write(temp_file.path(), b"not an image").expect("write bytes");
let err = load_and_resize_to_fit(temp_file.path()).expect_err("invalid image should fail");
let err = load_for_prompt_bytes(
Path::new("in-memory-image"),
b"not an image".to_vec(),
PromptImageMode::ResizeToFit,
)
.expect_err("invalid image should fail");
match err {
ImageProcessingError::Decode { .. } => {}
_ => panic!("unexpected error variant"),
@ -315,20 +306,25 @@ mod tests {
IMAGE_CACHE.clear();
}
let temp_file = NamedTempFile::new().expect("temp file");
let first_image = ImageBuffer::from_pixel(32, 16, Rgba([20u8, 120, 220, 255]));
first_image
.save_with_format(temp_file.path(), ImageFormat::Png)
.expect("write initial image");
let first_bytes = image_bytes(&first_image, ImageFormat::Png);
let first = load_and_resize_to_fit(temp_file.path()).expect("process first image");
let first = load_for_prompt_bytes(
Path::new("in-memory-image"),
first_bytes,
PromptImageMode::ResizeToFit,
)
.expect("process first image");
let second_image = ImageBuffer::from_pixel(96, 48, Rgba([50u8, 60, 70, 255]));
second_image
.save_with_format(temp_file.path(), ImageFormat::Png)
.expect("write updated image");
let second_bytes = image_bytes(&second_image, ImageFormat::Png);
let second = load_and_resize_to_fit(temp_file.path()).expect("process updated image");
let second = load_for_prompt_bytes(
Path::new("in-memory-image"),
second_bytes,
PromptImageMode::ResizeToFit,
)
.expect("process updated image");
assert_eq!(first.width, 32);
assert_eq!(first.height, 16);