core-agent-ide/codex-rs/codex-client/src/transport.rs

123 lines
3.5 KiB
Rust

use crate::default_client::CodexHttpClient;
use crate::default_client::CodexRequestBuilder;
use crate::error::TransportError;
use crate::request::Request;
use crate::request::Response;
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::BoxStream;
use http::HeaderMap;
use http::Method;
use http::StatusCode;
use tracing::Level;
use tracing::enabled;
use tracing::trace;
/// Boxed `'static` stream of response body chunks.
///
/// Each item is either a chunk of [`Bytes`] or the [`TransportError`] that
/// interrupted the stream.
pub type ByteStream = BoxStream<'static, Result<Bytes, TransportError>>;
/// HTTP response whose body is exposed as a [`ByteStream`] instead of a
/// fully buffered value.
pub struct StreamResponse {
/// Status code of the response.
pub status: StatusCode,
/// Headers of the response.
pub headers: HeaderMap,
/// Response body, delivered as a stream of byte chunks.
pub bytes: ByteStream,
}
/// Abstraction over an HTTP client that can run a [`Request`] either with a
/// buffered response or with a streamed one.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait HttpTransport: Send + Sync {
/// Sends `req` and resolves to a complete [`Response`], or to a
/// [`TransportError`] on failure.
async fn execute(&self, req: Request) -> Result<Response, TransportError>;
/// Sends `req` and resolves to a [`StreamResponse`] whose body is a
/// [`ByteStream`], or to a [`TransportError`] on failure.
async fn stream(&self, req: Request) -> Result<StreamResponse, TransportError>;
}
/// [`HttpTransport`] implementation that sends requests through a
/// [`CodexHttpClient`].
#[derive(Clone, Debug)]
pub struct ReqwestTransport {
// Client wrapper built from the `reqwest::Client` given to `ReqwestTransport::new`.
client: CodexHttpClient,
}
impl ReqwestTransport {
    /// Creates a transport that sends requests through `client`, wrapped in a
    /// [`CodexHttpClient`].
    pub fn new(client: reqwest::Client) -> Self {
        let client = CodexHttpClient::new(client);
        Self { client }
    }

    /// Turns a [`Request`] into a ready-to-send [`CodexRequestBuilder`].
    ///
    /// The method, URL and headers are always applied; the timeout and the
    /// JSON body are applied only when present on `req`.
    ///
    /// This function currently has no failing path and always returns `Ok`.
    fn build(&self, req: Request) -> Result<CodexRequestBuilder, TransportError> {
        // NOTE(review): a method that cannot be parsed silently becomes GET
        // instead of producing an error.
        let method = Method::from_bytes(req.method.as_str().as_bytes()).unwrap_or(Method::GET);

        let builder = self.client.request(method, &req.url).headers(req.headers);
        let builder = match req.timeout {
            Some(limit) => builder.timeout(limit),
            None => builder,
        };
        let builder = match req.body {
            Some(payload) => builder.json(&payload),
            None => builder,
        };

        Ok(builder)
    }

    /// Converts a `reqwest::Error` into a [`TransportError`].
    ///
    /// Timeouts map to [`TransportError::Timeout`]; every other error maps to
    /// [`TransportError::Network`] carrying the error's display text.
    fn map_error(err: reqwest::Error) -> TransportError {
        if !err.is_timeout() {
            return TransportError::Network(err.to_string());
        }
        TransportError::Timeout
    }
}
#[async_trait]
impl HttpTransport for ReqwestTransport {
    /// Sends `req` and buffers the whole response body in memory.
    ///
    /// # Errors
    ///
    /// - A failure while sending the request, or while downloading the body
    ///   of a successful response, is converted by `Self::map_error`.
    /// - A non-success status yields [`TransportError::Http`] with the status,
    ///   the response headers and, when it could be read, the body text.
    async fn execute(&self, req: Request) -> Result<Response, TransportError> {
        let builder = self.build(req)?;
        let resp = builder.send().await.map_err(Self::map_error)?;
        let status = resp.status();
        let headers = resp.headers().clone();

        // Inspect the status before downloading the body so that a body which
        // cannot be read (or is not valid UTF-8) never hides the HTTP status.
        // The error body is best-effort, exactly as in `stream`.
        if !status.is_success() {
            let body = resp.text().await.ok();
            return Err(TransportError::Http {
                status,
                headers: Some(headers),
                body,
            });
        }

        let body = resp.bytes().await.map_err(Self::map_error)?;
        Ok(Response {
            status,
            headers,
            body,
        })
    }

    /// Sends `req` and returns the response with its body as a [`ByteStream`].
    ///
    /// When TRACE-level events are enabled, the method, URL and body of the
    /// request are logged before it is sent.
    ///
    /// # Errors
    ///
    /// - A failure while sending the request is converted by
    ///   `Self::map_error`; errors produced later by the body stream are
    ///   converted the same way and surface as stream items.
    /// - A non-success status yields [`TransportError::Http`] with the status,
    ///   the response headers and, when it could be read, the body text.
    async fn stream(&self, req: Request) -> Result<StreamResponse, TransportError> {
        // Only emit the request line when TRACE-level events are enabled.
        if enabled!(Level::TRACE) {
            trace!(
                "{} to {}: {}",
                req.method,
                req.url,
                req.body.as_ref().unwrap_or_default()
            );
        }

        let builder = self.build(req)?;
        let resp = builder.send().await.map_err(Self::map_error)?;
        let status = resp.status();
        let headers = resp.headers().clone();

        if !status.is_success() {
            // Best-effort: a body that cannot be read is reported as `None`.
            let body = resp.text().await.ok();
            return Err(TransportError::Http {
                status,
                headers: Some(headers),
                body,
            });
        }

        let stream = resp
            .bytes_stream()
            .map(|result| result.map_err(Self::map_error));
        Ok(StreamResponse {
            status,
            headers,
            bytes: Box::pin(stream),
        })
    }
}