chore: better error handling on collab tools (#9143)

This commit is contained in:
jif-oai 2026-01-13 13:56:11 +00:00 committed by GitHub
parent 3a300d1117
commit 3b8d79ee11
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 55 additions and 30 deletions

View file

@ -57,7 +57,7 @@ impl AgentControl {
prompt: String,
) -> CodexResult<String> {
let state = self.upgrade()?;
state
let result = state
.send_op(
agent_id,
Op::UserInput {
@ -65,13 +65,19 @@ impl AgentControl {
final_output_json_schema: None,
},
)
.await
.await;
if matches!(result, Err(CodexErr::InternalAgentDied)) {
let _ = state.remove_thread(&agent_id).await;
}
result
}
/// Submit a shutdown request to an existing agent thread.
pub(crate) async fn shutdown_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
let state = self.upgrade()?;
state.send_op(agent_id, Op::Shutdown {}).await
let result = state.send_op(agent_id, Op::Shutdown {}).await;
let _ = state.remove_thread(&agent_id).await;
result
}
#[allow(dead_code)] // Will be used for collab tools.

View file

@ -249,6 +249,7 @@ impl ThreadManager {
}
impl ThreadManagerState {
/// Fetch a thread by ID or return ThreadNotFound.
pub(crate) async fn get_thread(&self, thread_id: ThreadId) -> CodexResult<Arc<CodexThread>> {
let threads = self.threads.read().await;
threads
@ -257,6 +258,7 @@ impl ThreadManagerState {
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))
}
/// Send an operation to a thread by ID.
pub(crate) async fn send_op(&self, thread_id: ThreadId, op: Op) -> CodexResult<String> {
let thread = self.get_thread(thread_id).await?;
#[cfg(any(test, feature = "test-support"))]
@ -268,7 +270,12 @@ impl ThreadManagerState {
thread.submit(op).await
}
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
/// Remove a thread from the manager by ID, returning it when present.
pub(crate) async fn remove_thread(&self, thread_id: &ThreadId) -> Option<Arc<CodexThread>> {
self.threads.write().await.remove(thread_id)
}
/// Spawn a new thread with no history using a provided config.
pub(crate) async fn spawn_new_thread(
&self,
config: Config,
@ -283,6 +290,7 @@ impl ThreadManagerState {
.await
}
/// Spawn a new thread with optional history and register it with the manager.
pub(crate) async fn spawn_thread(
&self,
config: Config,

View file

@ -91,7 +91,7 @@ mod spawn {
.agent_control
.spawn_agent(config, args.message, true)
.await
.map_err(|err| FunctionCallError::Fatal(err.to_string()))?;
.map_err(collab_spawn_error)?;
Ok(ToolOutput::Function {
content: format!("agent_id: {result}"),
@ -123,17 +123,13 @@ mod send_input {
"Empty message can't be send to an agent".to_string(),
));
}
let agent_id_for_err = agent_id;
let content = session
.services
.agent_control
.send_prompt(agent_id, args.message)
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
err => FunctionCallError::Fatal(err.to_string()),
})?;
.map_err(|err| collab_agent_error(agent_id_for_err, err))?;
Ok(ToolOutput::Function {
content,
@ -182,17 +178,13 @@ mod wait {
ms => ms.min(MAX_WAIT_TIMEOUT_MS),
};
let agent_id_for_err = agent_id;
let mut status_rx = session
.services
.agent_control
.subscribe_status(agent_id)
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
err => FunctionCallError::Fatal(err.to_string()),
})?;
.map_err(|err| collab_agent_error(agent_id_for_err, err))?;
// Get last known status.
let mut status = status_rx.borrow_and_update().clone();
@ -254,31 +246,23 @@ pub mod close_agent {
) -> Result<ToolOutput, FunctionCallError> {
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = agent_id(&args.id)?;
let agent_id_for_err = agent_id;
let mut status_rx = session
.services
.agent_control
.subscribe_status(agent_id)
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
err => FunctionCallError::Fatal(err.to_string()),
})?;
.map_err(|err| collab_agent_error(agent_id_for_err, err))?;
let status = status_rx.borrow_and_update().clone();
if !matches!(status, AgentStatus::Shutdown) {
let agent_id_for_err = agent_id;
let _ = session
.services
.agent_control
.shutdown_agent(agent_id)
.await
.map_err(|err| match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
err => FunctionCallError::Fatal(err.to_string()),
})?;
.map_err(|err| collab_agent_error(agent_id_for_err, err))?;
}
let content = serde_json::to_string(&CloseAgentResult { status }).map_err(|err| {
@ -298,6 +282,30 @@ fn agent_id(id: &str) -> Result<ThreadId, FunctionCallError> {
.map_err(|e| FunctionCallError::RespondToModel(format!("invalid agent id {id}: {e:?}")))
}
fn collab_spawn_error(err: CodexErr) -> FunctionCallError {
match err {
CodexErr::UnsupportedOperation(_) => {
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
}
err => FunctionCallError::RespondToModel(format!("collab spawn failed: {err}")),
}
}
fn collab_agent_error(agent_id: ThreadId, err: CodexErr) -> FunctionCallError {
match err {
CodexErr::ThreadNotFound(id) => {
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
}
CodexErr::InternalAgentDied => {
FunctionCallError::RespondToModel(format!("agent with id {agent_id} is closed"))
}
CodexErr::UnsupportedOperation(_) => {
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
}
err => FunctionCallError::RespondToModel(format!("collab tool failed: {err}")),
}
}
fn build_agent_spawn_config(turn: &TurnContext) -> Result<Config, FunctionCallError> {
let base_config = turn.client.config();
let mut config = (*base_config).clone();
@ -452,7 +460,7 @@ mod tests {
};
assert_eq!(
err,
FunctionCallError::Fatal("unsupported operation: thread manager dropped".to_string())
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
);
}
@ -664,6 +672,9 @@ mod tests {
.iter()
.any(|(id, op)| *id == agent_id && matches!(op, Op::Shutdown));
assert_eq!(submitted_shutdown, true);
let status_after = manager.agent_control().get_status(agent_id).await;
assert_eq!(status_after, AgentStatus::NotFound);
}
#[tokio::test]