diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index bee0dd1b2..9334a0764 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -40,7 +40,7 @@ impl AgentControl { pub(crate) async fn spawn_agent( &self, config: crate::config::Config, - prompt: String, + items: Vec, session_source: Option, ) -> CodexResult { let state = self.upgrade()?; @@ -62,7 +62,7 @@ impl AgentControl { // TODO(jif) add helper for drain state.notify_thread_created(new_thread.thread_id); - self.send_prompt(new_thread.thread_id, prompt).await?; + self.send_input(new_thread.thread_id, items).await?; Ok(new_thread.thread_id) } @@ -93,22 +93,18 @@ impl AgentControl { Ok(resumed_thread.thread_id) } - /// Send a `user` prompt to an existing agent thread. - pub(crate) async fn send_prompt( + /// Send rich user input items to an existing agent thread. + pub(crate) async fn send_input( &self, agent_id: ThreadId, - prompt: String, + items: Vec, ) -> CodexResult { let state = self.upgrade()?; let result = state .send_op( agent_id, Op::UserInput { - items: vec![UserInput::Text { - text: prompt, - // Agent control prompts are plain text with no UI text elements. - text_elements: Vec::new(), - }], + items, final_output_json_schema: None, }, ) @@ -202,6 +198,13 @@ mod tests { test_config_with_cli_overrides(Vec::new()).await } + fn text_input(text: &str) -> Vec { + vec![UserInput::Text { + text: text.to_string(), + text_elements: Vec::new(), + }] + } + struct AgentControlHarness { _home: TempDir, config: Config, @@ -237,12 +240,18 @@ mod tests { } #[tokio::test] - async fn send_prompt_errors_when_manager_dropped() { + async fn send_input_errors_when_manager_dropped() { let control = AgentControl::default(); let err = control - .send_prompt(ThreadId::new(), "hello".to_string()) + .send_input( + ThreadId::new(), + vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }], + ) .await - .expect_err("send_prompt should fail without a manager"); + .expect_err("send_input should fail without a manager"); assert_eq!( err.to_string(), "unsupported operation: thread manager dropped" @@ -306,7 +315,7 @@ mod tests { let control = AgentControl::default(); let (_home, config) = test_config().await; let err = control - .spawn_agent(config, "hello".to_string(), None) + .spawn_agent(config, text_input("hello"), None) .await .expect_err("spawn_agent should fail without a manager"); assert_eq!( @@ -334,14 +343,20 @@ mod tests { } #[tokio::test] - async fn send_prompt_errors_when_thread_missing() { + async fn send_input_errors_when_thread_missing() { let harness = AgentControlHarness::new().await; let thread_id = ThreadId::new(); let err = harness .control - .send_prompt(thread_id, "hello".to_string()) + .send_input( + thread_id, + vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }], + ) .await - .expect_err("send_prompt should fail for missing thread"); + .expect_err("send_input should fail for missing thread"); assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id); } @@ -393,15 +408,21 @@ mod tests { } #[tokio::test] - async fn send_prompt_submits_user_message() { + async fn send_input_submits_user_message() { let harness = AgentControlHarness::new().await; let (thread_id, _thread) = harness.start_thread().await; let submission_id = harness .control - .send_prompt(thread_id, "hello from tests".to_string()) + .send_input( + thread_id, + vec![UserInput::Text { + text: "hello from tests".to_string(), + text_elements: Vec::new(), + }], + ) .await - .expect("send_prompt should succeed"); + .expect("send_input should succeed"); assert!(!submission_id.is_empty()); let expected = ( thread_id, @@ -426,7 +447,7 @@ mod tests { let harness = AgentControlHarness::new().await; let thread_id = harness .control - .spawn_agent(harness.config.clone(), "spawned".to_string(), None) + .spawn_agent(harness.config.clone(), text_input("spawned"), None) .await .expect("spawn_agent should succeed"); let _thread = harness @@ -473,12 +494,12 @@ mod tests { .expect("start thread"); let first_agent_id = control - .spawn_agent(config.clone(), "hello".to_string(), None) + .spawn_agent(config.clone(), text_input("hello"), None) .await .expect("spawn_agent should succeed"); let err = control - .spawn_agent(config, "hello again".to_string(), None) + .spawn_agent(config, text_input("hello again"), None) .await .expect_err("spawn_agent should respect max threads"); let CodexErr::AgentLimitReached { @@ -511,7 +532,7 @@ mod tests { let control = manager.agent_control(); let first_agent_id = control - .spawn_agent(config.clone(), "hello".to_string(), None) + .spawn_agent(config.clone(), text_input("hello"), None) .await .expect("spawn_agent should succeed"); let _ = control @@ -520,7 +541,7 @@ mod tests { .expect("shutdown agent"); let second_agent_id = control - .spawn_agent(config.clone(), "hello again".to_string(), None) + .spawn_agent(config.clone(), text_input("hello again"), None) .await .expect("spawn_agent should succeed after shutdown"); let _ = control @@ -546,12 +567,12 @@ mod tests { let cloned = control.clone(); let first_agent_id = cloned - .spawn_agent(config.clone(), "hello".to_string(), None) + .spawn_agent(config.clone(), text_input("hello"), None) .await .expect("spawn_agent should succeed"); let err = control - .spawn_agent(config, "hello again".to_string(), None) + .spawn_agent(config, text_input("hello again"), None) .await .expect_err("spawn_agent should respect shared guard"); let CodexErr::AgentLimitReached { max_threads } = err else { @@ -581,7 +602,7 @@ mod tests { let control = manager.agent_control(); let resumable_id = control - .spawn_agent(config.clone(), "hello".to_string(), None) + .spawn_agent(config.clone(), text_input("hello"), None) .await .expect("spawn_agent should succeed"); let rollout_path = manager @@ -596,7 +617,7 @@ mod tests { .expect("shutdown resumable thread"); let active_id = control - .spawn_agent(config.clone(), "occupy".to_string(), None) + .spawn_agent(config.clone(), text_input("occupy"), None) .await .expect("spawn_agent should succeed for active slot"); @@ -640,7 +661,7 @@ mod tests { .expect_err("resume should fail for missing rollout path"); let resumed_id = control - .spawn_agent(config, "hello".to_string(), None) + .spawn_agent(config, text_input("hello"), None) .await .expect("spawn should succeed after failed resume"); let _ = control diff --git a/codex-rs/core/src/tools/handlers/collab.rs b/codex-rs/core/src/tools/handlers/collab.rs index 833c3d355..d10410068 100644 --- a/codex-rs/core/src/tools/handlers/collab.rs +++ b/codex-rs/core/src/tools/handlers/collab.rs @@ -28,6 +28,7 @@ use codex_protocol::protocol::CollabWaitingBeginEvent; use codex_protocol::protocol::CollabWaitingEndEvent; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::user_input::UserInput; use serde::Deserialize; use serde::Serialize; @@ -95,7 +96,8 @@ mod spawn { #[derive(Debug, Deserialize)] struct SpawnAgentArgs { - message: String, + message: Option, + items: Option>, agent_type: Option, } @@ -112,12 +114,8 @@ mod spawn { ) -> Result { let args: SpawnAgentArgs = parse_arguments(&arguments)?; let agent_role = args.agent_type.unwrap_or(AgentRole::Default); - let prompt = args.message; - if prompt.trim().is_empty() { - return Err(FunctionCallError::RespondToModel( - "Empty message can't be sent to an agent".to_string(), - )); - } + let input_items = parse_collab_input(args.message, args.items)?; + let prompt = input_preview(&input_items); let session_source = turn.session_source.clone(); let child_depth = next_thread_spawn_depth(&session_source); if exceeds_thread_spawn_depth_limit(child_depth) { @@ -150,7 +148,7 @@ mod spawn { .agent_control .spawn_agent( config, - prompt.clone(), + input_items, Some(thread_spawn_source(session.conversation_id, child_depth)), ) .await @@ -198,7 +196,8 @@ mod send_input { #[derive(Debug, Deserialize)] struct SendInputArgs { id: String, - message: String, + message: Option, + items: Option>, #[serde(default)] interrupt: bool, } @@ -216,12 +215,8 @@ mod send_input { ) -> Result { let args: SendInputArgs = parse_arguments(&arguments)?; let receiver_thread_id = agent_id(&args.id)?; - let prompt = args.message; - if prompt.trim().is_empty() { - return Err(FunctionCallError::RespondToModel( - "Empty message can't be sent to an agent".to_string(), - )); - } + let input_items = parse_collab_input(args.message, args.items)?; + let prompt = input_preview(&input_items); if args.interrupt { session .services @@ -245,7 +240,7 @@ mod send_input { let result = session .services .agent_control - .send_prompt(receiver_thread_id, prompt.clone()) + .send_input(receiver_thread_id, input_items) .await .map_err(|err| collab_agent_error(receiver_thread_id, err)); let status = session @@ -738,6 +733,57 @@ fn thread_spawn_source(parent_thread_id: ThreadId, depth: i32) -> SessionSource }) } +fn parse_collab_input( + message: Option, + items: Option>, +) -> Result, FunctionCallError> { + match (message, items) { + (Some(_), Some(_)) => Err(FunctionCallError::RespondToModel( + "Provide either message or items, but not both".to_string(), + )), + (None, None) => Err(FunctionCallError::RespondToModel( + "Provide one of: message or items".to_string(), + )), + (Some(message), None) => { + if message.trim().is_empty() { + return Err(FunctionCallError::RespondToModel( + "Empty message can't be sent to an agent".to_string(), + )); + } + Ok(vec![UserInput::Text { + text: message, + text_elements: Vec::new(), + }]) + } + (None, Some(items)) => { + if items.is_empty() { + return Err(FunctionCallError::RespondToModel( + "Items can't be empty".to_string(), + )); + } + Ok(items) + } + } +} + +fn input_preview(items: &[UserInput]) -> String { + let parts: Vec = items + .iter() + .map(|item| match item { + UserInput::Text { text, .. } => text.clone(), + UserInput::Image { .. } => "[image]".to_string(), + UserInput::LocalImage { path } => format!("[local_image:{}]", path.display()), + UserInput::Skill { name, path } => { + format!("[skill:${name}]({})", path.display()) + } + UserInput::Mention { name, path } => format!("[mention:${name}]({path})"), + _ => "[input]".to_string(), + }) + .collect(); + + parts.join("\n") +} + fn build_agent_spawn_config( base_instructions: &BaseInstructions, turn: &TurnContext, @@ -915,6 +961,29 @@ mod tests { ); } + #[tokio::test] + async fn spawn_agent_rejects_when_message_and_items_are_both_set() { + let (session, turn) = make_session_and_context().await; + let invocation = invocation( + Arc::new(session), + Arc::new(turn), + "spawn_agent", + function_payload(json!({ + "message": "hello", + "items": [{"type": "mention", "name": "drive", "path": "app://drive"}] + })), + ); + let Err(err) = CollabHandler.handle(invocation).await else { + panic!("message+items should be rejected"); + }; + assert_eq!( + err, + FunctionCallError::RespondToModel( + "Provide either message or items, but not both".to_string() + ) + ); + } + #[tokio::test] async fn spawn_agent_errors_when_manager_dropped() { let (session, turn) = make_session_and_context().await; @@ -981,6 +1050,30 @@ mod tests { ); } + #[tokio::test] + async fn send_input_rejects_when_message_and_items_are_both_set() { + let (session, turn) = make_session_and_context().await; + let invocation = invocation( + Arc::new(session), + Arc::new(turn), + "send_input", + function_payload(json!({ + "id": ThreadId::new().to_string(), + "message": "hello", + "items": [{"type": "mention", "name": "drive", "path": "app://drive"}] + })), + ); + let Err(err) = CollabHandler.handle(invocation).await else { + panic!("message+items should be rejected"); + }; + assert_eq!( + err, + FunctionCallError::RespondToModel( + "Provide either message or items, but not both".to_string() + ) + ); + } + #[tokio::test] async fn send_input_rejects_invalid_id() { let (session, turn) = make_session_and_context().await; @@ -1059,6 +1152,57 @@ mod tests { .expect("shutdown should submit"); } + #[tokio::test] + async fn send_input_accepts_structured_items() { + let (mut session, turn) = make_session_and_context().await; + let manager = thread_manager(); + session.services.agent_control = manager.agent_control(); + let config = turn.config.as_ref().clone(); + let thread = manager.start_thread(config).await.expect("start thread"); + let agent_id = thread.thread_id; + let invocation = invocation( + Arc::new(session), + Arc::new(turn), + "send_input", + function_payload(json!({ + "id": agent_id.to_string(), + "items": [ + {"type": "mention", "name": "drive", "path": "app://google_drive"}, + {"type": "text", "text": "read the folder"} + ] + })), + ); + CollabHandler + .handle(invocation) + .await + .expect("send_input should succeed"); + + let expected = Op::UserInput { + items: vec![ + UserInput::Mention { + name: "drive".to_string(), + path: "app://google_drive".to_string(), + }, + UserInput::Text { + text: "read the folder".to_string(), + text_elements: Vec::new(), + }, + ], + final_output_json_schema: None, + }; + let captured = manager + .captured_ops() + .into_iter() + .find(|(id, op)| *id == agent_id && *op == expected); + assert_eq!(captured, Some((agent_id, expected))); + + let _ = thread + .thread + .submit(Op::Shutdown {}) + .await + .expect("shutdown should submit"); + } + #[tokio::test] async fn resume_agent_rejects_invalid_id() { let (session, turn) = make_session_and_context().await; diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index 693912f8d..d0c5a3c07 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -453,26 +453,80 @@ fn create_view_image_tool() -> ToolSpec { }) } +fn create_collab_input_items_schema() -> JsonSchema { + let properties = BTreeMap::from([ + ( + "type".to_string(), + JsonSchema::String { + description: Some( + "Input item type: text, image, local_image, skill, or mention.".to_string(), + ), + }, + ), + ( + "text".to_string(), + JsonSchema::String { + description: Some("Text content when type is text.".to_string()), + }, + ), + ( + "image_url".to_string(), + JsonSchema::String { + description: Some("Image URL when type is image.".to_string()), + }, + ), + ( + "path".to_string(), + JsonSchema::String { + description: Some( + "Path when type is local_image/skill, or mention target such as app:// when type is mention." + .to_string(), + ), + }, + ), + ( + "name".to_string(), + JsonSchema::String { + description: Some("Display name when type is skill or mention.".to_string()), + }, + ), + ]); + + JsonSchema::Array { + items: Box::new(JsonSchema::Object { + properties, + required: None, + additional_properties: Some(false.into()), + }), + description: Some( + "Structured input items. Use this to pass explicit mentions (for example app:// connector paths)." + .to_string(), + ), + } +} + fn create_spawn_agent_tool() -> ToolSpec { - let mut properties = BTreeMap::new(); - properties.insert( - "message".to_string(), - JsonSchema::String { - description: Some( - "Initial task for the new agent. Include scope, constraints, and the expected output." - .to_string(), - ), - }, - ); - properties.insert( - "agent_type".to_string(), - JsonSchema::String { - description: Some(format!( - "Optional agent type ({}). Use an explicit type when delegating.", - AgentRole::enum_values().join(", ") - )), - }, - ); + let properties = BTreeMap::from([ + ( + "message".to_string(), + JsonSchema::String { + description: Some( + "Initial plain-text task for the new agent. Use either message or items." + .to_string(), + ), + }, + ), + ("items".to_string(), create_collab_input_items_schema()), + ( + "agent_type".to_string(), + JsonSchema::String { + description: Some(format!( + "Optional agent type ({}). Use an explicit type when delegating.", + AgentRole::enum_values().join(", ") + )), + }, + ), + ]); ToolSpec::Function(ResponsesApiTool { name: "spawn_agent".to_string(), @@ -482,35 +536,40 @@ fn create_spawn_agent_tool() -> ToolSpec { strict: false, parameters: JsonSchema::Object { properties, - required: Some(vec!["message".to_string()]), + required: None, additional_properties: Some(false.into()), }, }) } fn create_send_input_tool() -> ToolSpec { - let mut properties = BTreeMap::new(); - properties.insert( - "id".to_string(), - JsonSchema::String { - description: Some("Agent id to message (from spawn_agent).".to_string()), - }, - ); - properties.insert( - "message".to_string(), - JsonSchema::String { - description: Some("Message to send to the agent.".to_string()), - }, - ); - properties.insert( - "interrupt".to_string(), - JsonSchema::Boolean { - description: Some( - "When true, stop the agent's current task and handle this immediately. When false (default), queue this message." - .to_string(), - ), - }, - ); + let properties = BTreeMap::from([ + ( + "id".to_string(), + JsonSchema::String { + description: Some("Agent id to message (from spawn_agent).".to_string()), + }, + ), + ( + "message".to_string(), + JsonSchema::String { + description: Some( + "Legacy plain-text message to send to the agent. Use either message or items." + .to_string(), + ), + }, + ), + ("items".to_string(), create_collab_input_items_schema()), + ( + "interrupt".to_string(), + JsonSchema::Boolean { + description: Some( + "When true, stop the agent's current task and handle this immediately. When false (default), queue this message." + .to_string(), + ), + }, + ), + ]); ToolSpec::Function(ResponsesApiTool { name: "send_input".to_string(), @@ -520,7 +579,7 @@ fn create_send_input_tool() -> ToolSpec { strict: false, parameters: JsonSchema::Object { properties, - required: Some(vec!["id".to_string(), "message".to_string()]), + required: Some(vec!["id".to_string()]), additional_properties: Some(false.into()), }, }) @@ -1086,6 +1145,7 @@ fn create_read_mcp_resource_tool() -> ToolSpec { }, }) } + /// TODO(dylan): deprecate once we get rid of json tool #[derive(Serialize, Deserialize)] pub(crate) struct ApplyPatchToolArgs {