From d2394a2494651b5adeddc940b6c309fd407ac1e3 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 3 Feb 2026 11:31:57 +0000 Subject: [PATCH] chore: nuke chat/completions API (#10157) --- .../schema/json/ClientRequest.json | 4 +- .../schema/json/EventMsg.json | 4 +- .../schema/json/ServerNotification.json | 4 +- .../codex_app_server_protocol.schemas.json | 8 +- .../json/v1/ForkConversationResponse.json | 4 +- .../json/v1/ResumeConversationParams.json | 4 +- .../json/v1/ResumeConversationResponse.json | 4 +- .../v1/SessionConfiguredNotification.json | 4 +- .../RawResponseItemCompletedNotification.json | 4 +- .../schema/json/v2/ThreadResumeParams.json | 4 +- .../typescript/FunctionCallOutputPayload.ts | 3 +- codex-rs/codex-api/README.md | 4 +- codex-rs/codex-api/src/common.rs | 2 +- .../src/endpoint/{chat.rs => aggregate.rs} | 159 +--- codex-rs/codex-api/src/endpoint/compact.rs | 35 +- codex-rs/codex-api/src/endpoint/mod.rs | 2 +- codex-rs/codex-api/src/endpoint/responses.rs | 1 - codex-rs/codex-api/src/lib.rs | 5 +- codex-rs/codex-api/src/provider.rs | 3 +- codex-rs/codex-api/src/requests/chat.rs | 494 ------------ codex-rs/codex-api/src/requests/mod.rs | 3 - codex-rs/codex-api/src/sse/chat.rs | 717 ------------------ codex-rs/codex-api/src/sse/mod.rs | 1 - codex-rs/codex-api/tests/clients.rs | 35 +- codex-rs/common/src/oss.rs | 39 +- codex-rs/core/config.schema.json | 13 +- codex-rs/core/src/client.rs | 85 +-- codex-rs/core/src/codex.rs | 30 - codex-rs/core/src/config/mod.rs | 96 ++- codex-rs/core/src/lib.rs | 2 - codex-rs/core/src/model_provider_info.rs | 68 +- codex-rs/core/src/tools/spec.rs | 57 -- .../core/tests/chat_completions_payload.rs | 338 --------- codex-rs/core/tests/chat_completions_sse.rs | 466 ------------ codex-rs/core/tests/suite/cli_stream.rs | 41 +- codex-rs/core/tests/suite/rmcp_client.rs | 194 ----- codex-rs/exec/src/cli.rs | 2 +- codex-rs/exec/src/lib.rs | 18 +- codex-rs/mcp-server/tests/common/lib.rs | 2 +- .../tests/common/mock_model_server.rs | 6 +- codex-rs/mcp-server/tests/common/responses.rs | 97 +-- codex-rs/mcp-server/tests/suite/codex_tool.rs | 29 +- codex-rs/ollama/src/client.rs | 7 +- codex-rs/ollama/src/lib.rs | 62 +- codex-rs/protocol/src/models.rs | 8 +- codex-rs/tui/src/app.rs | 12 - codex-rs/tui/src/cli.rs | 2 +- codex-rs/tui/src/lib.rs | 10 - codex-rs/tui/src/oss_selection.rs | 7 - 49 files changed, 268 insertions(+), 2931 deletions(-) rename codex-rs/codex-api/src/endpoint/{chat.rs => aggregate.rs} (54%) delete mode 100644 codex-rs/codex-api/src/requests/chat.rs delete mode 100644 codex-rs/codex-api/src/sse/chat.rs delete mode 100644 codex-rs/core/tests/chat_completions_payload.rs delete mode 100644 codex-rs/core/tests/chat_completions_sse.rs diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 6f5ebcdf0..518d9aaf3 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -526,7 +526,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -1410,7 +1410,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/json/EventMsg.json b/codex-rs/app-server-protocol/schema/json/EventMsg.json index 3e73bdc8b..976e76adc 100644 --- a/codex-rs/app-server-protocol/schema/json/EventMsg.json +++ b/codex-rs/app-server-protocol/schema/json/EventMsg.json @@ -2857,7 +2857,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -3689,7 +3689,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 5d9e719d8..4d47fa738 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -3477,7 +3477,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -4729,7 +4729,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 5a5ddd101..282a6ef42 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -4965,7 +4965,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -6629,7 +6629,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" @@ -11151,7 +11151,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -12588,7 +12588,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json index ad251a461..58302b4a2 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json @@ -2857,7 +2857,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -3689,7 +3689,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationParams.json b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationParams.json index 41284b009..1261306e6 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationParams.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationParams.json @@ -144,7 +144,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -530,7 +530,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json index abd5beb21..ce13d0715 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json @@ -2857,7 +2857,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -3689,7 +3689,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json index 69c2c4b8b..c77ccc1f9 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json @@ -2857,7 +2857,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -3689,7 +3689,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json index aecb8c2b5..1b307c9b8 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json @@ -111,7 +111,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -417,7 +417,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json index 52567cec8..cc05de490 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json @@ -120,7 +120,7 @@ ] }, "FunctionCallOutputPayload": { - "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses/Chat Completions APIs understand.", + "description": "The payload we send back to OpenAI when reporting a tool call result.\n\n`content` preserves the historical plain-string payload so downstream integrations (tests, logging, etc.) can keep treating tool output as `String`. When an MCP server returns richer data we additionally populate `content_items` with the structured form that the Responses API understands.", "properties": { "content": { "type": "string" @@ -433,7 +433,7 @@ ] }, "id": { - "description": "Set when using the chat completions API.", + "description": "Legacy id field retained for compatibility with older payloads.", "type": [ "string", "null" diff --git a/codex-rs/app-server-protocol/schema/typescript/FunctionCallOutputPayload.ts b/codex-rs/app-server-protocol/schema/typescript/FunctionCallOutputPayload.ts index 776369b10..94370f582 100644 --- a/codex-rs/app-server-protocol/schema/typescript/FunctionCallOutputPayload.ts +++ b/codex-rs/app-server-protocol/schema/typescript/FunctionCallOutputPayload.ts @@ -9,7 +9,6 @@ import type { FunctionCallOutputContentItem } from "./FunctionCallOutputContentI * `content` preserves the historical plain-string payload so downstream * integrations (tests, logging, etc.) can keep treating tool output as * `String`. When an MCP server returns richer data we additionally populate - * `content_items` with the structured form that the Responses/Chat - * Completions APIs understand. + * `content_items` with the structured form that the Responses API understands. */ export type FunctionCallOutputPayload = { content: string, content_items: Array | null, success: boolean | null, }; diff --git a/codex-rs/codex-api/README.md b/codex-rs/codex-api/README.md index 98db0bec6..c1f7d230c 100644 --- a/codex-rs/codex-api/README.md +++ b/codex-rs/codex-api/README.md @@ -2,7 +2,7 @@ Typed clients for Codex/OpenAI APIs built on top of the generic transport in `codex-client`. -- Hosts the request/response models and prompt helpers for Responses, Chat Completions, and Compact APIs. +- Hosts the request/response models and prompt helpers for Responses and Compact APIs. - Owns provider configuration (base URLs, headers, query params), auth header injection, retry tuning, and stream idle settings. - Parses SSE streams into `ResponseEvent`/`ResponseStream`, including rate-limit snapshots and API-specific error mapping. - Serves as the wire-level layer consumed by `codex-core`; higher layers handle auth refresh and business logic. @@ -11,7 +11,7 @@ Typed clients for Codex/OpenAI APIs built on top of the generic transport in `co The public interface of this crate is intentionally small and uniform: -- **Prompted endpoints (Chat + Responses)** +- **Prompted endpoints (Responses)** - Input: a single `Prompt` plus endpoint-specific options. - `Prompt` (re-exported as `codex_api::Prompt`) carries: - `instructions: String` – the fully-resolved system prompt for this turn. diff --git a/codex-rs/codex-api/src/common.rs b/codex-rs/codex-api/src/common.rs index 9a7aab997..a9127644f 100644 --- a/codex-rs/codex-api/src/common.rs +++ b/codex-rs/codex-api/src/common.rs @@ -13,7 +13,7 @@ use std::task::Context; use std::task::Poll; use tokio::sync::mpsc; -/// Canonical prompt input for Chat and Responses endpoints. +/// Canonical prompt input for Responses endpoints. #[derive(Debug, Clone)] pub struct Prompt { /// Fully-resolved system instructions for this turn. diff --git a/codex-rs/codex-api/src/endpoint/chat.rs b/codex-rs/codex-api/src/endpoint/aggregate.rs similarity index 54% rename from codex-rs/codex-api/src/endpoint/chat.rs rename to codex-rs/codex-api/src/endpoint/aggregate.rs index b183597d6..ac0cee904 100644 --- a/codex-rs/codex-api/src/endpoint/chat.rs +++ b/codex-rs/codex-api/src/endpoint/aggregate.rs @@ -1,111 +1,21 @@ -use crate::ChatRequest; -use crate::auth::AuthProvider; -use crate::common::Prompt as ApiPrompt; use crate::common::ResponseEvent; use crate::common::ResponseStream; -use crate::endpoint::streaming::StreamingClient; use crate::error::ApiError; -use crate::provider::Provider; -use crate::provider::WireApi; -use crate::sse::chat::spawn_chat_stream; -use crate::telemetry::SseTelemetry; -use codex_client::HttpTransport; -use codex_client::RequestCompression; -use codex_client::RequestTelemetry; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ResponseItem; -use codex_protocol::protocol::SessionSource; use futures::Stream; -use http::HeaderMap; -use serde_json::Value; use std::collections::VecDeque; use std::pin::Pin; -use std::sync::Arc; use std::task::Context; use std::task::Poll; -pub struct ChatClient { - streaming: StreamingClient, -} - -impl ChatClient { - pub fn new(transport: T, provider: Provider, auth: A) -> Self { - Self { - streaming: StreamingClient::new(transport, provider, auth), - } - } - - pub fn with_telemetry( - self, - request: Option>, - sse: Option>, - ) -> Self { - Self { - streaming: self.streaming.with_telemetry(request, sse), - } - } - - pub async fn stream_request(&self, request: ChatRequest) -> Result { - self.stream(request.body, request.headers).await - } - - pub async fn stream_prompt( - &self, - model: &str, - prompt: &ApiPrompt, - conversation_id: Option, - session_source: Option, - ) -> Result { - use crate::requests::ChatRequestBuilder; - - let request = - ChatRequestBuilder::new(model, &prompt.instructions, &prompt.input, &prompt.tools) - .conversation_id(conversation_id) - .session_source(session_source) - .build(self.streaming.provider())?; - - self.stream_request(request).await - } - - fn path(&self) -> &'static str { - match self.streaming.provider().wire { - WireApi::Chat => "chat/completions", - _ => "responses", - } - } - - pub async fn stream( - &self, - body: Value, - extra_headers: HeaderMap, - ) -> Result { - self.streaming - .stream( - self.path(), - body, - extra_headers, - RequestCompression::None, - spawn_chat_stream, - None, - ) - .await - } -} - -#[derive(Copy, Clone, Eq, PartialEq)] -pub enum AggregateMode { - AggregatedOnly, - Streaming, -} - /// Stream adapter that merges token deltas into a single assistant message per turn. pub struct AggregatedStream { inner: ResponseStream, cumulative: String, cumulative_reasoning: String, pending: VecDeque, - mode: AggregateMode, } impl Stream for AggregatedStream { @@ -122,7 +32,7 @@ impl Stream for AggregatedStream { match Pin::new(&mut this.inner).poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => return Poll::Ready(None), - Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))), Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => { let is_assistant_message = matches!( &item, @@ -130,29 +40,16 @@ impl Stream for AggregatedStream { ); if is_assistant_message { - match this.mode { - AggregateMode::AggregatedOnly => { - if this.cumulative.is_empty() - && let ResponseItem::Message { content, .. } = &item - && let Some(text) = content.iter().find_map(|c| match c { - ContentItem::OutputText { text } => Some(text), - _ => None, - }) - { - this.cumulative.push_str(text); - } - continue; - } - AggregateMode::Streaming => { - if this.cumulative.is_empty() { - return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone( - item, - )))); - } else { - continue; - } - } + if this.cumulative.is_empty() + && let ResponseItem::Message { content, .. } = &item + && let Some(text) = content.iter().find_map(|c| match c { + ContentItem::OutputText { text } => Some(text), + _ => None, + }) + { + this.cumulative.push_str(text); } + continue; } return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))); @@ -216,35 +113,20 @@ impl Stream for AggregatedStream { token_usage, }))); } - Poll::Ready(Some(Ok(ResponseEvent::Created))) => { - continue; - } + Poll::Ready(Some(Ok(ResponseEvent::Created))) => continue, Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => { this.cumulative.push_str(&delta); - if matches!(this.mode, AggregateMode::Streaming) { - return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))); - } else { - continue; - } + continue; } Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta { delta, - content_index, + content_index: _, }))) => { this.cumulative_reasoning.push_str(&delta); - if matches!(this.mode, AggregateMode::Streaming) { - return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta { - delta, - content_index, - }))); - } else { - continue; - } - } - Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => continue, - Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded { .. }))) => { continue; } + Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => continue, + Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded { .. }))) => continue, Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => { return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))); } @@ -255,28 +137,21 @@ impl Stream for AggregatedStream { pub trait AggregateStreamExt { fn aggregate(self) -> AggregatedStream; - - fn streaming_mode(self) -> ResponseStream; } impl AggregateStreamExt for ResponseStream { fn aggregate(self) -> AggregatedStream { - AggregatedStream::new(self, AggregateMode::AggregatedOnly) - } - - fn streaming_mode(self) -> ResponseStream { - self + AggregatedStream::new(self) } } impl AggregatedStream { - fn new(inner: ResponseStream, mode: AggregateMode) -> Self { + fn new(inner: ResponseStream) -> Self { AggregatedStream { inner, cumulative: String::new(), cumulative_reasoning: String::new(), pending: VecDeque::new(), - mode, } } } diff --git a/codex-rs/codex-api/src/endpoint/compact.rs b/codex-rs/codex-api/src/endpoint/compact.rs index 2b02ebd0f..8edba7c59 100644 --- a/codex-rs/codex-api/src/endpoint/compact.rs +++ b/codex-rs/codex-api/src/endpoint/compact.rs @@ -36,12 +36,9 @@ impl CompactClient { self } - fn path(&self) -> Result<&'static str, ApiError> { + fn path(&self) -> &'static str { match self.provider.wire { - WireApi::Compact | WireApi::Responses => Ok("responses/compact"), - WireApi::Chat => Err(ApiError::Stream( - "compact endpoint requires responses wire api".to_string(), - )), + WireApi::Compact | WireApi::Responses => "responses/compact", } } @@ -50,7 +47,7 @@ impl CompactClient { body: serde_json::Value, extra_headers: HeaderMap, ) -> Result, ApiError> { - let path = self.path()?; + let path = self.path(); let builder = || { let mut req = self.provider.build_request(Method::POST, path); req.headers.extend(extra_headers.clone()); @@ -139,24 +136,14 @@ mod tests { } } - #[tokio::test] - async fn errors_when_wire_is_chat() { - let client = CompactClient::new(DummyTransport, provider(WireApi::Chat), DummyAuth); - let input = CompactionInput { - model: "gpt-test", - input: &[], - instructions: "inst", - }; - let err = client - .compact_input(&input, HeaderMap::new()) - .await - .expect_err("expected wire mismatch to fail"); + #[test] + fn path_is_responses_compact_for_supported_wire_apis() { + let responses_client = + CompactClient::new(DummyTransport, provider(WireApi::Responses), DummyAuth); + assert_eq!(responses_client.path(), "responses/compact"); - match err { - ApiError::Stream(msg) => { - assert_eq!(msg, "compact endpoint requires responses wire api"); - } - other => panic!("unexpected error: {other:?}"), - } + let compact_client = + CompactClient::new(DummyTransport, provider(WireApi::Compact), DummyAuth); + assert_eq!(compact_client.path(), "responses/compact"); } } diff --git a/codex-rs/codex-api/src/endpoint/mod.rs b/codex-rs/codex-api/src/endpoint/mod.rs index 2fa116c08..2bb524b22 100644 --- a/codex-rs/codex-api/src/endpoint/mod.rs +++ b/codex-rs/codex-api/src/endpoint/mod.rs @@ -1,4 +1,4 @@ -pub mod chat; +pub mod aggregate; pub mod compact; pub mod models; pub mod responses; diff --git a/codex-rs/codex-api/src/endpoint/responses.rs b/codex-rs/codex-api/src/endpoint/responses.rs index 4aded3391..11ff60754 100644 --- a/codex-rs/codex-api/src/endpoint/responses.rs +++ b/codex-rs/codex-api/src/endpoint/responses.rs @@ -111,7 +111,6 @@ impl ResponsesClient { fn path(&self) -> &'static str { match self.streaming.provider().wire { WireApi::Responses | WireApi::Compact => "responses", - WireApi::Chat => "chat/completions", } } diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index 2e340970c..6a2490c5a 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -22,8 +22,7 @@ pub use crate::common::ResponseEvent; pub use crate::common::ResponseStream; pub use crate::common::ResponsesApiRequest; pub use crate::common::create_text_param_for_request; -pub use crate::endpoint::chat::AggregateStreamExt; -pub use crate::endpoint::chat::ChatClient; +pub use crate::endpoint::aggregate::AggregateStreamExt; pub use crate::endpoint::compact::CompactClient; pub use crate::endpoint::models::ModelsClient; pub use crate::endpoint::responses::ResponsesClient; @@ -34,8 +33,6 @@ pub use crate::error::ApiError; pub use crate::provider::Provider; pub use crate::provider::WireApi; pub use crate::provider::is_azure_responses_wire_base_url; -pub use crate::requests::ChatRequest; -pub use crate::requests::ChatRequestBuilder; pub use crate::requests::ResponsesRequest; pub use crate::requests::ResponsesRequestBuilder; pub use crate::sse::stream_from_fixture; diff --git a/codex-rs/codex-api/src/provider.rs b/codex-rs/codex-api/src/provider.rs index d7e7c1941..fab3ed75e 100644 --- a/codex-rs/codex-api/src/provider.rs +++ b/codex-rs/codex-api/src/provider.rs @@ -12,7 +12,6 @@ use url::Url; #[derive(Debug, Clone, PartialEq, Eq)] pub enum WireApi { Responses, - Chat, Compact, } @@ -182,7 +181,7 @@ mod tests { } assert!(!is_azure_responses_wire_base_url( - WireApi::Chat, + WireApi::Compact, "Azure", Some("https://foo.openai.azure.com/openai") )); diff --git a/codex-rs/codex-api/src/requests/chat.rs b/codex-rs/codex-api/src/requests/chat.rs deleted file mode 100644 index 60a591e5c..000000000 --- a/codex-rs/codex-api/src/requests/chat.rs +++ /dev/null @@ -1,494 +0,0 @@ -use crate::error::ApiError; -use crate::provider::Provider; -use crate::requests::headers::build_conversation_headers; -use crate::requests::headers::insert_header; -use crate::requests::headers::subagent_header; -use codex_protocol::models::ContentItem; -use codex_protocol::models::FunctionCallOutputContentItem; -use codex_protocol::models::ReasoningItemContent; -use codex_protocol::models::ResponseItem; -use codex_protocol::protocol::SessionSource; -use http::HeaderMap; -use serde_json::Value; -use serde_json::json; -use std::collections::HashMap; - -/// Assembled request body plus headers for Chat Completions streaming calls. -pub struct ChatRequest { - pub body: Value, - pub headers: HeaderMap, -} - -pub struct ChatRequestBuilder<'a> { - model: &'a str, - instructions: &'a str, - input: &'a [ResponseItem], - tools: &'a [Value], - conversation_id: Option, - session_source: Option, -} - -impl<'a> ChatRequestBuilder<'a> { - pub fn new( - model: &'a str, - instructions: &'a str, - input: &'a [ResponseItem], - tools: &'a [Value], - ) -> Self { - Self { - model, - instructions, - input, - tools, - conversation_id: None, - session_source: None, - } - } - - pub fn conversation_id(mut self, id: Option) -> Self { - self.conversation_id = id; - self - } - - pub fn session_source(mut self, source: Option) -> Self { - self.session_source = source; - self - } - - pub fn build(self, _provider: &Provider) -> Result { - let mut messages = Vec::::new(); - messages.push(json!({"role": "system", "content": self.instructions})); - - let input = self.input; - let mut reasoning_by_anchor_index: HashMap = HashMap::new(); - let mut last_emitted_role: Option<&str> = None; - for item in input { - match item { - ResponseItem::Message { role, .. } => last_emitted_role = Some(role.as_str()), - ResponseItem::FunctionCall { .. } | ResponseItem::LocalShellCall { .. } => { - last_emitted_role = Some("assistant") - } - ResponseItem::FunctionCallOutput { .. } => last_emitted_role = Some("tool"), - ResponseItem::Reasoning { .. } | ResponseItem::Other => {} - ResponseItem::CustomToolCall { .. } => {} - ResponseItem::CustomToolCallOutput { .. } => {} - ResponseItem::WebSearchCall { .. } => {} - ResponseItem::GhostSnapshot { .. } => {} - ResponseItem::Compaction { .. } => {} - } - } - - let mut last_user_index: Option = None; - for (idx, item) in input.iter().enumerate() { - if let ResponseItem::Message { role, .. } = item - && role == "user" - { - last_user_index = Some(idx); - } - } - - if !matches!(last_emitted_role, Some("user")) { - for (idx, item) in input.iter().enumerate() { - if let Some(u_idx) = last_user_index - && idx <= u_idx - { - continue; - } - - if let ResponseItem::Reasoning { - content: Some(items), - .. - } = item - { - let mut text = String::new(); - for entry in items { - match entry { - ReasoningItemContent::ReasoningText { text: segment } - | ReasoningItemContent::Text { text: segment } => { - text.push_str(segment) - } - } - } - if text.trim().is_empty() { - continue; - } - - let mut attached = false; - if idx > 0 - && let ResponseItem::Message { role, .. } = &input[idx - 1] - && role == "assistant" - { - reasoning_by_anchor_index - .entry(idx - 1) - .and_modify(|v| v.push_str(&text)) - .or_insert(text.clone()); - attached = true; - } - - if !attached && idx + 1 < input.len() { - match &input[idx + 1] { - ResponseItem::FunctionCall { .. } - | ResponseItem::LocalShellCall { .. } => { - reasoning_by_anchor_index - .entry(idx + 1) - .and_modify(|v| v.push_str(&text)) - .or_insert(text.clone()); - } - ResponseItem::Message { role, .. } if role == "assistant" => { - reasoning_by_anchor_index - .entry(idx + 1) - .and_modify(|v| v.push_str(&text)) - .or_insert(text.clone()); - } - _ => {} - } - } - } - } - } - - let mut last_assistant_text: Option = None; - - for (idx, item) in input.iter().enumerate() { - match item { - ResponseItem::Message { role, content, .. } => { - let mut text = String::new(); - let mut items: Vec = Vec::new(); - let mut saw_image = false; - - for c in content { - match c { - ContentItem::InputText { text: t } - | ContentItem::OutputText { text: t } => { - text.push_str(t); - items.push(json!({"type":"text","text": t})); - } - ContentItem::InputImage { image_url } => { - saw_image = true; - items.push( - json!({"type":"image_url","image_url": {"url": image_url}}), - ); - } - } - } - - if role == "assistant" { - if let Some(prev) = &last_assistant_text - && prev == &text - { - continue; - } - last_assistant_text = Some(text.clone()); - } - - let content_value = if role == "assistant" { - json!(text) - } else if saw_image { - json!(items) - } else { - json!(text) - }; - - let mut msg = json!({"role": role, "content": content_value}); - if role == "assistant" - && let Some(reasoning) = reasoning_by_anchor_index.get(&idx) - && let Some(obj) = msg.as_object_mut() - { - obj.insert("reasoning".to_string(), json!(reasoning)); - } - messages.push(msg); - } - ResponseItem::FunctionCall { - name, - arguments, - call_id, - .. - } => { - let reasoning = reasoning_by_anchor_index.get(&idx).map(String::as_str); - let tool_call = json!({ - "id": call_id, - "type": "function", - "function": { - "name": name, - "arguments": arguments, - } - }); - push_tool_call_message(&mut messages, tool_call, reasoning); - } - ResponseItem::LocalShellCall { - id, - call_id: _, - status, - action, - } => { - let reasoning = reasoning_by_anchor_index.get(&idx).map(String::as_str); - let tool_call = json!({ - "id": id.clone().unwrap_or_default(), - "type": "local_shell_call", - "status": status, - "action": action, - }); - push_tool_call_message(&mut messages, tool_call, reasoning); - } - ResponseItem::FunctionCallOutput { call_id, output } => { - let content_value = if let Some(items) = &output.content_items { - let mapped: Vec = items - .iter() - .map(|it| match it { - FunctionCallOutputContentItem::InputText { text } => { - json!({"type":"text","text": text}) - } - FunctionCallOutputContentItem::InputImage { image_url } => { - json!({"type":"image_url","image_url": {"url": image_url}}) - } - }) - .collect(); - json!(mapped) - } else { - json!(output.content) - }; - - messages.push(json!({ - "role": "tool", - "tool_call_id": call_id, - "content": content_value, - })); - } - ResponseItem::CustomToolCall { - id, - call_id: _, - name, - input, - status: _, - } => { - let tool_call = json!({ - "id": id, - "type": "custom", - "custom": { - "name": name, - "input": input, - } - }); - let reasoning = reasoning_by_anchor_index.get(&idx).map(String::as_str); - push_tool_call_message(&mut messages, tool_call, reasoning); - } - ResponseItem::CustomToolCallOutput { call_id, output } => { - messages.push(json!({ - "role": "tool", - "tool_call_id": call_id, - "content": output, - })); - } - ResponseItem::GhostSnapshot { .. } => { - continue; - } - ResponseItem::Reasoning { .. } - | ResponseItem::WebSearchCall { .. } - | ResponseItem::Other - | ResponseItem::Compaction { .. } => { - continue; - } - } - } - - let payload = json!({ - "model": self.model, - "messages": messages, - "stream": true, - "tools": self.tools, - }); - - let mut headers = build_conversation_headers(self.conversation_id); - if let Some(subagent) = subagent_header(&self.session_source) { - insert_header(&mut headers, "x-openai-subagent", &subagent); - } - - Ok(ChatRequest { - body: payload, - headers, - }) - } -} - -fn push_tool_call_message(messages: &mut Vec, tool_call: Value, reasoning: Option<&str>) { - // Chat Completions requires that tool calls are grouped into a single assistant message - // (with `tool_calls: [...]`) followed by tool role responses. - if let Some(Value::Object(obj)) = messages.last_mut() - && obj.get("role").and_then(Value::as_str) == Some("assistant") - && obj.get("content").is_some_and(Value::is_null) - && let Some(tool_calls) = obj.get_mut("tool_calls").and_then(Value::as_array_mut) - { - tool_calls.push(tool_call); - if let Some(reasoning) = reasoning { - if let Some(Value::String(existing)) = obj.get_mut("reasoning") { - if !existing.is_empty() { - existing.push('\n'); - } - existing.push_str(reasoning); - } else { - obj.insert( - "reasoning".to_string(), - Value::String(reasoning.to_string()), - ); - } - } - return; - } - - let mut msg = json!({ - "role": "assistant", - "content": null, - "tool_calls": [tool_call], - }); - if let Some(reasoning) = reasoning - && let Some(obj) = msg.as_object_mut() - { - obj.insert("reasoning".to_string(), json!(reasoning)); - } - messages.push(msg); -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::provider::RetryConfig; - use crate::provider::WireApi; - use codex_protocol::models::FunctionCallOutputPayload; - use codex_protocol::protocol::SessionSource; - use codex_protocol::protocol::SubAgentSource; - use http::HeaderValue; - use pretty_assertions::assert_eq; - use std::time::Duration; - - fn provider() -> Provider { - Provider { - name: "openai".to_string(), - base_url: "https://api.openai.com/v1".to_string(), - query_params: None, - wire: WireApi::Chat, - headers: HeaderMap::new(), - retry: RetryConfig { - max_attempts: 1, - base_delay: Duration::from_millis(10), - retry_429: false, - retry_5xx: true, - retry_transport: true, - }, - stream_idle_timeout: Duration::from_secs(1), - } - } - - #[test] - fn attaches_conversation_and_subagent_headers() { - let prompt_input = vec![ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: "hi".to_string(), - }], - end_turn: None, - phase: None, - }]; - let req = ChatRequestBuilder::new("gpt-test", "inst", &prompt_input, &[]) - .conversation_id(Some("conv-1".into())) - .session_source(Some(SessionSource::SubAgent(SubAgentSource::Review))) - .build(&provider()) - .expect("request"); - - assert_eq!( - req.headers.get("session_id"), - Some(&HeaderValue::from_static("conv-1")) - ); - assert_eq!( - req.headers.get("x-openai-subagent"), - Some(&HeaderValue::from_static("review")) - ); - } - - #[test] - fn groups_consecutive_tool_calls_into_a_single_assistant_message() { - let prompt_input = vec![ - ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: "read these".to_string(), - }], - end_turn: None, - phase: None, - }, - ResponseItem::FunctionCall { - id: None, - name: "read_file".to_string(), - arguments: r#"{"path":"a.txt"}"#.to_string(), - call_id: "call-a".to_string(), - }, - ResponseItem::FunctionCall { - id: None, - name: "read_file".to_string(), - arguments: r#"{"path":"b.txt"}"#.to_string(), - call_id: "call-b".to_string(), - }, - ResponseItem::FunctionCall { - id: None, - name: "read_file".to_string(), - arguments: r#"{"path":"c.txt"}"#.to_string(), - call_id: "call-c".to_string(), - }, - ResponseItem::FunctionCallOutput { - call_id: "call-a".to_string(), - output: FunctionCallOutputPayload { - content: "A".to_string(), - ..Default::default() - }, - }, - ResponseItem::FunctionCallOutput { - call_id: "call-b".to_string(), - output: FunctionCallOutputPayload { - content: "B".to_string(), - ..Default::default() - }, - }, - ResponseItem::FunctionCallOutput { - call_id: "call-c".to_string(), - output: FunctionCallOutputPayload { - content: "C".to_string(), - ..Default::default() - }, - }, - ]; - - let req = ChatRequestBuilder::new("gpt-test", "inst", &prompt_input, &[]) - .build(&provider()) - .expect("request"); - - let messages = req - .body - .get("messages") - .and_then(|v| v.as_array()) - .expect("messages array"); - // system + user + assistant(tool_calls=[...]) + 3 tool outputs - assert_eq!(messages.len(), 6); - - assert_eq!(messages[0]["role"], "system"); - assert_eq!(messages[1]["role"], "user"); - - let tool_calls_msg = &messages[2]; - assert_eq!(tool_calls_msg["role"], "assistant"); - assert_eq!(tool_calls_msg["content"], serde_json::Value::Null); - let tool_calls = tool_calls_msg["tool_calls"] - .as_array() - .expect("tool_calls array"); - assert_eq!(tool_calls.len(), 3); - assert_eq!(tool_calls[0]["id"], "call-a"); - assert_eq!(tool_calls[1]["id"], "call-b"); - assert_eq!(tool_calls[2]["id"], "call-c"); - - assert_eq!(messages[3]["role"], "tool"); - assert_eq!(messages[3]["tool_call_id"], "call-a"); - assert_eq!(messages[4]["role"], "tool"); - assert_eq!(messages[4]["tool_call_id"], "call-b"); - assert_eq!(messages[5]["role"], "tool"); - assert_eq!(messages[5]["tool_call_id"], "call-c"); - } -} diff --git a/codex-rs/codex-api/src/requests/mod.rs b/codex-rs/codex-api/src/requests/mod.rs index f0ab23a25..35fecf9a9 100644 --- a/codex-rs/codex-api/src/requests/mod.rs +++ b/codex-rs/codex-api/src/requests/mod.rs @@ -1,8 +1,5 @@ -pub mod chat; pub(crate) mod headers; pub mod responses; -pub use chat::ChatRequest; -pub use chat::ChatRequestBuilder; pub use responses::ResponsesRequest; pub use responses::ResponsesRequestBuilder; diff --git a/codex-rs/codex-api/src/sse/chat.rs b/codex-rs/codex-api/src/sse/chat.rs deleted file mode 100644 index 4b7884528..000000000 --- a/codex-rs/codex-api/src/sse/chat.rs +++ /dev/null @@ -1,717 +0,0 @@ -use crate::common::ResponseEvent; -use crate::common::ResponseStream; -use crate::error::ApiError; -use crate::telemetry::SseTelemetry; -use codex_client::StreamResponse; -use codex_protocol::models::ContentItem; -use codex_protocol::models::ReasoningItemContent; -use codex_protocol::models::ResponseItem; -use eventsource_stream::Eventsource; -use futures::Stream; -use futures::StreamExt; -use std::collections::HashMap; -use std::collections::HashSet; -use std::sync::Arc; -use std::sync::OnceLock; -use std::time::Duration; -use tokio::sync::mpsc; -use tokio::time::Instant; -use tokio::time::timeout; -use tracing::debug; -use tracing::trace; - -pub(crate) fn spawn_chat_stream( - stream_response: StreamResponse, - idle_timeout: Duration, - telemetry: Option>, - _turn_state: Option>>, -) -> ResponseStream { - let (tx_event, rx_event) = mpsc::channel::>(1600); - tokio::spawn(async move { - process_chat_sse(stream_response.bytes, tx_event, idle_timeout, telemetry).await; - }); - ResponseStream { rx_event } -} - -/// Processes Server-Sent Events from the legacy Chat Completions streaming API. -/// -/// The upstream protocol terminates a streaming response with a final sentinel event -/// (`data: [DONE]`). Historically, some of our test stubs have emitted `data: DONE` -/// (without brackets) instead. -/// -/// `eventsource_stream` delivers these sentinels as regular events rather than signaling -/// end-of-stream. If we try to parse them as JSON, we log and skip them, then keep -/// polling for more events. -/// -/// On servers that keep the HTTP connection open after emitting the sentinel (notably -/// wiremock on Windows), skipping the sentinel means we never emit `ResponseEvent::Completed`. -/// Higher-level workflows/tests that wait for completion before issuing subsequent model -/// calls will then stall, which shows up as "expected N requests, got 1" verification -/// failures in the mock server. -pub async fn process_chat_sse( - stream: S, - tx_event: mpsc::Sender>, - idle_timeout: Duration, - telemetry: Option>, -) where - S: Stream> + Unpin, -{ - let mut stream = stream.eventsource(); - - #[derive(Default, Debug)] - struct ToolCallState { - id: Option, - name: Option, - arguments: String, - } - - let mut tool_calls: HashMap = HashMap::new(); - let mut tool_call_order: Vec = Vec::new(); - let mut tool_call_order_seen: HashSet = HashSet::new(); - let mut tool_call_index_by_id: HashMap = HashMap::new(); - let mut next_tool_call_index = 0usize; - let mut last_tool_call_index: Option = None; - let mut assistant_item: Option = None; - let mut reasoning_item: Option = None; - let mut completed_sent = false; - - async fn flush_and_complete( - tx_event: &mpsc::Sender>, - reasoning_item: &mut Option, - assistant_item: &mut Option, - ) { - if let Some(reasoning) = reasoning_item.take() { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(reasoning))) - .await; - } - - if let Some(assistant) = assistant_item.take() { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(assistant))) - .await; - } - - let _ = tx_event - .send(Ok(ResponseEvent::Completed { - response_id: String::new(), - token_usage: None, - })) - .await; - } - - loop { - let start = Instant::now(); - let response = timeout(idle_timeout, stream.next()).await; - if let Some(t) = telemetry.as_ref() { - t.on_sse_poll(&response, start.elapsed()); - } - let sse = match response { - Ok(Some(Ok(sse))) => sse, - Ok(Some(Err(e))) => { - let _ = tx_event.send(Err(ApiError::Stream(e.to_string()))).await; - return; - } - Ok(None) => { - if !completed_sent { - flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; - } - return; - } - Err(_) => { - let _ = tx_event - .send(Err(ApiError::Stream("idle timeout waiting for SSE".into()))) - .await; - return; - } - }; - - trace!("SSE event: {}", sse.data); - - let data = sse.data.trim(); - - if data.is_empty() { - continue; - } - - if data == "[DONE]" || data == "DONE" { - if !completed_sent { - flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; - } - return; - } - - let value: serde_json::Value = match serde_json::from_str(data) { - Ok(val) => val, - Err(err) => { - debug!( - "Failed to parse ChatCompletions SSE event: {err}, data: {}", - data - ); - continue; - } - }; - - let Some(choices) = value.get("choices").and_then(|c| c.as_array()) else { - continue; - }; - - for choice in choices { - if let Some(delta) = choice.get("delta") { - if let Some(reasoning) = delta.get("reasoning") { - if let Some(text) = reasoning.as_str() { - append_reasoning_text(&tx_event, &mut reasoning_item, text.to_string()) - .await; - } else if let Some(text) = reasoning.get("text").and_then(|v| v.as_str()) { - append_reasoning_text(&tx_event, &mut reasoning_item, text.to_string()) - .await; - } else if let Some(text) = reasoning.get("content").and_then(|v| v.as_str()) { - append_reasoning_text(&tx_event, &mut reasoning_item, text.to_string()) - .await; - } - } - - if let Some(content) = delta.get("content") { - if content.is_array() { - for item in content.as_array().unwrap_or(&vec![]) { - if let Some(text) = item.get("text").and_then(|t| t.as_str()) { - append_assistant_text( - &tx_event, - &mut assistant_item, - text.to_string(), - ) - .await; - } - } - } else if let Some(text) = content.as_str() { - append_assistant_text(&tx_event, &mut assistant_item, text.to_string()) - .await; - } - } - - if let Some(tool_call_values) = delta.get("tool_calls").and_then(|c| c.as_array()) { - for tool_call in tool_call_values { - let mut index = tool_call - .get("index") - .and_then(serde_json::Value::as_u64) - .map(|i| i as usize); - - let mut call_id_for_lookup = None; - if let Some(call_id) = tool_call.get("id").and_then(|i| i.as_str()) { - call_id_for_lookup = Some(call_id.to_string()); - if let Some(existing) = tool_call_index_by_id.get(call_id) { - index = Some(*existing); - } - } - - if index.is_none() && call_id_for_lookup.is_none() { - index = last_tool_call_index; - } - - let index = index.unwrap_or_else(|| { - while tool_calls.contains_key(&next_tool_call_index) { - next_tool_call_index += 1; - } - let idx = next_tool_call_index; - next_tool_call_index += 1; - idx - }); - - let call_state = tool_calls.entry(index).or_default(); - if tool_call_order_seen.insert(index) { - tool_call_order.push(index); - } - - if let Some(id) = tool_call.get("id").and_then(|i| i.as_str()) { - call_state.id.get_or_insert_with(|| id.to_string()); - tool_call_index_by_id.entry(id.to_string()).or_insert(index); - } - - if let Some(func) = tool_call.get("function") { - if let Some(fname) = func.get("name").and_then(|n| n.as_str()) - && !fname.is_empty() - { - call_state.name.get_or_insert_with(|| fname.to_string()); - } - if let Some(arguments) = func.get("arguments").and_then(|a| a.as_str()) - { - call_state.arguments.push_str(arguments); - } - } - - last_tool_call_index = Some(index); - } - } - } - - if let Some(message) = choice.get("message") - && let Some(reasoning) = message.get("reasoning") - { - if let Some(text) = reasoning.as_str() { - append_reasoning_text(&tx_event, &mut reasoning_item, text.to_string()).await; - } else if let Some(text) = reasoning.get("text").and_then(|v| v.as_str()) { - append_reasoning_text(&tx_event, &mut reasoning_item, text.to_string()).await; - } else if let Some(text) = reasoning.get("content").and_then(|v| v.as_str()) { - append_reasoning_text(&tx_event, &mut reasoning_item, text.to_string()).await; - } - } - - let finish_reason = choice.get("finish_reason").and_then(|r| r.as_str()); - if finish_reason == Some("stop") { - if let Some(reasoning) = reasoning_item.take() { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(reasoning))) - .await; - } - - if let Some(assistant) = assistant_item.take() { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(assistant))) - .await; - } - if !completed_sent { - let _ = tx_event - .send(Ok(ResponseEvent::Completed { - response_id: String::new(), - token_usage: None, - })) - .await; - completed_sent = true; - } - continue; - } - - if finish_reason == Some("length") { - let _ = tx_event.send(Err(ApiError::ContextWindowExceeded)).await; - return; - } - - if finish_reason == Some("tool_calls") { - if let Some(reasoning) = reasoning_item.take() { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(reasoning))) - .await; - } - - for index in tool_call_order.drain(..) { - let Some(state) = tool_calls.remove(&index) else { - continue; - }; - tool_call_order_seen.remove(&index); - let ToolCallState { - id, - name, - arguments, - } = state; - let Some(name) = name else { - debug!("Skipping tool call at index {index} because name is missing"); - continue; - }; - let item = ResponseItem::FunctionCall { - id: None, - name, - arguments, - call_id: id.unwrap_or_else(|| format!("tool-call-{index}")), - }; - let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; - } - } - } - } -} - -async fn append_assistant_text( - tx_event: &mpsc::Sender>, - assistant_item: &mut Option, - text: String, -) { - if assistant_item.is_none() { - let item = ResponseItem::Message { - id: None, - role: "assistant".to_string(), - content: vec![], - end_turn: None, - phase: None, - }; - *assistant_item = Some(item.clone()); - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemAdded(item))) - .await; - } - - if let Some(ResponseItem::Message { content, .. }) = assistant_item { - content.push(ContentItem::OutputText { text: text.clone() }); - let _ = tx_event - .send(Ok(ResponseEvent::OutputTextDelta(text.clone()))) - .await; - } -} - -async fn append_reasoning_text( - tx_event: &mpsc::Sender>, - reasoning_item: &mut Option, - text: String, -) { - if reasoning_item.is_none() { - let item = ResponseItem::Reasoning { - id: String::new(), - summary: Vec::new(), - content: Some(vec![]), - encrypted_content: None, - }; - *reasoning_item = Some(item.clone()); - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemAdded(item))) - .await; - } - - if let Some(ResponseItem::Reasoning { - content: Some(content), - .. - }) = reasoning_item - { - let content_index = content.len() as i64; - content.push(ReasoningItemContent::ReasoningText { text: text.clone() }); - - let _ = tx_event - .send(Ok(ResponseEvent::ReasoningContentDelta { - delta: text.clone(), - content_index, - })) - .await; - } -} - -#[cfg(test)] -mod tests { - use super::*; - use assert_matches::assert_matches; - use codex_protocol::models::ResponseItem; - use futures::TryStreamExt; - use serde_json::json; - use tokio::sync::mpsc; - use tokio_util::io::ReaderStream; - - fn build_body(events: &[serde_json::Value]) -> String { - let mut body = String::new(); - for e in events { - body.push_str(&format!("event: message\ndata: {e}\n\n")); - } - body - } - - /// Regression test: the stream should complete when we see a `[DONE]` sentinel. - /// - /// This is important for tests/mocks that don't immediately close the underlying - /// connection after emitting the sentinel. - #[tokio::test] - async fn completes_on_done_sentinel_without_json() { - let events = collect_events("event: message\ndata: [DONE]\n\n").await; - assert_matches!(&events[..], [ResponseEvent::Completed { .. }]); - } - - async fn collect_events(body: &str) -> Vec { - let reader = ReaderStream::new(std::io::Cursor::new(body.to_string())) - .map_err(|err| codex_client::TransportError::Network(err.to_string())); - let (tx, mut rx) = mpsc::channel::>(16); - tokio::spawn(process_chat_sse( - reader, - tx, - Duration::from_millis(1000), - None, - )); - - let mut out = Vec::new(); - while let Some(ev) = rx.recv().await { - out.push(ev.expect("stream error")); - } - out - } - - #[tokio::test] - async fn concatenates_tool_call_arguments_across_deltas() { - let delta_name = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "id": "call_a", - "index": 0, - "function": { "name": "do_a" } - }] - } - }] - }); - - let delta_args_1 = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "index": 0, - "function": { "arguments": "{ \"foo\":" } - }] - } - }] - }); - - let delta_args_2 = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "index": 0, - "function": { "arguments": "1}" } - }] - } - }] - }); - - let finish = json!({ - "choices": [{ - "finish_reason": "tool_calls" - }] - }); - - let body = build_body(&[delta_name, delta_args_1, delta_args_2, finish]); - let events = collect_events(&body).await; - assert_matches!( - &events[..], - [ - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id, name, arguments, .. }), - ResponseEvent::Completed { .. } - ] if call_id == "call_a" && name == "do_a" && arguments == "{ \"foo\":1}" - ); - } - - #[tokio::test] - async fn emits_multiple_tool_calls() { - let delta_a = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "id": "call_a", - "function": { "name": "do_a", "arguments": "{\"foo\":1}" } - }] - } - }] - }); - - let delta_b = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "id": "call_b", - "function": { "name": "do_b", "arguments": "{\"bar\":2}" } - }] - } - }] - }); - - let finish = json!({ - "choices": [{ - "finish_reason": "tool_calls" - }] - }); - - let body = build_body(&[delta_a, delta_b, finish]); - let events = collect_events(&body).await; - assert_matches!( - &events[..], - [ - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id: call_a, name: name_a, arguments: args_a, .. }), - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id: call_b, name: name_b, arguments: args_b, .. }), - ResponseEvent::Completed { .. } - ] if call_a == "call_a" && name_a == "do_a" && args_a == "{\"foo\":1}" && call_b == "call_b" && name_b == "do_b" && args_b == "{\"bar\":2}" - ); - } - - #[tokio::test] - async fn emits_tool_calls_for_multiple_choices() { - let payload = json!({ - "choices": [ - { - "delta": { - "tool_calls": [{ - "id": "call_a", - "index": 0, - "function": { "name": "do_a", "arguments": "{}" } - }] - }, - "finish_reason": "tool_calls" - }, - { - "delta": { - "tool_calls": [{ - "id": "call_b", - "index": 0, - "function": { "name": "do_b", "arguments": "{}" } - }] - }, - "finish_reason": "tool_calls" - } - ] - }); - - let body = build_body(&[payload]); - let events = collect_events(&body).await; - assert_matches!( - &events[..], - [ - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id: call_a, name: name_a, arguments: args_a, .. }), - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id: call_b, name: name_b, arguments: args_b, .. }), - ResponseEvent::Completed { .. } - ] if call_a == "call_a" && name_a == "do_a" && args_a == "{}" && call_b == "call_b" && name_b == "do_b" && args_b == "{}" - ); - } - - #[tokio::test] - async fn merges_tool_calls_by_index_when_id_missing_on_subsequent_deltas() { - let delta_with_id = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "index": 0, - "id": "call_a", - "function": { "name": "do_a", "arguments": "{ \"foo\":" } - }] - } - }] - }); - - let delta_without_id = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "index": 0, - "function": { "arguments": "1}" } - }] - } - }] - }); - - let finish = json!({ - "choices": [{ - "finish_reason": "tool_calls" - }] - }); - - let body = build_body(&[delta_with_id, delta_without_id, finish]); - let events = collect_events(&body).await; - assert_matches!( - &events[..], - [ - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id, name, arguments, .. }), - ResponseEvent::Completed { .. } - ] if call_id == "call_a" && name == "do_a" && arguments == "{ \"foo\":1}" - ); - } - - #[tokio::test] - async fn preserves_tool_call_name_when_empty_deltas_arrive() { - let delta_with_name = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "id": "call_a", - "function": { "name": "do_a" } - }] - } - }] - }); - - let delta_with_empty_name = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "id": "call_a", - "function": { "name": "", "arguments": "{}" } - }] - } - }] - }); - - let finish = json!({ - "choices": [{ - "finish_reason": "tool_calls" - }] - }); - - let body = build_body(&[delta_with_name, delta_with_empty_name, finish]); - let events = collect_events(&body).await; - assert_matches!( - &events[..], - [ - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { name, arguments, .. }), - ResponseEvent::Completed { .. } - ] if name == "do_a" && arguments == "{}" - ); - } - - #[tokio::test] - async fn emits_tool_calls_even_when_content_and_reasoning_present() { - let delta_content_and_tools = json!({ - "choices": [{ - "delta": { - "content": [{"text": "hi"}], - "reasoning": "because", - "tool_calls": [{ - "id": "call_a", - "function": { "name": "do_a", "arguments": "{}" } - }] - } - }] - }); - - let finish = json!({ - "choices": [{ - "finish_reason": "tool_calls" - }] - }); - - let body = build_body(&[delta_content_and_tools, finish]); - let events = collect_events(&body).await; - - assert_matches!( - &events[..], - [ - ResponseEvent::OutputItemAdded(ResponseItem::Reasoning { .. }), - ResponseEvent::ReasoningContentDelta { .. }, - ResponseEvent::OutputItemAdded(ResponseItem::Message { .. }), - ResponseEvent::OutputTextDelta(delta), - ResponseEvent::OutputItemDone(ResponseItem::Reasoning { .. }), - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id, name, .. }), - ResponseEvent::OutputItemDone(ResponseItem::Message { .. }), - ResponseEvent::Completed { .. } - ] if delta == "hi" && call_id == "call_a" && name == "do_a" - ); - } - - #[tokio::test] - async fn drops_partial_tool_calls_on_stop_finish_reason() { - let delta_tool = json!({ - "choices": [{ - "delta": { - "tool_calls": [{ - "id": "call_a", - "function": { "name": "do_a", "arguments": "{}" } - }] - } - }] - }); - - let finish_stop = json!({ - "choices": [{ - "finish_reason": "stop" - }] - }); - - let body = build_body(&[delta_tool, finish_stop]); - let events = collect_events(&body).await; - - assert!(!events.iter().any(|ev| { - matches!( - ev, - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { .. }) - ) - })); - assert_matches!(events.last(), Some(ResponseEvent::Completed { .. })); - } -} diff --git a/codex-rs/codex-api/src/sse/mod.rs b/codex-rs/codex-api/src/sse/mod.rs index e3ab770c4..cb689afc1 100644 --- a/codex-rs/codex-api/src/sse/mod.rs +++ b/codex-rs/codex-api/src/sse/mod.rs @@ -1,4 +1,3 @@ -pub mod chat; pub mod responses; pub use responses::process_sse; diff --git a/codex-rs/codex-api/tests/clients.rs b/codex-rs/codex-api/tests/clients.rs index 3040fb872..96923ffe2 100644 --- a/codex-rs/codex-api/tests/clients.rs +++ b/codex-rs/codex-api/tests/clients.rs @@ -6,7 +6,6 @@ use anyhow::Result; use async_trait::async_trait; use bytes::Bytes; use codex_api::AuthProvider; -use codex_api::ChatClient; use codex_api::Provider; use codex_api::ResponsesClient; use codex_api::ResponsesOptions; @@ -195,34 +194,6 @@ data: {"id":"resp-1","output":[{"type":"message","role":"assistant","content":[{ } } -#[tokio::test] -async fn chat_client_uses_chat_completions_path_for_chat_wire() -> Result<()> { - let state = RecordingState::default(); - let transport = RecordingTransport::new(state.clone()); - let client = ChatClient::new(transport, provider("openai", WireApi::Chat), NoAuth); - - let body = serde_json::json!({ "echo": true }); - let _stream = client.stream(body, HeaderMap::new()).await?; - - let requests = state.take_stream_requests(); - assert_path_ends_with(&requests, "/chat/completions"); - Ok(()) -} - -#[tokio::test] -async fn chat_client_uses_responses_path_for_responses_wire() -> Result<()> { - let state = RecordingState::default(); - let transport = RecordingTransport::new(state.clone()); - let client = ChatClient::new(transport, provider("openai", WireApi::Responses), NoAuth); - - let body = serde_json::json!({ "echo": true }); - let _stream = client.stream(body, HeaderMap::new()).await?; - - let requests = state.take_stream_requests(); - assert_path_ends_with(&requests, "/responses"); - Ok(()) -} - #[tokio::test] async fn responses_client_uses_responses_path_for_responses_wire() -> Result<()> { let state = RecordingState::default(); @@ -240,10 +211,10 @@ async fn responses_client_uses_responses_path_for_responses_wire() -> Result<()> } #[tokio::test] -async fn responses_client_uses_chat_path_for_chat_wire() -> Result<()> { +async fn responses_client_uses_responses_path_for_compact_wire() -> Result<()> { let state = RecordingState::default(); let transport = RecordingTransport::new(state.clone()); - let client = ResponsesClient::new(transport, provider("openai", WireApi::Chat), NoAuth); + let client = ResponsesClient::new(transport, provider("openai", WireApi::Compact), NoAuth); let body = serde_json::json!({ "echo": true }); let _stream = client @@ -251,7 +222,7 @@ async fn responses_client_uses_chat_path_for_chat_wire() -> Result<()> { .await?; let requests = state.take_stream_requests(); - assert_path_ends_with(&requests, "/chat/completions"); + assert_path_ends_with(&requests, "/responses"); Ok(()) } diff --git a/codex-rs/common/src/oss.rs b/codex-rs/common/src/oss.rs index f686bb601..a44a6a7d3 100644 --- a/codex-rs/common/src/oss.rs +++ b/codex-rs/common/src/oss.rs @@ -1,52 +1,18 @@ //! OSS provider utilities shared between TUI and exec. use codex_core::LMSTUDIO_OSS_PROVIDER_ID; -use codex_core::OLLAMA_CHAT_PROVIDER_ID; use codex_core::OLLAMA_OSS_PROVIDER_ID; -use codex_core::WireApi; use codex_core::config::Config; -use codex_core::protocol::DeprecationNoticeEvent; -use std::io; /// Returns the default model for a given OSS provider. pub fn get_default_model_for_oss_provider(provider_id: &str) -> Option<&'static str> { match provider_id { LMSTUDIO_OSS_PROVIDER_ID => Some(codex_lmstudio::DEFAULT_OSS_MODEL), - OLLAMA_OSS_PROVIDER_ID | OLLAMA_CHAT_PROVIDER_ID => Some(codex_ollama::DEFAULT_OSS_MODEL), + OLLAMA_OSS_PROVIDER_ID => Some(codex_ollama::DEFAULT_OSS_MODEL), _ => None, } } -/// Returns a deprecation notice if Ollama doesn't support the responses wire API. -pub async fn ollama_chat_deprecation_notice( - config: &Config, -) -> io::Result> { - if config.model_provider_id != OLLAMA_OSS_PROVIDER_ID - || config.model_provider.wire_api != WireApi::Responses - { - return Ok(None); - } - - if let Some(detection) = codex_ollama::detect_wire_api(&config.model_provider).await? - && detection.wire_api == WireApi::Chat - { - let version_suffix = detection - .version - .as_ref() - .map(|version| format!(" (version {version})")) - .unwrap_or_default(); - let summary = format!( - "Your Ollama server{version_suffix} doesn't support the Responses API. Either update Ollama or set `oss_provider = \"{OLLAMA_CHAT_PROVIDER_ID}\"` (or `model_provider = \"{OLLAMA_CHAT_PROVIDER_ID}\"`) in your config.toml to use the \"chat\" wire API. Support for the \"chat\" wire API is deprecated and will soon be removed." - ); - return Ok(Some(DeprecationNoticeEvent { - summary, - details: None, - })); - } - - Ok(None) -} - /// Ensures the specified OSS provider is ready (models downloaded, service reachable). pub async fn ensure_oss_provider_ready( provider_id: &str, @@ -58,7 +24,8 @@ pub async fn ensure_oss_provider_ready( .await .map_err(|e| std::io::Error::other(format!("OSS setup failed: {e}")))?; } - OLLAMA_OSS_PROVIDER_ID | OLLAMA_CHAT_PROVIDER_ID => { + OLLAMA_OSS_PROVIDER_ID => { + codex_ollama::ensure_responses_supported(&config.model_provider).await?; codex_ollama::ensure_oss_ready(config) .await .map_err(|e| std::io::Error::other(format!("OSS setup failed: {e}")))?; diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 2315f7586..0a048db69 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -464,7 +464,7 @@ "$ref": "#/definitions/WireApi" } ], - "default": "chat", + "default": "responses", "description": "Which wire protocol this provider expects." } }, @@ -1087,7 +1087,7 @@ "type": "string" }, "WireApi": { - "description": "Wire protocol that the provider speaks. Most third-party services only implement the classic OpenAI Chat Completions JSON schema, whereas OpenAI itself (and a handful of others) additionally expose the more modern *Responses* API. The two protocols use different request/response shapes and *cannot* be auto-detected at runtime, therefore each provider entry must declare which one it expects.", + "description": "Wire protocol that the provider speaks.", "oneOf": [ { "description": "The Responses API exposed by OpenAI at `/v1/responses`.", @@ -1095,13 +1095,6 @@ "responses" ], "type": "string" - }, - { - "description": "Regular Chat Completions compatible with `/v1/chat/completions`.", - "enum": [ - "chat" - ], - "type": "string" } ] } @@ -1423,7 +1416,7 @@ "type": "array" }, "oss_provider": { - "description": "Preferred OSS provider for local models, e.g. \"lmstudio\", \"ollama\", or \"ollama-chat\".", + "description": "Preferred OSS provider for local models, e.g. \"lmstudio\" or \"ollama\".", "type": "string" }, "otel": { diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 37fef163a..953b11b8f 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -8,8 +8,6 @@ use crate::api_bridge::auth_provider_from_auth; use crate::api_bridge::map_api_error; use crate::auth::UnauthorizedRecovery; use crate::turn_metadata::build_turn_metadata_header; -use codex_api::AggregateStreamExt; -use codex_api::ChatClient as ApiChatClient; use codex_api::CompactClient as ApiCompactClient; use codex_api::CompactionInput as ApiCompactionInput; use codex_api::Prompt as ApiPrompt; @@ -17,7 +15,6 @@ use codex_api::RequestTelemetry; use codex_api::ReqwestTransport; use codex_api::ResponseAppendWsRequest; use codex_api::ResponseCreateWsRequest; -use codex_api::ResponseStream as ApiResponseStream; use codex_api::ResponsesClient as ApiResponsesClient; use codex_api::ResponsesOptions as ApiResponsesOptions; use codex_api::ResponsesWebsocketClient as ApiWebSocketResponsesClient; @@ -69,7 +66,6 @@ use crate::features::Feature; use crate::flags::CODEX_RS_SSE_FIXTURE; use crate::model_provider_info::ModelProviderInfo; use crate::model_provider_info::WireApi; -use crate::tools::spec::create_tools_json_for_chat_completions_api; use crate::tools::spec::create_tools_json_for_responses_api; use crate::transport_manager::TransportManager; @@ -310,11 +306,7 @@ impl ModelClientSession { .and_then(|cache| cache.header.clone()) } - /// Streams a single model turn using either the Responses or Chat - /// Completions wire API, depending on the configured provider. - /// - /// For Chat providers, the underlying stream is optionally aggregated - /// based on the `show_raw_agent_reasoning` flag in the config. + /// Streams a single model turn using the configured Responses transport. pub async fn stream(&mut self, prompt: &Prompt) -> Result { let wire_api = self.state.provider.wire_api; match wire_api { @@ -328,21 +320,6 @@ impl ModelClientSession { self.stream_responses_api(prompt).await } } - WireApi::Chat => { - let api_stream = self.stream_chat_completions(prompt).await?; - - if self.state.config.show_raw_agent_reasoning { - Ok(map_response_stream( - api_stream.streaming_mode(), - self.state.otel_manager.clone(), - )) - } else { - Ok(map_response_stream( - api_stream.aggregate(), - self.state.otel_manager.clone(), - )) - } - } } } @@ -544,64 +521,6 @@ impl ModelClientSession { } } - /// Streams a turn via the OpenAI Chat Completions API. - /// - /// This path is only used when the provider is configured with - /// `WireApi::Chat`; it does not support `output_schema` today. - async fn stream_chat_completions(&self, prompt: &Prompt) -> Result { - if prompt.output_schema.is_some() { - return Err(CodexErr::UnsupportedOperation( - "output_schema is not supported for Chat Completions API".to_string(), - )); - } - - let auth_manager = self.state.auth_manager.clone(); - let instructions = prompt.base_instructions.text.clone(); - let tools_json = create_tools_json_for_chat_completions_api(&prompt.tools)?; - let api_prompt = build_api_prompt(prompt, instructions, tools_json); - let conversation_id = self.state.conversation_id.to_string(); - let session_source = self.state.session_source.clone(); - - let mut auth_recovery = auth_manager - .as_ref() - .map(super::auth::AuthManager::unauthorized_recovery); - loop { - let auth = match auth_manager.as_ref() { - Some(manager) => manager.auth().await, - None => None, - }; - let api_provider = self - .state - .provider - .to_api_provider(auth.as_ref().map(CodexAuth::internal_auth_mode))?; - let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?; - let transport = ReqwestTransport::new(build_reqwest_client()); - let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry(); - let client = ApiChatClient::new(transport, api_provider, api_auth) - .with_telemetry(Some(request_telemetry), Some(sse_telemetry)); - - let stream_result = client - .stream_prompt( - &self.state.model_info.slug, - &api_prompt, - Some(conversation_id.clone()), - Some(session_source.clone()), - ) - .await; - - match stream_result { - Ok(stream) => return Ok(stream), - Err(ApiError::Transport(TransportError::Http { status, .. })) - if status == StatusCode::UNAUTHORIZED => - { - handle_unauthorized(status, &mut auth_recovery).await?; - continue; - } - Err(err) => return Err(map_api_error(err)), - } - } - } - /// Streams a turn via the OpenAI Responses API. /// /// Handles SSE fixtures, reasoning summaries, verbosity, and the @@ -709,7 +628,7 @@ impl ModelClientSession { } } - /// Builds request and SSE telemetry for streaming API calls (Chat/Responses). + /// Builds request and SSE telemetry for streaming API calls. fn build_streaming_telemetry(&self) -> (Arc, Arc) { let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone())); let request_telemetry: Arc = telemetry.clone(); diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index ba84b6b43..d6d4d93b5 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -3,9 +3,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; -use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use crate::AuthManager; use crate::CodexAuth; @@ -96,7 +94,6 @@ use tracing::trace_span; use tracing::warn; use crate::ModelProviderInfo; -use crate::WireApi; use crate::client::ModelClient; use crate::client::ModelClientSession; use crate::client_common::Prompt; @@ -130,7 +127,6 @@ use crate::mentions::build_connector_slug_counts; use crate::mentions::build_skill_name_counts; use crate::mentions::collect_explicit_app_paths; use crate::mentions::collect_tool_mentions_from_messages; -use crate::model_provider_info::CHAT_WIRE_API_DEPRECATION_SUMMARY; use crate::project_doc::get_user_instructions; use crate::proposed_plan_parser::ProposedPlanParser; use crate::proposed_plan_parser::ProposedPlanSegment; @@ -243,31 +239,6 @@ pub struct CodexSpawnOk { pub(crate) const INITIAL_SUBMIT_ID: &str = ""; pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 64; -static CHAT_WIRE_API_DEPRECATION_EMITTED: AtomicBool = AtomicBool::new(false); - -fn maybe_push_chat_wire_api_deprecation( - config: &Config, - post_session_configured_events: &mut Vec, -) { - if config.model_provider.wire_api != WireApi::Chat { - return; - } - - if CHAT_WIRE_API_DEPRECATION_EMITTED - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_err() - { - return; - } - - post_session_configured_events.push(Event { - id: INITIAL_SUBMIT_ID.to_owned(), - msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent { - summary: CHAT_WIRE_API_DEPRECATION_SUMMARY.to_string(), - details: None, - }), - }); -} impl Codex { /// Spawn a new [`Codex`] and initialize the session. @@ -865,7 +836,6 @@ impl Session { }), }); } - maybe_push_chat_wire_api_deprecation(&config, &mut post_session_configured_events); maybe_push_unstable_features_warning(&config, &mut post_session_configured_events); let auth = auth.as_ref(); diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index 8a53f388b..bafdb4b59 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -32,9 +32,10 @@ use crate::features::FeatureOverrides; use crate::features::Features; use crate::features::FeaturesToml; use crate::git_info::resolve_root_git_project_for_trust; +use crate::model_provider_info::LEGACY_OLLAMA_CHAT_PROVIDER_ID; use crate::model_provider_info::LMSTUDIO_OSS_PROVIDER_ID; use crate::model_provider_info::ModelProviderInfo; -use crate::model_provider_info::OLLAMA_CHAT_PROVIDER_ID; +use crate::model_provider_info::OLLAMA_CHAT_PROVIDER_REMOVED_ERROR; use crate::model_provider_info::OLLAMA_OSS_PROVIDER_ID; use crate::model_provider_info::built_in_model_providers; use crate::project_doc::DEFAULT_PROJECT_DOC_FILENAME; @@ -757,14 +758,20 @@ pub fn set_project_trust_level( pub fn set_default_oss_provider(codex_home: &Path, provider: &str) -> std::io::Result<()> { // Validate that the provider is one of the known OSS providers match provider { - LMSTUDIO_OSS_PROVIDER_ID | OLLAMA_OSS_PROVIDER_ID | OLLAMA_CHAT_PROVIDER_ID => { + LMSTUDIO_OSS_PROVIDER_ID | OLLAMA_OSS_PROVIDER_ID => { // Valid provider, continue } + LEGACY_OLLAMA_CHAT_PROVIDER_ID => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + OLLAMA_CHAT_PROVIDER_REMOVED_ERROR, + )); + } _ => { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, format!( - "Invalid OSS provider '{provider}'. Must be one of: {LMSTUDIO_OSS_PROVIDER_ID}, {OLLAMA_OSS_PROVIDER_ID}, {OLLAMA_CHAT_PROVIDER_ID}" + "Invalid OSS provider '{provider}'. Must be one of: {LMSTUDIO_OSS_PROVIDER_ID}, {OLLAMA_OSS_PROVIDER_ID}" ), )); } @@ -985,7 +992,7 @@ pub struct ConfigToml { pub experimental_compact_prompt_file: Option, pub experimental_use_unified_exec_tool: Option, pub experimental_use_freeform_apply_patch: Option, - /// Preferred OSS provider for local models, e.g. "lmstudio", "ollama", or "ollama-chat". + /// Preferred OSS provider for local models, e.g. "lmstudio" or "ollama". pub oss_provider: Option, } @@ -1411,10 +1418,12 @@ impl Config { let model_provider = model_providers .get(&model_provider_id) .ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Model provider `{model_provider_id}` not found"), - ) + let message = if model_provider_id == LEGACY_OLLAMA_CHAT_PROVIDER_ID { + OLLAMA_CHAT_PROVIDER_REMOVED_ERROR.to_string() + } else { + format!("Model provider `{model_provider_id}` not found") + }; + std::io::Error::new(std::io::ErrorKind::NotFound, message) })? .clone(); @@ -3576,7 +3585,7 @@ model = "gpt-5.1-codex" cfg: ConfigToml, model_provider_map: HashMap, openai_provider: ModelProviderInfo, - openai_chat_completions_provider: ModelProviderInfo, + openai_custom_provider: ModelProviderInfo, } impl PrecedenceTestFixture { @@ -3658,11 +3667,11 @@ profile = "gpt3" [analytics] enabled = true -[model_providers.openai-chat-completions] -name = "OpenAI using Chat Completions" +[model_providers.openai-custom] +name = "OpenAI custom" base_url = "https://api.openai.com/v1" env_key = "OPENAI_API_KEY" -wire_api = "chat" +wire_api = "responses" request_max_retries = 4 # retry failed HTTP requests stream_max_retries = 10 # retry dropped SSE streams stream_idle_timeout_ms = 300000 # 5m idle timeout @@ -3676,7 +3685,7 @@ model_reasoning_summary = "detailed" [profiles.gpt3] model = "gpt-3.5-turbo" -model_provider = "openai-chat-completions" +model_provider = "openai-custom" [profiles.zdr] model = "o3" @@ -3707,11 +3716,11 @@ model_verbosity = "high" let codex_home_temp_dir = TempDir::new().unwrap(); - let openai_chat_completions_provider = ModelProviderInfo { - name: "OpenAI using Chat Completions".to_string(), + let openai_custom_provider = ModelProviderInfo { + name: "OpenAI custom".to_string(), base_url: Some("https://api.openai.com/v1".to_string()), env_key: Some("OPENAI_API_KEY".to_string()), - wire_api: crate::WireApi::Chat, + wire_api: crate::WireApi::Responses, env_key_instructions: None, experimental_bearer_token: None, query_params: None, @@ -3725,10 +3734,7 @@ model_verbosity = "high" }; let model_provider_map = { let mut model_provider_map = built_in_model_providers(); - model_provider_map.insert( - "openai-chat-completions".to_string(), - openai_chat_completions_provider.clone(), - ); + model_provider_map.insert("openai-custom".to_string(), openai_custom_provider.clone()); model_provider_map }; @@ -3743,7 +3749,7 @@ model_verbosity = "high" cfg, model_provider_map, openai_provider, - openai_chat_completions_provider, + openai_custom_provider, }) } @@ -3864,8 +3870,8 @@ model_verbosity = "high" review_model: None, model_context_window: None, model_auto_compact_token_limit: None, - model_provider_id: "openai-chat-completions".to_string(), - model_provider: fixture.openai_chat_completions_provider.clone(), + model_provider_id: "openai-custom".to_string(), + model_provider: fixture.openai_custom_provider.clone(), approval_policy: Constrained::allow_any(AskForApproval::UnlessTrusted), sandbox_policy: Constrained::allow_any(SandboxPolicy::new_read_only_policy()), enforce_residency: Constrained::allow_any(None), @@ -4266,6 +4272,50 @@ trust_level = "trusted" Ok(()) } + #[test] + fn test_set_default_oss_provider_rejects_legacy_ollama_chat_provider() -> std::io::Result<()> { + let temp_dir = TempDir::new()?; + let codex_home = temp_dir.path(); + + let result = set_default_oss_provider(codex_home, LEGACY_OLLAMA_CHAT_PROVIDER_ID); + assert!(result.is_err()); + let error = result.unwrap_err(); + assert_eq!(error.kind(), std::io::ErrorKind::InvalidInput); + assert!( + error + .to_string() + .contains(OLLAMA_CHAT_PROVIDER_REMOVED_ERROR) + ); + + Ok(()) + } + + #[test] + fn test_load_config_rejects_legacy_ollama_chat_provider_with_helpful_error() + -> std::io::Result<()> { + let codex_home = TempDir::new()?; + let cfg = ConfigToml { + model_provider: Some(LEGACY_OLLAMA_CHAT_PROVIDER_ID.to_string()), + ..Default::default() + }; + + let result = Config::load_from_base_config_with_overrides( + cfg, + ConfigOverrides::default(), + codex_home.path().to_path_buf(), + ); + assert!(result.is_err()); + let error = result.unwrap_err(); + assert_eq!(error.kind(), std::io::ErrorKind::NotFound); + assert!( + error + .to_string() + .contains(OLLAMA_CHAT_PROVIDER_REMOVED_ERROR) + ); + + Ok(()) + } + #[test] fn test_untrusted_project_gets_workspace_write_sandbox() -> anyhow::Result<()> { let config_with_untrusted = r#" diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 1f81f3eab..c9d67e19f 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -61,12 +61,10 @@ pub mod token_data; mod truncate; mod unified_exec; pub mod windows_sandbox; -pub use model_provider_info::CHAT_WIRE_API_DEPRECATION_SUMMARY; pub use model_provider_info::DEFAULT_LMSTUDIO_PORT; pub use model_provider_info::DEFAULT_OLLAMA_PORT; pub use model_provider_info::LMSTUDIO_OSS_PROVIDER_ID; pub use model_provider_info::ModelProviderInfo; -pub use model_provider_info::OLLAMA_CHAT_PROVIDER_ID; pub use model_provider_info::OLLAMA_OSS_PROVIDER_ID; pub use model_provider_info::WireApi; pub use model_provider_info::built_in_model_providers; diff --git a/codex-rs/core/src/model_provider_info.rs b/codex-rs/core/src/model_provider_info.rs index 6ba913489..126389193 100644 --- a/codex-rs/core/src/model_provider_info.rs +++ b/codex-rs/core/src/model_provider_info.rs @@ -28,25 +28,33 @@ const DEFAULT_REQUEST_MAX_RETRIES: u64 = 4; const MAX_STREAM_MAX_RETRIES: u64 = 100; /// Hard cap for user-configured `request_max_retries`. const MAX_REQUEST_MAX_RETRIES: u64 = 100; -pub const CHAT_WIRE_API_DEPRECATION_SUMMARY: &str = r#"Support for the "chat" wire API is deprecated and will soon be removed. Update your model provider definition in config.toml to use wire_api = "responses"."#; const OPENAI_PROVIDER_NAME: &str = "OpenAI"; +const CHAT_WIRE_API_REMOVED_ERROR: &str = "`wire_api = \"chat\"` is no longer supported.\nHow to fix: set `wire_api = \"responses\"` in your provider config.\nMore info: https://github.com/openai/codex/discussions/7782"; +pub(crate) const LEGACY_OLLAMA_CHAT_PROVIDER_ID: &str = "ollama-chat"; +pub(crate) const OLLAMA_CHAT_PROVIDER_REMOVED_ERROR: &str = "`ollama-chat` is no longer supported.\nHow to fix: replace `ollama-chat` with `ollama` in `model_provider`, `oss_provider`, or `--local-provider`.\nMore info: https://github.com/openai/codex/discussions/7782"; -/// Wire protocol that the provider speaks. Most third-party services only -/// implement the classic OpenAI Chat Completions JSON schema, whereas OpenAI -/// itself (and a handful of others) additionally expose the more modern -/// *Responses* API. The two protocols use different request/response shapes -/// and *cannot* be auto-detected at runtime, therefore each provider entry -/// must declare which one it expects. -#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +/// Wire protocol that the provider speaks. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, JsonSchema)] #[serde(rename_all = "lowercase")] pub enum WireApi { /// The Responses API exposed by OpenAI at `/v1/responses`. - Responses, - - /// Regular Chat Completions compatible with `/v1/chat/completions`. #[default] - Chat, + Responses, +} + +impl<'de> Deserialize<'de> for WireApi { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let value = String::deserialize(deserializer)?; + match value.as_str() { + "responses" => Ok(Self::Responses), + "chat" => Err(serde::de::Error::custom(CHAT_WIRE_API_REMOVED_ERROR)), + _ => Err(serde::de::Error::unknown_variant(&value, &["responses"])), + } + } } /// Serializable representation of a provider definition. @@ -163,7 +171,6 @@ impl ModelProviderInfo { query_params: self.query_params.clone(), wire: match self.wire_api { WireApi::Responses => ApiWireApi::Responses, - WireApi::Chat => ApiWireApi::Chat, }, headers, retry, @@ -172,12 +179,11 @@ impl ModelProviderInfo { } pub(crate) fn is_azure_responses_endpoint(&self) -> bool { - let wire = match self.wire_api { - WireApi::Responses => ApiWireApi::Responses, - WireApi::Chat => ApiWireApi::Chat, - }; - - is_azure_responses_wire_base_url(wire, &self.name, self.base_url.as_deref()) + is_azure_responses_wire_base_url( + ApiWireApi::Responses, + &self.name, + self.base_url.as_deref(), + ) } /// If `env_key` is Some, returns the API key for this provider if present @@ -277,7 +283,6 @@ pub const DEFAULT_OLLAMA_PORT: u16 = 11434; pub const LMSTUDIO_OSS_PROVIDER_ID: &str = "lmstudio"; pub const OLLAMA_OSS_PROVIDER_ID: &str = "ollama"; -pub const OLLAMA_CHAT_PROVIDER_ID: &str = "ollama-chat"; /// Built-in default provider list. pub fn built_in_model_providers() -> HashMap { @@ -293,10 +298,6 @@ pub fn built_in_model_providers() -> HashMap { OLLAMA_OSS_PROVIDER_ID, create_oss_provider(DEFAULT_OLLAMA_PORT, WireApi::Responses), ), - ( - OLLAMA_CHAT_PROVIDER_ID, - create_oss_provider(DEFAULT_OLLAMA_PORT, WireApi::Chat), - ), ( LMSTUDIO_OSS_PROVIDER_ID, create_oss_provider(DEFAULT_LMSTUDIO_PORT, WireApi::Responses), @@ -363,7 +364,7 @@ base_url = "http://localhost:11434/v1" env_key: None, env_key_instructions: None, experimental_bearer_token: None, - wire_api: WireApi::Chat, + wire_api: WireApi::Responses, query_params: None, http_headers: None, env_http_headers: None, @@ -392,7 +393,7 @@ query_params = { api-version = "2025-04-01-preview" } env_key: Some("AZURE_OPENAI_API_KEY".into()), env_key_instructions: None, experimental_bearer_token: None, - wire_api: WireApi::Chat, + wire_api: WireApi::Responses, query_params: Some(maplit::hashmap! { "api-version".to_string() => "2025-04-01-preview".to_string(), }), @@ -424,7 +425,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" } env_key: Some("API_KEY".into()), env_key_instructions: None, experimental_bearer_token: None, - wire_api: WireApi::Chat, + wire_api: WireApi::Responses, query_params: None, http_headers: Some(maplit::hashmap! { "X-Example-Header".to_string() => "example-value".to_string(), @@ -442,4 +443,17 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" } let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap(); assert_eq!(expected_provider, provider); } + + #[test] + fn test_deserialize_chat_wire_api_shows_helpful_error() { + let provider_toml = r#" +name = "OpenAI using Chat Completions" +base_url = "https://api.openai.com/v1" +env_key = "OPENAI_API_KEY" +wire_api = "chat" + "#; + + let err = toml::from_str::(provider_toml).unwrap_err(); + assert!(err.to_string().contains(CHAT_WIRE_API_REMOVED_ERROR)); + } } diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index c08feeef9..bd37e8bcc 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -1048,42 +1048,6 @@ pub fn create_tools_json_for_responses_api( Ok(tools_json) } -/// Returns JSON values that are compatible with Function Calling in the -/// Chat Completions API: -/// https://platform.openai.com/docs/guides/function-calling?api-mode=chat -pub(crate) fn create_tools_json_for_chat_completions_api( - tools: &[ToolSpec], -) -> crate::error::Result> { - // We start with the JSON for the Responses API and than rewrite it to match - // the chat completions tool call format. - let responses_api_tools_json = create_tools_json_for_responses_api(tools)?; - let tools_json = responses_api_tools_json - .into_iter() - .filter_map(|mut tool| { - if tool.get("type") != Some(&serde_json::Value::String("function".to_string())) { - return None; - } - - if let Some(map) = tool.as_object_mut() { - let name = map - .get("name") - .and_then(|v| v.as_str()) - .unwrap_or_default() - .to_string(); - // Remove "type" field as it is not needed in chat completions. - map.remove("type"); - Some(json!({ - "type": "function", - "name": name, - "function": map, - })) - } else { - None - } - }) - .collect::>(); - Ok(tools_json) -} pub(crate) fn mcp_tool_to_openai_tool( fully_qualified_name: String, @@ -2578,26 +2542,5 @@ Examples of valid command strings: }, })] ); - - let tools_json = create_tools_json_for_chat_completions_api(&tools).unwrap(); - - assert_eq!( - tools_json, - vec![json!({ - "type": "function", - "name": "demo", - "function": { - "name": "demo", - "description": "A demo tool", - "strict": false, - "parameters": { - "type": "object", - "properties": { - "foo": { "type": "string" } - }, - }, - } - })] - ); } } diff --git a/codex-rs/core/tests/chat_completions_payload.rs b/codex-rs/core/tests/chat_completions_payload.rs deleted file mode 100644 index 277209144..000000000 --- a/codex-rs/core/tests/chat_completions_payload.rs +++ /dev/null @@ -1,338 +0,0 @@ -#![allow(clippy::expect_used)] - -use std::sync::Arc; - -use codex_app_server_protocol::AuthMode; -use codex_core::ContentItem; -use codex_core::LocalShellAction; -use codex_core::LocalShellExecAction; -use codex_core::LocalShellStatus; -use codex_core::ModelClient; -use codex_core::ModelProviderInfo; -use codex_core::Prompt; -use codex_core::ResponseItem; -use codex_core::TransportManager; -use codex_core::WireApi; -use codex_core::models_manager::manager::ModelsManager; -use codex_otel::OtelManager; -use codex_protocol::ThreadId; -use codex_protocol::models::ReasoningItemContent; -use codex_protocol::protocol::SessionSource; -use core_test_support::load_default_config_for_test; -use core_test_support::skip_if_no_network; -use futures::StreamExt; -use serde_json::Value; -use tempfile::TempDir; -use wiremock::Mock; -use wiremock::MockServer; -use wiremock::ResponseTemplate; -use wiremock::matchers::method; -use wiremock::matchers::path; - -async fn run_request(input: Vec) -> Value { - let server = MockServer::start().await; - - let template = ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_raw( - "data: {\"choices\":[{\"delta\":{}}]}\n\ndata: [DONE]\n\n", - "text/event-stream", - ); - - Mock::given(method("POST")) - .and(path("/v1/chat/completions")) - .respond_with(template) - .expect(1) - .mount(&server) - .await; - - let provider = ModelProviderInfo { - name: "mock".into(), - base_url: Some(format!("{}/v1", server.uri())), - env_key: None, - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Chat, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(5_000), - requires_openai_auth: false, - supports_websockets: false, - }; - - let codex_home = match TempDir::new() { - Ok(dir) => dir, - Err(e) => panic!("failed to create TempDir: {e}"), - }; - let mut config = load_default_config_for_test(&codex_home).await; - config.model_provider_id = provider.name.clone(); - config.model_provider = provider.clone(); - config.show_raw_agent_reasoning = true; - let effort = config.model_reasoning_effort; - let summary = config.model_reasoning_summary; - let config = Arc::new(config); - - let conversation_id = ThreadId::new(); - let model = ModelsManager::get_model_offline(config.model.as_deref()); - let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config); - let otel_manager = OtelManager::new( - conversation_id, - model.as_str(), - model_info.slug.as_str(), - None, - Some("test@test.com".to_string()), - Some(AuthMode::ApiKey), - false, - "test".to_string(), - SessionSource::Exec, - ); - - let mut client_session = ModelClient::new( - Arc::clone(&config), - None, - model_info, - otel_manager, - provider, - effort, - summary, - conversation_id, - SessionSource::Exec, - TransportManager::new(), - ) - .new_session(None); - - let mut prompt = Prompt::default(); - prompt.input = input; - - let mut stream = match client_session.stream(&prompt).await { - Ok(s) => s, - Err(e) => panic!("stream chat failed: {e}"), - }; - while let Some(event) = stream.next().await { - if let Err(e) = event { - panic!("stream event error: {e}"); - } - } - - let all_requests = server.received_requests().await.expect("received requests"); - let requests: Vec<_> = all_requests - .iter() - .filter(|req| req.method == "POST" && req.url.path().ends_with("/chat/completions")) - .collect(); - let request = requests - .first() - .unwrap_or_else(|| panic!("expected POST request to /chat/completions")); - match request.body_json() { - Ok(v) => v, - Err(e) => panic!("invalid json body: {e}"), - } -} - -fn user_message(text: &str) -> ResponseItem { - ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: text.to_string(), - }], - end_turn: None, - phase: None, - } -} - -fn assistant_message(text: &str) -> ResponseItem { - ResponseItem::Message { - id: None, - role: "assistant".to_string(), - content: vec![ContentItem::OutputText { - text: text.to_string(), - }], - end_turn: None, - phase: None, - } -} - -fn reasoning_item(text: &str) -> ResponseItem { - ResponseItem::Reasoning { - id: String::new(), - summary: Vec::new(), - content: Some(vec![ReasoningItemContent::ReasoningText { - text: text.to_string(), - }]), - encrypted_content: None, - } -} - -fn function_call() -> ResponseItem { - ResponseItem::FunctionCall { - id: None, - name: "f".to_string(), - arguments: "{}".to_string(), - call_id: "c1".to_string(), - } -} - -fn local_shell_call() -> ResponseItem { - ResponseItem::LocalShellCall { - id: Some("id1".to_string()), - call_id: None, - status: LocalShellStatus::InProgress, - action: LocalShellAction::Exec(LocalShellExecAction { - command: vec!["echo".to_string()], - timeout_ms: Some(1_000), - working_directory: None, - env: None, - user: None, - }), - } -} - -fn messages_from(body: &Value) -> Vec { - match body["messages"].as_array() { - Some(arr) => arr.clone(), - None => panic!("messages array missing"), - } -} - -fn first_assistant(messages: &[Value]) -> &Value { - match messages.iter().find(|msg| msg["role"] == "assistant") { - Some(v) => v, - None => panic!("assistant message not present"), - } -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn omits_reasoning_when_none_present() { - skip_if_no_network!(); - - let body = run_request(vec![user_message("u1"), assistant_message("a1")]).await; - let messages = messages_from(&body); - let assistant = first_assistant(&messages); - - assert_eq!(assistant["content"], Value::String("a1".into())); - assert!(assistant.get("reasoning").is_none()); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn attaches_reasoning_to_previous_assistant() { - skip_if_no_network!(); - - let body = run_request(vec![ - user_message("u1"), - assistant_message("a1"), - reasoning_item("rA"), - ]) - .await; - let messages = messages_from(&body); - let assistant = first_assistant(&messages); - - assert_eq!(assistant["content"], Value::String("a1".into())); - assert_eq!(assistant["reasoning"], Value::String("rA".into())); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn attaches_reasoning_to_function_call_anchor() { - skip_if_no_network!(); - - let body = run_request(vec![ - user_message("u1"), - reasoning_item("rFunc"), - function_call(), - ]) - .await; - let messages = messages_from(&body); - let assistant = first_assistant(&messages); - - assert_eq!(assistant["reasoning"], Value::String("rFunc".into())); - let tool_calls = match assistant["tool_calls"].as_array() { - Some(arr) => arr, - None => panic!("tool call list missing"), - }; - assert_eq!(tool_calls.len(), 1); - assert_eq!(tool_calls[0]["type"], Value::String("function".into())); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn attaches_reasoning_to_local_shell_call() { - skip_if_no_network!(); - - let body = run_request(vec![ - user_message("u1"), - reasoning_item("rShell"), - local_shell_call(), - ]) - .await; - let messages = messages_from(&body); - let assistant = first_assistant(&messages); - - assert_eq!(assistant["reasoning"], Value::String("rShell".into())); - assert_eq!( - assistant["tool_calls"][0]["type"], - Value::String("local_shell_call".into()) - ); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn drops_reasoning_when_last_role_is_user() { - skip_if_no_network!(); - - let body = run_request(vec![ - assistant_message("aPrev"), - reasoning_item("rHist"), - user_message("uNew"), - ]) - .await; - let messages = messages_from(&body); - assert!(messages.iter().all(|msg| msg.get("reasoning").is_none())); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn ignores_reasoning_before_last_user() { - skip_if_no_network!(); - - let body = run_request(vec![ - user_message("u1"), - assistant_message("a1"), - user_message("u2"), - reasoning_item("rAfterU1"), - ]) - .await; - let messages = messages_from(&body); - assert!(messages.iter().all(|msg| msg.get("reasoning").is_none())); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn skips_empty_reasoning_segments() { - skip_if_no_network!(); - - let body = run_request(vec![ - user_message("u1"), - assistant_message("a1"), - reasoning_item(""), - reasoning_item(" "), - ]) - .await; - let messages = messages_from(&body); - let assistant = first_assistant(&messages); - assert!(assistant.get("reasoning").is_none()); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn suppresses_duplicate_assistant_messages() { - skip_if_no_network!(); - - let body = run_request(vec![assistant_message("dup"), assistant_message("dup")]).await; - let messages = messages_from(&body); - let assistant_messages: Vec<_> = messages - .iter() - .filter(|msg| msg["role"] == "assistant") - .collect(); - assert_eq!(assistant_messages.len(), 1); - assert_eq!( - assistant_messages[0]["content"], - Value::String("dup".into()) - ); -} diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs deleted file mode 100644 index 98e2fd9f0..000000000 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ /dev/null @@ -1,466 +0,0 @@ -use assert_matches::assert_matches; -use codex_core::AuthManager; -use std::sync::Arc; -use tracing_test::traced_test; - -use codex_core::CodexAuth; -use codex_core::ContentItem; -use codex_core::ModelClient; -use codex_core::ModelProviderInfo; -use codex_core::Prompt; -use codex_core::ResponseEvent; -use codex_core::ResponseItem; -use codex_core::TransportManager; -use codex_core::WireApi; -use codex_core::models_manager::manager::ModelsManager; -use codex_otel::OtelManager; -use codex_protocol::ThreadId; -use codex_protocol::models::ReasoningItemContent; -use codex_protocol::protocol::SessionSource; -use core_test_support::load_default_config_for_test; -use core_test_support::skip_if_no_network; -use futures::StreamExt; -use tempfile::TempDir; -use wiremock::Mock; -use wiremock::MockServer; -use wiremock::ResponseTemplate; -use wiremock::matchers::method; -use wiremock::matchers::path; - -async fn run_stream(sse_body: &str) -> Vec { - run_stream_with_bytes(sse_body.as_bytes()).await -} - -async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec { - let server = MockServer::start().await; - - let template = ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_bytes(sse_body.to_vec()); - - Mock::given(method("POST")) - .and(path("/v1/chat/completions")) - .respond_with(template) - .expect(1) - .mount(&server) - .await; - - let provider = ModelProviderInfo { - name: "mock".into(), - base_url: Some(format!("{}/v1", server.uri())), - env_key: None, - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Chat, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(5_000), - requires_openai_auth: false, - supports_websockets: false, - }; - - let codex_home = match TempDir::new() { - Ok(dir) => dir, - Err(e) => panic!("failed to create TempDir: {e}"), - }; - let mut config = load_default_config_for_test(&codex_home).await; - config.model_provider_id = provider.name.clone(); - config.model_provider = provider.clone(); - config.show_raw_agent_reasoning = true; - let effort = config.model_reasoning_effort; - let summary = config.model_reasoning_summary; - let config = Arc::new(config); - - let conversation_id = ThreadId::new(); - let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); - let auth_mode = auth_manager.get_auth_mode(); - let model = ModelsManager::get_model_offline(config.model.as_deref()); - let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config); - let otel_manager = OtelManager::new( - conversation_id, - model.as_str(), - model_info.slug.as_str(), - None, - Some("test@test.com".to_string()), - auth_mode, - false, - "test".to_string(), - SessionSource::Exec, - ); - - let mut client = ModelClient::new( - Arc::clone(&config), - None, - model_info, - otel_manager, - provider, - effort, - summary, - conversation_id, - SessionSource::Exec, - TransportManager::new(), - ) - .new_session(None); - - let mut prompt = Prompt::default(); - prompt.input = vec![ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: "hello".to_string(), - }], - end_turn: None, - phase: None, - }]; - - let mut stream = match client.stream(&prompt).await { - Ok(s) => s, - Err(e) => panic!("stream chat failed: {e}"), - }; - let mut events = Vec::new(); - while let Some(event) = stream.next().await { - match event { - Ok(ev) => events.push(ev), - // We still collect the error to exercise telemetry and complete the task. - Err(_e) => break, - } - } - events -} - -fn assert_message(item: &ResponseItem, expected: &str) { - if let ResponseItem::Message { content, .. } = item { - let text = content.iter().find_map(|part| match part { - ContentItem::OutputText { text } | ContentItem::InputText { text } => Some(text), - _ => None, - }); - let Some(text) = text else { - panic!("message missing text: {item:?}"); - }; - assert_eq!(text, expected); - } else { - panic!("expected message item, got: {item:?}"); - } -} - -fn assert_reasoning(item: &ResponseItem, expected: &str) { - if let ResponseItem::Reasoning { - content: Some(parts), - .. - } = item - { - let mut combined = String::new(); - for part in parts { - match part { - ReasoningItemContent::ReasoningText { text } - | ReasoningItemContent::Text { text } => combined.push_str(text), - } - } - assert_eq!(combined, expected); - } else { - panic!("expected reasoning item, got: {item:?}"); - } -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn streams_text_without_reasoning() { - skip_if_no_network!(); - - let sse = concat!( - "data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}\n\n", - "data: {\"choices\":[{\"delta\":{}}]}\n\n", - "data: [DONE]\n\n", - ); - - let events = run_stream(sse).await; - assert_eq!(events.len(), 4, "unexpected events: {events:?}"); - - match &events[0] { - ResponseEvent::OutputItemAdded(ResponseItem::Message { .. }) => {} - other => panic!("expected initial assistant item, got {other:?}"), - } - - match &events[1] { - ResponseEvent::OutputTextDelta(text) => assert_eq!(text, "hi"), - other => panic!("expected text delta, got {other:?}"), - } - - match &events[2] { - ResponseEvent::OutputItemDone(item) => assert_message(item, "hi"), - other => panic!("expected terminal message, got {other:?}"), - } - - assert_matches!(events[3], ResponseEvent::Completed { .. }); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn streams_reasoning_from_string_delta() { - skip_if_no_network!(); - - let sse = concat!( - "data: {\"choices\":[{\"delta\":{\"reasoning\":\"think1\"}}]}\n\n", - "data: {\"choices\":[{\"delta\":{\"content\":\"ok\"}}]}\n\n", - "data: {\"choices\":[{\"delta\":{} ,\"finish_reason\":\"stop\"}]}\n\n", - ); - - let events = run_stream(sse).await; - assert_eq!(events.len(), 7, "unexpected events: {events:?}"); - - match &events[0] { - ResponseEvent::OutputItemAdded(ResponseItem::Reasoning { .. }) => {} - other => panic!("expected initial reasoning item, got {other:?}"), - } - - match &events[1] { - ResponseEvent::ReasoningContentDelta { - delta, - content_index, - } => { - assert_eq!(delta, "think1"); - assert_eq!(content_index, &0); - } - other => panic!("expected reasoning delta, got {other:?}"), - } - - match &events[2] { - ResponseEvent::OutputItemAdded(ResponseItem::Message { .. }) => {} - other => panic!("expected initial message item, got {other:?}"), - } - - match &events[3] { - ResponseEvent::OutputTextDelta(text) => assert_eq!(text, "ok"), - other => panic!("expected text delta, got {other:?}"), - } - - match &events[4] { - ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "think1"), - other => panic!("expected terminal reasoning, got {other:?}"), - } - - match &events[5] { - ResponseEvent::OutputItemDone(item) => assert_message(item, "ok"), - other => panic!("expected terminal message, got {other:?}"), - } - - assert_matches!(events[6], ResponseEvent::Completed { .. }); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn streams_reasoning_from_object_delta() { - skip_if_no_network!(); - - let sse = concat!( - "data: {\"choices\":[{\"delta\":{\"reasoning\":{\"text\":\"partA\"}}}]}\n\n", - "data: {\"choices\":[{\"delta\":{\"reasoning\":{\"content\":\"partB\"}}}]}\n\n", - "data: {\"choices\":[{\"delta\":{\"content\":\"answer\"}}]}\n\n", - "data: {\"choices\":[{\"delta\":{} ,\"finish_reason\":\"stop\"}]}\n\n", - ); - - let events = run_stream(sse).await; - assert_eq!(events.len(), 8, "unexpected events: {events:?}"); - - match &events[0] { - ResponseEvent::OutputItemAdded(ResponseItem::Reasoning { .. }) => {} - other => panic!("expected initial reasoning item, got {other:?}"), - } - - match &events[1] { - ResponseEvent::ReasoningContentDelta { - delta, - content_index, - } => { - assert_eq!(delta, "partA"); - assert_eq!(content_index, &0); - } - other => panic!("expected reasoning delta, got {other:?}"), - } - - match &events[2] { - ResponseEvent::ReasoningContentDelta { - delta, - content_index, - } => { - assert_eq!(delta, "partB"); - assert_eq!(content_index, &1); - } - other => panic!("expected reasoning delta, got {other:?}"), - } - - match &events[3] { - ResponseEvent::OutputItemAdded(ResponseItem::Message { .. }) => {} - other => panic!("expected initial message item, got {other:?}"), - } - - match &events[4] { - ResponseEvent::OutputTextDelta(text) => assert_eq!(text, "answer"), - other => panic!("expected text delta, got {other:?}"), - } - - match &events[5] { - ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "partApartB"), - other => panic!("expected terminal reasoning, got {other:?}"), - } - - match &events[6] { - ResponseEvent::OutputItemDone(item) => assert_message(item, "answer"), - other => panic!("expected terminal message, got {other:?}"), - } - - assert_matches!(events[7], ResponseEvent::Completed { .. }); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn streams_reasoning_from_final_message() { - skip_if_no_network!(); - - let sse = "data: {\"choices\":[{\"message\":{\"reasoning\":\"final-cot\"},\"finish_reason\":\"stop\"}]}\n\n"; - - let events = run_stream(sse).await; - assert_eq!(events.len(), 4, "unexpected events: {events:?}"); - - match &events[0] { - ResponseEvent::OutputItemAdded(ResponseItem::Reasoning { .. }) => {} - other => panic!("expected initial reasoning item, got {other:?}"), - } - - match &events[1] { - ResponseEvent::ReasoningContentDelta { - delta, - content_index, - } => { - assert_eq!(delta, "final-cot"); - assert_eq!(content_index, &0); - } - other => panic!("expected reasoning delta, got {other:?}"), - } - - match &events[2] { - ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "final-cot"), - other => panic!("expected reasoning item, got {other:?}"), - } - - assert_matches!(events[3], ResponseEvent::Completed { .. }); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn streams_reasoning_before_tool_call() { - skip_if_no_network!(); - - let sse = concat!( - "data: {\"choices\":[{\"delta\":{\"reasoning\":\"pre-tool\"}}]}\n\n", - "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"id\":\"call_1\",\"type\":\"function\",\"function\":{\"name\":\"run\",\"arguments\":\"{}\"}}]},\"finish_reason\":\"tool_calls\"}]}\n\n", - ); - - let events = run_stream(sse).await; - assert_eq!(events.len(), 5, "unexpected events: {events:?}"); - - match &events[0] { - ResponseEvent::OutputItemAdded(ResponseItem::Reasoning { .. }) => {} - other => panic!("expected initial reasoning item, got {other:?}"), - } - - match &events[1] { - ResponseEvent::ReasoningContentDelta { - delta, - content_index, - } => { - assert_eq!(delta, "pre-tool"); - assert_eq!(content_index, &0); - } - other => panic!("expected reasoning delta, got {other:?}"), - } - - match &events[2] { - ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "pre-tool"), - other => panic!("expected reasoning item, got {other:?}"), - } - - match &events[3] { - ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { - name, - arguments, - call_id, - .. - }) => { - assert_eq!(name, "run"); - assert_eq!(arguments, "{}"); - assert_eq!(call_id, "call_1"); - } - other => panic!("expected function call, got {other:?}"), - } - - assert_matches!(events[4], ResponseEvent::Completed { .. }); -} - -#[tokio::test] -#[traced_test] -async fn chat_sse_emits_failed_on_parse_error() { - skip_if_no_network!(); - - let sse_body = concat!("data: not-json\n\n", "data: [DONE]\n\n"); - - let _ = run_stream(sse_body).await; - - logs_assert(|lines: &[&str]| { - lines - .iter() - .find(|line| { - line.contains("codex.api_request") && line.contains("http.response.status_code=200") - }) - .map(|_| Ok(())) - .unwrap_or(Err("cannot find codex.api_request event".to_string())) - }); - - logs_assert(|lines: &[&str]| { - lines - .iter() - .find(|line| { - line.contains("codex.sse_event") - && line.contains("error.message") - && line.contains("expected ident at line 1 column 2") - }) - .map(|_| Ok(())) - .unwrap_or(Err("cannot find SSE event".to_string())) - }); -} - -#[tokio::test] -#[traced_test] -async fn chat_sse_done_chunk_emits_event() { - skip_if_no_network!(); - - let sse_body = "data: [DONE]\n\n"; - - let _ = run_stream(sse_body).await; - - logs_assert(|lines: &[&str]| { - lines - .iter() - .find(|line| line.contains("codex.sse_event") && line.contains("event.kind=message")) - .map(|_| Ok(())) - .unwrap_or(Err("cannot find SSE event".to_string())) - }); -} - -#[tokio::test] -#[traced_test] -async fn chat_sse_emits_error_on_invalid_utf8() { - skip_if_no_network!(); - - let _ = run_stream_with_bytes(b"data: \x80\x80\n\n").await; - - logs_assert(|lines: &[&str]| { - lines - .iter() - .find(|line| { - line.contains("codex.sse_event") - && line.contains("error.message") - && line.contains("UTF8 error: invalid utf-8 sequence of 1 bytes from index 0") - }) - .map(|_| Ok(())) - .unwrap_or(Err("cannot find SSE event".to_string())) - }); -} diff --git a/codex-rs/core/tests/suite/cli_stream.rs b/codex-rs/core/tests/suite/cli_stream.rs index 9ca2c6bd6..291f596e2 100644 --- a/codex-rs/core/tests/suite/cli_stream.rs +++ b/codex-rs/core/tests/suite/cli_stream.rs @@ -4,15 +4,12 @@ use codex_core::auth::CODEX_API_KEY_ENV_VAR; use codex_core::protocol::GitInfo; use codex_utils_cargo_bin::find_resource; use core_test_support::fs_wait; +use core_test_support::responses; use core_test_support::skip_if_no_network; use std::time::Duration; use tempfile::TempDir; use uuid::Uuid; -use wiremock::Mock; use wiremock::MockServer; -use wiremock::ResponseTemplate; -use wiremock::matchers::method; -use wiremock::matchers::path; fn repo_root() -> std::path::PathBuf { #[expect(clippy::expect_used)] @@ -24,41 +21,28 @@ fn cli_responses_fixture() -> std::path::PathBuf { find_resource!("tests/cli_responses_fixture.sse").expect("failed to resolve fixture path") } -/// Tests streaming chat completions through the CLI using a mock server. -/// This test: -/// 1. Sets up a mock server that simulates OpenAI's chat completions API -/// 2. Configures codex to use this mock server via a custom provider -/// 3. Sends a simple "hello?" prompt and verifies the streamed response -/// 4. Ensures the response is received exactly once and contains "hi" +/// Tests streaming the Responses API through the CLI using a mock server. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn chat_mode_stream_cli() { +async fn responses_mode_stream_cli() { skip_if_no_network!(); let server = MockServer::start().await; let repo_root = repo_root(); - let sse = concat!( - "data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}\n\n", - "data: {\"choices\":[{\"delta\":{}}]}\n\n", - "data: [DONE]\n\n" - ); - Mock::given(method("POST")) - .and(path("/v1/chat/completions")) - .respond_with( - ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_raw(sse, "text/event-stream"), - ) - .expect(1) - .mount(&server) - .await; + let sse = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "hi"), + responses::ev_completed("resp-1"), + ]); + let resp_mock = responses::mount_sse_once(&server, sse).await; let home = TempDir::new().unwrap(); let provider_override = format!( - "model_providers.mock={{ name = \"mock\", base_url = \"{}/v1\", env_key = \"PATH\", wire_api = \"chat\" }}", + "model_providers.mock={{ name = \"mock\", base_url = \"{}/v1\", env_key = \"PATH\", wire_api = \"responses\" }}", server.uri() ); let bin = codex_utils_cargo_bin::cargo_bin("codex").unwrap(); let mut cmd = AssertCommand::new(bin); + cmd.timeout(Duration::from_secs(30)); cmd.arg("exec") .arg("--skip-git-repo-check") .arg("-c") @@ -81,7 +65,8 @@ async fn chat_mode_stream_cli() { let hi_lines = stdout.lines().filter(|line| line.trim() == "hi").count(); assert_eq!(hi_lines, 1, "Expected exactly one line with 'hi'"); - server.verify().await; + let request = resp_mock.single_request(); + assert_eq!(request.path(), "/v1/responses"); // Verify a new session rollout was created and is discoverable via list_conversations let provider_filter = vec!["mock".to_string()]; diff --git a/codex-rs/core/tests/suite/rmcp_client.rs b/codex-rs/core/tests/suite/rmcp_client.rs index cdf6a1adc..b5664c2e7 100644 --- a/codex-rs/core/tests/suite/rmcp_client.rs +++ b/codex-rs/core/tests/suite/rmcp_client.rs @@ -329,200 +329,6 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> { Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 1)] -#[serial(mcp_test_value)] -async fn stdio_image_completions_round_trip() -> anyhow::Result<()> { - skip_if_no_network!(Ok(())); - - let server = responses::start_mock_server().await; - - let call_id = "img-cc-1"; - let server_name = "rmcp"; - let tool_name = format!("mcp__{server_name}__image"); - - let tool_call = json!({ - "choices": [ - { - "delta": { - "tool_calls": [ - { - "id": call_id, - "type": "function", - "function": {"name": tool_name, "arguments": "{}"} - } - ] - }, - "finish_reason": "tool_calls" - } - ] - }); - let sse_tool_call = format!( - "data: {}\n\ndata: [DONE]\n\n", - serde_json::to_string(&tool_call)? - ); - - let final_assistant = json!({ - "choices": [ - { - "delta": {"content": "rmcp image tool completed successfully."}, - "finish_reason": "stop" - } - ] - }); - let sse_final = format!( - "data: {}\n\ndata: [DONE]\n\n", - serde_json::to_string(&final_assistant)? - ); - - use std::sync::atomic::AtomicUsize; - use std::sync::atomic::Ordering; - struct ChatSeqResponder { - num_calls: AtomicUsize, - bodies: Vec, - } - impl wiremock::Respond for ChatSeqResponder { - fn respond(&self, _: &wiremock::Request) -> wiremock::ResponseTemplate { - let idx = self.num_calls.fetch_add(1, Ordering::SeqCst); - match self.bodies.get(idx) { - Some(body) => wiremock::ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_string(body.clone()), - None => panic!("no chat completion response for index {idx}"), - } - } - } - - let chat_seq = ChatSeqResponder { - num_calls: AtomicUsize::new(0), - bodies: vec![sse_tool_call, sse_final], - }; - wiremock::Mock::given(wiremock::matchers::method("POST")) - .and(wiremock::matchers::path("/v1/chat/completions")) - .respond_with(chat_seq) - .expect(2) - .mount(&server) - .await; - - let rmcp_test_server_bin = stdio_server_bin()?; - - let fixture = test_codex() - .with_config(move |config| { - config.model_provider.wire_api = codex_core::WireApi::Chat; - let mut servers = config.mcp_servers.get().clone(); - servers.insert( - server_name.to_string(), - McpServerConfig { - transport: McpServerTransportConfig::Stdio { - command: rmcp_test_server_bin, - args: Vec::new(), - env: Some(HashMap::from([( - "MCP_TEST_IMAGE_DATA_URL".to_string(), - OPENAI_PNG.to_string(), - )])), - env_vars: Vec::new(), - cwd: None, - }, - enabled: true, - disabled_reason: None, - startup_timeout_sec: Some(Duration::from_secs(10)), - tool_timeout_sec: None, - enabled_tools: None, - disabled_tools: None, - scopes: None, - }, - ); - config - .mcp_servers - .set(servers) - .expect("test mcp servers should accept any configuration"); - }) - .build(&server) - .await?; - let session_model = fixture.session_configured.model.clone(); - - fixture - .codex - .submit(Op::UserTurn { - items: vec![UserInput::Text { - text: "call the rmcp image tool".into(), - text_elements: Vec::new(), - }], - final_output_json_schema: None, - cwd: fixture.cwd.path().to_path_buf(), - approval_policy: AskForApproval::Never, - sandbox_policy: SandboxPolicy::ReadOnly, - model: session_model, - effort: None, - summary: ReasoningSummary::Auto, - collaboration_mode: None, - personality: None, - }) - .await?; - - let begin_event = wait_for_event(&fixture.codex, |ev| { - matches!(ev, EventMsg::McpToolCallBegin(_)) - }) - .await; - let EventMsg::McpToolCallBegin(begin) = begin_event else { - unreachable!("begin"); - }; - assert_eq!( - begin, - McpToolCallBeginEvent { - call_id: call_id.to_string(), - invocation: McpInvocation { - server: server_name.to_string(), - tool: "image".to_string(), - arguments: Some(json!({})), - }, - }, - ); - - let end_event = wait_for_event(&fixture.codex, |ev| { - matches!(ev, EventMsg::McpToolCallEnd(_)) - }) - .await; - let EventMsg::McpToolCallEnd(end) = end_event else { - unreachable!("end"); - }; - assert!(end.result.as_ref().is_ok(), "tool call should succeed"); - - wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; - - // Chat Completions assertion: the second POST should include a tool role message - // with an array `content` containing an item with the expected data URL. - let all_requests = server.received_requests().await.expect("requests captured"); - let requests: Vec<_> = all_requests - .iter() - .filter(|req| req.method == "POST" && req.url.path().ends_with("/chat/completions")) - .collect(); - assert!(requests.len() >= 2, "expected two chat completion calls"); - let second = requests[1]; - let body: Value = serde_json::from_slice(&second.body)?; - let messages = body - .get("messages") - .and_then(Value::as_array) - .cloned() - .expect("messages array"); - let tool_msg = messages - .iter() - .find(|m| { - m.get("role") == Some(&json!("tool")) && m.get("tool_call_id") == Some(&json!(call_id)) - }) - .cloned() - .expect("tool message present"); - assert_eq!( - tool_msg, - json!({ - "role": "tool", - "tool_call_id": call_id, - "content": [{"type": "image_url", "image_url": {"url": OPENAI_PNG}}] - }) - ); - - Ok(()) -} - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[serial(mcp_test_value)] async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> { diff --git a/codex-rs/exec/src/cli.rs b/codex-rs/exec/src/cli.rs index 2980370de..c27c39f01 100644 --- a/codex-rs/exec/src/cli.rs +++ b/codex-rs/exec/src/cli.rs @@ -30,7 +30,7 @@ pub struct Cli { #[arg(long = "oss", default_value_t = false)] pub oss: bool, - /// Specify which local provider to use (lmstudio, ollama, or ollama-chat). + /// Specify which local provider to use (lmstudio or ollama). /// If not specified with --oss, will use config default or show selection. #[arg(long = "local-provider")] pub oss_provider: Option, diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 74f056ba4..a6ded3927 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -16,11 +16,9 @@ pub use cli::ReviewArgs; use codex_cloud_requirements::cloud_requirements_loader; use codex_common::oss::ensure_oss_provider_ready; use codex_common::oss::get_default_model_for_oss_provider; -use codex_common::oss::ollama_chat_deprecation_notice; use codex_core::AuthManager; use codex_core::LMSTUDIO_OSS_PROVIDER_ID; use codex_core::NewThread; -use codex_core::OLLAMA_CHAT_PROVIDER_ID; use codex_core::OLLAMA_OSS_PROVIDER_ID; use codex_core::ThreadManager; use codex_core::auth::enforce_login_restrictions; @@ -219,7 +217,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any Some(provider) } else { return Err(anyhow::anyhow!( - "No default OSS provider configured. Use --local-provider=provider or set oss_provider to one of: {LMSTUDIO_OSS_PROVIDER_ID}, {OLLAMA_OSS_PROVIDER_ID}, {OLLAMA_CHAT_PROVIDER_ID} in config.toml" + "No default OSS provider configured. Use --local-provider=provider or set oss_provider to one of: {LMSTUDIO_OSS_PROVIDER_ID}, {OLLAMA_OSS_PROVIDER_ID} in config.toml" )); } } else { @@ -273,14 +271,6 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any std::process::exit(1); } - let ollama_chat_support_notice = match ollama_chat_deprecation_notice(&config).await { - Ok(notice) => notice, - Err(err) => { - tracing::warn!(?err, "Failed to detect Ollama wire API"); - None - } - }; - let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION"), None, false) })) { @@ -313,12 +303,6 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any last_message_file.clone(), )), }; - if let Some(notice) = ollama_chat_support_notice { - event_processor.process_event(Event { - id: String::new(), - msg: EventMsg::DeprecationNotice(notice), - }); - } if oss { // We're in the oss section, so provider_id should be Some diff --git a/codex-rs/mcp-server/tests/common/lib.rs b/codex-rs/mcp-server/tests/common/lib.rs index c2c3757ef..d2ed896ce 100644 --- a/codex-rs/mcp-server/tests/common/lib.rs +++ b/codex-rs/mcp-server/tests/common/lib.rs @@ -6,7 +6,7 @@ pub use core_test_support::format_with_current_shell; pub use core_test_support::format_with_current_shell_display_non_login; pub use core_test_support::format_with_current_shell_non_login; pub use mcp_process::McpProcess; -pub use mock_model_server::create_mock_chat_completions_server; +pub use mock_model_server::create_mock_responses_server; pub use responses::create_apply_patch_sse_response; pub use responses::create_final_assistant_message_sse_response; pub use responses::create_shell_command_sse_response; diff --git a/codex-rs/mcp-server/tests/common/mock_model_server.rs b/codex-rs/mcp-server/tests/common/mock_model_server.rs index be7f3eb5b..a1cec2a22 100644 --- a/codex-rs/mcp-server/tests/common/mock_model_server.rs +++ b/codex-rs/mcp-server/tests/common/mock_model_server.rs @@ -9,8 +9,8 @@ use wiremock::matchers::method; use wiremock::matchers::path; /// Create a mock server that will provide the responses, in order, for -/// requests to the `/v1/chat/completions` endpoint. -pub async fn create_mock_chat_completions_server(responses: Vec) -> MockServer { +/// requests to the `/v1/responses` endpoint. +pub async fn create_mock_responses_server(responses: Vec) -> MockServer { let server = MockServer::start().await; let num_calls = responses.len(); @@ -20,7 +20,7 @@ pub async fn create_mock_chat_completions_server(responses: Vec) -> Mock }; Mock::given(method("POST")) - .and(path("/v1/chat/completions")) + .and(path("/v1/responses")) .respond_with(seq_responder) .expect(num_calls as u64) .mount(&server) diff --git a/codex-rs/mcp-server/tests/common/responses.rs b/codex-rs/mcp-server/tests/common/responses.rs index 0a9183c04..48a575a4c 100644 --- a/codex-rs/mcp-server/tests/common/responses.rs +++ b/codex-rs/mcp-server/tests/common/responses.rs @@ -1,96 +1,47 @@ -use serde_json::json; use std::path::Path; +use core_test_support::responses; +use serde_json::json; + pub fn create_shell_command_sse_response( command: Vec, workdir: Option<&Path>, timeout_ms: Option, call_id: &str, ) -> anyhow::Result { - // The `arguments` for the `shell_command` tool is a serialized JSON object. let command_str = shlex::try_join(command.iter().map(String::as_str))?; - let tool_call_arguments = serde_json::to_string(&json!({ + let arguments = serde_json::to_string(&json!({ "command": command_str, "workdir": workdir.map(|w| w.to_string_lossy()), - "timeout_ms": timeout_ms + "timeout_ms": timeout_ms, }))?; - let tool_call = json!({ - "choices": [ - { - "delta": { - "tool_calls": [ - { - "id": call_id, - "function": { - "name": "shell_command", - "arguments": tool_call_arguments - } - } - ] - }, - "finish_reason": "tool_calls" - } - ] - }); - - let sse = format!( - "data: {}\n\ndata: DONE\n\n", - serde_json::to_string(&tool_call)? - ); - Ok(sse) + let response_id = format!("resp-{call_id}"); + Ok(responses::sse(vec![ + responses::ev_response_created(&response_id), + responses::ev_function_call(call_id, "shell_command", &arguments), + responses::ev_completed(&response_id), + ])) } pub fn create_final_assistant_message_sse_response(message: &str) -> anyhow::Result { - let assistant_message = json!({ - "choices": [ - { - "delta": { - "content": message - }, - "finish_reason": "stop" - } - ] - }); - - let sse = format!( - "data: {}\n\ndata: DONE\n\n", - serde_json::to_string(&assistant_message)? - ); - Ok(sse) + let response_id = "resp-final"; + Ok(responses::sse(vec![ + responses::ev_response_created(response_id), + responses::ev_assistant_message("msg-final", message), + responses::ev_completed(response_id), + ])) } pub fn create_apply_patch_sse_response( patch_content: &str, call_id: &str, ) -> anyhow::Result { - // Use shell_command to call apply_patch with heredoc format let command = format!("apply_patch <<'EOF'\n{patch_content}\nEOF"); - let tool_call_arguments = serde_json::to_string(&json!({ - "command": command - }))?; - - let tool_call = json!({ - "choices": [ - { - "delta": { - "tool_calls": [ - { - "id": call_id, - "function": { - "name": "shell_command", - "arguments": tool_call_arguments - } - } - ] - }, - "finish_reason": "tool_calls" - } - ] - }); - - let sse = format!( - "data: {}\n\ndata: DONE\n\n", - serde_json::to_string(&tool_call)? - ); - Ok(sse) + let arguments = serde_json::to_string(&json!({ "command": command }))?; + let response_id = format!("resp-{call_id}"); + Ok(responses::sse(vec![ + responses::ev_response_created(&response_id), + responses::ev_function_call(call_id, "shell_command", &arguments), + responses::ev_completed(&response_id), + ])) } diff --git a/codex-rs/mcp-server/tests/suite/codex_tool.rs b/codex-rs/mcp-server/tests/suite/codex_tool.rs index 15836b905..edf2f1b02 100644 --- a/codex-rs/mcp-server/tests/suite/codex_tool.rs +++ b/codex-rs/mcp-server/tests/suite/codex_tool.rs @@ -25,7 +25,7 @@ use core_test_support::skip_if_no_network; use mcp_test_support::McpProcess; use mcp_test_support::create_apply_patch_sse_response; use mcp_test_support::create_final_assistant_message_sse_response; -use mcp_test_support::create_mock_chat_completions_server; +use mcp_test_support::create_mock_responses_server; use mcp_test_support::create_shell_command_sse_response; use mcp_test_support::format_with_current_shell; @@ -87,7 +87,7 @@ async fn shell_command_approval_triggers_elicitation() -> anyhow::Result<()> { ]) .await?; - // Send a "codex" tool request, which should hit the completions endpoint. + // Send a "codex" tool request, which should hit the responses endpoint. // In turn, it should reply with a tool call, which the MCP should forward // as an elicitation. let codex_request_id = mcp_process @@ -349,10 +349,8 @@ async fn codex_tool_passes_base_instructions() -> anyhow::Result<()> { #![expect(clippy::expect_used, clippy::unwrap_used)] let server = - create_mock_chat_completions_server(vec![create_final_assistant_message_sse_response( - "Enjoy!", - )?]) - .await; + create_mock_responses_server(vec![create_final_assistant_message_sse_response("Enjoy!")?]) + .await; // Run `codex mcp` with a specific config.toml. let codex_home = TempDir::new()?; @@ -360,7 +358,7 @@ async fn codex_tool_passes_base_instructions() -> anyhow::Result<()> { let mut mcp_process = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp_process.initialize()).await??; - // Send a "codex" tool request, which should hit the completions endpoint. + // Send a "codex" tool request, which should hit the responses endpoint. let codex_request_id = mcp_process .send_codex_tool_call(CodexToolCallParam { prompt: "How are you?".to_string(), @@ -400,18 +398,23 @@ async fn codex_tool_passes_base_instructions() -> anyhow::Result<()> { let requests = server.received_requests().await.unwrap(); let request = requests[0].body_json::()?; - let instructions = request["messages"][0]["content"].as_str().unwrap(); + let instructions = request["instructions"] + .as_str() + .expect("responses request should include instructions"); assert!(instructions.starts_with("You are a helpful assistant.")); - let developer_messages: Vec<&serde_json::Value> = request["messages"] + let developer_messages: Vec<&serde_json::Value> = request["input"] .as_array() - .unwrap() + .expect("responses request should include input items") .iter() .filter(|msg| msg.get("role").and_then(|role| role.as_str()) == Some("developer")) .collect(); let developer_contents: Vec<&str> = developer_messages .iter() - .filter_map(|msg| msg.get("content").and_then(|value| value.as_str())) + .filter_map(|msg| msg.get("content").and_then(serde_json::Value::as_array)) + .flat_map(|content| content.iter()) + .filter(|span| span.get("type").and_then(serde_json::Value::as_str) == Some("input_text")) + .filter_map(|span| span.get("text").and_then(serde_json::Value::as_str)) .collect(); assert!( developer_contents @@ -469,7 +472,7 @@ pub struct McpHandle { } async fn create_mcp_process(responses: Vec) -> anyhow::Result { - let server = create_mock_chat_completions_server(responses).await; + let server = create_mock_responses_server(responses).await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let mut mcp_process = McpProcess::new(codex_home.path()).await?; @@ -499,7 +502,7 @@ model_provider = "mock_provider" [model_providers.mock_provider] name = "Mock provider for test" base_url = "{server_uri}/v1" -wire_api = "chat" +wire_api = "responses" request_max_retries = 0 stream_max_retries = 0 "# diff --git a/codex-rs/ollama/src/client.rs b/codex-rs/ollama/src/client.rs index 4f603c68b..a0ab2d04f 100644 --- a/codex-rs/ollama/src/client.rs +++ b/codex-rs/ollama/src/client.rs @@ -13,7 +13,6 @@ use crate::url::base_url_to_host_root; use crate::url::is_openai_compatible_base_url; use codex_core::ModelProviderInfo; use codex_core::OLLAMA_OSS_PROVIDER_ID; -use codex_core::WireApi; use codex_core::config::Config; const OLLAMA_CONNECTION_ERROR: &str = "No running Ollama server detected. Start it with: `ollama serve` (after installing). Install instructions: https://github.com/ollama/ollama?tab=readme-ov-file#ollama"; @@ -49,7 +48,7 @@ impl OllamaClient { #[cfg(test)] async fn try_from_provider_with_base_url(base_url: &str) -> io::Result { let provider = - codex_core::create_oss_provider_with_base_url(base_url, codex_core::WireApi::Chat); + codex_core::create_oss_provider_with_base_url(base_url, codex_core::WireApi::Responses); Self::try_from_provider(&provider).await } @@ -60,9 +59,7 @@ impl OllamaClient { .base_url .as_ref() .expect("oss provider must have a base_url"); - let uses_openai_compat = is_openai_compatible_base_url(base_url) - || matches!(provider.wire_api, WireApi::Chat) - && is_openai_compatible_base_url(base_url); + let uses_openai_compat = is_openai_compatible_base_url(base_url); let host_root = base_url_to_host_root(base_url); let client = reqwest::Client::builder() .connect_timeout(std::time::Duration::from_secs(5)) diff --git a/codex-rs/ollama/src/lib.rs b/codex-rs/ollama/src/lib.rs index b049f0a48..02f375458 100644 --- a/codex-rs/ollama/src/lib.rs +++ b/codex-rs/ollama/src/lib.rs @@ -5,7 +5,6 @@ mod url; pub use client::OllamaClient; use codex_core::ModelProviderInfo; -use codex_core::WireApi; use codex_core::config::Config; pub use pull::CliProgressReporter; pub use pull::PullEvent; @@ -16,11 +15,6 @@ use semver::Version; /// Default OSS model to use when `--oss` is passed without an explicit `-m`. pub const DEFAULT_OSS_MODEL: &str = "gpt-oss:20b"; -pub struct WireApiDetection { - pub wire_api: WireApi, - pub version: Option, -} - /// Prepare the local OSS environment when `--oss` is selected. /// /// - Ensures a local Ollama server is reachable. @@ -58,60 +52,46 @@ fn min_responses_version() -> Version { Version::new(0, 13, 4) } -fn wire_api_for_version(version: &Version) -> WireApi { - if *version == Version::new(0, 0, 0) || *version >= min_responses_version() { - WireApi::Responses - } else { - WireApi::Chat - } +fn supports_responses(version: &Version) -> bool { + *version == Version::new(0, 0, 0) || *version >= min_responses_version() } -/// Detect which wire API the running Ollama server supports based on its version. -/// Returns `Ok(None)` when the version endpoint is missing or unparsable; callers -/// should keep the configured default in that case. -pub async fn detect_wire_api( - provider: &ModelProviderInfo, -) -> std::io::Result> { +/// Ensure the running Ollama server is new enough to support the Responses API. +/// +/// Returns `Ok(())` when the version endpoint is missing or unparsable. +pub async fn ensure_responses_supported(provider: &ModelProviderInfo) -> std::io::Result<()> { let client = crate::OllamaClient::try_from_provider(provider).await?; let Some(version) = client.fetch_version().await? else { - return Ok(None); + return Ok(()); }; - let wire_api = wire_api_for_version(&version); + if supports_responses(&version) { + return Ok(()); + } - Ok(Some(WireApiDetection { - wire_api, - version: Some(version), - })) + let min = min_responses_version(); + Err(std::io::Error::other(format!( + "Ollama {version} is too old. Codex requires Ollama {min} or newer." + ))) } #[cfg(test)] mod tests { use super::*; - use pretty_assertions::assert_eq; #[test] - fn test_wire_api_for_version_dev_zero_keeps_responses() { - assert_eq!( - wire_api_for_version(&Version::new(0, 0, 0)), - WireApi::Responses - ); + fn supports_responses_for_dev_zero() { + assert!(supports_responses(&Version::new(0, 0, 0))); } #[test] - fn test_wire_api_for_version_before_cutoff_is_chat() { - assert_eq!(wire_api_for_version(&Version::new(0, 13, 3)), WireApi::Chat); + fn does_not_support_responses_before_cutoff() { + assert!(!supports_responses(&Version::new(0, 13, 3))); } #[test] - fn test_wire_api_for_version_at_or_after_cutoff_is_responses() { - assert_eq!( - wire_api_for_version(&Version::new(0, 13, 4)), - WireApi::Responses - ); - assert_eq!( - wire_api_for_version(&Version::new(0, 14, 0)), - WireApi::Responses - ); + fn supports_responses_at_or_after_cutoff() { + assert!(supports_responses(&Version::new(0, 13, 4))); + assert!(supports_responses(&Version::new(0, 14, 0))); } } diff --git a/codex-rs/protocol/src/models.rs b/codex-rs/protocol/src/models.rs index 74cc9a53f..fe63666e3 100644 --- a/codex-rs/protocol/src/models.rs +++ b/codex-rs/protocol/src/models.rs @@ -109,7 +109,7 @@ pub enum ResponseItem { encrypted_content: Option, }, LocalShellCall { - /// Set when using the chat completions API. + /// Legacy id field retained for compatibility with older payloads. #[serde(default, skip_serializing)] #[ts(skip)] id: Option, @@ -125,8 +125,7 @@ pub enum ResponseItem { name: String, // The Responses API returns the function call arguments as a *string* that contains // JSON, not as an already‑parsed object. We keep it as a raw string here and let - // Session::handle_function_call parse it into a Value. This exactly matches the - // Chat Completions + Responses API behavior. + // Session::handle_function_call parse it into a Value. arguments: String, call_id: String, }, @@ -786,8 +785,7 @@ pub enum FunctionCallOutputContentItem { /// `content` preserves the historical plain-string payload so downstream /// integrations (tests, logging, etc.) can keep treating tool output as /// `String`. When an MCP server returns richer data we additionally populate -/// `content_items` with the structured form that the Responses/Chat -/// Completions APIs understand. +/// `content_items` with the structured form that the Responses API understands. #[derive(Debug, Default, Clone, PartialEq, JsonSchema, TS)] pub struct FunctionCallOutputPayload { pub content: String, diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 48ef0a772..0160e8f23 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -48,7 +48,6 @@ use codex_core::models_manager::manager::RefreshStrategy; use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG; use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG; use codex_core::protocol::AskForApproval; -use codex_core::protocol::DeprecationNoticeEvent; use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::FinalOutput; @@ -184,15 +183,6 @@ fn emit_skill_load_warnings(app_event_tx: &AppEventSender, errors: &[SkillErrorI } } -fn emit_deprecation_notice(app_event_tx: &AppEventSender, notice: Option) { - let Some(DeprecationNoticeEvent { summary, details }) = notice else { - return; - }; - app_event_tx.send(AppEvent::InsertHistoryCell(Box::new( - crate::history_cell::new_deprecation_notice(summary, details), - ))); -} - fn emit_project_config_warnings(app_event_tx: &AppEventSender, config: &Config) { let mut disabled_folders = Vec::new(); @@ -926,12 +916,10 @@ impl App { session_selection: SessionSelection, feedback: codex_feedback::CodexFeedback, is_first_run: bool, - ollama_chat_support_notice: Option, ) -> Result { use tokio_stream::StreamExt; let (app_event_tx, mut app_event_rx) = unbounded_channel(); let app_event_tx = AppEventSender::new(app_event_tx); - emit_deprecation_notice(&app_event_tx, ollama_chat_support_notice); emit_project_config_warnings(&app_event_tx, &config); tui.set_notification_method(config.tui_notification_method); diff --git a/codex-rs/tui/src/cli.rs b/codex-rs/tui/src/cli.rs index 8308f1c2a..e6880437e 100644 --- a/codex-rs/tui/src/cli.rs +++ b/codex-rs/tui/src/cli.rs @@ -58,7 +58,7 @@ pub struct Cli { #[arg(long = "oss", default_value_t = false)] pub oss: bool, - /// Specify which local provider to use (lmstudio, ollama, or ollama-chat). + /// Specify which local provider to use (lmstudio or ollama). /// If not specified with --oss, will use config default or show selection. #[arg(long = "local-provider")] pub oss_provider: Option, diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 862f93eac..2e5aee5c3 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -11,7 +11,6 @@ use codex_app_server_protocol::AuthMode; use codex_cloud_requirements::cloud_requirements_loader; use codex_common::oss::ensure_oss_provider_ready; use codex_common::oss::get_default_model_for_oss_provider; -use codex_common::oss::ollama_chat_deprecation_notice; use codex_core::AuthManager; use codex_core::CodexAuth; use codex_core::INTERACTIVE_SESSION_SOURCES; @@ -508,14 +507,6 @@ async fn run_ratatui_app( } else { initial_config }; - - let ollama_chat_support_notice = match ollama_chat_deprecation_notice(&config).await { - Ok(notice) => notice, - Err(err) => { - tracing::warn!(?err, "Failed to detect Ollama wire API"); - None - } - }; let mut missing_session_exit = |id_str: &str, action: &str| { error!("Error finding conversation path: {id_str}"); restore(); @@ -701,7 +692,6 @@ async fn run_ratatui_app( session_selection, feedback, should_show_trust_screen, // Proxy to: is it a first run in this directory? - ollama_chat_support_notice, ) .await; diff --git a/codex-rs/tui/src/oss_selection.rs b/codex-rs/tui/src/oss_selection.rs index 726791910..c73b4e332 100644 --- a/codex-rs/tui/src/oss_selection.rs +++ b/codex-rs/tui/src/oss_selection.rs @@ -4,7 +4,6 @@ use std::sync::LazyLock; use codex_core::DEFAULT_LMSTUDIO_PORT; use codex_core::DEFAULT_OLLAMA_PORT; use codex_core::LMSTUDIO_OSS_PROVIDER_ID; -use codex_core::OLLAMA_CHAT_PROVIDER_ID; use codex_core::OLLAMA_OSS_PROVIDER_ID; use codex_core::config::set_default_oss_provider; use crossterm::event::Event; @@ -75,12 +74,6 @@ static OSS_SELECT_OPTIONS: LazyLock> = LazyLock::new(|| { key: KeyCode::Char('o'), provider_id: OLLAMA_OSS_PROVIDER_ID, }, - SelectOption { - label: Line::from(vec!["Ollama (".into(), "c".underlined(), "hat)".into()]), - description: "Local Ollama server (chat wire API, default port 11434)", - key: KeyCode::Char('c'), - provider_id: OLLAMA_CHAT_PROVIDER_ID, - }, ] });