From 3b8d79ee11aa70b040d7f0e850fe6cc2f8f24a08 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 13 Jan 2026 13:56:11 +0000 Subject: [PATCH] chore: better error handling on collab tools (#9143) --- codex-rs/core/src/agent/control.rs | 12 +++-- codex-rs/core/src/thread_manager.rs | 10 +++- codex-rs/core/src/tools/handlers/collab.rs | 63 +++++++++++++--------- 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index ce7e89273..11e8a1745 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -57,7 +57,7 @@ impl AgentControl { prompt: String, ) -> CodexResult { let state = self.upgrade()?; - state + let result = state .send_op( agent_id, Op::UserInput { @@ -65,13 +65,19 @@ impl AgentControl { final_output_json_schema: None, }, ) - .await + .await; + if matches!(result, Err(CodexErr::InternalAgentDied)) { + let _ = state.remove_thread(&agent_id).await; + } + result } /// Submit a shutdown request to an existing agent thread. pub(crate) async fn shutdown_agent(&self, agent_id: ThreadId) -> CodexResult { let state = self.upgrade()?; - state.send_op(agent_id, Op::Shutdown {}).await + let result = state.send_op(agent_id, Op::Shutdown {}).await; + let _ = state.remove_thread(&agent_id).await; + result } #[allow(dead_code)] // Will be used for collab tools. diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 7943c23d9..0a124f29c 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -249,6 +249,7 @@ impl ThreadManager { } impl ThreadManagerState { + /// Fetch a thread by ID or return ThreadNotFound. pub(crate) async fn get_thread(&self, thread_id: ThreadId) -> CodexResult> { let threads = self.threads.read().await; threads @@ -257,6 +258,7 @@ impl ThreadManagerState { .ok_or_else(|| CodexErr::ThreadNotFound(thread_id)) } + /// Send an operation to a thread by ID. pub(crate) async fn send_op(&self, thread_id: ThreadId, op: Op) -> CodexResult { let thread = self.get_thread(thread_id).await?; #[cfg(any(test, feature = "test-support"))] @@ -268,7 +270,12 @@ impl ThreadManagerState { thread.submit(op).await } - #[allow(dead_code)] // Used by upcoming multi-agent tooling. + /// Remove a thread from the manager by ID, returning it when present. + pub(crate) async fn remove_thread(&self, thread_id: &ThreadId) -> Option> { + self.threads.write().await.remove(thread_id) + } + + /// Spawn a new thread with no history using a provided config. pub(crate) async fn spawn_new_thread( &self, config: Config, @@ -283,6 +290,7 @@ impl ThreadManagerState { .await } + /// Spawn a new thread with optional history and register it with the manager. pub(crate) async fn spawn_thread( &self, config: Config, diff --git a/codex-rs/core/src/tools/handlers/collab.rs b/codex-rs/core/src/tools/handlers/collab.rs index 0f74ef658..5d5d4c619 100644 --- a/codex-rs/core/src/tools/handlers/collab.rs +++ b/codex-rs/core/src/tools/handlers/collab.rs @@ -91,7 +91,7 @@ mod spawn { .agent_control .spawn_agent(config, args.message, true) .await - .map_err(|err| FunctionCallError::Fatal(err.to_string()))?; + .map_err(collab_spawn_error)?; Ok(ToolOutput::Function { content: format!("agent_id: {result}"), @@ -123,17 +123,13 @@ mod send_input { "Empty message can't be send to an agent".to_string(), )); } + let agent_id_for_err = agent_id; let content = session .services .agent_control .send_prompt(agent_id, args.message) .await - .map_err(|err| match err { - CodexErr::ThreadNotFound(id) => { - FunctionCallError::RespondToModel(format!("agent with id {id} not found")) - } - err => FunctionCallError::Fatal(err.to_string()), - })?; + .map_err(|err| collab_agent_error(agent_id_for_err, err))?; Ok(ToolOutput::Function { content, @@ -182,17 +178,13 @@ mod wait { ms => ms.min(MAX_WAIT_TIMEOUT_MS), }; + let agent_id_for_err = agent_id; let mut status_rx = session .services .agent_control .subscribe_status(agent_id) .await - .map_err(|err| match err { - CodexErr::ThreadNotFound(id) => { - FunctionCallError::RespondToModel(format!("agent with id {id} not found")) - } - err => FunctionCallError::Fatal(err.to_string()), - })?; + .map_err(|err| collab_agent_error(agent_id_for_err, err))?; // Get last known status. let mut status = status_rx.borrow_and_update().clone(); @@ -254,31 +246,23 @@ pub mod close_agent { ) -> Result { let args: CloseAgentArgs = parse_arguments(&arguments)?; let agent_id = agent_id(&args.id)?; + let agent_id_for_err = agent_id; let mut status_rx = session .services .agent_control .subscribe_status(agent_id) .await - .map_err(|err| match err { - CodexErr::ThreadNotFound(id) => { - FunctionCallError::RespondToModel(format!("agent with id {id} not found")) - } - err => FunctionCallError::Fatal(err.to_string()), - })?; + .map_err(|err| collab_agent_error(agent_id_for_err, err))?; let status = status_rx.borrow_and_update().clone(); if !matches!(status, AgentStatus::Shutdown) { + let agent_id_for_err = agent_id; let _ = session .services .agent_control .shutdown_agent(agent_id) .await - .map_err(|err| match err { - CodexErr::ThreadNotFound(id) => { - FunctionCallError::RespondToModel(format!("agent with id {id} not found")) - } - err => FunctionCallError::Fatal(err.to_string()), - })?; + .map_err(|err| collab_agent_error(agent_id_for_err, err))?; } let content = serde_json::to_string(&CloseAgentResult { status }).map_err(|err| { @@ -298,6 +282,30 @@ fn agent_id(id: &str) -> Result { .map_err(|e| FunctionCallError::RespondToModel(format!("invalid agent id {id}: {e:?}"))) } +fn collab_spawn_error(err: CodexErr) -> FunctionCallError { + match err { + CodexErr::UnsupportedOperation(_) => { + FunctionCallError::RespondToModel("collab manager unavailable".to_string()) + } + err => FunctionCallError::RespondToModel(format!("collab spawn failed: {err}")), + } +} + +fn collab_agent_error(agent_id: ThreadId, err: CodexErr) -> FunctionCallError { + match err { + CodexErr::ThreadNotFound(id) => { + FunctionCallError::RespondToModel(format!("agent with id {id} not found")) + } + CodexErr::InternalAgentDied => { + FunctionCallError::RespondToModel(format!("agent with id {agent_id} is closed")) + } + CodexErr::UnsupportedOperation(_) => { + FunctionCallError::RespondToModel("collab manager unavailable".to_string()) + } + err => FunctionCallError::RespondToModel(format!("collab tool failed: {err}")), + } +} + fn build_agent_spawn_config(turn: &TurnContext) -> Result { let base_config = turn.client.config(); let mut config = (*base_config).clone(); @@ -452,7 +460,7 @@ mod tests { }; assert_eq!( err, - FunctionCallError::Fatal("unsupported operation: thread manager dropped".to_string()) + FunctionCallError::RespondToModel("collab manager unavailable".to_string()) ); } @@ -664,6 +672,9 @@ mod tests { .iter() .any(|(id, op)| *id == agent_id && matches!(op, Op::Shutdown)); assert_eq!(submitted_shutdown, true); + + let status_after = manager.agent_control().get_status(agent_id).await; + assert_eq!(status_after, AgentStatus::NotFound); } #[tokio::test]