diff --git a/AGENTS.md b/AGENTS.md index 292e4c92d..62f303769 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -158,3 +158,5 @@ These guidelines apply to app-server protocol work in `codex-rs`, especially: `just write-app-server-schema` (and `just write-app-server-schema --experimental` when experimental API fixtures are affected). - Validate with `cargo test -p codex-app-server-protocol`. +- Avoid boilerplate tests that only assert experimental field markers for individual + request fields in `common.rs`; rely on schema generation/tests and behavioral coverage instead. diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index f0ddba0f6..ad6ba0910 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1401,6 +1401,7 @@ dependencies = [ "schemars 0.8.22", "serde", "serde_json", + "shlex", "similar", "strum_macros 0.27.2", "tempfile", diff --git a/codex-rs/app-server-protocol/Cargo.toml b/codex-rs/app-server-protocol/Cargo.toml index 7b2b8c53d..2ab6f291f 100644 --- a/codex-rs/app-server-protocol/Cargo.toml +++ b/codex-rs/app-server-protocol/Cargo.toml @@ -20,6 +20,7 @@ codex-utils-absolute-path = { workspace = true } schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +shlex = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } ts-rs = { workspace = true } diff --git a/codex-rs/app-server-protocol/schema/json/EventMsg.json b/codex-rs/app-server-protocol/schema/json/EventMsg.json index bdeb06b7b..c0a502899 100644 --- a/codex-rs/app-server-protocol/schema/json/EventMsg.json +++ b/codex-rs/app-server-protocol/schema/json/EventMsg.json @@ -1349,6 +1349,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." 
+ }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1377,6 +1385,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -1805,6 +1814,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." + }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -1832,6 +1849,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -2873,6 +2891,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -3400,6 +3426,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { @@ -6185,6 +6219,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." + }, "stderr": { "description": "Captured stderr", "type": "string" @@ -6213,6 +6255,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -6641,6 +6684,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." 
+ }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -6668,6 +6719,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index dcdc315d9..1b312ea1e 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -1965,6 +1965,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." + }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1993,6 +2001,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -2421,6 +2430,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus2" + } + ], + "description": "Completion status for this patch application." 
+ }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -2448,6 +2465,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -3489,6 +3507,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -4285,6 +4311,14 @@ ], "type": "string" }, + "PatchApplyStatus2": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PatchChangeKind": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index f7be9db0d..e124325f0 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -3362,6 +3362,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." + }, "stderr": { "description": "Captured stderr", "type": "string" @@ -3390,6 +3398,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -3818,6 +3827,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/v2/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." 
+ }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -3845,6 +3862,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -4942,6 +4960,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOneOffCommandParams": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { @@ -6446,6 +6472,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json index 9c9b52d4e..7d0d26c05 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json @@ -1349,6 +1349,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." + }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1377,6 +1385,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -1805,6 +1814,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." 
+ }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -1832,6 +1849,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -2873,6 +2891,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -3400,6 +3426,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json index 01bb2bbc2..4b1e90815 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json @@ -1349,6 +1349,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." + }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1377,6 +1385,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -1805,6 +1814,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." 
+ }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -1832,6 +1849,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -2873,6 +2891,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -3400,6 +3426,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json index f68ec3331..c9fb2c8e3 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json @@ -1349,6 +1349,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." + }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1377,6 +1385,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -1805,6 +1814,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." 
+ }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -1832,6 +1849,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -2873,6 +2891,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -3400,6 +3426,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { diff --git a/codex-rs/app-server-protocol/schema/typescript/ExecCommandEndEvent.ts b/codex-rs/app-server-protocol/schema/typescript/ExecCommandEndEvent.ts index c9b465e45..0bfc41ea8 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ExecCommandEndEvent.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ExecCommandEndEvent.ts @@ -2,6 +2,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { ExecCommandSource } from "./ExecCommandSource"; +import type { ExecCommandStatus } from "./ExecCommandStatus"; import type { ParsedCommand } from "./ParsedCommand"; export type ExecCommandEndEvent = { @@ -56,4 +57,8 @@ duration: string, /** * Formatted output from the command, as seen by the model. */ -formatted_output: string, }; +formatted_output: string, +/** + * Completion status for this command execution. + */ +status: ExecCommandStatus, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/ExecCommandStatus.ts b/codex-rs/app-server-protocol/schema/typescript/ExecCommandStatus.ts new file mode 100644 index 000000000..d8d91fb19 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/ExecCommandStatus.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. 
+ +export type ExecCommandStatus = "completed" | "failed" | "declined"; diff --git a/codex-rs/app-server-protocol/schema/typescript/PatchApplyEndEvent.ts b/codex-rs/app-server-protocol/schema/typescript/PatchApplyEndEvent.ts index d52940af1..9dacb00e4 100644 --- a/codex-rs/app-server-protocol/schema/typescript/PatchApplyEndEvent.ts +++ b/codex-rs/app-server-protocol/schema/typescript/PatchApplyEndEvent.ts @@ -2,6 +2,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { FileChange } from "./FileChange"; +import type { PatchApplyStatus } from "./PatchApplyStatus"; export type PatchApplyEndEvent = { /** @@ -28,4 +29,8 @@ success: boolean, /** * The changes that were applied (mirrors PatchApplyBeginEvent::changes). */ -changes: { [key in string]?: FileChange }, }; +changes: { [key in string]?: FileChange }, +/** + * Completion status for this patch application. + */ +status: PatchApplyStatus, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/PatchApplyStatus.ts b/codex-rs/app-server-protocol/schema/typescript/PatchApplyStatus.ts new file mode 100644 index 000000000..721fcd9b1 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/PatchApplyStatus.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. 
+ +export type PatchApplyStatus = "completed" | "failed" | "declined"; diff --git a/codex-rs/app-server-protocol/schema/typescript/index.ts b/codex-rs/app-server-protocol/schema/typescript/index.ts index 62a1718f9..2e7446a3d 100644 --- a/codex-rs/app-server-protocol/schema/typescript/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/index.ts @@ -62,6 +62,7 @@ export type { ExecCommandBeginEvent } from "./ExecCommandBeginEvent"; export type { ExecCommandEndEvent } from "./ExecCommandEndEvent"; export type { ExecCommandOutputDeltaEvent } from "./ExecCommandOutputDeltaEvent"; export type { ExecCommandSource } from "./ExecCommandSource"; +export type { ExecCommandStatus } from "./ExecCommandStatus"; export type { ExecOneOffCommandParams } from "./ExecOneOffCommandParams"; export type { ExecOneOffCommandResponse } from "./ExecOneOffCommandResponse"; export type { ExecOutputStream } from "./ExecOutputStream"; @@ -129,6 +130,7 @@ export type { NewConversationResponse } from "./NewConversationResponse"; export type { ParsedCommand } from "./ParsedCommand"; export type { PatchApplyBeginEvent } from "./PatchApplyBeginEvent"; export type { PatchApplyEndEvent } from "./PatchApplyEndEvent"; +export type { PatchApplyStatus } from "./PatchApplyStatus"; export type { Personality } from "./Personality"; export type { PlanDeltaEvent } from "./PlanDeltaEvent"; export type { PlanItem } from "./PlanItem"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts index 44c81a24e..742e4d703 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts @@ -21,4 +21,8 @@ export type ThreadForkParams = {threadId: string, /** path?: string | null, /** * Configuration overrides for the forked thread, if any. 
*/ -model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null}; +model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, /** + * If true, persist additional rollout EventMsg variants required to + * reconstruct a richer thread history on subsequent resume/fork/read. + */ +persistExtendedHistory: boolean}; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts index c868b8f95..e79c748cc 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts @@ -30,4 +30,8 @@ history?: Array | null, /** path?: string | null, /** * Configuration overrides for the resumed thread, if any. */ -model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null}; +model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, /** + * If true, persist additional rollout EventMsg variants required to + * reconstruct a richer thread history on subsequent resume/fork/read. 
+ */ +persistExtendedHistory: boolean}; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts index 84ad633d5..ed5e89e73 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts @@ -10,4 +10,8 @@ export type ThreadStartParams = {model?: string | null, modelProvider?: string | * If true, opt into emitting raw Responses API items on the event stream. * This is for internal use only (e.g. Codex Cloud). */ -experimentalRawEvents: boolean}; +experimentalRawEvents: boolean, /** + * If true, persist additional rollout EventMsg variants required to + * reconstruct a richer thread history on resume/fork/read. + */ +persistExtendedHistory: boolean}; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 71d75a156..bc9134116 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -1323,17 +1323,4 @@ mod tests { let reason = crate::experimental_api::ExperimentalApi::experimental_reason(&request); assert_eq!(reason, Some("mock/experimentalMethod")); } - - #[test] - fn thread_start_mock_field_is_marked_experimental() { - let request = ClientRequest::ThreadStart { - request_id: RequestId::Integer(1), - params: v2::ThreadStartParams { - mock_experimental_field: Some("mock".to_string()), - ..Default::default() - }, - }; - let reason = crate::experimental_api::ExperimentalApi::experimental_reason(&request); - assert_eq!(reason, Some("thread/start.mockExperimentalField")); - } } diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 231ee99b2..a54b550fd 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ 
b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -1,21 +1,49 @@ +use crate::protocol::v2::CollabAgentState; +use crate::protocol::v2::CollabAgentTool; +use crate::protocol::v2::CollabAgentToolCallStatus; +use crate::protocol::v2::CommandAction; +use crate::protocol::v2::CommandExecutionStatus; +use crate::protocol::v2::FileUpdateChange; +use crate::protocol::v2::McpToolCallError; +use crate::protocol::v2::McpToolCallResult; +use crate::protocol::v2::McpToolCallStatus; +use crate::protocol::v2::PatchApplyStatus; +use crate::protocol::v2::PatchChangeKind; use crate::protocol::v2::ThreadItem; use crate::protocol::v2::Turn; +use crate::protocol::v2::TurnError as V2TurnError; use crate::protocol::v2::TurnError; use crate::protocol::v2::TurnStatus; use crate::protocol::v2::UserInput; +use crate::protocol::v2::WebSearchAction; use codex_protocol::protocol::AgentReasoningEvent; use codex_protocol::protocol::AgentReasoningRawContentEvent; +use codex_protocol::protocol::AgentStatus; use codex_protocol::protocol::CompactedItem; +use codex_protocol::protocol::ContextCompactedEvent; +use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ExecCommandEndEvent; use codex_protocol::protocol::ItemCompletedEvent; +use codex_protocol::protocol::McpToolCallEndEvent; +use codex_protocol::protocol::PatchApplyEndEvent; +use codex_protocol::protocol::ReviewOutputEvent; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; +use codex_protocol::protocol::ViewImageToolCallEvent; +use codex_protocol::protocol::WebSearchEndEvent; +use std::collections::HashMap; use uuid::Uuid; +#[cfg(test)] +use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus; +#[cfg(test)] +use 
codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus; + /// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values. /// /// When available, this uses `TurnContext.turn_id` as the canonical turn id so @@ -58,10 +86,24 @@ impl ThreadHistoryBuilder { EventMsg::AgentReasoningRawContent(payload) => { self.handle_agent_reasoning_raw_content(payload) } + EventMsg::WebSearchEnd(payload) => self.handle_web_search_end(payload), + EventMsg::ExecCommandEnd(payload) => self.handle_exec_command_end(payload), + EventMsg::PatchApplyEnd(payload) => self.handle_patch_apply_end(payload), + EventMsg::McpToolCallEnd(payload) => self.handle_mcp_tool_call_end(payload), + EventMsg::ViewImageToolCall(payload) => self.handle_view_image_tool_call(payload), + EventMsg::CollabAgentSpawnEnd(payload) => self.handle_collab_agent_spawn_end(payload), + EventMsg::CollabAgentInteractionEnd(payload) => { + self.handle_collab_agent_interaction_end(payload) + } + EventMsg::CollabWaitingEnd(payload) => self.handle_collab_waiting_end(payload), + EventMsg::CollabCloseEnd(payload) => self.handle_collab_close_end(payload), + EventMsg::CollabResumeEnd(payload) => self.handle_collab_resume_end(payload), + EventMsg::ContextCompacted(payload) => self.handle_context_compacted(payload), + EventMsg::EnteredReviewMode(payload) => self.handle_entered_review_mode(payload), + EventMsg::ExitedReviewMode(payload) => self.handle_exited_review_mode(payload), EventMsg::ItemCompleted(payload) => self.handle_item_completed(payload), + EventMsg::Error(payload) => self.handle_error(payload), EventMsg::TokenCount(_) => {} - EventMsg::EnteredReviewMode(_) => {} - EventMsg::ExitedReviewMode(_) => {} EventMsg::ThreadRolledBack(payload) => self.handle_thread_rollback(payload), EventMsg::UndoCompleted(_) => {} EventMsg::TurnAborted(payload) => self.handle_turn_aborted(payload), @@ -153,23 +195,348 @@ impl ThreadHistoryBuilder { } fn handle_item_completed(&mut self, payload: &ItemCompletedEvent) { - 
if let codex_protocol::items::TurnItem::Plan(plan) = &payload.item { - if plan.text.is_empty() { - return; - } - let id = self.next_item_id(); - self.ensure_turn().items.push(ThreadItem::Plan { - id, - text: plan.text.clone(), - }); + if let codex_protocol::items::TurnItem::Plan(plan) = &payload.item + && plan.text.is_empty() + { + return; } + + let item = ThreadItem::from(payload.item.clone()); + self.ensure_turn().items.push(item); } - fn handle_turn_aborted(&mut self, _payload: &TurnAbortedEvent) { + fn handle_web_search_end(&mut self, payload: &WebSearchEndEvent) { + let item = ThreadItem::WebSearch { + id: payload.call_id.clone(), + query: payload.query.clone(), + action: Some(WebSearchAction::from(payload.action.clone())), + }; + self.ensure_turn().items.push(item); + } + + fn handle_exec_command_end(&mut self, payload: &ExecCommandEndEvent) { + let status: CommandExecutionStatus = (&payload.status).into(); + let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX); + let aggregated_output = if payload.aggregated_output.is_empty() { + None + } else { + Some(payload.aggregated_output.clone()) + }; + let command = shlex::try_join(payload.command.iter().map(String::as_str)) + .unwrap_or_else(|_| payload.command.join(" ")); + let command_actions = payload + .parsed_cmd + .iter() + .cloned() + .map(CommandAction::from) + .collect(); + let item = ThreadItem::CommandExecution { + id: payload.call_id.clone(), + command, + cwd: payload.cwd.clone(), + process_id: payload.process_id.clone(), + status, + command_actions, + aggregated_output, + exit_code: Some(payload.exit_code), + duration_ms: Some(duration_ms), + }; + + // Command completions can arrive out of order. Unified exec may return + // while a PTY is still running, then emit ExecCommandEnd later from a + // background exit watcher when that process finally exits. By then, a + // newer user turn may already have started. 
Route by event turn_id so + // replay preserves the original turn association. + if let Some(turn) = self.current_turn.as_mut() + && turn.id == payload.turn_id + { + turn.items.push(item); + return; + } + + // If the originating turn is already finalized, append there instead + // of attaching to whichever turn is currently active during replay. + if let Some(turn) = self + .turns + .iter_mut() + .find(|turn| turn.id == payload.turn_id) + { + turn.items.push(item); + return; + } + + // Backward-compatibility fallback for partial/legacy streams where the + // event turn_id does not match any known replay turn. + self.ensure_turn().items.push(item); + } + + fn handle_patch_apply_end(&mut self, payload: &PatchApplyEndEvent) { + let status: PatchApplyStatus = (&payload.status).into(); + let item = ThreadItem::FileChange { + id: payload.call_id.clone(), + changes: convert_patch_changes(&payload.changes), + status, + }; + self.ensure_turn().items.push(item); + } + + fn handle_mcp_tool_call_end(&mut self, payload: &McpToolCallEndEvent) { + let status = if payload.is_success() { + McpToolCallStatus::Completed + } else { + McpToolCallStatus::Failed + }; + let duration_ms = i64::try_from(payload.duration.as_millis()).ok(); + let (result, error) = match &payload.result { + Ok(value) => ( + Some(McpToolCallResult { + content: value.content.clone(), + structured_content: value.structured_content.clone(), + }), + None, + ), + Err(message) => ( + None, + Some(McpToolCallError { + message: message.clone(), + }), + ), + }; + let item = ThreadItem::McpToolCall { + id: payload.call_id.clone(), + server: payload.invocation.server.clone(), + tool: payload.invocation.tool.clone(), + status, + arguments: payload + .invocation + .arguments + .clone() + .unwrap_or(serde_json::Value::Null), + result, + error, + duration_ms, + }; + self.ensure_turn().items.push(item); + } + + fn handle_view_image_tool_call(&mut self, payload: &ViewImageToolCallEvent) { + let item = ThreadItem::ImageView { + 
id: payload.call_id.clone(), + path: payload.path.to_string_lossy().into_owned(), + }; + self.ensure_turn().items.push(item); + } + + fn handle_collab_agent_spawn_end( + &mut self, + payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent, + ) { + let has_receiver = payload.new_thread_id.is_some(); + let status = match &payload.status { + AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed, + _ if has_receiver => CollabAgentToolCallStatus::Completed, + _ => CollabAgentToolCallStatus::Failed, + }; + let (receiver_thread_ids, agents_states) = match &payload.new_thread_id { + Some(id) => { + let receiver_id = id.to_string(); + let received_status = CollabAgentState::from(payload.status.clone()); + ( + vec![receiver_id.clone()], + [(receiver_id, received_status)].into_iter().collect(), + ) + } + None => (Vec::new(), HashMap::new()), + }; + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::SpawnAgent, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids, + prompt: Some(payload.prompt.clone()), + agents_states, + }); + } + + fn handle_collab_agent_interaction_end( + &mut self, + payload: &codex_protocol::protocol::CollabAgentInteractionEndEvent, + ) { + let status = match &payload.status { + AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed, + _ => CollabAgentToolCallStatus::Completed, + }; + let receiver_id = payload.receiver_thread_id.to_string(); + let received_status = CollabAgentState::from(payload.status.clone()); + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::SendInput, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids: vec![receiver_id.clone()], + prompt: Some(payload.prompt.clone()), + agents_states: [(receiver_id, received_status)].into_iter().collect(), + }); 
+ } + + fn handle_collab_waiting_end( + &mut self, + payload: &codex_protocol::protocol::CollabWaitingEndEvent, + ) { + let status = if payload + .statuses + .values() + .any(|status| matches!(status, AgentStatus::Errored(_) | AgentStatus::NotFound)) + { + CollabAgentToolCallStatus::Failed + } else { + CollabAgentToolCallStatus::Completed + }; + let mut receiver_thread_ids: Vec<String> = + payload.statuses.keys().map(ToString::to_string).collect(); + receiver_thread_ids.sort(); + let agents_states = payload + .statuses + .iter() + .map(|(id, status)| (id.to_string(), CollabAgentState::from(status.clone()))) + .collect(); + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::Wait, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids, + prompt: None, + agents_states, + }); + } + + fn handle_collab_close_end(&mut self, payload: &codex_protocol::protocol::CollabCloseEndEvent) { + let status = match &payload.status { + AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed, + _ => CollabAgentToolCallStatus::Completed, + }; + let receiver_id = payload.receiver_thread_id.to_string(); + let agents_states = [( + receiver_id.clone(), + CollabAgentState::from(payload.status.clone()), + )] + .into_iter() + .collect(); + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::CloseAgent, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids: vec![receiver_id], + prompt: None, + agents_states, + }); + } + + fn handle_collab_resume_end( + &mut self, + payload: &codex_protocol::protocol::CollabResumeEndEvent, + ) { + let status = match &payload.status { + AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed, + _ => CollabAgentToolCallStatus::Completed, + }; + let receiver_id =
payload.receiver_thread_id.to_string(); + let agents_states = [( + receiver_id.clone(), + CollabAgentState::from(payload.status.clone()), + )] + .into_iter() + .collect(); + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::ResumeAgent, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids: vec![receiver_id], + prompt: None, + agents_states, + }); + } + + fn handle_context_compacted(&mut self, _payload: &ContextCompactedEvent) { + let id = self.next_item_id(); + self.ensure_turn() + .items + .push(ThreadItem::ContextCompaction { id }); + } + + fn handle_entered_review_mode(&mut self, payload: &codex_protocol::protocol::ReviewRequest) { + let review = payload + .user_facing_hint + .clone() + .unwrap_or_else(|| "Review requested.".to_string()); + let id = self.next_item_id(); + self.ensure_turn() + .items + .push(ThreadItem::EnteredReviewMode { id, review }); + } + + fn handle_exited_review_mode( + &mut self, + payload: &codex_protocol::protocol::ExitedReviewModeEvent, + ) { + let review = payload + .review_output + .as_ref() + .map(render_review_output_text) + .unwrap_or_else(|| REVIEW_FALLBACK_MESSAGE.to_string()); + let id = self.next_item_id(); + self.ensure_turn() + .items + .push(ThreadItem::ExitedReviewMode { id, review }); + } + + fn handle_error(&mut self, payload: &ErrorEvent) { + if !payload.affects_turn_status() { + return; + } let Some(turn) = self.current_turn.as_mut() else { return; }; - turn.status = TurnStatus::Interrupted; + turn.status = TurnStatus::Failed; + turn.error = Some(V2TurnError { + message: payload.message.clone(), + codex_error_info: payload.codex_error_info.clone().map(Into::into), + additional_details: None, + }); + } + + fn handle_turn_aborted(&mut self, payload: &TurnAbortedEvent) { + if let Some(turn_id) = payload.turn_id.as_deref() { + // Prefer an exact ID match so we interrupt the turn explicitly targeted by the event. 
+ if let Some(turn) = self.current_turn.as_mut().filter(|turn| turn.id == turn_id) { + turn.status = TurnStatus::Interrupted; + return; + } + + if let Some(turn) = self.turns.iter_mut().find(|turn| turn.id == turn_id) { + turn.status = TurnStatus::Interrupted; + return; + } + } + + // If the event has no ID (or refers to an unknown turn), fall back to the active turn. + if let Some(turn) = self.current_turn.as_mut() { + turn.status = TurnStatus::Interrupted; + } } fn handle_turn_started(&mut self, payload: &TurnStartedEvent) { @@ -180,9 +547,36 @@ impl ThreadHistoryBuilder { ); } - fn handle_turn_complete(&mut self, _payload: &TurnCompleteEvent) { + fn handle_turn_complete(&mut self, payload: &TurnCompleteEvent) { + let mark_completed = |status: &mut TurnStatus| { + if matches!(*status, TurnStatus::Completed | TurnStatus::InProgress) { + *status = TurnStatus::Completed; + } + }; + + // Prefer an exact ID match from the active turn and then close it. + if let Some(current_turn) = self + .current_turn + .as_mut() + .filter(|turn| turn.id == payload.turn_id) + { + mark_completed(&mut current_turn.status); + self.finish_current_turn(); + return; + } + + if let Some(turn) = self + .turns + .iter_mut() + .find(|turn| turn.id == payload.turn_id) + { + mark_completed(&mut turn.status); + return; + } + + // If the completion event cannot be matched, apply it to the active turn. 
if let Some(current_turn) = self.current_turn.as_mut() { - current_turn.status = TurnStatus::Completed; + mark_completed(&mut current_turn.status); self.finish_current_turn(); } } @@ -274,6 +668,59 @@ impl ThreadHistoryBuilder { } } +const REVIEW_FALLBACK_MESSAGE: &str = "Reviewer failed to output a response."; + +fn render_review_output_text(output: &ReviewOutputEvent) -> String { + let explanation = output.overall_explanation.trim(); + if explanation.is_empty() { + REVIEW_FALLBACK_MESSAGE.to_string() + } else { + explanation.to_string() + } +} + +fn convert_patch_changes( + changes: &HashMap, +) -> Vec { + let mut converted: Vec = changes + .iter() + .map(|(path, change)| FileUpdateChange { + path: path.to_string_lossy().into_owned(), + kind: map_patch_change_kind(change), + diff: format_file_change_diff(change), + }) + .collect(); + converted.sort_by(|a, b| a.path.cmp(&b.path)); + converted +} + +fn map_patch_change_kind(change: &codex_protocol::protocol::FileChange) -> PatchChangeKind { + match change { + codex_protocol::protocol::FileChange::Add { .. } => PatchChangeKind::Add, + codex_protocol::protocol::FileChange::Delete { .. } => PatchChangeKind::Delete, + codex_protocol::protocol::FileChange::Update { move_path, .. 
} => PatchChangeKind::Update { + move_path: move_path.clone(), + }, + } +} + +fn format_file_change_diff(change: &codex_protocol::protocol::FileChange) -> String { + match change { + codex_protocol::protocol::FileChange::Add { content } => content.clone(), + codex_protocol::protocol::FileChange::Delete { content } => content.clone(), + codex_protocol::protocol::FileChange::Update { + unified_diff, + move_path, + } => { + if let Some(path) = move_path { + format!("{unified_diff}\n\nMoved to: {}", path.display()) + } else { + unified_diff.clone() + } + } + } +} + struct PendingTurn { id: String, items: Vec, @@ -308,17 +755,28 @@ impl From for Turn { #[cfg(test)] mod tests { use super::*; + use codex_protocol::ThreadId; + use codex_protocol::models::WebSearchAction as CoreWebSearchAction; + use codex_protocol::parse_command::ParsedCommand; use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::AgentReasoningEvent; use codex_protocol::protocol::AgentReasoningRawContentEvent; + use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::CompactedItem; + use codex_protocol::protocol::ExecCommandEndEvent; + use codex_protocol::protocol::ExecCommandSource; + use codex_protocol::protocol::McpInvocation; + use codex_protocol::protocol::McpToolCallEndEvent; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; + use codex_protocol::protocol::WebSearchEndEvent; use pretty_assertions::assert_eq; + use std::path::PathBuf; + use std::time::Duration; use uuid::Uuid; #[test] @@ -706,6 +1164,371 @@ mod tests { ); } + #[test] + fn reconstructs_tool_items_from_persisted_completion_events() { + let events = vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "run tools".into(), + images: None, + 
text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::WebSearchEnd(WebSearchEndEvent { + call_id: "search-1".into(), + query: "codex".into(), + action: CoreWebSearchAction::Search { + query: Some("codex".into()), + queries: None, + }, + }), + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "exec-1".into(), + process_id: Some("pid-1".into()), + turn_id: "turn-1".into(), + command: vec!["echo".into(), "hello world".into()], + cwd: PathBuf::from("/tmp"), + parsed_cmd: vec![ParsedCommand::Unknown { + cmd: "echo hello world".into(), + }], + source: ExecCommandSource::Agent, + interaction_input: None, + stdout: String::new(), + stderr: String::new(), + aggregated_output: "hello world\n".into(), + exit_code: 0, + duration: Duration::from_millis(12), + formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, + }), + EventMsg::McpToolCallEnd(McpToolCallEndEvent { + call_id: "mcp-1".into(), + invocation: McpInvocation { + server: "docs".into(), + tool: "lookup".into(), + arguments: Some(serde_json::json!({"id":"123"})), + }, + duration: Duration::from_millis(8), + result: Err("boom".into()), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].items.len(), 4); + assert_eq!( + turns[0].items[1], + ThreadItem::WebSearch { + id: "search-1".into(), + query: "codex".into(), + action: Some(WebSearchAction::Search { + query: Some("codex".into()), + queries: None, + }), + } + ); + assert_eq!( + turns[0].items[2], + ThreadItem::CommandExecution { + id: "exec-1".into(), + command: "echo 'hello world'".into(), + cwd: PathBuf::from("/tmp"), + process_id: Some("pid-1".into()), + status: CommandExecutionStatus::Completed, + command_actions: vec![CommandAction::Unknown { + command: "echo hello world".into(), + }], + aggregated_output: Some("hello world\n".into()), + exit_code: Some(0), + 
duration_ms: Some(12), + } + ); + assert_eq!( + turns[0].items[3], + ThreadItem::McpToolCall { + id: "mcp-1".into(), + server: "docs".into(), + tool: "lookup".into(), + status: McpToolCallStatus::Failed, + arguments: serde_json::json!({"id":"123"}), + result: None, + error: Some(McpToolCallError { + message: "boom".into(), + }), + duration_ms: Some(8), + } + ); + } + + #[test] + fn reconstructs_declined_exec_and_patch_items() { + let events = vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "run tools".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "exec-declined".into(), + process_id: Some("pid-2".into()), + turn_id: "turn-1".into(), + command: vec!["ls".into()], + cwd: PathBuf::from("/tmp"), + parsed_cmd: vec![ParsedCommand::Unknown { cmd: "ls".into() }], + source: ExecCommandSource::Agent, + interaction_input: None, + stdout: String::new(), + stderr: "exec command rejected by user".into(), + aggregated_output: "exec command rejected by user".into(), + exit_code: -1, + duration: Duration::ZERO, + formatted_output: String::new(), + status: CoreExecCommandStatus::Declined, + }), + EventMsg::PatchApplyEnd(PatchApplyEndEvent { + call_id: "patch-declined".into(), + turn_id: "turn-1".into(), + stdout: String::new(), + stderr: "patch rejected by user".into(), + success: false, + changes: [( + PathBuf::from("README.md"), + codex_protocol::protocol::FileChange::Add { + content: "hello\n".into(), + }, + )] + .into_iter() + .collect(), + status: CorePatchApplyStatus::Declined, + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].items.len(), 3); + assert_eq!( + turns[0].items[1], + ThreadItem::CommandExecution { + id: "exec-declined".into(), + command: "ls".into(), + cwd: PathBuf::from("/tmp"), + process_id: 
Some("pid-2".into()), + status: CommandExecutionStatus::Declined, + command_actions: vec![CommandAction::Unknown { + command: "ls".into(), + }], + aggregated_output: Some("exec command rejected by user".into()), + exit_code: Some(-1), + duration_ms: Some(0), + } + ); + assert_eq!( + turns[0].items[2], + ThreadItem::FileChange { + id: "patch-declined".into(), + changes: vec![FileUpdateChange { + path: "README.md".into(), + kind: PatchChangeKind::Add, + diff: "hello\n".into(), + }], + status: PatchApplyStatus::Declined, + } + ); + } + + #[test] + fn assigns_late_exec_completion_to_original_turn() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "first".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-b".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "second".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "exec-late".into(), + process_id: Some("pid-42".into()), + turn_id: "turn-a".into(), + command: vec!["echo".into(), "done".into()], + cwd: PathBuf::from("/tmp"), + parsed_cmd: vec![ParsedCommand::Unknown { + cmd: "echo done".into(), + }], + source: ExecCommandSource::Agent, + interaction_input: None, + stdout: "done\n".into(), + stderr: String::new(), + aggregated_output: "done\n".into(), + exit_code: 0, + duration: Duration::from_millis(5), + formatted_output: "done\n".into(), + status: CoreExecCommandStatus::Completed, + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: 
"turn-b".into(), + last_agent_message: None, + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 2); + assert_eq!(turns[0].id, "turn-a"); + assert_eq!(turns[1].id, "turn-b"); + assert_eq!(turns[0].items.len(), 2); + assert_eq!(turns[1].items.len(), 1); + assert_eq!( + turns[0].items[1], + ThreadItem::CommandExecution { + id: "exec-late".into(), + command: "echo done".into(), + cwd: PathBuf::from("/tmp"), + process_id: Some("pid-42".into()), + status: CommandExecutionStatus::Completed, + command_actions: vec![CommandAction::Unknown { + command: "echo done".into(), + }], + aggregated_output: Some("done\n".into()), + exit_code: Some(0), + duration_ms: Some(5), + } + ); + } + + #[test] + fn late_turn_complete_does_not_close_active_turn() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "first".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-b".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "second".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::AgentMessage(AgentMessageEvent { + message: "still in b".into(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-b".into(), + last_agent_message: None, + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let 
turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 2); + assert_eq!(turns[0].id, "turn-a"); + assert_eq!(turns[1].id, "turn-b"); + assert_eq!(turns[1].items.len(), 2); + } + + #[test] + fn late_turn_aborted_does_not_interrupt_active_turn() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "first".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-b".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "second".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some("turn-a".into()), + reason: TurnAbortReason::Replaced, + }), + EventMsg::AgentMessage(AgentMessageEvent { + message: "still in b".into(), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 2); + assert_eq!(turns[0].id, "turn-a"); + assert_eq!(turns[1].id, "turn-b"); + assert_eq!(turns[1].status, TurnStatus::Completed); + assert_eq!(turns[1].items.len(), 2); + } + #[test] fn preserves_compaction_only_turn() { let items = vec![ @@ -735,4 +1558,175 @@ mod tests { }] ); } + + #[test] + fn reconstructs_collab_resume_end_item() { + let events = vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "resume agent".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::CollabResumeEnd(codex_protocol::protocol::CollabResumeEndEvent { + call_id: "resume-1".into(), + 
sender_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000001") + .expect("valid sender thread id"), + receiver_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000002") + .expect("valid receiver thread id"), + status: AgentStatus::Completed(None), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].items.len(), 2); + assert_eq!( + turns[0].items[1], + ThreadItem::CollabAgentToolCall { + id: "resume-1".into(), + tool: CollabAgentTool::ResumeAgent, + status: CollabAgentToolCallStatus::Completed, + sender_thread_id: "00000000-0000-0000-0000-000000000001".into(), + receiver_thread_ids: vec!["00000000-0000-0000-0000-000000000002".into()], + prompt: None, + agents_states: [( + "00000000-0000-0000-0000-000000000002".into(), + CollabAgentState { + status: crate::protocol::v2::CollabAgentStatus::Completed, + message: None, + }, + )] + .into_iter() + .collect(), + } + ); + } + + #[test] + fn rollback_failed_error_does_not_mark_turn_failed() { + let events = vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "hello".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::AgentMessage(AgentMessageEvent { + message: "done".into(), + }), + EventMsg::Error(ErrorEvent { + message: "rollback failed".into(), + codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].status, TurnStatus::Completed); + assert_eq!(turns[0].error, None); + } + + #[test] + fn out_of_turn_error_does_not_create_or_fail_a_turn() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: 
Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "hello".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::Error(ErrorEvent { + message: "request-level failure".into(), + codex_error_info: Some(CodexErrorInfo::BadRequest), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!( + turns[0], + Turn { + id: "turn-a".into(), + status: TurnStatus::Completed, + error: None, + items: vec![ThreadItem::UserMessage { + id: "item-1".into(), + content: vec![UserInput::Text { + text: "hello".into(), + text_elements: Vec::new(), + }], + }], + } + ); + } + + #[test] + fn error_then_turn_complete_preserves_failed_status() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "hello".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::Error(ErrorEvent { + message: "stream failure".into(), + codex_error_info: Some(CodexErrorInfo::ResponseStreamDisconnected { + http_status_code: Some(502), + }), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].id, "turn-a"); + assert_eq!(turns[0].status, TurnStatus::Failed); + assert_eq!( + turns[0].error, + Some(TurnError { + message: "stream failure".into(), + codex_error_info: Some( + crate::protocol::v2::CodexErrorInfo::ResponseStreamDisconnected { + 
http_status_code: Some(502), + } + ), + additional_details: None, + }) + ); + } } diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 69510df9f..7cff3e2de 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -29,7 +29,9 @@ use codex_protocol::protocol::AgentStatus as CoreAgentStatus; use codex_protocol::protocol::AskForApproval as CoreAskForApproval; use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo; use codex_protocol::protocol::CreditsSnapshot as CoreCreditsSnapshot; +use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus; use codex_protocol::protocol::NetworkAccess as CoreNetworkAccess; +use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus; use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot; use codex_protocol::protocol::RateLimitWindow as CoreRateLimitWindow; use codex_protocol::protocol::ReadOnlyAccess as CoreReadOnlyAccess; @@ -1422,6 +1424,11 @@ pub struct ThreadStartParams { #[experimental("thread/start.experimentalRawEvents")] #[serde(default)] pub experimental_raw_events: bool, + /// If true, persist additional rollout EventMsg variants required to + /// reconstruct a richer thread history on resume/fork/read. + #[experimental("thread/start.persistFullHistory")] + #[serde(default)] + pub persist_extended_history: bool, } #[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)] @@ -1503,6 +1510,11 @@ pub struct ThreadResumeParams { pub developer_instructions: Option, #[ts(optional = nullable)] pub personality: Option, + /// If true, persist additional rollout EventMsg variants required to + /// reconstruct a richer thread history on subsequent resume/fork/read. 
+ #[experimental("thread/resume.persistFullHistory")] + #[serde(default)] + pub persist_extended_history: bool, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -1556,6 +1568,11 @@ pub struct ThreadForkParams { pub base_instructions: Option, #[ts(optional = nullable)] pub developer_instructions: Option, + /// If true, persist additional rollout EventMsg variants required to + /// reconstruct a richer thread history on subsequent resume/fork/read. + #[experimental("thread/fork.persistFullHistory")] + #[serde(default)] + pub persist_extended_history: bool, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -2622,6 +2639,22 @@ pub enum CommandExecutionStatus { Declined, } +impl From for CommandExecutionStatus { + fn from(value: CoreExecCommandStatus) -> Self { + Self::from(&value) + } +} + +impl From<&CoreExecCommandStatus> for CommandExecutionStatus { + fn from(value: &CoreExecCommandStatus) -> Self { + match value { + CoreExecCommandStatus::Completed => CommandExecutionStatus::Completed, + CoreExecCommandStatus::Failed => CommandExecutionStatus::Failed, + CoreExecCommandStatus::Declined => CommandExecutionStatus::Declined, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -2662,6 +2695,22 @@ pub enum PatchApplyStatus { Declined, } +impl From for PatchApplyStatus { + fn from(value: CorePatchApplyStatus) -> Self { + Self::from(&value) + } +} + +impl From<&CorePatchApplyStatus> for PatchApplyStatus { + fn from(value: &CorePatchApplyStatus) -> Self { + match value { + CorePatchApplyStatus::Completed => PatchApplyStatus::Completed, + CorePatchApplyStatus::Failed => PatchApplyStatus::Failed, + CorePatchApplyStatus::Declined => PatchApplyStatus::Declined, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git 
a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index b6c7f9e4f..13c3fda1e 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -209,6 +209,8 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c { "method": "thread/started", "params": { "thread": { … } } } ``` +Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously. + ### Example: List threads (with pagination & filters) `thread/list` lets you render a history UI. Results default to `createdAt` (newest first) descending. Pass any combination of: diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 3892f8886..79170e64b 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -717,6 +717,10 @@ pub(crate) async fn apply_bespoke_event_handling( .await; }; + if !ev.affects_turn_status() { + return; + } + let turn_error = TurnError { message: ev.message, codex_error_info: ev.codex_error_info.map(V2CodexErrorInfo::from), @@ -887,11 +891,7 @@ pub(crate) async fn apply_bespoke_event_handling( // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. let item_id = patch_end_event.call_id.clone(); - let status = if patch_end_event.success { - PatchApplyStatus::Completed - } else { - PatchApplyStatus::Failed - }; + let status: PatchApplyStatus = (&patch_end_event.status).into(); let changes = convert_patch_changes(&patch_end_event.changes); complete_file_change_item( conversation_id, @@ -998,14 +998,11 @@ pub(crate) async fn apply_bespoke_event_handling( aggregated_output, exit_code, duration, + status, .. 
} = exec_command_end_event; - let status = if exit_code == 0 { - CommandExecutionStatus::Completed - } else { - CommandExecutionStatus::Failed - }; + let status: CommandExecutionStatus = (&status).into(); let command_actions = parsed_cmd .into_iter() .map(V2ParsedCommand::from) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index b986b649c..603f34b01 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1894,6 +1894,7 @@ impl CodexMessageProcessor { experimental_raw_events, personality, ephemeral, + persist_extended_history, } = params; let mut typesafe_overrides = self.build_thread_config_overrides( model, @@ -1953,7 +1954,7 @@ impl CodexMessageProcessor { match self .thread_manager - .start_thread_with_tools(config, core_dynamic_tools) + .start_thread_with_tools(config, core_dynamic_tools, persist_extended_history) .await { Ok(new_conv) => { @@ -2690,6 +2691,7 @@ impl CodexMessageProcessor { base_instructions, developer_instructions, personality, + persist_extended_history, } = params; let thread_history = if let Some(history) = history { @@ -2805,7 +2807,12 @@ impl CodexMessageProcessor { match self .thread_manager - .resume_thread_with_history(config, thread_history, self.auth_manager.clone()) + .resume_thread_with_history( + config, + thread_history, + self.auth_manager.clone(), + persist_extended_history, + ) .await { Ok(NewThread { @@ -2910,6 +2917,7 @@ impl CodexMessageProcessor { config: cli_overrides, base_instructions, developer_instructions, + persist_extended_history, } = params; let (rollout_path, source_thread_id) = if let Some(path) = path { @@ -3021,7 +3029,12 @@ impl CodexMessageProcessor { .. 
} = match self .thread_manager - .fork_thread(usize::MAX, config, rollout_path.clone()) + .fork_thread( + usize::MAX, + config, + rollout_path.clone(), + persist_extended_history, + ) .await { Ok(thread) => thread, @@ -3962,7 +3975,7 @@ impl CodexMessageProcessor { match self .thread_manager - .resume_thread_with_history(config, thread_history, self.auth_manager.clone()) + .resume_thread_with_history(config, thread_history, self.auth_manager.clone(), false) .await { Ok(NewThread { @@ -4162,7 +4175,7 @@ impl CodexMessageProcessor { .. } = match self .thread_manager - .fork_thread(usize::MAX, config, rollout_path.clone()) + .fork_thread(usize::MAX, config, rollout_path.clone(), false) .await { Ok(thread) => thread, @@ -5168,7 +5181,7 @@ impl CodexMessageProcessor { .. } = self .thread_manager - .fork_thread(usize::MAX, config, rollout_path) + .fork_thread(usize::MAX, config, rollout_path, false) .await .map_err(|err| JSONRPCErrorError { code: INTERNAL_ERROR_CODE, diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index b0164e86e..b6cd0d82a 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -50,7 +50,7 @@ impl AgentControl { let new_thread = match session_source { Some(session_source) => { state - .spawn_new_thread_with_source(config, self.clone(), session_source) + .spawn_new_thread_with_source(config, self.clone(), session_source, false) .await? 
} None => state.spawn_new_thread(config, self.clone()).await?, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index dbfa92ff4..647b2f760 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -196,6 +196,7 @@ use crate::rollout::RolloutRecorder; use crate::rollout::RolloutRecorderParams; use crate::rollout::map_session_init_error; use crate::rollout::metadata; +use crate::rollout::policy::EventPersistenceMode; use crate::shell; use crate::shell_snapshot::ShellSnapshot; use crate::skills::SkillError; @@ -284,6 +285,7 @@ impl Codex { session_source: SessionSource, agent_control: AgentControl, dynamic_tools: Vec, + persist_extended_history: bool, ) -> CodexResult { let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let (tx_event, rx_event) = async_channel::unbounded(); @@ -396,6 +398,7 @@ impl Codex { original_config_do_not_use: Arc::clone(&config), session_source, dynamic_tools, + persist_extended_history, }; // Generate a unique ID for the lifetime of this Codex session. @@ -733,6 +736,7 @@ pub(crate) struct SessionConfiguration { /// Source of the session (cli, vscode, exec, mcp, ...) 
session_source: SessionSource, dynamic_tools: Vec, + persist_extended_history: bool, } impl SessionConfiguration { @@ -984,12 +988,24 @@ impl Session { text: session_configuration.base_instructions.clone(), }, session_configuration.dynamic_tools.clone(), + if session_configuration.persist_extended_history { + EventPersistenceMode::Extended + } else { + EventPersistenceMode::Limited + }, ), ) } InitialHistory::Resumed(resumed_history) => ( resumed_history.conversation_id, - RolloutRecorderParams::resume(resumed_history.rollout_path.clone()), + RolloutRecorderParams::resume( + resumed_history.rollout_path.clone(), + if session_configuration.persist_extended_history { + EventPersistenceMode::Extended + } else { + EventPersistenceMode::Limited + }, + ), ), }; let state_builder = match &initial_history { @@ -6297,6 +6313,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, }; let mut state = SessionState::new(session_configuration); @@ -6387,6 +6404,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, }; let mut state = SessionState::new(session_configuration); @@ -6696,6 +6714,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, } } @@ -6748,6 +6767,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, }; let per_turn_config = Session::build_per_turn_config(&session_configuration); let model_info = ModelsManager::construct_model_info_offline_for_tests( @@ -6893,6 +6913,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, 
}; let per_turn_config = Session::build_per_turn_config(&session_configuration); let model_info = ModelsManager::construct_model_info_offline_for_tests( diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 504b876bb..1f4240f68 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -58,6 +58,7 @@ pub(crate) async fn run_codex_thread_interactive( SessionSource::SubAgent(SubAgentSource::Review), parent_session.services.agent_control.clone(), Vec::new(), + false, ) .await?; let codex = Arc::new(codex); diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 53ac2e0d8..6f075e7dc 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -124,6 +124,7 @@ pub use rollout::list::ThreadsPage; pub use rollout::list::parse_cursor; pub use rollout::list::read_head_for_summary; pub use rollout::list::read_session_meta_line; +pub use rollout::policy::EventPersistenceMode; pub use rollout::rollout_date_parts; pub use rollout::session_index::find_thread_names_by_ids; mod function_tool; diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 87db64fb5..dc3e111a6 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -2,12 +2,20 @@ use crate::protocol::EventMsg; use crate::protocol::RolloutItem; use codex_protocol::models::ResponseItem; -/// Whether a rollout `item` should be persisted in rollout files. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum EventPersistenceMode { + #[default] + Limited, + Extended, +} + +/// Whether a rollout `item` should be persisted in rollout files for the +/// provided persistence `mode`. 
#[inline] -pub(crate) fn is_persisted_response_item(item: &RolloutItem) -> bool { +pub(crate) fn is_persisted_response_item(item: &RolloutItem, mode: EventPersistenceMode) -> bool { match item { RolloutItem::ResponseItem(item) => should_persist_response_item(item), - RolloutItem::EventMsg(ev) => should_persist_event_msg(ev), + RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode), // Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns). RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => { true @@ -51,9 +59,33 @@ pub(crate) fn should_persist_response_item_for_memories(item: &ResponseItem) -> } } -/// Whether an `EventMsg` should be persisted in rollout files. +/// Whether an `EventMsg` should be persisted in rollout files for the +/// provided persistence `mode`. #[inline] -pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { +pub(crate) fn should_persist_event_msg(ev: &EventMsg, mode: EventPersistenceMode) -> bool { + match mode { + EventPersistenceMode::Limited => should_persist_event_msg_limited(ev), + EventPersistenceMode::Extended => should_persist_event_msg_extended(ev), + } +} + +fn should_persist_event_msg_limited(ev: &EventMsg) -> bool { + matches!( + event_msg_persistence_mode(ev), + Some(EventPersistenceMode::Limited) + ) +} + +fn should_persist_event_msg_extended(ev: &EventMsg) -> bool { + matches!( + event_msg_persistence_mode(ev), + Some(EventPersistenceMode::Limited) | Some(EventPersistenceMode::Extended) + ) +} + +/// Returns the minimum persistence mode that includes this event. +/// `None` means the event should never be persisted. 
+fn event_msg_persistence_mode(ev: &EventMsg) -> Option { match ev { EventMsg::UserMessage(_) | EventMsg::AgentMessage(_) @@ -67,15 +99,29 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::UndoCompleted(_) | EventMsg::TurnAborted(_) | EventMsg::TurnStarted(_) - | EventMsg::TurnComplete(_) => true, + | EventMsg::TurnComplete(_) => Some(EventPersistenceMode::Limited), EventMsg::ItemCompleted(event) => { // Plan items are derived from streaming tags and are not part of the // raw ResponseItem history, so we persist their completion to replay // them on resume without bloating rollouts with every item lifecycle. - matches!(event.item, codex_protocol::items::TurnItem::Plan(_)) + if matches!(event.item, codex_protocol::items::TurnItem::Plan(_)) { + Some(EventPersistenceMode::Limited) + } else { + None + } } EventMsg::Error(_) - | EventMsg::Warning(_) + | EventMsg::WebSearchEnd(_) + | EventMsg::ExecCommandEnd(_) + | EventMsg::PatchApplyEnd(_) + | EventMsg::McpToolCallEnd(_) + | EventMsg::ViewImageToolCall(_) + | EventMsg::CollabAgentSpawnEnd(_) + | EventMsg::CollabAgentInteractionEnd(_) + | EventMsg::CollabWaitingEnd(_) + | EventMsg::CollabCloseEnd(_) + | EventMsg::CollabResumeEnd(_) => Some(EventPersistenceMode::Extended), + EventMsg::Warning(_) | EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) | EventMsg::AgentReasoningRawContentDelta(_) @@ -84,13 +130,10 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::SessionConfigured(_) | EventMsg::ThreadNameUpdated(_) | EventMsg::McpToolCallBegin(_) - | EventMsg::McpToolCallEnd(_) | EventMsg::WebSearchBegin(_) - | EventMsg::WebSearchEnd(_) | EventMsg::ExecCommandBegin(_) | EventMsg::TerminalInteraction(_) | EventMsg::ExecCommandOutputDelta(_) - | EventMsg::ExecCommandEnd(_) | EventMsg::ExecApprovalRequest(_) | EventMsg::RequestUserInput(_) | EventMsg::DynamicToolCallRequest(_) @@ -99,7 +142,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool 
{ | EventMsg::BackgroundEvent(_) | EventMsg::StreamError(_) | EventMsg::PatchApplyBegin(_) - | EventMsg::PatchApplyEnd(_) | EventMsg::TurnDiff(_) | EventMsg::GetHistoryEntryResponse(_) | EventMsg::UndoStarted(_) @@ -112,7 +154,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::RemoteSkillDownloaded(_) | EventMsg::PlanUpdate(_) | EventMsg::ShutdownComplete - | EventMsg::ViewImageToolCall(_) | EventMsg::DeprecationNotice(_) | EventMsg::ItemStarted(_) | EventMsg::AgentMessageContentDelta(_) @@ -121,14 +162,9 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::ReasoningRawContentDelta(_) | EventMsg::SkillsUpdateAvailable | EventMsg::CollabAgentSpawnBegin(_) - | EventMsg::CollabAgentSpawnEnd(_) | EventMsg::CollabAgentInteractionBegin(_) - | EventMsg::CollabAgentInteractionEnd(_) | EventMsg::CollabWaitingBegin(_) - | EventMsg::CollabWaitingEnd(_) | EventMsg::CollabCloseBegin(_) - | EventMsg::CollabCloseEnd(_) - | EventMsg::CollabResumeBegin(_) - | EventMsg::CollabResumeEnd(_) => false, + | EventMsg::CollabResumeBegin(_) => None, } } diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 4e94d0356..d0df0828a 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -36,6 +36,7 @@ use super::list::get_threads_in_root; use super::list::parse_cursor; use super::list::parse_timestamp_uuid_from_filename; use super::metadata; +use super::policy::EventPersistenceMode; use super::policy::is_persisted_response_item; use crate::config::Config; use crate::default_client::originator; @@ -43,6 +44,9 @@ use crate::git_info::collect_git_info; use crate::path_utils; use crate::state_db; use crate::state_db::StateDbHandle; +use crate::truncate::TruncationPolicy; +use crate::truncate::truncate_text; +use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::ResumedHistory; use 
codex_protocol::protocol::RolloutItem; @@ -67,6 +71,7 @@ pub struct RolloutRecorder { tx: Sender, pub(crate) rollout_path: PathBuf, state_db: Option, + event_persistence_mode: EventPersistenceMode, } #[derive(Clone)] @@ -77,9 +82,11 @@ pub enum RolloutRecorderParams { source: SessionSource, base_instructions: BaseInstructions, dynamic_tools: Vec, + event_persistence_mode: EventPersistenceMode, }, Resume { path: PathBuf, + event_persistence_mode: EventPersistenceMode, }, } @@ -104,6 +111,7 @@ impl RolloutRecorderParams { source: SessionSource, base_instructions: BaseInstructions, dynamic_tools: Vec, + event_persistence_mode: EventPersistenceMode, ) -> Self { Self::Create { conversation_id, @@ -111,11 +119,42 @@ impl RolloutRecorderParams { source, base_instructions, dynamic_tools, + event_persistence_mode, } } - pub fn resume(path: PathBuf) -> Self { - Self::Resume { path } + pub fn resume(path: PathBuf, event_persistence_mode: EventPersistenceMode) -> Self { + Self::Resume { + path, + event_persistence_mode, + } + } +} + +const PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES: usize = 10_000; + +fn sanitize_rollout_item_for_persistence( + item: RolloutItem, + mode: EventPersistenceMode, +) -> RolloutItem { + if mode != EventPersistenceMode::Extended { + return item; + } + + match item { + RolloutItem::EventMsg(EventMsg::ExecCommandEnd(mut event)) => { + // Persist only a bounded aggregated summary of command output. + event.aggregated_output = truncate_text( + &event.aggregated_output, + TruncationPolicy::Bytes(PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES), + ); + // Drop unnecessary fields from rollout storage since aggregated_output is all we need. 
+ event.stdout.clear(); + event.stderr.clear(); + event.formatted_output.clear(); + RolloutItem::EventMsg(EventMsg::ExecCommandEnd(event)) + } + _ => item, } } @@ -322,58 +361,70 @@ impl RolloutRecorder { state_db_ctx: Option, state_builder: Option, ) -> std::io::Result { - let (file, deferred_log_file_info, rollout_path, meta) = match params { - RolloutRecorderParams::Create { - conversation_id, - forked_from_id, - source, - base_instructions, - dynamic_tools, - } => { - let log_file_info = precompute_log_file_info(config, conversation_id)?; - let path = log_file_info.path.clone(); - let session_id = log_file_info.conversation_id; - let started_at = log_file_info.timestamp; - - let timestamp_format: &[FormatItem] = format_description!( - "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" - ); - let timestamp = started_at - .to_offset(time::UtcOffset::UTC) - .format(timestamp_format) - .map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?; - - let session_meta = SessionMeta { - id: session_id, + let (file, deferred_log_file_info, rollout_path, meta, event_persistence_mode) = + match params { + RolloutRecorderParams::Create { + conversation_id, forked_from_id, - timestamp, - cwd: config.cwd.clone(), - originator: originator().value, - cli_version: env!("CARGO_PKG_VERSION").to_string(), source, - model_provider: Some(config.model_provider_id.clone()), - base_instructions: Some(base_instructions), - dynamic_tools: if dynamic_tools.is_empty() { - None - } else { - Some(dynamic_tools) - }, - }; + base_instructions, + dynamic_tools, + event_persistence_mode, + } => { + let log_file_info = precompute_log_file_info(config, conversation_id)?; + let path = log_file_info.path.clone(); + let session_id = log_file_info.conversation_id; + let started_at = log_file_info.timestamp; - (None, Some(log_file_info), path, Some(session_meta)) - } - RolloutRecorderParams::Resume { path } => ( - Some( - tokio::fs::OpenOptions::new() - .append(true) - 
.open(&path) - .await?, + let timestamp_format: &[FormatItem] = format_description!( + "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" + ); + let timestamp = started_at + .to_offset(time::UtcOffset::UTC) + .format(timestamp_format) + .map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?; + + let session_meta = SessionMeta { + id: session_id, + forked_from_id, + timestamp, + cwd: config.cwd.clone(), + originator: originator().value, + cli_version: env!("CARGO_PKG_VERSION").to_string(), + source, + model_provider: Some(config.model_provider_id.clone()), + base_instructions: Some(base_instructions), + dynamic_tools: if dynamic_tools.is_empty() { + None + } else { + Some(dynamic_tools) + }, + }; + + ( + None, + Some(log_file_info), + path, + Some(session_meta), + event_persistence_mode, + ) + } + RolloutRecorderParams::Resume { + path, + event_persistence_mode, + } => ( + Some( + tokio::fs::OpenOptions::new() + .append(true) + .open(&path) + .await?, + ), + None, + path, + None, + event_persistence_mode, ), - None, - path, - None, - ), - }; + }; // Clone the cwd for the spawned task to collect git info asynchronously let cwd = config.cwd.clone(); @@ -402,6 +453,7 @@ impl RolloutRecorder { tx, rollout_path, state_db: state_db_ctx, + event_persistence_mode, }) } @@ -419,8 +471,11 @@ impl RolloutRecorder { // Note that function calls may look a bit strange if they are // "fully qualified MCP tool calls," so we could consider // reformatting them in that case. 
- if is_persisted_response_item(item) { - filtered.push(item.clone()); + if is_persisted_response_item(item, self.event_persistence_mode) { + filtered.push(sanitize_rollout_item_for_persistence( + item.clone(), + self.event_persistence_mode, + )); } } if filtered.is_empty() { @@ -673,9 +728,7 @@ async fn rollout_writer( RolloutCmd::AddItems(items) => { let mut persisted_items = Vec::new(); for item in items { - if is_persisted_response_item(&item) { - persisted_items.push(item); - } + persisted_items.push(item); } if persisted_items.is_empty() { continue; @@ -1003,6 +1056,7 @@ mod tests { SessionSource::Exec, BaseInstructions::default(), Vec::new(), + EventPersistenceMode::Limited, ), None, None, diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index ef6c7180a..43950a8da 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -21,6 +21,7 @@ use crate::protocol::EventMsg; use crate::protocol::ExecCommandBeginEvent; use crate::protocol::ExecCommandEndEvent; use crate::protocol::ExecCommandSource; +use crate::protocol::ExecCommandStatus; use crate::protocol::SandboxPolicy; use crate::protocol::TurnStartedEvent; use crate::sandboxing::ExecRequest; @@ -207,6 +208,7 @@ pub(crate) async fn execute_user_shell_command( exit_code: -1, duration: Duration::ZERO, formatted_output: aborted_message, + status: ExecCommandStatus::Failed, }), ) .await; @@ -233,6 +235,11 @@ pub(crate) async fn execute_user_shell_command( &output, turn_context.truncation_policy, ), + status: if output.exit_code == 0 { + ExecCommandStatus::Completed + } else { + ExecCommandStatus::Failed + }, }), ) .await; @@ -272,6 +279,7 @@ pub(crate) async fn execute_user_shell_command( &exec_output, turn_context.truncation_policy, ), + status: ExecCommandStatus::Failed, }), ) .await; diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 8117f62ec..b13ef8c61 100644 --- 
a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -277,13 +277,15 @@ impl ThreadManager { } pub async fn start_thread(&self, config: Config) -> CodexResult { - self.start_thread_with_tools(config, Vec::new()).await + self.start_thread_with_tools(config, Vec::new(), false) + .await } pub async fn start_thread_with_tools( &self, config: Config, dynamic_tools: Vec, + persist_extended_history: bool, ) -> CodexResult { self.state .spawn_thread( @@ -292,6 +294,7 @@ impl ThreadManager { Arc::clone(&self.state.auth_manager), self.agent_control(), dynamic_tools, + persist_extended_history, ) .await } @@ -303,7 +306,7 @@ impl ThreadManager { auth_manager: Arc, ) -> CodexResult { let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?; - self.resume_thread_with_history(config, initial_history, auth_manager) + self.resume_thread_with_history(config, initial_history, auth_manager, false) .await } @@ -312,6 +315,7 @@ impl ThreadManager { config: Config, initial_history: InitialHistory, auth_manager: Arc, + persist_extended_history: bool, ) -> CodexResult { self.state .spawn_thread( @@ -320,6 +324,7 @@ impl ThreadManager { auth_manager, self.agent_control(), Vec::new(), + persist_extended_history, ) .await } @@ -349,6 +354,7 @@ impl ThreadManager { nth_user_message: usize, config: Config, path: PathBuf, + persist_extended_history: bool, ) -> CodexResult { let history = RolloutRecorder::get_rollout_history(&path).await?; let history = truncate_before_nth_user_message(history, nth_user_message); @@ -359,6 +365,7 @@ impl ThreadManager { Arc::clone(&self.state.auth_manager), self.agent_control(), Vec::new(), + persist_extended_history, ) .await } @@ -409,7 +416,7 @@ impl ThreadManagerState { config: Config, agent_control: AgentControl, ) -> CodexResult { - self.spawn_new_thread_with_source(config, agent_control, self.session_source.clone()) + self.spawn_new_thread_with_source(config, agent_control, self.session_source.clone(), 
false) .await } @@ -418,6 +425,7 @@ impl ThreadManagerState { config: Config, agent_control: AgentControl, session_source: SessionSource, + persist_extended_history: bool, ) -> CodexResult { self.spawn_thread_with_source( config, @@ -426,6 +434,7 @@ impl ThreadManagerState { agent_control, session_source, Vec::new(), + persist_extended_history, ) .await } @@ -445,6 +454,7 @@ impl ThreadManagerState { agent_control, session_source, Vec::new(), + false, ) .await } @@ -457,6 +467,7 @@ impl ThreadManagerState { auth_manager: Arc, agent_control: AgentControl, dynamic_tools: Vec, + persist_extended_history: bool, ) -> CodexResult { self.spawn_thread_with_source( config, @@ -465,10 +476,12 @@ impl ThreadManagerState { agent_control, self.session_source.clone(), dynamic_tools, + persist_extended_history, ) .await } + #[allow(clippy::too_many_arguments)] pub(crate) async fn spawn_thread_with_source( &self, config: Config, @@ -477,6 +490,7 @@ impl ThreadManagerState { agent_control: AgentControl, session_source: SessionSource, dynamic_tools: Vec, + persist_extended_history: bool, ) -> CodexResult { self.file_watcher.register_config(&config); let CodexSpawnOk { @@ -491,6 +505,7 @@ impl ThreadManagerState { session_source, agent_control, dynamic_tools, + persist_extended_history, ) .await?; self.finalize_thread_spawn(codex, thread_id).await diff --git a/codex-rs/core/src/tools/events.rs b/codex-rs/core/src/tools/events.rs index ba720918b..65972f5a5 100644 --- a/codex-rs/core/src/tools/events.rs +++ b/codex-rs/core/src/tools/events.rs @@ -9,9 +9,11 @@ use crate::protocol::EventMsg; use crate::protocol::ExecCommandBeginEvent; use crate::protocol::ExecCommandEndEvent; use crate::protocol::ExecCommandSource; +use crate::protocol::ExecCommandStatus; use crate::protocol::FileChange; use crate::protocol::PatchApplyBeginEvent; use crate::protocol::PatchApplyEndEvent; +use crate::protocol::PatchApplyStatus; use crate::protocol::TurnDiffEvent; use 
crate::tools::context::SharedTurnDiffTracker; use crate::tools::sandboxing::ToolError; @@ -56,6 +58,7 @@ pub(crate) enum ToolEventStage { pub(crate) enum ToolEventFailure { Output(ExecToolCallOutput), Message(String), + Rejected(String), } pub(crate) async fn emit_exec_command_begin( @@ -195,6 +198,11 @@ impl ToolEmitter { output.stdout.text.clone(), output.stderr.text.clone(), output.exit_code == 0, + if output.exit_code == 0 { + PatchApplyStatus::Completed + } else { + PatchApplyStatus::Failed + }, ) .await; } @@ -208,6 +216,11 @@ impl ToolEmitter { output.stdout.text.clone(), output.stderr.text.clone(), output.exit_code == 0, + if output.exit_code == 0 { + PatchApplyStatus::Completed + } else { + PatchApplyStatus::Failed + }, ) .await; } @@ -221,6 +234,21 @@ impl ToolEmitter { String::new(), (*message).to_string(), false, + PatchApplyStatus::Failed, + ) + .await; + } + ( + Self::ApplyPatch { changes, .. }, + ToolEventStage::Failure(ToolEventFailure::Rejected(message)), + ) => { + emit_patch_end( + ctx, + changes.clone(), + String::new(), + (*message).to_string(), + false, + PatchApplyStatus::Declined, ) .await; } @@ -301,6 +329,13 @@ impl ToolEmitter { Err(ToolError::Rejected(msg)) => { // Normalize common rejection messages for exec tools so tests and // users see a clear, consistent phrase. + // + // NOTE: ToolError::Rejected is currently used for both user-declined approvals + // and some operational/runtime rejection paths (for example setup failures). + // We intentionally map all of them through the "rejected" event path for now, + // which means a subset of non-user failures may be reported as Declined. + // + // TODO: We should add a new ToolError variant for user-declined approvals. let normalized = if msg == "rejected by user" { match self { Self::Shell { .. } | Self::UnifiedExec { .. 
} => { @@ -311,7 +346,7 @@ impl ToolEmitter { } else { msg }; - let event = ToolEventStage::Failure(ToolEventFailure::Message(normalized.clone())); + let event = ToolEventStage::Failure(ToolEventFailure::Rejected(normalized.clone())); let result = Err(FunctionCallError::RespondToModel(normalized)); (event, result) } @@ -357,6 +392,7 @@ struct ExecCommandResult { exit_code: i32, duration: Duration, formatted_output: String, + status: ExecCommandStatus, } async fn emit_exec_stage( @@ -386,6 +422,11 @@ async fn emit_exec_stage( exit_code: output.exit_code, duration: output.duration, formatted_output: format_exec_output_str(&output, ctx.turn.truncation_policy), + status: if output.exit_code == 0 { + ExecCommandStatus::Completed + } else { + ExecCommandStatus::Failed + }, }; emit_exec_end(ctx, exec_input, exec_result).await; } @@ -398,6 +439,20 @@ async fn emit_exec_stage( exit_code: -1, duration: Duration::ZERO, formatted_output: text, + status: ExecCommandStatus::Failed, + }; + emit_exec_end(ctx, exec_input, exec_result).await; + } + ToolEventStage::Failure(ToolEventFailure::Rejected(message)) => { + let text = message.to_string(); + let exec_result = ExecCommandResult { + stdout: String::new(), + stderr: text.clone(), + aggregated_output: text.clone(), + exit_code: -1, + duration: Duration::ZERO, + formatted_output: text, + status: ExecCommandStatus::Declined, }; emit_exec_end(ctx, exec_input, exec_result).await; } @@ -427,6 +482,7 @@ async fn emit_exec_end( exit_code: exec_result.exit_code, duration: exec_result.duration, formatted_output: exec_result.formatted_output, + status: exec_result.status, }), ) .await; @@ -438,6 +494,7 @@ async fn emit_patch_end( stdout: String, stderr: String, success: bool, + status: PatchApplyStatus, ) { ctx.session .send_event( @@ -449,6 +506,7 @@ async fn emit_patch_end( stderr, success, changes, + status, }), ) .await; diff --git a/codex-rs/core/src/tools/handlers/collab.rs b/codex-rs/core/src/tools/handlers/collab.rs index 
aef8e3cc7..07a21a3fc 100644 --- a/codex-rs/core/src/tools/handlers/collab.rs +++ b/codex-rs/core/src/tools/handlers/collab.rs @@ -1301,6 +1301,7 @@ mod tests { phase: None, })]), AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")), + false, ) .await .expect("start thread"); diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 325ad4829..ff90d4baf 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -1019,7 +1019,7 @@ async fn fork_thread( nth_user_message: usize, ) -> Arc { manager - .fork_thread(nth_user_message, config.clone(), path) + .fork_thread(nth_user_message, config.clone(), path, false) .await .expect("fork conversation") .thread diff --git a/codex-rs/core/tests/suite/fork_thread.rs b/codex-rs/core/tests/suite/fork_thread.rs index d5363cc0a..3d97400a9 100644 --- a/codex-rs/core/tests/suite/fork_thread.rs +++ b/codex-rs/core/tests/suite/fork_thread.rs @@ -110,7 +110,7 @@ async fn fork_thread_twice_drops_to_first_message() { thread: codex_fork1, .. } = thread_manager - .fork_thread(1, config_for_fork.clone(), base_path.clone()) + .fork_thread(1, config_for_fork.clone(), base_path.clone(), false) .await .expect("fork 1"); @@ -129,7 +129,7 @@ async fn fork_thread_twice_drops_to_first_message() { thread: codex_fork2, .. 
} = thread_manager - .fork_thread(0, config_for_fork.clone(), fork1_path.clone()) + .fork_thread(0, config_for_fork.clone(), fork1_path.clone(), false) .await .expect("fork 2"); diff --git a/codex-rs/core/tests/suite/permissions_messages.rs b/codex-rs/core/tests/suite/permissions_messages.rs index 5bb92114f..c3936742e 100644 --- a/codex-rs/core/tests/suite/permissions_messages.rs +++ b/codex-rs/core/tests/suite/permissions_messages.rs @@ -413,7 +413,7 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> { fork_config.approval_policy = Constrained::allow_any(AskForApproval::UnlessTrusted); let forked = initial .thread_manager - .fork_thread(usize::MAX, fork_config, rollout_path) + .fork_thread(usize::MAX, fork_config, rollout_path, false) .await?; forked .thread diff --git a/codex-rs/core/tests/suite/resume_warning.rs b/codex-rs/core/tests/suite/resume_warning.rs index 5bea0e1f0..d1746dbf8 100644 --- a/codex-rs/core/tests/suite/resume_warning.rs +++ b/codex-rs/core/tests/suite/resume_warning.rs @@ -68,7 +68,7 @@ async fn emits_warning_when_resumed_model_differs() { thread: conversation, .. 
} = thread_manager - .resume_thread_with_history(config, initial_history, auth_manager) + .resume_thread_with_history(config, initial_history, auth_manager, false) .await .expect("resume conversation"); diff --git a/codex-rs/core/tests/suite/rollout_list_find.rs b/codex-rs/core/tests/suite/rollout_list_find.rs index 059ca929e..d9f92b85b 100644 --- a/codex-rs/core/tests/suite/rollout_list_find.rs +++ b/codex-rs/core/tests/suite/rollout_list_find.rs @@ -4,6 +4,7 @@ use std::path::Path; use std::path::PathBuf; use chrono::Utc; +use codex_core::EventPersistenceMode; use codex_core::RolloutRecorder; use codex_core::RolloutRecorderParams; use codex_core::config::ConfigBuilder; @@ -171,6 +172,7 @@ async fn find_locates_rollout_file_written_by_recorder() -> std::io::Result<()> SessionSource::Exec, BaseInstructions::default(), Vec::new(), + EventPersistenceMode::Limited, ), None, None, diff --git a/codex-rs/core/tests/suite/unstable_features_warning.rs b/codex-rs/core/tests/suite/unstable_features_warning.rs index e82572abc..1cb07c1f4 100644 --- a/codex-rs/core/tests/suite/unstable_features_warning.rs +++ b/codex-rs/core/tests/suite/unstable_features_warning.rs @@ -39,7 +39,7 @@ async fn emits_warning_when_unstable_features_enabled_via_config() { thread: conversation, .. } = thread_manager - .resume_thread_with_history(config, InitialHistory::New, auth_manager) + .resume_thread_with_history(config, InitialHistory::New, auth_manager, false) .await .expect("spawn conversation"); @@ -77,7 +77,7 @@ async fn suppresses_warning_when_configured() { thread: conversation, .. 
} = thread_manager - .resume_thread_with_history(config, InitialHistory::New, auth_manager) + .resume_thread_with_history(config, InitialHistory::New, auth_manager, false) .await .expect("spawn conversation"); diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index e05883041..46cd61545 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -11,12 +11,14 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::ExecCommandSource; +use codex_core::protocol::ExecCommandStatus as CoreExecCommandStatus; use codex_core::protocol::FileChange; use codex_core::protocol::McpInvocation; use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; +use codex_core::protocol::PatchApplyStatus as CorePatchApplyStatus; use codex_core::protocol::SandboxPolicy; use codex_core::protocol::SessionConfiguredEvent; use codex_core::protocol::WarningEvent; @@ -901,6 +903,7 @@ fn exec_command_end_success_produces_completed_command_item() { exit_code: 0, duration: Duration::from_millis(5), formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, }), ); let out_ok = ep.collect_thread_events(&end_ok); @@ -988,6 +991,7 @@ fn command_execution_output_delta_updates_item_progress() { exit_code: 0, duration: Duration::from_millis(3), formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, }), ); let out_end = ep.collect_thread_events(&end); @@ -1061,6 +1065,7 @@ fn exec_command_end_failure_produces_failed_command_item() { exit_code: 1, duration: Duration::from_millis(2), formatted_output: String::new(), + status: CoreExecCommandStatus::Failed, }), ); let out_fail = 
ep.collect_thread_events(&end_fail); @@ -1102,6 +1107,7 @@ fn exec_command_end_without_begin_is_ignored() { exit_code: 0, duration: Duration::from_millis(1), formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, }), ); let out = ep.collect_thread_events(&end_only); @@ -1157,6 +1163,7 @@ fn patch_apply_success_produces_item_completed_patchapply() { stderr: String::new(), success: true, changes: changes.clone(), + status: CorePatchApplyStatus::Completed, }), ); let out_end = ep.collect_thread_events(&end); @@ -1228,6 +1235,7 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() { stderr: "failed to apply".to_string(), success: false, changes: changes.clone(), + status: CorePatchApplyStatus::Failed, }), ); let out_end = ep.collect_thread_events(&end); diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 40e981b93..6097c60e8 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1160,6 +1160,27 @@ pub enum CodexErrorInfo { Other, } +impl CodexErrorInfo { + /// Whether this error should mark the current turn as failed when replaying history. + pub fn affects_turn_status(&self) -> bool { + match self { + Self::ThreadRollbackFailed => false, + Self::ContextWindowExceeded + | Self::UsageLimitExceeded + | Self::ServerOverloaded + | Self::HttpConnectionFailed { .. } + | Self::ResponseStreamConnectionFailed { .. } + | Self::InternalServerError + | Self::Unauthorized + | Self::BadRequest + | Self::SandboxError + | Self::ResponseStreamDisconnected { .. } + | Self::ResponseTooManyFailedAttempts { .. } + | Self::Other => true, + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct RawResponseItemEvent { pub item: ResponseItem, @@ -1297,6 +1318,15 @@ pub struct ErrorEvent { pub codex_error_info: Option, } +impl ErrorEvent { + /// Whether this error should mark the current turn as failed when replaying history. 
+ pub fn affects_turn_status(&self) -> bool { + self.codex_error_info + .as_ref() + .is_none_or(CodexErrorInfo::affects_turn_status) + } +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct WarningEvent { pub message: String, @@ -2051,6 +2081,14 @@ pub enum ExecCommandSource { UnifiedExecInteraction, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +pub enum ExecCommandStatus { + Completed, + Failed, + Declined, +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct ExecCommandBeginEvent { /// Identifier so this can be paired with the ExecCommandEnd event. @@ -2112,6 +2150,8 @@ pub struct ExecCommandEndEvent { pub duration: Duration, /// Formatted output from the command, as seen by the model. pub formatted_output: String, + /// Completion status for this command execution. + pub status: ExecCommandStatus, } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] @@ -2235,6 +2275,16 @@ pub struct PatchApplyEndEvent { /// The changes that were applied (mirrors PatchApplyBeginEvent::changes). #[serde(default)] pub changes: HashMap, + /// Completion status for this patch application. 
+ pub status: PatchApplyStatus, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +pub enum PatchApplyStatus { + Completed, + Failed, + Declined, } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] @@ -2789,6 +2839,24 @@ mod tests { assert!(event.as_legacy_events(false).is_empty()); } + #[test] + fn rollback_failed_error_does_not_affect_turn_status() { + let event = ErrorEvent { + message: "rollback failed".into(), + codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed), + }; + assert!(!event.affects_turn_status()); + } + + #[test] + fn generic_error_affects_turn_status() { + let event = ErrorEvent { + message: "generic".into(), + codex_error_info: Some(CodexErrorInfo::Other), + }; + assert!(event.affects_turn_status()); + } + #[test] fn user_input_serialization_omits_final_output_json_schema_when_none() -> Result<()> { let op = Op::UserInput { diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 2f426570f..c700a3a84 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -1078,7 +1078,7 @@ impl App { SessionSelection::Fork(path) => { otel_manager.counter("codex.thread.fork", 1, &[("source", "cli_subcommand")]); let forked = thread_manager - .fork_thread(usize::MAX, config.clone(), path.clone()) + .fork_thread(usize::MAX, config.clone(), path.clone(), false) .await .wrap_err_with(|| { let path_display = path.display(); @@ -1463,7 +1463,7 @@ impl App { if path.exists() { match self .server - .fork_thread(usize::MAX, self.config.clone(), path.clone()) + .fork_thread(usize::MAX, self.config.clone(), path.clone(), false) .await { Ok(forked) => { diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index a126c5f98..e3dc0af34 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -36,6 +36,7 @@ use codex_core::protocol::ExecApprovalRequestEvent; use 
codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::ExecCommandSource; +use codex_core::protocol::ExecCommandStatus as CoreExecCommandStatus; use codex_core::protocol::ExecPolicyAmendment; use codex_core::protocol::ExitedReviewModeEvent; use codex_core::protocol::FileChange; @@ -46,6 +47,7 @@ use codex_core::protocol::McpStartupUpdateEvent; use codex_core::protocol::Op; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; +use codex_core::protocol::PatchApplyStatus as CorePatchApplyStatus; use codex_core::protocol::RateLimitWindow; use codex_core::protocol::ReviewRequest; use codex_core::protocol::ReviewTarget; @@ -2180,6 +2182,11 @@ fn end_exec( exit_code, duration: std::time::Duration::from_millis(5), formatted_output: aggregated, + status: if exit_code == 0 { + CoreExecCommandStatus::Completed + } else { + CoreExecCommandStatus::Failed + }, }), }); } @@ -2641,6 +2648,7 @@ async fn exec_end_without_begin_uses_event_command() { exit_code: 0, duration: std::time::Duration::from_millis(5), formatted_output: "done".to_string(), + status: CoreExecCommandStatus::Completed, }), }); @@ -5284,6 +5292,7 @@ async fn apply_patch_events_emit_history_cells() { stderr: String::new(), success: true, changes: end_changes, + status: CorePatchApplyStatus::Completed, }; chat.handle_codex_event(Event { id: "s1".into(), @@ -5511,6 +5520,7 @@ async fn apply_patch_full_flow_integration_like() { stderr: String::new(), success: true, changes: end_changes, + status: CorePatchApplyStatus::Completed, }), }); } @@ -6101,6 +6111,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() { exit_code: 0, duration: std::time::Duration::from_millis(16000), formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, }), }); chat.handle_codex_event(Event {