Keep things simple and build a full Responses API request request right in the model client
126 lines
3.9 KiB
Rust
126 lines
3.9 KiB
Rust
use crate::auth::AuthProvider;
|
|
use crate::common::ResponseStream;
|
|
use crate::common::ResponsesApiRequest;
|
|
use crate::endpoint::session::EndpointSession;
|
|
use crate::error::ApiError;
|
|
use crate::provider::Provider;
|
|
use crate::requests::headers::build_conversation_headers;
|
|
use crate::requests::headers::insert_header;
|
|
use crate::requests::headers::subagent_header;
|
|
use crate::requests::responses::Compression;
|
|
use crate::requests::responses::attach_item_ids;
|
|
use crate::sse::spawn_response_stream;
|
|
use crate::telemetry::SseTelemetry;
|
|
use codex_client::HttpTransport;
|
|
use codex_client::RequestCompression;
|
|
use codex_client::RequestTelemetry;
|
|
use codex_protocol::protocol::SessionSource;
|
|
use http::HeaderMap;
|
|
use http::HeaderValue;
|
|
use http::Method;
|
|
use serde_json::Value;
|
|
use std::sync::Arc;
|
|
use std::sync::OnceLock;
|
|
|
|
pub struct ResponsesClient<T: HttpTransport, A: AuthProvider> {
|
|
session: EndpointSession<T, A>,
|
|
sse_telemetry: Option<Arc<dyn SseTelemetry>>,
|
|
}
|
|
|
|
#[derive(Default)]
|
|
pub struct ResponsesOptions {
|
|
pub conversation_id: Option<String>,
|
|
pub session_source: Option<SessionSource>,
|
|
pub extra_headers: HeaderMap,
|
|
pub compression: Compression,
|
|
pub turn_state: Option<Arc<OnceLock<String>>>,
|
|
}
|
|
|
|
impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
|
pub fn new(transport: T, provider: Provider, auth: A) -> Self {
|
|
Self {
|
|
session: EndpointSession::new(transport, provider, auth),
|
|
sse_telemetry: None,
|
|
}
|
|
}
|
|
|
|
pub fn with_telemetry(
|
|
self,
|
|
request: Option<Arc<dyn RequestTelemetry>>,
|
|
sse: Option<Arc<dyn SseTelemetry>>,
|
|
) -> Self {
|
|
Self {
|
|
session: self.session.with_request_telemetry(request),
|
|
sse_telemetry: sse,
|
|
}
|
|
}
|
|
|
|
pub async fn stream_request(
|
|
&self,
|
|
request: ResponsesApiRequest,
|
|
options: ResponsesOptions,
|
|
) -> Result<ResponseStream, ApiError> {
|
|
let ResponsesOptions {
|
|
conversation_id,
|
|
session_source,
|
|
extra_headers,
|
|
compression,
|
|
turn_state,
|
|
} = options;
|
|
|
|
let mut body = serde_json::to_value(&request)
|
|
.map_err(|e| ApiError::Stream(format!("failed to encode responses request: {e}")))?;
|
|
if request.store && self.session.provider().is_azure_responses_endpoint() {
|
|
attach_item_ids(&mut body, &request.input);
|
|
}
|
|
|
|
let mut headers = extra_headers;
|
|
headers.extend(build_conversation_headers(conversation_id));
|
|
if let Some(subagent) = subagent_header(&session_source) {
|
|
insert_header(&mut headers, "x-openai-subagent", &subagent);
|
|
}
|
|
|
|
self.stream(body, headers, compression, turn_state).await
|
|
}
|
|
|
|
fn path() -> &'static str {
|
|
"responses"
|
|
}
|
|
|
|
pub async fn stream(
|
|
&self,
|
|
body: Value,
|
|
extra_headers: HeaderMap,
|
|
compression: Compression,
|
|
turn_state: Option<Arc<OnceLock<String>>>,
|
|
) -> Result<ResponseStream, ApiError> {
|
|
let request_compression = match compression {
|
|
Compression::None => RequestCompression::None,
|
|
Compression::Zstd => RequestCompression::Zstd,
|
|
};
|
|
|
|
let stream_response = self
|
|
.session
|
|
.stream_with(
|
|
Method::POST,
|
|
Self::path(),
|
|
extra_headers,
|
|
Some(body),
|
|
|req| {
|
|
req.headers.insert(
|
|
http::header::ACCEPT,
|
|
HeaderValue::from_static("text/event-stream"),
|
|
);
|
|
req.compression = request_compression;
|
|
},
|
|
)
|
|
.await?;
|
|
|
|
Ok(spawn_response_stream(
|
|
stream_response,
|
|
self.session.provider().stream_idle_timeout,
|
|
self.sse_telemetry.clone(),
|
|
turn_state,
|
|
))
|
|
}
|
|
}
|