Simplify error management in run_turn (#8849)
This commit is contained in:
parent
187924d761
commit
a9b5e8a136
2 changed files with 69 additions and 40 deletions
|
|
@ -2495,7 +2495,7 @@ async fn run_turn(
|
|||
|
||||
let mut retries = 0;
|
||||
loop {
|
||||
match try_run_turn(
|
||||
let err = match try_run_turn(
|
||||
Arc::clone(&router),
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
|
|
@ -2505,17 +2505,10 @@ async fn run_turn(
|
|||
)
|
||||
.await
|
||||
{
|
||||
// todo(aibrahim): map special cases and ? on other errors
|
||||
Ok(output) => return Ok(output),
|
||||
Err(CodexErr::TurnAborted) => {
|
||||
return Err(CodexErr::TurnAborted);
|
||||
}
|
||||
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
|
||||
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
|
||||
Err(e @ CodexErr::Fatal(_)) => return Err(e),
|
||||
Err(e @ CodexErr::ContextWindowExceeded) => {
|
||||
Err(CodexErr::ContextWindowExceeded) => {
|
||||
sess.set_total_tokens_full(&turn_context).await;
|
||||
return Err(e);
|
||||
return Err(CodexErr::ContextWindowExceeded);
|
||||
}
|
||||
Err(CodexErr::UsageLimitReached(e)) => {
|
||||
let rate_limits = e.rate_limits.clone();
|
||||
|
|
@ -2524,39 +2517,38 @@ async fn run_turn(
|
|||
}
|
||||
return Err(CodexErr::UsageLimitReached(e));
|
||||
}
|
||||
Err(CodexErr::UsageNotIncluded) => return Err(CodexErr::UsageNotIncluded),
|
||||
Err(e @ CodexErr::QuotaExceeded) => return Err(e),
|
||||
Err(e @ CodexErr::InvalidImageRequest()) => return Err(e),
|
||||
Err(e @ CodexErr::InvalidRequest(_)) => return Err(e),
|
||||
Err(e @ CodexErr::RefreshTokenFailed(_)) => return Err(e),
|
||||
Err(e) => {
|
||||
// Use the configured provider-specific stream retry budget.
|
||||
let max_retries = turn_context.client.get_provider().stream_max_retries();
|
||||
if retries < max_retries {
|
||||
retries += 1;
|
||||
let delay = match e {
|
||||
CodexErr::Stream(_, Some(delay)) => delay,
|
||||
_ => backoff(retries),
|
||||
};
|
||||
warn!(
|
||||
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
|
||||
);
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
// Surface retry information to any UI/front‑end so the
|
||||
// user understands what is happening instead of staring
|
||||
// at a seemingly frozen screen.
|
||||
sess.notify_stream_error(
|
||||
&turn_context,
|
||||
format!("Reconnecting... {retries}/{max_retries}"),
|
||||
e,
|
||||
)
|
||||
.await;
|
||||
if !err.is_retryable() {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
tokio::time::sleep(delay).await;
|
||||
} else {
|
||||
return Err(e);
|
||||
// Use the configured provider-specific stream retry budget.
|
||||
let max_retries = turn_context.client.get_provider().stream_max_retries();
|
||||
if retries < max_retries {
|
||||
retries += 1;
|
||||
let delay = match &err {
|
||||
CodexErr::Stream(_, requested_delay) => {
|
||||
requested_delay.unwrap_or_else(|| backoff(retries))
|
||||
}
|
||||
}
|
||||
_ => backoff(retries),
|
||||
};
|
||||
warn!("stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",);
|
||||
|
||||
// Surface retry information to any UI/front‑end so the
|
||||
// user understands what is happening instead of staring
|
||||
// at a seemingly frozen screen.
|
||||
sess.notify_stream_error(
|
||||
&turn_context,
|
||||
format!("Reconnecting... {retries}/{max_retries}"),
|
||||
err,
|
||||
)
|
||||
.await;
|
||||
|
||||
tokio::time::sleep(delay).await;
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -181,6 +181,43 @@ impl From<CancelErr> for CodexErr {
|
|||
}
|
||||
}
|
||||
|
||||
impl CodexErr {
|
||||
pub fn is_retryable(&self) -> bool {
|
||||
match self {
|
||||
CodexErr::TurnAborted
|
||||
| CodexErr::Interrupted
|
||||
| CodexErr::EnvVar(_)
|
||||
| CodexErr::Fatal(_)
|
||||
| CodexErr::UsageNotIncluded
|
||||
| CodexErr::QuotaExceeded
|
||||
| CodexErr::InvalidImageRequest()
|
||||
| CodexErr::InvalidRequest(_)
|
||||
| CodexErr::RefreshTokenFailed(_)
|
||||
| CodexErr::UnsupportedOperation(_)
|
||||
| CodexErr::Sandbox(_)
|
||||
| CodexErr::LandlockSandboxExecutableNotProvided
|
||||
| CodexErr::RetryLimit(_)
|
||||
| CodexErr::ContextWindowExceeded
|
||||
| CodexErr::ThreadNotFound(_)
|
||||
| CodexErr::Spawn
|
||||
| CodexErr::SessionConfiguredNotFirstEvent
|
||||
| CodexErr::UsageLimitReached(_) => false,
|
||||
CodexErr::Stream(..)
|
||||
| CodexErr::Timeout
|
||||
| CodexErr::UnexpectedStatus(_)
|
||||
| CodexErr::ResponseStreamFailed(_)
|
||||
| CodexErr::ConnectionFailed(_)
|
||||
| CodexErr::InternalServerError
|
||||
| CodexErr::InternalAgentDied
|
||||
| CodexErr::Io(_)
|
||||
| CodexErr::Json(_)
|
||||
| CodexErr::TokioJoin(_) => true,
|
||||
#[cfg(target_os = "linux")]
|
||||
CodexErr::LandlockRuleset(_) | CodexErr::LandlockPathFd(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionFailedError {
|
||||
pub source: reqwest::Error,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue