From 606d85055f61ca9e81f0b96a4e7f6effc33c82be Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 18 Mar 2026 09:37:13 -0700 Subject: [PATCH] Add notify to code-mode (#14842) Allows model to send an out-of-band notification. The notification is injected as another tool call output for the same call_id. --- .../schema/json/ClientRequest.json | 6 ++ .../codex_app_server_protocol.schemas.json | 6 ++ .../codex_app_server_protocol.v2.schemas.json | 6 ++ .../RawResponseItemCompletedNotification.json | 6 ++ .../schema/json/v2/ThreadResumeParams.json | 6 ++ .../schema/typescript/ResponseItem.ts | 2 +- codex-rs/core/src/client_common.rs | 8 +- codex-rs/core/src/client_common_tests.rs | 2 + codex-rs/core/src/context_manager/history.rs | 18 ++-- .../core/src/context_manager/history_tests.rs | 10 +++ .../core/src/context_manager/normalize.rs | 1 + codex-rs/core/src/guardian/prompt.rs | 28 +++--- codex-rs/core/src/stream_events_utils.rs | 15 ++-- codex-rs/core/src/tools/code_mode/bridge.js | 1 + .../core/src/tools/code_mode/description.md | 1 + .../src/tools/code_mode/execute_handler.rs | 5 +- codex-rs/core/src/tools/code_mode/mod.rs | 3 + codex-rs/core/src/tools/code_mode/process.rs | 17 ++-- codex-rs/core/src/tools/code_mode/protocol.rs | 56 +++++++++++- codex-rs/core/src/tools/code_mode/runner.cjs | 43 ++++++++- codex-rs/core/src/tools/code_mode/worker.rs | 89 +++++++++++++------ codex-rs/core/src/tools/context.rs | 1 + codex-rs/core/src/tools/context_tests.rs | 8 +- codex-rs/core/src/tools/js_repl/mod_tests.rs | 1 + codex-rs/core/tests/suite/client.rs | 3 + codex-rs/core/tests/suite/code_mode.rs | 39 ++++++++ codex-rs/protocol/src/models.rs | 19 +++- 27 files changed, 323 insertions(+), 77 deletions(-) diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 5472a70a3..a6eb901a5 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -1759,6 +1759,12 @@ "call_id": { "type": "string" }, + "name": { + "type": [ + "string", + "null" + ] + }, "output": { "$ref": "#/definitions/FunctionCallOutputBody" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 5ea8fc418..bc889ef3c 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -10360,6 +10360,12 @@ "call_id": { "type": "string" }, + "name": { + "type": [ + "string", + "null" + ] + }, "output": { "$ref": "#/definitions/v2/FunctionCallOutputBody" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index e5a07c058..25155d483 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -7148,6 +7148,12 @@ "call_id": { "type": "string" }, + "name": { + "type": [ + "string", + "null" + ] + }, "output": { "$ref": "#/definitions/FunctionCallOutputBody" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json index 621332c3c..2b0c66da4 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/RawResponseItemCompletedNotification.json @@ -607,6 +607,12 @@ "call_id": { "type": "string" }, + "name": { + "type": [ + "string", + "null" + ] + }, "output": { "$ref": "#/definitions/FunctionCallOutputBody" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json index 3524f86b2..3c8eb552a 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json @@ -673,6 +673,12 @@ "call_id": { "type": "string" }, + "name": { + "type": [ + "string", + "null" + ] + }, "output": { "$ref": "#/definitions/FunctionCallOutputBody" }, diff --git a/codex-rs/app-server-protocol/schema/typescript/ResponseItem.ts b/codex-rs/app-server-protocol/schema/typescript/ResponseItem.ts index 48e6b69d7..e9ab2a84f 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ResponseItem.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ResponseItem.ts @@ -15,4 +15,4 @@ export type ResponseItem = { "type": "message", role: string, content: Array { + ResponseItem::FunctionCallOutput { + call_id, output, .. + } + | ResponseItem::CustomToolCallOutput { + call_id, output, .. + } => { if shell_call_ids.remove(call_id) && let Some(structured) = output .text_content() diff --git a/codex-rs/core/src/client_common_tests.rs b/codex-rs/core/src/client_common_tests.rs index 769defabb..2f2305c7a 100644 --- a/codex-rs/core/src/client_common_tests.rs +++ b/codex-rs/core/src/client_common_tests.rs @@ -161,6 +161,7 @@ fn reserializes_shell_outputs_for_function_and_custom_tool_calls() { }, ResponseItem::CustomToolCallOutput { call_id: "call-2".to_string(), + name: None, output: FunctionCallOutputPayload::from_text(raw_output.to_string()), }, ]; @@ -190,6 +191,7 @@ fn reserializes_shell_outputs_for_function_and_custom_tool_calls() { }, ResponseItem::CustomToolCallOutput { call_id: "call-2".to_string(), + name: None, output: FunctionCallOutputPayload::from_text(expected_output.to_string()), }, ] diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 4d7f4c558..d09e100fe 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -362,15 +362,15 @@ impl ContextManager { ), } } - ResponseItem::CustomToolCallOutput { call_id, output } => { - ResponseItem::CustomToolCallOutput { - call_id: call_id.clone(), - output: truncate_function_output_payload( - output, - policy_with_serialization_budget, - ), - } - } + ResponseItem::CustomToolCallOutput { + call_id, + name, + output, + } => ResponseItem::CustomToolCallOutput { + call_id: call_id.clone(), + name: name.clone(), + output: truncate_function_output_payload(output, policy_with_serialization_budget), + }, ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } | ResponseItem::LocalShellCall { .. } diff --git a/codex-rs/core/src/context_manager/history_tests.rs b/codex-rs/core/src/context_manager/history_tests.rs index 066272748..71b3aded0 100644 --- a/codex-rs/core/src/context_manager/history_tests.rs +++ b/codex-rs/core/src/context_manager/history_tests.rs @@ -73,6 +73,7 @@ fn user_input_text_msg(text: &str) -> ResponseItem { fn custom_tool_call_output(call_id: &str, output: &str) -> ResponseItem { ResponseItem::CustomToolCallOutput { call_id: call_id.to_string(), + name: None, output: FunctionCallOutputPayload::from_text(output.to_string()), } } @@ -296,6 +297,7 @@ fn for_prompt_strips_images_when_model_does_not_support_images() { }, ResponseItem::CustomToolCallOutput { call_id: "tool-1".to_string(), + name: None, output: FunctionCallOutputPayload::from_content_items(vec![ FunctionCallOutputContentItem::InputText { text: "js repl result".to_string(), @@ -358,6 +360,7 @@ fn for_prompt_strips_images_when_model_does_not_support_images() { }, ResponseItem::CustomToolCallOutput { call_id: "tool-1".to_string(), + name: None, output: FunctionCallOutputPayload::from_content_items(vec![ FunctionCallOutputContentItem::InputText { text: "js repl result".to_string(), @@ -806,6 +809,7 @@ fn remove_first_item_handles_custom_tool_pair() { }, ResponseItem::CustomToolCallOutput { call_id: "tool-1".to_string(), + name: None, output: FunctionCallOutputPayload::from_text("ok".to_string()), }, ]; @@ -885,6 +889,7 @@ fn record_items_truncates_custom_tool_call_output_content() { let long_output = line.repeat(2_500); let item = ResponseItem::CustomToolCallOutput { call_id: "tool-200".to_string(), + name: None, output: FunctionCallOutputPayload::from_text(long_output.clone()), }; @@ -1087,6 +1092,7 @@ fn normalize_adds_missing_output_for_custom_tool_call() { }, ResponseItem::CustomToolCallOutput { call_id: "tool-x".to_string(), + name: None, output: FunctionCallOutputPayload::from_text("aborted".to_string()), }, ] @@ -1154,6 +1160,7 @@ fn normalize_removes_orphan_function_call_output() { fn normalize_removes_orphan_custom_tool_call_output() { let items = vec![ResponseItem::CustomToolCallOutput { call_id: "orphan-2".to_string(), + name: None, output: FunctionCallOutputPayload::from_text("ok".to_string()), }]; let mut h = create_history_with_items(items); @@ -1229,6 +1236,7 @@ fn normalize_mixed_inserts_and_removals() { }, ResponseItem::CustomToolCallOutput { call_id: "t1".to_string(), + name: None, output: FunctionCallOutputPayload::from_text("aborted".to_string()), }, ResponseItem::LocalShellCall { @@ -1366,6 +1374,7 @@ fn normalize_removes_orphan_function_call_output_panics_in_debug() { fn normalize_removes_orphan_custom_tool_call_output_panics_in_debug() { let items = vec![ResponseItem::CustomToolCallOutput { call_id: "orphan-2".to_string(), + name: None, output: FunctionCallOutputPayload::from_text("ok".to_string()), }]; let mut h = create_history_with_items(items); @@ -1532,6 +1541,7 @@ fn image_data_url_payload_does_not_dominate_custom_tool_call_output_estimate() { let image_url = format!("data:image/png;base64,{payload}"); let item = ResponseItem::CustomToolCallOutput { call_id: "call-js-repl".to_string(), + name: None, output: FunctionCallOutputPayload::from_content_items(vec![ FunctionCallOutputContentItem::InputText { text: "Screenshot captured".to_string(), diff --git a/codex-rs/core/src/context_manager/normalize.rs b/codex-rs/core/src/context_manager/normalize.rs index c217e0939..839bae331 100644 --- a/codex-rs/core/src/context_manager/normalize.rs +++ b/codex-rs/core/src/context_manager/normalize.rs @@ -79,6 +79,7 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec) { idx, ResponseItem::CustomToolCallOutput { call_id: call_id.clone(), + name: None, output: FunctionCallOutputPayload::from_text("aborted".to_string()), }, )); diff --git a/codex-rs/core/src/guardian/prompt.rs b/codex-rs/core/src/guardian/prompt.rs index 5f315a6ca..be0291648 100644 --- a/codex-rs/core/src/guardian/prompt.rs +++ b/codex-rs/core/src/guardian/prompt.rs @@ -264,20 +264,22 @@ pub(crate) fn collect_guardian_transcript_entries( serde_json::to_string(action).ok(), ) }), - ResponseItem::FunctionCallOutput { call_id, output } - | ResponseItem::CustomToolCallOutput { call_id, output } => { - output.body.to_text().and_then(|text| { - non_empty_entry( - GuardianTranscriptEntryKind::Tool( - tool_names_by_call_id.get(call_id).map_or_else( - || "tool result".to_string(), - |name| format!("tool {name} result"), - ), - ), - text, - ) - }) + ResponseItem::FunctionCallOutput { + call_id, output, .. } + | ResponseItem::CustomToolCallOutput { + call_id, output, .. + } => output.body.to_text().and_then(|text| { + non_empty_entry( + GuardianTranscriptEntryKind::Tool( + tool_names_by_call_id.get(call_id).map_or_else( + || "tool result".to_string(), + |name| format!("tool {name} result"), + ), + ), + text, + ) + }), _ => None, }; diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index 084cb4b1a..c5f784906 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -384,12 +384,15 @@ pub(crate) fn response_input_to_response_item(input: &ResponseInputItem) -> Opti output: output.clone(), }) } - ResponseInputItem::CustomToolCallOutput { call_id, output } => { - Some(ResponseItem::CustomToolCallOutput { - call_id: call_id.clone(), - output: output.clone(), - }) - } + ResponseInputItem::CustomToolCallOutput { + call_id, + name, + output, + } => Some(ResponseItem::CustomToolCallOutput { + call_id: call_id.clone(), + name: name.clone(), + output: output.clone(), + }), ResponseInputItem::McpToolCallOutput { call_id, output } => { let output = output.as_function_call_output_payload(); Some(ResponseItem::FunctionCallOutput { diff --git a/codex-rs/core/src/tools/code_mode/bridge.js b/codex-rs/core/src/tools/code_mode/bridge.js index 3bce19290..0c61a9db1 100644 --- a/codex-rs/core/src/tools/code_mode/bridge.js +++ b/codex-rs/core/src/tools/code_mode/bridge.js @@ -30,6 +30,7 @@ Object.defineProperty(globalThis, '__codexContentItems', { defineGlobal('exit', __codexRuntime.exit); defineGlobal('image', __codexRuntime.image); defineGlobal('load', __codexRuntime.load); + defineGlobal('notify', __codexRuntime.notify); defineGlobal('store', __codexRuntime.store); defineGlobal('text', __codexRuntime.text); defineGlobal('tools', __codexRuntime.tools); diff --git a/codex-rs/core/src/tools/code_mode/description.md b/codex-rs/core/src/tools/code_mode/description.md index 79e51ebf6..6bf33a184 100644 --- a/codex-rs/core/src/tools/code_mode/description.md +++ b/codex-rs/core/src/tools/code_mode/description.md @@ -14,5 +14,6 @@ - `image(imageUrl: string)`: Appends an image item and returns it. `image_url` can be an HTTPS URL or a base64-encoded `data:` URL. - `store(key: string, value: any)`: stores a serializable value under a string key for later `exec` calls in the same session. - `load(key: string)`: returns the stored value for a string key, or `undefined` if it is missing. +- `notify(value: string | number | boolean | undefined | null)`: immediately injects an extra `custom_tool_call_output` for the current `exec` call. Values are stringified like `text(...)`. - `ALL_TOOLS`: metadata for the enabled nested tools as `{ name, description }` entries. - `yield_control()`: yields the accumulated output to the model immediately while the script keeps running. diff --git a/codex-rs/core/src/tools/code_mode/execute_handler.rs b/codex-rs/core/src/tools/code_mode/execute_handler.rs index 58f1ad50e..9eba126dd 100644 --- a/codex-rs/core/src/tools/code_mode/execute_handler.rs +++ b/codex-rs/core/src/tools/code_mode/execute_handler.rs @@ -43,6 +43,7 @@ impl CodeModeExecuteHandler { &self, session: std::sync::Arc, turn: std::sync::Arc, + call_id: String, code: String, ) -> Result { let args = parse_freeform_args(&code)?; @@ -62,6 +63,7 @@ impl CodeModeExecuteHandler { let message = HostToNodeMessage::Start { request_id: request_id.clone(), cell_id: cell_id.clone(), + tool_call_id: call_id, default_yield_time_ms: super::DEFAULT_EXEC_YIELD_TIME_MS, enabled_tools, stored_values, @@ -198,6 +200,7 @@ impl ToolHandler for CodeModeExecuteHandler { let ToolInvocation { session, turn, + call_id, tool_name, payload, .. @@ -205,7 +208,7 @@ impl ToolHandler for CodeModeExecuteHandler { match payload { ToolPayload::Custom { input } if tool_name == PUBLIC_TOOL_NAME => { - self.execute(session, turn, input).await + self.execute(session, turn, call_id, input).await } _ => Err(FunctionCallError::RespondToModel(format!( "{PUBLIC_TOOL_NAME} expects raw JavaScript source text" diff --git a/codex-rs/core/src/tools/code_mode/mod.rs b/codex-rs/core/src/tools/code_mode/mod.rs index 5a0be3ccf..a7a7c40ea 100644 --- a/codex-rs/core/src/tools/code_mode/mod.rs +++ b/codex-rs/core/src/tools/code_mode/mod.rs @@ -110,6 +110,9 @@ async fn handle_node_message( ) -> Result { match message { protocol::NodeToHostMessage::ToolCall { .. } => Err(protocol::unexpected_tool_call_error()), + protocol::NodeToHostMessage::Notify { .. } => Err(format!( + "unexpected {PUBLIC_TOOL_NAME} notify message in response path" + )), protocol::NodeToHostMessage::Yielded { content_items, .. } => { let mut delta_items = output_content_items_from_json_values(content_items)?; delta_items = truncate_code_mode_result(delta_items, poll_max_output_tokens.flatten()); diff --git a/codex-rs/core/src/tools/code_mode/process.rs b/codex-rs/core/src/tools/code_mode/process.rs index d27296fca..6dd6cde3a 100644 --- a/codex-rs/core/src/tools/code_mode/process.rs +++ b/codex-rs/core/src/tools/code_mode/process.rs @@ -13,7 +13,6 @@ use tracing::warn; use super::CODE_MODE_RUNNER_SOURCE; use super::PUBLIC_TOOL_NAME; -use super::protocol::CodeModeToolCall; use super::protocol::HostToNodeMessage; use super::protocol::NodeToHostMessage; use super::protocol::message_request_id; @@ -23,7 +22,7 @@ pub(super) struct CodeModeProcess { pub(super) stdin: Arc>, pub(super) stdout_task: JoinHandle<()>, pub(super) response_waiters: Arc>>>, - pub(super) tool_call_rx: Arc>>, + pub(super) message_rx: Arc>>, } impl CodeModeProcess { @@ -92,7 +91,7 @@ pub(super) async fn spawn_code_mode_process( String, oneshot::Sender, >::new())); - let (tool_call_tx, tool_call_rx) = mpsc::unbounded_channel(); + let (message_tx, message_rx) = mpsc::unbounded_channel(); tokio::spawn(async move { let mut reader = BufReader::new(stderr); @@ -135,12 +134,14 @@ pub(super) async fn spawn_code_mode_process( } }; match message { - NodeToHostMessage::ToolCall { tool_call } => { - let _ = tool_call_tx.send(tool_call); + message @ (NodeToHostMessage::ToolCall { .. } + | NodeToHostMessage::Notify { .. }) => { + let _ = message_tx.send(message); } message => { - let request_id = message_request_id(&message).to_string(); - if let Some(waiter) = response_waiters.lock().await.remove(&request_id) { + if let Some(request_id) = message_request_id(&message) + && let Some(waiter) = response_waiters.lock().await.remove(request_id) + { let _ = waiter.send(message); } } @@ -155,7 +156,7 @@ pub(super) async fn spawn_code_mode_process( stdin, stdout_task, response_waiters, - tool_call_rx: Arc::new(Mutex::new(tool_call_rx)), + message_rx: Arc::new(Mutex::new(message_rx)), }) } diff --git a/codex-rs/core/src/tools/code_mode/protocol.rs b/codex-rs/core/src/tools/code_mode/protocol.rs index fc0a497ea..8116d95b4 100644 --- a/codex-rs/core/src/tools/code_mode/protocol.rs +++ b/codex-rs/core/src/tools/code_mode/protocol.rs @@ -36,12 +36,20 @@ pub(super) struct CodeModeToolCall { pub(super) input: Option, } +#[derive(Clone, Debug, Deserialize)] +pub(super) struct CodeModeNotify { + pub(super) cell_id: String, + pub(super) call_id: String, + pub(super) text: String, +} + #[derive(Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub(super) enum HostToNodeMessage { Start { request_id: String, cell_id: String, + tool_call_id: String, default_yield_time_ms: u64, enabled_tools: Vec, stored_values: HashMap, @@ -65,7 +73,7 @@ pub(super) enum HostToNodeMessage { }, } -#[derive(Deserialize)] +#[derive(Debug, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub(super) enum NodeToHostMessage { ToolCall { @@ -80,6 +88,10 @@ pub(super) enum NodeToHostMessage { request_id: String, content_items: Vec, }, + Notify { + #[serde(flatten)] + notify: CodeModeNotify, + }, Result { request_id: String, content_items: Vec, @@ -105,15 +117,51 @@ pub(super) fn build_source( .replace("__CODE_MODE_USER_CODE_PLACEHOLDER__", user_code)) } -pub(super) fn message_request_id(message: &NodeToHostMessage) -> &str { +pub(super) fn message_request_id(message: &NodeToHostMessage) -> Option<&str> { match message { - NodeToHostMessage::ToolCall { tool_call } => &tool_call.request_id, + NodeToHostMessage::ToolCall { .. } => None, NodeToHostMessage::Yielded { request_id, .. } | NodeToHostMessage::Terminated { request_id, .. } - | NodeToHostMessage::Result { request_id, .. } => request_id, + | NodeToHostMessage::Result { request_id, .. } => Some(request_id), + NodeToHostMessage::Notify { .. } => None, } } pub(super) fn unexpected_tool_call_error() -> String { format!("{PUBLIC_TOOL_NAME} received an unexpected tool call response") } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::CodeModeNotify; + use super::NodeToHostMessage; + use super::message_request_id; + + #[test] + fn message_request_id_absent_for_notify() { + let message = NodeToHostMessage::Notify { + notify: CodeModeNotify { + cell_id: "1".to_string(), + call_id: "call-1".to_string(), + text: "hello".to_string(), + }, + }; + + assert_eq!(None, message_request_id(&message)); + } + + #[test] + fn message_request_id_present_for_result() { + let message = NodeToHostMessage::Result { + request_id: "req-1".to_string(), + content_items: Vec::new(), + stored_values: HashMap::new(), + error_text: None, + max_output_tokens_per_exec_call: None, + }; + + assert_eq!(Some("req-1"), message_request_id(&message)); + } +} diff --git a/codex-rs/core/src/tools/code_mode/runner.cjs b/codex-rs/core/src/tools/code_mode/runner.cjs index 48b9d5a9a..2fcfddeaf 100644 --- a/codex-rs/core/src/tools/code_mode/runner.cjs +++ b/codex-rs/core/src/tools/code_mode/runner.cjs @@ -233,7 +233,7 @@ function codeModeWorkerMain() { throw new TypeError('image expects an http(s) or data URL'); } - function createCodeModeHelpers(context, state) { + function createCodeModeHelpers(context, state, toolCallId) { const load = (key) => { if (typeof key !== 'string') { throw new TypeError('load key must be a string'); @@ -268,6 +268,21 @@ function codeModeWorkerMain() { const yieldControl = () => { parentPort.postMessage({ type: 'yield' }); }; + const notify = (value) => { + const text = serializeOutputText(value); + if (text.trim().length === 0) { + throw new TypeError('notify expects non-empty text'); + } + if (typeof toolCallId !== 'string' || toolCallId.length === 0) { + throw new TypeError('notify requires a valid tool call id'); + } + parentPort.postMessage({ + type: 'notify', + call_id: toolCallId, + text, + }); + return text; + }; const exit = () => { throw new CodeModeExitSignal(); }; @@ -276,6 +291,7 @@ function codeModeWorkerMain() { exit, image, load, + notify, output_image: image, output_text: text, store, @@ -290,6 +306,7 @@ function codeModeWorkerMain() { 'exit', 'image', 'load', + 'notify', 'output_text', 'output_image', 'store', @@ -300,6 +317,7 @@ function codeModeWorkerMain() { this.setExport('exit', helpers.exit); this.setExport('image', helpers.image); this.setExport('load', helpers.load); + this.setExport('notify', helpers.notify); this.setExport('output_text', helpers.output_text); this.setExport('output_image', helpers.output_image); this.setExport('store', helpers.store); @@ -316,6 +334,7 @@ function codeModeWorkerMain() { exit: helpers.exit, image: helpers.image, load: helpers.load, + notify: helpers.notify, store: helpers.store, text: helpers.text, tools: createGlobalToolsNamespace(callTool, enabledTools), @@ -448,6 +467,7 @@ function codeModeWorkerMain() { async function main() { const start = workerData ?? {}; + const toolCallId = start.tool_call_id; const state = { storedValues: cloneJsonValue(start.stored_values ?? {}), }; @@ -457,7 +477,7 @@ function codeModeWorkerMain() { const context = vm.createContext({ __codexContentItems: contentItems, }); - const helpers = createCodeModeHelpers(context, state); + const helpers = createCodeModeHelpers(context, state, toolCallId); Object.defineProperty(context, '__codexRuntime', { value: createBridgeRuntime(callTool, enabledTools, helpers), configurable: true, @@ -631,6 +651,9 @@ function sessionWorkerSource() { } function startSession(protocol, sessions, start) { + if (typeof start.tool_call_id !== 'string' || start.tool_call_id.length === 0) { + throw new TypeError('start requires a valid tool_call_id'); + } const maxOutputTokensPerExecCall = start.max_output_tokens == null ? DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL @@ -704,6 +727,22 @@ async function handleWorkerMessage(protocol, sessions, session, message) { return; } + if (message.type === 'notify') { + if (typeof message.text !== 'string' || message.text.trim().length === 0) { + throw new TypeError('notify requires non-empty text'); + } + if (typeof message.call_id !== 'string' || message.call_id.length === 0) { + throw new TypeError('notify requires a valid call id'); + } + await protocol.send({ + type: 'notify', + cell_id: session.id, + call_id: message.call_id, + text: message.text, + }); + return; + } + if (message.type === 'tool_call') { void forwardToolCall(protocol, session, message); return; diff --git a/codex-rs/core/src/tools/code_mode/worker.rs b/codex-rs/core/src/tools/code_mode/worker.rs index 223ac7a7c..7456f9c6f 100644 --- a/codex-rs/core/src/tools/code_mode/worker.rs +++ b/codex-rs/core/src/tools/code_mode/worker.rs @@ -1,13 +1,18 @@ use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; +use tracing::error; use tracing::warn; +use codex_protocol::models::FunctionCallOutputPayload; +use codex_protocol::models::ResponseInputItem; + use super::ExecContext; use super::PUBLIC_TOOL_NAME; use super::call_nested_tool; use super::process::CodeModeProcess; use super::process::write_message; use super::protocol::HostToNodeMessage; +use super::protocol::NodeToHostMessage; use crate::tools::parallel::ToolCallRuntime; pub(crate) struct CodeModeWorker { shutdown_tx: Option>, @@ -29,39 +34,71 @@ impl CodeModeProcess { ) -> CodeModeWorker { let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); let stdin = self.stdin.clone(); - let tool_call_rx = self.tool_call_rx.clone(); + let message_rx = self.message_rx.clone(); tokio::spawn(async move { loop { - let tool_call = tokio::select! { + let next_message = tokio::select! { _ = &mut shutdown_rx => break, - tool_call = async { - let mut tool_call_rx = tool_call_rx.lock().await; - tool_call_rx.recv().await - } => tool_call, + message = async { + let mut message_rx = message_rx.lock().await; + message_rx.recv().await + } => message, }; - let Some(tool_call) = tool_call else { + let Some(next_message) = next_message else { break; }; - let exec = exec.clone(); - let tool_runtime = tool_runtime.clone(); - let stdin = stdin.clone(); - tokio::spawn(async move { - let response = HostToNodeMessage::Response { - request_id: tool_call.request_id, - id: tool_call.id, - code_mode_result: call_nested_tool( - exec, - tool_runtime, - tool_call.name, - tool_call.input, - CancellationToken::new(), - ) - .await, - }; - if let Err(err) = write_message(&stdin, &response).await { - warn!("failed to write {PUBLIC_TOOL_NAME} tool response: {err}"); + match next_message { + NodeToHostMessage::ToolCall { tool_call } => { + let exec = exec.clone(); + let tool_runtime = tool_runtime.clone(); + let stdin = stdin.clone(); + tokio::spawn(async move { + let response = HostToNodeMessage::Response { + request_id: tool_call.request_id, + id: tool_call.id, + code_mode_result: call_nested_tool( + exec, + tool_runtime, + tool_call.name, + tool_call.input, + CancellationToken::new(), + ) + .await, + }; + if let Err(err) = write_message(&stdin, &response).await { + warn!("failed to write {PUBLIC_TOOL_NAME} tool response: {err}"); + } + }); } - }); + NodeToHostMessage::Notify { notify } => { + if notify.text.trim().is_empty() { + continue; + } + if exec + .session + .inject_response_items(vec![ResponseInputItem::CustomToolCallOutput { + call_id: notify.call_id.clone(), + name: Some(PUBLIC_TOOL_NAME.to_string()), + output: FunctionCallOutputPayload::from_text(notify.text), + }]) + .await + .is_err() + { + warn!( + "failed to inject {PUBLIC_TOOL_NAME} notify message for cell {}: no active turn", + notify.cell_id + ); + } + } + unexpected_message @ (NodeToHostMessage::Yielded { .. } + | NodeToHostMessage::Terminated { .. } + | NodeToHostMessage::Result { .. }) => { + error!( + "received unexpected {PUBLIC_TOOL_NAME} message in worker loop: {unexpected_message:?}" + ); + break; + } + } } }); diff --git a/codex-rs/core/src/tools/context.rs b/codex-rs/core/src/tools/context.rs index b7f185ceb..6670af261 100644 --- a/codex-rs/core/src/tools/context.rs +++ b/codex-rs/core/src/tools/context.rs @@ -417,6 +417,7 @@ fn function_tool_response( if matches!(payload, ToolPayload::Custom { .. }) { return ResponseInputItem::CustomToolCallOutput { call_id: call_id.to_string(), + name: None, output: FunctionCallOutputPayload { body, success }, }; } diff --git a/codex-rs/core/src/tools/context_tests.rs b/codex-rs/core/src/tools/context_tests.rs index e494e41dc..54bf2ec75 100644 --- a/codex-rs/core/src/tools/context_tests.rs +++ b/codex-rs/core/src/tools/context_tests.rs @@ -12,7 +12,9 @@ fn custom_tool_calls_should_roundtrip_as_custom_outputs() { .to_response_item("call-42", &payload); match response { - ResponseInputItem::CustomToolCallOutput { call_id, output } => { + ResponseInputItem::CustomToolCallOutput { + call_id, output, .. + } => { assert_eq!(call_id, "call-42"); assert_eq!(output.content_items(), None); assert_eq!(output.body.to_text().as_deref(), Some("patched")); @@ -106,7 +108,9 @@ fn custom_tool_calls_can_derive_text_from_content_items() { .to_response_item("call-99", &payload); match response { - ResponseInputItem::CustomToolCallOutput { call_id, output } => { + ResponseInputItem::CustomToolCallOutput { + call_id, output, .. + } => { let expected = vec![ FunctionCallOutputContentItem::InputText { text: "line 1".to_string(), diff --git a/codex-rs/core/src/tools/js_repl/mod_tests.rs b/codex-rs/core/src/tools/js_repl/mod_tests.rs index db23072ef..54779d809 100644 --- a/codex-rs/core/src/tools/js_repl/mod_tests.rs +++ b/codex-rs/core/src/tools/js_repl/mod_tests.rs @@ -376,6 +376,7 @@ fn validate_emitted_image_url_rejects_non_data_scheme() { fn summarize_tool_call_response_for_multimodal_custom_output() { let response = ResponseInputItem::CustomToolCallOutput { call_id: "call-1".to_string(), + name: None, output: FunctionCallOutputPayload::from_content_items(vec![ FunctionCallOutputContentItem::InputImage { image_url: "data:image/png;base64,abcd".to_string(), diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 5a6a22b6e..35dee3fa7 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -389,6 +389,7 @@ async fn resume_replays_legacy_js_repl_image_rollout_shapes() { timestamp: "2024-01-01T00:00:02.000Z".to_string(), item: RolloutItem::ResponseItem(ResponseItem::CustomToolCallOutput { call_id: "legacy-js-call".to_string(), + name: None, output: FunctionCallOutputPayload::from_text("legacy js_repl stdout".to_string()), }), }, @@ -546,6 +547,7 @@ async fn resume_replays_image_tool_outputs_with_detail() { timestamp: "2024-01-01T00:00:02.500Z".to_string(), item: RolloutItem::ResponseItem(ResponseItem::CustomToolCallOutput { call_id: custom_call_id.to_string(), + name: None, output: FunctionCallOutputPayload::from_content_items(vec![ FunctionCallOutputContentItem::InputImage { image_url: image_url.to_string(), @@ -1898,6 +1900,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { }); prompt.input.push(ResponseItem::CustomToolCallOutput { call_id: "custom-tool-call-id".into(), + name: None, output: FunctionCallOutputPayload::from_text("ok".into()), }); diff --git a/codex-rs/core/tests/suite/code_mode.rs b/codex-rs/core/tests/suite/code_mode.rs index 6c5995f5c..53e3d9e8c 100644 --- a/codex-rs/core/tests/suite/code_mode.rs +++ b/codex-rs/core/tests/suite/code_mode.rs @@ -1551,6 +1551,44 @@ text({ json: true }); Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn code_mode_notify_injects_additional_exec_tool_output_into_active_context() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let (_test, second_mock) = run_code_mode_turn( + &server, + "use exec notify helper", + r#" +notify("code_mode_notify_marker"); +await tools.test_sync_tool({}); +text("done"); +"#, + false, + ) + .await?; + + let req = second_mock.single_request(); + let has_notify_output = req + .inputs_of_type("custom_tool_call_output") + .iter() + .any(|item| { + item.get("call_id").and_then(serde_json::Value::as_str) == Some("call-1") + && item + .get("output") + .and_then(serde_json::Value::as_str) + .is_some_and(|text| text.contains("code_mode_notify_marker")) + && item.get("name").and_then(serde_json::Value::as_str) == Some("exec") + }); + assert!( + has_notify_output, + "expected notify marker in custom_tool_call_output item: {:?}", + req.input() + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn code_mode_exit_stops_script_immediately() -> Result<()> { skip_if_no_network!(Ok(())); @@ -1957,6 +1995,7 @@ text(JSON.stringify(Object.getOwnPropertyNames(globalThis).sort())); "isFinite", "isNaN", "load", + "notify", "parseFloat", "parseInt", "store", diff --git a/codex-rs/protocol/src/models.rs b/codex-rs/protocol/src/models.rs index a2c337d24..79e3991ea 100644 --- a/codex-rs/protocol/src/models.rs +++ b/codex-rs/protocol/src/models.rs @@ -241,6 +241,9 @@ pub enum ResponseInputItem { }, CustomToolCallOutput { call_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + name: Option, #[ts(as = "FunctionCallOutputBody")] #[schemars(with = "FunctionCallOutputBody")] output: FunctionCallOutputPayload, @@ -382,6 +385,9 @@ pub enum ResponseItem { // text or structured content items. CustomToolCallOutput { call_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + name: Option, #[ts(as = "FunctionCallOutputBody")] #[schemars(with = "FunctionCallOutputBody")] output: FunctionCallOutputPayload, @@ -1008,9 +1014,15 @@ impl From for ResponseItem { let output = output.into_function_call_output_payload(); Self::FunctionCallOutput { call_id, output } } - ResponseInputItem::CustomToolCallOutput { call_id, output } => { - Self::CustomToolCallOutput { call_id, output } - } + ResponseInputItem::CustomToolCallOutput { + call_id, + name, + output, + } => Self::CustomToolCallOutput { + call_id, + name, + output, + }, ResponseInputItem::ToolSearchOutput { call_id, status, @@ -2392,6 +2404,7 @@ mod tests { fn serializes_custom_tool_image_outputs_as_array() -> Result<()> { let item = ResponseInputItem::CustomToolCallOutput { call_id: "call1".into(), + name: None, output: FunctionCallOutputPayload::from_content_items(vec![ FunctionCallOutputContentItem::InputImage { image_url: "data:image/png;base64,BASE64".into(),