feat: add connector capabilities to sub-agents (#11191)

This commit is contained in:
jif-oai 2026-02-10 11:53:01 +00:00 committed by GitHub
parent 6049ff02a0
commit 87ccc5bbae
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 314 additions and 89 deletions

View file

@ -40,7 +40,7 @@ impl AgentControl {
pub(crate) async fn spawn_agent(
&self,
config: crate::config::Config,
prompt: String,
items: Vec<UserInput>,
session_source: Option<SessionSource>,
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
@ -62,7 +62,7 @@ impl AgentControl {
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
self.send_prompt(new_thread.thread_id, prompt).await?;
self.send_input(new_thread.thread_id, items).await?;
Ok(new_thread.thread_id)
}
@ -93,22 +93,18 @@ impl AgentControl {
Ok(resumed_thread.thread_id)
}
/// Send a `user` prompt to an existing agent thread.
pub(crate) async fn send_prompt(
/// Send rich user input items to an existing agent thread.
pub(crate) async fn send_input(
&self,
agent_id: ThreadId,
prompt: String,
items: Vec<UserInput>,
) -> CodexResult<String> {
let state = self.upgrade()?;
let result = state
.send_op(
agent_id,
Op::UserInput {
items: vec![UserInput::Text {
text: prompt,
// Agent control prompts are plain text with no UI text elements.
text_elements: Vec::new(),
}],
items,
final_output_json_schema: None,
},
)
@ -202,6 +198,13 @@ mod tests {
test_config_with_cli_overrides(Vec::new()).await
}
fn text_input(text: &str) -> Vec<UserInput> {
vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}]
}
struct AgentControlHarness {
_home: TempDir,
config: Config,
@ -237,12 +240,18 @@ mod tests {
}
#[tokio::test]
async fn send_prompt_errors_when_manager_dropped() {
async fn send_input_errors_when_manager_dropped() {
let control = AgentControl::default();
let err = control
.send_prompt(ThreadId::new(), "hello".to_string())
.send_input(
ThreadId::new(),
vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
)
.await
.expect_err("send_prompt should fail without a manager");
.expect_err("send_input should fail without a manager");
assert_eq!(
err.to_string(),
"unsupported operation: thread manager dropped"
@ -306,7 +315,7 @@ mod tests {
let control = AgentControl::default();
let (_home, config) = test_config().await;
let err = control
.spawn_agent(config, "hello".to_string(), None)
.spawn_agent(config, text_input("hello"), None)
.await
.expect_err("spawn_agent should fail without a manager");
assert_eq!(
@ -334,14 +343,20 @@ mod tests {
}
#[tokio::test]
async fn send_prompt_errors_when_thread_missing() {
async fn send_input_errors_when_thread_missing() {
let harness = AgentControlHarness::new().await;
let thread_id = ThreadId::new();
let err = harness
.control
.send_prompt(thread_id, "hello".to_string())
.send_input(
thread_id,
vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
)
.await
.expect_err("send_prompt should fail for missing thread");
.expect_err("send_input should fail for missing thread");
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
}
@ -393,15 +408,21 @@ mod tests {
}
#[tokio::test]
async fn send_prompt_submits_user_message() {
async fn send_input_submits_user_message() {
let harness = AgentControlHarness::new().await;
let (thread_id, _thread) = harness.start_thread().await;
let submission_id = harness
.control
.send_prompt(thread_id, "hello from tests".to_string())
.send_input(
thread_id,
vec![UserInput::Text {
text: "hello from tests".to_string(),
text_elements: Vec::new(),
}],
)
.await
.expect("send_prompt should succeed");
.expect("send_input should succeed");
assert!(!submission_id.is_empty());
let expected = (
thread_id,
@ -426,7 +447,7 @@ mod tests {
let harness = AgentControlHarness::new().await;
let thread_id = harness
.control
.spawn_agent(harness.config.clone(), "spawned".to_string(), None)
.spawn_agent(harness.config.clone(), text_input("spawned"), None)
.await
.expect("spawn_agent should succeed");
let _thread = harness
@ -473,12 +494,12 @@ mod tests {
.expect("start thread");
let first_agent_id = control
.spawn_agent(config.clone(), "hello".to_string(), None)
.spawn_agent(config.clone(), text_input("hello"), None)
.await
.expect("spawn_agent should succeed");
let err = control
.spawn_agent(config, "hello again".to_string(), None)
.spawn_agent(config, text_input("hello again"), None)
.await
.expect_err("spawn_agent should respect max threads");
let CodexErr::AgentLimitReached {
@ -511,7 +532,7 @@ mod tests {
let control = manager.agent_control();
let first_agent_id = control
.spawn_agent(config.clone(), "hello".to_string(), None)
.spawn_agent(config.clone(), text_input("hello"), None)
.await
.expect("spawn_agent should succeed");
let _ = control
@ -520,7 +541,7 @@ mod tests {
.expect("shutdown agent");
let second_agent_id = control
.spawn_agent(config.clone(), "hello again".to_string(), None)
.spawn_agent(config.clone(), text_input("hello again"), None)
.await
.expect("spawn_agent should succeed after shutdown");
let _ = control
@ -546,12 +567,12 @@ mod tests {
let cloned = control.clone();
let first_agent_id = cloned
.spawn_agent(config.clone(), "hello".to_string(), None)
.spawn_agent(config.clone(), text_input("hello"), None)
.await
.expect("spawn_agent should succeed");
let err = control
.spawn_agent(config, "hello again".to_string(), None)
.spawn_agent(config, text_input("hello again"), None)
.await
.expect_err("spawn_agent should respect shared guard");
let CodexErr::AgentLimitReached { max_threads } = err else {
@ -581,7 +602,7 @@ mod tests {
let control = manager.agent_control();
let resumable_id = control
.spawn_agent(config.clone(), "hello".to_string(), None)
.spawn_agent(config.clone(), text_input("hello"), None)
.await
.expect("spawn_agent should succeed");
let rollout_path = manager
@ -596,7 +617,7 @@ mod tests {
.expect("shutdown resumable thread");
let active_id = control
.spawn_agent(config.clone(), "occupy".to_string(), None)
.spawn_agent(config.clone(), text_input("occupy"), None)
.await
.expect("spawn_agent should succeed for active slot");
@ -640,7 +661,7 @@ mod tests {
.expect_err("resume should fail for missing rollout path");
let resumed_id = control
.spawn_agent(config, "hello".to_string(), None)
.spawn_agent(config, text_input("hello"), None)
.await
.expect("spawn should succeed after failed resume");
let _ = control

View file

@ -28,6 +28,7 @@ use codex_protocol::protocol::CollabWaitingBeginEvent;
use codex_protocol::protocol::CollabWaitingEndEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use serde::Deserialize;
use serde::Serialize;
@ -95,7 +96,8 @@ mod spawn {
#[derive(Debug, Deserialize)]
struct SpawnAgentArgs {
message: String,
message: Option<String>,
items: Option<Vec<UserInput>>,
agent_type: Option<AgentRole>,
}
@ -112,12 +114,8 @@ mod spawn {
) -> Result<ToolOutput, FunctionCallError> {
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
let agent_role = args.agent_type.unwrap_or(AgentRole::Default);
let prompt = args.message;
if prompt.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be sent to an agent".to_string(),
));
}
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = input_preview(&input_items);
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
if exceeds_thread_spawn_depth_limit(child_depth) {
@ -150,7 +148,7 @@ mod spawn {
.agent_control
.spawn_agent(
config,
prompt.clone(),
input_items,
Some(thread_spawn_source(session.conversation_id, child_depth)),
)
.await
@ -198,7 +196,8 @@ mod send_input {
#[derive(Debug, Deserialize)]
struct SendInputArgs {
id: String,
message: String,
message: Option<String>,
items: Option<Vec<UserInput>>,
#[serde(default)]
interrupt: bool,
}
@ -216,12 +215,8 @@ mod send_input {
) -> Result<ToolOutput, FunctionCallError> {
let args: SendInputArgs = parse_arguments(&arguments)?;
let receiver_thread_id = agent_id(&args.id)?;
let prompt = args.message;
if prompt.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be sent to an agent".to_string(),
));
}
let input_items = parse_collab_input(args.message, args.items)?;
let prompt = input_preview(&input_items);
if args.interrupt {
session
.services
@ -245,7 +240,7 @@ mod send_input {
let result = session
.services
.agent_control
.send_prompt(receiver_thread_id, prompt.clone())
.send_input(receiver_thread_id, input_items)
.await
.map_err(|err| collab_agent_error(receiver_thread_id, err));
let status = session
@ -738,6 +733,57 @@ fn thread_spawn_source(parent_thread_id: ThreadId, depth: i32) -> SessionSource
})
}
fn parse_collab_input(
message: Option<String>,
items: Option<Vec<UserInput>>,
) -> Result<Vec<UserInput>, FunctionCallError> {
match (message, items) {
(Some(_), Some(_)) => Err(FunctionCallError::RespondToModel(
"Provide either message or items, but not both".to_string(),
)),
(None, None) => Err(FunctionCallError::RespondToModel(
"Provide one of: message or items".to_string(),
)),
(Some(message), None) => {
if message.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"Empty message can't be sent to an agent".to_string(),
));
}
Ok(vec![UserInput::Text {
text: message,
text_elements: Vec::new(),
}])
}
(None, Some(items)) => {
if items.is_empty() {
return Err(FunctionCallError::RespondToModel(
"Items can't be empty".to_string(),
));
}
Ok(items)
}
}
}
fn input_preview(items: &[UserInput]) -> String {
let parts: Vec<String> = items
.iter()
.map(|item| match item {
UserInput::Text { text, .. } => text.clone(),
UserInput::Image { .. } => "[image]".to_string(),
UserInput::LocalImage { path } => format!("[local_image:{}]", path.display()),
UserInput::Skill { name, path } => {
format!("[skill:${name}]({})", path.display())
}
UserInput::Mention { name, path } => format!("[mention:${name}]({path})"),
_ => "[input]".to_string(),
})
.collect();
parts.join("\n")
}
fn build_agent_spawn_config(
base_instructions: &BaseInstructions,
turn: &TurnContext,
@ -915,6 +961,29 @@ mod tests {
);
}
#[tokio::test]
async fn spawn_agent_rejects_when_message_and_items_are_both_set() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"spawn_agent",
function_payload(json!({
"message": "hello",
"items": [{"type": "mention", "name": "drive", "path": "app://drive"}]
})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("message+items should be rejected");
};
assert_eq!(
err,
FunctionCallError::RespondToModel(
"Provide either message or items, but not both".to_string()
)
);
}
#[tokio::test]
async fn spawn_agent_errors_when_manager_dropped() {
let (session, turn) = make_session_and_context().await;
@ -981,6 +1050,30 @@ mod tests {
);
}
#[tokio::test]
async fn send_input_rejects_when_message_and_items_are_both_set() {
let (session, turn) = make_session_and_context().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"send_input",
function_payload(json!({
"id": ThreadId::new().to_string(),
"message": "hello",
"items": [{"type": "mention", "name": "drive", "path": "app://drive"}]
})),
);
let Err(err) = CollabHandler.handle(invocation).await else {
panic!("message+items should be rejected");
};
assert_eq!(
err,
FunctionCallError::RespondToModel(
"Provide either message or items, but not both".to_string()
)
);
}
#[tokio::test]
async fn send_input_rejects_invalid_id() {
let (session, turn) = make_session_and_context().await;
@ -1059,6 +1152,57 @@ mod tests {
.expect("shutdown should submit");
}
#[tokio::test]
async fn send_input_accepts_structured_items() {
let (mut session, turn) = make_session_and_context().await;
let manager = thread_manager();
session.services.agent_control = manager.agent_control();
let config = turn.config.as_ref().clone();
let thread = manager.start_thread(config).await.expect("start thread");
let agent_id = thread.thread_id;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
"send_input",
function_payload(json!({
"id": agent_id.to_string(),
"items": [
{"type": "mention", "name": "drive", "path": "app://google_drive"},
{"type": "text", "text": "read the folder"}
]
})),
);
CollabHandler
.handle(invocation)
.await
.expect("send_input should succeed");
let expected = Op::UserInput {
items: vec![
UserInput::Mention {
name: "drive".to_string(),
path: "app://google_drive".to_string(),
},
UserInput::Text {
text: "read the folder".to_string(),
text_elements: Vec::new(),
},
],
final_output_json_schema: None,
};
let captured = manager
.captured_ops()
.into_iter()
.find(|(id, op)| *id == agent_id && *op == expected);
assert_eq!(captured, Some((agent_id, expected)));
let _ = thread
.thread
.submit(Op::Shutdown {})
.await
.expect("shutdown should submit");
}
#[tokio::test]
async fn resume_agent_rejects_invalid_id() {
let (session, turn) = make_session_and_context().await;

View file

@ -453,26 +453,80 @@ fn create_view_image_tool() -> ToolSpec {
})
}
fn create_collab_input_items_schema() -> JsonSchema {
let properties = BTreeMap::from([
(
"type".to_string(),
JsonSchema::String {
description: Some(
"Input item type: text, image, local_image, skill, or mention.".to_string(),
),
},
),
(
"text".to_string(),
JsonSchema::String {
description: Some("Text content when type is text.".to_string()),
},
),
(
"image_url".to_string(),
JsonSchema::String {
description: Some("Image URL when type is image.".to_string()),
},
),
(
"path".to_string(),
JsonSchema::String {
description: Some(
"Path when type is local_image/skill, or mention target such as app://<connector-id> when type is mention."
.to_string(),
),
},
),
(
"name".to_string(),
JsonSchema::String {
description: Some("Display name when type is skill or mention.".to_string()),
},
),
]);
JsonSchema::Array {
items: Box::new(JsonSchema::Object {
properties,
required: None,
additional_properties: Some(false.into()),
}),
description: Some(
"Structured input items. Use this to pass explicit mentions (for example app:// connector paths)."
.to_string(),
),
}
}
fn create_spawn_agent_tool() -> ToolSpec {
let mut properties = BTreeMap::new();
properties.insert(
"message".to_string(),
JsonSchema::String {
description: Some(
"Initial task for the new agent. Include scope, constraints, and the expected output."
.to_string(),
),
},
);
properties.insert(
"agent_type".to_string(),
JsonSchema::String {
description: Some(format!(
"Optional agent type ({}). Use an explicit type when delegating.",
AgentRole::enum_values().join(", ")
)),
},
);
let properties = BTreeMap::from([
(
"message".to_string(),
JsonSchema::String {
description: Some(
"Initial plain-text task for the new agent. Use either message or items."
.to_string(),
),
},
),
("items".to_string(), create_collab_input_items_schema()),
(
"agent_type".to_string(),
JsonSchema::String {
description: Some(format!(
"Optional agent type ({}). Use an explicit type when delegating.",
AgentRole::enum_values().join(", ")
)),
},
),
]);
ToolSpec::Function(ResponsesApiTool {
name: "spawn_agent".to_string(),
@ -482,35 +536,40 @@ fn create_spawn_agent_tool() -> ToolSpec {
strict: false,
parameters: JsonSchema::Object {
properties,
required: Some(vec!["message".to_string()]),
required: None,
additional_properties: Some(false.into()),
},
})
}
fn create_send_input_tool() -> ToolSpec {
let mut properties = BTreeMap::new();
properties.insert(
"id".to_string(),
JsonSchema::String {
description: Some("Agent id to message (from spawn_agent).".to_string()),
},
);
properties.insert(
"message".to_string(),
JsonSchema::String {
description: Some("Message to send to the agent.".to_string()),
},
);
properties.insert(
"interrupt".to_string(),
JsonSchema::Boolean {
description: Some(
"When true, stop the agent's current task and handle this immediately. When false (default), queue this message."
.to_string(),
),
},
);
let properties = BTreeMap::from([
(
"id".to_string(),
JsonSchema::String {
description: Some("Agent id to message (from spawn_agent).".to_string()),
},
),
(
"message".to_string(),
JsonSchema::String {
description: Some(
"Legacy plain-text message to send to the agent. Use either message or items."
.to_string(),
),
},
),
("items".to_string(), create_collab_input_items_schema()),
(
"interrupt".to_string(),
JsonSchema::Boolean {
description: Some(
"When true, stop the agent's current task and handle this immediately. When false (default), queue this message."
.to_string(),
),
},
),
]);
ToolSpec::Function(ResponsesApiTool {
name: "send_input".to_string(),
@ -520,7 +579,7 @@ fn create_send_input_tool() -> ToolSpec {
strict: false,
parameters: JsonSchema::Object {
properties,
required: Some(vec!["id".to_string(), "message".to_string()]),
required: Some(vec!["id".to_string()]),
additional_properties: Some(false.into()),
},
})
@ -1086,6 +1145,7 @@ fn create_read_mcp_resource_tool() -> ToolSpec {
},
})
}
/// TODO(dylan): deprecate once we get rid of json tool
#[derive(Serialize, Deserialize)]
pub(crate) struct ApplyPatchToolArgs {