From fb2df99cf132c0980d1b54e1a8fb64a1f9ed8906 Mon Sep 17 00:00:00 2001 From: Celia Chen Date: Mon, 2 Feb 2026 16:06:44 -0800 Subject: [PATCH] [feat] persist thread_dynamic_tools in db (#10252) Persist thread_dynamic_tools in sqlite and read first from it. Fall back to rollout files if it's not found. Persist dynamic tools to both sqlite and rollout files. Saw that new sessions get populated to db correctly & old sessions get backfilled correctly at startup: ``` celia@com-92114 codex-rs % sqlite3 ~/.codex/state.sqlite \ "select thread_id, position,name,description,input_schema from thread_dynamic_tools;" 019c0cad-ec0d-74b2-a787-e8b33a349117|0|geo_lookup|lookup a city|{"properties":{"city":{"type":"string"}},"required":["city"],"type":"object"} .... 019c10ca-aa4b-7620-ae40-c0919fbd7ea7|0|geo_lookup|lookup a city|{"properties":{"city":{"type":"string"}},"required":["city"],"type":"object"} ``` --- codex-rs/app-server-test-client/src/main.rs | 117 ++++++++++++++++-- codex-rs/core/src/codex.rs | 31 ++++- codex-rs/core/src/rollout/metadata.rs | 23 ++++ codex-rs/core/src/state_db.rs | 47 +++++++ codex-rs/core/tests/suite/sqlite_state.rs | 37 +++++- .../migrations/0004_thread_dynamic_tools.sql | 11 ++ codex-rs/state/src/runtime.rs | 109 ++++++++++++++++ 7 files changed, 359 insertions(+), 16 deletions(-) create mode 100644 codex-rs/state/migrations/0004_thread_dynamic_tools.sql diff --git a/codex-rs/app-server-test-client/src/main.rs b/codex-rs/app-server-test-client/src/main.rs index d949c3a73..6d3fc06d8 100644 --- a/codex-rs/app-server-test-client/src/main.rs +++ b/codex-rs/app-server-test-client/src/main.rs @@ -1,7 +1,9 @@ use std::collections::VecDeque; +use std::fs; use std::io::BufRead; use std::io::BufReader; use std::io::Write; +use std::path::Path; use std::process::Child; use std::process::ChildStdin; use std::process::ChildStdout; @@ -24,6 +26,7 @@ use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::CommandExecutionApprovalDecision; use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::DynamicToolSpec; use codex_app_server_protocol::FileChangeApprovalDecision; use codex_app_server_protocol::FileChangeRequestApprovalParams; use codex_app_server_protocol::FileChangeRequestApprovalResponse; @@ -83,6 +86,15 @@ struct Cli { )] config_overrides: Vec, + /// JSON array of dynamic tool specs or a single tool object. + /// Prefix a filename with '@' to read from a file. + /// + /// Example: + /// --dynamic-tools '[{"name":"demo","description":"Demo","inputSchema":{"type":"object"}}]' + /// --dynamic-tools @/path/to/tools.json + #[arg(long, value_name = "json-or-@file", global = true)] + dynamic_tools: Option, + #[command(subcommand)] command: CliCommand, } @@ -140,23 +152,29 @@ fn main() -> Result<()> { let Cli { codex_bin, config_overrides, + dynamic_tools, command, } = Cli::parse(); + let dynamic_tools = parse_dynamic_tools_arg(&dynamic_tools)?; + match command { CliCommand::SendMessage { user_message } => { + ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?; send_message(&codex_bin, &config_overrides, user_message) } CliCommand::SendMessageV2 { user_message } => { - send_message_v2(&codex_bin, &config_overrides, user_message) + send_message_v2(&codex_bin, &config_overrides, user_message, &dynamic_tools) } CliCommand::TriggerCmdApproval { user_message } => { - trigger_cmd_approval(&codex_bin, &config_overrides, user_message) + trigger_cmd_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) } CliCommand::TriggerPatchApproval { user_message } => { - trigger_patch_approval(&codex_bin, &config_overrides, user_message) + trigger_patch_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) + } + CliCommand::NoTriggerCmdApproval => { + no_trigger_cmd_approval(&codex_bin, &config_overrides, &dynamic_tools) } - CliCommand::NoTriggerCmdApproval => no_trigger_cmd_approval(&codex_bin, &config_overrides), CliCommand::SendFollowUpV2 { first_message, follow_up_message, @@ -165,10 +183,20 @@ fn main() -> Result<()> { &config_overrides, first_message, follow_up_message, + &dynamic_tools, ), - CliCommand::TestLogin => test_login(&codex_bin, &config_overrides), - CliCommand::GetAccountRateLimits => get_account_rate_limits(&codex_bin, &config_overrides), - CliCommand::ModelList => model_list(&codex_bin, &config_overrides), + CliCommand::TestLogin => { + ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?; + test_login(&codex_bin, &config_overrides) + } + CliCommand::GetAccountRateLimits => { + ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?; + get_account_rate_limits(&codex_bin, &config_overrides) + } + CliCommand::ModelList => { + ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?; + model_list(&codex_bin, &config_overrides) + } } } @@ -198,14 +226,23 @@ fn send_message_v2( codex_bin: &str, config_overrides: &[String], user_message: String, + dynamic_tools: &Option>, ) -> Result<()> { - send_message_v2_with_policies(codex_bin, config_overrides, user_message, None, None) + send_message_v2_with_policies( + codex_bin, + config_overrides, + user_message, + None, + None, + dynamic_tools, + ) } fn trigger_cmd_approval( codex_bin: &str, config_overrides: &[String], user_message: Option, + dynamic_tools: &Option>, ) -> Result<()> { let default_prompt = "Run `touch /tmp/should-trigger-approval` so I can confirm the file exists."; @@ -216,6 +253,7 @@ fn trigger_cmd_approval( message, Some(AskForApproval::OnRequest), Some(SandboxPolicy::ReadOnly), + dynamic_tools, ) } @@ -223,6 +261,7 @@ fn trigger_patch_approval( codex_bin: &str, config_overrides: &[String], user_message: Option, + dynamic_tools: &Option>, ) -> Result<()> { let default_prompt = "Create a file named APPROVAL_DEMO.txt containing a short hello message using apply_patch."; @@ -233,12 +272,24 @@ fn trigger_patch_approval( message, Some(AskForApproval::OnRequest), Some(SandboxPolicy::ReadOnly), + dynamic_tools, ) } -fn no_trigger_cmd_approval(codex_bin: &str, config_overrides: &[String]) -> Result<()> { +fn no_trigger_cmd_approval( + codex_bin: &str, + config_overrides: &[String], + dynamic_tools: &Option>, +) -> Result<()> { let prompt = "Run `touch should_not_trigger_approval.txt`"; - send_message_v2_with_policies(codex_bin, config_overrides, prompt.to_string(), None, None) + send_message_v2_with_policies( + codex_bin, + config_overrides, + prompt.to_string(), + None, + None, + dynamic_tools, + ) } fn send_message_v2_with_policies( @@ -247,13 +298,17 @@ fn send_message_v2_with_policies( user_message: String, approval_policy: Option, sandbox_policy: Option, + dynamic_tools: &Option>, ) -> Result<()> { let mut client = CodexClient::spawn(codex_bin, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); - let thread_response = client.thread_start(ThreadStartParams::default())?; + let thread_response = client.thread_start(ThreadStartParams { + dynamic_tools: dynamic_tools.clone(), + ..Default::default() + })?; println!("< thread/start response: {thread_response:?}"); let mut turn_params = TurnStartParams { thread_id: thread_response.thread.id.clone(), @@ -280,13 +335,17 @@ fn send_follow_up_v2( config_overrides: &[String], first_message: String, follow_up_message: String, + dynamic_tools: &Option>, ) -> Result<()> { let mut client = CodexClient::spawn(codex_bin, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); - let thread_response = client.thread_start(ThreadStartParams::default())?; + let thread_response = client.thread_start(ThreadStartParams { + dynamic_tools: dynamic_tools.clone(), + ..Default::default() + })?; println!("< thread/start response: {thread_response:?}"); let first_turn_params = TurnStartParams { @@ -372,6 +431,40 @@ fn model_list(codex_bin: &str, config_overrides: &[String]) -> Result<()> { Ok(()) } +fn ensure_dynamic_tools_unused( + dynamic_tools: &Option>, + command: &str, +) -> Result<()> { + if dynamic_tools.is_some() { + bail!( + "dynamic tools are only supported for v2 thread/start; remove --dynamic-tools for {command} or use send-message-v2" + ); + } + Ok(()) +} + +fn parse_dynamic_tools_arg(dynamic_tools: &Option) -> Result>> { + let Some(raw_arg) = dynamic_tools.as_deref() else { + return Ok(None); + }; + + let raw_json = if let Some(path) = raw_arg.strip_prefix('@') { + fs::read_to_string(Path::new(path)) + .with_context(|| format!("read dynamic tools file {path}"))? + } else { + raw_arg.to_string() + }; + + let value: Value = serde_json::from_str(&raw_json).context("parse dynamic tools JSON")?; + let tools = match value { + Value::Array(_) => serde_json::from_value(value).context("decode dynamic tools array")?, + Value::Object(_) => vec![serde_json::from_value(value).context("decode dynamic tool")?], + _ => bail!("dynamic tools JSON must be an object or array"), + }; + + Ok(Some(tools)) +} + struct CodexClient { child: Child, stdin: Option, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index ba5ee02d6..b4e385950 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -333,9 +333,36 @@ impl Codex { .clone() .or_else(|| conversation_history.get_base_instructions().map(|s| s.text)) .unwrap_or_else(|| model_info.get_model_instructions(config.personality)); - // Respect explicit thread-start tools; fall back to persisted tools when resuming a thread. + + // Respect thread-start tools. When missing (resumed/forked threads), read from the db + // first, then fall back to rollout-file tools. + let persisted_tools = if dynamic_tools.is_empty() + && config.features.enabled(Feature::Sqlite) + { + let thread_id = match &conversation_history { + InitialHistory::Resumed(resumed) => Some(resumed.conversation_id), + InitialHistory::Forked(_) => conversation_history.forked_from_id(), + InitialHistory::New => None, + }; + match thread_id { + Some(thread_id) => { + let state_db_ctx = state_db::open_if_present( + config.codex_home.as_path(), + config.model_provider_id.as_str(), + ) + .await; + state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn") + .await + } + None => None, + } + } else { + None + }; let dynamic_tools = if dynamic_tools.is_empty() { - conversation_history.get_dynamic_tools().unwrap_or_default() + persisted_tools + .or_else(|| conversation_history.get_dynamic_tools()) + .unwrap_or_default() } else { dynamic_tools }; diff --git a/codex-rs/core/src/rollout/metadata.rs b/codex-rs/core/src/rollout/metadata.rs index 2d59d71af..42e52f78d 100644 --- a/codex-rs/core/src/rollout/metadata.rs +++ b/codex-rs/core/src/rollout/metadata.rs @@ -187,6 +187,29 @@ pub(crate) async fn backfill_sessions( warn!("failed to upsert rollout {}: {err}", path.display()); } else { stats.upserted = stats.upserted.saturating_add(1); + if let Ok(meta_line) = rollout::list::read_session_meta_line(&path).await { + if let Err(err) = runtime + .persist_dynamic_tools( + meta_line.meta.id, + meta_line.meta.dynamic_tools.as_deref(), + ) + .await + { + if let Some(otel) = otel { + otel.counter( + DB_ERROR_METRIC, + 1, + &[("stage", "backfill_dynamic_tools")], + ); + } + warn!("failed to backfill dynamic tools {}: {err}", path.display()); + } + } else { + warn!( + "failed to read session meta for dynamic tools {}", + path.display() + ); + } } } Err(err) => { diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs index a52f46593..ff95ed946 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/core/src/state_db.rs @@ -9,6 +9,7 @@ use chrono::Timelike; use chrono::Utc; use codex_otel::OtelManager; use codex_protocol::ThreadId; +use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_state::DB_METRIC_COMPARE_ERROR; @@ -196,6 +197,37 @@ pub async fn find_rollout_path_by_id( }) } +/// Get dynamic tools for a thread id using SQLite. +pub async fn get_dynamic_tools( + context: Option<&codex_state::StateRuntime>, + thread_id: ThreadId, + stage: &str, +) -> Option> { + let ctx = context?; + match ctx.get_dynamic_tools(thread_id).await { + Ok(tools) => tools, + Err(err) => { + warn!("state db get_dynamic_tools failed during {stage}: {err}"); + None + } + } +} + +/// Persist dynamic tools for a thread id using SQLite, if none exist yet. +pub async fn persist_dynamic_tools( + context: Option<&codex_state::StateRuntime>, + thread_id: ThreadId, + tools: Option<&[DynamicToolSpec]>, + stage: &str, +) { + let Some(ctx) = context else { + return; + }; + if let Err(err) = ctx.persist_dynamic_tools(thread_id, tools).await { + warn!("state db persist_dynamic_tools failed during {stage}: {err}"); + } +} + /// Reconcile rollout items into SQLite, falling back to scanning the rollout file. pub async fn reconcile_rollout( context: Option<&codex_state::StateRuntime>, @@ -235,6 +267,21 @@ pub async fn reconcile_rollout( "state db reconcile_rollout upsert failed {}: {err}", rollout_path.display() ); + return; + } + if let Ok(meta_line) = crate::rollout::list::read_session_meta_line(rollout_path).await { + persist_dynamic_tools( + Some(ctx), + meta_line.meta.id, + meta_line.meta.dynamic_tools.as_deref(), + "reconcile_rollout", + ) + .await; + } else { + warn!( + "state db reconcile_rollout missing session meta {}", + rollout_path.display() + ); } } diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 2582f90cb..218da3482 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -1,6 +1,7 @@ use anyhow::Result; use codex_core::features::Feature; use codex_protocol::ThreadId; +use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; @@ -74,6 +75,28 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { let rollout_rel_path = format!("sessions/2026/01/27/rollout-2026-01-27T12-00-00-{uuid}.jsonl"); let rollout_rel_path_for_hook = rollout_rel_path.clone(); + let dynamic_tools = vec![ + DynamicToolSpec { + name: "geo_lookup".to_string(), + description: "lookup a city".to_string(), + input_schema: json!({ + "type": "object", + "required": ["city"], + "properties": { "city": { "type": "string" } } + }), + }, + DynamicToolSpec { + name: "weather_lookup".to_string(), + description: "lookup weather".to_string(), + input_schema: json!({ + "type": "object", + "required": ["zip"], + "properties": { "zip": { "type": "string" } } + }), + }, + ]; + let dynamic_tools_for_hook = dynamic_tools.clone(); + let mut builder = test_codex() .with_pre_build_hook(move |codex_home| { let rollout_path = codex_home.join(&rollout_rel_path_for_hook); @@ -81,7 +104,6 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { .parent() .expect("rollout path should have parent"); fs::create_dir_all(parent).expect("should create rollout directory"); - let session_meta_line = SessionMetaLine { meta: SessionMeta { id: thread_id, @@ -93,7 +115,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { source: SessionSource::default(), model_provider: None, base_instructions: None, - dynamic_tools: None, + dynamic_tools: Some(dynamic_tools_for_hook), }, git: None, }; @@ -155,6 +177,17 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { assert_eq!(metadata.model_provider, default_provider); assert!(metadata.has_user_event); + let mut stored_tools = None; + for _ in 0..40 { + stored_tools = db.get_dynamic_tools(thread_id).await?; + if stored_tools.is_some() { + break; + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + let stored_tools = stored_tools.expect("dynamic tools should be stored"); + assert_eq!(stored_tools, dynamic_tools); + Ok(()) } diff --git a/codex-rs/state/migrations/0004_thread_dynamic_tools.sql b/codex-rs/state/migrations/0004_thread_dynamic_tools.sql new file mode 100644 index 000000000..0f40b5f80 --- /dev/null +++ b/codex-rs/state/migrations/0004_thread_dynamic_tools.sql @@ -0,0 +1,11 @@ +CREATE TABLE thread_dynamic_tools ( + thread_id TEXT NOT NULL, + position INTEGER NOT NULL, + name TEXT NOT NULL, + description TEXT NOT NULL, + input_schema TEXT NOT NULL, + PRIMARY KEY(thread_id, position), + FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE +); + +CREATE INDEX idx_thread_dynamic_tools_thread ON thread_dynamic_tools(thread_id); diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 6f2e12896..3b37b6d42 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -16,8 +16,10 @@ use chrono::DateTime; use chrono::Utc; use codex_otel::OtelManager; use codex_protocol::ThreadId; +use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::RolloutItem; use log::LevelFilter; +use serde_json::Value; use sqlx::ConnectOptions; use sqlx::QueryBuilder; use sqlx::Row; @@ -117,6 +119,38 @@ WHERE id = ? .transpose() } + /// Get dynamic tools for a thread, if present. + pub async fn get_dynamic_tools( + &self, + thread_id: ThreadId, + ) -> anyhow::Result>> { + let rows = sqlx::query( + r#" +SELECT name, description, input_schema +FROM thread_dynamic_tools +WHERE thread_id = ? +ORDER BY position ASC + "#, + ) + .bind(thread_id.to_string()) + .fetch_all(self.pool.as_ref()) + .await?; + if rows.is_empty() { + return Ok(None); + } + let mut tools = Vec::with_capacity(rows.len()); + for row in rows { + let input_schema: String = row.try_get("input_schema")?; + let input_schema = serde_json::from_str::(input_schema.as_str())?; + tools.push(DynamicToolSpec { + name: row.try_get("name")?, + description: row.try_get("description")?, + input_schema, + }); + } + Ok(Some(tools)) + } + /// Find a rollout path by thread id using the underlying database. pub async fn find_rollout_path_by_id( &self, @@ -369,6 +403,58 @@ ON CONFLICT(id) DO UPDATE SET Ok(()) } + /// Persist dynamic tools for a thread if none have been stored yet. + /// + /// Dynamic tools are defined at thread start and should not change afterward. + /// This only writes the first time we see tools for a given thread. + pub async fn persist_dynamic_tools( + &self, + thread_id: ThreadId, + tools: Option<&[DynamicToolSpec]>, + ) -> anyhow::Result<()> { + let Some(tools) = tools else { + return Ok(()); + }; + if tools.is_empty() { + return Ok(()); + } + let mut tx = self.pool.begin().await?; + let thread_id = thread_id.to_string(); + let existing: Option = + sqlx::query_scalar("SELECT 1 FROM thread_dynamic_tools WHERE thread_id = ? LIMIT 1") + .bind(thread_id.as_str()) + .fetch_optional(&mut *tx) + .await?; + if existing.is_some() { + tx.commit().await?; + return Ok(()); + } + for (idx, tool) in tools.iter().enumerate() { + let position = i64::try_from(idx).unwrap_or(i64::MAX); + let input_schema = serde_json::to_string(&tool.input_schema)?; + sqlx::query( + r#" +INSERT INTO thread_dynamic_tools ( + thread_id, + position, + name, + description, + input_schema +) VALUES (?, ?, ?, ?, ?) + "#, + ) + .bind(thread_id.as_str()) + .bind(position) + .bind(tool.name.as_str()) + .bind(tool.description.as_str()) + .bind(input_schema) + .execute(&mut *tx) + .await?; + } + tx.commit().await?; + Ok(()) + } + /// Apply rollout items incrementally using the underlying database. pub async fn apply_rollout_items( &self, @@ -390,12 +476,25 @@ ON CONFLICT(id) DO UPDATE SET if let Some(updated_at) = file_modified_time_utc(builder.rollout_path.as_path()).await { metadata.updated_at = updated_at; } + // Keep the thread upsert before dynamic tools to satisfy the foreign key constraint: + // thread_dynamic_tools.thread_id -> threads.id. if let Err(err) = self.upsert_thread(&metadata).await { if let Some(otel) = otel { otel.counter(DB_ERROR_METRIC, 1, &[("stage", "apply_rollout_items")]); } return Err(err); } + let dynamic_tools = extract_dynamic_tools(items); + if let Some(dynamic_tools) = dynamic_tools + && let Err(err) = self + .persist_dynamic_tools(builder.id, dynamic_tools.as_deref()) + .await + { + if let Some(otel) = otel { + otel.counter(DB_ERROR_METRIC, 1, &[("stage", "persist_dynamic_tools")]); + } + return Err(err); + } Ok(()) } @@ -507,6 +606,16 @@ fn push_like_filters<'a>( builder.push(")"); } +fn extract_dynamic_tools(items: &[RolloutItem]) -> Option>> { + items.iter().find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()), + RolloutItem::ResponseItem(_) + | RolloutItem::Compacted(_) + | RolloutItem::TurnContext(_) + | RolloutItem::EventMsg(_) => None, + }) +} + async fn open_sqlite(path: &Path) -> anyhow::Result { let options = SqliteConnectOptions::new() .filename(path)