diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 045301e09..423d90085 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -83,6 +83,9 @@ ], "type": "object" }, + "AgentPath": { + "type": "string" + }, "AppBranding": { "description": "EXPERIMENTAL - app metadata returned by app-list APIs.", "properties": { @@ -1999,6 +2002,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 3d392be1a..261411ded 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -4899,6 +4899,9 @@ "title": "AgentMessageDeltaNotification", "type": "object" }, + "AgentPath": { + "type": "string" + }, "AnalyticsConfig": { "additionalProperties": true, "properties": { @@ -11515,6 +11518,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/v2/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index e06b5d1a1..e48397d3f 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -139,6 +139,9 @@ "title": "AgentMessageDeltaNotification", "type": "object" }, + "AgentPath": { + "type": "string" + }, "AnalyticsConfig": { "additionalProperties": true, "properties": { @@ -9275,6 +9278,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json index 8aee99f90..774042191 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json @@ -5,6 +5,9 @@ "description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.", "type": "string" }, + "AgentPath": { + "type": "string" + }, "ApprovalsReviewer": { "description": "Configures who approval requests are routed to for review. Examples include sandbox escapes, blocked network access, MCP approval prompts, and ARC escalations. Defaults to `user`. `guardian_subagent` uses a carefully prompted subagent to gather relevant context and apply a risk-based decision framework before approving or denying the request.", "enum": [ @@ -900,6 +903,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json index 05f3ae87c..55f02fedb 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json @@ -1,6 +1,9 @@ { "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { + "AgentPath": { + "type": "string" + }, "ByteRange": { "properties": { "end": { @@ -658,6 +661,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json index 214c25f54..300f8d1f3 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json @@ -1,6 +1,9 @@ { "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { + "AgentPath": { + "type": "string" + }, "ByteRange": { "properties": { "end": { @@ -658,6 +661,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json index 2a8fe06ec..6c6597a66 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json @@ -1,6 +1,9 @@ { "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { + "AgentPath": { + "type": "string" + }, "ByteRange": { "properties": { "end": { @@ -658,6 +661,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json index 468325cef..35a41983a 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json @@ -5,6 +5,9 @@ "description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.", "type": "string" }, + "AgentPath": { + "type": "string" + }, "ApprovalsReviewer": { "description": "Configures who approval requests are routed to for review. Examples include sandbox escapes, blocked network access, MCP approval prompts, and ARC escalations. Defaults to `user`. `guardian_subagent` uses a carefully prompted subagent to gather relevant context and apply a risk-based decision framework before approving or denying the request.", "enum": [ @@ -900,6 +903,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json index def818dcf..35e03397b 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json @@ -1,6 +1,9 @@ { "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { + "AgentPath": { + "type": "string" + }, "ByteRange": { "properties": { "end": { @@ -658,6 +661,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json index c225b1c0f..568c65456 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json @@ -5,6 +5,9 @@ "description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.", "type": "string" }, + "AgentPath": { + "type": "string" + }, "ApprovalsReviewer": { "description": "Configures who approval requests are routed to for review. Examples include sandbox escapes, blocked network access, MCP approval prompts, and ARC escalations. Defaults to `user`. `guardian_subagent` uses a carefully prompted subagent to gather relevant context and apply a risk-based decision framework before approving or denying the request.", "enum": [ @@ -900,6 +903,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json index df7670cdb..971233fcd 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json @@ -1,6 +1,9 @@ { "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { + "AgentPath": { + "type": "string" + }, "ByteRange": { "properties": { "end": { @@ -658,6 +661,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json index d95cd4dd8..94046cd18 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json @@ -1,6 +1,9 @@ { "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { + "AgentPath": { + "type": "string" + }, "ByteRange": { "properties": { "end": { @@ -658,6 +661,17 @@ "null" ] }, + "agent_path": { + "anyOf": [ + { + "$ref": "#/definitions/AgentPath" + }, + { + "type": "null" + } + ], + "default": null + }, "agent_role": { "default": null, "type": [ diff --git a/codex-rs/app-server-protocol/schema/typescript/AgentPath.ts b/codex-rs/app-server-protocol/schema/typescript/AgentPath.ts new file mode 100644 index 000000000..6e55ce69e --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/AgentPath.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type AgentPath = string; diff --git a/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts b/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts index df261bf3e..669e5802b 100644 --- a/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts +++ b/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts @@ -1,6 +1,7 @@ // GENERATED CODE! DO NOT MODIFY BY HAND! // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { AgentPath } from "./AgentPath"; import type { ThreadId } from "./ThreadId"; -export type SubAgentSource = "review" | "compact" | { "thread_spawn": { parent_thread_id: ThreadId, depth: number, agent_nickname: string | null, agent_role: string | null, } } | "memory_consolidation" | { "other": string }; +export type SubAgentSource = "review" | "compact" | { "thread_spawn": { parent_thread_id: ThreadId, depth: number, agent_path: AgentPath | null, agent_nickname: string | null, agent_role: string | null, } } | "memory_consolidation" | { "other": string }; diff --git a/codex-rs/app-server-protocol/schema/typescript/index.ts b/codex-rs/app-server-protocol/schema/typescript/index.ts index 73f2cc8e5..777feaa56 100644 --- a/codex-rs/app-server-protocol/schema/typescript/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/index.ts @@ -1,6 +1,7 @@ // GENERATED CODE! DO NOT MODIFY BY HAND! export type { AbsolutePathBuf } from "./AbsolutePathBuf"; +export type { AgentPath } from "./AgentPath"; export type { ApplyPatchApprovalParams } from "./ApplyPatchApprovalParams"; export type { ApplyPatchApprovalResponse } from "./ApplyPatchApprovalResponse"; export type { AuthMode } from "./AuthMode"; diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 06e2cd3ec..1b02e4bb6 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -8240,6 +8240,7 @@ fn with_thread_spawn_agent_metadata( codex_protocol::protocol::SubAgentSource::ThreadSpawn { parent_thread_id, depth, + agent_path, agent_nickname: existing_agent_nickname, agent_role: existing_agent_role, }, @@ -8247,6 +8248,7 @@ fn with_thread_spawn_agent_metadata( codex_protocol::protocol::SubAgentSource::ThreadSpawn { parent_thread_id, depth, + agent_path, agent_nickname: agent_nickname.or(existing_agent_nickname), agent_role: agent_role.or(existing_agent_role), }, @@ -8793,6 +8795,7 @@ mod tests { source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, }), @@ -8885,6 +8888,7 @@ mod tests { serde_json::to_string(&SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: ThreadId::from_string("ad7f0408-99b8-4f6e-a46f-bd0eec433370")?, depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, }))?; diff --git a/codex-rs/app-server/src/filters.rs b/codex-rs/app-server/src/filters.rs index a59750961..6d2b90dba 100644 --- a/codex-rs/app-server/src/filters.rs +++ b/codex-rs/app-server/src/filters.rs @@ -133,6 +133,7 @@ mod tests { let spawn = CoreSessionSource::SubAgent(CoreSubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, }); diff --git a/codex-rs/app-server/tests/common/rollout.rs b/codex-rs/app-server/tests/common/rollout.rs index 8146f7ae9..b67390154 100644 --- a/codex-rs/app-server/tests/common/rollout.rs +++ b/codex-rs/app-server/tests/common/rollout.rs @@ -79,6 +79,7 @@ pub fn create_fake_rollout_with_source( originator: "codex".to_string(), cli_version: "0.0.0".to_string(), source, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: model_provider.map(str::to_string), @@ -161,6 +162,7 @@ pub fn create_fake_rollout_with_text_elements( originator: "codex".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: model_provider.map(str::to_string), diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 79031ddd3..75bffe622 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -661,6 +661,7 @@ async fn thread_list_filters_by_source_kind_subagent_thread_spawn() -> Result<() CoreSessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, }), @@ -724,6 +725,7 @@ async fn thread_list_filters_by_subagent_variant() -> Result<()> { CoreSessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, }), diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 5cbcd3b25..4443abd6e 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -322,6 +322,7 @@ stream_max_retries = 0 originator: "codex".to_string(), cli_version: "0.0.0".to_string(), source: RolloutSessionSource::Cli, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: Some("mock_provider".to_string()), diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 056b4c4b7..ba774380d 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -422,6 +422,9 @@ "multi_agent": { "type": "boolean" }, + "multi_agent_v2": { + "type": "boolean" + }, "personality": { "type": "boolean" }, @@ -2028,6 +2031,9 @@ "multi_agent": { "type": "boolean" }, + "multi_agent_v2": { + "type": "boolean" + }, "personality": { "type": "boolean" }, diff --git a/codex-rs/core/src/agent/agent_resolver.rs b/codex-rs/core/src/agent/agent_resolver.rs new file mode 100644 index 000000000..3d1f75f57 --- /dev/null +++ b/codex-rs/core/src/agent/agent_resolver.rs @@ -0,0 +1,55 @@ +use crate::codex::Session; +use crate::codex::TurnContext; +use crate::function_tool::FunctionCallError; +use codex_protocol::ThreadId; +use std::sync::Arc; + +/// Resolves a single tool-facing agent target to a thread id. +pub(crate) async fn resolve_agent_target( + session: &Arc, + turn: &Arc, + target: &str, +) -> Result { + register_session_root(session, turn); + if let Ok(thread_id) = ThreadId::from_string(target) { + return Ok(thread_id); + } + + session + .services + .agent_control + .resolve_agent_reference(session.conversation_id, &turn.session_source, target) + .await + .map_err(|err| match err { + crate::error::CodexErr::UnsupportedOperation(message) => { + FunctionCallError::RespondToModel(message) + } + other => FunctionCallError::RespondToModel(other.to_string()), + }) +} + +/// Resolves multiple tool-facing agent targets to thread ids. +pub(crate) async fn resolve_agent_targets( + session: &Arc, + turn: &Arc, + targets: Vec, +) -> Result, FunctionCallError> { + if targets.is_empty() { + return Err(FunctionCallError::RespondToModel( + "agent targets must be non-empty".to_string(), + )); + } + + let mut resolved = Vec::with_capacity(targets.len()); + for target in &targets { + resolved.push(resolve_agent_target(session, turn, target).await?); + } + Ok(resolved) +} + +fn register_session_root(session: &Arc, turn: &Arc) { + session + .services + .agent_control + .register_session_root(session.conversation_id, &turn.session_source); +} diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index d75fc8952..10cbd441b 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -1,4 +1,5 @@ use crate::agent::AgentStatus; +use crate::agent::guards::AgentMetadata; use crate::agent::guards::Guards; use crate::agent::role::DEFAULT_ROLE_NAME; use crate::agent::role::resolve_role_config; @@ -15,6 +16,7 @@ use crate::shell_snapshot::ShellSnapshot; use crate::state_db; use crate::thread_manager::ThreadManagerState; use codex_features::Feature; +use codex_protocol::AgentPath; use codex_protocol::ThreadId; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseItem; @@ -41,6 +43,13 @@ pub(crate) struct SpawnAgentOptions { pub(crate) fork_parent_spawn_call_id: Option, } +#[derive(Clone, Debug)] +pub(crate) struct LiveAgent { + pub(crate) thread_id: ThreadId, + pub(crate) metadata: AgentMetadata, + pub(crate) status: AgentStatus, +} + fn default_agent_nickname_list() -> Vec<&'static str> { AGENT_NAMES .lines() @@ -69,9 +78,9 @@ fn agent_nickname_candidates( /// Control-plane handle for multi-agent operations. /// `AgentControl` is held by each session (via `SessionServices`). It provides capability to /// spawn new agents and the inter-agent communication layer. -/// An `AgentControl` instance is shared per "user session" which means the same `AgentControl` -/// is used for every sub-agent spawned by Codex. By doing so, we make sure the guards are -/// scoped to a user session. +/// An `AgentControl` instance is intended to be created at most once per root thread/session +/// tree. That same `AgentControl` is then shared with every sub-agent spawned from that root, +/// which keeps the guards scoped to that root thread rather than the entire `ThreadManager`. #[derive(Clone, Default)] pub(crate) struct AgentControl { /// Weak handle back to the global thread registry/state. @@ -97,17 +106,30 @@ impl AgentControl { items: Vec, session_source: Option, ) -> CodexResult { - self.spawn_agent_with_options(config, items, session_source, SpawnAgentOptions::default()) - .await + Ok(self + .spawn_agent_internal(config, items, session_source, SpawnAgentOptions::default()) + .await? + .thread_id) } - pub(crate) async fn spawn_agent_with_options( + pub(crate) async fn spawn_agent_with_metadata( &self, config: crate::config::Config, items: Vec, session_source: Option, options: SpawnAgentOptions, - ) -> CodexResult { + ) -> CodexResult { + self.spawn_agent_internal(config, items, session_source, options) + .await + } + + async fn spawn_agent_internal( + &self, + config: crate::config::Config, + items: Vec, + session_source: Option, + options: SpawnAgentOptions, + ) -> CodexResult { let state = self.upgrade()?; let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?; let inherited_shell_snapshot = self @@ -116,25 +138,26 @@ impl AgentControl { let inherited_exec_policy = self .inherited_exec_policy_for_source(&state, session_source.as_ref(), &config) .await; - let session_source = match session_source { + let (session_source, mut agent_metadata) = match session_source { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth, + agent_path, agent_role, .. })) => { - let candidate_names = agent_nickname_candidates(&config, agent_role.as_deref()); - let candidate_name_refs: Vec<&str> = - candidate_names.iter().map(String::as_str).collect(); - let agent_nickname = reservation.reserve_agent_nickname(&candidate_name_refs)?; - Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + let (session_source, agent_metadata) = self.prepare_thread_spawn( + &mut reservation, + &config, parent_thread_id, depth, - agent_nickname: Some(agent_nickname), + agent_path, agent_role, - })) + /*preferred_agent_nickname*/ None, + )?; + (Some(session_source), agent_metadata) } - other => other, + other => (other, AgentMetadata::default()), }; let notification_source = session_source.clone(); @@ -217,7 +240,8 @@ impl AgentControl { } None => state.spawn_new_thread(config, self.clone()).await?, }; - reservation.commit(new_thread.thread_id); + agent_metadata.agent_id = Some(new_thread.thread_id); + reservation.commit(agent_metadata.clone()); // Notify a new thread has been created. This notification will be processed by clients // to subscribe or drain this newly created thread. @@ -232,9 +256,22 @@ impl AgentControl { .await; self.send_input(new_thread.thread_id, items).await?; - self.maybe_start_completion_watcher(new_thread.thread_id, notification_source); + let child_reference = agent_metadata + .agent_path + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| new_thread.thread_id.to_string()); + self.maybe_start_completion_watcher( + new_thread.thread_id, + notification_source, + child_reference, + ); - Ok(new_thread.thread_id) + Ok(LiveAgent { + thread_id: new_thread.thread_id, + metadata: agent_metadata, + status: self.get_status(new_thread.thread_id).await, + }) } /// Resume an existing agent thread from a recorded rollout file. @@ -283,6 +320,7 @@ impl AgentControl { SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: child_depth, + agent_path: None, agent_nickname: None, agent_role: None, }); @@ -324,14 +362,14 @@ impl AgentControl { } let state = self.upgrade()?; let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?; - let session_source = match session_source { + let (session_source, agent_metadata) = match session_source { SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth, - .. + agent_path, + agent_role: _, + agent_nickname: _, }) => { - // Collab resume callers rebuild a placeholder ThreadSpawn source. Rehydrate the - // stored nickname/role from sqlite when available; otherwise leave both unset. let (resumed_agent_nickname, resumed_agent_role) = if let Some(state_db_ctx) = state_db::get_state_db(&config).await { match state_db_ctx.get_thread(thread_id).await { @@ -341,27 +379,17 @@ impl AgentControl { } else { (None, None) }; - let reserved_agent_nickname = resumed_agent_nickname - .as_deref() - .map(|agent_nickname| { - let candidate_names = - agent_nickname_candidates(&config, resumed_agent_role.as_deref()); - let candidate_name_refs: Vec<&str> = - candidate_names.iter().map(String::as_str).collect(); - reservation.reserve_agent_nickname_with_preference( - &candidate_name_refs, - Some(agent_nickname), - ) - }) - .transpose()?; - SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + self.prepare_thread_spawn( + &mut reservation, + &config, parent_thread_id, depth, - agent_nickname: reserved_agent_nickname, - agent_role: resumed_agent_role, - }) + agent_path, + resumed_agent_role, + resumed_agent_nickname, + )? } - other => other, + other => (other, AgentMetadata::default()), }; let notification_source = session_source.clone(); let inherited_shell_snapshot = self @@ -393,13 +421,21 @@ impl AgentControl { inherited_exec_policy, ) .await?; - reservation.commit(resumed_thread.thread_id); + let mut agent_metadata = agent_metadata; + agent_metadata.agent_id = Some(resumed_thread.thread_id); + reservation.commit(agent_metadata.clone()); // Resumed threads are re-registered in-memory and need the same listener // attachment path as freshly spawned threads. state.notify_thread_created(resumed_thread.thread_id); + let child_reference = agent_metadata + .agent_path + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| resumed_thread.thread_id.to_string()); self.maybe_start_completion_watcher( resumed_thread.thread_id, Some(notification_source.clone()), + child_reference, ); self.persist_thread_spawn_edge_for_source( resumed_thread.thread.as_ref(), @@ -500,21 +536,18 @@ impl AgentControl { thread.agent_status().await } - pub(crate) async fn get_agent_nickname_and_role( + pub(crate) fn register_session_root( &self, - agent_id: ThreadId, - ) -> Option<(Option, Option)> { - let Ok(state) = self.upgrade() else { - return None; - }; - let Ok(thread) = state.get_thread(agent_id).await else { - return None; - }; - let session_source = thread.config_snapshot().await.session_source; - Some(( - session_source.get_nickname(), - session_source.get_agent_role(), - )) + current_thread_id: ThreadId, + current_session_source: &SessionSource, + ) { + if thread_spawn_parent_thread_id(current_session_source).is_none() { + self.state.register_root_thread(current_thread_id); + } + } + + pub(crate) fn get_agent_metadata(&self, agent_id: ThreadId) -> Option { + self.state.agent_metadata_for_thread(agent_id) } pub(crate) async fn get_agent_config_snapshot( @@ -530,6 +563,33 @@ impl AgentControl { Some(thread.config_snapshot().await) } + pub(crate) async fn resolve_agent_reference( + &self, + _current_thread_id: ThreadId, + current_session_source: &SessionSource, + agent_reference: &str, + ) -> CodexResult { + let current_agent_path = current_session_source + .get_agent_path() + .unwrap_or_else(AgentPath::root); + let agent_path = current_agent_path + .resolve(agent_reference) + .map_err(CodexErr::UnsupportedOperation)?; + if agent_path.is_root() { + return Err(CodexErr::UnsupportedOperation( + "root is not a spawned agent".to_string(), + )); + } + + if let Some(thread_id) = self.state.agent_id_for_path(&agent_path) { + return Ok(thread_id); + } + Err(CodexErr::UnsupportedOperation(format!( + "live agent path `{}` not found", + agent_path.as_str() + ))) + } + /// Subscribe to status updates for `agent_id`, yielding the latest value and changes. pub(crate) async fn subscribe_status( &self, @@ -560,8 +620,13 @@ impl AgentControl { agents .into_iter() - .map(|(thread_id, nickname)| { - format_subagent_context_line(&thread_id.to_string(), nickname.as_deref()) + .map(|(thread_id, metadata)| { + let reference = metadata + .agent_path + .as_ref() + .map(|agent_path| agent_path.name().to_string()) + .unwrap_or_else(|| thread_id.to_string()); + format_subagent_context_line(reference.as_str(), metadata.agent_nickname.as_deref()) }) .collect::>() .join("\n") @@ -575,6 +640,7 @@ impl AgentControl { &self, child_thread_id: ThreadId, session_source: Option, + child_reference: String, ) { let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, .. @@ -610,13 +676,52 @@ impl AgentControl { }; parent_thread .inject_user_message_without_turn(format_subagent_notification_message( - &child_thread_id.to_string(), + child_reference.as_str(), &status, )) .await; }); } + #[allow(clippy::too_many_arguments)] + fn prepare_thread_spawn( + &self, + reservation: &mut crate::agent::guards::SpawnReservation, + config: &crate::config::Config, + parent_thread_id: ThreadId, + depth: i32, + agent_path: Option, + agent_role: Option, + preferred_agent_nickname: Option, + ) -> CodexResult<(SessionSource, AgentMetadata)> { + if depth == 1 { + self.state.register_root_thread(parent_thread_id); + } + if let Some(agent_path) = agent_path.as_ref() { + reservation.reserve_agent_path(agent_path)?; + } + let candidate_names = agent_nickname_candidates(config, agent_role.as_deref()); + let candidate_name_refs: Vec<&str> = candidate_names.iter().map(String::as_str).collect(); + let agent_nickname = Some(reservation.reserve_agent_nickname_with_preference( + &candidate_name_refs, + preferred_agent_nickname.as_deref(), + )?); + let session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, + depth, + agent_path: agent_path.clone(), + agent_nickname: agent_nickname.clone(), + agent_role: agent_role.clone(), + }); + let agent_metadata = AgentMetadata { + agent_id: None, + agent_path, + agent_nickname, + agent_role, + }; + Ok((session_source, agent_metadata)) + } + fn upgrade(&self) -> CodexResult> { self.manager .upgrade() @@ -666,7 +771,7 @@ impl AgentControl { async fn open_thread_spawn_children( &self, parent_thread_id: ThreadId, - ) -> CodexResult)>> { + ) -> CodexResult> { let mut children_by_parent = self.live_thread_spawn_children().await?; Ok(children_by_parent .remove(&parent_thread_id) @@ -675,9 +780,9 @@ impl AgentControl { async fn live_thread_spawn_children( &self, - ) -> CodexResult)>>> { + ) -> CodexResult>> { let state = self.upgrade()?; - let mut children_by_parent = HashMap::)>>::new(); + let mut children_by_parent = HashMap::>::new(); for thread_id in state.list_thread_ids().await { let Ok(thread) = state.get_thread(thread_id).await else { @@ -691,11 +796,26 @@ impl AgentControl { children_by_parent .entry(parent_thread_id) .or_default() - .push((thread_id, snapshot.session_source.get_nickname())); + .push(( + thread_id, + self.state + .agent_metadata_for_thread(thread_id) + .unwrap_or(AgentMetadata { + agent_id: Some(thread_id), + ..Default::default() + }), + )); } for children in children_by_parent.values_mut() { - children.sort_by(|left, right| left.0.to_string().cmp(&right.0.to_string())); + children.sort_by(|left, right| { + left.1 + .agent_path + .as_deref() + .unwrap_or_default() + .cmp(right.1.agent_path.as_deref().unwrap_or_default()) + .then_with(|| left.0.to_string().cmp(&right.0.to_string())) + }); } Ok(children_by_parent) diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 24344db71..20c051f85 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -443,12 +443,13 @@ async fn spawn_agent_can_fork_parent_thread_history() { let child_thread_id = harness .control - .spawn_agent_with_options( + .spawn_agent_with_metadata( harness.config.clone(), text_input("child task"), Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, })), @@ -457,7 +458,8 @@ async fn spawn_agent_can_fork_parent_thread_history() { }, ) .await - .expect("forked spawn should succeed"); + .expect("forked spawn should succeed") + .thread_id; let child_thread = harness .manager @@ -526,12 +528,13 @@ async fn spawn_agent_fork_injects_output_for_parent_spawn_call() { let child_thread_id = harness .control - .spawn_agent_with_options( + .spawn_agent_with_metadata( harness.config.clone(), text_input("child task"), Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, })), @@ -540,7 +543,8 @@ async fn spawn_agent_fork_injects_output_for_parent_spawn_call() { }, ) .await - .expect("forked spawn should succeed"); + .expect("forked spawn should succeed") + .thread_id; let child_thread = harness .manager @@ -596,12 +600,13 @@ async fn spawn_agent_fork_flushes_parent_rollout_before_loading_history() { let child_thread_id = harness .control - .spawn_agent_with_options( + .spawn_agent_with_metadata( harness.config.clone(), text_input("child task"), Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, })), @@ -610,7 +615,8 @@ async fn spawn_agent_fork_flushes_parent_rollout_before_loading_history() { }, ) .await - .expect("forked spawn should flush parent rollout before loading history"); + .expect("forked spawn should flush parent rollout before loading history") + .thread_id; let child_thread = harness .manager @@ -855,6 +861,7 @@ async fn spawn_child_completion_notifies_parent_history() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -886,9 +893,11 @@ async fn completion_watcher_notifies_parent_when_child_is_missing() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), + child_thread_id.to_string(), ); assert_eq!(wait_for_subagent_notification(&parent_thread).await, true); @@ -903,7 +912,7 @@ async fn completion_watcher_notifies_parent_when_child_is_missing() { assert_eq!( history_contains_text( &history_items, - &format!("\"agent_id\":\"{child_thread_id}\"") + &format!("\"agent_path\":\"{child_thread_id}\"") ), true ); @@ -926,6 +935,7 @@ async fn spawn_thread_subagent_gets_random_nickname_in_session_source() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -945,6 +955,7 @@ async fn spawn_thread_subagent_gets_random_nickname_in_session_source() { depth, agent_nickname, agent_role, + .. }) = snapshot.session_source else { panic!("expected thread-spawn sub-agent source"); @@ -976,6 +987,7 @@ async fn spawn_thread_subagent_uses_role_specific_nickname_candidates() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("researcher".to_string()), })), @@ -1018,6 +1030,8 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() { control, }; let (parent_thread_id, _parent_thread) = harness.start_thread().await; + let agent_path = AgentPath::from_string("/root/explorer".to_string()) + .expect("test agent path should be valid"); let child_thread_id = harness .control @@ -1027,6 +1041,7 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: Some(agent_path.clone()), agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -1095,6 +1110,7 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() { SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: Some(agent_path.clone()), agent_nickname: None, agent_role: None, }), @@ -1113,14 +1129,17 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() { let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: resumed_parent_thread_id, depth: resumed_depth, + agent_path: resumed_agent_path, agent_nickname: resumed_nickname, agent_role: resumed_role, + .. }) = resumed_snapshot.session_source else { panic!("expected thread-spawn sub-agent source"); }; assert_eq!(resumed_parent_thread_id, parent_thread_id); assert_eq!(resumed_depth, 1); + assert_eq!(resumed_agent_path, Some(agent_path)); assert_eq!(resumed_nickname, Some(original_nickname)); assert_eq!(resumed_role, Some("explorer".to_string())); @@ -1206,6 +1225,7 @@ async fn shutdown_agent_tree_closes_live_descendants() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -1220,6 +1240,7 @@ async fn shutdown_agent_tree_closes_live_descendants() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: child_thread_id, depth: 2, + agent_path: None, agent_nickname: None, agent_role: Some("worker".to_string()), })), @@ -1289,6 +1310,7 @@ async fn shutdown_agent_tree_closes_descendants_when_started_at_child() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -1303,6 +1325,7 @@ async fn shutdown_agent_tree_closes_descendants_when_started_at_child() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: child_thread_id, depth: 2, + agent_path: None, agent_nickname: None, agent_role: Some("worker".to_string()), })), @@ -1378,6 +1401,7 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -1392,6 +1416,7 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: child_thread_id, depth: 2, + agent_path: None, agent_nickname: None, agent_role: Some("worker".to_string()), })), @@ -1471,6 +1496,7 @@ async fn resume_closed_child_reopens_open_descendants() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -1485,6 +1511,7 @@ async fn resume_closed_child_reopens_open_descendants() { Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: child_thread_id, depth: 2, + agent_path: None, agent_nickname: None, agent_role: Some("worker".to_string()), })), @@ -1524,6 +1551,7 @@ async fn resume_closed_child_reopens_open_descendants() { SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, }), @@ -1565,6 +1593,7 @@ async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdo Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -1579,6 +1608,7 @@ async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdo Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: child_thread_id, depth: 2, + agent_path: None, agent_nickname: None, agent_role: Some("worker".to_string()), })), @@ -1654,6 +1684,7 @@ async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_sourc Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -1668,6 +1699,7 @@ async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_sourc Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: child_thread_id, depth: 2, + agent_path: None, agent_nickname: None, agent_role: Some("worker".to_string()), })), @@ -1705,6 +1737,7 @@ async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_sourc serde_json::to_string(&SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: ThreadId::new(), depth: 99, + agent_path: None, agent_nickname: None, agent_role: Some("worker".to_string()), })) @@ -1782,6 +1815,7 @@ async fn resume_agent_from_rollout_skips_descendants_when_parent_resume_fails() Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, + agent_path: None, agent_nickname: None, agent_role: Some("explorer".to_string()), })), @@ -1796,6 +1830,7 @@ async fn resume_agent_from_rollout_skips_descendants_when_parent_resume_fails() Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: child_thread_id, depth: 2, + agent_path: None, agent_nickname: None, agent_role: Some("worker".to_string()), })), diff --git a/codex-rs/core/src/agent/guards.rs b/codex-rs/core/src/agent/guards.rs index 12fdc0aeb..665c02ebf 100644 --- a/codex-rs/core/src/agent/guards.rs +++ b/codex-rs/core/src/agent/guards.rs @@ -1,11 +1,13 @@ use crate::error::CodexErr; use crate::error::Result; +use codex_protocol::AgentPath; use codex_protocol::ThreadId; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use rand::prelude::IndexedRandom; use std::collections::HashMap; use std::collections::HashSet; +use std::collections::hash_map::Entry; use std::sync::Arc; use std::sync::Mutex; use std::sync::atomic::AtomicUsize; @@ -25,12 +27,19 @@ pub(crate) struct Guards { #[derive(Default)] struct ActiveAgents { - threads_set: HashSet, - thread_agent_nicknames: HashMap, + agent_tree: HashMap, used_agent_nicknames: HashSet, nickname_reset_count: usize, } +#[derive(Clone, Debug, Default)] +pub(crate) struct AgentMetadata { + pub(crate) agent_id: Option, + pub(crate) agent_path: Option, + pub(crate) agent_nickname: Option, + pub(crate) agent_role: Option, +} + fn format_agent_nickname(name: &str, nickname_reset_count: usize) -> String { match nickname_reset_count { 0 => name.to_string(), @@ -82,38 +91,83 @@ impl Guards { state: Arc::clone(self), active: true, reserved_agent_nickname: None, + reserved_agent_path: None, }) } pub(crate) fn release_spawned_thread(&self, thread_id: ThreadId) { - let removed = { + let removed_counted_agent = { let mut active_agents = self .active_agents .lock() .unwrap_or_else(std::sync::PoisonError::into_inner); - let removed = active_agents.threads_set.remove(&thread_id); - active_agents.thread_agent_nicknames.remove(&thread_id); - removed + let removed_key = active_agents + .agent_tree + .iter() + .find_map(|(key, metadata)| (metadata.agent_id == Some(thread_id)).then_some(key)) + .cloned(); + removed_key + .and_then(|key| active_agents.agent_tree.remove(key.as_str())) + .is_some_and(|metadata| { + !metadata.agent_path.as_ref().is_some_and(AgentPath::is_root) + }) }; - if removed { + if removed_counted_agent { self.total_count.fetch_sub(1, Ordering::AcqRel); } } - fn register_spawned_thread(&self, thread_id: ThreadId, agent_nickname: Option) { + pub(crate) fn register_root_thread(&self, thread_id: ThreadId) { let mut active_agents = self .active_agents .lock() .unwrap_or_else(std::sync::PoisonError::into_inner); - active_agents.threads_set.insert(thread_id); - if let Some(agent_nickname) = agent_nickname { - active_agents - .used_agent_nicknames - .insert(agent_nickname.clone()); - active_agents - .thread_agent_nicknames - .insert(thread_id, agent_nickname); + active_agents + .agent_tree + .entry(AgentPath::ROOT.to_string()) + .or_insert_with(|| AgentMetadata { + agent_id: Some(thread_id), + agent_path: Some(AgentPath::root()), + ..Default::default() + }); + } + + pub(crate) fn agent_id_for_path(&self, agent_path: &AgentPath) -> Option { + self.active_agents + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .agent_tree + .get(agent_path.as_str()) + .and_then(|metadata| metadata.agent_id) + } + + pub(crate) fn agent_metadata_for_thread(&self, thread_id: ThreadId) -> Option { + self.active_agents + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .agent_tree + .values() + .find(|metadata| metadata.agent_id == Some(thread_id)) + .cloned() + } + + fn register_spawned_thread(&self, agent_metadata: AgentMetadata) { + let Some(thread_id) = agent_metadata.agent_id else { + return; + }; + let mut active_agents = self + .active_agents + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let key = agent_metadata + .agent_path + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| format!("thread:{thread_id}")); + if let Some(agent_nickname) = agent_metadata.agent_nickname.clone() { + active_agents.used_agent_nicknames.insert(agent_nickname); } + active_agents.agent_tree.insert(key, agent_metadata); } fn reserve_agent_nickname(&self, names: &[&str], preferred: Option<&str>) -> Option { @@ -156,6 +210,39 @@ impl Guards { Some(agent_nickname) } + fn reserve_agent_path(&self, agent_path: &AgentPath) -> Result<()> { + let mut active_agents = self + .active_agents + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + match active_agents.agent_tree.entry(agent_path.to_string()) { + Entry::Occupied(_) => Err(CodexErr::UnsupportedOperation(format!( + "agent path `{agent_path}` already exists" + ))), + Entry::Vacant(entry) => { + entry.insert(AgentMetadata { + agent_path: Some(agent_path.clone()), + ..Default::default() + }); + Ok(()) + } + } + } + + fn release_reserved_agent_path(&self, agent_path: &AgentPath) { + let mut active_agents = self + .active_agents + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if active_agents + .agent_tree + .get(agent_path.as_str()) + .is_some_and(|metadata| metadata.agent_id.is_none()) + { + active_agents.agent_tree.remove(agent_path.as_str()); + } + } + fn try_increment_spawned(&self, max_threads: usize) -> bool { let mut current = self.total_count.load(Ordering::Acquire); loop { @@ -179,13 +266,10 @@ pub(crate) struct SpawnReservation { state: Arc, active: bool, reserved_agent_nickname: Option, + reserved_agent_path: Option, } impl SpawnReservation { - pub(crate) fn reserve_agent_nickname(&mut self, names: &[&str]) -> Result { - self.reserve_agent_nickname_with_preference(names, /*preferred*/ None) - } - pub(crate) fn reserve_agent_nickname_with_preference( &mut self, names: &[&str], @@ -201,18 +285,16 @@ impl SpawnReservation { Ok(agent_nickname) } - pub(crate) fn commit(self, thread_id: ThreadId) { - self.commit_with_agent_nickname(thread_id, /*agent_nickname*/ None); + pub(crate) fn reserve_agent_path(&mut self, agent_path: &AgentPath) -> Result<()> { + self.state.reserve_agent_path(agent_path)?; + self.reserved_agent_path = Some(agent_path.clone()); + Ok(()) } - pub(crate) fn commit_with_agent_nickname( - mut self, - thread_id: ThreadId, - agent_nickname: Option, - ) { - let agent_nickname = self.reserved_agent_nickname.take().or(agent_nickname); - self.state - .register_spawned_thread(thread_id, agent_nickname); + pub(crate) fn commit(mut self, agent_metadata: AgentMetadata) { + self.reserved_agent_nickname = None; + self.reserved_agent_path = None; + self.state.register_spawned_thread(agent_metadata); self.active = false; } } @@ -220,6 +302,9 @@ impl SpawnReservation { impl Drop for SpawnReservation { fn drop(&mut self) { if self.active { + if let Some(agent_path) = self.reserved_agent_path.take() { + self.state.release_reserved_agent_path(&agent_path); + } self.state.total_count.fetch_sub(1, Ordering::AcqRel); } } diff --git a/codex-rs/core/src/agent/guards_tests.rs b/codex-rs/core/src/agent/guards_tests.rs index 53bb5f3b3..9da4cec84 100644 --- a/codex-rs/core/src/agent/guards_tests.rs +++ b/codex-rs/core/src/agent/guards_tests.rs @@ -1,7 +1,19 @@ use super::*; +use codex_protocol::AgentPath; use pretty_assertions::assert_eq; use std::collections::HashSet; +fn agent_path(path: &str) -> AgentPath { + AgentPath::try_from(path).expect("valid agent path") +} + +fn agent_metadata(thread_id: ThreadId) -> AgentMetadata { + AgentMetadata { + agent_id: Some(thread_id), + ..Default::default() + } +} + #[test] fn format_agent_nickname_adds_ordinals_after_reset() { assert_eq!(format_agent_nickname("Plato", 0), "Plato"); @@ -21,6 +33,7 @@ fn thread_spawn_depth_increments_and_enforces_limit() { let session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: ThreadId::new(), depth: 1, + agent_path: None, agent_nickname: None, agent_role: None, }); @@ -52,7 +65,7 @@ fn commit_holds_slot_until_release() { let guards = Arc::new(Guards::default()); let reservation = guards.reserve_spawn_slot(Some(1)).expect("reserve slot"); let thread_id = ThreadId::new(); - reservation.commit(thread_id); + reservation.commit(agent_metadata(thread_id)); let err = match guards.reserve_spawn_slot(Some(1)) { Ok(_) => panic!("limit should be enforced"), @@ -75,7 +88,7 @@ fn release_ignores_unknown_thread_id() { let guards = Arc::new(Guards::default()); let reservation = guards.reserve_spawn_slot(Some(1)).expect("reserve slot"); let thread_id = ThreadId::new(); - reservation.commit(thread_id); + reservation.commit(agent_metadata(thread_id)); guards.release_spawned_thread(ThreadId::new()); @@ -100,13 +113,13 @@ fn release_is_idempotent_for_registered_threads() { let guards = Arc::new(Guards::default()); let reservation = guards.reserve_spawn_slot(Some(1)).expect("reserve slot"); let first_id = ThreadId::new(); - reservation.commit(first_id); + reservation.commit(agent_metadata(first_id)); guards.release_spawned_thread(first_id); let reservation = guards.reserve_spawn_slot(Some(1)).expect("slot reused"); let second_id = ThreadId::new(); - reservation.commit(second_id); + reservation.commit(agent_metadata(second_id)); guards.release_spawned_thread(first_id); @@ -131,14 +144,14 @@ fn failed_spawn_keeps_nickname_marked_used() { let guards = Arc::new(Guards::default()); let mut reservation = guards.reserve_spawn_slot(None).expect("reserve slot"); let agent_nickname = reservation - .reserve_agent_nickname(&["alpha"]) + .reserve_agent_nickname_with_preference(&["alpha"], /*preferred*/ None) .expect("reserve agent name"); assert_eq!(agent_nickname, "alpha"); drop(reservation); let mut reservation = guards.reserve_spawn_slot(None).expect("reserve slot"); let agent_nickname = reservation - .reserve_agent_nickname(&["alpha", "beta"]) + .reserve_agent_nickname_with_preference(&["alpha", "beta"], /*preferred*/ None) .expect("unused name should still be preferred"); assert_eq!(agent_nickname, "beta"); } @@ -148,17 +161,17 @@ fn agent_nickname_resets_used_pool_when_exhausted() { let guards = Arc::new(Guards::default()); let mut first = guards.reserve_spawn_slot(None).expect("reserve first slot"); let first_name = first - .reserve_agent_nickname(&["alpha"]) + .reserve_agent_nickname_with_preference(&["alpha"], /*preferred*/ None) .expect("reserve first agent name"); let first_id = ThreadId::new(); - first.commit(first_id); + first.commit(agent_metadata(first_id)); assert_eq!(first_name, "alpha"); let mut second = guards .reserve_spawn_slot(None) .expect("reserve second slot"); let second_name = second - .reserve_agent_nickname(&["alpha"]) + .reserve_agent_nickname_with_preference(&["alpha"], /*preferred*/ None) .expect("name should be reused after pool reset"); assert_eq!(second_name, "alpha the 2nd"); let active_agents = guards @@ -174,10 +187,10 @@ fn released_nickname_stays_used_until_pool_reset() { let mut first = guards.reserve_spawn_slot(None).expect("reserve first slot"); let first_name = first - .reserve_agent_nickname(&["alpha"]) + .reserve_agent_nickname_with_preference(&["alpha"], /*preferred*/ None) .expect("reserve first agent name"); let first_id = ThreadId::new(); - first.commit(first_id); + first.commit(agent_metadata(first_id)); assert_eq!(first_name, "alpha"); guards.release_spawned_thread(first_id); @@ -186,16 +199,16 @@ fn released_nickname_stays_used_until_pool_reset() { .reserve_spawn_slot(None) .expect("reserve second slot"); let second_name = second - .reserve_agent_nickname(&["alpha", "beta"]) + .reserve_agent_nickname_with_preference(&["alpha", "beta"], /*preferred*/ None) .expect("released name should still be marked used"); assert_eq!(second_name, "beta"); let second_id = ThreadId::new(); - second.commit(second_id); + second.commit(agent_metadata(second_id)); guards.release_spawned_thread(second_id); let mut third = guards.reserve_spawn_slot(None).expect("reserve third slot"); let third_name = third - .reserve_agent_nickname(&["alpha", "beta"]) + .reserve_agent_nickname_with_preference(&["alpha", "beta"], /*preferred*/ None) .expect("pool reset should permit a duplicate"); let expected_names = HashSet::from(["alpha the 2nd".to_string(), "beta the 2nd".to_string()]); assert!(expected_names.contains(&third_name)); @@ -212,10 +225,10 @@ fn repeated_resets_advance_the_ordinal_suffix() { let mut first = guards.reserve_spawn_slot(None).expect("reserve first slot"); let first_name = first - .reserve_agent_nickname(&["Plato"]) + .reserve_agent_nickname_with_preference(&["Plato"], /*preferred*/ None) .expect("reserve first agent name"); let first_id = ThreadId::new(); - first.commit(first_id); + first.commit(agent_metadata(first_id)); assert_eq!(first_name, "Plato"); guards.release_spawned_thread(first_id); @@ -223,16 +236,16 @@ fn repeated_resets_advance_the_ordinal_suffix() { .reserve_spawn_slot(None) .expect("reserve second slot"); let second_name = second - .reserve_agent_nickname(&["Plato"]) + .reserve_agent_nickname_with_preference(&["Plato"], /*preferred*/ None) .expect("reserve second agent name"); let second_id = ThreadId::new(); - second.commit(second_id); + second.commit(agent_metadata(second_id)); assert_eq!(second_name, "Plato the 2nd"); guards.release_spawned_thread(second_id); let mut third = guards.reserve_spawn_slot(None).expect("reserve third slot"); let third_name = third - .reserve_agent_nickname(&["Plato"]) + .reserve_agent_nickname_with_preference(&["Plato"], /*preferred*/ None) .expect("reserve third agent name"); assert_eq!(third_name, "Plato the 3rd"); let active_agents = guards @@ -241,3 +254,59 @@ fn repeated_resets_advance_the_ordinal_suffix() { .unwrap_or_else(std::sync::PoisonError::into_inner); assert_eq!(active_agents.nickname_reset_count, 2); } + +#[test] +fn register_root_thread_indexes_root_path() { + let guards = Arc::new(Guards::default()); + let root_thread_id = ThreadId::new(); + + guards.register_root_thread(root_thread_id); + + assert_eq!( + guards.agent_id_for_path(&AgentPath::root()), + Some(root_thread_id) + ); +} + +#[test] +fn reserved_agent_path_is_released_when_spawn_fails() { + let guards = Arc::new(Guards::default()); + let mut first = guards.reserve_spawn_slot(None).expect("reserve first slot"); + first + .reserve_agent_path(&agent_path("/root/researcher")) + .expect("reserve first path"); + drop(first); + + let mut second = guards + .reserve_spawn_slot(None) + .expect("reserve second slot"); + second + .reserve_agent_path(&agent_path("/root/researcher")) + .expect("dropped reservation should free the path"); +} + +#[test] +fn committed_agent_path_is_indexed_until_release() { + let guards = Arc::new(Guards::default()); + let thread_id = ThreadId::new(); + let mut reservation = guards.reserve_spawn_slot(None).expect("reserve slot"); + reservation + .reserve_agent_path(&agent_path("/root/researcher")) + .expect("reserve path"); + reservation.commit(AgentMetadata { + agent_id: Some(thread_id), + agent_path: Some(agent_path("/root/researcher")), + ..Default::default() + }); + + assert_eq!( + guards.agent_id_for_path(&agent_path("/root/researcher")), + Some(thread_id) + ); + + guards.release_spawned_thread(thread_id); + assert_eq!( + guards.agent_id_for_path(&agent_path("/root/researcher")), + None + ); +} diff --git a/codex-rs/core/src/agent/mod.rs b/codex-rs/core/src/agent/mod.rs index 15be909c3..681f993a9 100644 --- a/codex-rs/core/src/agent/mod.rs +++ b/codex-rs/core/src/agent/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod agent_resolver; pub(crate) mod control; mod guards; pub(crate) mod role; diff --git a/codex-rs/core/src/personality_migration_tests.rs b/codex-rs/core/src/personality_migration_tests.rs index fef1297a9..de1070ad3 100644 --- a/codex-rs/core/src/personality_migration_tests.rs +++ b/codex-rs/core/src/personality_migration_tests.rs @@ -38,6 +38,7 @@ async fn write_session_with_user_event(codex_home: &Path) -> io::Result<()> { originator: "test_originator".to_string(), cli_version: "test_version".to_string(), source: SessionSource::Cli, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: None, diff --git a/codex-rs/core/src/realtime_context_tests.rs b/codex-rs/core/src/realtime_context_tests.rs index a04b77139..a19abf3ed 100644 --- a/codex-rs/core/src/realtime_context_tests.rs +++ b/codex-rs/core/src/realtime_context_tests.rs @@ -23,6 +23,7 @@ fn thread_metadata(cwd: &str, title: &str, first_user_message: &str) -> ThreadMe .single() .expect("valid timestamp"), source: "cli".to_string(), + agent_path: None, agent_nickname: None, agent_role: None, model_provider: "test-provider".to_string(), diff --git a/codex-rs/core/src/rollout/metadata.rs b/codex-rs/core/src/rollout/metadata.rs index d2edfbb0d..5b032d217 100644 --- a/codex-rs/core/src/rollout/metadata.rs +++ b/codex-rs/core/src/rollout/metadata.rs @@ -49,6 +49,7 @@ pub(crate) fn builder_from_session_meta( builder.model_provider = session_meta.meta.model_provider.clone(); builder.agent_nickname = session_meta.meta.agent_nickname.clone(); builder.agent_role = session_meta.meta.agent_role.clone(); + builder.agent_path = session_meta.meta.agent_path.clone(); builder.cwd = session_meta.meta.cwd.clone(); builder.cli_version = Some(session_meta.meta.cli_version.clone()); builder.sandbox_policy = SandboxPolicy::new_read_only_policy(); diff --git a/codex-rs/core/src/rollout/metadata_tests.rs b/codex-rs/core/src/rollout/metadata_tests.rs index 5556d7002..dacd9e67b 100644 --- a/codex-rs/core/src/rollout/metadata_tests.rs +++ b/codex-rs/core/src/rollout/metadata_tests.rs @@ -38,6 +38,7 @@ async fn extract_metadata_from_rollout_uses_session_meta() { originator: "cli".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::default(), + agent_path: None, agent_nickname: None, agent_role: None, model_provider: Some("openai".to_string()), @@ -88,6 +89,7 @@ async fn extract_metadata_from_rollout_returns_latest_memory_mode() { originator: "cli".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::default(), + agent_path: None, agent_nickname: None, agent_role: None, model_provider: Some("openai".to_string()), @@ -355,6 +357,7 @@ fn write_rollout_in_sessions_with_cwd( originator: "cli".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::default(), + agent_path: None, agent_nickname: None, agent_role: None, model_provider: Some("test-provider".to_string()), diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 002269d59..72a3e3c63 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -405,6 +405,7 @@ impl RolloutRecorder { cli_version: env!("CARGO_PKG_VERSION").to_string(), agent_nickname: source.get_nickname(), agent_role: source.get_agent_role(), + agent_path: source.get_agent_path().map(Into::into), source, model_provider: Some(config.model_provider_id.clone()), base_instructions: Some(base_instructions), diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index c491e2975..44e536e50 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -1101,6 +1101,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> { originator: "test_originator".into(), cli_version: "test_version".into(), source: SessionSource::VSCode, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: Some("test-provider".into()), diff --git a/codex-rs/core/src/session_prefix.rs b/codex-rs/core/src/session_prefix.rs index db3ac00a6..42f213a1d 100644 --- a/codex-rs/core/src/session_prefix.rs +++ b/codex-rs/core/src/session_prefix.rs @@ -4,18 +4,25 @@ use codex_protocol::protocol::AgentStatus; /// messages but are not user intent. use crate::contextual_user_message::SUBAGENT_NOTIFICATION_FRAGMENT; -pub(crate) fn format_subagent_notification_message(agent_id: &str, status: &AgentStatus) -> String { +// TODO(jif) unify with structured schema +pub(crate) fn format_subagent_notification_message( + agent_reference: &str, + status: &AgentStatus, +) -> String { let payload_json = serde_json::json!({ - "agent_id": agent_id, + "agent_path": agent_reference, "status": status, }) .to_string(); SUBAGENT_NOTIFICATION_FRAGMENT.wrap(payload_json) } -pub(crate) fn format_subagent_context_line(agent_id: &str, agent_nickname: Option<&str>) -> String { +pub(crate) fn format_subagent_context_line( + agent_reference: &str, + agent_nickname: Option<&str>, +) -> String { match agent_nickname.filter(|nickname| !nickname.is_empty()) { - Some(agent_nickname) => format!("- {agent_id}: {agent_nickname}"), - None => format!("- {agent_id}"), + Some(agent_nickname) => format!("- {agent_reference}: {agent_nickname}"), + None => format!("- {agent_reference}"), } } diff --git a/codex-rs/core/src/tools/handlers/multi_agents.rs b/codex-rs/core/src/tools/handlers/multi_agents.rs index 8fa990a3b..897af0d5f 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents.rs @@ -6,6 +6,8 @@ //! then optionally layer role-specific config on top. use crate::agent::AgentStatus; +use crate::agent::agent_resolver::resolve_agent_target; +use crate::agent::agent_resolver::resolve_agent_targets; use crate::agent::exceeds_thread_spawn_depth_limit; use crate::codex::Session; use crate::codex::TurnContext; @@ -22,6 +24,7 @@ use crate::tools::registry::ToolHandler; use crate::tools::registry::ToolKind; use async_trait::async_trait; use codex_features::Feature; +use codex_protocol::AgentPath; use codex_protocol::ThreadId; use codex_protocol::models::BaseInstructions; use codex_protocol::models::ResponseInputItem; @@ -59,11 +62,6 @@ pub(crate) const MIN_WAIT_TIMEOUT_MS: i64 = 10_000; pub(crate) const DEFAULT_WAIT_TIMEOUT_MS: i64 = 30_000; pub(crate) const MAX_WAIT_TIMEOUT_MS: i64 = 3600 * 1000; -#[derive(Debug, Deserialize)] -struct CloseAgentArgs { - id: String, -} - fn function_arguments(payload: ToolPayload) -> Result { match payload { ToolPayload::Function { arguments } => Ok(arguments), @@ -111,11 +109,6 @@ mod send_input; mod spawn; pub(crate) mod wait; -fn agent_id(id: &str) -> Result { - ThreadId::from_string(id) - .map_err(|e| FunctionCallError::RespondToModel(format!("invalid agent id {id}: {e:?}"))) -} - fn build_wait_agent_statuses( statuses: &HashMap, receiver_agents: &[CollabAgentRef], @@ -155,9 +148,10 @@ fn build_wait_agent_statuses( fn collab_spawn_error(err: CodexErr) -> FunctionCallError { match err { - CodexErr::UnsupportedOperation(_) => { + CodexErr::UnsupportedOperation(message) if message == "thread manager dropped" => { FunctionCallError::RespondToModel("collab manager unavailable".to_string()) } + CodexErr::UnsupportedOperation(message) => FunctionCallError::RespondToModel(message), err => FunctionCallError::RespondToModel(format!("collab spawn failed: {err}")), } } @@ -179,15 +173,28 @@ fn collab_agent_error(agent_id: ThreadId, err: CodexErr) -> FunctionCallError { fn thread_spawn_source( parent_thread_id: ThreadId, + parent_session_source: &SessionSource, depth: i32, agent_role: Option<&str>, -) -> SessionSource { - SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + task_name: Option, +) -> Result { + let agent_path = task_name + .as_deref() + .map(|task_name| { + parent_session_source + .get_agent_path() + .unwrap_or_else(AgentPath::root) + .join(task_name) + .map_err(FunctionCallError::RespondToModel) + }) + .transpose()?; + Ok(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth, + agent_path, agent_nickname: None, agent_role: agent_role.map(str::to_string), - }) + })) } fn parse_collab_input( diff --git a/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs b/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs index f65fdd644..022faa7b7 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs @@ -24,13 +24,12 @@ impl ToolHandler for Handler { } = invocation; let arguments = function_arguments(payload)?; let args: CloseAgentArgs = parse_arguments(&arguments)?; - let agent_id = agent_id(&args.id)?; - let (receiver_agent_nickname, receiver_agent_role) = session + let agent_id = resolve_agent_target(&session, &turn, &args.target).await?; + let receiver_agent = session .services .agent_control - .get_agent_nickname_and_role(agent_id) - .await - .unwrap_or((None, None)); + .get_agent_metadata(agent_id) + .unwrap_or_default(); session .send_event( &turn, @@ -58,8 +57,8 @@ impl ToolHandler for Handler { call_id: call_id.clone(), sender_thread_id: session.conversation_id, receiver_thread_id: agent_id, - receiver_agent_nickname: receiver_agent_nickname.clone(), - receiver_agent_role: receiver_agent_role.clone(), + receiver_agent_nickname: receiver_agent.agent_nickname.clone(), + receiver_agent_role: receiver_agent.agent_role.clone(), status, } .into(), @@ -82,8 +81,8 @@ impl ToolHandler for Handler { call_id, sender_thread_id: session.conversation_id, receiver_thread_id: agent_id, - receiver_agent_nickname, - receiver_agent_role, + receiver_agent_nickname: receiver_agent.agent_nickname, + receiver_agent_role: receiver_agent.agent_role, status: status.clone(), } .into(), @@ -119,3 +118,8 @@ impl ToolOutput for CloseAgentResult { tool_output_code_mode_result(self, "close_agent") } } + +#[derive(Debug, Deserialize)] +struct CloseAgentArgs { + target: String, +} diff --git a/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs b/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs index f8a339cc6..85e879c1b 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs @@ -25,13 +25,14 @@ impl ToolHandler for Handler { } = invocation; let arguments = function_arguments(payload)?; let args: ResumeAgentArgs = parse_arguments(&arguments)?; - let receiver_thread_id = agent_id(&args.id)?; - let (receiver_agent_nickname, receiver_agent_role) = session + let receiver_thread_id = ThreadId::from_string(&args.id).map_err(|err| { + FunctionCallError::RespondToModel(format!("invalid agent id {}: {err:?}", args.id)) + })?; + let receiver_agent = session .services .agent_control - .get_agent_nickname_and_role(receiver_thread_id) - .await - .unwrap_or((None, None)); + .get_agent_metadata(receiver_thread_id) + .unwrap_or_default(); let child_depth = next_thread_spawn_depth(&turn.session_source); let max_depth = turn.config.agent_max_depth; if exceeds_thread_spawn_depth_limit(child_depth, max_depth) { @@ -47,8 +48,8 @@ impl ToolHandler for Handler { call_id: call_id.clone(), sender_thread_id: session.conversation_id, receiver_thread_id, - receiver_agent_nickname: receiver_agent_nickname.clone(), - receiver_agent_role: receiver_agent_role.clone(), + receiver_agent_nickname: receiver_agent.agent_nickname.clone(), + receiver_agent_role: receiver_agent.agent_role.clone(), } .into(), ) @@ -59,11 +60,22 @@ impl ToolHandler for Handler { .agent_control .get_status(receiver_thread_id) .await; - let error = if matches!(status, AgentStatus::NotFound) { + let (receiver_agent, error) = if matches!(status, AgentStatus::NotFound) { match try_resume_closed_agent(&session, &turn, receiver_thread_id, child_depth).await { - Ok(resumed_status) => { - status = resumed_status; - None + Ok(()) => { + status = session + .services + .agent_control + .get_status(receiver_thread_id) + .await; + ( + session + .services + .agent_control + .get_agent_metadata(receiver_thread_id) + .unwrap_or(receiver_agent), + None, + ) } Err(err) => { status = session @@ -71,19 +83,12 @@ impl ToolHandler for Handler { .agent_control .get_status(receiver_thread_id) .await; - Some(err) + (receiver_agent, Some(err)) } } } else { - None + (receiver_agent, None) }; - - let (receiver_agent_nickname, receiver_agent_role) = session - .services - .agent_control - .get_agent_nickname_and_role(receiver_thread_id) - .await - .unwrap_or((receiver_agent_nickname, receiver_agent_role)); session .send_event( &turn, @@ -91,8 +96,8 @@ impl ToolHandler for Handler { call_id, sender_thread_id: session.conversation_id, receiver_thread_id, - receiver_agent_nickname, - receiver_agent_role, + receiver_agent_nickname: receiver_agent.agent_nickname, + receiver_agent_role: receiver_agent.agent_role, status: status.clone(), } .into(), @@ -142,9 +147,9 @@ async fn try_resume_closed_agent( turn: &Arc, receiver_thread_id: ThreadId, child_depth: i32, -) -> Result { +) -> Result<(), FunctionCallError> { let config = build_agent_resume_config(turn.as_ref(), child_depth)?; - let resumed_thread_id = session + session .services .agent_control .resume_agent_from_rollout( @@ -152,16 +157,13 @@ async fn try_resume_closed_agent( receiver_thread_id, thread_spawn_source( session.conversation_id, + &turn.session_source, child_depth, /*agent_role*/ None, - ), + /*task_name*/ None, + )?, ) .await - .map_err(|err| collab_agent_error(receiver_thread_id, err))?; - - Ok(session - .services - .agent_control - .get_status(resumed_thread_id) - .await) + .map(|_| ()) + .map_err(|err| collab_agent_error(receiver_thread_id, err)) } diff --git a/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs b/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs index 0b6b06f21..8fc4dd515 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs @@ -24,15 +24,14 @@ impl ToolHandler for Handler { } = invocation; let arguments = function_arguments(payload)?; let args: SendInputArgs = parse_arguments(&arguments)?; - let receiver_thread_id = agent_id(&args.id)?; + let receiver_thread_id = resolve_agent_target(&session, &turn, &args.target).await?; let input_items = parse_collab_input(args.message, args.items)?; let prompt = input_preview(&input_items); - let (receiver_agent_nickname, receiver_agent_role) = session + let receiver_agent = session .services .agent_control - .get_agent_nickname_and_role(receiver_thread_id) - .await - .unwrap_or((None, None)); + .get_agent_metadata(receiver_thread_id) + .unwrap_or_default(); if args.interrupt { session .services @@ -71,8 +70,8 @@ impl ToolHandler for Handler { call_id, sender_thread_id: session.conversation_id, receiver_thread_id, - receiver_agent_nickname, - receiver_agent_role, + receiver_agent_nickname: receiver_agent.agent_nickname, + receiver_agent_role: receiver_agent.agent_role, prompt, status, } @@ -87,7 +86,7 @@ impl ToolHandler for Handler { #[derive(Debug, Deserialize)] struct SendInputArgs { - id: String, + target: String, message: Option, items: Option>, #[serde(default)] diff --git a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs index 7a27cd94c..53ab4d35f 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs @@ -77,26 +77,29 @@ impl ToolHandler for Handler { let result = session .services .agent_control - .spawn_agent_with_options( + .spawn_agent_with_metadata( config, input_items, Some(thread_spawn_source( session.conversation_id, + &turn.session_source, child_depth, role_name, - )), + args.task_name.clone(), + )?), SpawnAgentOptions { fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()), }, ) .await .map_err(collab_spawn_error); - let (new_thread_id, status) = match &result { - Ok(thread_id) => ( - Some(*thread_id), - session.services.agent_control.get_status(*thread_id).await, + let (new_thread_id, new_agent_metadata, status) = match &result { + Ok(spawned_agent) => ( + Some(spawned_agent.thread_id), + Some(spawned_agent.metadata.clone()), + spawned_agent.status.clone(), ), - Err(_) => (None, AgentStatus::NotFound), + Err(_) => (None, None, AgentStatus::NotFound), }; let agent_snapshot = match new_thread_id { Some(thread_id) => { @@ -108,19 +111,20 @@ impl ToolHandler for Handler { } None => None, }; - let (new_agent_nickname, new_agent_role) = match (&agent_snapshot, new_thread_id) { - (Some(snapshot), _) => ( - snapshot.session_source.get_nickname(), - snapshot.session_source.get_agent_role(), - ), - (None, Some(thread_id)) => session - .services - .agent_control - .get_agent_nickname_and_role(thread_id) - .await - .unwrap_or((None, None)), - (None, None) => (None, None), - }; + let (new_agent_path, new_agent_nickname, new_agent_role) = + match (&agent_snapshot, new_agent_metadata) { + (Some(snapshot), _) => ( + snapshot.session_source.get_agent_path().map(String::from), + snapshot.session_source.get_nickname(), + snapshot.session_source.get_agent_role(), + ), + (None, Some(metadata)) => ( + metadata.agent_path.map(String::from), + metadata.agent_nickname, + metadata.agent_role, + ), + (None, None) => (None, None, None), + }; let effective_model = agent_snapshot .as_ref() .map(|snapshot| snapshot.model.clone()) @@ -130,6 +134,7 @@ impl ToolHandler for Handler { .and_then(|snapshot| snapshot.reasoning_effort) .unwrap_or(args.reasoning_effort.unwrap_or_default()); let nickname = new_agent_nickname.clone(); + let task_name = new_agent_path.clone(); session .send_event( &turn, @@ -147,7 +152,7 @@ impl ToolHandler for Handler { .into(), ) .await; - let new_thread_id = result?; + let new_thread_id = result?.thread_id; let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); turn.session_telemetry.counter( "codex.multi_agent.spawn", @@ -156,7 +161,8 @@ impl ToolHandler for Handler { ); Ok(SpawnAgentResult { - agent_id: new_thread_id.to_string(), + agent_id: task_name.is_none().then(|| new_thread_id.to_string()), + task_name, nickname, }) } @@ -166,6 +172,7 @@ impl ToolHandler for Handler { struct SpawnAgentArgs { message: Option, items: Option>, + task_name: Option, agent_type: Option, model: Option, reasoning_effort: Option, @@ -175,7 +182,8 @@ struct SpawnAgentArgs { #[derive(Debug, Serialize)] pub(crate) struct SpawnAgentResult { - agent_id: String, + agent_id: Option, + task_name: Option, nickname: Option, } diff --git a/codex-rs/core/src/tools/handlers/multi_agents/wait.rs b/codex-rs/core/src/tools/handlers/multi_agents/wait.rs index 2d655ce86..8458402ce 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/wait.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/wait.rs @@ -34,28 +34,27 @@ impl ToolHandler for Handler { } = invocation; let arguments = function_arguments(payload)?; let args: WaitArgs = parse_arguments(&arguments)?; - if args.ids.is_empty() { - return Err(FunctionCallError::RespondToModel( - "ids must be non-empty".to_owned(), - )); - } - let receiver_thread_ids = args - .ids - .iter() - .map(|id| agent_id(id)) - .collect::, _>>()?; + let receiver_thread_ids = resolve_agent_targets(&session, &turn, args.targets).await?; let mut receiver_agents = Vec::with_capacity(receiver_thread_ids.len()); + let mut target_by_thread_id = HashMap::with_capacity(receiver_thread_ids.len()); for receiver_thread_id in &receiver_thread_ids { - let (agent_nickname, agent_role) = session + let agent_metadata = session .services .agent_control - .get_agent_nickname_and_role(*receiver_thread_id) - .await - .unwrap_or((None, None)); + .get_agent_metadata(*receiver_thread_id) + .unwrap_or_default(); + target_by_thread_id.insert( + *receiver_thread_id, + agent_metadata + .agent_path + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| receiver_thread_id.to_string()), + ); receiver_agents.push(CollabAgentRef { thread_id: *receiver_thread_id, - agent_nickname, - agent_role, + agent_nickname: agent_metadata.agent_nickname, + agent_role: agent_metadata.agent_role, }); } @@ -151,11 +150,20 @@ impl ToolHandler for Handler { results }; - let statuses_map = statuses.clone().into_iter().collect::>(); - let agent_statuses = build_wait_agent_statuses(&statuses_map, &receiver_agents); + let timed_out = statuses.is_empty(); + let statuses_by_id = statuses.clone().into_iter().collect::>(); + let agent_statuses = build_wait_agent_statuses(&statuses_by_id, &receiver_agents); let result = WaitAgentResult { - status: statuses_map.clone(), - timed_out: statuses.is_empty(), + status: statuses + .into_iter() + .filter_map(|(thread_id, status)| { + target_by_thread_id + .get(&thread_id) + .cloned() + .map(|target| (target, status)) + }) + .collect(), + timed_out, }; session @@ -165,7 +173,7 @@ impl ToolHandler for Handler { sender_thread_id: session.conversation_id, call_id, agent_statuses, - statuses: statuses_map, + statuses: statuses_by_id, } .into(), ) @@ -177,13 +185,14 @@ impl ToolHandler for Handler { #[derive(Debug, Deserialize)] struct WaitArgs { - ids: Vec, + #[serde(default)] + targets: Vec, timeout_ms: Option, } #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) struct WaitAgentResult { - pub(crate) status: HashMap, + pub(crate) status: HashMap, pub(crate) timed_out: bool, } diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index abd491efd..d8b2a713a 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -57,6 +57,10 @@ fn function_payload(args: serde_json::Value) -> ToolPayload { } } +fn parse_agent_id(id: &str) -> ThreadId { + ThreadId::from_string(id).expect("agent id should be valid") +} + fn thread_manager() -> ThreadManager { ThreadManager::with_models_provider_for_tests( CodexAuth::from_api_key("dummy"), @@ -195,7 +199,7 @@ async fn spawn_agent_uses_explorer_role_and_preserves_approval_policy() { let (content, _) = expect_text_output(output); let result: SpawnAgentResult = serde_json::from_str(&content).expect("spawn_agent result should be json"); - let agent_id = agent_id(&result.agent_id).expect("agent_id should be valid"); + let agent_id = parse_agent_id(&result.agent_id); assert!( result .nickname @@ -212,6 +216,33 @@ async fn spawn_agent_uses_explorer_role_and_preserves_approval_policy() { assert_eq!(snapshot.model_provider_id, "ollama"); } +#[tokio::test] +async fn spawn_agent_includes_task_name_key_when_not_named() { + let (mut session, turn) = make_session_and_context().await; + let manager = thread_manager(); + session.services.agent_control = manager.agent_control(); + + let output = SpawnAgentHandler + .handle(invocation( + Arc::new(session), + Arc::new(turn), + "spawn_agent", + function_payload(json!({ + "message": "inspect this repo" + })), + )) + .await + .expect("spawn_agent should succeed"); + let (content, success) = expect_text_output(output); + let result: serde_json::Value = + serde_json::from_str(&content).expect("spawn_agent result should be json"); + + assert!(result["agent_id"].is_string()); + assert_eq!(result["task_name"], serde_json::Value::Null); + assert!(result.get("nickname").is_some()); + assert_eq!(success, Some(true)); +} + #[tokio::test] async fn spawn_agent_errors_when_manager_dropped() { let (session, turn) = make_session_and_context().await; @@ -230,6 +261,160 @@ async fn spawn_agent_errors_when_manager_dropped() { ); } +#[tokio::test] +async fn multi_agent_v2_spawn_returns_path_and_send_input_accepts_relative_path() { + #[derive(Debug, Deserialize)] + struct SpawnAgentResult { + task_name: String, + nickname: Option, + } + + let (mut session, mut turn) = make_session_and_context().await; + let manager = thread_manager(); + let root = manager + .start_thread((*turn.config).clone()) + .await + .expect("root thread should start"); + session.services.agent_control = manager.agent_control(); + session.conversation_id = root.thread_id; + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow feature update"); + turn.config = Arc::new(config); + + let session = Arc::new(session); + let turn = Arc::new(turn); + let spawn_output = SpawnAgentHandler + .handle(invocation( + session.clone(), + turn.clone(), + "spawn_agent", + function_payload(json!({ + "message": "inspect this repo", + "task_name": "test_process" + })), + )) + .await + .expect("spawn_agent should succeed"); + let (content, _) = expect_text_output(spawn_output); + let spawn_result: SpawnAgentResult = + serde_json::from_str(&content).expect("spawn result should parse"); + assert_eq!(spawn_result.task_name, "/root/test_process"); + assert!(spawn_result.nickname.is_some()); + + let child_thread_id = session + .services + .agent_control + .resolve_agent_reference( + session.conversation_id, + &turn.session_source, + "test_process", + ) + .await + .expect("relative path should resolve"); + let child_snapshot = manager + .get_thread(child_thread_id) + .await + .expect("child thread should exist") + .config_snapshot() + .await; + assert_eq!( + child_snapshot.session_source.get_agent_path().as_deref(), + Some("/root/test_process") + ); + + SendInputHandler + .handle(invocation( + session.clone(), + turn.clone(), + "send_input", + function_payload(json!({ + "target": "test_process", + "message": "continue" + })), + )) + .await + .expect("send_input should accept v2 path"); +} + +#[tokio::test] +async fn multi_agent_v2_spawn_includes_agent_id_key_when_named() { + let (mut session, mut turn) = make_session_and_context().await; + let manager = thread_manager(); + let root = manager + .start_thread((*turn.config).clone()) + .await + .expect("root thread should start"); + session.services.agent_control = manager.agent_control(); + session.conversation_id = root.thread_id; + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow feature update"); + turn.config = Arc::new(config); + + let output = SpawnAgentHandler + .handle(invocation( + Arc::new(session), + Arc::new(turn), + "spawn_agent", + function_payload(json!({ + "message": "inspect this repo", + "task_name": "test_process" + })), + )) + .await + .expect("spawn_agent should succeed"); + let (content, success) = expect_text_output(output); + let result: serde_json::Value = + serde_json::from_str(&content).expect("spawn_agent result should be json"); + + assert_eq!(result["agent_id"], serde_json::Value::Null); + assert_eq!(result["task_name"], "/root/test_process"); + assert!(result.get("nickname").is_some()); + assert_eq!(success, Some(true)); +} + +#[tokio::test] +async fn multi_agent_v2_spawn_surfaces_task_name_validation_errors() { + let (mut session, mut turn) = make_session_and_context().await; + let manager = thread_manager(); + let root = manager + .start_thread((*turn.config).clone()) + .await + .expect("root thread should start"); + session.services.agent_control = manager.agent_control(); + session.conversation_id = root.thread_id; + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow feature update"); + turn.config = Arc::new(config); + + let invocation = invocation( + Arc::new(session), + Arc::new(turn), + "spawn_agent", + function_payload(json!({ + "message": "inspect this repo", + "task_name": "BadName" + })), + ); + let Err(err) = SpawnAgentHandler.handle(invocation).await else { + panic!("invalid agent name should be rejected"); + }; + assert_eq!( + err, + FunctionCallError::RespondToModel( + "agent_name must use only lowercase letters, digits, and underscores".to_string() + ) + ); +} + #[tokio::test] async fn spawn_agent_reapplies_runtime_sandbox_after_role_config() { fn pick_allowed_sandbox_policy( @@ -293,7 +478,7 @@ async fn spawn_agent_reapplies_runtime_sandbox_after_role_config() { let (content, _) = expect_text_output(output); let result: SpawnAgentResult = serde_json::from_str(&content).expect("spawn_agent result should be json"); - let agent_id = agent_id(&result.agent_id).expect("agent_id should be valid"); + let agent_id = parse_agent_id(&result.agent_id); assert!( result .nickname @@ -334,6 +519,7 @@ async fn spawn_agent_rejects_when_depth_limit_exceeded() { turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: session.conversation_id, depth: max_depth, + agent_path: None, agent_nickname: None, agent_role: None, }); @@ -373,6 +559,7 @@ async fn spawn_agent_allows_depth_up_to_configured_max_depth() { turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: session.conversation_id, depth: DEFAULT_AGENT_MAX_DEPTH, + agent_path: None, agent_nickname: None, agent_role: None, }); @@ -407,7 +594,7 @@ async fn send_input_rejects_empty_message() { Arc::new(session), Arc::new(turn), "send_input", - function_payload(json!({"id": ThreadId::new().to_string(), "message": ""})), + function_payload(json!({"target": ThreadId::new().to_string(), "message": ""})), ); let Err(err) = SendInputHandler.handle(invocation).await else { panic!("empty message should be rejected"); @@ -426,7 +613,7 @@ async fn send_input_rejects_when_message_and_items_are_both_set() { Arc::new(turn), "send_input", function_payload(json!({ - "id": ThreadId::new().to_string(), + "target": ThreadId::new().to_string(), "message": "hello", "items": [{"type": "mention", "name": "drive", "path": "app://drive"}] })), @@ -449,7 +636,7 @@ async fn send_input_rejects_invalid_id() { Arc::new(session), Arc::new(turn), "send_input", - function_payload(json!({"id": "not-a-uuid", "message": "hi"})), + function_payload(json!({"target": "not-a-uuid", "message": "hi"})), ); let Err(err) = SendInputHandler.handle(invocation).await else { panic!("invalid id should be rejected"); @@ -457,7 +644,10 @@ async fn send_input_rejects_invalid_id() { let FunctionCallError::RespondToModel(msg) = err else { panic!("expected respond-to-model error"); }; - assert!(msg.starts_with("invalid agent id not-a-uuid:")); + assert_eq!( + msg, + "agent_name must use only lowercase letters, digits, and underscores" + ); } #[tokio::test] @@ -470,7 +660,7 @@ async fn send_input_reports_missing_agent() { Arc::new(session), Arc::new(turn), "send_input", - function_payload(json!({"id": agent_id.to_string(), "message": "hi"})), + function_payload(json!({"target": agent_id.to_string(), "message": "hi"})), ); let Err(err) = SendInputHandler.handle(invocation).await else { panic!("missing agent should be reported"); @@ -494,7 +684,7 @@ async fn send_input_interrupts_before_prompt() { Arc::new(turn), "send_input", function_payload(json!({ - "id": agent_id.to_string(), + "target": agent_id.to_string(), "message": "hi", "interrupt": true })), @@ -533,7 +723,7 @@ async fn send_input_accepts_structured_items() { Arc::new(turn), "send_input", function_payload(json!({ - "id": agent_id.to_string(), + "target": agent_id.to_string(), "items": [ {"type": "mention", "name": "drive", "path": "app://google_drive"}, {"type": "text", "text": "read the folder"} @@ -703,7 +893,7 @@ async fn resume_agent_restores_closed_agent_and_accepts_send_input() { session, turn, "send_input", - function_payload(json!({"id": agent_id.to_string(), "message": "hello"})), + function_payload(json!({"target": agent_id.to_string(), "message": "hello"})), ); let output = SendInputHandler .handle(send_invocation) @@ -736,6 +926,7 @@ async fn resume_agent_rejects_when_depth_limit_exceeded() { turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id: session.conversation_id, depth: max_depth, + agent_path: None, agent_nickname: None, agent_role: None, }); @@ -765,7 +956,7 @@ async fn wait_agent_rejects_non_positive_timeout() { Arc::new(turn), "wait_agent", function_payload(json!({ - "ids": [ThreadId::new().to_string()], + "targets": [ThreadId::new().to_string()], "timeout_ms": 0 })), ); @@ -779,13 +970,13 @@ async fn wait_agent_rejects_non_positive_timeout() { } #[tokio::test] -async fn wait_agent_rejects_invalid_id() { +async fn wait_agent_rejects_invalid_target() { let (session, turn) = make_session_and_context().await; let invocation = invocation( Arc::new(session), Arc::new(turn), "wait_agent", - function_payload(json!({"ids": ["invalid"]})), + function_payload(json!({"targets": ["invalid"]})), ); let Err(err) = WaitAgentHandler.handle(invocation).await else { panic!("invalid id should be rejected"); @@ -793,27 +984,62 @@ async fn wait_agent_rejects_invalid_id() { let FunctionCallError::RespondToModel(msg) = err else { panic!("expected respond-to-model error"); }; - assert!(msg.starts_with("invalid agent id invalid:")); + assert_eq!(msg, "live agent path `/root/invalid` not found"); } #[tokio::test] -async fn wait_agent_rejects_empty_ids() { +async fn wait_agent_rejects_empty_targets() { let (session, turn) = make_session_and_context().await; let invocation = invocation( Arc::new(session), Arc::new(turn), "wait_agent", - function_payload(json!({"ids": []})), + function_payload(json!({"targets": []})), ); let Err(err) = WaitAgentHandler.handle(invocation).await else { panic!("empty ids should be rejected"); }; assert_eq!( err, - FunctionCallError::RespondToModel("ids must be non-empty".to_string()) + FunctionCallError::RespondToModel("agent targets must be non-empty".to_string()) ); } +#[tokio::test] +async fn multi_agent_v2_wait_agent_accepts_targets_argument() { + let (mut session, mut turn) = make_session_and_context().await; + let target = ThreadId::new().to_string(); + let manager = thread_manager(); + session.services.agent_control = manager.agent_control(); + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow feature update"); + turn.config = Arc::new(config); + let invocation = invocation( + Arc::new(session), + Arc::new(turn), + "wait_agent", + function_payload(json!({"targets": [target.clone()]})), + ); + let output = WaitAgentHandler + .handle(invocation) + .await + .expect("targets should be accepted in v2 mode"); + let (content, success) = expect_text_output(output); + let result: wait::WaitAgentResult = + serde_json::from_str(&content).expect("wait_agent result should be json"); + assert_eq!( + result, + wait::WaitAgentResult { + status: HashMap::from([(target, AgentStatus::NotFound)]), + timed_out: false, + } + ); + assert_eq!(success, None); +} + #[tokio::test] async fn wait_agent_returns_not_found_for_missing_agents() { let (mut session, turn) = make_session_and_context().await; @@ -826,7 +1052,7 @@ async fn wait_agent_returns_not_found_for_missing_agents() { Arc::new(turn), "wait_agent", function_payload(json!({ - "ids": [id_a.to_string(), id_b.to_string()], + "targets": [id_a.to_string(), id_b.to_string()], "timeout_ms": 1000 })), ); @@ -840,7 +1066,10 @@ async fn wait_agent_returns_not_found_for_missing_agents() { assert_eq!( result, wait::WaitAgentResult { - status: HashMap::from([(id_a, AgentStatus::NotFound), (id_b, AgentStatus::NotFound),]), + status: HashMap::from([ + (id_a.to_string(), AgentStatus::NotFound), + (id_b.to_string(), AgentStatus::NotFound), + ]), timed_out: false } ); @@ -860,7 +1089,7 @@ async fn wait_agent_times_out_when_status_is_not_final() { Arc::new(turn), "wait_agent", function_payload(json!({ - "ids": [agent_id.to_string()], + "targets": [agent_id.to_string()], "timeout_ms": MIN_WAIT_TIMEOUT_MS })), ); @@ -900,7 +1129,7 @@ async fn wait_agent_clamps_short_timeouts_to_minimum() { Arc::new(turn), "wait_agent", function_payload(json!({ - "ids": [agent_id.to_string()], + "targets": [agent_id.to_string()], "timeout_ms": 10 })), ); @@ -950,7 +1179,7 @@ async fn wait_agent_returns_final_status_without_timeout() { Arc::new(turn), "wait_agent", function_payload(json!({ - "ids": [agent_id.to_string()], + "targets": [agent_id.to_string()], "timeout_ms": 1000 })), ); @@ -964,13 +1193,106 @@ async fn wait_agent_returns_final_status_without_timeout() { assert_eq!( result, wait::WaitAgentResult { - status: HashMap::from([(agent_id, AgentStatus::Shutdown)]), + status: HashMap::from([(agent_id.to_string(), AgentStatus::Shutdown)]), timed_out: false } ); assert_eq!(success, None); } +#[tokio::test] +async fn multi_agent_v2_wait_agent_returns_statuses_keyed_by_path() { + #[derive(Debug, Deserialize)] + struct SpawnAgentResult { + task_name: String, + } + + let (mut session, mut turn) = make_session_and_context().await; + let manager = thread_manager(); + let root = manager + .start_thread((*turn.config).clone()) + .await + .expect("root thread should start"); + session.services.agent_control = manager.agent_control(); + session.conversation_id = root.thread_id; + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow feature update"); + turn.config = Arc::new(config); + + let session = Arc::new(session); + let turn = Arc::new(turn); + let spawn_output = SpawnAgentHandler + .handle(invocation( + session.clone(), + turn.clone(), + "spawn_agent", + function_payload(json!({ + "message": "inspect this repo", + "task_name": "test_process" + })), + )) + .await + .expect("spawn_agent should succeed"); + let (content, _) = expect_text_output(spawn_output); + let spawn_result: SpawnAgentResult = + serde_json::from_str(&content).expect("spawn result should parse"); + + let agent_id = session + .services + .agent_control + .resolve_agent_reference( + session.conversation_id, + &turn.session_source, + "test_process", + ) + .await + .expect("relative path should resolve"); + let mut status_rx = manager + .agent_control() + .subscribe_status(agent_id) + .await + .expect("subscribe should succeed"); + + let child_thread = manager + .get_thread(agent_id) + .await + .expect("child should exist"); + let _ = child_thread + .submit(Op::Shutdown {}) + .await + .expect("shutdown should submit"); + let _ = timeout(Duration::from_secs(1), status_rx.changed()) + .await + .expect("shutdown status should arrive"); + + let wait_output = WaitAgentHandler + .handle(invocation( + session, + turn, + "wait_agent", + function_payload(json!({ + "targets": ["test_process"], + "timeout_ms": 1000 + })), + )) + .await + .expect("wait_agent should succeed"); + let (content, success) = expect_text_output(wait_output); + let result: wait::WaitAgentResult = + serde_json::from_str(&content).expect("wait_agent result should be json"); + assert_eq!( + result, + wait::WaitAgentResult { + status: HashMap::from([(spawn_result.task_name, AgentStatus::Shutdown)]), + timed_out: false, + } + ); + assert_eq!(success, None); +} + #[tokio::test] async fn close_agent_submits_shutdown_and_returns_previous_status() { let (mut session, turn) = make_session_and_context().await; @@ -985,7 +1307,7 @@ async fn close_agent_submits_shutdown_and_returns_previous_status() { Arc::new(session), Arc::new(turn), "close_agent", - function_payload(json!({"id": agent_id.to_string()})), + function_payload(json!({"target": agent_id.to_string()})), ); let output = CloseAgentHandler .handle(invocation) @@ -1037,13 +1359,12 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr let (child_content, child_success) = expect_text_output(child_spawn_output); let child_result: serde_json::Value = serde_json::from_str(&child_content).expect("child spawn result should be json"); - let child_thread_id = agent_id( + let child_thread_id = parse_agent_id( child_result .get("agent_id") .and_then(serde_json::Value::as_str) .expect("child spawn result should include agent_id"), - ) - .expect("child agent_id should be valid"); + ); assert_eq!(child_success, Some(true)); let child_thread = manager @@ -1063,13 +1384,12 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr let (grandchild_content, grandchild_success) = expect_text_output(grandchild_spawn_output); let grandchild_result: serde_json::Value = serde_json::from_str(&grandchild_content).expect("grandchild spawn result should be json"); - let grandchild_thread_id = agent_id( + let grandchild_thread_id = parse_agent_id( grandchild_result .get("agent_id") .and_then(serde_json::Value::as_str) .expect("grandchild spawn result should include agent_id"), - ) - .expect("grandchild agent_id should be valid"); + ); assert_eq!(grandchild_success, Some(true)); let close_output = CloseAgentHandler @@ -1077,7 +1397,7 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr parent_session.clone(), parent_session.new_default_turn().await, "close_agent", - function_payload(json!({"id": child_thread_id.to_string()})), + function_payload(json!({"target": child_thread_id.to_string()})), )) .await .expect("close_agent should close the child subtree"); @@ -1129,7 +1449,7 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr parent_session.clone(), parent_session.new_default_turn().await, "close_agent", - function_payload(json!({"id": child_thread_id.to_string()})), + function_payload(json!({"target": child_thread_id.to_string()})), )) .await .expect("close_agent should be repeatable for the child subtree"); diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index 662e97d10..5ae2f333d 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -129,20 +129,29 @@ fn agent_status_output_schema() -> JsonValue { }) } -fn spawn_agent_output_schema() -> JsonValue { +fn spawn_agent_output_schema(multi_agent_v2: bool) -> JsonValue { + let task_name_description = if multi_agent_v2 { + "Canonical task name for the spawned agent." + } else { + "Canonical task name for the spawned agent when one was assigned." + }; json!({ "type": "object", "properties": { "agent_id": { - "type": "string", - "description": "Thread identifier for the spawned agent." + "type": ["string", "null"], + "description": "Thread identifier for the spawned agent when no task name was assigned." + }, + "task_name": { + "type": ["string", "null"], + "description": task_name_description }, "nickname": { "type": ["string", "null"], "description": "User-facing nickname for the spawned agent when available." } }, - "required": ["agent_id", "nickname"], + "required": ["agent_id", "task_name", "nickname"], "additionalProperties": false }) } @@ -178,7 +187,7 @@ fn wait_output_schema() -> JsonValue { "properties": { "status": { "type": "object", - "description": "Final statuses keyed by agent id for agents that finished before the timeout.", + "description": "Final statuses keyed by canonical task name when available, otherwise by agent id.", "additionalProperties": agent_status_output_schema() }, "timed_out": { @@ -276,6 +285,7 @@ pub(crate) struct ToolsConfig { pub js_repl_tools_only: bool, pub can_request_original_image_detail: bool, pub collab_tools: bool, + pub multi_agent_v2: bool, pub artifact_tools: bool, pub request_user_input: bool, pub default_mode_request_user_input: bool, @@ -325,6 +335,7 @@ impl ToolsConfig { let include_js_repl_tools_only = include_js_repl && features.enabled(Feature::JsReplToolsOnly); let include_collab_tools = features.enabled(Feature::Collab); + let include_multi_agent_v2 = features.enabled(Feature::MultiAgentV2); let include_agent_jobs = features.enabled(Feature::SpawnCsv); let include_request_user_input = !matches!(session_source, SessionSource::SubAgent(_)); let include_default_mode_request_user_input = @@ -408,6 +419,7 @@ impl ToolsConfig { js_repl_tools_only: include_js_repl_tools_only, can_request_original_image_detail: include_original_image_detail, collab_tools: include_collab_tools, + multi_agent_v2: include_multi_agent_v2, artifact_tools: include_artifact_tools, request_user_input: include_request_user_input, default_mode_request_user_input: include_default_mode_request_user_input, @@ -1076,7 +1088,8 @@ fn create_collab_input_items_schema() -> JsonSchema { fn create_spawn_agent_tool(config: &ToolsConfig) -> ToolSpec { let available_models_description = spawn_agent_models_description(&config.available_models); - let properties = BTreeMap::from([ + let return_value_description = "Returns the canonical task name when the spawned agent was named, otherwise the agent id, plus the user-facing nickname when available."; + let mut properties = BTreeMap::from([ ( "message".to_string(), JsonSchema::String { @@ -1123,6 +1136,15 @@ fn create_spawn_agent_tool(config: &ToolsConfig) -> ToolSpec { }, ), ]); + properties.insert( + "task_name".to_string(), + JsonSchema::String { + description: Some( + "Optional task name for the new agent. Use lowercase letters, digits, and underscores." + .to_string(), + ), + }, + ); ToolSpec::Function(ResponsesApiTool { name: "spawn_agent".to_string(), @@ -1131,7 +1153,7 @@ fn create_spawn_agent_tool(config: &ToolsConfig) -> ToolSpec { Only use `spawn_agent` if and only if the user explicitly asks for sub-agents, delegation, or parallel agent work. Requests for depth, thoroughness, research, investigation, or detailed codebase analysis do not count as permission to spawn. Agent-role guidance below only helps choose which agent to use after spawning is already authorized; it never authorizes spawning by itself. - Spawn a sub-agent for a well-scoped task. Returns the agent id (and user-facing nickname when available) to use to communicate with this agent. This spawn_agent tool provides you access to smaller but more efficient sub-agents. A mini model can solve many tasks faster than the main model. You should follow the rules and guidelines below to use this tool. + Spawn a sub-agent for a well-scoped task. {return_value_description} This spawn_agent tool provides you access to smaller but more efficient sub-agents. A mini model can solve many tasks faster than the main model. You should follow the rules and guidelines below to use this tool. {available_models_description} ### When to delegate vs. do the subtask yourself @@ -1170,7 +1192,7 @@ fn create_spawn_agent_tool(config: &ToolsConfig) -> ToolSpec { required: None, additional_properties: Some(false.into()), }, - output_schema: Some(spawn_agent_output_schema()), + output_schema: Some(spawn_agent_output_schema(config.multi_agent_v2)), }) } @@ -1335,9 +1357,11 @@ fn create_report_agent_job_result_tool() -> ToolSpec { fn create_send_input_tool() -> ToolSpec { let properties = BTreeMap::from([ ( - "id".to_string(), + "target".to_string(), JsonSchema::String { - description: Some("Agent id to message (from spawn_agent).".to_string()), + description: Some( + "Agent id or canonical task name to message (from spawn_agent).".to_string(), + ), }, ), ( @@ -1369,7 +1393,7 @@ fn create_send_input_tool() -> ToolSpec { defer_loading: None, parameters: JsonSchema::Object { properties, - required: Some(vec!["id".to_string()]), + required: Some(vec!["target".to_string()]), additional_properties: Some(false.into()), }, output_schema: Some(send_input_output_schema()), @@ -1404,11 +1428,11 @@ fn create_resume_agent_tool() -> ToolSpec { fn create_wait_agent_tool() -> ToolSpec { let mut properties = BTreeMap::new(); properties.insert( - "ids".to_string(), + "targets".to_string(), JsonSchema::Array { items: Box::new(JsonSchema::String { description: None }), description: Some( - "Agent ids to wait on. Pass multiple ids to wait for whichever finishes first." + "Agent ids or canonical task names to wait on. Pass multiple targets to wait for whichever finishes first." .to_string(), ), }, @@ -1430,7 +1454,7 @@ fn create_wait_agent_tool() -> ToolSpec { defer_loading: None, parameters: JsonSchema::Object { properties, - required: Some(vec!["ids".to_string()]), + required: Some(vec!["targets".to_string()]), additional_properties: Some(false.into()), }, output_schema: Some(wait_output_schema()), @@ -1556,9 +1580,11 @@ fn create_request_permissions_tool() -> ToolSpec { fn create_close_agent_tool() -> ToolSpec { let mut properties = BTreeMap::new(); properties.insert( - "id".to_string(), + "target".to_string(), JsonSchema::String { - description: Some("Agent id to close (from spawn_agent).".to_string()), + description: Some( + "Agent id or canonical task name to close (from spawn_agent).".to_string(), + ), }, ); @@ -1569,7 +1595,7 @@ fn create_close_agent_tool() -> ToolSpec { defer_loading: None, parameters: JsonSchema::Object { properties, - required: Some(vec!["id".to_string()]), + required: Some(vec!["target".to_string()]), additional_properties: Some(false.into()), }, output_schema: Some(close_agent_output_schema()), @@ -2966,12 +2992,15 @@ pub(crate) fn build_specs_with_discoverable_tools( /*supports_parallel_tool_calls*/ false, config.code_mode_enabled, ); - push_tool_spec( - &mut builder, - create_resume_agent_tool(), - /*supports_parallel_tool_calls*/ false, - config.code_mode_enabled, - ); + if !config.multi_agent_v2 { + push_tool_spec( + &mut builder, + create_resume_agent_tool(), + /*supports_parallel_tool_calls*/ false, + config.code_mode_enabled, + ); + builder.register_handler("resume_agent", Arc::new(ResumeAgentHandler)); + } push_tool_spec( &mut builder, create_wait_agent_tool(), @@ -2986,7 +3015,6 @@ pub(crate) fn build_specs_with_discoverable_tools( ); builder.register_handler("spawn_agent", Arc::new(SpawnAgentHandler)); builder.register_handler("send_input", Arc::new(SendInputHandler)); - builder.register_handler("resume_agent", Arc::new(ResumeAgentHandler)); builder.register_handler("wait_agent", Arc::new(WaitAgentHandler)); builder.register_handler("close_agent", Arc::new(CloseAgentHandler)); } diff --git a/codex-rs/core/src/tools/spec_tests.rs b/codex-rs/core/src/tools/spec_tests.rs index 3142dd46a..1cb6bb166 100644 --- a/codex-rs/core/src/tools/spec_tests.rs +++ b/codex-rs/core/src/tools/spec_tests.rs @@ -469,12 +469,15 @@ fn test_full_toolset_specs_for_gpt5_codex_unified_exec_web_search() { create_view_image_tool(config.can_request_original_image_detail), create_spawn_agent_tool(&config), create_send_input_tool(), - create_resume_agent_tool(), create_wait_agent_tool(), create_close_agent_tool(), ] { expected.insert(tool_name(&spec).to_string(), spec); } + if !config.multi_agent_v2 { + let spec = create_resume_agent_tool(); + expected.insert(tool_name(&spec).to_string(), spec); + } if config.exec_permission_approvals_enabled { let spec = create_request_permissions_tool(); @@ -520,6 +523,96 @@ fn test_build_specs_collab_tools_enabled() { assert_lacks_tool_name(&tools, "spawn_agents_on_csv"); } +#[test] +fn test_build_specs_multi_agent_v2_uses_task_names_and_hides_resume() { + let config = test_config(); + let model_info = ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config); + let mut features = Features::with_defaults(); + features.enable(Feature::Collab); + features.enable(Feature::MultiAgentV2); + let available_models = Vec::new(); + let tools_config = ToolsConfig::new(&ToolsConfigParams { + model_info: &model_info, + available_models: &available_models, + features: &features, + web_search_mode: Some(WebSearchMode::Cached), + session_source: SessionSource::Cli, + sandbox_policy: &SandboxPolicy::DangerFullAccess, + windows_sandbox_level: WindowsSandboxLevel::Disabled, + }); + let (tools, _) = build_specs(&tools_config, None, None, &[]).build(); + + let spawn_agent = find_tool(&tools, "spawn_agent"); + let ToolSpec::Function(ResponsesApiTool { + parameters, + output_schema, + .. + }) = &spawn_agent.spec + else { + panic!("spawn_agent should be a function tool"); + }; + let JsonSchema::Object { + properties, + required, + .. + } = parameters + else { + panic!("spawn_agent should use object params"); + }; + assert!(properties.contains_key("task_name")); + assert_eq!(required.as_ref(), None); + let output_schema = output_schema + .as_ref() + .expect("spawn_agent should define output schema"); + assert_eq!( + output_schema["required"], + json!(["agent_id", "task_name", "nickname"]) + ); + + let send_input = find_tool(&tools, "send_input"); + let ToolSpec::Function(ResponsesApiTool { parameters, .. }) = &send_input.spec else { + panic!("send_input should be a function tool"); + }; + let JsonSchema::Object { + properties, + required, + .. + } = parameters + else { + panic!("send_input should use object params"); + }; + assert!(properties.contains_key("target")); + assert_eq!(required.as_ref(), Some(&vec!["target".to_string()])); + + let wait_agent = find_tool(&tools, "wait_agent"); + let ToolSpec::Function(ResponsesApiTool { + parameters, + output_schema, + .. + }) = &wait_agent.spec + else { + panic!("wait_agent should be a function tool"); + }; + let JsonSchema::Object { + properties, + required, + .. + } = parameters + else { + panic!("wait_agent should use object params"); + }; + assert!(properties.contains_key("targets")); + assert_eq!(required.as_ref(), Some(&vec!["targets".to_string()])); + let output_schema = output_schema + .as_ref() + .expect("wait_agent should define output schema"); + assert_eq!( + output_schema["properties"]["status"]["description"], + json!("Final statuses keyed by canonical task name when available, otherwise by agent id.") + ); + assert_lacks_tool_name(&tools, "resume_agent"); +} + #[test] fn test_build_specs_enable_fanout_enables_agent_jobs_and_collab_tools() { let config = test_config(); diff --git a/codex-rs/core/tests/suite/personality_migration.rs b/codex-rs/core/tests/suite/personality_migration.rs index adbd86cb2..0a8dd61d9 100644 --- a/codex-rs/core/tests/suite/personality_migration.rs +++ b/codex-rs/core/tests/suite/personality_migration.rs @@ -66,6 +66,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R originator: "test_originator".to_string(), cli_version: "test_version".to_string(), source: SessionSource::Cli, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: None, @@ -110,6 +111,7 @@ async fn write_rollout_with_meta_only(dir: &Path, thread_id: ThreadId) -> io::Re originator: "test_originator".to_string(), cli_version: "test_version".to_string(), source: SessionSource::Cli, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: None, diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 2801f1ab1..248ada02c 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -141,6 +141,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { originator: "test".to_string(), cli_version: "test".to_string(), source: SessionSource::default(), + agent_path: None, agent_nickname: None, agent_role: None, model_provider: None, diff --git a/codex-rs/deny.toml b/codex-rs/deny.toml index 29bf6c588..6a3b94798 100644 --- a/codex-rs/deny.toml +++ b/codex-rs/deny.toml @@ -73,8 +73,12 @@ ignore = [ { id = "RUSTSEC-2024-0388", reason = "derivative is unmaintained; pulled in via starlark v0.13.0 used by execpolicy/cli/core; no fixed release yet" }, { id = "RUSTSEC-2025-0057", reason = "fxhash is unmaintained; pulled in via starlark_map/starlark v0.13.0 used by execpolicy/cli/core; no fixed release yet" }, { id = "RUSTSEC-2024-0436", reason = "paste is unmaintained; pulled in via ratatui/rmcp/starlark used by tui/execpolicy; no fixed release yet" }, - # TODO(joshka, nornagon): remove this exception when once we update the ratatui fork to a version that uses lru 0.13+. - { id = "RUSTSEC-2026-0002", reason = "lru 0.12.5 is pulled in via ratatui fork; cannot upgrade until the fork is updated" }, + # TODO(fcoury): remove these exceptions when the aws-lc-sys upgrade path is Bazel-compatible in this workspace. + { id = "RUSTSEC-2026-0044", reason = "aws-lc-sys remains on 0.37.0 because upgrading currently breaks Bazel fetch/build for this workspace" }, + { id = "RUSTSEC-2026-0045", reason = "aws-lc-sys remains on 0.37.0 because upgrading currently breaks Bazel fetch/build for this workspace" }, + { id = "RUSTSEC-2026-0046", reason = "aws-lc-sys remains on 0.37.0 because upgrading currently breaks Bazel fetch/build for this workspace" }, + { id = "RUSTSEC-2026-0047", reason = "aws-lc-sys remains on 0.37.0 because upgrading currently breaks Bazel fetch/build for this workspace" }, + { id = "RUSTSEC-2026-0048", reason = "aws-lc-sys remains on 0.37.0 because upgrading currently breaks Bazel fetch/build for this workspace" }, # TODO(fcoury): remove this exception when syntect drops yaml-rust and bincode, or updates to versions that have fixed the vulnerabilities. { id = "RUSTSEC-2024-0320", reason = "yaml-rust is unmaintained; pulled in via syntect v5.3.0 used by codex-tui for syntax highlighting; no fixed release yet" }, { id = "RUSTSEC-2025-0141", reason = "bincode is unmaintained; pulled in via syntect v5.3.0 used by codex-tui for syntax highlighting; no fixed release yet" }, diff --git a/codex-rs/features/src/lib.rs b/codex-rs/features/src/lib.rs index 938d09885..101cbda73 100644 --- a/codex-rs/features/src/lib.rs +++ b/codex-rs/features/src/lib.rs @@ -138,6 +138,8 @@ pub enum Feature { EnableRequestCompression, /// Enable collab tools. Collab, + /// Enable task-path-based multi-agent routing. + MultiAgentV2, /// Enable CSV-backed agent job tools. SpawnCsv, /// Enable apps. @@ -711,6 +713,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::Stable, default_enabled: true, }, + FeatureSpec { + id: Feature::MultiAgentV2, + key: "multi_agent_v2", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::SpawnCsv, key: "enable_fanout", diff --git a/codex-rs/protocol/src/agent_path.rs b/codex-rs/protocol/src/agent_path.rs new file mode 100644 index 000000000..f0b99438d --- /dev/null +++ b/codex-rs/protocol/src/agent_path.rs @@ -0,0 +1,223 @@ +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; +use std::fmt; +use std::ops::Deref; +use std::str::FromStr; +use ts_rs::TS; + +#[derive( + Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, TS, +)] +#[serde(try_from = "String", into = "String")] +#[schemars(with = "String")] +#[ts(type = "string")] +pub struct AgentPath(String); + +impl AgentPath { + pub const ROOT: &str = "/root"; + const ROOT_SEGMENT: &str = "root"; + + pub fn root() -> Self { + Self(Self::ROOT.to_string()) + } + + pub fn from_string(path: String) -> Result { + validate_absolute_path(path.as_str())?; + Ok(Self(path)) + } + + pub fn as_str(&self) -> &str { + self.0.as_str() + } + + pub fn is_root(&self) -> bool { + self.as_str() == Self::ROOT + } + + pub fn name(&self) -> &str { + if self.is_root() { + return Self::ROOT_SEGMENT; + } + self.as_str() + .rsplit('/') + .next() + .filter(|segment| !segment.is_empty()) + .unwrap_or(Self::ROOT_SEGMENT) + } + + pub fn join(&self, agent_name: &str) -> Result { + validate_agent_name(agent_name)?; + Self::from_string(format!("{self}/{agent_name}")) + } + + pub fn resolve(&self, reference: &str) -> Result { + if reference.is_empty() { + return Err("agent path must not be empty".to_string()); + } + if reference == Self::ROOT { + return Ok(Self::root()); + } + if reference.starts_with('/') { + return Self::try_from(reference); + } + + validate_relative_reference(reference)?; + Self::from_string(format!("{self}/{reference}")) + } +} + +impl TryFrom for AgentPath { + type Error = String; + + fn try_from(value: String) -> Result { + Self::from_string(value) + } +} + +impl TryFrom<&str> for AgentPath { + type Error = String; + + fn try_from(value: &str) -> Result { + Self::from_string(value.to_string()) + } +} + +impl From for String { + fn from(value: AgentPath) -> Self { + value.0 + } +} + +impl FromStr for AgentPath { + type Err = String; + + fn from_str(s: &str) -> Result { + Self::try_from(s) + } +} + +impl AsRef for AgentPath { + fn as_ref(&self) -> &str { + self.as_str() + } +} + +impl Deref for AgentPath { + type Target = str; + + fn deref(&self) -> &Self::Target { + self.as_str() + } +} + +impl fmt::Display for AgentPath { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) + } +} + +fn validate_agent_name(agent_name: &str) -> Result<(), String> { + if agent_name.is_empty() { + return Err("agent_name must not be empty".to_string()); + } + if agent_name == AgentPath::ROOT_SEGMENT { + return Err("agent_name `root` is reserved".to_string()); + } + if agent_name == "." || agent_name == ".." { + return Err(format!("agent_name `{agent_name}` is reserved")); + } + if agent_name.contains('/') { + return Err("agent_name must not contain `/`".to_string()); + } + if !agent_name + .chars() + .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_') + { + return Err( + "agent_name must use only lowercase letters, digits, and underscores".to_string(), + ); + } + Ok(()) +} + +fn validate_absolute_path(path: &str) -> Result<(), String> { + let Some(stripped) = path.strip_prefix('/') else { + return Err("absolute agent paths must start with `/root`".to_string()); + }; + let mut segments = stripped.split('/'); + let Some(root) = segments.next() else { + return Err("absolute agent path must not be empty".to_string()); + }; + if root != AgentPath::ROOT_SEGMENT { + return Err("absolute agent paths must start with `/root`".to_string()); + } + if stripped.ends_with('/') { + return Err("absolute agent path must not end with `/`".to_string()); + } + for segment in segments { + validate_agent_name(segment)?; + } + Ok(()) +} + +fn validate_relative_reference(reference: &str) -> Result<(), String> { + if reference.ends_with('/') { + return Err("relative agent path must not end with `/`".to_string()); + } + for segment in reference.split('/') { + validate_agent_name(segment)?; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::AgentPath; + use pretty_assertions::assert_eq; + + #[test] + fn root_has_expected_name() { + let root = AgentPath::root(); + assert_eq!(root.as_str(), AgentPath::ROOT); + assert_eq!(root.name(), "root"); + assert!(root.is_root()); + } + + #[test] + fn join_builds_child_paths() { + let root = AgentPath::root(); + let child = root.join("researcher").expect("child path"); + assert_eq!(child.as_str(), "/root/researcher"); + assert_eq!(child.name(), "researcher"); + } + + #[test] + fn resolve_supports_relative_and_absolute_references() { + let current = AgentPath::try_from("/root/researcher").expect("path"); + assert_eq!( + current.resolve("worker").expect("relative path"), + AgentPath::try_from("/root/researcher/worker").expect("path") + ); + assert_eq!( + current.resolve("/root/other").expect("absolute path"), + AgentPath::try_from("/root/other").expect("path") + ); + } + + #[test] + fn invalid_names_and_paths_are_rejected() { + assert_eq!( + AgentPath::root().join("BadName"), + Err("agent_name must use only lowercase letters, digits, and underscores".to_string()) + ); + assert_eq!( + AgentPath::try_from("/not-root"), + Err("absolute agent paths must start with `/root`".to_string()) + ); + assert_eq!( + AgentPath::root().resolve("../sibling"), + Err("agent_name `..` is reserved".to_string()) + ); + } +} diff --git a/codex-rs/protocol/src/lib.rs b/codex-rs/protocol/src/lib.rs index 08466ba4e..56924cc50 100644 --- a/codex-rs/protocol/src/lib.rs +++ b/codex-rs/protocol/src/lib.rs @@ -1,5 +1,7 @@ pub mod account; +mod agent_path; mod thread_id; +pub use agent_path::AgentPath; pub use thread_id::ThreadId; pub mod approvals; pub mod config_types; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index beccedb78..808be6259 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -12,6 +12,7 @@ use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; +use crate::AgentPath; use crate::ThreadId; use crate::approvals::ElicitationRequestEvent; use crate::config_types::ApprovalsReviewer; @@ -2288,6 +2289,8 @@ pub enum SubAgentSource { parent_thread_id: ThreadId, depth: i32, #[serde(default)] + agent_path: Option, + #[serde(default)] agent_nickname: Option, #[serde(default, alias = "agent_type")] agent_role: Option, @@ -2351,6 +2354,16 @@ impl SessionSource { _ => None, } } + + pub fn get_agent_path(&self) -> Option { + match self { + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { agent_path, .. }) => { + agent_path.clone() + } + _ => None, + } + } + pub fn restriction_product(&self) -> Option { match self { SessionSource::Custom(source) => Product::from_session_source_name(source), @@ -2411,6 +2424,9 @@ pub struct SessionMeta { /// Optional role (agent_role) assigned to an AgentControl-spawned sub-agent. #[serde(default, alias = "agent_type", skip_serializing_if = "Option::is_none")] pub agent_role: Option, + /// Optional canonical agent path assigned to an AgentControl-spawned sub-agent. + #[serde(skip_serializing_if = "Option::is_none")] + pub agent_path: Option, pub model_provider: Option, /// base_instructions for the session. This *should* always be present when creating a new session, /// but may be missing for older sessions. If not present, fall back to rendering the base_instructions @@ -2434,6 +2450,7 @@ impl Default for SessionMeta { source: SessionSource::default(), agent_nickname: None, agent_role: None, + agent_path: None, model_provider: None, base_instructions: None, dynamic_tools: None, diff --git a/codex-rs/state/migrations/0022_threads_agent_path.sql b/codex-rs/state/migrations/0022_threads_agent_path.sql new file mode 100644 index 000000000..934094570 --- /dev/null +++ b/codex-rs/state/migrations/0022_threads_agent_path.sql @@ -0,0 +1 @@ +ALTER TABLE threads ADD COLUMN agent_path TEXT; diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index 037b1f5d2..833938800 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -50,6 +50,7 @@ fn apply_session_meta_from_item(metadata: &mut ThreadMetadata, meta_line: &Sessi metadata.source = enum_to_string(&meta_line.meta.source); metadata.agent_nickname = meta_line.meta.agent_nickname.clone(); metadata.agent_role = meta_line.meta.agent_role.clone(); + metadata.agent_path = meta_line.meta.agent_path.clone(); if let Some(provider) = meta_line.meta.model_provider.as_deref() { metadata.model_provider = provider.to_string(); } @@ -251,6 +252,7 @@ mod tests { originator: "codex_cli_rs".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: Some("openai".to_string()), @@ -377,6 +379,7 @@ mod tests { originator: "codex_cli_rs".to_string(), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: Some("openai".to_string()), @@ -402,6 +405,7 @@ mod tests { created_at, updated_at: created_at, source: "cli".to_string(), + agent_path: None, agent_nickname: None, agent_role: None, model_provider: "openai".to_string(), diff --git a/codex-rs/state/src/model/thread_metadata.rs b/codex-rs/state/src/model/thread_metadata.rs index db4a2d95e..03a8a6f94 100644 --- a/codex-rs/state/src/model/thread_metadata.rs +++ b/codex-rs/state/src/model/thread_metadata.rs @@ -69,6 +69,8 @@ pub struct ThreadMetadata { pub agent_nickname: Option, /// Optional role (agent_role) assigned to an AgentControl-spawned sub-agent. pub agent_role: Option, + /// Optional canonical agent path assigned to an AgentControl-spawned sub-agent. + pub agent_path: Option, /// The model provider identifier. pub model_provider: String, /// The latest observed model for the thread. @@ -116,6 +118,8 @@ pub struct ThreadMetadataBuilder { pub agent_nickname: Option, /// Optional role (agent_role) assigned to the session. pub agent_role: Option, + /// Optional canonical agent path assigned to the session. + pub agent_path: Option, /// The model provider identifier, if known. pub model_provider: Option, /// The working directory for the thread. @@ -152,6 +156,7 @@ impl ThreadMetadataBuilder { source, agent_nickname: None, agent_role: None, + agent_path: None, model_provider: None, cwd: PathBuf::new(), cli_version: None, @@ -182,6 +187,10 @@ impl ThreadMetadataBuilder { source, agent_nickname: self.agent_nickname.clone(), agent_role: self.agent_role.clone(), + agent_path: self + .agent_path + .clone() + .or_else(|| self.source.get_agent_path().map(Into::into)), model_provider: self .model_provider .clone() @@ -241,6 +250,9 @@ impl ThreadMetadata { if self.agent_role != other.agent_role { diffs.push("agent_role"); } + if self.agent_path != other.agent_path { + diffs.push("agent_path"); + } if self.model_provider != other.model_provider { diffs.push("model_provider"); } @@ -300,6 +312,7 @@ pub(crate) struct ThreadRow { source: String, agent_nickname: Option, agent_role: Option, + agent_path: Option, model_provider: String, model: Option, reasoning_effort: Option, @@ -326,6 +339,7 @@ impl ThreadRow { source: row.try_get("source")?, agent_nickname: row.try_get("agent_nickname")?, agent_role: row.try_get("agent_role")?, + agent_path: row.try_get("agent_path")?, model_provider: row.try_get("model_provider")?, model: row.try_get("model")?, reasoning_effort: row.try_get("reasoning_effort")?, @@ -356,6 +370,7 @@ impl TryFrom for ThreadMetadata { source, agent_nickname, agent_role, + agent_path, model_provider, model, reasoning_effort, @@ -379,6 +394,7 @@ impl TryFrom for ThreadMetadata { source, agent_nickname, agent_role, + agent_path, model_provider, model, reasoning_effort: reasoning_effort @@ -447,6 +463,7 @@ mod tests { source: "cli".to_string(), agent_nickname: None, agent_role: None, + agent_path: None, model_provider: "openai".to_string(), model: Some("gpt-5".to_string()), reasoning_effort: reasoning_effort.map(str::to_string), @@ -474,6 +491,7 @@ mod tests { source: "cli".to_string(), agent_nickname: None, agent_role: None, + agent_path: None, model_provider: "openai".to_string(), model: Some("gpt-5".to_string()), reasoning_effort, diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 5ca33885f..386e0fd3e 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -166,6 +166,7 @@ SELECT created_at, updated_at, source, + agent_path, agent_nickname, agent_role, model_provider, diff --git a/codex-rs/state/src/runtime/test_support.rs b/codex-rs/state/src/runtime/test_support.rs index 229ece64b..5f0733685 100644 --- a/codex-rs/state/src/runtime/test_support.rs +++ b/codex-rs/state/src/runtime/test_support.rs @@ -50,6 +50,7 @@ pub(super) fn test_thread_metadata( source: "cli".to_string(), agent_nickname: None, agent_role: None, + agent_path: None, model_provider: "test-provider".to_string(), model: Some("gpt-5".to_string()), reasoning_effort: Some(ReasoningEffort::Medium), diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 1f62deb62..0972f9d1f 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -13,6 +13,7 @@ SELECT source, agent_nickname, agent_role, + agent_path, model_provider, model, reasoning_effort, @@ -142,6 +143,62 @@ ON CONFLICT(child_thread_id) DO UPDATE SET .await } + /// Find a direct spawned child of `parent_thread_id` by canonical agent path. + pub async fn find_thread_spawn_child_by_path( + &self, + parent_thread_id: ThreadId, + agent_path: &str, + ) -> anyhow::Result> { + let rows = sqlx::query( + r#" +SELECT threads.id +FROM thread_spawn_edges +JOIN threads ON threads.id = thread_spawn_edges.child_thread_id +WHERE thread_spawn_edges.parent_thread_id = ? + AND threads.agent_path = ? +ORDER BY threads.id +LIMIT 2 + "#, + ) + .bind(parent_thread_id.to_string()) + .bind(agent_path) + .fetch_all(self.pool.as_ref()) + .await?; + one_thread_id_from_rows(rows, agent_path) + } + + /// Find a spawned descendant of `root_thread_id` by canonical agent path. + pub async fn find_thread_spawn_descendant_by_path( + &self, + root_thread_id: ThreadId, + agent_path: &str, + ) -> anyhow::Result> { + let rows = sqlx::query( + r#" +WITH RECURSIVE subtree(child_thread_id) AS ( + SELECT child_thread_id + FROM thread_spawn_edges + WHERE parent_thread_id = ? + UNION ALL + SELECT edge.child_thread_id + FROM thread_spawn_edges AS edge + JOIN subtree ON edge.parent_thread_id = subtree.child_thread_id +) +SELECT threads.id +FROM subtree +JOIN threads ON threads.id = subtree.child_thread_id +WHERE threads.agent_path = ? +ORDER BY threads.id +LIMIT 2 + "#, + ) + .bind(root_thread_id.to_string()) + .bind(agent_path) + .fetch_all(self.pool.as_ref()) + .await?; + one_thread_id_from_rows(rows, agent_path) + } + async fn list_thread_spawn_children_matching( &self, parent_thread_id: ThreadId, @@ -293,6 +350,7 @@ SELECT source, agent_nickname, agent_role, + agent_path, model_provider, model, reasoning_effort, @@ -393,6 +451,7 @@ INSERT INTO threads ( source, agent_nickname, agent_role, + agent_path, model_provider, model, reasoning_effort, @@ -409,7 +468,7 @@ INSERT INTO threads ( git_branch, git_origin_url, memory_mode -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO NOTHING "#, ) @@ -420,6 +479,7 @@ ON CONFLICT(id) DO NOTHING .bind(metadata.source.as_str()) .bind(metadata.agent_nickname.as_deref()) .bind(metadata.agent_role.as_deref()) + .bind(metadata.agent_path.as_deref()) .bind(metadata.model_provider.as_str()) .bind(metadata.model.as_deref()) .bind( @@ -518,6 +578,7 @@ INSERT INTO threads ( source, agent_nickname, agent_role, + agent_path, model_provider, model, reasoning_effort, @@ -534,7 +595,7 @@ INSERT INTO threads ( git_branch, git_origin_url, memory_mode -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET rollout_path = excluded.rollout_path, created_at = excluded.created_at, @@ -542,6 +603,7 @@ ON CONFLICT(id) DO UPDATE SET source = excluded.source, agent_nickname = excluded.agent_nickname, agent_role = excluded.agent_role, + agent_path = excluded.agent_path, model_provider = excluded.model_provider, model = excluded.model, reasoning_effort = excluded.reasoning_effort, @@ -566,6 +628,7 @@ ON CONFLICT(id) DO UPDATE SET .bind(metadata.source.as_str()) .bind(metadata.agent_nickname.as_deref()) .bind(metadata.agent_role.as_deref()) + .bind(metadata.agent_path.as_deref()) .bind(metadata.model_provider.as_str()) .bind(metadata.model.as_deref()) .bind( @@ -753,6 +816,26 @@ ON CONFLICT(thread_id, position) DO NOTHING } } +fn one_thread_id_from_rows( + rows: Vec, + agent_path: &str, +) -> anyhow::Result> { + let mut ids = rows + .into_iter() + .map(|row| { + let id: String = row.try_get("id")?; + ThreadId::try_from(id).map_err(anyhow::Error::from) + }) + .collect::, _>>()?; + match ids.len() { + 0 => Ok(None), + 1 => Ok(ids.pop()), + _ => Err(anyhow::anyhow!( + "multiple agents found for canonical path `{agent_path}`" + )), + } +} + pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option>> { items.iter().find_map(|item| match item { RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()), @@ -942,6 +1025,7 @@ mod tests { originator: String::new(), cli_version: String::new(), source: SessionSource::Cli, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: None, @@ -996,6 +1080,7 @@ mod tests { originator: String::new(), cli_version: String::new(), source: SessionSource::Cli, + agent_path: None, agent_nickname: None, agent_role: None, model_provider: None, diff --git a/patches/aws-lc-sys_memcmp_check.patch b/patches/aws-lc-sys_memcmp_check.patch index beaa4cf85..6d652b9ed 100644 --- a/patches/aws-lc-sys_memcmp_check.patch +++ b/patches/aws-lc-sys_memcmp_check.patch @@ -10,7 +10,7 @@ diff --git a/builder/cc_builder.rs b/builder/cc_builder.rs #[non_exhaustive] #[derive(PartialEq, Eq)] -@@ -661,6 +661,16 @@ +@@ -681,6 +681,16 @@ } let mut memcmp_compile_args = Vec::from(memcmp_compiler.args()); @@ -27,7 +27,7 @@ diff --git a/builder/cc_builder.rs b/builder/cc_builder.rs // This check invokes the compiled executable and hence needs to link // it. CMake handles this via LDFLAGS but `cc` doesn't. In setups with // custom linker setups this could lead to a mismatch between the -@@ -672,6 +682,15 @@ +@@ -692,6 +702,15 @@ } } @@ -43,7 +43,7 @@ diff --git a/builder/cc_builder.rs b/builder/cc_builder.rs memcmp_compile_args.push( self.manifest_dir .join("aws-lc") -@@ -725,6 +744,40 @@ +@@ -742,6 +761,40 @@ } let _ = fs::remove_file(exec_path); }