From 83a60fdb94d5ee074a9ec33a48699d576a89c4a1 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 17 Mar 2026 17:36:23 -0700 Subject: [PATCH] Add FS abstraction and use in view_image (#14960) Adds an environment crate and environment + file system abstraction. Environment is a combination of attributes and services specific to environment the agent is connected to: File system, process management, OS, default shell. The goal is to move most of agent logic that assumes environment to work through the environment abstraction. --- codex-rs/Cargo.lock | 15 +- codex-rs/Cargo.toml | 2 + codex-rs/app-server/Cargo.toml | 2 +- codex-rs/app-server/src/fs_api.rs | 359 +++++------------- codex-rs/app-server/src/message_processor.rs | 2 +- codex-rs/core/Cargo.toml | 1 + codex-rs/core/src/codex.rs | 8 + codex-rs/core/src/codex_tests.rs | 6 + codex-rs/core/src/state/service.rs | 2 + .../core/src/tools/handlers/view_image.rs | 45 ++- codex-rs/environment/BUILD.bazel | 6 + codex-rs/environment/Cargo.toml | 21 + codex-rs/environment/src/fs.rs | 332 ++++++++++++++++ codex-rs/environment/src/lib.rs | 18 + codex-rs/protocol/src/models.rs | 19 +- codex-rs/utils/image/Cargo.toml | 1 - codex-rs/utils/image/src/lib.rs | 110 +++--- 17 files changed, 597 insertions(+), 352 deletions(-) create mode 100644 codex-rs/environment/BUILD.bazel create mode 100644 codex-rs/environment/Cargo.toml create mode 100644 codex-rs/environment/src/fs.rs create mode 100644 codex-rs/environment/src/lib.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 84f911bd4..74609ca05 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1427,6 +1427,7 @@ dependencies = [ "codex-chatgpt", "codex-cloud-requirements", "codex-core", + "codex-environment", "codex-feedback", "codex-file-search", "codex-login", @@ -1462,7 +1463,6 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "uuid", - "walkdir", "wiremock", ] @@ -1841,6 +1841,7 @@ dependencies = [ "codex-client", "codex-config", "codex-connectors", + "codex-environment", "codex-execpolicy", "codex-file-search", "codex-git", @@ -1944,6 +1945,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "codex-environment" +version = "0.0.0" +dependencies = [ + "async-trait", + "codex-utils-absolute-path", + "pretty_assertions", + "tempfile", + "tokio", +] + [[package]] name = "codex-exec" version = "0.0.0" @@ -2744,7 +2756,6 @@ dependencies = [ "base64 0.22.1", "codex-utils-cache", "image", - "tempfile", "thiserror 2.0.18", "tokio", ] diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index b1e0fcf37..35ff64195 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -22,6 +22,7 @@ members = [ "shell-escalation", "skills", "core", + "environment", "hooks", "secrets", "exec", @@ -103,6 +104,7 @@ codex-cloud-requirements = { path = "cloud-requirements" } codex-connectors = { path = "connectors" } codex-config = { path = "config" } codex-core = { path = "core" } +codex-environment = { path = "environment" } codex-exec = { path = "exec" } codex-execpolicy = { path = "execpolicy" } codex-experimental-api-macros = { path = "codex-experimental-api-macros" } diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index 0d62c8f13..c4588df7e 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -32,6 +32,7 @@ axum = { workspace = true, default-features = false, features = [ codex-arg0 = { workspace = true } codex-cloud-requirements = { workspace = true } codex-core = { workspace = true } +codex-environment = { workspace = true } codex-otel = { workspace = true } codex-shell-command = { workspace = true } codex-utils-cli = { workspace = true } @@ -68,7 +69,6 @@ tokio-tungstenite = { workspace = true } tracing = { workspace = true, features = ["log"] } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "json"] } uuid = { workspace = true, features = ["serde", "v7"] } -walkdir = { workspace = true } [dev-dependencies] app_test_support = { workspace = true } diff --git a/codex-rs/app-server/src/fs_api.rs b/codex-rs/app-server/src/fs_api.rs index 32a331995..601842862 100644 --- a/codex-rs/app-server/src/fs_api.rs +++ b/codex-rs/app-server/src/fs_api.rs @@ -18,23 +18,37 @@ use codex_app_server_protocol::FsRemoveResponse; use codex_app_server_protocol::FsWriteFileParams; use codex_app_server_protocol::FsWriteFileResponse; use codex_app_server_protocol::JSONRPCErrorError; +use codex_environment::CopyOptions; +use codex_environment::CreateDirectoryOptions; +use codex_environment::Environment; +use codex_environment::ExecutorFileSystem; +use codex_environment::RemoveOptions; use std::io; -use std::path::Component; -use std::path::Path; -use std::path::PathBuf; -use std::time::SystemTime; -use std::time::UNIX_EPOCH; -use walkdir::WalkDir; +use std::sync::Arc; -#[derive(Clone, Default)] -pub(crate) struct FsApi; +#[derive(Clone)] +pub(crate) struct FsApi { + file_system: Arc, +} + +impl Default for FsApi { + fn default() -> Self { + Self { + file_system: Arc::new(Environment.get_filesystem()), + } + } +} impl FsApi { pub(crate) async fn read_file( &self, params: FsReadFileParams, ) -> Result { - let bytes = tokio::fs::read(params.path).await.map_err(map_io_error)?; + let bytes = self + .file_system + .read_file(¶ms.path) + .await + .map_err(map_fs_error)?; Ok(FsReadFileResponse { data_base64: STANDARD.encode(bytes), }) @@ -49,9 +63,10 @@ impl FsApi { "fs/writeFile requires valid base64 dataBase64: {err}" )) })?; - tokio::fs::write(params.path, bytes) + self.file_system + .write_file(¶ms.path, bytes) .await - .map_err(map_io_error)?; + .map_err(map_fs_error)?; Ok(FsWriteFileResponse {}) } @@ -59,15 +74,15 @@ impl FsApi { &self, params: FsCreateDirectoryParams, ) -> Result { - if params.recursive.unwrap_or(true) { - tokio::fs::create_dir_all(params.path) - .await - .map_err(map_io_error)?; - } else { - tokio::fs::create_dir(params.path) - .await - .map_err(map_io_error)?; - } + self.file_system + .create_directory( + ¶ms.path, + CreateDirectoryOptions { + recursive: params.recursive.unwrap_or(true), + }, + ) + .await + .map_err(map_fs_error)?; Ok(FsCreateDirectoryResponse {}) } @@ -75,14 +90,16 @@ impl FsApi { &self, params: FsGetMetadataParams, ) -> Result { - let metadata = tokio::fs::metadata(params.path) + let metadata = self + .file_system + .get_metadata(¶ms.path) .await - .map_err(map_io_error)?; + .map_err(map_fs_error)?; Ok(FsGetMetadataResponse { - is_directory: metadata.is_dir(), - is_file: metadata.is_file(), - created_at_ms: metadata.created().ok().map_or(0, system_time_to_unix_ms), - modified_at_ms: metadata.modified().ok().map_or(0, system_time_to_unix_ms), + is_directory: metadata.is_directory, + is_file: metadata.is_file, + created_at_ms: metadata.created_at_ms, + modified_at_ms: metadata.modified_at_ms, }) } @@ -90,232 +107,59 @@ impl FsApi { &self, params: FsReadDirectoryParams, ) -> Result { - let mut entries = Vec::new(); - let mut read_dir = tokio::fs::read_dir(params.path) + let entries = self + .file_system + .read_directory(¶ms.path) .await - .map_err(map_io_error)?; - while let Some(entry) = read_dir.next_entry().await.map_err(map_io_error)? { - let metadata = tokio::fs::metadata(entry.path()) - .await - .map_err(map_io_error)?; - entries.push(FsReadDirectoryEntry { - file_name: entry.file_name().to_string_lossy().into_owned(), - is_directory: metadata.is_dir(), - is_file: metadata.is_file(), - }); - } - Ok(FsReadDirectoryResponse { entries }) + .map_err(map_fs_error)?; + Ok(FsReadDirectoryResponse { + entries: entries + .into_iter() + .map(|entry| FsReadDirectoryEntry { + file_name: entry.file_name, + is_directory: entry.is_directory, + is_file: entry.is_file, + }) + .collect(), + }) } pub(crate) async fn remove( &self, params: FsRemoveParams, ) -> Result { - let path = params.path.as_path(); - let recursive = params.recursive.unwrap_or(true); - let force = params.force.unwrap_or(true); - match tokio::fs::symlink_metadata(path).await { - Ok(metadata) => { - let file_type = metadata.file_type(); - if file_type.is_dir() { - if recursive { - tokio::fs::remove_dir_all(path) - .await - .map_err(map_io_error)?; - } else { - tokio::fs::remove_dir(path).await.map_err(map_io_error)?; - } - } else { - tokio::fs::remove_file(path).await.map_err(map_io_error)?; - } - Ok(FsRemoveResponse {}) - } - Err(err) if err.kind() == io::ErrorKind::NotFound && force => Ok(FsRemoveResponse {}), - Err(err) => Err(map_io_error(err)), - } + self.file_system + .remove( + ¶ms.path, + RemoveOptions { + recursive: params.recursive.unwrap_or(true), + force: params.force.unwrap_or(true), + }, + ) + .await + .map_err(map_fs_error)?; + Ok(FsRemoveResponse {}) } pub(crate) async fn copy( &self, params: FsCopyParams, ) -> Result { - let FsCopyParams { - source_path, - destination_path, - recursive, - } = params; - tokio::task::spawn_blocking(move || -> Result<(), JSONRPCErrorError> { - let metadata = - std::fs::symlink_metadata(source_path.as_path()).map_err(map_io_error)?; - let file_type = metadata.file_type(); - - if file_type.is_dir() { - if !recursive { - return Err(invalid_request( - "fs/copy requires recursive: true when sourcePath is a directory", - )); - } - if destination_is_same_or_descendant_of_source( - source_path.as_path(), - destination_path.as_path(), - ) - .map_err(map_io_error)? - { - return Err(invalid_request( - "fs/copy cannot copy a directory to itself or one of its descendants", - )); - } - copy_dir_recursive(source_path.as_path(), destination_path.as_path()) - .map_err(map_io_error)?; - return Ok(()); - } - - if file_type.is_symlink() { - copy_symlink(source_path.as_path(), destination_path.as_path()) - .map_err(map_io_error)?; - return Ok(()); - } - - if file_type.is_file() { - std::fs::copy(source_path.as_path(), destination_path.as_path()) - .map_err(map_io_error)?; - return Ok(()); - } - - Err(invalid_request( - "fs/copy only supports regular files, directories, and symlinks", - )) - }) - .await - .map_err(map_join_error)??; + self.file_system + .copy( + ¶ms.source_path, + ¶ms.destination_path, + CopyOptions { + recursive: params.recursive, + }, + ) + .await + .map_err(map_fs_error)?; Ok(FsCopyResponse {}) } } -fn copy_dir_recursive(source: &Path, target: &Path) -> io::Result<()> { - for entry in WalkDir::new(source) { - let entry = entry.map_err(|err| { - if let Some(io_err) = err.io_error() { - io::Error::new(io_err.kind(), io_err.to_string()) - } else { - io::Error::other(err.to_string()) - } - })?; - let relative_path = entry.path().strip_prefix(source).map_err(|err| { - io::Error::other(format!( - "failed to compute relative path for {} under {}: {err}", - entry.path().display(), - source.display() - )) - })?; - let target_path = target.join(relative_path); - let file_type = entry.file_type(); - - if file_type.is_dir() { - std::fs::create_dir_all(&target_path)?; - continue; - } - - if file_type.is_file() { - std::fs::copy(entry.path(), &target_path)?; - continue; - } - - if file_type.is_symlink() { - copy_symlink(entry.path(), &target_path)?; - continue; - } - - // For now ignore special files such as FIFOs, sockets, and device nodes during recursive copies. - } - Ok(()) -} - -fn destination_is_same_or_descendant_of_source( - source: &Path, - destination: &Path, -) -> io::Result { - let source = std::fs::canonicalize(source)?; - let destination = resolve_copy_destination_path(destination)?; - Ok(destination.starts_with(&source)) -} - -fn resolve_copy_destination_path(path: &Path) -> io::Result { - let mut normalized = PathBuf::new(); - for component in path.components() { - match component { - Component::Prefix(prefix) => normalized.push(prefix.as_os_str()), - Component::RootDir => normalized.push(component.as_os_str()), - Component::CurDir => {} - Component::ParentDir => { - normalized.pop(); - } - Component::Normal(part) => normalized.push(part), - } - } - - let mut unresolved_suffix = Vec::new(); - let mut existing_path = normalized.as_path(); - while !existing_path.exists() { - let Some(file_name) = existing_path.file_name() else { - break; - }; - unresolved_suffix.push(file_name.to_os_string()); - let Some(parent) = existing_path.parent() else { - break; - }; - existing_path = parent; - } - - let mut resolved = std::fs::canonicalize(existing_path)?; - for file_name in unresolved_suffix.iter().rev() { - resolved.push(file_name); - } - Ok(resolved) -} - -fn copy_symlink(source: &Path, target: &Path) -> io::Result<()> { - let link_target = std::fs::read_link(source)?; - #[cfg(unix)] - { - std::os::unix::fs::symlink(&link_target, target) - } - #[cfg(windows)] - { - if symlink_points_to_directory(source)? { - std::os::windows::fs::symlink_dir(&link_target, target) - } else { - std::os::windows::fs::symlink_file(&link_target, target) - } - } - #[cfg(not(any(unix, windows)))] - { - let _ = link_target; - let _ = target; - Err(io::Error::new( - io::ErrorKind::Unsupported, - "copying symlinks is unsupported on this platform", - )) - } -} - -#[cfg(windows)] -fn symlink_points_to_directory(source: &Path) -> io::Result { - use std::os::windows::fs::FileTypeExt; - - Ok(std::fs::symlink_metadata(source)? - .file_type() - .is_symlink_dir()) -} - -fn system_time_to_unix_ms(time: SystemTime) -> i64 { - time.duration_since(UNIX_EPOCH) - .ok() - .and_then(|duration| i64::try_from(duration.as_millis()).ok()) - .unwrap_or(0) -} - -pub(crate) fn invalid_request(message: impl Into) -> JSONRPCErrorError { +fn invalid_request(message: impl Into) -> JSONRPCErrorError { JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: message.into(), @@ -323,43 +167,14 @@ pub(crate) fn invalid_request(message: impl Into) -> JSONRPCErrorError { } } -fn map_join_error(err: tokio::task::JoinError) -> JSONRPCErrorError { - JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("filesystem task failed: {err}"), - data: None, - } -} - -pub(crate) fn map_io_error(err: io::Error) -> JSONRPCErrorError { - JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: err.to_string(), - data: None, - } -} - -#[cfg(all(test, windows))] -mod tests { - use super::*; - use pretty_assertions::assert_eq; - - #[test] - fn symlink_points_to_directory_handles_dangling_directory_symlinks() -> io::Result<()> { - use std::os::windows::fs::symlink_dir; - - let temp_dir = tempfile::TempDir::new()?; - let source_dir = temp_dir.path().join("source"); - let link_path = temp_dir.path().join("source-link"); - std::fs::create_dir(&source_dir)?; - - if symlink_dir(&source_dir, &link_path).is_err() { - return Ok(()); +fn map_fs_error(err: io::Error) -> JSONRPCErrorError { + if err.kind() == io::ErrorKind::InvalidInput { + invalid_request(err.to_string()) + } else { + JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: err.to_string(), + data: None, } - - std::fs::remove_dir(&source_dir)?; - - assert_eq!(symlink_points_to_directory(&link_path)?, true); - Ok(()) } } diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 01719b9b5..f7ea2c705 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -253,7 +253,7 @@ impl MessageProcessor { analytics_events_client, ); let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.clone()); - let fs_api = FsApi; + let fs_api = FsApi::default(); Self { outgoing, diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index ef6b8a013..d11e20981 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -34,6 +34,7 @@ codex-async-utils = { workspace = true } codex-client = { workspace = true } codex-connectors = { workspace = true } codex-config = { workspace = true } +codex-environment = { workspace = true } codex-shell-command = { workspace = true } codex-skills = { workspace = true } codex-execpolicy = { workspace = true } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index d36747918..b7503b8c6 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -60,6 +60,7 @@ use chrono::Local; use chrono::Utc; use codex_app_server_protocol::McpServerElicitationRequest; use codex_app_server_protocol::McpServerElicitationRequestParams; +use codex_environment::Environment; use codex_hooks::HookEvent; use codex_hooks::HookEventAfterAgent; use codex_hooks::HookPayload; @@ -785,6 +786,7 @@ pub(crate) struct TurnContext { pub(crate) reasoning_effort: Option, pub(crate) reasoning_summary: ReasoningSummaryConfig, pub(crate) session_source: SessionSource, + pub(crate) environment: Arc, /// The session's current working directory. All relative paths provided by /// the model as well as sandbox policies are resolved against this path /// instead of `std::env::current_dir()`. @@ -894,6 +896,7 @@ impl TurnContext { reasoning_effort, reasoning_summary: self.reasoning_summary, session_source: self.session_source.clone(), + environment: Arc::clone(&self.environment), cwd: self.cwd.clone(), current_date: self.current_date.clone(), timezone: self.timezone.clone(), @@ -1282,6 +1285,7 @@ impl Session { model_info: ModelInfo, models_manager: &ModelsManager, network: Option, + environment: Arc, sub_id: String, js_repl: Arc, skills_outcome: Arc, @@ -1338,6 +1342,7 @@ impl Session { reasoning_effort, reasoning_summary, session_source, + environment, cwd, current_date: Some(current_date), timezone: Some(timezone), @@ -1810,6 +1815,7 @@ impl Session { code_mode_service: crate::tools::code_mode::CodeModeService::new( config.js_repl_node_path.clone(), ), + environment: Arc::new(Environment), }; let js_repl = Arc::new(JsReplHandle::with_node_path( config.js_repl_node_path.clone(), @@ -2389,6 +2395,7 @@ impl Session { .network_proxy .as_ref() .map(StartedNetworkProxy::proxy), + Arc::clone(&self.services.environment), sub_id, Arc::clone(&self.js_repl), skills_outcome, @@ -5198,6 +5205,7 @@ async fn spawn_review_thread( reasoning_effort, reasoning_summary, session_source, + environment: Arc::clone(&parent_turn_context.environment), tools_config, features: parent_turn_context.features.clone(), ghost_snapshot: parent_turn_context.ghost_snapshot.clone(), diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 0f45e3de6..bb70bdd7d 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -2466,6 +2466,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { true, )); let network_approval = Arc::new(NetworkApprovalService::default()); + let environment = Arc::new(codex_environment::Environment); let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { @@ -2520,6 +2521,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { code_mode_service: crate::tools::code_mode::CodeModeService::new( config.js_repl_node_path.clone(), ), + environment: Arc::clone(&environment), }; let js_repl = Arc::new(JsReplHandle::with_node_path( config.js_repl_node_path.clone(), @@ -2539,6 +2541,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { model_info, &models_manager, None, + environment, "turn_id".to_string(), Arc::clone(&js_repl), skills_outcome, @@ -3258,6 +3261,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( true, )); let network_approval = Arc::new(NetworkApprovalService::default()); + let environment = Arc::new(codex_environment::Environment); let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { @@ -3312,6 +3316,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( code_mode_service: crate::tools::code_mode::CodeModeService::new( config.js_repl_node_path.clone(), ), + environment: Arc::clone(&environment), }; let js_repl = Arc::new(JsReplHandle::with_node_path( config.js_repl_node_path.clone(), @@ -3331,6 +3336,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( model_info, &models_manager, None, + environment, "turn_id".to_string(), Arc::clone(&js_repl), skills_outcome, diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 851618c00..1a3f58d0f 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -20,6 +20,7 @@ use crate::tools::network_approval::NetworkApprovalService; use crate::tools::runtimes::ExecveSessionApproval; use crate::tools::sandboxing::ApprovalStore; use crate::unified_exec::UnifiedExecProcessManager; +use codex_environment::Environment; use codex_hooks::Hooks; use codex_otel::SessionTelemetry; use codex_utils_absolute_path::AbsolutePathBuf; @@ -61,4 +62,5 @@ pub(crate) struct SessionServices { /// Session-scoped model client shared across turns. pub(crate) model_client: ModelClient, pub(crate) code_mode_service: CodeModeService, + pub(crate) environment: Arc, } diff --git a/codex-rs/core/src/tools/handlers/view_image.rs b/codex-rs/core/src/tools/handlers/view_image.rs index 5757aeb4e..3957549d2 100644 --- a/codex-rs/core/src/tools/handlers/view_image.rs +++ b/codex-rs/core/src/tools/handlers/view_image.rs @@ -1,12 +1,13 @@ use async_trait::async_trait; +use codex_environment::ExecutorFileSystem; use codex_protocol::models::ContentItem; use codex_protocol::models::FunctionCallOutputContentItem; use codex_protocol::models::ImageDetail; use codex_protocol::models::local_image_content_items_with_label_number; use codex_protocol::openai_models::InputModality; +use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_image::PromptImageMode; use serde::Deserialize; -use tokio::fs; use crate::function_tool::FunctionCallError; use crate::original_image_detail::can_request_original_image_detail; @@ -87,22 +88,41 @@ impl ToolHandler for ViewImageHandler { } }; - let abs_path = turn.resolve_path(Some(args.path)); + let abs_path = + AbsolutePathBuf::try_from(turn.resolve_path(Some(args.path))).map_err(|error| { + FunctionCallError::RespondToModel(format!("unable to resolve image path: {error}")) + })?; - let metadata = fs::metadata(&abs_path).await.map_err(|error| { - FunctionCallError::RespondToModel(format!( - "unable to locate image at `{}`: {error}", - abs_path.display() - )) - })?; + let metadata = turn + .environment + .get_filesystem() + .get_metadata(&abs_path) + .await + .map_err(|error| { + FunctionCallError::RespondToModel(format!( + "unable to locate image at `{}`: {error}", + abs_path.display() + )) + })?; - if !metadata.is_file() { + if !metadata.is_file { return Err(FunctionCallError::RespondToModel(format!( "image path `{}` is not a file", abs_path.display() ))); } - let event_path = abs_path.clone(); + let file_bytes = turn + .environment + .get_filesystem() + .read_file(&abs_path) + .await + .map_err(|error| { + FunctionCallError::RespondToModel(format!( + "unable to read image at `{}`: {error}", + abs_path.display() + )) + })?; + let event_path = abs_path.to_path_buf(); let can_request_original_detail = can_request_original_image_detail(turn.features.get(), &turn.model_info); @@ -116,7 +136,10 @@ impl ToolHandler for ViewImageHandler { let image_detail = use_original_detail.then_some(ImageDetail::Original); let content = local_image_content_items_with_label_number( - &abs_path, /*label_number*/ None, image_mode, + abs_path.as_path(), + file_bytes, + /*label_number*/ None, + image_mode, ) .into_iter() .map(|item| match item { diff --git a/codex-rs/environment/BUILD.bazel b/codex-rs/environment/BUILD.bazel new file mode 100644 index 000000000..90487c35e --- /dev/null +++ b/codex-rs/environment/BUILD.bazel @@ -0,0 +1,6 @@ +load("//:defs.bzl", "codex_rust_crate") + +codex_rust_crate( + name = "environment", + crate_name = "codex_environment", +) diff --git a/codex-rs/environment/Cargo.toml b/codex-rs/environment/Cargo.toml new file mode 100644 index 000000000..255348f7a --- /dev/null +++ b/codex-rs/environment/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "codex-environment" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lib] +name = "codex_environment" +path = "src/lib.rs" + +[lints] +workspace = true + +[dependencies] +async-trait = { workspace = true } +codex-utils-absolute-path = { workspace = true } +tokio = { workspace = true, features = ["fs", "io-util", "rt"] } + +[dev-dependencies] +pretty_assertions = { workspace = true } +tempfile = { workspace = true } diff --git a/codex-rs/environment/src/fs.rs b/codex-rs/environment/src/fs.rs new file mode 100644 index 000000000..82e0b8e6e --- /dev/null +++ b/codex-rs/environment/src/fs.rs @@ -0,0 +1,332 @@ +use async_trait::async_trait; +use codex_utils_absolute_path::AbsolutePathBuf; +use std::path::Component; +use std::path::Path; +use std::path::PathBuf; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; +use tokio::io; + +const MAX_READ_FILE_BYTES: u64 = 512 * 1024 * 1024; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct CreateDirectoryOptions { + pub recursive: bool, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct RemoveOptions { + pub recursive: bool, + pub force: bool, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct CopyOptions { + pub recursive: bool, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct FileMetadata { + pub is_directory: bool, + pub is_file: bool, + pub created_at_ms: i64, + pub modified_at_ms: i64, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ReadDirectoryEntry { + pub file_name: String, + pub is_directory: bool, + pub is_file: bool, +} + +pub type FileSystemResult = io::Result; + +#[async_trait] +pub trait ExecutorFileSystem: Send + Sync { + async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult>; + + async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec) -> FileSystemResult<()>; + + async fn create_directory( + &self, + path: &AbsolutePathBuf, + options: CreateDirectoryOptions, + ) -> FileSystemResult<()>; + + async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult; + + async fn read_directory( + &self, + path: &AbsolutePathBuf, + ) -> FileSystemResult>; + + async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()>; + + async fn copy( + &self, + source_path: &AbsolutePathBuf, + destination_path: &AbsolutePathBuf, + options: CopyOptions, + ) -> FileSystemResult<()>; +} + +#[derive(Clone, Default)] +pub(crate) struct LocalFileSystem; + +#[async_trait] +impl ExecutorFileSystem for LocalFileSystem { + async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult> { + let metadata = tokio::fs::metadata(path.as_path()).await?; + if metadata.len() > MAX_READ_FILE_BYTES { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("file is too large to read: limit is {MAX_READ_FILE_BYTES} bytes"), + )); + } + tokio::fs::read(path.as_path()).await + } + + async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec) -> FileSystemResult<()> { + tokio::fs::write(path.as_path(), contents).await + } + + async fn create_directory( + &self, + path: &AbsolutePathBuf, + options: CreateDirectoryOptions, + ) -> FileSystemResult<()> { + if options.recursive { + tokio::fs::create_dir_all(path.as_path()).await?; + } else { + tokio::fs::create_dir(path.as_path()).await?; + } + Ok(()) + } + + async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult { + let metadata = tokio::fs::metadata(path.as_path()).await?; + Ok(FileMetadata { + is_directory: metadata.is_dir(), + is_file: metadata.is_file(), + created_at_ms: metadata.created().ok().map_or(0, system_time_to_unix_ms), + modified_at_ms: metadata.modified().ok().map_or(0, system_time_to_unix_ms), + }) + } + + async fn read_directory( + &self, + path: &AbsolutePathBuf, + ) -> FileSystemResult> { + let mut entries = Vec::new(); + let mut read_dir = tokio::fs::read_dir(path.as_path()).await?; + while let Some(entry) = read_dir.next_entry().await? { + let metadata = tokio::fs::metadata(entry.path()).await?; + entries.push(ReadDirectoryEntry { + file_name: entry.file_name().to_string_lossy().into_owned(), + is_directory: metadata.is_dir(), + is_file: metadata.is_file(), + }); + } + Ok(entries) + } + + async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()> { + match tokio::fs::symlink_metadata(path.as_path()).await { + Ok(metadata) => { + let file_type = metadata.file_type(); + if file_type.is_dir() { + if options.recursive { + tokio::fs::remove_dir_all(path.as_path()).await?; + } else { + tokio::fs::remove_dir(path.as_path()).await?; + } + } else { + tokio::fs::remove_file(path.as_path()).await?; + } + Ok(()) + } + Err(err) if err.kind() == io::ErrorKind::NotFound && options.force => Ok(()), + Err(err) => Err(err), + } + } + + async fn copy( + &self, + source_path: &AbsolutePathBuf, + destination_path: &AbsolutePathBuf, + options: CopyOptions, + ) -> FileSystemResult<()> { + let source_path = source_path.to_path_buf(); + let destination_path = destination_path.to_path_buf(); + tokio::task::spawn_blocking(move || -> FileSystemResult<()> { + let metadata = std::fs::symlink_metadata(source_path.as_path())?; + let file_type = metadata.file_type(); + + if file_type.is_dir() { + if !options.recursive { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "fs/copy requires recursive: true when sourcePath is a directory", + )); + } + if destination_is_same_or_descendant_of_source( + source_path.as_path(), + destination_path.as_path(), + )? { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "fs/copy cannot copy a directory to itself or one of its descendants", + )); + } + copy_dir_recursive(source_path.as_path(), destination_path.as_path())?; + return Ok(()); + } + + if file_type.is_symlink() { + copy_symlink(source_path.as_path(), destination_path.as_path())?; + return Ok(()); + } + + if file_type.is_file() { + std::fs::copy(source_path.as_path(), destination_path.as_path())?; + return Ok(()); + } + + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "fs/copy only supports regular files, directories, and symlinks", + )) + }) + .await + .map_err(|err| io::Error::other(format!("filesystem task failed: {err}")))? + } +} + +fn copy_dir_recursive(source: &Path, target: &Path) -> io::Result<()> { + std::fs::create_dir_all(target)?; + for entry in std::fs::read_dir(source)? { + let entry = entry?; + let source_path = entry.path(); + let target_path = target.join(entry.file_name()); + let file_type = entry.file_type()?; + + if file_type.is_dir() { + copy_dir_recursive(&source_path, &target_path)?; + } else if file_type.is_file() { + std::fs::copy(&source_path, &target_path)?; + } else if file_type.is_symlink() { + copy_symlink(&source_path, &target_path)?; + } + } + Ok(()) +} + +fn destination_is_same_or_descendant_of_source( + source: &Path, + destination: &Path, +) -> io::Result { + let source = std::fs::canonicalize(source)?; + let destination = resolve_copy_destination_path(destination)?; + Ok(destination.starts_with(&source)) +} + +fn resolve_copy_destination_path(path: &Path) -> io::Result { + let mut normalized = PathBuf::new(); + for component in path.components() { + match component { + Component::Prefix(prefix) => normalized.push(prefix.as_os_str()), + Component::RootDir => normalized.push(component.as_os_str()), + Component::CurDir => {} + Component::ParentDir => { + normalized.pop(); + } + Component::Normal(part) => normalized.push(part), + } + } + + let mut unresolved_suffix = Vec::new(); + let mut existing_path = normalized.as_path(); + while !existing_path.exists() { + let Some(file_name) = existing_path.file_name() else { + break; + }; + unresolved_suffix.push(file_name.to_os_string()); + let Some(parent) = existing_path.parent() else { + break; + }; + existing_path = parent; + } + + let mut resolved = std::fs::canonicalize(existing_path)?; + for file_name in unresolved_suffix.iter().rev() { + resolved.push(file_name); + } + Ok(resolved) +} + +fn copy_symlink(source: &Path, target: &Path) -> io::Result<()> { + let link_target = std::fs::read_link(source)?; + #[cfg(unix)] + { + std::os::unix::fs::symlink(&link_target, target) + } + #[cfg(windows)] + { + if symlink_points_to_directory(source)? { + std::os::windows::fs::symlink_dir(&link_target, target) + } else { + std::os::windows::fs::symlink_file(&link_target, target) + } + } + #[cfg(not(any(unix, windows)))] + { + let _ = link_target; + let _ = target; + Err(io::Error::new( + io::ErrorKind::Unsupported, + "copying symlinks is unsupported on this platform", + )) + } +} + +#[cfg(windows)] +fn symlink_points_to_directory(source: &Path) -> io::Result { + use std::os::windows::fs::FileTypeExt; + + Ok(std::fs::symlink_metadata(source)? + .file_type() + .is_symlink_dir()) +} + +fn system_time_to_unix_ms(time: SystemTime) -> i64 { + time.duration_since(UNIX_EPOCH) + .ok() + .and_then(|duration| i64::try_from(duration.as_millis()).ok()) + .unwrap_or(0) +} + +#[cfg(all(test, windows))] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn symlink_points_to_directory_handles_dangling_directory_symlinks() -> io::Result<()> { + use std::os::windows::fs::symlink_dir; + + let temp_dir = tempfile::TempDir::new()?; + let source_dir = temp_dir.path().join("source"); + let link_path = temp_dir.path().join("source-link"); + std::fs::create_dir(&source_dir)?; + + if symlink_dir(&source_dir, &link_path).is_err() { + return Ok(()); + } + + std::fs::remove_dir(&source_dir)?; + + assert_eq!(symlink_points_to_directory(&link_path)?, true); + Ok(()) + } +} diff --git a/codex-rs/environment/src/lib.rs b/codex-rs/environment/src/lib.rs new file mode 100644 index 000000000..0cf9f22f2 --- /dev/null +++ b/codex-rs/environment/src/lib.rs @@ -0,0 +1,18 @@ +pub mod fs; + +pub use fs::CopyOptions; +pub use fs::CreateDirectoryOptions; +pub use fs::ExecutorFileSystem; +pub use fs::FileMetadata; +pub use fs::FileSystemResult; +pub use fs::ReadDirectoryEntry; +pub use fs::RemoveOptions; + +#[derive(Clone, Debug, Default)] +pub struct Environment; + +impl Environment { + pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> { + fs::LocalFileSystem + } +} diff --git a/codex-rs/protocol/src/models.rs b/codex-rs/protocol/src/models.rs index 1e5a7445e..a2c337d24 100644 --- a/codex-rs/protocol/src/models.rs +++ b/codex-rs/protocol/src/models.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::path::Path; use codex_utils_image::PromptImageMode; -use codex_utils_image::load_for_prompt; +use codex_utils_image::load_for_prompt_bytes; use serde::Deserialize; use serde::Deserializer; use serde::Serialize; @@ -944,10 +944,11 @@ fn unsupported_image_error_placeholder(path: &std::path::Path, mime: &str) -> Co pub fn local_image_content_items_with_label_number( path: &std::path::Path, + file_bytes: Vec, label_number: Option, mode: PromptImageMode, ) -> Vec { - match load_for_prompt(path, mode) { + match load_for_prompt_bytes(path, file_bytes, mode) { Ok(image) => { let mut items = Vec::with_capacity(3); if let Some(label_number) = label_number { @@ -1114,11 +1115,15 @@ impl From> for ResponseInputItem { } UserInput::LocalImage { path } => { image_index += 1; - local_image_content_items_with_label_number( - &path, - Some(image_index), - PromptImageMode::ResizeToFit, - ) + match std::fs::read(&path) { + Ok(file_bytes) => local_image_content_items_with_label_number( + &path, + file_bytes, + Some(image_index), + PromptImageMode::ResizeToFit, + ), + Err(err) => vec![local_image_error_placeholder(&path, err)], + } } UserInput::Skill { .. } | UserInput::Mention { .. } => Vec::new(), // Tool bodies are injected later in core }) diff --git a/codex-rs/utils/image/Cargo.toml b/codex-rs/utils/image/Cargo.toml index 370249738..e835e49e7 100644 --- a/codex-rs/utils/image/Cargo.toml +++ b/codex-rs/utils/image/Cargo.toml @@ -16,4 +16,3 @@ tokio = { workspace = true, features = ["fs", "rt", "rt-multi-thread", "macros"] [dev-dependencies] image = { workspace = true, features = ["jpeg", "png", "gif", "webp"] } -tempfile = { workspace = true } diff --git a/codex-rs/utils/image/src/lib.rs b/codex-rs/utils/image/src/lib.rs index d0aba2f60..8fd072426 100644 --- a/codex-rs/utils/image/src/lib.rs +++ b/codex-rs/utils/image/src/lib.rs @@ -53,18 +53,13 @@ struct ImageCacheKey { static IMAGE_CACHE: LazyLock> = LazyLock::new(|| BlockingLruCache::new(NonZeroUsize::new(32).unwrap_or(NonZeroUsize::MIN))); -pub fn load_and_resize_to_fit(path: &Path) -> Result { - load_for_prompt(path, PromptImageMode::ResizeToFit) -} - -pub fn load_for_prompt( +pub fn load_for_prompt_bytes( path: &Path, + file_bytes: Vec, mode: PromptImageMode, ) -> Result { let path_buf = path.to_path_buf(); - let file_bytes = read_file_bytes(path, &path_buf)?; - let key = ImageCacheKey { digest: sha1_digest(&file_bytes), mode, @@ -136,24 +131,6 @@ fn can_preserve_source_bytes(format: ImageFormat) -> bool { ) } -fn read_file_bytes(path: &Path, path_for_error: &Path) -> Result, ImageProcessingError> { - match tokio::runtime::Handle::try_current() { - // If we're inside a Tokio runtime, avoid block_on (it panics on worker threads). - // Use block_in_place and do a standard blocking read safely. - Ok(_) => tokio::task::block_in_place(|| std::fs::read(path)).map_err(|source| { - ImageProcessingError::Read { - path: path_for_error.to_path_buf(), - source, - } - }), - // Outside a runtime, just read synchronously. - Err(_) => std::fs::read(path).map_err(|source| ImageProcessingError::Read { - path: path_for_error.to_path_buf(), - source, - }), - } -} - fn encode_image( image: &DynamicImage, preferred_format: ImageFormat, @@ -223,11 +200,20 @@ fn format_to_mime(format: ImageFormat) -> String { #[cfg(test)] mod tests { + use std::io::Cursor; + use super::*; use image::GenericImageView; use image::ImageBuffer; use image::Rgba; - use tempfile::NamedTempFile; + + fn image_bytes(image: &ImageBuffer, Vec>, format: ImageFormat) -> Vec { + let mut encoded = Cursor::new(Vec::new()); + DynamicImage::ImageRgba8(image.clone()) + .write_to(&mut encoded, format) + .expect("encode image to bytes"); + encoded.into_inner() + } #[tokio::test(flavor = "multi_thread")] async fn returns_original_image_when_within_bounds() { @@ -235,14 +221,15 @@ mod tests { (ImageFormat::Png, "image/png"), (ImageFormat::WebP, "image/webp"), ] { - let temp_file = NamedTempFile::new().expect("temp file"); let image = ImageBuffer::from_pixel(64, 32, Rgba([10u8, 20, 30, 255])); - image - .save_with_format(temp_file.path(), format) - .expect("write image to temp file"); + let original_bytes = image_bytes(&image, format); - let original_bytes = std::fs::read(temp_file.path()).expect("read written image"); - let encoded = load_and_resize_to_fit(temp_file.path()).expect("process image"); + let encoded = load_for_prompt_bytes( + Path::new("in-memory-image"), + original_bytes.clone(), + PromptImageMode::ResizeToFit, + ) + .expect("process image"); assert_eq!(encoded.width, 64); assert_eq!(encoded.height, 32); @@ -257,13 +244,15 @@ mod tests { (ImageFormat::Png, "image/png"), (ImageFormat::WebP, "image/webp"), ] { - let temp_file = NamedTempFile::new().expect("temp file"); let image = ImageBuffer::from_pixel(4096, 2048, Rgba([200u8, 10, 10, 255])); - image - .save_with_format(temp_file.path(), format) - .expect("write image to temp file"); + let original_bytes = image_bytes(&image, format); - let processed = load_and_resize_to_fit(temp_file.path()).expect("process image"); + let processed = load_for_prompt_bytes( + Path::new("in-memory-image"), + original_bytes, + PromptImageMode::ResizeToFit, + ) + .expect("process image"); assert!(processed.width <= MAX_WIDTH); assert!(processed.height <= MAX_HEIGHT); @@ -281,15 +270,15 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn preserves_large_image_in_original_mode() { - let temp_file = NamedTempFile::new().expect("temp file"); let image = ImageBuffer::from_pixel(4096, 2048, Rgba([180u8, 30, 30, 255])); - image - .save_with_format(temp_file.path(), ImageFormat::Png) - .expect("write png to temp file"); + let original_bytes = image_bytes(&image, ImageFormat::Png); - let original_bytes = std::fs::read(temp_file.path()).expect("read written image"); - let processed = - load_for_prompt(temp_file.path(), PromptImageMode::Original).expect("process image"); + let processed = load_for_prompt_bytes( + Path::new("in-memory-image"), + original_bytes.clone(), + PromptImageMode::Original, + ) + .expect("process image"); assert_eq!(processed.width, 4096); assert_eq!(processed.height, 2048); @@ -299,10 +288,12 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn fails_cleanly_for_invalid_images() { - let temp_file = NamedTempFile::new().expect("temp file"); - std::fs::write(temp_file.path(), b"not an image").expect("write bytes"); - - let err = load_and_resize_to_fit(temp_file.path()).expect_err("invalid image should fail"); + let err = load_for_prompt_bytes( + Path::new("in-memory-image"), + b"not an image".to_vec(), + PromptImageMode::ResizeToFit, + ) + .expect_err("invalid image should fail"); match err { ImageProcessingError::Decode { .. } => {} _ => panic!("unexpected error variant"), @@ -315,20 +306,25 @@ mod tests { IMAGE_CACHE.clear(); } - let temp_file = NamedTempFile::new().expect("temp file"); let first_image = ImageBuffer::from_pixel(32, 16, Rgba([20u8, 120, 220, 255])); - first_image - .save_with_format(temp_file.path(), ImageFormat::Png) - .expect("write initial image"); + let first_bytes = image_bytes(&first_image, ImageFormat::Png); - let first = load_and_resize_to_fit(temp_file.path()).expect("process first image"); + let first = load_for_prompt_bytes( + Path::new("in-memory-image"), + first_bytes, + PromptImageMode::ResizeToFit, + ) + .expect("process first image"); let second_image = ImageBuffer::from_pixel(96, 48, Rgba([50u8, 60, 70, 255])); - second_image - .save_with_format(temp_file.path(), ImageFormat::Png) - .expect("write updated image"); + let second_bytes = image_bytes(&second_image, ImageFormat::Png); - let second = load_and_resize_to_fit(temp_file.path()).expect("process updated image"); + let second = load_for_prompt_bytes( + Path::new("in-memory-image"), + second_bytes, + PromptImageMode::ResizeToFit, + ) + .expect("process updated image"); assert_eq!(first.width, 32); assert_eq!(first.height, 16);