[app-server & core] introduce new codex error code and v2 app-server error events (#6938)
This PR does two things:
1. populate a new `codex_error_code` protocol in error events sent from
core to client;
2. old v1 core events `codex/event/stream_error` and `codex/event/error`
will now both become `error`. We also show codex error code for
turncompleted -> error status.
new events in app server test:
```
< {
< "method": "codex/event/stream_error",
< "params": {
< "conversationId": "019aa34c-0c14-70e0-9706-98520a760d67",
< "id": "0",
< "msg": {
< "codex_error_code": {
< "response_stream_disconnected": {
< "http_status_code": 401
< }
< },
< "message": "Reconnecting... 2/5",
< "type": "stream_error"
< }
< }
< }
{
< "method": "error",
< "params": {
< "error": {
< "codexErrorCode": {
< "responseStreamDisconnected": {
< "httpStatusCode": 401
< }
< },
< "message": "Reconnecting... 2/5"
< }
< }
< }
< {
< "method": "turn/completed",
< "params": {
< "turn": {
< "error": {
< "codexErrorCode": {
< "responseTooManyFailedAttempts": {
< "httpStatusCode": 401
< }
< },
< "message": "exceeded retry limit, last status: 401 Unauthorized, request id: 9a1b495a1a97ed3e-SJC"
< },
< "id": "0",
< "items": [],
< "status": "failed"
< }
< }
< }
```
This commit is contained in:
parent
3f92ad4190
commit
9bce050385
16 changed files with 329 additions and 38 deletions
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
|
|
@ -891,6 +891,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_json",
|
||||
"strum_macros 0.27.2",
|
||||
"thiserror 2.0.17",
|
||||
"ts-rs",
|
||||
"uuid",
|
||||
]
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ schemars = { workspace = true }
|
|||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
strum_macros = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
ts-rs = { workspace = true }
|
||||
uuid = { workspace = true, features = ["serde", "v7"] }
|
||||
|
||||
|
|
|
|||
|
|
@ -378,7 +378,7 @@ macro_rules! server_notification_definitions {
|
|||
impl TryFrom<JSONRPCNotification> for ServerNotification {
|
||||
type Error = serde_json::Error;
|
||||
|
||||
fn try_from(value: JSONRPCNotification) -> Result<Self, Self::Error> {
|
||||
fn try_from(value: JSONRPCNotification) -> Result<Self, serde_json::Error> {
|
||||
serde_json::from_value(serde_json::to_value(value)?)
|
||||
}
|
||||
}
|
||||
|
|
@ -487,6 +487,7 @@ pub struct FuzzyFileSearchResponse {
|
|||
|
||||
server_notification_definitions! {
|
||||
/// NEW NOTIFICATIONS
|
||||
Error => "error" (v2::ErrorNotification),
|
||||
ThreadStarted => "thread/started" (v2::ThreadStartedNotification),
|
||||
TurnStarted => "turn/started" (v2::TurnStartedNotification),
|
||||
TurnCompleted => "turn/completed" (v2::TurnCompletedNotification),
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent;
|
|||
use codex_protocol::items::TurnItem as CoreTurnItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::parse_command::ParsedCommand as CoreParsedCommand;
|
||||
use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo;
|
||||
use codex_protocol::protocol::CreditsSnapshot as CoreCreditsSnapshot;
|
||||
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
|
||||
use codex_protocol::protocol::RateLimitWindow as CoreRateLimitWindow;
|
||||
|
|
@ -20,6 +21,7 @@ use schemars::JsonSchema;
|
|||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
use thiserror::Error;
|
||||
use ts_rs::TS;
|
||||
|
||||
// Macro to declare a camelCased API v2 enum mirroring a core enum which
|
||||
|
|
@ -47,6 +49,69 @@ macro_rules! v2_enum_from_core {
|
|||
};
|
||||
}
|
||||
|
||||
/// This translation layer make sure that we expose codex error code in camel case.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub enum CodexErrorInfo {
|
||||
ContextWindowExceeded,
|
||||
UsageLimitExceeded,
|
||||
HttpConnectionFailed {
|
||||
#[serde(rename = "httpStatusCode")]
|
||||
#[ts(rename = "httpStatusCode")]
|
||||
http_status_code: Option<u16>,
|
||||
},
|
||||
/// Failed to connect to the response SSE stream.
|
||||
ResponseStreamConnectionFailed {
|
||||
#[serde(rename = "httpStatusCode")]
|
||||
#[ts(rename = "httpStatusCode")]
|
||||
http_status_code: Option<u16>,
|
||||
},
|
||||
InternalServerError,
|
||||
Unauthorized,
|
||||
BadRequest,
|
||||
SandboxError,
|
||||
/// The response SSE stream disconnected in the middle of a turn before completion.
|
||||
ResponseStreamDisconnected {
|
||||
#[serde(rename = "httpStatusCode")]
|
||||
#[ts(rename = "httpStatusCode")]
|
||||
http_status_code: Option<u16>,
|
||||
},
|
||||
/// Reached the retry limit for responses.
|
||||
ResponseTooManyFailedAttempts {
|
||||
#[serde(rename = "httpStatusCode")]
|
||||
#[ts(rename = "httpStatusCode")]
|
||||
http_status_code: Option<u16>,
|
||||
},
|
||||
Other,
|
||||
}
|
||||
|
||||
impl From<CoreCodexErrorInfo> for CodexErrorInfo {
|
||||
fn from(value: CoreCodexErrorInfo) -> Self {
|
||||
match value {
|
||||
CoreCodexErrorInfo::ContextWindowExceeded => CodexErrorInfo::ContextWindowExceeded,
|
||||
CoreCodexErrorInfo::UsageLimitExceeded => CodexErrorInfo::UsageLimitExceeded,
|
||||
CoreCodexErrorInfo::HttpConnectionFailed { http_status_code } => {
|
||||
CodexErrorInfo::HttpConnectionFailed { http_status_code }
|
||||
}
|
||||
CoreCodexErrorInfo::ResponseStreamConnectionFailed { http_status_code } => {
|
||||
CodexErrorInfo::ResponseStreamConnectionFailed { http_status_code }
|
||||
}
|
||||
CoreCodexErrorInfo::InternalServerError => CodexErrorInfo::InternalServerError,
|
||||
CoreCodexErrorInfo::Unauthorized => CodexErrorInfo::Unauthorized,
|
||||
CoreCodexErrorInfo::BadRequest => CodexErrorInfo::BadRequest,
|
||||
CoreCodexErrorInfo::SandboxError => CodexErrorInfo::SandboxError,
|
||||
CoreCodexErrorInfo::ResponseStreamDisconnected { http_status_code } => {
|
||||
CodexErrorInfo::ResponseStreamDisconnected { http_status_code }
|
||||
}
|
||||
CoreCodexErrorInfo::ResponseTooManyFailedAttempts { http_status_code } => {
|
||||
CodexErrorInfo::ResponseTooManyFailedAttempts { http_status_code }
|
||||
}
|
||||
CoreCodexErrorInfo::Other => CodexErrorInfo::Other,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
v2_enum_from_core!(
|
||||
pub enum AskForApproval from codex_protocol::protocol::AskForApproval {
|
||||
UnlessTrusted, OnFailure, OnRequest, Never
|
||||
|
|
@ -544,11 +609,20 @@ pub struct Turn {
|
|||
pub status: TurnStatus,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Error)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
#[error("{message}")]
|
||||
pub struct TurnError {
|
||||
pub message: String,
|
||||
pub codex_error_code: Option<CodexErrorInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct TurnError {
|
||||
pub message: String,
|
||||
pub struct ErrorNotification {
|
||||
pub error: TurnError,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
|
|
@ -1091,6 +1165,7 @@ mod tests {
|
|||
use codex_protocol::items::WebSearchItem;
|
||||
use codex_protocol::user_input::UserInput as CoreUserInput;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[test]
|
||||
|
|
@ -1176,4 +1251,20 @@ mod tests {
|
|||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_error_code_serializes_http_status_code_in_camel_case() {
|
||||
let value = CodexErrorInfo::ResponseTooManyFailedAttempts {
|
||||
http_status_code: Some(401),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(value).unwrap(),
|
||||
json!({
|
||||
"responseTooManyFailedAttempts": {
|
||||
"httpStatusCode": 401
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,11 +8,13 @@ use codex_app_server_protocol::AgentMessageDeltaNotification;
|
|||
use codex_app_server_protocol::ApplyPatchApprovalParams;
|
||||
use codex_app_server_protocol::ApplyPatchApprovalResponse;
|
||||
use codex_app_server_protocol::ApprovalDecision;
|
||||
use codex_app_server_protocol::CodexErrorInfo as V2CodexErrorInfo;
|
||||
use codex_app_server_protocol::CommandAction as V2ParsedCommand;
|
||||
use codex_app_server_protocol::CommandExecutionOutputDeltaNotification;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
||||
use codex_app_server_protocol::CommandExecutionStatus;
|
||||
use codex_app_server_protocol::ErrorNotification;
|
||||
use codex_app_server_protocol::ExecCommandApprovalParams;
|
||||
use codex_app_server_protocol::ExecCommandApprovalResponse;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalParams;
|
||||
|
|
@ -260,7 +262,29 @@ pub(crate) async fn apply_bespoke_event_handling(
|
|||
}
|
||||
}
|
||||
EventMsg::Error(ev) => {
|
||||
handle_error(conversation_id, ev.message, &turn_summary_store).await;
|
||||
let turn_error = TurnError {
|
||||
message: ev.message,
|
||||
codex_error_code: ev.codex_error_code.map(V2CodexErrorInfo::from),
|
||||
};
|
||||
handle_error(conversation_id, turn_error.clone(), &turn_summary_store).await;
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::Error(ErrorNotification {
|
||||
error: turn_error,
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
EventMsg::StreamError(ev) => {
|
||||
// We don't need to update the turn summary store for stream errors as they are intermediate error states for retries,
|
||||
// but we notify the client.
|
||||
let turn_error = TurnError {
|
||||
message: ev.message,
|
||||
codex_error_code: ev.codex_error_code.map(V2CodexErrorInfo::from),
|
||||
};
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::Error(ErrorNotification {
|
||||
error: turn_error,
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
EventMsg::EnteredReviewMode(review_request) => {
|
||||
let notification = ItemStartedNotification {
|
||||
|
|
@ -508,10 +532,8 @@ async fn handle_turn_complete(
|
|||
) {
|
||||
let turn_summary = find_and_remove_turn_summary(conversation_id, turn_summary_store).await;
|
||||
|
||||
let status = if let Some(message) = turn_summary.last_error_message {
|
||||
TurnStatus::Failed {
|
||||
error: TurnError { message },
|
||||
}
|
||||
let status = if let Some(error) = turn_summary.last_error {
|
||||
TurnStatus::Failed { error }
|
||||
} else {
|
||||
TurnStatus::Completed
|
||||
};
|
||||
|
|
@ -532,11 +554,11 @@ async fn handle_turn_interrupted(
|
|||
|
||||
async fn handle_error(
|
||||
conversation_id: ConversationId,
|
||||
message: String,
|
||||
error: TurnError,
|
||||
turn_summary_store: &TurnSummaryStore,
|
||||
) {
|
||||
let mut map = turn_summary_store.lock().await;
|
||||
map.entry(conversation_id).or_default().last_error_message = Some(message);
|
||||
map.entry(conversation_id).or_default().last_error = Some(error);
|
||||
}
|
||||
|
||||
async fn on_patch_approval_response(
|
||||
|
|
@ -873,10 +895,24 @@ mod tests {
|
|||
let conversation_id = ConversationId::new();
|
||||
let turn_summary_store = new_turn_summary_store();
|
||||
|
||||
handle_error(conversation_id, "boom".to_string(), &turn_summary_store).await;
|
||||
handle_error(
|
||||
conversation_id,
|
||||
TurnError {
|
||||
message: "boom".to_string(),
|
||||
codex_error_code: Some(V2CodexErrorInfo::InternalServerError),
|
||||
},
|
||||
&turn_summary_store,
|
||||
)
|
||||
.await;
|
||||
|
||||
let turn_summary = find_and_remove_turn_summary(conversation_id, &turn_summary_store).await;
|
||||
assert_eq!(turn_summary.last_error_message, Some("boom".to_string()));
|
||||
assert_eq!(
|
||||
turn_summary.last_error,
|
||||
Some(TurnError {
|
||||
message: "boom".to_string(),
|
||||
codex_error_code: Some(V2CodexErrorInfo::InternalServerError),
|
||||
})
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -916,7 +952,15 @@ mod tests {
|
|||
let conversation_id = ConversationId::new();
|
||||
let event_id = "interrupt1".to_string();
|
||||
let turn_summary_store = new_turn_summary_store();
|
||||
handle_error(conversation_id, "oops".to_string(), &turn_summary_store).await;
|
||||
handle_error(
|
||||
conversation_id,
|
||||
TurnError {
|
||||
message: "oops".to_string(),
|
||||
codex_error_code: None,
|
||||
},
|
||||
&turn_summary_store,
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
|
||||
|
|
@ -948,7 +992,15 @@ mod tests {
|
|||
let conversation_id = ConversationId::new();
|
||||
let event_id = "complete_err1".to_string();
|
||||
let turn_summary_store = new_turn_summary_store();
|
||||
handle_error(conversation_id, "bad".to_string(), &turn_summary_store).await;
|
||||
handle_error(
|
||||
conversation_id,
|
||||
TurnError {
|
||||
message: "bad".to_string(),
|
||||
codex_error_code: Some(V2CodexErrorInfo::Other),
|
||||
},
|
||||
&turn_summary_store,
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
|
||||
|
|
@ -972,6 +1024,7 @@ mod tests {
|
|||
TurnStatus::Failed {
|
||||
error: TurnError {
|
||||
message: "bad".to_string(),
|
||||
codex_error_code: Some(V2CodexErrorInfo::Other),
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
@ -1022,7 +1075,15 @@ mod tests {
|
|||
|
||||
// Turn 1 on conversation A
|
||||
let a_turn1 = "a_turn1".to_string();
|
||||
handle_error(conversation_a, "a1".to_string(), &turn_summary_store).await;
|
||||
handle_error(
|
||||
conversation_a,
|
||||
TurnError {
|
||||
message: "a1".to_string(),
|
||||
codex_error_code: Some(V2CodexErrorInfo::BadRequest),
|
||||
},
|
||||
&turn_summary_store,
|
||||
)
|
||||
.await;
|
||||
handle_turn_complete(
|
||||
conversation_a,
|
||||
a_turn1.clone(),
|
||||
|
|
@ -1033,7 +1094,15 @@ mod tests {
|
|||
|
||||
// Turn 1 on conversation B
|
||||
let b_turn1 = "b_turn1".to_string();
|
||||
handle_error(conversation_b, "b1".to_string(), &turn_summary_store).await;
|
||||
handle_error(
|
||||
conversation_b,
|
||||
TurnError {
|
||||
message: "b1".to_string(),
|
||||
codex_error_code: None,
|
||||
},
|
||||
&turn_summary_store,
|
||||
)
|
||||
.await;
|
||||
handle_turn_complete(
|
||||
conversation_b,
|
||||
b_turn1.clone(),
|
||||
|
|
@ -1065,6 +1134,7 @@ mod tests {
|
|||
TurnStatus::Failed {
|
||||
error: TurnError {
|
||||
message: "a1".to_string(),
|
||||
codex_error_code: Some(V2CodexErrorInfo::BadRequest),
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
@ -1085,6 +1155,7 @@ mod tests {
|
|||
TurnStatus::Failed {
|
||||
error: TurnError {
|
||||
message: "b1".to_string(),
|
||||
codex_error_code: None,
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
|||
|
|
@ -83,6 +83,7 @@ use codex_app_server_protocol::ThreadStartParams;
|
|||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnError;
|
||||
use codex_app_server_protocol::TurnInterruptParams;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
|
|
@ -162,8 +163,8 @@ pub(crate) type PendingInterrupts = Arc<Mutex<HashMap<ConversationId, PendingInt
|
|||
/// Per-conversation accumulation of the latest states e.g. error message while a turn runs.
|
||||
#[derive(Default, Clone)]
|
||||
pub(crate) struct TurnSummary {
|
||||
pub(crate) last_error_message: Option<String>,
|
||||
pub(crate) file_change_started: HashSet<String>,
|
||||
pub(crate) last_error: Option<TurnError>,
|
||||
}
|
||||
|
||||
pub(crate) type TurnSummaryStore = Arc<Mutex<HashMap<ConversationId, TurnSummary>>>;
|
||||
|
|
|
|||
|
|
@ -79,7 +79,6 @@ use crate::protocol::ApplyPatchApprovalRequestEvent;
|
|||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::BackgroundEventEvent;
|
||||
use crate::protocol::DeprecationNoticeEvent;
|
||||
use crate::protocol::ErrorEvent;
|
||||
use crate::protocol::Event;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::ExecApprovalRequestEvent;
|
||||
|
|
@ -129,6 +128,7 @@ use codex_protocol::models::ContentItem;
|
|||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_readiness::Readiness;
|
||||
|
|
@ -1189,9 +1189,14 @@ impl Session {
|
|||
&self,
|
||||
turn_context: &TurnContext,
|
||||
message: impl Into<String>,
|
||||
codex_error: CodexErr,
|
||||
) {
|
||||
let codex_error_code = CodexErrorInfo::ResponseStreamDisconnected {
|
||||
http_status_code: codex_error.http_status_code_value(),
|
||||
};
|
||||
let event = EventMsg::StreamError(StreamErrorEvent {
|
||||
message: message.into(),
|
||||
codex_error_code: Some(codex_error_code),
|
||||
});
|
||||
self.send_event(turn_context, event).await;
|
||||
}
|
||||
|
|
@ -1437,6 +1442,7 @@ mod handlers {
|
|||
use crate::tasks::UndoTask;
|
||||
use crate::tasks::UserShellCommandTask;
|
||||
use codex_protocol::custom_prompts::CustomPrompt;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
|
|
@ -1683,6 +1689,7 @@ mod handlers {
|
|||
id: sub_id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: "Failed to shutdown rollout recorder".to_string(),
|
||||
codex_error_code: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
|
|
@ -1937,9 +1944,7 @@ pub(crate) async fn run_task(
|
|||
}
|
||||
Err(e) => {
|
||||
info!("Turn error: {e:#}");
|
||||
let event = EventMsg::Error(ErrorEvent {
|
||||
message: e.to_string(),
|
||||
});
|
||||
let event = EventMsg::Error(e.to_error_event(None));
|
||||
sess.send_event(&turn_context, event).await;
|
||||
// let the user continue the conversation
|
||||
break;
|
||||
|
|
@ -2064,6 +2069,7 @@ async fn run_turn(
|
|||
sess.notify_stream_error(
|
||||
&turn_context,
|
||||
format!("Reconnecting... {retries}/{max_retries}"),
|
||||
e,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ use crate::error::Result as CodexResult;
|
|||
use crate::features::Feature;
|
||||
use crate::protocol::AgentMessageEvent;
|
||||
use crate::protocol::CompactedItem;
|
||||
use crate::protocol::ErrorEvent;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::TaskStartedEvent;
|
||||
use crate::protocol::TurnContextItem;
|
||||
|
|
@ -128,9 +127,7 @@ async fn run_compact_task_inner(
|
|||
continue;
|
||||
}
|
||||
sess.set_total_tokens_full(turn_context.as_ref()).await;
|
||||
let event = EventMsg::Error(ErrorEvent {
|
||||
message: e.to_string(),
|
||||
});
|
||||
let event = EventMsg::Error(e.to_error_event(None));
|
||||
sess.send_event(&turn_context, event).await;
|
||||
return;
|
||||
}
|
||||
|
|
@ -141,14 +138,13 @@ async fn run_compact_task_inner(
|
|||
sess.notify_stream_error(
|
||||
turn_context.as_ref(),
|
||||
format!("Reconnecting... {retries}/{max_retries}"),
|
||||
e,
|
||||
)
|
||||
.await;
|
||||
tokio::time::sleep(delay).await;
|
||||
continue;
|
||||
} else {
|
||||
let event = EventMsg::Error(ErrorEvent {
|
||||
message: e.to_string(),
|
||||
});
|
||||
let event = EventMsg::Error(e.to_error_event(None));
|
||||
sess.send_event(&turn_context, event).await;
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ use crate::codex::TurnContext;
|
|||
use crate::error::Result as CodexResult;
|
||||
use crate::protocol::AgentMessageEvent;
|
||||
use crate::protocol::CompactedItem;
|
||||
use crate::protocol::ErrorEvent;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::RolloutItem;
|
||||
use crate::protocol::TaskStartedEvent;
|
||||
|
|
@ -30,9 +29,9 @@ pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Ar
|
|||
|
||||
async fn run_remote_compact_task_inner(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
|
||||
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
|
||||
let event = EventMsg::Error(ErrorEvent {
|
||||
message: format!("Error running remote compact task: {err}"),
|
||||
});
|
||||
let event = EventMsg::Error(
|
||||
err.to_error_event(Some("Error running remote compact task".to_string())),
|
||||
);
|
||||
sess.send_event(turn_context, event).await;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,8 @@ use chrono::Local;
|
|||
use chrono::Utc;
|
||||
use codex_async_utils::CancelErr;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use reqwest::StatusCode;
|
||||
use serde_json;
|
||||
|
|
@ -430,6 +432,57 @@ impl CodexErr {
|
|||
pub fn downcast_ref<T: std::any::Any>(&self) -> Option<&T> {
|
||||
(self as &dyn std::any::Any).downcast_ref::<T>()
|
||||
}
|
||||
|
||||
/// Translate core error to client-facing protocol error.
|
||||
pub fn to_codex_protocol_error(&self) -> CodexErrorInfo {
|
||||
match self {
|
||||
CodexErr::ContextWindowExceeded => CodexErrorInfo::ContextWindowExceeded,
|
||||
CodexErr::UsageLimitReached(_)
|
||||
| CodexErr::QuotaExceeded
|
||||
| CodexErr::UsageNotIncluded => CodexErrorInfo::UsageLimitExceeded,
|
||||
CodexErr::RetryLimit(_) => CodexErrorInfo::ResponseTooManyFailedAttempts {
|
||||
http_status_code: self.http_status_code_value(),
|
||||
},
|
||||
CodexErr::ConnectionFailed(_) => CodexErrorInfo::HttpConnectionFailed {
|
||||
http_status_code: self.http_status_code_value(),
|
||||
},
|
||||
CodexErr::ResponseStreamFailed(_) => CodexErrorInfo::ResponseStreamConnectionFailed {
|
||||
http_status_code: self.http_status_code_value(),
|
||||
},
|
||||
CodexErr::RefreshTokenFailed(_) => CodexErrorInfo::Unauthorized,
|
||||
CodexErr::SessionConfiguredNotFirstEvent
|
||||
| CodexErr::InternalServerError
|
||||
| CodexErr::InternalAgentDied => CodexErrorInfo::InternalServerError,
|
||||
CodexErr::UnsupportedOperation(_) | CodexErr::ConversationNotFound(_) => {
|
||||
CodexErrorInfo::BadRequest
|
||||
}
|
||||
CodexErr::Sandbox(_) => CodexErrorInfo::SandboxError,
|
||||
_ => CodexErrorInfo::Other,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_error_event(&self, message_prefix: Option<String>) -> ErrorEvent {
|
||||
let error_message = self.to_string();
|
||||
let message: String = match message_prefix {
|
||||
Some(prefix) => format!("{prefix}: {error_message}"),
|
||||
None => error_message,
|
||||
};
|
||||
ErrorEvent {
|
||||
message,
|
||||
codex_error_code: Some(self.to_codex_protocol_error()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn http_status_code_value(&self) -> Option<u16> {
|
||||
let http_status_code = match self {
|
||||
CodexErr::RetryLimit(err) => Some(err.status),
|
||||
CodexErr::UnexpectedStatus(err) => Some(err.status),
|
||||
CodexErr::ConnectionFailed(err) => err.source.status(),
|
||||
CodexErr::ResponseStreamFailed(err) => err.source.status(),
|
||||
_ => None,
|
||||
};
|
||||
http_status_code.as_ref().map(StatusCode::as_u16)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_error_message_ui(e: &CodexErr) -> String {
|
||||
|
|
@ -478,6 +531,10 @@ mod tests {
|
|||
use chrono::Utc;
|
||||
use codex_protocol::protocol::RateLimitWindow;
|
||||
use pretty_assertions::assert_eq;
|
||||
use reqwest::Response;
|
||||
use reqwest::ResponseBuilderExt;
|
||||
use reqwest::StatusCode;
|
||||
use reqwest::Url;
|
||||
|
||||
fn rate_limit_snapshot() -> RateLimitSnapshot {
|
||||
let primary_reset_at = Utc
|
||||
|
|
@ -573,6 +630,33 @@ mod tests {
|
|||
assert_eq!(get_error_message_ui(&err), "stdout only");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn to_error_event_handles_response_stream_failed() {
|
||||
let response = http::Response::builder()
|
||||
.status(StatusCode::TOO_MANY_REQUESTS)
|
||||
.url(Url::parse("http://example.com").unwrap())
|
||||
.body("")
|
||||
.unwrap();
|
||||
let source = Response::from(response).error_for_status_ref().unwrap_err();
|
||||
let err = CodexErr::ResponseStreamFailed(ResponseStreamFailed {
|
||||
source,
|
||||
request_id: Some("req-123".to_string()),
|
||||
});
|
||||
|
||||
let event = err.to_error_event(Some("prefix".to_string()));
|
||||
|
||||
assert_eq!(
|
||||
event.message,
|
||||
"prefix: Error while reading the server response: HTTP status client error (429 Too Many Requests) for url (http://example.com/), request id: req-123"
|
||||
);
|
||||
assert_eq!(
|
||||
event.codex_error_code,
|
||||
Some(CodexErrorInfo::ResponseStreamConnectionFailed {
|
||||
http_status_code: Some(429)
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sandbox_denied_reports_exit_code_when_no_output_available() {
|
||||
let output = ExecToolCallOutput {
|
||||
|
|
|
|||
|
|
@ -161,7 +161,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
|||
fn process_event(&mut self, event: Event) -> CodexStatus {
|
||||
let Event { id: _, msg } = event;
|
||||
match msg {
|
||||
EventMsg::Error(ErrorEvent { message }) => {
|
||||
EventMsg::Error(ErrorEvent { message, .. }) => {
|
||||
let prefix = "ERROR:".style(self.red);
|
||||
ts_msg!(self, "{prefix} {message}");
|
||||
}
|
||||
|
|
@ -221,7 +221,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
|||
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
|
||||
ts_msg!(self, "{}", message.style(self.dimmed));
|
||||
}
|
||||
EventMsg::StreamError(StreamErrorEvent { message }) => {
|
||||
EventMsg::StreamError(StreamErrorEvent { message, .. }) => {
|
||||
ts_msg!(self, "{}", message.style(self.dimmed));
|
||||
}
|
||||
EventMsg::TaskStarted(_) => {
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ use codex_exec::exec_events::WebSearchItem;
|
|||
use codex_protocol::plan_tool::PlanItemArg;
|
||||
use codex_protocol::plan_tool::StepStatus;
|
||||
use codex_protocol::plan_tool::UpdatePlanArgs;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use mcp_types::CallToolResult;
|
||||
use mcp_types::ContentBlock;
|
||||
use mcp_types::TextContent;
|
||||
|
|
@ -539,6 +540,7 @@ fn error_event_produces_error() {
|
|||
"e1",
|
||||
EventMsg::Error(codex_core::protocol::ErrorEvent {
|
||||
message: "boom".to_string(),
|
||||
codex_error_code: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
));
|
||||
assert_eq!(
|
||||
|
|
@ -578,6 +580,7 @@ fn stream_error_event_produces_error() {
|
|||
"e1",
|
||||
EventMsg::StreamError(codex_core::protocol::StreamErrorEvent {
|
||||
message: "retrying".to_string(),
|
||||
codex_error_code: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
));
|
||||
assert_eq!(
|
||||
|
|
@ -596,6 +599,7 @@ fn error_followed_by_task_complete_produces_turn_failed() {
|
|||
"e1",
|
||||
EventMsg::Error(ErrorEvent {
|
||||
message: "boom".to_string(),
|
||||
codex_error_code: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
);
|
||||
assert_eq!(
|
||||
|
|
|
|||
|
|
@ -562,6 +562,35 @@ pub enum EventMsg {
|
|||
ReasoningRawContentDelta(ReasoningRawContentDeltaEvent),
|
||||
}
|
||||
|
||||
/// Codex errors that we expose to clients.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[ts(rename_all = "snake_case")]
|
||||
pub enum CodexErrorInfo {
|
||||
ContextWindowExceeded,
|
||||
UsageLimitExceeded,
|
||||
HttpConnectionFailed {
|
||||
http_status_code: Option<u16>,
|
||||
},
|
||||
/// Failed to connect to the response SSE stream.
|
||||
ResponseStreamConnectionFailed {
|
||||
http_status_code: Option<u16>,
|
||||
},
|
||||
InternalServerError,
|
||||
Unauthorized,
|
||||
BadRequest,
|
||||
SandboxError,
|
||||
/// The response SSE stream disconnected in the middle of a turnbefore completion.
|
||||
ResponseStreamDisconnected {
|
||||
http_status_code: Option<u16>,
|
||||
},
|
||||
/// Reached the retry limit for responses.
|
||||
ResponseTooManyFailedAttempts {
|
||||
http_status_code: Option<u16>,
|
||||
},
|
||||
Other,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)]
|
||||
pub struct RawResponseItemEvent {
|
||||
pub item: ResponseItem,
|
||||
|
|
@ -686,6 +715,8 @@ pub struct ExitedReviewModeEvent {
|
|||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct ErrorEvent {
|
||||
pub message: String,
|
||||
#[serde(default)]
|
||||
pub codex_error_code: Option<CodexErrorInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
|
|
@ -1363,6 +1394,8 @@ pub struct UndoCompletedEvent {
|
|||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct StreamErrorEvent {
|
||||
pub message: String,
|
||||
#[serde(default)]
|
||||
pub codex_error_code: Option<CodexErrorInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
|
|
|
|||
|
|
@ -1664,7 +1664,7 @@ impl ChatWidget {
|
|||
self.on_rate_limit_snapshot(ev.rate_limits);
|
||||
}
|
||||
EventMsg::Warning(WarningEvent { message }) => self.on_warning(message),
|
||||
EventMsg::Error(ErrorEvent { message }) => self.on_error(message),
|
||||
EventMsg::Error(ErrorEvent { message, .. }) => self.on_error(message),
|
||||
EventMsg::McpStartupUpdate(ev) => self.on_mcp_startup_update(ev),
|
||||
EventMsg::McpStartupComplete(ev) => self.on_mcp_startup_complete(ev),
|
||||
EventMsg::TurnAborted(ev) => match ev.reason {
|
||||
|
|
@ -1707,7 +1707,9 @@ impl ChatWidget {
|
|||
}
|
||||
EventMsg::UndoStarted(ev) => self.on_undo_started(ev),
|
||||
EventMsg::UndoCompleted(ev) => self.on_undo_completed(ev),
|
||||
EventMsg::StreamError(StreamErrorEvent { message }) => self.on_stream_error(message),
|
||||
EventMsg::StreamError(StreamErrorEvent { message, .. }) => {
|
||||
self.on_stream_error(message)
|
||||
}
|
||||
EventMsg::UserMessage(ev) => {
|
||||
if from_replay {
|
||||
self.on_user_message_event(ev);
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ use codex_core::CodexConversation;
|
|||
use codex_core::ConversationManager;
|
||||
use codex_core::NewConversation;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::protocol::ErrorEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
|
|
@ -37,7 +36,7 @@ pub(crate) fn spawn_agent(
|
|||
eprintln!("{message}");
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
|
||||
id: "".to_string(),
|
||||
msg: EventMsg::Error(ErrorEvent { message }),
|
||||
msg: EventMsg::Error(err.to_error_event(None)),
|
||||
}));
|
||||
app_event_tx_clone.send(AppEvent::ExitRequest);
|
||||
tracing::error!("failed to initialize codex: {err}");
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ use codex_protocol::parse_command::ParsedCommand;
|
|||
use codex_protocol::plan_tool::PlanItemArg;
|
||||
use codex_protocol::plan_tool::StepStatus;
|
||||
use codex_protocol::plan_tool::UpdatePlanArgs;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use crossterm::event::KeyCode;
|
||||
use crossterm::event::KeyEvent;
|
||||
use crossterm::event::KeyModifiers;
|
||||
|
|
@ -2647,6 +2648,7 @@ fn stream_error_updates_status_indicator() {
|
|||
id: "sub-1".into(),
|
||||
msg: EventMsg::StreamError(StreamErrorEvent {
|
||||
message: msg.to_string(),
|
||||
codex_error_code: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
});
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue