[app-server] feat: add thread_id and turn_id to item and error notifications (#7124)

Add `thread_id` and `turn_id` to `item/started`, `item/completed`, and
`error` notifications. Otherwise the client will have a hard time
knowing which thread & turn (if multiple threads are running in
parallel) a new item/error is for.

Also add `thread_id` to `turn/started` and `turn/completed`.
This commit is contained in:
Owen Lin 2025-11-25 08:05:47 -08:00 committed by GitHub
parent 37d83e075e
commit 157a16cefa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 149 additions and 22 deletions

View file

@ -807,6 +807,8 @@ pub struct TurnError {
#[ts(export_to = "v2/")]
pub struct ErrorNotification {
pub error: TurnError,
pub thread_id: String,
pub turn_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@ -1118,6 +1120,7 @@ pub struct ThreadStartedNotification {
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnStartedNotification {
pub thread_id: String,
pub turn: Turn,
}
@ -1134,6 +1137,7 @@ pub struct Usage {
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnCompletedNotification {
pub thread_id: String,
pub turn: Turn,
}
@ -1142,6 +1146,8 @@ pub struct TurnCompletedNotification {
#[ts(export_to = "v2/")]
pub struct ItemStartedNotification {
pub item: ThreadItem,
pub thread_id: String,
pub turn_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@ -1149,6 +1155,8 @@ pub struct ItemStartedNotification {
#[ts(export_to = "v2/")]
pub struct ItemCompletedNotification {
pub item: ThreadItem,
pub thread_id: String,
pub turn_id: String,
}
// Item-specific progress notifications

View file

@ -117,7 +117,11 @@ pub(crate) async fn apply_bespoke_event_handling(
changes: patch_changes.clone(),
status: PatchApplyStatus::InProgress,
};
let notification = ItemStartedNotification { item };
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
@ -200,6 +204,7 @@ pub(crate) async fn apply_bespoke_event_handling(
tokio::spawn(async move {
on_command_execution_request_approval_response(
event_id,
conversation_id,
item_id,
command_string,
cwd,
@ -214,13 +219,23 @@ pub(crate) async fn apply_bespoke_event_handling(
},
// TODO(celia): properly construct McpToolCall TurnItem in core.
EventMsg::McpToolCallBegin(begin_event) => {
let notification = construct_mcp_tool_call_notification(begin_event).await;
let notification = construct_mcp_tool_call_notification(
begin_event,
conversation_id.to_string(),
event_id.clone(),
)
.await;
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
EventMsg::McpToolCallEnd(end_event) => {
let notification = construct_mcp_tool_call_end_notification(end_event).await;
let notification = construct_mcp_tool_call_end_notification(
end_event,
conversation_id.to_string(),
event_id.clone(),
)
.await;
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
@ -287,6 +302,8 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing
.send_server_notification(ServerNotification::Error(ErrorNotification {
error: turn_error,
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
}))
.await;
}
@ -300,11 +317,15 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing
.send_server_notification(ServerNotification::Error(ErrorNotification {
error: turn_error,
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
}))
.await;
}
EventMsg::EnteredReviewMode(review_request) => {
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item: ThreadItem::CodeReview {
id: event_id.clone(),
review: review_request.user_facing_hint,
@ -316,14 +337,22 @@ pub(crate) async fn apply_bespoke_event_handling(
}
EventMsg::ItemStarted(item_started_event) => {
let item: ThreadItem = item_started_event.item.clone().into();
let notification = ItemStartedNotification { item };
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
EventMsg::ItemCompleted(item_completed_event) => {
let item: ThreadItem = item_completed_event.item.clone().into();
let notification = ItemCompletedNotification { item };
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
@ -333,9 +362,12 @@ pub(crate) async fn apply_bespoke_event_handling(
Some(output) => render_review_output_text(&output),
None => REVIEW_FALLBACK_MESSAGE.to_string(),
};
let review_item_id = event_id.clone();
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item: ThreadItem::CodeReview {
id: event_id,
id: review_item_id,
review: review_text,
},
};
@ -359,7 +391,11 @@ pub(crate) async fn apply_bespoke_event_handling(
changes: convert_patch_changes(&patch_begin_event.changes),
status: PatchApplyStatus::InProgress,
};
let notification = ItemStartedNotification { item };
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
@ -381,6 +417,7 @@ pub(crate) async fn apply_bespoke_event_handling(
item_id,
changes,
status,
event_id.clone(),
outgoing.as_ref(),
&turn_summary_store,
)
@ -406,7 +443,11 @@ pub(crate) async fn apply_bespoke_event_handling(
exit_code: None,
duration_ms: None,
};
let notification = ItemStartedNotification { item };
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
@ -463,7 +504,11 @@ pub(crate) async fn apply_bespoke_event_handling(
duration_ms: Some(duration_ms),
};
let notification = ItemCompletedNotification { item };
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
@ -500,11 +545,13 @@ pub(crate) async fn apply_bespoke_event_handling(
}
async fn emit_turn_completed_with_status(
conversation_id: ConversationId,
event_id: String,
status: TurnStatus,
outgoing: &OutgoingMessageSender,
) {
let notification = TurnCompletedNotification {
thread_id: conversation_id.to_string(),
turn: Turn {
id: event_id,
items: vec![],
@ -521,6 +568,7 @@ async fn complete_file_change_item(
item_id: String,
changes: Vec<FileUpdateChange>,
status: PatchApplyStatus,
turn_id: String,
outgoing: &OutgoingMessageSender,
turn_summary_store: &TurnSummaryStore,
) {
@ -536,13 +584,20 @@ async fn complete_file_change_item(
changes,
status,
};
let notification = ItemCompletedNotification { item };
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id,
item,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
}
#[allow(clippy::too_many_arguments)]
async fn complete_command_execution_item(
conversation_id: ConversationId,
turn_id: String,
item_id: String,
command: String,
cwd: PathBuf,
@ -560,7 +615,11 @@ async fn complete_command_execution_item(
exit_code: None,
duration_ms: None,
};
let notification = ItemCompletedNotification { item };
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id,
item,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
@ -588,7 +647,7 @@ async fn handle_turn_complete(
TurnStatus::Completed
};
emit_turn_completed_with_status(event_id, status, outgoing).await;
emit_turn_completed_with_status(conversation_id, event_id, status, outgoing).await;
}
async fn handle_turn_interrupted(
@ -599,7 +658,8 @@ async fn handle_turn_interrupted(
) {
find_and_remove_turn_summary(conversation_id, turn_summary_store).await;
emit_turn_completed_with_status(event_id, TurnStatus::Interrupted, outgoing).await;
emit_turn_completed_with_status(conversation_id, event_id, TurnStatus::Interrupted, outgoing)
.await;
}
async fn handle_error(
@ -798,6 +858,7 @@ async fn on_file_change_request_approval_response(
item_id,
changes,
status,
event_id.clone(),
outgoing.as_ref(),
&turn_summary_store,
)
@ -818,6 +879,7 @@ async fn on_file_change_request_approval_response(
#[allow(clippy::too_many_arguments)]
async fn on_command_execution_request_approval_response(
event_id: String,
conversation_id: ConversationId,
item_id: String,
command: String,
cwd: PathBuf,
@ -867,6 +929,8 @@ async fn on_command_execution_request_approval_response(
if let Some(status) = completion_status {
complete_command_execution_item(
conversation_id,
event_id.clone(),
item_id.clone(),
command.clone(),
cwd.clone(),
@ -891,6 +955,8 @@ async fn on_command_execution_request_approval_response(
/// similar to handle_mcp_tool_call_begin in exec
async fn construct_mcp_tool_call_notification(
begin_event: McpToolCallBeginEvent,
thread_id: String,
turn_id: String,
) -> ItemStartedNotification {
let item = ThreadItem::McpToolCall {
id: begin_event.call_id,
@ -901,12 +967,18 @@ async fn construct_mcp_tool_call_notification(
result: None,
error: None,
};
ItemStartedNotification { item }
ItemStartedNotification {
thread_id,
turn_id,
item,
}
}
/// simiilar to handle_mcp_tool_call_end in exec
async fn construct_mcp_tool_call_end_notification(
end_event: McpToolCallEndEvent,
thread_id: String,
turn_id: String,
) -> ItemCompletedNotification {
let status = if end_event.is_success() {
McpToolCallStatus::Completed
@ -939,7 +1011,11 @@ async fn construct_mcp_tool_call_end_notification(
result,
error,
};
ItemCompletedNotification { item }
ItemCompletedNotification {
thread_id,
turn_id,
item,
}
}
#[cfg(test)]
@ -1122,9 +1198,18 @@ mod tests {
},
};
let notification = construct_mcp_tool_call_notification(begin_event.clone()).await;
let thread_id = ConversationId::new().to_string();
let turn_id = "turn_1".to_string();
let notification = construct_mcp_tool_call_notification(
begin_event.clone(),
thread_id.clone(),
turn_id.clone(),
)
.await;
let expected = ItemStartedNotification {
thread_id,
turn_id,
item: ThreadItem::McpToolCall {
id: begin_event.call_id,
server: begin_event.invocation.server,
@ -1267,9 +1352,18 @@ mod tests {
},
};
let notification = construct_mcp_tool_call_notification(begin_event.clone()).await;
let thread_id = ConversationId::new().to_string();
let turn_id = "turn_2".to_string();
let notification = construct_mcp_tool_call_notification(
begin_event.clone(),
thread_id.clone(),
turn_id.clone(),
)
.await;
let expected = ItemStartedNotification {
thread_id,
turn_id,
item: ThreadItem::McpToolCall {
id: begin_event.call_id,
server: begin_event.invocation.server,
@ -1308,9 +1402,18 @@ mod tests {
result: Ok(result),
};
let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await;
let thread_id = ConversationId::new().to_string();
let turn_id = "turn_3".to_string();
let notification = construct_mcp_tool_call_end_notification(
end_event.clone(),
thread_id.clone(),
turn_id.clone(),
)
.await;
let expected = ItemCompletedNotification {
thread_id,
turn_id,
item: ThreadItem::McpToolCall {
id: end_event.call_id,
server: end_event.invocation.server,
@ -1341,9 +1444,18 @@ mod tests {
result: Err("boom".to_string()),
};
let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await;
let thread_id = ConversationId::new().to_string();
let turn_id = "turn_4".to_string();
let notification = construct_mcp_tool_call_end_notification(
end_event.clone(),
thread_id.clone(),
turn_id.clone(),
)
.await;
let expected = ItemCompletedNotification {
thread_id,
turn_id,
item: ThreadItem::McpToolCall {
id: end_event.call_id,
server: end_event.invocation.server,

View file

@ -2478,7 +2478,10 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
// Emit v2 turn/started notification.
let notif = TurnStartedNotification { turn };
let notif = TurnStartedNotification {
thread_id: params.thread_id,
turn,
};
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
@ -2536,7 +2539,7 @@ impl CodexMessageProcessor {
let response = TurnStartResponse { turn: turn.clone() };
self.outgoing.send_response(request_id, response).await;
let notif = TurnStartedNotification { turn };
let notif = TurnStartedNotification { thread_id, turn };
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;

View file

@ -88,10 +88,11 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
// Give the command a brief moment to start.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let thread_id = thread.id.clone();
// Interrupt the in-progress turn by id (v2 API).
let interrupt_id = mcp
.send_turn_interrupt_request(TurnInterruptParams {
thread_id: thread.id,
thread_id: thread_id.clone(),
turn_id: turn.id,
})
.await?;
@ -112,6 +113,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
.params
.expect("turn/completed params must be present"),
)?;
assert_eq!(completed.thread_id, thread_id);
assert_eq!(completed.turn.status, TurnStatus::Interrupted);
Ok(())

View file

@ -95,6 +95,7 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<(
.await??;
let started: TurnStartedNotification =
serde_json::from_value(notif.params.expect("params must be present"))?;
assert_eq!(started.thread_id, thread.id);
assert_eq!(
started.turn.status,
codex_app_server_protocol::TurnStatus::InProgress
@ -138,6 +139,7 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<(
.params
.expect("turn/completed params must be present"),
)?;
assert_eq!(completed.thread_id, thread.id);
assert_eq!(completed.turn.status, TurnStatus::Completed);
Ok(())