core-agent-ide/codex-rs/exec/src/lib.rs
vishnu-oai 04c1782e52
OpenTelemetry events (#2103)

## otel

Codex can emit [OpenTelemetry](https://opentelemetry.io/) **log events** that
describe each run: outbound API requests, streamed responses, user input,
tool-approval decisions, and the result of every tool invocation. Export is
**disabled by default** so local runs remain self-contained. Opt in by adding an
`[otel]` table and choosing an exporter.

```toml
[otel]
environment = "staging"   # defaults to "dev"
exporter = "none"          # defaults to "none"; set to otlp-http or otlp-grpc to send events
log_user_prompt = false    # defaults to false; redact prompt text unless explicitly enabled
```

Codex tags every exported event with `service.name = "codex-cli"`, the CLI
version, and an `env` attribute so downstream collectors can distinguish
dev/staging/prod traffic. Only telemetry produced inside the `codex_otel`
crate (the events listed below) is forwarded to the exporter.

### Event catalog

Every event shares a common set of metadata fields: `event.timestamp`,
`conversation.id`, `app.version`, `auth_mode` (when available),
`user.account_id` (when available), `terminal.type`, `model`, and `slug`.

With OTEL enabled, Codex emits the following event types (in addition to the
metadata above):

- `codex.api_request`
  - `cf_ray` (optional)
  - `attempt`
  - `duration_ms`
  - `http.response.status_code` (optional)
  - `error.message` (failures)
- `codex.sse_event`
  - `event.kind`
  - `duration_ms`
  - `error.message` (failures)
  - `input_token_count` (completion only)
  - `output_token_count` (completion only)
  - `cached_token_count` (completion only, optional)
  - `reasoning_token_count` (completion only, optional)
  - `tool_token_count` (completion only)
- `codex.user_prompt`
  - `prompt_length`
  - `prompt` (redacted unless `log_user_prompt = true`)
- `codex.tool_decision`
  - `tool_name`
  - `call_id`
  - `decision` (`approved`, `approved_for_session`, `denied`, or `abort`)
  - `source` (`config` or `user`)
- `codex.tool_result`
  - `tool_name`
  - `call_id`
  - `arguments`
  - `duration_ms` (execution time for the tool)
  - `success` (`"true"` or `"false"`)
  - `output`
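
The redaction rule for `codex.user_prompt` can be sketched as a pair of small helpers. This is only an illustration and not the `codex_otel` implementation: the function names and the `[REDACTED]` placeholder are assumptions, and `prompt_length` is counted in characters here purely for the example.

```rust
/// Hypothetical helper: the value recorded in the `prompt` attribute of a
/// `codex.user_prompt` event.
fn prompt_attribute(prompt: &str, log_user_prompt: bool) -> String {
    if log_user_prompt {
        // The user opted in, so the prompt text is exported as-is.
        prompt.to_string()
    } else {
        // Assumed placeholder; the real redaction marker may differ.
        "[REDACTED]".to_string()
    }
}

/// Hypothetical helper: the value recorded in `prompt_length`.
/// Counting Unicode scalar values is an assumption of this sketch.
fn prompt_length(prompt: &str) -> usize {
    prompt.chars().count()
}
```

Under these assumptions the length is still reported when the text is redacted, so a collector can chart prompt sizes without ever receiving prompt contents.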

### Choosing an exporter

Set `otel.exporter` to control where events go:

- `none` – leaves instrumentation active but skips exporting. This is the
  default.
- `otlp-http` – posts OTLP log records to an OTLP/HTTP collector. Specify the
  endpoint, protocol, and headers your collector expects:

  ```toml
  [otel]
  exporter = { otlp-http = {
    endpoint = "https://otel.example.com/v1/logs",
    protocol = "binary",
    headers = { "x-otlp-api-key" = "${OTLP_TOKEN}" }
  }}
  ```

- `otlp-grpc` – streams OTLP log records over gRPC. Provide the endpoint and any
  metadata headers:

  ```toml
  [otel]
  exporter = { otlp-grpc = {
    endpoint = "https://otel.example.com:4317",
    headers = { "x-otlp-meta" = "abc123" }
  }}
  ```

If the exporter is `none`, nothing is written anywhere; otherwise you must run
or point to your own collector. All exporters run on a background batch worker
that is flushed on shutdown.
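
As a rough model of the three exporter choices, the setting could be mirrored in memory by an enum like the one below. The type `ExporterSketch`, its variant names, and its methods are hypothetical and exist only for this illustration; the real configuration types may be shaped differently.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory mirror of the `otel.exporter` setting.
#[derive(Debug, Clone, PartialEq)]
enum ExporterSketch {
    /// Corresponds to `exporter = "none"`: nothing leaves the process.
    Disabled,
    /// Corresponds to the `otlp-http` table.
    OtlpHttp {
        endpoint: String,
        headers: HashMap<String, String>,
        protocol: String,
    },
    /// Corresponds to the `otlp-grpc` table.
    OtlpGrpc {
        endpoint: String,
        headers: HashMap<String, String>,
    },
}

impl ExporterSketch {
    /// Returns the collector endpoint, or `None` when export is disabled.
    fn endpoint(&self) -> Option<&str> {
        match self {
            ExporterSketch::Disabled => None,
            ExporterSketch::OtlpHttp { endpoint, .. }
            | ExporterSketch::OtlpGrpc { endpoint, .. } => Some(endpoint.as_str()),
        }
    }

    /// True when the user must run or point to a collector.
    fn needs_collector(&self) -> bool {
        self.endpoint().is_some()
    }
}
```

Modelling the choice as an enum keeps the "no exporter" case explicit, so callers cannot reach for an endpoint that was never configured.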

If you build Codex from source, the OTEL crate is still behind an `otel` feature
flag; the official prebuilt binaries ship with the feature enabled. When the
feature is disabled, the telemetry hooks become no-ops so the CLI continues to
function without the extra dependencies.
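
The no-op behaviour described here is the usual Cargo feature-gating pattern. A minimal sketch, assuming a hypothetical hook named `record_event` (the real hooks in `codex_otel` are not shown in this commit message and may look different):

```rust
/// Hypothetical telemetry hook, compiled only when the `otel` feature is on.
/// A real hook would hand the event to the exporter; this one just reports
/// that it ran.
#[cfg(feature = "otel")]
fn record_event(name: &str) -> bool {
    println!("exporting {name}");
    true
}

/// Fallback compiled when the `otel` feature is off: same signature, no work,
/// and no dependency on any OpenTelemetry crate.
#[cfg(not(feature = "otel"))]
fn record_event(_name: &str) -> bool {
    false
}
```

Because both variants share one signature, call sites stay free of `cfg` attributes and compile unchanged either way.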

---------

Co-authored-by: Anton Panasenko <apanasenko@openai.com>
2025-09-29 11:30:55 -07:00

428 lines
15 KiB
Rust
mod cli;
mod event_processor;
mod event_processor_with_human_output;
pub mod event_processor_with_json_output;
pub mod exec_events;
pub mod experimental_event_processor_with_json_output;
pub use cli::Cli;
use codex_core::AuthManager;
use codex_core::BUILT_IN_OSS_MODEL_PROVIDER_ID;
use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::git_info::get_git_repo_root;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::TaskCompleteEvent;
use codex_ollama::DEFAULT_OSS_MODEL;
use codex_protocol::config_types::SandboxMode;
use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_json_output::EventProcessorWithJsonOutput;
use experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use serde_json::Value;
use std::io::IsTerminal;
use std::io::Read;
use std::path::PathBuf;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
use crate::cli::Command as ExecCommand;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use codex_core::find_conversation_path_by_id_str;
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
let Cli {
command,
images,
model: model_cli_arg,
oss,
config_profile,
full_auto,
dangerously_bypass_approvals_and_sandbox,
cwd,
skip_git_repo_check,
color,
last_message_file,
json: json_mode,
experimental_json,
sandbox_mode: sandbox_mode_cli_arg,
prompt,
output_schema: output_schema_path,
include_plan_tool,
config_overrides,
} = cli;
// Determine the prompt source (parent or subcommand) and read from stdin if needed.
let prompt_arg = match &command {
// Allow prompt before the subcommand by falling back to the parent-level prompt
// when the Resume subcommand did not provide its own prompt.
Some(ExecCommand::Resume(args)) => args.prompt.clone().or(prompt),
None => prompt,
};
let prompt = match prompt_arg {
Some(p) if p != "-" => p,
// Either `-` was passed or no positional arg.
maybe_dash => {
// When no arg (None) **and** stdin is a TTY, bail out early unless the
// user explicitly forced reading via `-`.
let force_stdin = matches!(maybe_dash.as_deref(), Some("-"));
if std::io::stdin().is_terminal() && !force_stdin {
eprintln!(
"No prompt provided. Either specify one as an argument or pipe the prompt into stdin."
);
std::process::exit(1);
}
// Ensure the user knows we are waiting on stdin, as they may
// have gotten into this state by mistake. If so, and they are not
// writing to stdin, Codex will hang indefinitely, so this should
// help them debug in that case.
if !force_stdin {
eprintln!("Reading prompt from stdin...");
}
let mut buffer = String::new();
if let Err(e) = std::io::stdin().read_to_string(&mut buffer) {
eprintln!("Failed to read prompt from stdin: {e}");
std::process::exit(1);
} else if buffer.trim().is_empty() {
eprintln!("No prompt provided via stdin.");
std::process::exit(1);
}
buffer
}
};
let output_schema = load_output_schema(output_schema_path);
let (stdout_with_ansi, stderr_with_ansi) = match color {
cli::Color::Always => (true, true),
cli::Color::Never => (false, false),
cli::Color::Auto => (
std::io::stdout().is_terminal(),
std::io::stderr().is_terminal(),
),
};
// Build fmt layer (existing logging) to compose with OTEL layer.
let default_level = "error";
// Build env_filter separately and attach via with_filter.
let env_filter = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new(default_level))
.unwrap_or_else(|_| EnvFilter::new(default_level));
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(stderr_with_ansi)
.with_writer(std::io::stderr)
.with_filter(env_filter);
let sandbox_mode = if full_auto {
Some(SandboxMode::WorkspaceWrite)
} else if dangerously_bypass_approvals_and_sandbox {
Some(SandboxMode::DangerFullAccess)
} else {
sandbox_mode_cli_arg.map(Into::<SandboxMode>::into)
};
// When using `--oss`, let the bootstrapper pick the model (defaulting to
// gpt-oss:20b) and ensure it is present locally. Also, force the builtin
// `oss` model provider.
let model = if let Some(model) = model_cli_arg {
Some(model)
} else if oss {
Some(DEFAULT_OSS_MODEL.to_owned())
} else {
None // No model specified, will use the default.
};
let model_provider = if oss {
Some(BUILT_IN_OSS_MODEL_PROVIDER_ID.to_string())
} else {
None // No specific model provider override.
};
// Load configuration and determine approval policy
let overrides = ConfigOverrides {
model,
review_model: None,
config_profile,
// This CLI is intended to be headless and has no affordances for asking
// the user for approval.
approval_policy: Some(AskForApproval::Never),
sandbox_mode,
cwd: cwd.map(|p| p.canonicalize().unwrap_or(p)),
model_provider,
codex_linux_sandbox_exe,
base_instructions: None,
include_plan_tool: Some(include_plan_tool),
include_apply_patch_tool: None,
include_view_image_tool: None,
show_raw_agent_reasoning: oss.then_some(true),
tools_web_search_request: None,
};
// Parse `-c` overrides.
let cli_kv_overrides = match config_overrides.parse_overrides() {
Ok(v) => v,
Err(e) => {
eprintln!("Error parsing -c overrides: {e}");
std::process::exit(1);
}
};
let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?;
let otel = codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION"));
#[allow(clippy::print_stderr)]
let otel = match otel {
Ok(otel) => otel,
Err(e) => {
eprintln!("Could not create otel exporter: {e}");
std::process::exit(1);
}
};
if let Some(provider) = otel.as_ref() {
let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter(
tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter),
);
let _ = tracing_subscriber::registry()
.with(fmt_layer)
.with(otel_layer)
.try_init();
} else {
let _ = tracing_subscriber::registry().with(fmt_layer).try_init();
}
let mut event_processor: Box<dyn EventProcessor> = match (json_mode, experimental_json) {
(_, true) => Box::new(ExperimentalEventProcessorWithJsonOutput::new(
last_message_file.clone(),
)),
(true, _) => {
eprintln!(
"The existing `--json` output format is being deprecated. Please try the new format using `--experimental-json`."
);
Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone()))
}
_ => Box::new(EventProcessorWithHumanOutput::create_with_ansi(
stdout_with_ansi,
&config,
last_message_file.clone(),
)),
};
if oss {
codex_ollama::ensure_oss_ready(&config)
.await
.map_err(|e| anyhow::anyhow!("OSS setup failed: {e}"))?;
}
let default_cwd = config.cwd.to_path_buf();
let default_approval_policy = config.approval_policy;
let default_sandbox_policy = config.sandbox_policy.clone();
let default_model = config.model.clone();
let default_effort = config.model_reasoning_effort;
let default_summary = config.model_reasoning_summary;
if !skip_git_repo_check && get_git_repo_root(&default_cwd).is_none() {
eprintln!("Not inside a trusted directory and --skip-git-repo-check was not specified.");
std::process::exit(1);
}
let conversation_manager =
ConversationManager::new(AuthManager::shared(config.codex_home.clone()));
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewConversation {
conversation_id: _,
conversation,
session_configured,
} = if let Some(ExecCommand::Resume(args)) = command {
let resume_path = resolve_resume_path(&config, &args).await?;
if let Some(path) = resume_path {
conversation_manager
.resume_conversation_from_rollout(
config.clone(),
path,
AuthManager::shared(config.codex_home.clone()),
)
.await?
} else {
conversation_manager
.new_conversation(config.clone())
.await?
}
} else {
conversation_manager
.new_conversation(config.clone())
.await?
};
// Print the effective configuration and prompt so users can see what Codex
// is using.
event_processor.print_config_summary(&config, &prompt, &session_configured);
info!("Codex initialized with event: {session_configured:?}");
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
{
let conversation = conversation.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::debug!("Keyboard interrupt");
// Immediately notify Codex to abort any inflight task.
conversation.submit(Op::Interrupt).await.ok();
// Stop forwarding events. Leaving this loop ends the task and drops `tx`,
// which in turn ends the main receive loop below.
break;
}
res = conversation.next_event() => match res {
Ok(event) => {
debug!("Received event: {event:?}");
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
if let Err(e) = tx.send(event) {
error!("Error sending event: {e:?}");
break;
}
if is_shutdown_complete {
info!("Received shutdown event, exiting event loop.");
break;
}
},
Err(e) => {
error!("Error receiving event: {e:?}");
break;
}
}
}
}
});
}
// Send images first, if any.
if !images.is_empty() {
let items: Vec<InputItem> = images
.into_iter()
.map(|path| InputItem::LocalImage { path })
.collect();
let initial_images_event_id = conversation.submit(Op::UserInput { items }).await?;
info!("Sent images with event ID: {initial_images_event_id}");
while let Ok(event) = conversation.next_event().await {
if event.id == initial_images_event_id
&& matches!(
event.msg,
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
})
)
{
break;
}
}
}
// Send the prompt.
let items: Vec<InputItem> = vec![InputItem::Text { text: prompt }];
let initial_prompt_task_id = conversation
.submit(Op::UserTurn {
items,
cwd: default_cwd,
approval_policy: default_approval_policy,
sandbox_policy: default_sandbox_policy,
model: default_model,
effort: default_effort,
summary: default_summary,
final_output_json_schema: output_schema,
})
.await?;
info!("Sent prompt with event ID: {initial_prompt_task_id}");
// Run the loop until the task is complete.
// Track whether a fatal error was reported by the server so we can
// exit with a non-zero status for automation-friendly signaling.
let mut error_seen = false;
while let Some(event) = rx.recv().await {
if matches!(event.msg, EventMsg::Error(_)) {
error_seen = true;
}
let shutdown: CodexStatus = event_processor.process_event(event);
match shutdown {
CodexStatus::Running => continue,
CodexStatus::InitiateShutdown => {
conversation.submit(Op::Shutdown).await?;
}
CodexStatus::Shutdown => {
break;
}
}
}
if error_seen {
std::process::exit(1);
}
Ok(())
}
async fn resolve_resume_path(
config: &Config,
args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<PathBuf>> {
if args.last {
match codex_core::RolloutRecorder::list_conversations(&config.codex_home, 1, None).await {
Ok(page) => Ok(page.items.first().map(|it| it.path.clone())),
Err(e) => {
error!("Error listing conversations: {e}");
Ok(None)
}
}
} else if let Some(id_str) = args.session_id.as_deref() {
let path = find_conversation_path_by_id_str(&config.codex_home, id_str).await?;
Ok(path)
} else {
Ok(None)
}
}
fn load_output_schema(path: Option<PathBuf>) -> Option<Value> {
let path = path?;
let schema_str = match std::fs::read_to_string(&path) {
Ok(contents) => contents,
Err(err) => {
eprintln!(
"Failed to read output schema file {}: {err}",
path.display()
);
std::process::exit(1);
}
};
match serde_json::from_str::<Value>(&schema_str) {
Ok(value) => Some(value),
Err(err) => {
eprintln!(
"Output schema file {} is not valid JSON: {err}",
path.display()
);
std::process::exit(1);
}
}
}