From 807f8a43c2823b39a65e83fec31cf93f0e37a646 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Mon, 5 Jan 2026 10:27:00 -0800 Subject: [PATCH] feat: expose outputSchema to user_turn/turn_start app_server API (#8377) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit What changed - Added `outputSchema` support to the app-server APIs, mirroring `codex exec --output-schema` behavior. - V1 `sendUserTurn` now accepts `outputSchema` and constrains the final assistant message for that turn. - V2 `turn/start` now accepts `outputSchema` and constrains the final assistant message for that turn (explicitly per-turn only). Core behavior - `Op::UserTurn` already supported `final_output_json_schema`; now V1 `sendUserTurn` forwards `outputSchema` into that field. - `Op::UserInput` now carries `final_output_json_schema` for per-turn settings updates; core maps it into `SessionSettingsUpdate.final_output_json_schema` so it applies to the created turn context. - V2 `turn/start` does NOT persist the schema via `OverrideTurnContext` (it’s applied only for the current turn). Other overrides (cwd/model/etc) keep their existing persistent behavior. API / docs - `codex-rs/app-server-protocol/src/protocol/v1.rs`: add `output_schema: Option` to `SendUserTurnParams` (serialized as `outputSchema`). - `codex-rs/app-server-protocol/src/protocol/v2.rs`: add `output_schema: Option` to `TurnStartParams` (serialized as `outputSchema`). - `codex-rs/app-server/README.md`: document `outputSchema` for `turn/start` and clarify it applies only to the current turn. - `codex-rs/docs/codex_mcp_interface.md`: document `outputSchema` for v1 `sendUserTurn` and v2 `turn/start`. Tests added/updated - New app-server integration tests asserting `outputSchema` is forwarded into outbound `/responses` requests as `text.format`: - `codex-rs/app-server/tests/suite/output_schema.rs` - `codex-rs/app-server/tests/suite/v2/output_schema.rs` - Added per-turn semantics tests (schema does not leak to the next turn): - `send_user_turn_output_schema_is_per_turn_v1` - `turn_start_output_schema_is_per_turn_v2` - Added protocol wire-compat tests for the merged op: - serialize omits `final_output_json_schema` when `None` - deserialize works when field is missing - serialize includes `final_output_json_schema` when `Some(schema)` Call site updates (high level) - Updated all `Op::UserInput { .. }` constructions to include `final_output_json_schema`: - `codex-rs/app-server/src/codex_message_processor.rs` - `codex-rs/core/src/codex_delegate.rs` - `codex-rs/mcp-server/src/codex_tool_runner.rs` - `codex-rs/tui/src/chatwidget.rs` - `codex-rs/tui2/src/chatwidget.rs` - plus impacted core tests. Validation - `just fmt` - `cargo test -p codex-core` - `cargo test -p codex-app-server` - `cargo test -p codex-mcp-server` - `cargo test -p codex-tui` - `cargo test -p codex-tui2` - `cargo test -p codex-protocol` - `cargo clippy --all-features --tests --profile dev --fix -- -D warnings` --- .../app-server-protocol/src/protocol/v1.rs | 2 + .../app-server-protocol/src/protocol/v2.rs | 2 + codex-rs/app-server/README.md | 11 +- .../app-server/src/codex_message_processor.rs | 5 +- .../suite/codex_message_processor_flow.rs | 3 + codex-rs/app-server/tests/suite/mod.rs | 1 + .../app-server/tests/suite/output_schema.rs | 282 ++++++++++++++++++ codex-rs/app-server/tests/suite/v2/mod.rs | 1 + .../tests/suite/v2/output_schema.rs | 231 ++++++++++++++ .../app-server/tests/suite/v2/turn_start.rs | 2 + codex-rs/core/src/codex.rs | 11 +- codex-rs/core/src/codex_delegate.rs | 6 +- codex-rs/core/tests/suite/abort_tasks.rs | 3 + codex-rs/core/tests/suite/client.rs | 25 ++ codex-rs/core/tests/suite/compact.rs | 17 ++ codex-rs/core/tests/suite/compact_remote.rs | 4 + .../core/tests/suite/compact_resume_fork.rs | 1 + .../core/tests/suite/fork_conversation.rs | 1 + codex-rs/core/tests/suite/items.rs | 7 + codex-rs/core/tests/suite/otel.rs | 22 ++ codex-rs/core/tests/suite/prompt_caching.rs | 10 + codex-rs/core/tests/suite/quota_exceeded.rs | 1 + codex-rs/core/tests/suite/resume.rs | 2 + codex-rs/core/tests/suite/review.rs | 1 + .../suite/stream_error_allows_next_turn.rs | 2 + .../core/tests/suite/stream_no_completed.rs | 1 + .../core/tests/suite/user_notification.rs | 1 + codex-rs/docs/codex_mcp_interface.md | 4 +- codex-rs/mcp-server/src/codex_tool_runner.rs | 2 + codex-rs/protocol/src/protocol.rs | 59 ++++ codex-rs/tui/src/chatwidget.rs | 5 +- codex-rs/tui2/src/chatwidget.rs | 5 +- 32 files changed, 722 insertions(+), 8 deletions(-) create mode 100644 codex-rs/app-server/tests/suite/output_schema.rs create mode 100644 codex-rs/app-server/tests/suite/v2/output_schema.rs diff --git a/codex-rs/app-server-protocol/src/protocol/v1.rs b/codex-rs/app-server-protocol/src/protocol/v1.rs index df39f8809..8aad35e41 100644 --- a/codex-rs/app-server-protocol/src/protocol/v1.rs +++ b/codex-rs/app-server-protocol/src/protocol/v1.rs @@ -384,6 +384,8 @@ pub struct SendUserTurnParams { pub model: String, pub effort: Option, pub summary: ReasoningSummary, + /// Optional JSON Schema used to constrain the final assistant message for this turn. + pub output_schema: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 7f09216ea..13a654535 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1319,6 +1319,8 @@ pub struct TurnStartParams { pub effort: Option, /// Override the reasoning summary for this turn and subsequent turns. pub summary: Option, + /// Optional JSON Schema used to constrain the final assistant message for this turn. + pub output_schema: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 787ec398d..267d7577e 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -162,7 +162,7 @@ Turns attach user input (text or images) to a thread and trigger Codex generatio - `{"type":"image","url":"https://…png"}` - `{"type":"localImage","path":"/tmp/screenshot.png"}` -You can optionally specify config overrides on the new turn. If specified, these settings become the default for subsequent turns on the same thread. +You can optionally specify config overrides on the new turn. If specified, these settings become the default for subsequent turns on the same thread. `outputSchema` applies only to the current turn. ```json { "method": "turn/start", "id": 30, "params": { @@ -178,7 +178,14 @@ You can optionally specify config overrides on the new turn. If specified, these }, "model": "gpt-5.1-codex", "effort": "medium", - "summary": "concise" + "summary": "concise", + // Optional JSON Schema to constrain the final assistant message for this turn. + "outputSchema": { + "type": "object", + "properties": { "answer": { "type": "string" } }, + "required": ["answer"], + "additionalProperties": false + } } } { "id": 30, "result": { "turn": { "id": "turn_456", diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index d1804801d..f5e9fce59 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2579,6 +2579,7 @@ impl CodexMessageProcessor { let _ = conversation .submit(Op::UserInput { items: mapped_items, + final_output_json_schema: None, }) .await; @@ -2598,6 +2599,7 @@ impl CodexMessageProcessor { model, effort, summary, + output_schema, } = params; let Ok(conversation) = self @@ -2632,7 +2634,7 @@ impl CodexMessageProcessor { model, effort, summary, - final_output_json_schema: None, + final_output_json_schema: output_schema, }) .await; @@ -2741,6 +2743,7 @@ impl CodexMessageProcessor { let turn_id = conversation .submit(Op::UserInput { items: mapped_items, + final_output_json_schema: params.output_schema, }) .await; diff --git a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs index be94dd822..c044e1c4c 100644 --- a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs +++ b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs @@ -305,6 +305,7 @@ async fn test_send_user_turn_changes_approval_policy_behavior() -> Result<()> { model: "mock-model".to_string(), effort: Some(ReasoningEffort::Medium), summary: ReasoningSummary::Auto, + output_schema: None, }) .await?; // Acknowledge sendUserTurn @@ -418,6 +419,7 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() -> Result<( model: model.clone(), effort: Some(ReasoningEffort::Medium), summary: ReasoningSummary::Auto, + output_schema: None, }) .await?; timeout( @@ -443,6 +445,7 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() -> Result<( model: model.clone(), effort: Some(ReasoningEffort::Medium), summary: ReasoningSummary::Auto, + output_schema: None, }) .await?; timeout( diff --git a/codex-rs/app-server/tests/suite/mod.rs b/codex-rs/app-server/tests/suite/mod.rs index 37f7659f4..916a8f5a7 100644 --- a/codex-rs/app-server/tests/suite/mod.rs +++ b/codex-rs/app-server/tests/suite/mod.rs @@ -7,6 +7,7 @@ mod fuzzy_file_search; mod interrupt; mod list_resume; mod login; +mod output_schema; mod send_message; mod set_default_model; mod user_agent; diff --git a/codex-rs/app-server/tests/suite/output_schema.rs b/codex-rs/app-server/tests/suite/output_schema.rs new file mode 100644 index 000000000..4ec500a24 --- /dev/null +++ b/codex-rs/app-server/tests/suite/output_schema.rs @@ -0,0 +1,282 @@ +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::to_response; +use codex_app_server_protocol::AddConversationListenerParams; +use codex_app_server_protocol::InputItem; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::NewConversationParams; +use codex_app_server_protocol::NewConversationResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::SendUserTurnParams; +use codex_app_server_protocol::SendUserTurnResponse; +use codex_core::protocol::AskForApproval; +use codex_core::protocol::SandboxPolicy; +use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::openai_models::ReasoningEffort; +use core_test_support::responses; +use core_test_support::skip_if_no_network; +use pretty_assertions::assert_eq; +use std::path::Path; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn send_user_turn_accepts_output_schema_v1() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let body = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]); + let response_mock = responses::mount_sse_once(&server, body).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let new_conv_id = mcp + .send_new_conversation_request(NewConversationParams { + ..Default::default() + }) + .await?; + let new_conv_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), + ) + .await??; + let NewConversationResponse { + conversation_id, .. + } = to_response::(new_conv_resp)?; + + let listener_id = mcp + .send_add_conversation_listener_request(AddConversationListenerParams { + conversation_id, + experimental_raw_events: false, + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(listener_id)), + ) + .await??; + + let output_schema = serde_json::json!({ + "type": "object", + "properties": { + "answer": { "type": "string" } + }, + "required": ["answer"], + "additionalProperties": false + }); + + let send_turn_id = mcp + .send_send_user_turn_request(SendUserTurnParams { + conversation_id, + items: vec![InputItem::Text { + text: "Hello".to_string(), + }], + cwd: codex_home.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + model: "mock-model".to_string(), + effort: Some(ReasoningEffort::Medium), + summary: ReasoningSummary::Auto, + output_schema: Some(output_schema.clone()), + }) + .await?; + let _send_turn_resp: SendUserTurnResponse = to_response::( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(send_turn_id)), + ) + .await??, + )?; + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + let request = response_mock.single_request(); + let payload = request.body_json(); + let text = payload.get("text").expect("request missing text field"); + let format = text + .get("format") + .expect("request missing text.format field"); + assert_eq!( + format, + &serde_json::json!({ + "name": "codex_output_schema", + "type": "json_schema", + "strict": true, + "schema": output_schema, + }) + ); + + Ok(()) +} + +#[tokio::test] +async fn send_user_turn_output_schema_is_per_turn_v1() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let body1 = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]); + let response_mock1 = responses::mount_sse_once(&server, body1).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let new_conv_id = mcp + .send_new_conversation_request(NewConversationParams { + ..Default::default() + }) + .await?; + let new_conv_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), + ) + .await??; + let NewConversationResponse { + conversation_id, .. + } = to_response::(new_conv_resp)?; + + let listener_id = mcp + .send_add_conversation_listener_request(AddConversationListenerParams { + conversation_id, + experimental_raw_events: false, + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(listener_id)), + ) + .await??; + + let output_schema = serde_json::json!({ + "type": "object", + "properties": { + "answer": { "type": "string" } + }, + "required": ["answer"], + "additionalProperties": false + }); + + let send_turn_id = mcp + .send_send_user_turn_request(SendUserTurnParams { + conversation_id, + items: vec![InputItem::Text { + text: "Hello".to_string(), + }], + cwd: codex_home.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + model: "mock-model".to_string(), + effort: Some(ReasoningEffort::Medium), + summary: ReasoningSummary::Auto, + output_schema: Some(output_schema.clone()), + }) + .await?; + let _send_turn_resp: SendUserTurnResponse = to_response::( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(send_turn_id)), + ) + .await??, + )?; + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + let payload1 = response_mock1.single_request().body_json(); + assert_eq!( + payload1.pointer("/text/format"), + Some(&serde_json::json!({ + "name": "codex_output_schema", + "type": "json_schema", + "strict": true, + "schema": output_schema, + })) + ); + + let body2 = responses::sse(vec![ + responses::ev_response_created("resp-2"), + responses::ev_assistant_message("msg-2", "Done"), + responses::ev_completed("resp-2"), + ]); + let response_mock2 = responses::mount_sse_once(&server, body2).await; + + let send_turn_id_2 = mcp + .send_send_user_turn_request(SendUserTurnParams { + conversation_id, + items: vec![InputItem::Text { + text: "Hello again".to_string(), + }], + cwd: codex_home.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + model: "mock-model".to_string(), + effort: Some(ReasoningEffort::Medium), + summary: ReasoningSummary::Auto, + output_schema: None, + }) + .await?; + let _send_turn_resp_2: SendUserTurnResponse = to_response::( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(send_turn_id_2)), + ) + .await??, + )?; + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + let payload2 = response_mock2.single_request().body_json(); + assert_eq!(payload2.pointer("/text/format"), None); + + Ok(()) +} + +fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 16d2142b2..f23f792d3 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -1,6 +1,7 @@ mod account; mod config_rpc; mod model_list; +mod output_schema; mod rate_limits; mod review; mod thread_archive; diff --git a/codex-rs/app-server/tests/suite/v2/output_schema.rs b/codex-rs/app-server/tests/suite/v2/output_schema.rs new file mode 100644 index 000000000..f23c03703 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/output_schema.rs @@ -0,0 +1,231 @@ +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::UserInput as V2UserInput; +use core_test_support::responses; +use core_test_support::skip_if_no_network; +use pretty_assertions::assert_eq; +use std::path::Path; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn turn_start_accepts_output_schema_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let body = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]); + let response_mock = responses::mount_sse_once(&server, body).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let output_schema = serde_json::json!({ + "type": "object", + "properties": { + "answer": { "type": "string" } + }, + "required": ["answer"], + "additionalProperties": false + }); + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "Hello".to_string(), + }], + output_schema: Some(output_schema.clone()), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let _turn: TurnStartResponse = to_response::(turn_resp)?; + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let request = response_mock.single_request(); + let payload = request.body_json(); + let text = payload.get("text").expect("request missing text field"); + let format = text + .get("format") + .expect("request missing text.format field"); + assert_eq!( + format, + &serde_json::json!({ + "name": "codex_output_schema", + "type": "json_schema", + "strict": true, + "schema": output_schema, + }) + ); + + Ok(()) +} + +#[tokio::test] +async fn turn_start_output_schema_is_per_turn_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let body1 = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]); + let response_mock1 = responses::mount_sse_once(&server, body1).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let output_schema = serde_json::json!({ + "type": "object", + "properties": { + "answer": { "type": "string" } + }, + "required": ["answer"], + "additionalProperties": false + }); + + let turn_req_1 = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "Hello".to_string(), + }], + output_schema: Some(output_schema.clone()), + ..Default::default() + }) + .await?; + let turn_resp_1: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req_1)), + ) + .await??; + let _turn: TurnStartResponse = to_response::(turn_resp_1)?; + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let payload1 = response_mock1.single_request().body_json(); + assert_eq!( + payload1.pointer("/text/format"), + Some(&serde_json::json!({ + "name": "codex_output_schema", + "type": "json_schema", + "strict": true, + "schema": output_schema, + })) + ); + + let body2 = responses::sse(vec![ + responses::ev_response_created("resp-2"), + responses::ev_assistant_message("msg-2", "Done"), + responses::ev_completed("resp-2"), + ]); + let response_mock2 = responses::mount_sse_once(&server, body2).await; + + let turn_req_2 = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "Hello again".to_string(), + }], + output_schema: None, + ..Default::default() + }) + .await?; + let turn_resp_2: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req_2)), + ) + .await??; + let _turn: TurnStartResponse = to_response::(turn_resp_2)?; + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let payload2 = response_mock2.single_request().body_json(); + assert_eq!(payload2.pointer("/text/format"), None); + + Ok(()) +} + +fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 1948487d1..60fa656b1 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -540,6 +540,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> { model: Some("mock-model".to_string()), effort: Some(ReasoningEffort::Medium), summary: Some(ReasoningSummary::Auto), + output_schema: None, }) .await?; timeout( @@ -566,6 +567,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> { model: Some("mock-model".to_string()), effort: Some(ReasoningEffort::Medium), summary: Some(ReasoningSummary::Auto), + output_schema: None, }) .await?; timeout( diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 996d156f4..e853a2e38 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1776,7 +1776,16 @@ mod handlers { final_output_json_schema: Some(final_output_json_schema), }, ), - Op::UserInput { items } => (items, SessionSettingsUpdate::default()), + Op::UserInput { + items, + final_output_json_schema, + } => ( + items, + SessionSettingsUpdate { + final_output_json_schema: Some(final_output_json_schema), + ..Default::default() + }, + ), _ => unreachable!(), }; diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index a7e70ff23..e767abf4a 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -118,7 +118,11 @@ pub(crate) async fn run_codex_conversation_one_shot( .await?; // Send the initial input to kick off the one-shot turn. - io.submit(Op::UserInput { items: input }).await?; + io.submit(Op::UserInput { + items: input, + final_output_json_schema: None, + }) + .await?; // Bridge events so we can observe completion and shut down automatically. let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); diff --git a/codex-rs/core/tests/suite/abort_tasks.rs b/codex-rs/core/tests/suite/abort_tasks.rs index 0d4a807a3..53d32e140 100644 --- a/codex-rs/core/tests/suite/abort_tasks.rs +++ b/codex-rs/core/tests/suite/abort_tasks.rs @@ -49,6 +49,7 @@ async fn interrupt_long_running_tool_emits_turn_aborted() { items: vec![UserInput::Text { text: "start sleep".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -101,6 +102,7 @@ async fn interrupt_tool_records_history_entries() { items: vec![UserInput::Text { text: "start history recording".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -117,6 +119,7 @@ async fn interrupt_tool_records_history_entries() { items: vec![UserInput::Text { text: "follow up".into(), }], + final_output_json_schema: None, }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index a22027f99..5ce6e9f2f 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -290,6 +290,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -365,6 +366,7 @@ async fn includes_conversation_id_and_model_headers_in_request() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -424,6 +426,7 @@ async fn includes_base_instructions_override_in_request() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -488,6 +491,7 @@ async fn chatgpt_auth_sends_correct_request() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -582,6 +586,7 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -622,6 +627,7 @@ async fn includes_user_instructions_message_in_request() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -692,6 +698,7 @@ async fn skills_append_to_instructions() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -741,6 +748,7 @@ async fn includes_configured_effort_in_request() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -777,6 +785,7 @@ async fn includes_no_effort_in_request() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -811,6 +820,7 @@ async fn includes_default_reasoning_effort_in_request_when_defined_by_model_fami items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -849,6 +859,7 @@ async fn configured_reasoning_summary_is_sent() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -887,6 +898,7 @@ async fn reasoning_summary_is_omitted_when_disabled() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -919,6 +931,7 @@ async fn includes_default_verbosity_in_request() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -958,6 +971,7 @@ async fn configured_verbosity_not_sent_for_models_without_support() -> anyhow::R items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -996,6 +1010,7 @@ async fn configured_verbosity_is_sent() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1050,6 +1065,7 @@ async fn includes_developer_instructions_message_in_request() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1280,6 +1296,7 @@ async fn token_count_includes_rate_limits_snapshot() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1437,6 +1454,7 @@ async fn usage_limit_error_emits_rate_limit_event() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .expect("submission should succeed while emitting usage limit error events"); @@ -1506,6 +1524,7 @@ async fn context_window_error_sets_total_tokens_to_model_window() -> anyhow::Res items: vec![UserInput::Text { text: "seed turn".into(), }], + final_output_json_schema: None, }) .await?; @@ -1516,6 +1535,7 @@ async fn context_window_error_sets_total_tokens_to_model_window() -> anyhow::Res items: vec![UserInput::Text { text: "trigger context window".into(), }], + final_output_json_schema: None, }) .await?; @@ -1635,6 +1655,7 @@ async fn azure_overrides_assign_properties_used_for_responses_url() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1717,6 +1738,7 @@ async fn env_var_overrides_loaded_auth() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1800,6 +1822,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "U1".into() }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1809,6 +1832,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "U2".into() }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1818,6 +1842,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "U3".into() }], + final_output_json_schema: None, }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index c7556e338..09b5fb18b 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -161,6 +161,7 @@ async fn summarize_context_three_requests_and_instructions() { items: vec![UserInput::Text { text: "hello world".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -181,6 +182,7 @@ async fn summarize_context_three_requests_and_instructions() { items: vec![UserInput::Text { text: THIRD_USER_MSG.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -580,6 +582,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { items: vec![UserInput::Text { text: user_message.into(), }], + final_output_json_schema: None, }) .await .expect("submit user input"); @@ -1084,6 +1087,7 @@ async fn auto_compact_runs_after_token_limit_hit() { items: vec![UserInput::Text { text: FIRST_AUTO_MSG.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1095,6 +1099,7 @@ async fn auto_compact_runs_after_token_limit_hit() { items: vec![UserInput::Text { text: SECOND_AUTO_MSG.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1106,6 +1111,7 @@ async fn auto_compact_runs_after_token_limit_hit() { items: vec![UserInput::Text { text: POST_AUTO_USER_MSG.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1418,6 +1424,7 @@ async fn auto_compact_persists_rollout_entries() { items: vec![UserInput::Text { text: FIRST_AUTO_MSG.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1428,6 +1435,7 @@ async fn auto_compact_persists_rollout_entries() { items: vec![UserInput::Text { text: SECOND_AUTO_MSG.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1438,6 +1446,7 @@ async fn auto_compact_persists_rollout_entries() { items: vec![UserInput::Text { text: POST_AUTO_USER_MSG.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1529,6 +1538,7 @@ async fn manual_compact_retries_after_context_window_error() { items: vec![UserInput::Text { text: "first turn".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1661,6 +1671,7 @@ async fn manual_compact_twice_preserves_latest_user_messages() { items: vec![UserInput::Text { text: first_user_message.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1674,6 +1685,7 @@ async fn manual_compact_twice_preserves_latest_user_messages() { items: vec![UserInput::Text { text: second_user_message.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1687,6 +1699,7 @@ async fn manual_compact_twice_preserves_latest_user_messages() { items: vec![UserInput::Text { text: final_user_message.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1866,6 +1879,7 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_ codex .submit(Op::UserInput { items: vec![UserInput::Text { text: user.into() }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1978,6 +1992,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { items: vec![UserInput::Text { text: FUNCTION_CALL_LIMIT_MSG.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1989,6 +2004,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { items: vec![UserInput::Text { text: follow_up_user.into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -2103,6 +2119,7 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() { codex .submit(Op::UserInput { items: vec![UserInput::Text { text: user.into() }], + final_output_json_schema: None, }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 34e44419b..5a2fb5453 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -74,6 +74,7 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { items: vec![UserInput::Text { text: "hello remote compact".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; @@ -86,6 +87,7 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { items: vec![UserInput::Text { text: "after compact".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; @@ -191,6 +193,7 @@ async fn remote_compact_runs_automatically() -> Result<()> { items: vec![UserInput::Text { text: "hello remote compact".into(), }], + final_output_json_schema: None, }) .await?; let message = wait_for_event_match(&codex, |ev| match ev { @@ -263,6 +266,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> items: vec![UserInput::Text { text: "needs compaction".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 75468ae14..3e38c89b3 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -884,6 +884,7 @@ async fn user_turn(conversation: &Arc, text: &str) { conversation .submit(Op::UserInput { items: vec![UserInput::Text { text: text.into() }], + final_output_json_schema: None, }) .await .expect("submit user turn"); diff --git a/codex-rs/core/tests/suite/fork_conversation.rs b/codex-rs/core/tests/suite/fork_conversation.rs index d302b4d77..dab856c80 100644 --- a/codex-rs/core/tests/suite/fork_conversation.rs +++ b/codex-rs/core/tests/suite/fork_conversation.rs @@ -74,6 +74,7 @@ async fn fork_conversation_twice_drops_to_first_message() { items: vec![UserInput::Text { text: text.to_string(), }], + final_output_json_schema: None, }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index 4ec23e9de..3c6cf5ff3 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -43,6 +43,7 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { items: (vec![UserInput::Text { text: "please inspect sample.txt".into(), }]), + final_output_json_schema: None, }) .await?; @@ -99,6 +100,7 @@ async fn assistant_message_item_is_emitted() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "please summarize results".into(), }], + final_output_json_schema: None, }) .await?; @@ -155,6 +157,7 @@ async fn reasoning_item_is_emitted() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "explain your reasoning".into(), }], + final_output_json_schema: None, }) .await?; @@ -213,6 +216,7 @@ async fn web_search_item_is_emitted() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "find the weather".into(), }], + final_output_json_schema: None, }) .await?; @@ -265,6 +269,7 @@ async fn agent_message_content_delta_has_item_metadata() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "please stream text".into(), }], + final_output_json_schema: None, }) .await?; @@ -330,6 +335,7 @@ async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "reason through it".into(), }], + final_output_json_schema: None, }) .await?; @@ -387,6 +393,7 @@ async fn reasoning_raw_content_delta_respects_flag() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "show raw reasoning".into(), }], + final_output_json_schema: None, }) .await?; diff --git a/codex-rs/core/tests/suite/otel.rs b/codex-rs/core/tests/suite/otel.rs index e19c41da8..4fa45f010 100644 --- a/codex-rs/core/tests/suite/otel.rs +++ b/codex-rs/core/tests/suite/otel.rs @@ -46,6 +46,7 @@ async fn responses_api_emits_api_request_event() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -87,6 +88,7 @@ async fn process_sse_emits_tracing_for_output_item() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -125,6 +127,7 @@ async fn process_sse_emits_failed_event_on_parse_error() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -164,6 +167,7 @@ async fn process_sse_records_failed_event_when_stream_closes_without_completed() items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -223,6 +227,7 @@ async fn process_sse_failed_event_records_response_error_message() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -280,6 +285,7 @@ async fn process_sse_failed_event_logs_parse_error() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -324,6 +330,7 @@ async fn process_sse_failed_event_logs_missing_error() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -377,6 +384,7 @@ async fn process_sse_failed_event_logs_response_completed_parse_error() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -427,6 +435,7 @@ async fn process_sse_emits_completed_telemetry() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -494,6 +503,7 @@ async fn handle_responses_span_records_response_kind_and_tool_name() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -558,6 +568,7 @@ async fn record_responses_sets_span_fields_for_response_events() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -637,6 +648,7 @@ async fn handle_response_item_records_tool_result_for_custom_tool_call() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -704,6 +716,7 @@ async fn handle_response_item_records_tool_result_for_function_call() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -781,6 +794,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_missing_ids() items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -842,6 +856,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_call() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -946,6 +961,7 @@ async fn handle_container_exec_autoapprove_from_config_records_tool_decision() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -994,6 +1010,7 @@ async fn handle_container_exec_user_approved_records_tool_decision() { items: vec![UserInput::Text { text: "approved".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1052,6 +1069,7 @@ async fn handle_container_exec_user_approved_for_session_records_tool_decision() items: vec![UserInput::Text { text: "persist".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1110,6 +1128,7 @@ async fn handle_sandbox_error_user_approves_retry_records_tool_decision() { items: vec![UserInput::Text { text: "retry".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1168,6 +1187,7 @@ async fn handle_container_exec_user_denies_records_tool_decision() { items: vec![UserInput::Text { text: "deny".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1226,6 +1246,7 @@ async fn handle_sandbox_error_user_approves_for_session_records_tool_decision() items: vec![UserInput::Text { text: "persist".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -1285,6 +1306,7 @@ async fn handle_sandbox_error_user_denies_records_tool_decision() { items: vec![UserInput::Text { text: "deny".into(), }], + final_output_json_schema: None, }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/prompt_caching.rs b/codex-rs/core/tests/suite/prompt_caching.rs index c21174014..f815e2e8a 100644 --- a/codex-rs/core/tests/suite/prompt_caching.rs +++ b/codex-rs/core/tests/suite/prompt_caching.rs @@ -102,6 +102,7 @@ async fn prompt_tools_are_consistent_across_requests() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello 1".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; @@ -111,6 +112,7 @@ async fn prompt_tools_are_consistent_across_requests() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello 2".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; @@ -175,6 +177,7 @@ async fn codex_mini_latest_tools() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello 1".into(), }], + final_output_json_schema: None, }) .await?; @@ -184,6 +187,7 @@ async fn codex_mini_latest_tools() -> anyhow::Result<()> { items: vec![UserInput::Text { text: "hello 2".into(), }], + final_output_json_schema: None, }) .await?; @@ -238,6 +242,7 @@ async fn prefixes_context_and_instructions_once_and_consistently_across_requests items: vec![UserInput::Text { text: "hello 1".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; @@ -247,6 +252,7 @@ async fn prefixes_context_and_instructions_once_and_consistently_across_requests items: vec![UserInput::Text { text: "hello 2".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; @@ -307,6 +313,7 @@ async fn overrides_turn_context_but_keeps_cached_prefix_and_key_constant() -> an items: vec![UserInput::Text { text: "hello 1".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; @@ -334,6 +341,7 @@ async fn overrides_turn_context_but_keeps_cached_prefix_and_key_constant() -> an items: vec![UserInput::Text { text: "hello 2".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; @@ -412,6 +420,7 @@ async fn override_before_first_turn_emits_environment_context() -> anyhow::Resul items: vec![UserInput::Text { text: "first message".into(), }], + final_output_json_schema: None, }) .await?; @@ -504,6 +513,7 @@ async fn per_turn_overrides_keep_cached_prefix_and_key_constant() -> anyhow::Res items: vec![UserInput::Text { text: "hello 1".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; diff --git a/codex-rs/core/tests/suite/quota_exceeded.rs b/codex-rs/core/tests/suite/quota_exceeded.rs index 0156c8d11..e7ccd5384 100644 --- a/codex-rs/core/tests/suite/quota_exceeded.rs +++ b/codex-rs/core/tests/suite/quota_exceeded.rs @@ -44,6 +44,7 @@ async fn quota_exceeded_emits_single_error_event() -> Result<()> { items: vec![UserInput::Text { text: "quota?".into(), }], + final_output_json_schema: None, }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/resume.rs b/codex-rs/core/tests/suite/resume.rs index bda27533c..1fee3858e 100644 --- a/codex-rs/core/tests/suite/resume.rs +++ b/codex-rs/core/tests/suite/resume.rs @@ -37,6 +37,7 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> { items: vec![UserInput::Text { text: "Record some messages".into(), }], + final_output_json_schema: None, }) .await?; @@ -89,6 +90,7 @@ async fn resume_includes_initial_messages_from_reasoning_events() -> Result<()> items: vec![UserInput::Text { text: "Record reasoning messages".into(), }], + final_output_json_schema: None, }) .await?; diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index b88abe7ac..b35213f7e 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -665,6 +665,7 @@ async fn review_history_surfaces_in_parent_session() { items: vec![UserInput::Text { text: followup.clone(), }], + final_output_json_schema: None, }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs b/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs index e6f8aa956..e7a609126 100644 --- a/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs +++ b/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs @@ -89,6 +89,7 @@ async fn continue_after_stream_error() { items: vec![UserInput::Text { text: "first message".into(), }], + final_output_json_schema: None, }) .await .unwrap(); @@ -106,6 +107,7 @@ async fn continue_after_stream_error() { items: vec![UserInput::Text { text: "follow up".into(), }], + final_output_json_schema: None, }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/stream_no_completed.rs b/codex-rs/core/tests/suite/stream_no_completed.rs index 17a19e7c3..a203658c2 100644 --- a/codex-rs/core/tests/suite/stream_no_completed.rs +++ b/codex-rs/core/tests/suite/stream_no_completed.rs @@ -96,6 +96,7 @@ async fn retries_on_early_close() { items: vec![UserInput::Text { text: "hello".into(), }], + final_output_json_schema: None, }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/user_notification.rs b/codex-rs/core/tests/suite/user_notification.rs index 0d1e941d2..a3289b87f 100644 --- a/codex-rs/core/tests/suite/user_notification.rs +++ b/codex-rs/core/tests/suite/user_notification.rs @@ -61,6 +61,7 @@ echo -n "${@: -1}" > $(dirname "${0}")/notify.txt"#, items: vec![UserInput::Text { text: "hello world".into(), }], + final_output_json_schema: None, }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; diff --git a/codex-rs/docs/codex_mcp_interface.md b/codex-rs/docs/codex_mcp_interface.md index 124e2f91d..edd5ac1b2 100644 --- a/codex-rs/docs/codex_mcp_interface.md +++ b/codex-rs/docs/codex_mcp_interface.md @@ -70,7 +70,9 @@ Response: `{ conversationId, model, reasoningEffort?, rolloutPath }` Send input to the active turn: - `sendUserMessage` → enqueue items to the conversation -- `sendUserTurn` → structured turn with explicit `cwd`, `approvalPolicy`, `sandboxPolicy`, `model`, optional `effort`, and `summary` +- `sendUserTurn` → structured turn with explicit `cwd`, `approvalPolicy`, `sandboxPolicy`, `model`, optional `effort`, `summary`, and optional `outputSchema` (JSON Schema for the final assistant message) + +For v2 threads, `turn/start` also accepts `outputSchema` to constrain the final assistant message for that turn. Interrupt a running turn: `interruptConversation`. diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 39ae7486e..b50d0c764 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -94,6 +94,7 @@ pub async fn run_codex_tool_session( items: vec![UserInput::Text { text: initial_prompt.clone(), }], + final_output_json_schema: None, }, }; @@ -128,6 +129,7 @@ pub async fn run_codex_tool_session_reply( if let Err(e) = conversation .submit(Op::UserInput { items: vec![UserInput::Text { text: prompt }], + final_output_json_schema: None, }) .await { diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 489610862..f9369de9c 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -74,6 +74,9 @@ pub enum Op { UserInput { /// User input items, see `InputItem` items: Vec, + /// Optional JSON Schema used to constrain the final assistant message for this turn. + #[serde(skip_serializing_if = "Option::is_none")] + final_output_json_schema: Option, }, /// Similar to [`Op::UserInput`], but contains additional context required @@ -1928,6 +1931,62 @@ mod tests { assert!(event.as_legacy_events(false).is_empty()); } + #[test] + fn user_input_serialization_omits_final_output_json_schema_when_none() -> Result<()> { + let op = Op::UserInput { + items: Vec::new(), + final_output_json_schema: None, + }; + + let json_op = serde_json::to_value(op)?; + assert_eq!(json_op, json!({ "type": "user_input", "items": [] })); + + Ok(()) + } + + #[test] + fn user_input_deserializes_without_final_output_json_schema_field() -> Result<()> { + let op: Op = serde_json::from_value(json!({ "type": "user_input", "items": [] }))?; + + assert_eq!( + op, + Op::UserInput { + items: Vec::new(), + final_output_json_schema: None, + } + ); + + Ok(()) + } + + #[test] + fn user_input_serialization_includes_final_output_json_schema_when_some() -> Result<()> { + let schema = json!({ + "type": "object", + "properties": { + "answer": { "type": "string" } + }, + "required": ["answer"], + "additionalProperties": false + }); + let op = Op::UserInput { + items: Vec::new(), + final_output_json_schema: Some(schema.clone()), + }; + + let json_op = serde_json::to_value(op)?; + assert_eq!( + json_op, + json!({ + "type": "user_input", + "items": [], + "final_output_json_schema": schema, + }) + ); + + Ok(()) + } + /// Serialize Event to verify that its JSON representation has the expected /// amount of nesting. #[test] diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index f4418cead..41ab0087e 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -1953,7 +1953,10 @@ impl ChatWidget { } self.codex_op_tx - .send(Op::UserInput { items }) + .send(Op::UserInput { + items, + final_output_json_schema: None, + }) .unwrap_or_else(|e| { tracing::error!("failed to send message: {e}"); }); diff --git a/codex-rs/tui2/src/chatwidget.rs b/codex-rs/tui2/src/chatwidget.rs index d92cf6021..b661894a1 100644 --- a/codex-rs/tui2/src/chatwidget.rs +++ b/codex-rs/tui2/src/chatwidget.rs @@ -1761,7 +1761,10 @@ impl ChatWidget { } self.codex_op_tx - .send(Op::UserInput { items }) + .send(Op::UserInput { + items, + final_output_json_schema: None, + }) .unwrap_or_else(|e| { tracing::error!("failed to send message: {e}"); });