diff --git a/codex-rs/core/src/tools/js_repl/kernel.js b/codex-rs/core/src/tools/js_repl/kernel.js index 7fd1cbc9c..b54b26f37 100644 --- a/codex-rs/core/src/tools/js_repl/kernel.js +++ b/codex-rs/core/src/tools/js_repl/kernel.js @@ -6,7 +6,6 @@ const { Buffer } = require("node:buffer"); const crypto = require("node:crypto"); const fs = require("node:fs"); const { builtinModules, createRequire } = require("node:module"); -const { createInterface } = require("node:readline"); const { performance } = require("node:perf_hooks"); const path = require("node:path"); const { URL, URLSearchParams, fileURLToPath, pathToFileURL } = require( @@ -1659,6 +1658,7 @@ function handleEmitImageResult(message) { } let queue = Promise.resolve(); +let pendingInputSegments = []; process.on("uncaughtException", (error) => { scheduleFatalExit("uncaught exception", error); @@ -1668,8 +1668,7 @@ process.on("unhandledRejection", (reason) => { scheduleFatalExit("unhandled rejection", reason); }); -const input = createInterface({ input: process.stdin, crlfDelay: Infinity }); -input.on("line", (line) => { +function handleInputLine(line) { if (!line.trim()) { return; } @@ -1692,4 +1691,49 @@ input.on("line", (line) => { if (message.type === "emit_image_result") { handleEmitImageResult(message); } +} + +function takePendingInputFrame() { + if (pendingInputSegments.length === 0) { + return null; + } + + // Keep raw stdin chunks queued until a full JSONL frame is ready so we only + // assemble the frame bytes once. + const frame = + pendingInputSegments.length === 1 + ? pendingInputSegments[0] + : Buffer.concat(pendingInputSegments); + pendingInputSegments = []; + return frame; +} + +function handleInputFrame(frame) { + if (!frame) { + return; + } + + if (frame[frame.length - 1] === 0x0d) { + frame = frame.subarray(0, frame.length - 1); + } + handleInputLine(frame.toString("utf8")); +} + +process.stdin.on("data", (chunk) => { + const input = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + let segmentStart = 0; + let frameEnd = input.indexOf(0x0a); + while (frameEnd !== -1) { + pendingInputSegments.push(input.subarray(segmentStart, frameEnd)); + handleInputFrame(takePendingInputFrame()); + segmentStart = frameEnd + 1; + frameEnd = input.indexOf(0x0a, segmentStart); + } + if (segmentStart < input.length) { + pendingInputSegments.push(input.subarray(segmentStart)); + } +}); + +process.stdin.on("end", () => { + handleInputFrame(takePendingInputFrame()); }); diff --git a/codex-rs/core/src/tools/js_repl/mod_tests.rs b/codex-rs/core/src/tools/js_repl/mod_tests.rs index 2ea0e67f6..f3e9f384f 100644 --- a/codex-rs/core/src/tools/js_repl/mod_tests.rs +++ b/codex-rs/core/src/tools/js_repl/mod_tests.rs @@ -1532,6 +1532,95 @@ await codex.emitImage(out); Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn js_repl_dynamic_tool_response_preserves_js_line_separator_text() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await { + return Ok(()); + } + + for (tool_name, description, expected_text, literal) in [ + ( + "line_separator_tool", + "Returns text containing U+2028.", + "alpha\u{2028}omega".to_string(), + r#""alpha\u2028omega""#, + ), + ( + "paragraph_separator_tool", + "Returns text containing U+2029.", + "alpha\u{2029}omega".to_string(), + r#""alpha\u2029omega""#, + ), + ] { + let (session, turn, rx_event) = + make_session_and_context_with_dynamic_tools_and_rx(vec![DynamicToolSpec { + name: tool_name.to_string(), + description: description.to_string(), + input_schema: serde_json::json!({ + "type": "object", + "properties": {}, + "additionalProperties": false + }), + }]) + .await; + + *session.active_turn.lock().await = Some(crate::state::ActiveTurn::default()); + + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + let code = format!( + r#" +const out = await codex.tool("{tool_name}", {{}}); +const text = typeof out === "string" ? out : out?.output; +console.log(text === {literal}); +console.log(text); +"# + ); + + let session_for_response = Arc::clone(&session); + let expected_text_for_response = expected_text.clone(); + let response_watcher = async move { + loop { + let event = tokio::time::timeout(Duration::from_secs(2), rx_event.recv()).await??; + if let EventMsg::DynamicToolCallRequest(request) = event.msg { + session_for_response + .notify_dynamic_tool_response( + &request.call_id, + DynamicToolResponse { + content_items: vec![DynamicToolCallOutputContentItem::InputText { + text: expected_text_for_response.clone(), + }], + success: true, + }, + ) + .await; + return Ok::<(), anyhow::Error>(()); + } + } + }; + + let (result, response_watcher_result) = tokio::join!( + manager.execute( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + JsReplArgs { + code, + timeout_ms: Some(15_000), + }, + ), + response_watcher, + ); + response_watcher_result?; + + let result = result?; + assert_eq!(result.output, format!("true\n{expected_text}")); + } + + Ok(()) +} + #[tokio::test] async fn js_repl_prefers_env_node_module_dirs_over_config() -> anyhow::Result<()> { if !can_run_js_repl_runtime_tests().await {