core-agent-ide/codex-rs/exec/src/lib.rs
Curtis 'Fjord' Hawthorne 5f8776d34d
Allow global exec flags after resume and fix CI codex build/timeout (#8440)
**Motivation**
- Bring `codex exec resume` to parity with top‑level flags so global
options (git check bypass, json, model, sandbox toggles) work after the
subcommand, including when outside a git repo.

**Description**
- Exec CLI: mark `--skip-git-repo-check`, `--json`, `--model`,
`--full-auto`, and `--dangerously-bypass-approvals-and-sandbox` as
global so they’re accepted after `resume`.
- Tests: add `exec_resume_accepts_global_flags_after_subcommand` to
verify those flags work when passed after `resume`.

**Testing**
- `just fmt`
- `cargo test -p codex-exec` (pass; ran with elevated perms to allow
network/port binds)
- Manual: exercised `codex exec resume` with global flags after the
subcommand to confirm behavior.
2026-01-05 22:12:09 +00:00

663 lines
22 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// - In the default output mode, it is paramount that the only thing written to
// stdout is the final message (if any).
// - In --json mode, stdout must be valid JSONL, one event per line.
// For both modes, any other output must be written to stderr.
#![deny(clippy::print_stdout)]
mod cli;
mod event_processor;
mod event_processor_with_human_output;
pub mod event_processor_with_jsonl_output;
pub mod exec_events;
pub use cli::Cli;
pub use cli::Command;
pub use cli::ReviewArgs;
use codex_common::oss::ensure_oss_provider_ready;
use codex_common::oss::get_default_model_for_oss_provider;
use codex_core::AuthManager;
use codex_core::ConversationManager;
use codex_core::LMSTUDIO_OSS_PROVIDER_ID;
use codex_core::NewConversation;
use codex_core::OLLAMA_OSS_PROVIDER_ID;
use codex_core::auth::enforce_login_restrictions;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::config::find_codex_home;
use codex_core::config::load_config_as_toml_with_cli_overrides;
use codex_core::config::resolve_oss_provider;
use codex_core::git_info::get_git_repo_root;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewRequest;
use codex_core::protocol::ReviewTarget;
use codex_core::protocol::SessionSource;
use codex_protocol::approvals::ElicitationAction;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::user_input::UserInput;
use codex_utils_absolute_path::AbsolutePathBuf;
use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
use serde_json::Value;
use std::io::IsTerminal;
use std::io::Read;
use std::path::PathBuf;
use supports_color::Stream;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
use crate::cli::Command as ExecCommand;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use codex_core::default_client::set_default_originator;
use codex_core::find_conversation_path_by_id_str;
enum InitialOperation {
UserTurn {
items: Vec<UserInput>,
output_schema: Option<Value>,
},
Review {
review_request: ReviewRequest,
},
}
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
if let Err(err) = set_default_originator("codex_exec".to_string()) {
tracing::warn!(?err, "Failed to set codex exec originator override {err:?}");
}
let Cli {
command,
images,
model: model_cli_arg,
oss,
oss_provider,
config_profile,
full_auto,
dangerously_bypass_approvals_and_sandbox,
cwd,
skip_git_repo_check,
add_dir,
color,
last_message_file,
json: json_mode,
sandbox_mode: sandbox_mode_cli_arg,
prompt,
output_schema: output_schema_path,
config_overrides,
} = cli;
let (stdout_with_ansi, stderr_with_ansi) = match color {
cli::Color::Always => (true, true),
cli::Color::Never => (false, false),
cli::Color::Auto => (
supports_color::on_cached(Stream::Stdout).is_some(),
supports_color::on_cached(Stream::Stderr).is_some(),
),
};
// Build fmt layer (existing logging) to compose with OTEL layer.
let default_level = "error";
// Build env_filter separately and attach via with_filter.
let env_filter = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new(default_level))
.unwrap_or_else(|_| EnvFilter::new(default_level));
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(stderr_with_ansi)
.with_writer(std::io::stderr)
.with_filter(env_filter);
let sandbox_mode = if full_auto {
Some(SandboxMode::WorkspaceWrite)
} else if dangerously_bypass_approvals_and_sandbox {
Some(SandboxMode::DangerFullAccess)
} else {
sandbox_mode_cli_arg.map(Into::<SandboxMode>::into)
};
// Parse `-c` overrides from the CLI.
let cli_kv_overrides = match config_overrides.parse_overrides() {
Ok(v) => v,
#[allow(clippy::print_stderr)]
Err(e) => {
eprintln!("Error parsing -c overrides: {e}");
std::process::exit(1);
}
};
let resolved_cwd = cwd.clone();
let config_cwd = match resolved_cwd.as_deref() {
Some(path) => AbsolutePathBuf::from_absolute_path(path.canonicalize()?)?,
None => AbsolutePathBuf::current_dir()?,
};
// we load config.toml here to determine project state.
#[allow(clippy::print_stderr)]
let config_toml = {
let codex_home = match find_codex_home() {
Ok(codex_home) => codex_home,
Err(err) => {
eprintln!("Error finding codex home: {err}");
std::process::exit(1);
}
};
match load_config_as_toml_with_cli_overrides(
&codex_home,
&config_cwd,
cli_kv_overrides.clone(),
)
.await
{
Ok(config_toml) => config_toml,
Err(err) => {
eprintln!("Error loading config.toml: {err}");
std::process::exit(1);
}
}
};
let model_provider = if oss {
let resolved = resolve_oss_provider(
oss_provider.as_deref(),
&config_toml,
config_profile.clone(),
);
if let Some(provider) = resolved {
Some(provider)
} else {
return Err(anyhow::anyhow!(
"No default OSS provider configured. Use --local-provider=provider or set oss_provider to either {LMSTUDIO_OSS_PROVIDER_ID} or {OLLAMA_OSS_PROVIDER_ID} in config.toml"
));
}
} else {
None // No OSS mode enabled
};
// When using `--oss`, let the bootstrapper pick the model based on selected provider
let model = if let Some(model) = model_cli_arg {
Some(model)
} else if oss {
model_provider
.as_ref()
.and_then(|provider_id| get_default_model_for_oss_provider(provider_id))
.map(std::borrow::ToOwned::to_owned)
} else {
None // No model specified, will use the default.
};
// Load configuration and determine approval policy
let overrides = ConfigOverrides {
model,
review_model: None,
config_profile,
// Default to never ask for approvals in headless mode. Feature flags can override.
approval_policy: Some(AskForApproval::Never),
sandbox_mode,
cwd: resolved_cwd,
model_provider: model_provider.clone(),
codex_linux_sandbox_exe,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
include_apply_patch_tool: None,
show_raw_agent_reasoning: oss.then_some(true),
tools_web_search_request: None,
additional_writable_roots: add_dir,
};
let config =
Config::load_with_cli_overrides_and_harness_overrides(cli_kv_overrides, overrides).await?;
if let Err(err) = enforce_login_restrictions(&config).await {
eprintln!("{err}");
std::process::exit(1);
}
let otel = codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION"));
#[allow(clippy::print_stderr)]
let otel = match otel {
Ok(otel) => otel,
Err(e) => {
eprintln!("Could not create otel exporter: {e}");
std::process::exit(1);
}
};
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
let _ = tracing_subscriber::registry()
.with(fmt_layer)
.with(otel_tracing_layer)
.with(otel_logger_layer)
.try_init();
let mut event_processor: Box<dyn EventProcessor> = match json_mode {
true => Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone())),
_ => Box::new(EventProcessorWithHumanOutput::create_with_ansi(
stdout_with_ansi,
&config,
last_message_file.clone(),
)),
};
if oss {
// We're in the oss section, so provider_id should be Some
// Let's handle None case gracefully though just in case
let provider_id = match model_provider.as_ref() {
Some(id) => id,
None => {
error!("OSS provider unexpectedly not set when oss flag is used");
return Err(anyhow::anyhow!(
"OSS provider not set but oss flag was used"
));
}
};
ensure_oss_provider_ready(provider_id, &config)
.await
.map_err(|e| anyhow::anyhow!("OSS setup failed: {e}"))?;
}
let default_cwd = config.cwd.to_path_buf();
let default_approval_policy = config.approval_policy.value();
let default_sandbox_policy = config.sandbox_policy.get();
let default_effort = config.model_reasoning_effort;
let default_summary = config.model_reasoning_summary;
if !skip_git_repo_check && get_git_repo_root(&default_cwd).is_none() {
eprintln!("Not inside a trusted directory and --skip-git-repo-check was not specified.");
std::process::exit(1);
}
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
true,
config.cli_auth_credentials_store_mode,
);
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let default_model = conversation_manager
.get_models_manager()
.get_model(&config.model, &config)
.await;
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewConversation {
conversation_id: _,
conversation,
session_configured,
} = if let Some(ExecCommand::Resume(args)) = command.as_ref() {
let resume_path = resolve_resume_path(&config, args).await?;
if let Some(path) = resume_path {
conversation_manager
.resume_conversation_from_rollout(config.clone(), path, auth_manager.clone())
.await?
} else {
conversation_manager
.new_conversation(config.clone())
.await?
}
} else {
conversation_manager
.new_conversation(config.clone())
.await?
};
let (initial_operation, prompt_summary) = match (command, prompt, images) {
(Some(ExecCommand::Review(review_cli)), _, _) => {
let review_request = build_review_request(review_cli)?;
let summary = codex_core::review_prompts::user_facing_hint(&review_request.target);
(InitialOperation::Review { review_request }, summary)
}
(Some(ExecCommand::Resume(args)), root_prompt, imgs) => {
let prompt_arg = args
.prompt
.clone()
.or_else(|| {
if args.last {
args.session_id.clone()
} else {
None
}
})
.or(root_prompt);
let prompt_text = resolve_prompt(prompt_arg);
let mut items: Vec<UserInput> = imgs
.into_iter()
.chain(args.images.into_iter())
.map(|path| UserInput::LocalImage { path })
.collect();
items.push(UserInput::Text {
text: prompt_text.clone(),
});
let output_schema = load_output_schema(output_schema_path.clone());
(
InitialOperation::UserTurn {
items,
output_schema,
},
prompt_text,
)
}
(None, root_prompt, imgs) => {
let prompt_text = resolve_prompt(root_prompt);
let mut items: Vec<UserInput> = imgs
.into_iter()
.map(|path| UserInput::LocalImage { path })
.collect();
items.push(UserInput::Text {
text: prompt_text.clone(),
});
let output_schema = load_output_schema(output_schema_path);
(
InitialOperation::UserTurn {
items,
output_schema,
},
prompt_text,
)
}
};
// Print the effective configuration and initial request so users can see what Codex
// is using.
event_processor.print_config_summary(&config, &prompt_summary, &session_configured);
info!("Codex initialized with event: {session_configured:?}");
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
{
let conversation = conversation.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::debug!("Keyboard interrupt");
// Immediately notify Codex to abort any inflight task.
conversation.submit(Op::Interrupt).await.ok();
// Exit the inner loop and return to the main input prompt. The codex
// will emit a `TurnInterrupted` (Error) event which is drained later.
break;
}
res = conversation.next_event() => match res {
Ok(event) => {
debug!("Received event: {event:?}");
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
if let Err(e) = tx.send(event) {
error!("Error sending event: {e:?}");
break;
}
if is_shutdown_complete {
info!("Received shutdown event, exiting event loop.");
break;
}
},
Err(e) => {
error!("Error receiving event: {e:?}");
break;
}
}
}
}
});
}
match initial_operation {
InitialOperation::UserTurn {
items,
output_schema,
} => {
let task_id = conversation
.submit(Op::UserTurn {
items,
cwd: default_cwd,
approval_policy: default_approval_policy,
sandbox_policy: default_sandbox_policy.clone(),
model: default_model,
effort: default_effort,
summary: default_summary,
final_output_json_schema: output_schema,
})
.await?;
info!("Sent prompt with event ID: {task_id}");
task_id
}
InitialOperation::Review { review_request } => {
let task_id = conversation.submit(Op::Review { review_request }).await?;
info!("Sent review request with event ID: {task_id}");
task_id
}
};
// Run the loop until the task is complete.
// Track whether a fatal error was reported by the server so we can
// exit with a non-zero status for automation-friendly signaling.
let mut error_seen = false;
while let Some(event) = rx.recv().await {
if let EventMsg::ElicitationRequest(ev) = &event.msg {
// Automatically cancel elicitation requests in exec mode.
conversation
.submit(Op::ResolveElicitation {
server_name: ev.server_name.clone(),
request_id: ev.id.clone(),
decision: ElicitationAction::Cancel,
})
.await?;
}
if matches!(event.msg, EventMsg::Error(_)) {
error_seen = true;
}
let shutdown: CodexStatus = event_processor.process_event(event);
match shutdown {
CodexStatus::Running => continue,
CodexStatus::InitiateShutdown => {
conversation.submit(Op::Shutdown).await?;
}
CodexStatus::Shutdown => {
break;
}
}
}
event_processor.print_final_output();
if error_seen {
std::process::exit(1);
}
Ok(())
}
async fn resolve_resume_path(
config: &Config,
args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<PathBuf>> {
if args.last {
let default_provider_filter = vec![config.model_provider_id.clone()];
match codex_core::RolloutRecorder::list_conversations(
&config.codex_home,
1,
None,
&[],
Some(default_provider_filter.as_slice()),
&config.model_provider_id,
)
.await
{
Ok(page) => Ok(page.items.first().map(|it| it.path.clone())),
Err(e) => {
error!("Error listing conversations: {e}");
Ok(None)
}
}
} else if let Some(id_str) = args.session_id.as_deref() {
let path = find_conversation_path_by_id_str(&config.codex_home, id_str).await?;
Ok(path)
} else {
Ok(None)
}
}
fn load_output_schema(path: Option<PathBuf>) -> Option<Value> {
let path = path?;
let schema_str = match std::fs::read_to_string(&path) {
Ok(contents) => contents,
Err(err) => {
eprintln!(
"Failed to read output schema file {}: {err}",
path.display()
);
std::process::exit(1);
}
};
match serde_json::from_str::<Value>(&schema_str) {
Ok(value) => Some(value),
Err(err) => {
eprintln!(
"Output schema file {} is not valid JSON: {err}",
path.display()
);
std::process::exit(1);
}
}
}
fn resolve_prompt(prompt_arg: Option<String>) -> String {
match prompt_arg {
Some(p) if p != "-" => p,
maybe_dash => {
let force_stdin = matches!(maybe_dash.as_deref(), Some("-"));
if std::io::stdin().is_terminal() && !force_stdin {
eprintln!(
"No prompt provided. Either specify one as an argument or pipe the prompt into stdin."
);
std::process::exit(1);
}
if !force_stdin {
eprintln!("Reading prompt from stdin...");
}
let mut buffer = String::new();
if let Err(e) = std::io::stdin().read_to_string(&mut buffer) {
eprintln!("Failed to read prompt from stdin: {e}");
std::process::exit(1);
} else if buffer.trim().is_empty() {
eprintln!("No prompt provided via stdin.");
std::process::exit(1);
}
buffer
}
}
}
fn build_review_request(args: ReviewArgs) -> anyhow::Result<ReviewRequest> {
let target = if args.uncommitted {
ReviewTarget::UncommittedChanges
} else if let Some(branch) = args.base {
ReviewTarget::BaseBranch { branch }
} else if let Some(sha) = args.commit {
ReviewTarget::Commit {
sha,
title: args.commit_title,
}
} else if let Some(prompt_arg) = args.prompt {
let prompt = resolve_prompt(Some(prompt_arg)).trim().to_string();
if prompt.is_empty() {
anyhow::bail!("Review prompt cannot be empty");
}
ReviewTarget::Custom {
instructions: prompt,
}
} else {
anyhow::bail!(
"Specify --uncommitted, --base, --commit, or provide custom review instructions"
);
};
Ok(ReviewRequest {
target,
user_facing_hint: None,
})
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn builds_uncommitted_review_request() {
let request = build_review_request(ReviewArgs {
uncommitted: true,
base: None,
commit: None,
commit_title: None,
prompt: None,
})
.expect("builds uncommitted review request");
let expected = ReviewRequest {
target: ReviewTarget::UncommittedChanges,
user_facing_hint: None,
};
assert_eq!(request, expected);
}
#[test]
fn builds_commit_review_request_with_title() {
let request = build_review_request(ReviewArgs {
uncommitted: false,
base: None,
commit: Some("123456789".to_string()),
commit_title: Some("Add review command".to_string()),
prompt: None,
})
.expect("builds commit review request");
let expected = ReviewRequest {
target: ReviewTarget::Commit {
sha: "123456789".to_string(),
title: Some("Add review command".to_string()),
},
user_facing_hint: None,
};
assert_eq!(request, expected);
}
#[test]
fn builds_custom_review_request_trims_prompt() {
let request = build_review_request(ReviewArgs {
uncommitted: false,
base: None,
commit: None,
commit_title: None,
prompt: Some(" custom review instructions ".to_string()),
})
.expect("builds custom review request");
let expected = ReviewRequest {
target: ReviewTarget::Custom {
instructions: "custom review instructions".to_string(),
},
user_facing_hint: None,
};
assert_eq!(request, expected);
}
}