Fix js_repl hangs on U+2028/U+2029 dynamic tool responses (#14421)
## Summary Dynamic tool responses containing literal U+2028 / U+2029 would cause await codex.tool(...) to hang even though the response had already arrived. This PR replaces the kernel’s readline-based stdin handling with byte-oriented JSONL framing that handles these characters properly. ## Testing - `cargo test -p codex-core` - tested the binary on a repro case and confirmed it's fixed --------- Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
parent
09ba6b47ae
commit
f35d46002a
2 changed files with 136 additions and 3 deletions
|
|
@ -6,7 +6,6 @@ const { Buffer } = require("node:buffer");
|
|||
const crypto = require("node:crypto");
|
||||
const fs = require("node:fs");
|
||||
const { builtinModules, createRequire } = require("node:module");
|
||||
const { createInterface } = require("node:readline");
|
||||
const { performance } = require("node:perf_hooks");
|
||||
const path = require("node:path");
|
||||
const { URL, URLSearchParams, fileURLToPath, pathToFileURL } = require(
|
||||
|
|
@ -1659,6 +1658,7 @@ function handleEmitImageResult(message) {
|
|||
}
|
||||
|
||||
let queue = Promise.resolve();
|
||||
let pendingInputSegments = [];
|
||||
|
||||
process.on("uncaughtException", (error) => {
|
||||
scheduleFatalExit("uncaught exception", error);
|
||||
|
|
@ -1668,8 +1668,7 @@ process.on("unhandledRejection", (reason) => {
|
|||
scheduleFatalExit("unhandled rejection", reason);
|
||||
});
|
||||
|
||||
const input = createInterface({ input: process.stdin, crlfDelay: Infinity });
|
||||
input.on("line", (line) => {
|
||||
function handleInputLine(line) {
|
||||
if (!line.trim()) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -1692,4 +1691,49 @@ input.on("line", (line) => {
|
|||
if (message.type === "emit_image_result") {
|
||||
handleEmitImageResult(message);
|
||||
}
|
||||
}
|
||||
|
||||
function takePendingInputFrame() {
|
||||
if (pendingInputSegments.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Keep raw stdin chunks queued until a full JSONL frame is ready so we only
|
||||
// assemble the frame bytes once.
|
||||
const frame =
|
||||
pendingInputSegments.length === 1
|
||||
? pendingInputSegments[0]
|
||||
: Buffer.concat(pendingInputSegments);
|
||||
pendingInputSegments = [];
|
||||
return frame;
|
||||
}
|
||||
|
||||
function handleInputFrame(frame) {
|
||||
if (!frame) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame[frame.length - 1] === 0x0d) {
|
||||
frame = frame.subarray(0, frame.length - 1);
|
||||
}
|
||||
handleInputLine(frame.toString("utf8"));
|
||||
}
|
||||
|
||||
process.stdin.on("data", (chunk) => {
|
||||
const input = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
|
||||
let segmentStart = 0;
|
||||
let frameEnd = input.indexOf(0x0a);
|
||||
while (frameEnd !== -1) {
|
||||
pendingInputSegments.push(input.subarray(segmentStart, frameEnd));
|
||||
handleInputFrame(takePendingInputFrame());
|
||||
segmentStart = frameEnd + 1;
|
||||
frameEnd = input.indexOf(0x0a, segmentStart);
|
||||
}
|
||||
if (segmentStart < input.length) {
|
||||
pendingInputSegments.push(input.subarray(segmentStart));
|
||||
}
|
||||
});
|
||||
|
||||
process.stdin.on("end", () => {
|
||||
handleInputFrame(takePendingInputFrame());
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1532,6 +1532,95 @@ await codex.emitImage(out);
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn js_repl_dynamic_tool_response_preserves_js_line_separator_text() -> anyhow::Result<()> {
|
||||
if !can_run_js_repl_runtime_tests().await {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for (tool_name, description, expected_text, literal) in [
|
||||
(
|
||||
"line_separator_tool",
|
||||
"Returns text containing U+2028.",
|
||||
"alpha\u{2028}omega".to_string(),
|
||||
r#""alpha\u2028omega""#,
|
||||
),
|
||||
(
|
||||
"paragraph_separator_tool",
|
||||
"Returns text containing U+2029.",
|
||||
"alpha\u{2029}omega".to_string(),
|
||||
r#""alpha\u2029omega""#,
|
||||
),
|
||||
] {
|
||||
let (session, turn, rx_event) =
|
||||
make_session_and_context_with_dynamic_tools_and_rx(vec![DynamicToolSpec {
|
||||
name: tool_name.to_string(),
|
||||
description: description.to_string(),
|
||||
input_schema: serde_json::json!({
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
"additionalProperties": false
|
||||
}),
|
||||
}])
|
||||
.await;
|
||||
|
||||
*session.active_turn.lock().await = Some(crate::state::ActiveTurn::default());
|
||||
|
||||
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default()));
|
||||
let manager = turn.js_repl.manager().await?;
|
||||
let code = format!(
|
||||
r#"
|
||||
const out = await codex.tool("{tool_name}", {{}});
|
||||
const text = typeof out === "string" ? out : out?.output;
|
||||
console.log(text === {literal});
|
||||
console.log(text);
|
||||
"#
|
||||
);
|
||||
|
||||
let session_for_response = Arc::clone(&session);
|
||||
let expected_text_for_response = expected_text.clone();
|
||||
let response_watcher = async move {
|
||||
loop {
|
||||
let event = tokio::time::timeout(Duration::from_secs(2), rx_event.recv()).await??;
|
||||
if let EventMsg::DynamicToolCallRequest(request) = event.msg {
|
||||
session_for_response
|
||||
.notify_dynamic_tool_response(
|
||||
&request.call_id,
|
||||
DynamicToolResponse {
|
||||
content_items: vec![DynamicToolCallOutputContentItem::InputText {
|
||||
text: expected_text_for_response.clone(),
|
||||
}],
|
||||
success: true,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let (result, response_watcher_result) = tokio::join!(
|
||||
manager.execute(
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&turn),
|
||||
tracker,
|
||||
JsReplArgs {
|
||||
code,
|
||||
timeout_ms: Some(15_000),
|
||||
},
|
||||
),
|
||||
response_watcher,
|
||||
);
|
||||
response_watcher_result?;
|
||||
|
||||
let result = result?;
|
||||
assert_eq!(result.output, format!("true\n{expected_text}"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn js_repl_prefers_env_node_module_dirs_over_config() -> anyhow::Result<()> {
|
||||
if !can_run_js_repl_runtime_tests().await {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue