diff --git a/codex-rs/app-server-protocol/schema/json/EventMsg.json b/codex-rs/app-server-protocol/schema/json/EventMsg.json index 33f58d18b..518d65020 100644 --- a/codex-rs/app-server-protocol/schema/json/EventMsg.json +++ b/codex-rs/app-server-protocol/schema/json/EventMsg.json @@ -556,6 +556,73 @@ "title": "WarningEventMsg", "type": "object" }, + { + "description": "Realtime conversation lifecycle start event.", + "properties": { + "session_id": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_started" + ], + "title": "RealtimeConversationStartedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationStartedEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation streaming payload event.", + "properties": { + "payload": { + "$ref": "#/definitions/RealtimeEvent" + }, + "type": { + "enum": [ + "realtime_conversation_realtime" + ], + "title": "RealtimeConversationRealtimeEventMsgType", + "type": "string" + } + }, + "required": [ + "payload", + "type" + ], + "title": "RealtimeConversationRealtimeEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation lifecycle close event.", + "properties": { + "reason": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_closed" + ], + "title": "RealtimeConversationClosedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationClosedEventMsg", + "type": "object" + }, { "description": "Model routing changed from the requested model to a different model.", "properties": { @@ -3856,6 +3923,120 @@ } ] }, + "RealtimeAudioFrame": { + "properties": { + "data": { + "type": "string" + }, + "num_channels": { + "format": "uint16", + "minimum": 0.0, + "type": "integer" + }, + "sample_rate": { + "format": "uint32", + "minimum": 0.0, + "type": "integer" + }, + "samples_per_channel": { + "format": "uint32", + "minimum": 0.0, + "type": [ + "integer", + "null" + ] + } + }, + "required": [ + "data", + "num_channels", + "sample_rate" + ], + "type": "object" + }, + "RealtimeEvent": { + "oneOf": [ + { + "additionalProperties": false, + "properties": { + "SessionCreated": { + "properties": { + "session_id": { + "type": "string" + } + }, + "required": [ + "session_id" + ], + "type": "object" + } + }, + "required": [ + "SessionCreated" + ], + "title": "SessionCreatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "SessionUpdated": { + "properties": { + "backend_prompt": { + "type": [ + "string", + "null" + ] + } + }, + "type": "object" + } + }, + "required": [ + "SessionUpdated" + ], + "title": "SessionUpdatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "AudioOut": { + "$ref": "#/definitions/RealtimeAudioFrame" + } + }, + "required": [ + "AudioOut" + ], + "title": "AudioOutRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "ConversationItemAdded": true + }, + "required": [ + "ConversationItemAdded" + ], + "title": "ConversationItemAddedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "Error": { + "type": "string" + } + }, + "required": [ + "Error" + ], + "title": "ErrorRealtimeEvent", + "type": "object" + } + ] + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ @@ -5620,6 +5801,73 @@ "title": "WarningEventMsg", "type": "object" }, + { + "description": "Realtime conversation lifecycle start event.", + "properties": { + "session_id": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_started" + ], + "title": "RealtimeConversationStartedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationStartedEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation streaming payload event.", + "properties": { + "payload": { + "$ref": "#/definitions/RealtimeEvent" + }, + "type": { + "enum": [ + "realtime_conversation_realtime" + ], + "title": "RealtimeConversationRealtimeEventMsgType", + "type": "string" + } + }, + "required": [ + "payload", + "type" + ], + "title": "RealtimeConversationRealtimeEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation lifecycle close event.", + "properties": { + "reason": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_closed" + ], + "title": "RealtimeConversationClosedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationClosedEventMsg", + "type": "object" + }, { "description": "Model routing changed from the requested model to a different model.", "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 1df0a8b05..51c445bf1 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -1372,6 +1372,73 @@ "title": "WarningEventMsg", "type": "object" }, + { + "description": "Realtime conversation lifecycle start event.", + "properties": { + "session_id": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_started" + ], + "title": "RealtimeConversationStartedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationStartedEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation streaming payload event.", + "properties": { + "payload": { + "$ref": "#/definitions/RealtimeEvent" + }, + "type": { + "enum": [ + "realtime_conversation_realtime" + ], + "title": "RealtimeConversationRealtimeEventMsgType", + "type": "string" + } + }, + "required": [ + "payload", + "type" + ], + "title": "RealtimeConversationRealtimeEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation lifecycle close event.", + "properties": { + "reason": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_closed" + ], + "title": "RealtimeConversationClosedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationClosedEventMsg", + "type": "object" + }, { "description": "Model routing changed from the requested model to a different model.", "properties": { @@ -5176,6 +5243,120 @@ } ] }, + "RealtimeAudioFrame": { + "properties": { + "data": { + "type": "string" + }, + "num_channels": { + "format": "uint16", + "minimum": 0.0, + "type": "integer" + }, + "sample_rate": { + "format": "uint32", + "minimum": 0.0, + "type": "integer" + }, + "samples_per_channel": { + "format": "uint32", + "minimum": 0.0, + "type": [ + "integer", + "null" + ] + } + }, + "required": [ + "data", + "num_channels", + "sample_rate" + ], + "type": "object" + }, + "RealtimeEvent": { + "oneOf": [ + { + "additionalProperties": false, + "properties": { + "SessionCreated": { + "properties": { + "session_id": { + "type": "string" + } + }, + "required": [ + "session_id" + ], + "type": "object" + } + }, + "required": [ + "SessionCreated" + ], + "title": "SessionCreatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "SessionUpdated": { + "properties": { + "backend_prompt": { + "type": [ + "string", + "null" + ] + } + }, + "type": "object" + } + }, + "required": [ + "SessionUpdated" + ], + "title": "SessionUpdatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "AudioOut": { + "$ref": "#/definitions/RealtimeAudioFrame" + } + }, + "required": [ + "AudioOut" + ], + "title": "AudioOutRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "ConversationItemAdded": true + }, + "required": [ + "ConversationItemAdded" + ], + "title": "ConversationItemAddedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "Error": { + "type": "string" + } + }, + "required": [ + "Error" + ], + "title": "ErrorRealtimeEvent", + "type": "object" + } + ] + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 4b1728512..2c1eb4031 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -2611,6 +2611,73 @@ "title": "WarningEventMsg", "type": "object" }, + { + "description": "Realtime conversation lifecycle start event.", + "properties": { + "session_id": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_started" + ], + "title": "RealtimeConversationStartedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationStartedEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation streaming payload event.", + "properties": { + "payload": { + "$ref": "#/definitions/RealtimeEvent" + }, + "type": { + "enum": [ + "realtime_conversation_realtime" + ], + "title": "RealtimeConversationRealtimeEventMsgType", + "type": "string" + } + }, + "required": [ + "payload", + "type" + ], + "title": "RealtimeConversationRealtimeEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation lifecycle close event.", + "properties": { + "reason": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_closed" + ], + "title": "RealtimeConversationClosedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationClosedEventMsg", + "type": "object" + }, { "description": "Model routing changed from the requested model to a different model.", "properties": { @@ -7027,6 +7094,120 @@ } ] }, + "RealtimeAudioFrame": { + "properties": { + "data": { + "type": "string" + }, + "num_channels": { + "format": "uint16", + "minimum": 0.0, + "type": "integer" + }, + "sample_rate": { + "format": "uint32", + "minimum": 0.0, + "type": "integer" + }, + "samples_per_channel": { + "format": "uint32", + "minimum": 0.0, + "type": [ + "integer", + "null" + ] + } + }, + "required": [ + "data", + "num_channels", + "sample_rate" + ], + "type": "object" + }, + "RealtimeEvent": { + "oneOf": [ + { + "additionalProperties": false, + "properties": { + "SessionCreated": { + "properties": { + "session_id": { + "type": "string" + } + }, + "required": [ + "session_id" + ], + "type": "object" + } + }, + "required": [ + "SessionCreated" + ], + "title": "SessionCreatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "SessionUpdated": { + "properties": { + "backend_prompt": { + "type": [ + "string", + "null" + ] + } + }, + "type": "object" + } + }, + "required": [ + "SessionUpdated" + ], + "title": "SessionUpdatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "AudioOut": { + "$ref": "#/definitions/RealtimeAudioFrame" + } + }, + "required": [ + "AudioOut" + ], + "title": "AudioOutRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "ConversationItemAdded": true + }, + "required": [ + "ConversationItemAdded" + ], + "title": "ConversationItemAddedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "Error": { + "type": "string" + } + }, + "required": [ + "Error" + ], + "title": "ErrorRealtimeEvent", + "type": "object" + } + ] + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ diff --git a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json index f88d5f741..70521ec72 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json @@ -556,6 +556,73 @@ "title": "WarningEventMsg", "type": "object" }, + { + "description": "Realtime conversation lifecycle start event.", + "properties": { + "session_id": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_started" + ], + "title": "RealtimeConversationStartedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationStartedEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation streaming payload event.", + "properties": { + "payload": { + "$ref": "#/definitions/RealtimeEvent" + }, + "type": { + "enum": [ + "realtime_conversation_realtime" + ], + "title": "RealtimeConversationRealtimeEventMsgType", + "type": "string" + } + }, + "required": [ + "payload", + "type" + ], + "title": "RealtimeConversationRealtimeEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation lifecycle close event.", + "properties": { + "reason": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_closed" + ], + "title": "RealtimeConversationClosedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationClosedEventMsg", + "type": "object" + }, { "description": "Model routing changed from the requested model to a different model.", "properties": { @@ -3856,6 +3923,120 @@ } ] }, + "RealtimeAudioFrame": { + "properties": { + "data": { + "type": "string" + }, + "num_channels": { + "format": "uint16", + "minimum": 0.0, + "type": "integer" + }, + "sample_rate": { + "format": "uint32", + "minimum": 0.0, + "type": "integer" + }, + "samples_per_channel": { + "format": "uint32", + "minimum": 0.0, + "type": [ + "integer", + "null" + ] + } + }, + "required": [ + "data", + "num_channels", + "sample_rate" + ], + "type": "object" + }, + "RealtimeEvent": { + "oneOf": [ + { + "additionalProperties": false, + "properties": { + "SessionCreated": { + "properties": { + "session_id": { + "type": "string" + } + }, + "required": [ + "session_id" + ], + "type": "object" + } + }, + "required": [ + "SessionCreated" + ], + "title": "SessionCreatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "SessionUpdated": { + "properties": { + "backend_prompt": { + "type": [ + "string", + "null" + ] + } + }, + "type": "object" + } + }, + "required": [ + "SessionUpdated" + ], + "title": "SessionUpdatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "AudioOut": { + "$ref": "#/definitions/RealtimeAudioFrame" + } + }, + "required": [ + "AudioOut" + ], + "title": "AudioOutRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "ConversationItemAdded": true + }, + "required": [ + "ConversationItemAdded" + ], + "title": "ConversationItemAddedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "Error": { + "type": "string" + } + }, + "required": [ + "Error" + ], + "title": "ErrorRealtimeEvent", + "type": "object" + } + ] + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ diff --git a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json index e619dfe12..3ce759346 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json @@ -556,6 +556,73 @@ "title": "WarningEventMsg", "type": "object" }, + { + "description": "Realtime conversation lifecycle start event.", + "properties": { + "session_id": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_started" + ], + "title": "RealtimeConversationStartedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationStartedEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation streaming payload event.", + "properties": { + "payload": { + "$ref": "#/definitions/RealtimeEvent" + }, + "type": { + "enum": [ + "realtime_conversation_realtime" + ], + "title": "RealtimeConversationRealtimeEventMsgType", + "type": "string" + } + }, + "required": [ + "payload", + "type" + ], + "title": "RealtimeConversationRealtimeEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation lifecycle close event.", + "properties": { + "reason": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_closed" + ], + "title": "RealtimeConversationClosedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationClosedEventMsg", + "type": "object" + }, { "description": "Model routing changed from the requested model to a different model.", "properties": { @@ -3856,6 +3923,120 @@ } ] }, + "RealtimeAudioFrame": { + "properties": { + "data": { + "type": "string" + }, + "num_channels": { + "format": "uint16", + "minimum": 0.0, + "type": "integer" + }, + "sample_rate": { + "format": "uint32", + "minimum": 0.0, + "type": "integer" + }, + "samples_per_channel": { + "format": "uint32", + "minimum": 0.0, + "type": [ + "integer", + "null" + ] + } + }, + "required": [ + "data", + "num_channels", + "sample_rate" + ], + "type": "object" + }, + "RealtimeEvent": { + "oneOf": [ + { + "additionalProperties": false, + "properties": { + "SessionCreated": { + "properties": { + "session_id": { + "type": "string" + } + }, + "required": [ + "session_id" + ], + "type": "object" + } + }, + "required": [ + "SessionCreated" + ], + "title": "SessionCreatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "SessionUpdated": { + "properties": { + "backend_prompt": { + "type": [ + "string", + "null" + ] + } + }, + "type": "object" + } + }, + "required": [ + "SessionUpdated" + ], + "title": "SessionUpdatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "AudioOut": { + "$ref": "#/definitions/RealtimeAudioFrame" + } + }, + "required": [ + "AudioOut" + ], + "title": "AudioOutRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "ConversationItemAdded": true + }, + "required": [ + "ConversationItemAdded" + ], + "title": "ConversationItemAddedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "Error": { + "type": "string" + } + }, + "required": [ + "Error" + ], + "title": "ErrorRealtimeEvent", + "type": "object" + } + ] + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ diff --git a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json index 61887b400..a3010337b 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json @@ -556,6 +556,73 @@ "title": "WarningEventMsg", "type": "object" }, + { + "description": "Realtime conversation lifecycle start event.", + "properties": { + "session_id": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_started" + ], + "title": "RealtimeConversationStartedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationStartedEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation streaming payload event.", + "properties": { + "payload": { + "$ref": "#/definitions/RealtimeEvent" + }, + "type": { + "enum": [ + "realtime_conversation_realtime" + ], + "title": "RealtimeConversationRealtimeEventMsgType", + "type": "string" + } + }, + "required": [ + "payload", + "type" + ], + "title": "RealtimeConversationRealtimeEventMsg", + "type": "object" + }, + { + "description": "Realtime conversation lifecycle close event.", + "properties": { + "reason": { + "type": [ + "string", + "null" + ] + }, + "type": { + "enum": [ + "realtime_conversation_closed" + ], + "title": "RealtimeConversationClosedEventMsgType", + "type": "string" + } + }, + "required": [ + "type" + ], + "title": "RealtimeConversationClosedEventMsg", + "type": "object" + }, { "description": "Model routing changed from the requested model to a different model.", "properties": { @@ -3856,6 +3923,120 @@ } ] }, + "RealtimeAudioFrame": { + "properties": { + "data": { + "type": "string" + }, + "num_channels": { + "format": "uint16", + "minimum": 0.0, + "type": "integer" + }, + "sample_rate": { + "format": "uint32", + "minimum": 0.0, + "type": "integer" + }, + "samples_per_channel": { + "format": "uint32", + "minimum": 0.0, + "type": [ + "integer", + "null" + ] + } + }, + "required": [ + "data", + "num_channels", + "sample_rate" + ], + "type": "object" + }, + "RealtimeEvent": { + "oneOf": [ + { + "additionalProperties": false, + "properties": { + "SessionCreated": { + "properties": { + "session_id": { + "type": "string" + } + }, + "required": [ + "session_id" + ], + "type": "object" + } + }, + "required": [ + "SessionCreated" + ], + "title": "SessionCreatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "SessionUpdated": { + "properties": { + "backend_prompt": { + "type": [ + "string", + "null" + ] + } + }, + "type": "object" + } + }, + "required": [ + "SessionUpdated" + ], + "title": "SessionUpdatedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "AudioOut": { + "$ref": "#/definitions/RealtimeAudioFrame" + } + }, + "required": [ + "AudioOut" + ], + "title": "AudioOutRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "ConversationItemAdded": true + }, + "required": [ + "ConversationItemAdded" + ], + "title": "ConversationItemAddedRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "Error": { + "type": "string" + } + }, + "required": [ + "Error" + ], + "title": "ErrorRealtimeEvent", + "type": "object" + } + ] + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ diff --git a/codex-rs/app-server-protocol/schema/typescript/EventMsg.ts b/codex-rs/app-server-protocol/schema/typescript/EventMsg.ts index 5ed3ca252..3aa06058e 100644 --- a/codex-rs/app-server-protocol/schema/typescript/EventMsg.ts +++ b/codex-rs/app-server-protocol/schema/typescript/EventMsg.ts @@ -47,6 +47,9 @@ import type { PatchApplyBeginEvent } from "./PatchApplyBeginEvent"; import type { PatchApplyEndEvent } from "./PatchApplyEndEvent"; import type { PlanDeltaEvent } from "./PlanDeltaEvent"; import type { RawResponseItemEvent } from "./RawResponseItemEvent"; +import type { RealtimeConversationClosedEvent } from "./RealtimeConversationClosedEvent"; +import type { RealtimeConversationRealtimeEvent } from "./RealtimeConversationRealtimeEvent"; +import type { RealtimeConversationStartedEvent } from "./RealtimeConversationStartedEvent"; import type { ReasoningContentDeltaEvent } from "./ReasoningContentDeltaEvent"; import type { ReasoningRawContentDeltaEvent } from "./ReasoningRawContentDeltaEvent"; import type { RemoteSkillDownloadedEvent } from "./RemoteSkillDownloadedEvent"; @@ -75,4 +78,4 @@ import type { WebSearchEndEvent } from "./WebSearchEndEvent"; * Response event from the agent * NOTE: Make sure none of these values have optional types, as it will mess up the extension code-gen. */ -export type EventMsg = { "type": "error" } & ErrorEvent | { "type": "warning" } & WarningEvent | { "type": "model_reroute" } & ModelRerouteEvent | { "type": "context_compacted" } & ContextCompactedEvent | { "type": "thread_rolled_back" } & ThreadRolledBackEvent | { "type": "task_started" } & TurnStartedEvent | { "type": "task_complete" } & TurnCompleteEvent | { "type": "token_count" } & TokenCountEvent | { "type": "agent_message" } & AgentMessageEvent | { "type": "user_message" } & UserMessageEvent | { "type": "agent_message_delta" } & AgentMessageDeltaEvent | { "type": "agent_reasoning" } & AgentReasoningEvent | { "type": "agent_reasoning_delta" } & AgentReasoningDeltaEvent | { "type": "agent_reasoning_raw_content" } & AgentReasoningRawContentEvent | { "type": "agent_reasoning_raw_content_delta" } & AgentReasoningRawContentDeltaEvent | { "type": "agent_reasoning_section_break" } & AgentReasoningSectionBreakEvent | { "type": "session_configured" } & SessionConfiguredEvent | { "type": "thread_name_updated" } & ThreadNameUpdatedEvent | { "type": "mcp_startup_update" } & McpStartupUpdateEvent | { "type": "mcp_startup_complete" } & McpStartupCompleteEvent | { "type": "mcp_tool_call_begin" } & McpToolCallBeginEvent | { "type": "mcp_tool_call_end" } & McpToolCallEndEvent | { "type": "web_search_begin" } & WebSearchBeginEvent | { "type": "web_search_end" } & WebSearchEndEvent | { "type": "exec_command_begin" } & ExecCommandBeginEvent | { "type": "exec_command_output_delta" } & ExecCommandOutputDeltaEvent | { "type": "terminal_interaction" } & TerminalInteractionEvent | { "type": "exec_command_end" } & ExecCommandEndEvent | { "type": "view_image_tool_call" } & ViewImageToolCallEvent | { "type": "exec_approval_request" } & ExecApprovalRequestEvent | { "type": "request_user_input" } & RequestUserInputEvent | { "type": "dynamic_tool_call_request" } & DynamicToolCallRequest | { "type": "elicitation_request" } & ElicitationRequestEvent | { "type": "apply_patch_approval_request" } & ApplyPatchApprovalRequestEvent | { "type": "deprecation_notice" } & DeprecationNoticeEvent | { "type": "background_event" } & BackgroundEventEvent | { "type": "undo_started" } & UndoStartedEvent | { "type": "undo_completed" } & UndoCompletedEvent | { "type": "stream_error" } & StreamErrorEvent | { "type": "patch_apply_begin" } & PatchApplyBeginEvent | { "type": "patch_apply_end" } & PatchApplyEndEvent | { "type": "turn_diff" } & TurnDiffEvent | { "type": "get_history_entry_response" } & GetHistoryEntryResponseEvent | { "type": "mcp_list_tools_response" } & McpListToolsResponseEvent | { "type": "list_custom_prompts_response" } & ListCustomPromptsResponseEvent | { "type": "list_skills_response" } & ListSkillsResponseEvent | { "type": "list_remote_skills_response" } & ListRemoteSkillsResponseEvent | { "type": "remote_skill_downloaded" } & RemoteSkillDownloadedEvent | { "type": "skills_update_available" } | { "type": "plan_update" } & UpdatePlanArgs | { "type": "turn_aborted" } & TurnAbortedEvent | { "type": "shutdown_complete" } | { "type": "entered_review_mode" } & ReviewRequest | { "type": "exited_review_mode" } & ExitedReviewModeEvent | { "type": "raw_response_item" } & RawResponseItemEvent | { "type": "item_started" } & ItemStartedEvent | { "type": "item_completed" } & ItemCompletedEvent | { "type": "agent_message_content_delta" } & AgentMessageContentDeltaEvent | { "type": "plan_delta" } & PlanDeltaEvent | { "type": "reasoning_content_delta" } & ReasoningContentDeltaEvent | { "type": "reasoning_raw_content_delta" } & ReasoningRawContentDeltaEvent | { "type": "collab_agent_spawn_begin" } & CollabAgentSpawnBeginEvent | { "type": "collab_agent_spawn_end" } & CollabAgentSpawnEndEvent | { "type": "collab_agent_interaction_begin" } & CollabAgentInteractionBeginEvent | { "type": "collab_agent_interaction_end" } & CollabAgentInteractionEndEvent | { "type": "collab_waiting_begin" } & CollabWaitingBeginEvent | { "type": "collab_waiting_end" } & CollabWaitingEndEvent | { "type": "collab_close_begin" } & CollabCloseBeginEvent | { "type": "collab_close_end" } & CollabCloseEndEvent | { "type": "collab_resume_begin" } & CollabResumeBeginEvent | { "type": "collab_resume_end" } & CollabResumeEndEvent; +export type EventMsg = { "type": "error" } & ErrorEvent | { "type": "warning" } & WarningEvent | { "type": "realtime_conversation_started" } & RealtimeConversationStartedEvent | { "type": "realtime_conversation_realtime" } & RealtimeConversationRealtimeEvent | { "type": "realtime_conversation_closed" } & RealtimeConversationClosedEvent | { "type": "model_reroute" } & ModelRerouteEvent | { "type": "context_compacted" } & ContextCompactedEvent | { "type": "thread_rolled_back" } & ThreadRolledBackEvent | { "type": "task_started" } & TurnStartedEvent | { "type": "task_complete" } & TurnCompleteEvent | { "type": "token_count" } & TokenCountEvent | { "type": "agent_message" } & AgentMessageEvent | { "type": "user_message" } & UserMessageEvent | { "type": "agent_message_delta" } & AgentMessageDeltaEvent | { "type": "agent_reasoning" } & AgentReasoningEvent | { "type": "agent_reasoning_delta" } & AgentReasoningDeltaEvent | { "type": "agent_reasoning_raw_content" } & AgentReasoningRawContentEvent | { "type": "agent_reasoning_raw_content_delta" } & AgentReasoningRawContentDeltaEvent | { "type": "agent_reasoning_section_break" } & AgentReasoningSectionBreakEvent | { "type": "session_configured" } & SessionConfiguredEvent | { "type": "thread_name_updated" } & ThreadNameUpdatedEvent | { "type": "mcp_startup_update" } & McpStartupUpdateEvent | { "type": "mcp_startup_complete" } & McpStartupCompleteEvent | { "type": "mcp_tool_call_begin" } & McpToolCallBeginEvent | { "type": "mcp_tool_call_end" } & McpToolCallEndEvent | { "type": "web_search_begin" } & WebSearchBeginEvent | { "type": "web_search_end" } & WebSearchEndEvent | { "type": "exec_command_begin" } & ExecCommandBeginEvent | { "type": "exec_command_output_delta" } & ExecCommandOutputDeltaEvent | { "type": "terminal_interaction" } & TerminalInteractionEvent | { "type": "exec_command_end" } & ExecCommandEndEvent | { "type": "view_image_tool_call" } & ViewImageToolCallEvent | { "type": "exec_approval_request" } & ExecApprovalRequestEvent | { "type": "request_user_input" } & RequestUserInputEvent | { "type": "dynamic_tool_call_request" } & DynamicToolCallRequest | { "type": "elicitation_request" } & ElicitationRequestEvent | { "type": "apply_patch_approval_request" } & ApplyPatchApprovalRequestEvent | { "type": "deprecation_notice" } & DeprecationNoticeEvent | { "type": "background_event" } & BackgroundEventEvent | { "type": "undo_started" } & UndoStartedEvent | { "type": "undo_completed" } & UndoCompletedEvent | { "type": "stream_error" } & StreamErrorEvent | { "type": "patch_apply_begin" } & PatchApplyBeginEvent | { "type": "patch_apply_end" } & PatchApplyEndEvent | { "type": "turn_diff" } & TurnDiffEvent | { "type": "get_history_entry_response" } & GetHistoryEntryResponseEvent | { "type": "mcp_list_tools_response" } & McpListToolsResponseEvent | { "type": "list_custom_prompts_response" } & ListCustomPromptsResponseEvent | { "type": "list_skills_response" } & ListSkillsResponseEvent | { "type": "list_remote_skills_response" } & ListRemoteSkillsResponseEvent | { "type": "remote_skill_downloaded" } & RemoteSkillDownloadedEvent | { "type": "skills_update_available" } | { "type": "plan_update" } & UpdatePlanArgs | { "type": "turn_aborted" } & TurnAbortedEvent | { "type": "shutdown_complete" } | { "type": "entered_review_mode" } & ReviewRequest | { "type": "exited_review_mode" } & ExitedReviewModeEvent | { "type": "raw_response_item" } & RawResponseItemEvent | { "type": "item_started" } & ItemStartedEvent | { "type": "item_completed" } & ItemCompletedEvent | { "type": "agent_message_content_delta" } & AgentMessageContentDeltaEvent | { "type": "plan_delta" } & PlanDeltaEvent | { "type": "reasoning_content_delta" } & ReasoningContentDeltaEvent | { "type": "reasoning_raw_content_delta" } & ReasoningRawContentDeltaEvent | { "type": "collab_agent_spawn_begin" } & CollabAgentSpawnBeginEvent | { "type": "collab_agent_spawn_end" } & CollabAgentSpawnEndEvent | { "type": "collab_agent_interaction_begin" } & CollabAgentInteractionBeginEvent | { "type": "collab_agent_interaction_end" } & CollabAgentInteractionEndEvent | { "type": "collab_waiting_begin" } & CollabWaitingBeginEvent | { "type": "collab_waiting_end" } & CollabWaitingEndEvent | { "type": "collab_close_begin" } & CollabCloseBeginEvent | { "type": "collab_close_end" } & CollabCloseEndEvent | { "type": "collab_resume_begin" } & CollabResumeBeginEvent | { "type": "collab_resume_end" } & CollabResumeEndEvent; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeAudioFrame.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeAudioFrame.ts new file mode 100644 index 000000000..99c0c1063 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeAudioFrame.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type RealtimeAudioFrame = { data: string, sample_rate: number, num_channels: number, samples_per_channel: number | null, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationClosedEvent.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationClosedEvent.ts new file mode 100644 index 000000000..c73e6833a --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationClosedEvent.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type RealtimeConversationClosedEvent = { reason: string | null, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationRealtimeEvent.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationRealtimeEvent.ts new file mode 100644 index 000000000..4ff24a828 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationRealtimeEvent.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { RealtimeEvent } from "./RealtimeEvent"; + +export type RealtimeConversationRealtimeEvent = { payload: RealtimeEvent, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationStartedEvent.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationStartedEvent.ts new file mode 100644 index 000000000..f2894fcb1 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationStartedEvent.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type RealtimeConversationStartedEvent = { session_id: string | null, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts new file mode 100644 index 000000000..6297beebe --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts @@ -0,0 +1,7 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { RealtimeAudioFrame } from "./RealtimeAudioFrame"; +import type { JsonValue } from "./serde_json/JsonValue"; + +export type RealtimeEvent = { "SessionCreated": { session_id: string, } } | { "SessionUpdated": { backend_prompt: string | null, } } | { "AudioOut": RealtimeAudioFrame } | { "ConversationItemAdded": JsonValue } | { "Error": string }; diff --git a/codex-rs/app-server-protocol/schema/typescript/index.ts b/codex-rs/app-server-protocol/schema/typescript/index.ts index cda04def8..30197f892 100644 --- a/codex-rs/app-server-protocol/schema/typescript/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/index.ts @@ -148,6 +148,11 @@ export type { RateLimitSnapshot } from "./RateLimitSnapshot"; export type { RateLimitWindow } from "./RateLimitWindow"; export type { RawResponseItemEvent } from "./RawResponseItemEvent"; export type { ReadOnlyAccess } from "./ReadOnlyAccess"; +export type { RealtimeAudioFrame } from "./RealtimeAudioFrame"; +export type { RealtimeConversationClosedEvent } from "./RealtimeConversationClosedEvent"; +export type { RealtimeConversationRealtimeEvent } from "./RealtimeConversationRealtimeEvent"; +export type { RealtimeConversationStartedEvent } from "./RealtimeConversationStartedEvent"; +export type { RealtimeEvent } from "./RealtimeEvent"; export type { ReasoningContentDeltaEvent } from "./ReasoningContentDeltaEvent"; export type { ReasoningEffort } from "./ReasoningEffort"; export type { ReasoningItem } from "./ReasoningItem"; diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index e9a297de0..48cc1d700 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -372,7 +372,8 @@ impl RealtimeWebsocketClient { default_headers: HeaderMap, ) -> Result { ensure_rustls_crypto_provider(); - let ws_url = websocket_url_from_api_url(config.api_url.as_str())?; + // Keep provider base_url semantics aligned with HTTP clients; derive the ws endpoint here. + let ws_url = websocket_url_from_api_url(self.provider.base_url.as_str())?; let mut request = ws_url .as_str() @@ -638,7 +639,7 @@ mod tests { let provider = Provider { name: "test".to_string(), - base_url: "http://localhost".to_string(), + base_url: format!("http://{addr}"), query_params: Some(HashMap::new()), headers: HeaderMap::new(), retry: crate::provider::RetryConfig { @@ -654,7 +655,6 @@ mod tests { let connection = client .connect( RealtimeSessionConfig { - api_url: format!("ws://{addr}"), prompt: "backend prompt".to_string(), session_id: Some("conv_1".to_string()), }, @@ -765,7 +765,7 @@ mod tests { let provider = Provider { name: "test".to_string(), - base_url: "http://localhost".to_string(), + base_url: format!("http://{addr}"), query_params: Some(HashMap::new()), headers: HeaderMap::new(), retry: crate::provider::RetryConfig { @@ -781,7 +781,6 @@ mod tests { let connection = client .connect( RealtimeSessionConfig { - api_url: format!("ws://{addr}"), prompt: "backend prompt".to_string(), session_id: Some("conv_1".to_string()), }, diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs index 469fea8dc..a89dbd3e7 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs @@ -1,10 +1,10 @@ pub mod methods; pub mod protocol; +pub use codex_protocol::protocol::RealtimeAudioFrame; +pub use codex_protocol::protocol::RealtimeEvent; pub use methods::RealtimeWebsocketClient; pub use methods::RealtimeWebsocketConnection; pub use methods::RealtimeWebsocketEvents; pub use methods::RealtimeWebsocketWriter; -pub use protocol::RealtimeAudioFrame; -pub use protocol::RealtimeEvent; pub use protocol::RealtimeSessionConfig; diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs index db63f5179..f2f0616fc 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs @@ -1,33 +1,15 @@ -use serde::Deserialize; +pub use codex_protocol::protocol::RealtimeAudioFrame; +pub use codex_protocol::protocol::RealtimeEvent; use serde::Serialize; use serde_json::Value; use tracing::debug; #[derive(Debug, Clone, PartialEq, Eq)] pub struct RealtimeSessionConfig { - pub api_url: String, pub prompt: String, pub session_id: Option, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct RealtimeAudioFrame { - pub data: String, - pub sample_rate: u32, - pub num_channels: u16, - #[serde(skip_serializing_if = "Option::is_none")] - pub samples_per_channel: Option, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum RealtimeEvent { - SessionCreated { session_id: String }, - SessionUpdated { backend_prompt: Option }, - AudioOut(RealtimeAudioFrame), - ConversationItemAdded(Value), - Error(String), -} - #[derive(Debug, Clone, Serialize)] #[serde(tag = "type")] pub(super) enum RealtimeOutboundMessage { diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index ff8953c03..23c08d7e2 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -29,8 +29,6 @@ pub use crate::endpoint::aggregate::AggregateStreamExt; pub use crate::endpoint::compact::CompactClient; pub use crate::endpoint::memories::MemoriesClient; pub use crate::endpoint::models::ModelsClient; -pub use crate::endpoint::realtime_websocket::RealtimeAudioFrame; -pub use crate::endpoint::realtime_websocket::RealtimeEvent; pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig; pub use crate::endpoint::realtime_websocket::RealtimeWebsocketClient; pub use crate::endpoint::realtime_websocket::RealtimeWebsocketConnection; @@ -44,3 +42,5 @@ pub use crate::provider::is_azure_responses_wire_base_url; pub use crate::sse::stream_from_fixture; pub use crate::telemetry::SseTelemetry; pub use crate::telemetry::WebsocketTelemetry; +pub use codex_protocol::protocol::RealtimeAudioFrame; +pub use codex_protocol::protocol::RealtimeEvent; diff --git a/codex-rs/codex-api/tests/realtime_websocket_e2e.rs b/codex-rs/codex-api/tests/realtime_websocket_e2e.rs index 08d93b914..ae8b082b9 100644 --- a/codex-rs/codex-api/tests/realtime_websocket_e2e.rs +++ b/codex-rs/codex-api/tests/realtime_websocket_e2e.rs @@ -50,10 +50,10 @@ where (addr, server) } -fn test_provider() -> Provider { +fn test_provider(base_url: String) -> Provider { Provider { name: "test".to_string(), - base_url: "http://localhost".to_string(), + base_url, query_params: Some(HashMap::new()), headers: HeaderMap::new(), retry: RetryConfig { @@ -124,11 +124,10 @@ async fn realtime_ws_e2e_session_create_and_event_flow() { }) .await; - let client = RealtimeWebsocketClient::new(test_provider()); + let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}"))); let connection = client .connect( RealtimeSessionConfig { - api_url: format!("ws://{addr}"), prompt: "backend prompt".to_string(), session_id: Some("conv_123".to_string()), }, @@ -215,11 +214,10 @@ async fn realtime_ws_e2e_send_while_next_event_waits() { }) .await; - let client = RealtimeWebsocketClient::new(test_provider()); + let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}"))); let connection = client .connect( RealtimeSessionConfig { - api_url: format!("ws://{addr}"), prompt: "backend prompt".to_string(), session_id: Some("conv_123".to_string()), }, @@ -277,11 +275,10 @@ async fn realtime_ws_e2e_disconnected_emitted_once() { }) .await; - let client = RealtimeWebsocketClient::new(test_provider()); + let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}"))); let connection = client .connect( RealtimeSessionConfig { - api_url: format!("ws://{addr}"), prompt: "backend prompt".to_string(), session_id: Some("conv_123".to_string()), }, @@ -337,11 +334,10 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() { }) .await; - let client = RealtimeWebsocketClient::new(test_provider()); + let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}"))); let connection = client .connect( RealtimeSessionConfig { - api_url: format!("ws://{addr}"), prompt: "backend prompt".to_string(), session_id: Some("conv_123".to_string()), }, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index fcddf4e83..88e6fef89 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -30,6 +30,11 @@ use crate::features::maybe_push_unstable_features_warning; use crate::models_manager::manager::ModelsManager; use crate::parse_command::parse_command; use crate::parse_turn_item; +use crate::realtime_conversation::RealtimeConversationManager; +use crate::realtime_conversation::handle_audio as handle_realtime_conversation_audio; +use crate::realtime_conversation::handle_close as handle_realtime_conversation_close; +use crate::realtime_conversation::handle_start as handle_realtime_conversation_start; +use crate::realtime_conversation::handle_text as handle_realtime_conversation_text; use crate::rollout::session_index; use crate::stream_events_utils::HandleOutputCtx; use crate::stream_events_utils::handle_non_tool_response_item; @@ -282,7 +287,7 @@ pub struct CodexSpawnOk { } pub(crate) const INITIAL_SUBMIT_ID: &str = ""; -pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 64; +pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 512; const CYBER_VERIFY_URL: &str = "https://chatgpt.com/cyber"; const CYBER_SAFETY_URL: &str = "https://developers.openai.com/codex/concepts/cyber-safety"; @@ -523,11 +528,13 @@ pub(crate) struct Session { /// session. features: Features, pending_mcp_server_refresh_config: Mutex>, + pub(crate) conversation: Arc, pub(crate) active_turn: Mutex>, pub(crate) services: SessionServices, js_repl: Arc, next_internal_sub_id: AtomicU64, } + /// The context needed for a single turn of the thread. #[derive(Debug)] pub(crate) struct TurnContext { @@ -1357,6 +1364,7 @@ impl Session { state: Mutex::new(state), features: config.features.clone(), pending_mcp_server_refresh_config: Mutex::new(None), + conversation: Arc::new(RealtimeConversationManager::new()), active_turn: Mutex::new(None), services, js_repl, @@ -1961,7 +1969,7 @@ impl Session { state.take_startup_regular_task() } - async fn get_config(&self) -> std::sync::Arc { + pub(crate) async fn get_config(&self) -> std::sync::Arc { let state = self.state.lock().await; state .session_configuration @@ -1969,6 +1977,11 @@ impl Session { .clone() } + pub(crate) async fn provider(&self) -> ModelProviderInfo { + let state = self.state.lock().await; + state.session_configuration.provider.clone() + } + pub(crate) async fn reload_user_config_layer(&self) { let config_toml_path = { let state = self.state.lock().await; @@ -3170,6 +3183,29 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv Op::CleanBackgroundTerminals => { handlers::clean_background_terminals(&sess).await; } + Op::RealtimeConversationStart(params) => { + if let Err(err) = + handle_realtime_conversation_start(&sess, sub.id.clone(), params).await + { + sess.send_event_raw(Event { + id: sub.id.clone(), + msg: EventMsg::Error(ErrorEvent { + message: err.to_string(), + codex_error_info: Some(CodexErrorInfo::Other), + }), + }) + .await; + } + } + Op::RealtimeConversationAudio(params) => { + handle_realtime_conversation_audio(&sess, sub.id.clone(), params).await; + } + Op::RealtimeConversationText(params) => { + handle_realtime_conversation_text(&sess, sub.id.clone(), params).await; + } + Op::RealtimeConversationClose => { + handle_realtime_conversation_close(&sess, sub.id.clone()).await; + } Op::OverrideTurnContext { cwd, approval_policy, @@ -4015,6 +4051,7 @@ mod handlers { pub async fn shutdown(sess: &Arc, sub_id: String) -> bool { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; + let _ = sess.conversation.shutdown().await; sess.services .unified_exec_manager .terminate_all_processes() @@ -7467,6 +7504,7 @@ mod tests { state: Mutex::new(state), features: config.features.clone(), pending_mcp_server_refresh_config: Mutex::new(None), + conversation: Arc::new(RealtimeConversationManager::new()), active_turn: Mutex::new(None), services, js_repl, @@ -7623,6 +7661,7 @@ mod tests { state: Mutex::new(state), features: config.features.clone(), pending_mcp_server_refresh_config: Mutex::new(None), + conversation: Arc::new(RealtimeConversationManager::new()), active_turn: Mutex::new(None), services, js_repl, diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index ccfd340df..4dc2a607c 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -13,6 +13,7 @@ pub mod auth; mod client; mod client_common; pub mod codex; +mod realtime_conversation; pub use codex::SteerInputError; mod codex_thread; mod compact_remote; diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs new file mode 100644 index 000000000..dfaddd97b --- /dev/null +++ b/codex-rs/core/src/realtime_conversation.rs @@ -0,0 +1,354 @@ +use crate::CodexAuth; +use crate::api_bridge::map_api_error; +use crate::codex::Session; +use crate::default_client::default_headers; +use crate::error::CodexErr; +use crate::error::Result as CodexResult; +use async_channel::Receiver; +use async_channel::Sender; +use async_channel::TrySendError; +use codex_api::Provider as ApiProvider; +use codex_api::RealtimeAudioFrame; +use codex_api::RealtimeEvent; +use codex_api::RealtimeSessionConfig; +use codex_api::RealtimeWebsocketClient; +use codex_api::endpoint::realtime_websocket::RealtimeWebsocketEvents; +use codex_api::endpoint::realtime_websocket::RealtimeWebsocketWriter; +use codex_protocol::protocol::CodexErrorInfo; +use codex_protocol::protocol::ConversationAudioParams; +use codex_protocol::protocol::ConversationStartParams; +use codex_protocol::protocol::ConversationTextParams; +use codex_protocol::protocol::ErrorEvent; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::RealtimeConversationClosedEvent; +use codex_protocol::protocol::RealtimeConversationRealtimeEvent; +use codex_protocol::protocol::RealtimeConversationStartedEvent; +use http::HeaderMap; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use tracing::error; +use tracing::warn; + +const AUDIO_IN_QUEUE_CAPACITY: usize = 256; +const TEXT_IN_QUEUE_CAPACITY: usize = 64; +const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256; + +pub(crate) struct RealtimeConversationManager { + state: Mutex>, +} + +#[allow(dead_code)] +struct ConversationState { + audio_tx: Sender, + text_tx: Sender, + task: JoinHandle<()>, +} + +#[allow(dead_code)] +impl RealtimeConversationManager { + pub(crate) fn new() -> Self { + Self { + state: Mutex::new(None), + } + } + + pub(crate) async fn running_state(&self) -> Option<()> { + let state = self.state.lock().await; + state.as_ref().map(|_| ()) + } + + pub(crate) async fn start( + &self, + api_provider: ApiProvider, + extra_headers: Option, + prompt: String, + session_id: Option, + ) -> CodexResult> { + let previous_state = { + let mut guard = self.state.lock().await; + guard.take() + }; + if let Some(state) = previous_state { + state.task.abort(); + let _ = state.task.await; + } + + let session_config = RealtimeSessionConfig { prompt, session_id }; + let client = RealtimeWebsocketClient::new(api_provider); + let connection = client + .connect( + session_config, + extra_headers.unwrap_or_default(), + default_headers(), + ) + .await + .map_err(map_api_error)?; + + let writer = connection.writer(); + let events = connection.events(); + let (audio_tx, audio_rx) = + async_channel::bounded::(AUDIO_IN_QUEUE_CAPACITY); + let (text_tx, text_rx) = async_channel::bounded::(TEXT_IN_QUEUE_CAPACITY); + let (events_tx, events_rx) = + async_channel::bounded::(OUTPUT_EVENTS_QUEUE_CAPACITY); + + let task = spawn_realtime_input_task(writer, events, text_rx, audio_rx, events_tx); + + let mut guard = self.state.lock().await; + *guard = Some(ConversationState { + audio_tx, + text_tx, + task, + }); + Ok(events_rx) + } + + pub(crate) async fn audio_in(&self, frame: RealtimeAudioFrame) -> CodexResult<()> { + let sender = { + let guard = self.state.lock().await; + guard.as_ref().map(|state| state.audio_tx.clone()) + }; + + let Some(sender) = sender else { + return Err(CodexErr::InvalidRequest( + "conversation is not running".to_string(), + )); + }; + + match sender.try_send(frame) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => { + warn!("dropping input audio frame due to full queue"); + Ok(()) + } + Err(TrySendError::Closed(_)) => Err(CodexErr::InvalidRequest( + "conversation is not running".to_string(), + )), + } + } + + pub(crate) async fn text_in(&self, text: String) -> CodexResult<()> { + let sender = { + let guard = self.state.lock().await; + guard.as_ref().map(|state| state.text_tx.clone()) + }; + + let Some(sender) = sender else { + return Err(CodexErr::InvalidRequest( + "conversation is not running".to_string(), + )); + }; + + sender + .send(text) + .await + .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; + Ok(()) + } + + pub(crate) async fn shutdown(&self) -> CodexResult<()> { + let state = { + let mut guard = self.state.lock().await; + guard.take() + }; + + if let Some(state) = state { + state.task.abort(); + let _ = state.task.await; + } + Ok(()) + } +} + +pub(crate) async fn handle_start( + sess: &Arc, + sub_id: String, + params: ConversationStartParams, +) -> CodexResult<()> { + let provider = sess.provider().await; + let auth = sess.services.auth_manager.auth().await; + let api_provider = provider.to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?; + + let requested_session_id = params + .session_id + .or_else(|| Some(sess.conversation_id.to_string())); + let events_rx = match sess + .conversation + .start( + api_provider, + None, + params.prompt, + requested_session_id.clone(), + ) + .await + { + Ok(events_rx) => events_rx, + Err(err) => { + send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await; + return Ok(()); + } + }; + + sess.send_event_raw(Event { + id: sub_id.clone(), + msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent { + session_id: requested_session_id, + }), + }) + .await; + + let sess_clone = Arc::clone(sess); + tokio::spawn(async move { + let ev = |msg| Event { + id: sub_id.clone(), + msg, + }; + while let Ok(event) = events_rx.recv().await { + sess_clone + .send_event_raw(ev(EventMsg::RealtimeConversationRealtime( + RealtimeConversationRealtimeEvent { payload: event }, + ))) + .await; + } + if let Some(()) = sess_clone.conversation.running_state().await { + sess_clone + .send_event_raw(ev(EventMsg::RealtimeConversationClosed( + RealtimeConversationClosedEvent { + reason: Some("transport_closed".to_string()), + }, + ))) + .await; + } + }); + + Ok(()) +} + +pub(crate) async fn handle_audio( + sess: &Arc, + sub_id: String, + params: ConversationAudioParams, +) { + if let Err(err) = sess.conversation.audio_in(params.frame).await { + send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await; + } +} + +pub(crate) async fn handle_text( + sess: &Arc, + sub_id: String, + params: ConversationTextParams, +) { + if let Err(err) = sess.conversation.text_in(params.text).await { + send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await; + } +} + +pub(crate) async fn handle_close(sess: &Arc, sub_id: String) { + match sess.conversation.shutdown().await { + Ok(()) => { + sess.send_event_raw(Event { + id: sub_id, + msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent { + reason: Some("requested".to_string()), + }), + }) + .await; + } + Err(err) => { + send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await; + } + } +} + +fn spawn_realtime_input_task( + writer: RealtimeWebsocketWriter, + events: RealtimeWebsocketEvents, + text_rx: Receiver, + audio_rx: Receiver, + events_tx: Sender, +) -> JoinHandle<()> { + tokio::spawn(async move { + loop { + tokio::select! { + biased; + text = text_rx.recv() => { + match text { + Ok(text) => { + if let Err(err) = writer.send_conversation_item_create(text).await { + let mapped_error = map_api_error(err); + warn!("failed to send input text: {mapped_error}"); + break; + } + } + Err(_) => break, + } + } + event = events.next_event() => { + match event { + Ok(Some(event)) => { + let should_stop = matches!(&event, RealtimeEvent::Error(_)); + if events_tx.send(event).await.is_err() { + break; + } + if should_stop { + error!("realtime stream error event received"); + break; + } + } + Ok(None) => { + let _ = events_tx + .send(RealtimeEvent::Error( + "realtime websocket connection is closed".to_string(), + )) + .await; + break; + } + Err(err) => { + let mapped_error = map_api_error(err); + if events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await + .is_err() + { + break; + } + error!("realtime stream closed: {mapped_error}"); + break; + } + } + } + frame = audio_rx.recv() => { + match frame { + Ok(frame) => { + if let Err(err) = writer.send_audio_frame(frame).await { + let mapped_error = map_api_error(err); + error!("failed to send input audio: {mapped_error}"); + break; + } + } + Err(_) => break, + } + } + } + } + }) +} + +async fn send_conversation_error( + sess: &Arc, + sub_id: String, + message: String, + codex_error_info: CodexErrorInfo, +) { + sess.send_event_raw(Event { + id: sub_id, + msg: EventMsg::Error(ErrorEvent { + message, + codex_error_info: Some(codex_error_info), + }), + }) + .await; +} diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index f83433efb..7899f0e83 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -122,6 +122,9 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option { | EventMsg::CollabCloseEnd(_) | EventMsg::CollabResumeEnd(_) => Some(EventPersistenceMode::Extended), EventMsg::Warning(_) + | EventMsg::RealtimeConversationStarted(_) + | EventMsg::RealtimeConversationRealtime(_) + | EventMsg::RealtimeConversationClosed(_) | EventMsg::ModelReroute(_) | EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 5e3d65799..7d73e4668 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -96,6 +96,7 @@ mod personality_migration; mod prompt_caching; mod quota_exceeded; mod read_file; +mod realtime_conversation; mod remote_models; mod request_compression; mod request_user_input; diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs new file mode 100644 index 000000000..a6eccc969 --- /dev/null +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -0,0 +1,360 @@ +use anyhow::Result; +use codex_core::protocol::CodexErrorInfo; +use codex_core::protocol::ConversationAudioParams; +use codex_core::protocol::ConversationStartParams; +use codex_core::protocol::ConversationTextParams; +use codex_core::protocol::ErrorEvent; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::RealtimeAudioFrame; +use codex_core::protocol::RealtimeConversationRealtimeEvent; +use codex_core::protocol::RealtimeEvent; +use core_test_support::responses::start_websocket_server; +use core_test_support::skip_if_no_network; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event_match; +use pretty_assertions::assert_eq; +use serde_json::json; +use std::time::Duration; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_start_audio_text_close_round_trip() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_websocket_server(vec![ + vec![], + vec![ + vec![json!({ + "type": "session.created", + "session": { "id": "sess_1" } + })], + vec![], + vec![ + json!({ + "type": "response.output_audio.delta", + "delta": "AQID", + "sample_rate": 24000, + "num_channels": 1 + }), + json!({ + "type": "conversation.item.added", + "item": { + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": "hi"}] + } + }), + ], + ], + ]) + .await; + + let mut builder = test_codex(); + let test = builder.build_with_websocket_server(&server).await?; + assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await); + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let started = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); + assert!(started.session_id.is_some()); + + let session_created = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionCreated { session_id }, + }) => Some(session_id.clone()), + _ => None, + }) + .await; + assert_eq!(session_created, "sess_1"); + + test.codex + .submit(Op::RealtimeConversationAudio(ConversationAudioParams { + frame: RealtimeAudioFrame { + data: "AQID".to_string(), + sample_rate: 24000, + num_channels: 1, + samples_per_channel: Some(480), + }, + })) + .await?; + test.codex + .submit(Op::RealtimeConversationText(ConversationTextParams { + text: "hello".to_string(), + })) + .await?; + + let audio_out = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::AudioOut(frame), + }) => Some(frame.clone()), + _ => None, + }) + .await; + assert_eq!(audio_out.data, "AQID"); + + let connections = server.connections(); + assert_eq!(connections.len(), 2); + let connection = &connections[1]; + assert_eq!(connection.len(), 3); + assert_eq!( + connection[0].body_json()["type"].as_str(), + Some("session.create") + ); + assert_eq!( + connection[0].body_json()["session"]["conversation_id"] + .as_str() + .expect("session.create conversation_id"), + started + .session_id + .as_deref() + .expect("started session id should be present") + ); + let request_types = [ + connection[1].body_json()["type"] + .as_str() + .expect("request type") + .to_string(), + connection[2].body_json()["type"] + .as_str() + .expect("request type") + .to_string(), + ]; + assert_eq!( + request_types, + [ + "conversation.item.create".to_string(), + "response.input_audio.delta".to_string(), + ] + ); + + test.codex.submit(Op::RealtimeConversationClose).await?; + let closed = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), + _ => None, + }) + .await; + assert!(matches!( + closed.reason.as_deref(), + Some("requested" | "transport_closed") + )); + + server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_transport_close_emits_closed_event() -> Result<()> { + skip_if_no_network!(Ok(())); + + let session_created = vec![json!({ + "type": "session.created", + "session": { "id": "sess_1" } + })]; + let server = start_websocket_server(vec![vec![], vec![session_created]]).await; + + let mut builder = test_codex(); + let test = builder.build_with_websocket_server(&server).await?; + assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await); + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let started = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); + assert!(started.session_id.is_some()); + + let session_created = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionCreated { session_id }, + }) => Some(session_id.clone()), + _ => None, + }) + .await; + assert_eq!(session_created, "sess_1"); + + let closed = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), + _ => None, + }) + .await; + assert_eq!(closed.reason.as_deref(), Some("transport_closed")); + + server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_audio_before_start_emits_error() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_websocket_server(vec![]).await; + let mut builder = test_codex(); + let test = builder.build_with_websocket_server(&server).await?; + + test.codex + .submit(Op::RealtimeConversationAudio(ConversationAudioParams { + frame: RealtimeAudioFrame { + data: "AQID".to_string(), + sample_rate: 24000, + num_channels: 1, + samples_per_channel: Some(480), + }, + })) + .await?; + + let err = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::Error(err) => Some(err.clone()), + _ => None, + }) + .await; + assert_eq!(err.codex_error_info, Some(CodexErrorInfo::BadRequest)); + assert_eq!(err.message, "conversation is not running"); + + server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_text_before_start_emits_error() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_websocket_server(vec![]).await; + let mut builder = test_codex(); + let test = builder.build_with_websocket_server(&server).await?; + + test.codex + .submit(Op::RealtimeConversationText(ConversationTextParams { + text: "hello".to_string(), + })) + .await?; + + let err = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::Error(err) => Some(err.clone()), + _ => None, + }) + .await; + assert_eq!(err.codex_error_info, Some(CodexErrorInfo::BadRequest)); + assert_eq!(err.message, "conversation is not running"); + + server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_second_start_replaces_runtime() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_websocket_server(vec![ + vec![], + vec![vec![json!({ + "type": "session.created", + "session": { "id": "sess_old" } + })]], + vec![ + vec![json!({ + "type": "session.created", + "session": { "id": "sess_new" } + })], + vec![json!({ + "type": "response.output_audio.delta", + "delta": "AQID", + "sample_rate": 24000, + "num_channels": 1 + })], + ], + ]) + .await; + let mut builder = test_codex(); + let test = builder.build_with_websocket_server(&server).await?; + assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await); + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "old".to_string(), + session_id: Some("conv_old".to_string()), + })) + .await?; + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionCreated { session_id }, + }) if session_id == "sess_old" => Some(Ok(())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("first conversation start failed: {err:?}")); + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "new".to_string(), + session_id: Some("conv_new".to_string()), + })) + .await?; + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionCreated { session_id }, + }) if session_id == "sess_new" => Some(Ok(())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("second conversation start failed: {err:?}")); + + test.codex + .submit(Op::RealtimeConversationAudio(ConversationAudioParams { + frame: RealtimeAudioFrame { + data: "AQID".to_string(), + sample_rate: 24000, + num_channels: 1, + samples_per_channel: Some(480), + }, + })) + .await?; + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::AudioOut(frame), + }) if frame.data == "AQID" => Some(()), + _ => None, + }) + .await; + + let connections = server.connections(); + assert_eq!(connections.len(), 3); + assert_eq!(connections[1].len(), 1); + assert_eq!( + connections[1][0].body_json()["session"]["conversation_id"].as_str(), + Some("conv_old") + ); + assert_eq!(connections[2].len(), 2); + assert_eq!( + connections[2][0].body_json()["session"]["conversation_id"].as_str(), + Some("conv_new") + ); + assert_eq!( + connections[2][1].body_json()["type"].as_str(), + Some("response.input_audio.delta") + ); + + server.shutdown().await; + Ok(()) +} diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 6583c19f9..1e2a1b9e6 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -806,6 +806,9 @@ impl EventProcessor for EventProcessorWithHumanOutput { | EventMsg::RequestUserInput(_) | EventMsg::CollabResumeBegin(_) | EventMsg::CollabResumeEnd(_) + | EventMsg::RealtimeConversationStarted(_) + | EventMsg::RealtimeConversationRealtime(_) + | EventMsg::RealtimeConversationClosed(_) | EventMsg::DynamicToolCallRequest(_) => {} } CodexStatus::Running diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index c20c877c4..762052ccf 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -374,6 +374,9 @@ async fn run_codex_tool_session_inner( | EventMsg::CollabCloseEnd(_) | EventMsg::CollabResumeBegin(_) | EventMsg::CollabResumeEnd(_) + | EventMsg::RealtimeConversationStarted(_) + | EventMsg::RealtimeConversationRealtime(_) + | EventMsg::RealtimeConversationClosed(_) | EventMsg::DeprecationNotice(_) => { // For now, we do not do anything extra for these // events. Note that diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 5b3d41888..86a124865 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -85,6 +85,41 @@ pub struct McpServerRefreshConfig { pub mcp_oauth_credentials_store_mode: Value, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] +pub struct ConversationStartParams { + pub prompt: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub session_id: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub struct RealtimeAudioFrame { + pub data: String, + pub sample_rate: u32, + pub num_channels: u16, + #[serde(skip_serializing_if = "Option::is_none")] + pub samples_per_channel: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub enum RealtimeEvent { + SessionCreated { session_id: String }, + SessionUpdated { backend_prompt: Option }, + AudioOut(RealtimeAudioFrame), + ConversationItemAdded(Value), + Error(String), +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] +pub struct ConversationAudioParams { + pub frame: RealtimeAudioFrame, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] +pub struct ConversationTextParams { + pub text: String, +} + /// Submission operation #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema)] #[serde(tag = "type", rename_all = "snake_case")] @@ -98,6 +133,18 @@ pub enum Op { /// Terminate all running background terminal processes for this thread. CleanBackgroundTerminals, + /// Start a realtime conversation stream. + RealtimeConversationStart(ConversationStartParams), + + /// Send audio input to the running realtime conversation stream. + RealtimeConversationAudio(ConversationAudioParams), + + /// Send text input to the running realtime conversation stream. + RealtimeConversationText(ConversationTextParams), + + /// Close the running realtime conversation stream. + RealtimeConversationClose, + /// Legacy user input. /// /// Prefer [`Op::UserTurn`] so the caller provides full turn context @@ -899,6 +946,15 @@ pub enum EventMsg { /// indicates the turn continued but the user should still be notified. Warning(WarningEvent), + /// Realtime conversation lifecycle start event. + RealtimeConversationStarted(RealtimeConversationStartedEvent), + + /// Realtime conversation streaming payload event. + RealtimeConversationRealtime(RealtimeConversationRealtimeEvent), + + /// Realtime conversation lifecycle close event. + RealtimeConversationClosed(RealtimeConversationClosedEvent), + /// Model routing changed from the requested model to a different model. ModelReroute(ModelRerouteEvent), @@ -1078,6 +1134,22 @@ pub enum EventMsg { CollabResumeEnd(CollabResumeEndEvent), } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] +pub struct RealtimeConversationStartedEvent { + pub session_id: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] +pub struct RealtimeConversationRealtimeEvent { + pub payload: RealtimeEvent, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] +pub struct RealtimeConversationClosedEvent { + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + impl From for EventMsg { fn from(event: CollabAgentSpawnBeginEvent) -> Self { EventMsg::CollabAgentSpawnBegin(event) @@ -3047,6 +3119,61 @@ mod tests { assert!(event.affects_turn_status()); } + #[test] + fn conversation_op_serializes_as_unnested_variants() { + let audio = Op::RealtimeConversationAudio(ConversationAudioParams { + frame: RealtimeAudioFrame { + data: "AQID".to_string(), + sample_rate: 24_000, + num_channels: 1, + samples_per_channel: Some(480), + }, + }); + let start = Op::RealtimeConversationStart(ConversationStartParams { + prompt: "be helpful".to_string(), + session_id: Some("conv_1".to_string()), + }); + let text = Op::RealtimeConversationText(ConversationTextParams { + text: "hello".to_string(), + }); + let close = Op::RealtimeConversationClose; + + assert_eq!( + serde_json::to_value(&start).unwrap(), + json!({ + "type": "realtime_conversation_start", + "prompt": "be helpful", + "session_id": "conv_1" + }) + ); + assert_eq!( + serde_json::to_value(&audio).unwrap(), + json!({ + "type": "realtime_conversation_audio", + "frame": { + "data": "AQID", + "sample_rate": 24000, + "num_channels": 1, + "samples_per_channel": 480 + } + }) + ); + assert_eq!( + serde_json::from_value::(serde_json::to_value(&text).unwrap()).unwrap(), + text + ); + assert_eq!( + serde_json::to_value(&close).unwrap(), + json!({ + "type": "realtime_conversation_close" + }) + ); + assert_eq!( + serde_json::from_value::(serde_json::to_value(&close).unwrap()).unwrap(), + close + ); + } + #[test] fn user_input_serialization_omits_final_output_json_schema_when_none() -> Result<()> { let op = Op::UserInput { diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index afc894b22..8850cc8b7 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -4186,6 +4186,9 @@ impl ChatWidget { | EventMsg::AgentMessageContentDelta(_) | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) + | EventMsg::RealtimeConversationStarted(_) + | EventMsg::RealtimeConversationRealtime(_) + | EventMsg::RealtimeConversationClosed(_) | EventMsg::DynamicToolCallRequest(_) => {} EventMsg::ItemCompleted(event) => { let item = event.item;