From 515ac2cd19a22151388d16c2d6c73e604cdbd3a0 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Sat, 24 Jan 2026 15:21:34 +0100 Subject: [PATCH] feat: add thread spawn source for collab tools (#9769) --- .../app-server-protocol/src/protocol/v2.rs | 5 ++- codex-rs/codex-api/src/requests/headers.rs | 11 +++---- codex-rs/core/src/agent/control.rs | 26 ++++++++++----- codex-rs/core/src/client.rs | 12 +++---- codex-rs/core/src/thread_manager.rs | 33 +++++++++++++++++-- codex-rs/core/src/tools/handlers/collab.rs | 10 +++++- codex-rs/protocol/src/protocol.rs | 4 +++ 7 files changed, 75 insertions(+), 26 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index acf08be99..d1accddc0 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -30,6 +30,7 @@ use codex_protocol::protocol::SkillErrorInfo as CoreSkillErrorInfo; use codex_protocol::protocol::SkillInterface as CoreSkillInterface; use codex_protocol::protocol::SkillMetadata as CoreSkillMetadata; use codex_protocol::protocol::SkillScope as CoreSkillScope; +use codex_protocol::protocol::SubAgentSource as CoreSubAgentSource; use codex_protocol::protocol::TokenUsage as CoreTokenUsage; use codex_protocol::protocol::TokenUsageInfo as CoreTokenUsageInfo; use codex_protocol::user_input::ByteRange as CoreByteRange; @@ -700,6 +701,7 @@ pub enum SessionSource { VsCode, Exec, AppServer, + SubAgent(CoreSubAgentSource), #[serde(other)] Unknown, } @@ -711,7 +713,7 @@ impl From for SessionSource { CoreSessionSource::VSCode => SessionSource::VsCode, CoreSessionSource::Exec => SessionSource::Exec, CoreSessionSource::Mcp => SessionSource::AppServer, - CoreSessionSource::SubAgent(_) => SessionSource::Unknown, + CoreSessionSource::SubAgent(sub) => SessionSource::SubAgent(sub), CoreSessionSource::Unknown => SessionSource::Unknown, } } @@ -724,6 +726,7 @@ impl From for CoreSessionSource { SessionSource::VsCode => CoreSessionSource::VSCode, SessionSource::Exec => CoreSessionSource::Exec, SessionSource::AppServer => CoreSessionSource::Mcp, + SessionSource::SubAgent(sub) => CoreSessionSource::SubAgent(sub), SessionSource::Unknown => CoreSessionSource::Unknown, } } diff --git a/codex-rs/codex-api/src/requests/headers.rs b/codex-rs/codex-api/src/requests/headers.rs index 02f08724f..02f8c61c3 100644 --- a/codex-rs/codex-api/src/requests/headers.rs +++ b/codex-rs/codex-api/src/requests/headers.rs @@ -15,13 +15,12 @@ pub(crate) fn subagent_header(source: &Option) -> Option return None; }; match sub { + codex_protocol::protocol::SubAgentSource::Review => Some("review".to_string()), + codex_protocol::protocol::SubAgentSource::Compact => Some("compact".to_string()), + codex_protocol::protocol::SubAgentSource::ThreadSpawn { .. } => { + Some("collab_spawn".to_string()) + } codex_protocol::protocol::SubAgentSource::Other(label) => Some(label.clone()), - other => Some( - serde_json::to_value(other) - .ok() - .and_then(|v| v.as_str().map(std::string::ToString::to_string)) - .unwrap_or_else(|| "other".to_string()), - ), } } diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 6c5ebf9f0..611c0d164 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -39,12 +39,20 @@ impl AgentControl { &self, config: crate::config::Config, prompt: String, + session_source: Option, ) -> CodexResult { let state = self.upgrade()?; let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?; // The same `AgentControl` is sent to spawn the thread. - let new_thread = state.spawn_new_thread(config, self.clone()).await?; + let new_thread = match session_source { + Some(session_source) => { + state + .spawn_new_thread_with_source(config, self.clone(), session_source) + .await? + } + None => state.spawn_new_thread(config, self.clone()).await?, + }; reservation.commit(new_thread.thread_id); // Notify a new thread has been created. This notification will be processed by clients @@ -268,7 +276,7 @@ mod tests { let control = AgentControl::default(); let (_home, config) = test_config().await; let err = control - .spawn_agent(config, "hello".to_string()) + .spawn_agent(config, "hello".to_string(), None) .await .expect_err("spawn_agent should fail without a manager"); assert_eq!( @@ -370,7 +378,7 @@ mod tests { let harness = AgentControlHarness::new().await; let thread_id = harness .control - .spawn_agent(harness.config.clone(), "spawned".to_string()) + .spawn_agent(harness.config.clone(), "spawned".to_string(), None) .await .expect("spawn_agent should succeed"); let _thread = harness @@ -417,12 +425,12 @@ mod tests { .expect("start thread"); let first_agent_id = control - .spawn_agent(config.clone(), "hello".to_string()) + .spawn_agent(config.clone(), "hello".to_string(), None) .await .expect("spawn_agent should succeed"); let err = control - .spawn_agent(config, "hello again".to_string()) + .spawn_agent(config, "hello again".to_string(), None) .await .expect_err("spawn_agent should respect max threads"); let CodexErr::AgentLimitReached { @@ -455,7 +463,7 @@ mod tests { let control = manager.agent_control(); let first_agent_id = control - .spawn_agent(config.clone(), "hello".to_string()) + .spawn_agent(config.clone(), "hello".to_string(), None) .await .expect("spawn_agent should succeed"); let _ = control @@ -464,7 +472,7 @@ mod tests { .expect("shutdown agent"); let second_agent_id = control - .spawn_agent(config.clone(), "hello again".to_string()) + .spawn_agent(config.clone(), "hello again".to_string(), None) .await .expect("spawn_agent should succeed after shutdown"); let _ = control @@ -490,12 +498,12 @@ mod tests { let cloned = control.clone(); let first_agent_id = cloned - .spawn_agent(config.clone(), "hello".to_string()) + .spawn_agent(config.clone(), "hello".to_string(), None) .await .expect("spawn_agent should succeed"); let err = control - .spawn_agent(config, "hello again".to_string()) + .spawn_agent(config, "hello again".to_string(), None) .await .expect_err("spawn_agent should respect shared guard"); let CodexErr::AgentLimitReached { max_threads } = err else { diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index f0f2b125f..437380dee 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -226,13 +226,11 @@ impl ModelClient { let mut extra_headers = ApiHeaderMap::new(); if let SessionSource::SubAgent(sub) = &self.state.session_source { - let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub { - label.clone() - } else { - serde_json::to_value(sub) - .ok() - .and_then(|v| v.as_str().map(std::string::ToString::to_string)) - .unwrap_or_else(|| "other".to_string()) + let subagent = match sub { + crate::protocol::SubAgentSource::Review => "review".to_string(), + crate::protocol::SubAgentSource::Compact => "compact".to_string(), + crate::protocol::SubAgentSource::ThreadSpawn { .. } => "collab_spawn".to_string(), + crate::protocol::SubAgentSource::Other(label) => label.clone(), }; if let Ok(val) = HeaderValue::from_str(&subagent) { extra_headers.insert("x-openai-subagent", val); diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 70d8cb790..fac08cb3f 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -314,11 +314,22 @@ impl ThreadManagerState { config: Config, agent_control: AgentControl, ) -> CodexResult { - self.spawn_thread( + self.spawn_new_thread_with_source(config, agent_control, self.session_source.clone()) + .await + } + + pub(crate) async fn spawn_new_thread_with_source( + &self, + config: Config, + agent_control: AgentControl, + session_source: SessionSource, + ) -> CodexResult { + self.spawn_thread_with_source( config, InitialHistory::New, Arc::clone(&self.auth_manager), agent_control, + session_source, ) .await } @@ -330,6 +341,24 @@ impl ThreadManagerState { initial_history: InitialHistory, auth_manager: Arc, agent_control: AgentControl, + ) -> CodexResult { + self.spawn_thread_with_source( + config, + initial_history, + auth_manager, + agent_control, + self.session_source.clone(), + ) + .await + } + + pub(crate) async fn spawn_thread_with_source( + &self, + config: Config, + initial_history: InitialHistory, + auth_manager: Arc, + agent_control: AgentControl, + session_source: SessionSource, ) -> CodexResult { let CodexSpawnOk { codex, thread_id, .. @@ -339,7 +368,7 @@ impl ThreadManagerState { Arc::clone(&self.models_manager), Arc::clone(&self.skills_manager), initial_history, - self.session_source.clone(), + session_source, agent_control, ) .await?; diff --git a/codex-rs/core/src/tools/handlers/collab.rs b/codex-rs/core/src/tools/handlers/collab.rs index 56463e644..6bdebf6a2 100644 --- a/codex-rs/core/src/tools/handlers/collab.rs +++ b/codex-rs/core/src/tools/handlers/collab.rs @@ -78,6 +78,8 @@ impl ToolHandler for CollabHandler { mod spawn { use super::*; use crate::agent::AgentRole; + use codex_protocol::protocol::SessionSource; + use codex_protocol::protocol::SubAgentSource; use std::sync::Arc; #[derive(Debug, Deserialize)] @@ -125,7 +127,13 @@ mod spawn { let result = session .services .agent_control - .spawn_agent(config, prompt.clone()) + .spawn_agent( + config, + prompt.clone(), + Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: session.conversation_id, + })), + ) .await .map_err(collab_spawn_error); let (new_thread_id, status) = match &result { diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 1f35fed60..5fcefc7a2 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1518,6 +1518,7 @@ pub enum SessionSource { pub enum SubAgentSource { Review, Compact, + ThreadSpawn { parent_thread_id: ThreadId }, Other(String), } @@ -1539,6 +1540,9 @@ impl fmt::Display for SubAgentSource { match self { SubAgentSource::Review => f.write_str("review"), SubAgentSource::Compact => f.write_str("compact"), + SubAgentSource::ThreadSpawn { parent_thread_id } => { + write!(f, "thread_spawn_{parent_thread_id}") + } SubAgentSource::Other(other) => f.write_str(other), } }