From efc8d45750e512f42a2b1de8a80733d593d9df17 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Thu, 12 Feb 2026 11:34:22 -0800 Subject: [PATCH] feat(app-server): experimental flag to persist extended history (#11227) This PR adds an experimental `persist_extended_history` bool flag to app-server thread APIs so rollout logs can retain a richer set of EventMsgs for non-lossy Thread > Turn > ThreadItems reconstruction (i.e. on `thread/resume`). ### Motivation Today, our rollout recorder only persists a small subset (e.g. user message, reasoning, assistant message) of `EventMsg` types, dropping a good number (like command exec, file change, etc.) that are important for reconstructing full item history for `thread/resume`, `thread/read`, and `thread/fork`. Some clients want to be able to resume a thread without lossiness. This lossiness is primarily a UI thing, since what the model sees are `ResponseItem` and not `EventMsg`. ### Approach This change introduces an opt-in `persist_extended_history` flag to preserve those events when you start/resume/fork a thread (defaults to `false`). This is done by adding an `EventPersistenceMode` to the rollout recorder: - `Limited` (existing behavior, default) - `Extended` (new opt-in behavior) In `Extended` mode, persist additional `EventMsg` variants needed for non-lossy app-server `ThreadItem` reconstruction. We now store the following ThreadItems that we didn't before: - web search - command execution - patch/file changes - MCP tool calls - image view calls - collab tool outcomes - context compaction - review mode enter/exit For **command executions** in particular, we truncate the output using the existing `truncate_text` from core to store an upper bound of 10,000 bytes, which is also the default value for truncating tool outputs shown to the model. This keeps the size of the rollout file and command execution items returned over the wire reasonable. 
We also persist `EventMsg::Error`, which we can now map back to the Turn's status and use to populate the Turn's error metadata. #### Updates to EventMsgs To truly make `thread/resume` non-lossy, we also needed to persist the `status` on `EventMsg::ExecCommandEnd` (`ExecCommandEndEvent`) and `EventMsg::PatchApplyEnd` (`PatchApplyEndEvent`). Previously it was not obvious whether a command failed or was declined (similar for apply_patch). These EventMsgs were never persisted before, so I made `status` a required field. --- AGENTS.md | 2 + codex-rs/Cargo.lock | 1 + codex-rs/app-server-protocol/Cargo.toml | 1 + .../schema/json/EventMsg.json | 52 + .../schema/json/ServerNotification.json | 34 + .../codex_app_server_protocol.schemas.json | 34 + .../json/v1/ForkConversationResponse.json | 34 + .../json/v1/ResumeConversationResponse.json | 34 + .../v1/SessionConfiguredNotification.json | 34 + .../schema/typescript/ExecCommandEndEvent.ts | 7 +- .../schema/typescript/ExecCommandStatus.ts | 5 + .../schema/typescript/PatchApplyEndEvent.ts | 7 +- .../schema/typescript/PatchApplyStatus.ts | 5 + .../schema/typescript/index.ts | 2 + .../schema/typescript/v2/ThreadForkParams.ts | 6 +- .../typescript/v2/ThreadResumeParams.ts | 6 +- .../schema/typescript/v2/ThreadStartParams.ts | 6 +- .../src/protocol/common.rs | 13 - .../src/protocol/thread_history.rs | 1024 ++++++++++++++++- .../app-server-protocol/src/protocol/v2.rs | 49 + codex-rs/app-server/README.md | 2 + .../app-server/src/bespoke_event_handling.rs | 17 +- .../app-server/src/codex_message_processor.rs | 25 +- codex-rs/core/src/agent/control.rs | 2 +- codex-rs/core/src/codex.rs | 23 +- codex-rs/core/src/codex_delegate.rs | 1 + codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/rollout/policy.rs | 74 +- codex-rs/core/src/rollout/recorder.rs | 164 ++- codex-rs/core/src/tasks/user_shell.rs | 8 + codex-rs/core/src/thread_manager.rs | 21 +- codex-rs/core/src/tools/events.rs | 60 +- codex-rs/core/src/tools/handlers/collab.rs | 1 + .../core/tests/suite/compact_resume_fork.rs | 
2 +- codex-rs/core/tests/suite/fork_thread.rs | 4 +- .../core/tests/suite/permissions_messages.rs | 2 +- codex-rs/core/tests/suite/resume_warning.rs | 2 +- .../core/tests/suite/rollout_list_find.rs | 2 + .../tests/suite/unstable_features_warning.rs | 4 +- .../tests/event_processor_with_json_output.rs | 8 + codex-rs/protocol/src/protocol.rs | 68 ++ codex-rs/tui/src/app.rs | 4 +- codex-rs/tui/src/chatwidget/tests.rs | 11 + 43 files changed, 1724 insertions(+), 138 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/typescript/ExecCommandStatus.ts create mode 100644 codex-rs/app-server-protocol/schema/typescript/PatchApplyStatus.ts diff --git a/AGENTS.md b/AGENTS.md index 292e4c92d..62f303769 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -158,3 +158,5 @@ These guidelines apply to app-server protocol work in `codex-rs`, especially: `just write-app-server-schema` (and `just write-app-server-schema --experimental` when experimental API fixtures are affected). - Validate with `cargo test -p codex-app-server-protocol`. +- Avoid boilerplate tests that only assert experimental field markers for individual + request fields in `common.rs`; rely on schema generation/tests and behavioral coverage instead. 
diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index f0ddba0f6..ad6ba0910 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1401,6 +1401,7 @@ dependencies = [ "schemars 0.8.22", "serde", "serde_json", + "shlex", "similar", "strum_macros 0.27.2", "tempfile", diff --git a/codex-rs/app-server-protocol/Cargo.toml b/codex-rs/app-server-protocol/Cargo.toml index 7b2b8c53d..2ab6f291f 100644 --- a/codex-rs/app-server-protocol/Cargo.toml +++ b/codex-rs/app-server-protocol/Cargo.toml @@ -20,6 +20,7 @@ codex-utils-absolute-path = { workspace = true } schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +shlex = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } ts-rs = { workspace = true } diff --git a/codex-rs/app-server-protocol/schema/json/EventMsg.json b/codex-rs/app-server-protocol/schema/json/EventMsg.json index bdeb06b7b..c0a502899 100644 --- a/codex-rs/app-server-protocol/schema/json/EventMsg.json +++ b/codex-rs/app-server-protocol/schema/json/EventMsg.json @@ -1349,6 +1349,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." + }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1377,6 +1385,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -1805,6 +1814,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." 
+ }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -1832,6 +1849,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -2873,6 +2891,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -3400,6 +3426,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { @@ -6185,6 +6219,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." + }, "stderr": { "description": "Captured stderr", "type": "string" @@ -6213,6 +6255,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -6641,6 +6684,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." + }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -6668,6 +6719,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index dcdc315d9..1b312ea1e 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -1965,6 +1965,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." 
}, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." + }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1993,6 +2001,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -2421,6 +2430,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus2" + } + ], + "description": "Completion status for this patch application." + }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -2448,6 +2465,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -3489,6 +3507,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -4285,6 +4311,14 @@ ], "type": "string" }, + "PatchApplyStatus2": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PatchChangeKind": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index f7be9db0d..e124325f0 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -3362,6 +3362,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." 
+ }, "stderr": { "description": "Captured stderr", "type": "string" @@ -3390,6 +3398,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -3818,6 +3827,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/v2/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." + }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -3845,6 +3862,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -4942,6 +4960,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOneOffCommandParams": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { @@ -6446,6 +6472,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json index 9c9b52d4e..7d0d26c05 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ForkConversationResponse.json @@ -1349,6 +1349,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." 
+ }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1377,6 +1385,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -1805,6 +1814,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." + }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -1832,6 +1849,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -2873,6 +2891,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -3400,6 +3426,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json index 01bb2bbc2..4b1e90815 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ResumeConversationResponse.json @@ -1349,6 +1349,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." 
+ }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1377,6 +1385,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -1805,6 +1814,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." + }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -1832,6 +1849,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -2873,6 +2891,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -3400,6 +3426,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json index f68ec3331..c9fb2c8e3 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v1/SessionConfiguredNotification.json @@ -1349,6 +1349,14 @@ "default": "agent", "description": "Where the command originated. Defaults to Agent for backward compatibility." }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/ExecCommandStatus" + } + ], + "description": "Completion status for this command execution." 
+ }, "stderr": { "description": "Captured stderr", "type": "string" @@ -1377,6 +1385,7 @@ "exit_code", "formatted_output", "parsed_cmd", + "status", "stderr", "stdout", "turn_id", @@ -1805,6 +1814,14 @@ "description": "The changes that were applied (mirrors PatchApplyBeginEvent::changes).", "type": "object" }, + "status": { + "allOf": [ + { + "$ref": "#/definitions/PatchApplyStatus" + } + ], + "description": "Completion status for this patch application." + }, "stderr": { "description": "Captured stderr (parser errors, IO failures, etc.).", "type": "string" @@ -1832,6 +1849,7 @@ }, "required": [ "call_id", + "status", "stderr", "stdout", "success", @@ -2873,6 +2891,14 @@ ], "type": "string" }, + "ExecCommandStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "ExecOutputStream": { "enum": [ "stdout", @@ -3400,6 +3426,14 @@ } ] }, + "PatchApplyStatus": { + "enum": [ + "completed", + "failed", + "declined" + ], + "type": "string" + }, "PlanItemArg": { "additionalProperties": false, "properties": { diff --git a/codex-rs/app-server-protocol/schema/typescript/ExecCommandEndEvent.ts b/codex-rs/app-server-protocol/schema/typescript/ExecCommandEndEvent.ts index c9b465e45..0bfc41ea8 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ExecCommandEndEvent.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ExecCommandEndEvent.ts @@ -2,6 +2,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { ExecCommandSource } from "./ExecCommandSource"; +import type { ExecCommandStatus } from "./ExecCommandStatus"; import type { ParsedCommand } from "./ParsedCommand"; export type ExecCommandEndEvent = { @@ -56,4 +57,8 @@ duration: string, /** * Formatted output from the command, as seen by the model. */ -formatted_output: string, }; +formatted_output: string, +/** + * Completion status for this command execution. 
+ */ +status: ExecCommandStatus, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/ExecCommandStatus.ts b/codex-rs/app-server-protocol/schema/typescript/ExecCommandStatus.ts new file mode 100644 index 000000000..d8d91fb19 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/ExecCommandStatus.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ExecCommandStatus = "completed" | "failed" | "declined"; diff --git a/codex-rs/app-server-protocol/schema/typescript/PatchApplyEndEvent.ts b/codex-rs/app-server-protocol/schema/typescript/PatchApplyEndEvent.ts index d52940af1..9dacb00e4 100644 --- a/codex-rs/app-server-protocol/schema/typescript/PatchApplyEndEvent.ts +++ b/codex-rs/app-server-protocol/schema/typescript/PatchApplyEndEvent.ts @@ -2,6 +2,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { FileChange } from "./FileChange"; +import type { PatchApplyStatus } from "./PatchApplyStatus"; export type PatchApplyEndEvent = { /** @@ -28,4 +29,8 @@ success: boolean, /** * The changes that were applied (mirrors PatchApplyBeginEvent::changes). */ -changes: { [key in string]?: FileChange }, }; +changes: { [key in string]?: FileChange }, +/** + * Completion status for this patch application. + */ +status: PatchApplyStatus, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/PatchApplyStatus.ts b/codex-rs/app-server-protocol/schema/typescript/PatchApplyStatus.ts new file mode 100644 index 000000000..721fcd9b1 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/PatchApplyStatus.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. 
+ +export type PatchApplyStatus = "completed" | "failed" | "declined"; diff --git a/codex-rs/app-server-protocol/schema/typescript/index.ts b/codex-rs/app-server-protocol/schema/typescript/index.ts index 62a1718f9..2e7446a3d 100644 --- a/codex-rs/app-server-protocol/schema/typescript/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/index.ts @@ -62,6 +62,7 @@ export type { ExecCommandBeginEvent } from "./ExecCommandBeginEvent"; export type { ExecCommandEndEvent } from "./ExecCommandEndEvent"; export type { ExecCommandOutputDeltaEvent } from "./ExecCommandOutputDeltaEvent"; export type { ExecCommandSource } from "./ExecCommandSource"; +export type { ExecCommandStatus } from "./ExecCommandStatus"; export type { ExecOneOffCommandParams } from "./ExecOneOffCommandParams"; export type { ExecOneOffCommandResponse } from "./ExecOneOffCommandResponse"; export type { ExecOutputStream } from "./ExecOutputStream"; @@ -129,6 +130,7 @@ export type { NewConversationResponse } from "./NewConversationResponse"; export type { ParsedCommand } from "./ParsedCommand"; export type { PatchApplyBeginEvent } from "./PatchApplyBeginEvent"; export type { PatchApplyEndEvent } from "./PatchApplyEndEvent"; +export type { PatchApplyStatus } from "./PatchApplyStatus"; export type { Personality } from "./Personality"; export type { PlanDeltaEvent } from "./PlanDeltaEvent"; export type { PlanItem } from "./PlanItem"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts index 44c81a24e..742e4d703 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts @@ -21,4 +21,8 @@ export type ThreadForkParams = {threadId: string, /** path?: string | null, /** * Configuration overrides for the forked thread, if any. 
*/ -model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null}; +model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, /** + * If true, persist additional rollout EventMsg variants required to + * reconstruct a richer thread history on subsequent resume/fork/read. + */ +persistExtendedHistory: boolean}; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts index c868b8f95..e79c748cc 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts @@ -30,4 +30,8 @@ history?: Array | null, /** path?: string | null, /** * Configuration overrides for the resumed thread, if any. */ -model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null}; +model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, /** + * If true, persist additional rollout EventMsg variants required to + * reconstruct a richer thread history on subsequent resume/fork/read. 
+ */ +persistExtendedHistory: boolean}; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts index 84ad633d5..ed5e89e73 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadStartParams.ts @@ -10,4 +10,8 @@ export type ThreadStartParams = {model?: string | null, modelProvider?: string | * If true, opt into emitting raw Responses API items on the event stream. * This is for internal use only (e.g. Codex Cloud). */ -experimentalRawEvents: boolean}; +experimentalRawEvents: boolean, /** + * If true, persist additional rollout EventMsg variants required to + * reconstruct a richer thread history on resume/fork/read. + */ +persistExtendedHistory: boolean}; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 71d75a156..bc9134116 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -1323,17 +1323,4 @@ mod tests { let reason = crate::experimental_api::ExperimentalApi::experimental_reason(&request); assert_eq!(reason, Some("mock/experimentalMethod")); } - - #[test] - fn thread_start_mock_field_is_marked_experimental() { - let request = ClientRequest::ThreadStart { - request_id: RequestId::Integer(1), - params: v2::ThreadStartParams { - mock_experimental_field: Some("mock".to_string()), - ..Default::default() - }, - }; - let reason = crate::experimental_api::ExperimentalApi::experimental_reason(&request); - assert_eq!(reason, Some("thread/start.mockExperimentalField")); - } } diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 231ee99b2..a54b550fd 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ 
b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -1,21 +1,49 @@ +use crate::protocol::v2::CollabAgentState; +use crate::protocol::v2::CollabAgentTool; +use crate::protocol::v2::CollabAgentToolCallStatus; +use crate::protocol::v2::CommandAction; +use crate::protocol::v2::CommandExecutionStatus; +use crate::protocol::v2::FileUpdateChange; +use crate::protocol::v2::McpToolCallError; +use crate::protocol::v2::McpToolCallResult; +use crate::protocol::v2::McpToolCallStatus; +use crate::protocol::v2::PatchApplyStatus; +use crate::protocol::v2::PatchChangeKind; use crate::protocol::v2::ThreadItem; use crate::protocol::v2::Turn; +use crate::protocol::v2::TurnError as V2TurnError; use crate::protocol::v2::TurnError; use crate::protocol::v2::TurnStatus; use crate::protocol::v2::UserInput; +use crate::protocol::v2::WebSearchAction; use codex_protocol::protocol::AgentReasoningEvent; use codex_protocol::protocol::AgentReasoningRawContentEvent; +use codex_protocol::protocol::AgentStatus; use codex_protocol::protocol::CompactedItem; +use codex_protocol::protocol::ContextCompactedEvent; +use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ExecCommandEndEvent; use codex_protocol::protocol::ItemCompletedEvent; +use codex_protocol::protocol::McpToolCallEndEvent; +use codex_protocol::protocol::PatchApplyEndEvent; +use codex_protocol::protocol::ReviewOutputEvent; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; +use codex_protocol::protocol::ViewImageToolCallEvent; +use codex_protocol::protocol::WebSearchEndEvent; +use std::collections::HashMap; use uuid::Uuid; +#[cfg(test)] +use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus; +#[cfg(test)] +use 
codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus; + /// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values. /// /// When available, this uses `TurnContext.turn_id` as the canonical turn id so @@ -58,10 +86,24 @@ impl ThreadHistoryBuilder { EventMsg::AgentReasoningRawContent(payload) => { self.handle_agent_reasoning_raw_content(payload) } + EventMsg::WebSearchEnd(payload) => self.handle_web_search_end(payload), + EventMsg::ExecCommandEnd(payload) => self.handle_exec_command_end(payload), + EventMsg::PatchApplyEnd(payload) => self.handle_patch_apply_end(payload), + EventMsg::McpToolCallEnd(payload) => self.handle_mcp_tool_call_end(payload), + EventMsg::ViewImageToolCall(payload) => self.handle_view_image_tool_call(payload), + EventMsg::CollabAgentSpawnEnd(payload) => self.handle_collab_agent_spawn_end(payload), + EventMsg::CollabAgentInteractionEnd(payload) => { + self.handle_collab_agent_interaction_end(payload) + } + EventMsg::CollabWaitingEnd(payload) => self.handle_collab_waiting_end(payload), + EventMsg::CollabCloseEnd(payload) => self.handle_collab_close_end(payload), + EventMsg::CollabResumeEnd(payload) => self.handle_collab_resume_end(payload), + EventMsg::ContextCompacted(payload) => self.handle_context_compacted(payload), + EventMsg::EnteredReviewMode(payload) => self.handle_entered_review_mode(payload), + EventMsg::ExitedReviewMode(payload) => self.handle_exited_review_mode(payload), EventMsg::ItemCompleted(payload) => self.handle_item_completed(payload), + EventMsg::Error(payload) => self.handle_error(payload), EventMsg::TokenCount(_) => {} - EventMsg::EnteredReviewMode(_) => {} - EventMsg::ExitedReviewMode(_) => {} EventMsg::ThreadRolledBack(payload) => self.handle_thread_rollback(payload), EventMsg::UndoCompleted(_) => {} EventMsg::TurnAborted(payload) => self.handle_turn_aborted(payload), @@ -153,23 +195,348 @@ impl ThreadHistoryBuilder { } fn handle_item_completed(&mut self, payload: &ItemCompletedEvent) { - 
if let codex_protocol::items::TurnItem::Plan(plan) = &payload.item { - if plan.text.is_empty() { - return; - } - let id = self.next_item_id(); - self.ensure_turn().items.push(ThreadItem::Plan { - id, - text: plan.text.clone(), - }); + if let codex_protocol::items::TurnItem::Plan(plan) = &payload.item + && plan.text.is_empty() + { + return; } + + let item = ThreadItem::from(payload.item.clone()); + self.ensure_turn().items.push(item); } - fn handle_turn_aborted(&mut self, _payload: &TurnAbortedEvent) { + fn handle_web_search_end(&mut self, payload: &WebSearchEndEvent) { + let item = ThreadItem::WebSearch { + id: payload.call_id.clone(), + query: payload.query.clone(), + action: Some(WebSearchAction::from(payload.action.clone())), + }; + self.ensure_turn().items.push(item); + } + + fn handle_exec_command_end(&mut self, payload: &ExecCommandEndEvent) { + let status: CommandExecutionStatus = (&payload.status).into(); + let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX); + let aggregated_output = if payload.aggregated_output.is_empty() { + None + } else { + Some(payload.aggregated_output.clone()) + }; + let command = shlex::try_join(payload.command.iter().map(String::as_str)) + .unwrap_or_else(|_| payload.command.join(" ")); + let command_actions = payload + .parsed_cmd + .iter() + .cloned() + .map(CommandAction::from) + .collect(); + let item = ThreadItem::CommandExecution { + id: payload.call_id.clone(), + command, + cwd: payload.cwd.clone(), + process_id: payload.process_id.clone(), + status, + command_actions, + aggregated_output, + exit_code: Some(payload.exit_code), + duration_ms: Some(duration_ms), + }; + + // Command completions can arrive out of order. Unified exec may return + // while a PTY is still running, then emit ExecCommandEnd later from a + // background exit watcher when that process finally exits. By then, a + // newer user turn may already have started. 
Route by event turn_id so + // replay preserves the original turn association. + if let Some(turn) = self.current_turn.as_mut() + && turn.id == payload.turn_id + { + turn.items.push(item); + return; + } + + // If the originating turn is already finalized, append there instead + // of attaching to whichever turn is currently active during replay. + if let Some(turn) = self + .turns + .iter_mut() + .find(|turn| turn.id == payload.turn_id) + { + turn.items.push(item); + return; + } + + // Backward-compatibility fallback for partial/legacy streams where the + // event turn_id does not match any known replay turn. + self.ensure_turn().items.push(item); + } + + fn handle_patch_apply_end(&mut self, payload: &PatchApplyEndEvent) { + let status: PatchApplyStatus = (&payload.status).into(); + let item = ThreadItem::FileChange { + id: payload.call_id.clone(), + changes: convert_patch_changes(&payload.changes), + status, + }; + self.ensure_turn().items.push(item); + } + + fn handle_mcp_tool_call_end(&mut self, payload: &McpToolCallEndEvent) { + let status = if payload.is_success() { + McpToolCallStatus::Completed + } else { + McpToolCallStatus::Failed + }; + let duration_ms = i64::try_from(payload.duration.as_millis()).ok(); + let (result, error) = match &payload.result { + Ok(value) => ( + Some(McpToolCallResult { + content: value.content.clone(), + structured_content: value.structured_content.clone(), + }), + None, + ), + Err(message) => ( + None, + Some(McpToolCallError { + message: message.clone(), + }), + ), + }; + let item = ThreadItem::McpToolCall { + id: payload.call_id.clone(), + server: payload.invocation.server.clone(), + tool: payload.invocation.tool.clone(), + status, + arguments: payload + .invocation + .arguments + .clone() + .unwrap_or(serde_json::Value::Null), + result, + error, + duration_ms, + }; + self.ensure_turn().items.push(item); + } + + fn handle_view_image_tool_call(&mut self, payload: &ViewImageToolCallEvent) { + let item = ThreadItem::ImageView { + 
id: payload.call_id.clone(), + path: payload.path.to_string_lossy().into_owned(), + }; + self.ensure_turn().items.push(item); + } + + fn handle_collab_agent_spawn_end( + &mut self, + payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent, + ) { + let has_receiver = payload.new_thread_id.is_some(); + let status = match &payload.status { + AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed, + _ if has_receiver => CollabAgentToolCallStatus::Completed, + _ => CollabAgentToolCallStatus::Failed, + }; + let (receiver_thread_ids, agents_states) = match &payload.new_thread_id { + Some(id) => { + let receiver_id = id.to_string(); + let received_status = CollabAgentState::from(payload.status.clone()); + ( + vec![receiver_id.clone()], + [(receiver_id, received_status)].into_iter().collect(), + ) + } + None => (Vec::new(), HashMap::new()), + }; + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::SpawnAgent, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids, + prompt: Some(payload.prompt.clone()), + agents_states, + }); + } + + fn handle_collab_agent_interaction_end( + &mut self, + payload: &codex_protocol::protocol::CollabAgentInteractionEndEvent, + ) { + let status = match &payload.status { + AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed, + _ => CollabAgentToolCallStatus::Completed, + }; + let receiver_id = payload.receiver_thread_id.to_string(); + let received_status = CollabAgentState::from(payload.status.clone()); + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::SendInput, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids: vec![receiver_id.clone()], + prompt: Some(payload.prompt.clone()), + agents_states: [(receiver_id, received_status)].into_iter().collect(), + }); 
+ } + + fn handle_collab_waiting_end( + &mut self, + payload: &codex_protocol::protocol::CollabWaitingEndEvent, + ) { + let status = if payload + .statuses + .values() + .any(|status| matches!(status, AgentStatus::Errored(_) | AgentStatus::NotFound)) + { + CollabAgentToolCallStatus::Failed + } else { + CollabAgentToolCallStatus::Completed + }; + let mut receiver_thread_ids: Vec = + payload.statuses.keys().map(ToString::to_string).collect(); + receiver_thread_ids.sort(); + let agents_states = payload + .statuses + .iter() + .map(|(id, status)| (id.to_string(), CollabAgentState::from(status.clone()))) + .collect(); + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::Wait, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids, + prompt: None, + agents_states, + }); + } + + fn handle_collab_close_end(&mut self, payload: &codex_protocol::protocol::CollabCloseEndEvent) { + let status = match &payload.status { + AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed, + _ => CollabAgentToolCallStatus::Completed, + }; + let receiver_id = payload.receiver_thread_id.to_string(); + let agents_states = [( + receiver_id.clone(), + CollabAgentState::from(payload.status.clone()), + )] + .into_iter() + .collect(); + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::CloseAgent, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids: vec![receiver_id], + prompt: None, + agents_states, + }); + } + + fn handle_collab_resume_end( + &mut self, + payload: &codex_protocol::protocol::CollabResumeEndEvent, + ) { + let status = match &payload.status { + AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed, + _ => CollabAgentToolCallStatus::Completed, + }; + let receiver_id = 
payload.receiver_thread_id.to_string(); + let agents_states = [( + receiver_id.clone(), + CollabAgentState::from(payload.status.clone()), + )] + .into_iter() + .collect(); + self.ensure_turn() + .items + .push(ThreadItem::CollabAgentToolCall { + id: payload.call_id.clone(), + tool: CollabAgentTool::ResumeAgent, + status, + sender_thread_id: payload.sender_thread_id.to_string(), + receiver_thread_ids: vec![receiver_id], + prompt: None, + agents_states, + }); + } + + fn handle_context_compacted(&mut self, _payload: &ContextCompactedEvent) { + let id = self.next_item_id(); + self.ensure_turn() + .items + .push(ThreadItem::ContextCompaction { id }); + } + + fn handle_entered_review_mode(&mut self, payload: &codex_protocol::protocol::ReviewRequest) { + let review = payload + .user_facing_hint + .clone() + .unwrap_or_else(|| "Review requested.".to_string()); + let id = self.next_item_id(); + self.ensure_turn() + .items + .push(ThreadItem::EnteredReviewMode { id, review }); + } + + fn handle_exited_review_mode( + &mut self, + payload: &codex_protocol::protocol::ExitedReviewModeEvent, + ) { + let review = payload + .review_output + .as_ref() + .map(render_review_output_text) + .unwrap_or_else(|| REVIEW_FALLBACK_MESSAGE.to_string()); + let id = self.next_item_id(); + self.ensure_turn() + .items + .push(ThreadItem::ExitedReviewMode { id, review }); + } + + fn handle_error(&mut self, payload: &ErrorEvent) { + if !payload.affects_turn_status() { + return; + } let Some(turn) = self.current_turn.as_mut() else { return; }; - turn.status = TurnStatus::Interrupted; + turn.status = TurnStatus::Failed; + turn.error = Some(V2TurnError { + message: payload.message.clone(), + codex_error_info: payload.codex_error_info.clone().map(Into::into), + additional_details: None, + }); + } + + fn handle_turn_aborted(&mut self, payload: &TurnAbortedEvent) { + if let Some(turn_id) = payload.turn_id.as_deref() { + // Prefer an exact ID match so we interrupt the turn explicitly targeted by the event. 
+ if let Some(turn) = self.current_turn.as_mut().filter(|turn| turn.id == turn_id) { + turn.status = TurnStatus::Interrupted; + return; + } + + if let Some(turn) = self.turns.iter_mut().find(|turn| turn.id == turn_id) { + turn.status = TurnStatus::Interrupted; + return; + } + } + + // If the event has no ID (or refers to an unknown turn), fall back to the active turn. + if let Some(turn) = self.current_turn.as_mut() { + turn.status = TurnStatus::Interrupted; + } } fn handle_turn_started(&mut self, payload: &TurnStartedEvent) { @@ -180,9 +547,36 @@ impl ThreadHistoryBuilder { ); } - fn handle_turn_complete(&mut self, _payload: &TurnCompleteEvent) { + fn handle_turn_complete(&mut self, payload: &TurnCompleteEvent) { + let mark_completed = |status: &mut TurnStatus| { + if matches!(*status, TurnStatus::Completed | TurnStatus::InProgress) { + *status = TurnStatus::Completed; + } + }; + + // Prefer an exact ID match from the active turn and then close it. + if let Some(current_turn) = self + .current_turn + .as_mut() + .filter(|turn| turn.id == payload.turn_id) + { + mark_completed(&mut current_turn.status); + self.finish_current_turn(); + return; + } + + if let Some(turn) = self + .turns + .iter_mut() + .find(|turn| turn.id == payload.turn_id) + { + mark_completed(&mut turn.status); + return; + } + + // If the completion event cannot be matched, apply it to the active turn. 
if let Some(current_turn) = self.current_turn.as_mut() { - current_turn.status = TurnStatus::Completed; + mark_completed(&mut current_turn.status); self.finish_current_turn(); } } @@ -274,6 +668,59 @@ impl ThreadHistoryBuilder { } } +const REVIEW_FALLBACK_MESSAGE: &str = "Reviewer failed to output a response."; + +fn render_review_output_text(output: &ReviewOutputEvent) -> String { + let explanation = output.overall_explanation.trim(); + if explanation.is_empty() { + REVIEW_FALLBACK_MESSAGE.to_string() + } else { + explanation.to_string() + } +} + +fn convert_patch_changes( + changes: &HashMap, +) -> Vec { + let mut converted: Vec = changes + .iter() + .map(|(path, change)| FileUpdateChange { + path: path.to_string_lossy().into_owned(), + kind: map_patch_change_kind(change), + diff: format_file_change_diff(change), + }) + .collect(); + converted.sort_by(|a, b| a.path.cmp(&b.path)); + converted +} + +fn map_patch_change_kind(change: &codex_protocol::protocol::FileChange) -> PatchChangeKind { + match change { + codex_protocol::protocol::FileChange::Add { .. } => PatchChangeKind::Add, + codex_protocol::protocol::FileChange::Delete { .. } => PatchChangeKind::Delete, + codex_protocol::protocol::FileChange::Update { move_path, .. 
} => PatchChangeKind::Update { + move_path: move_path.clone(), + }, + } +} + +fn format_file_change_diff(change: &codex_protocol::protocol::FileChange) -> String { + match change { + codex_protocol::protocol::FileChange::Add { content } => content.clone(), + codex_protocol::protocol::FileChange::Delete { content } => content.clone(), + codex_protocol::protocol::FileChange::Update { + unified_diff, + move_path, + } => { + if let Some(path) = move_path { + format!("{unified_diff}\n\nMoved to: {}", path.display()) + } else { + unified_diff.clone() + } + } + } +} + struct PendingTurn { id: String, items: Vec, @@ -308,17 +755,28 @@ impl From for Turn { #[cfg(test)] mod tests { use super::*; + use codex_protocol::ThreadId; + use codex_protocol::models::WebSearchAction as CoreWebSearchAction; + use codex_protocol::parse_command::ParsedCommand; use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::AgentReasoningEvent; use codex_protocol::protocol::AgentReasoningRawContentEvent; + use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::CompactedItem; + use codex_protocol::protocol::ExecCommandEndEvent; + use codex_protocol::protocol::ExecCommandSource; + use codex_protocol::protocol::McpInvocation; + use codex_protocol::protocol::McpToolCallEndEvent; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; + use codex_protocol::protocol::WebSearchEndEvent; use pretty_assertions::assert_eq; + use std::path::PathBuf; + use std::time::Duration; use uuid::Uuid; #[test] @@ -706,6 +1164,371 @@ mod tests { ); } + #[test] + fn reconstructs_tool_items_from_persisted_completion_events() { + let events = vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "run tools".into(), + images: None, + 
text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::WebSearchEnd(WebSearchEndEvent { + call_id: "search-1".into(), + query: "codex".into(), + action: CoreWebSearchAction::Search { + query: Some("codex".into()), + queries: None, + }, + }), + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "exec-1".into(), + process_id: Some("pid-1".into()), + turn_id: "turn-1".into(), + command: vec!["echo".into(), "hello world".into()], + cwd: PathBuf::from("/tmp"), + parsed_cmd: vec![ParsedCommand::Unknown { + cmd: "echo hello world".into(), + }], + source: ExecCommandSource::Agent, + interaction_input: None, + stdout: String::new(), + stderr: String::new(), + aggregated_output: "hello world\n".into(), + exit_code: 0, + duration: Duration::from_millis(12), + formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, + }), + EventMsg::McpToolCallEnd(McpToolCallEndEvent { + call_id: "mcp-1".into(), + invocation: McpInvocation { + server: "docs".into(), + tool: "lookup".into(), + arguments: Some(serde_json::json!({"id":"123"})), + }, + duration: Duration::from_millis(8), + result: Err("boom".into()), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].items.len(), 4); + assert_eq!( + turns[0].items[1], + ThreadItem::WebSearch { + id: "search-1".into(), + query: "codex".into(), + action: Some(WebSearchAction::Search { + query: Some("codex".into()), + queries: None, + }), + } + ); + assert_eq!( + turns[0].items[2], + ThreadItem::CommandExecution { + id: "exec-1".into(), + command: "echo 'hello world'".into(), + cwd: PathBuf::from("/tmp"), + process_id: Some("pid-1".into()), + status: CommandExecutionStatus::Completed, + command_actions: vec![CommandAction::Unknown { + command: "echo hello world".into(), + }], + aggregated_output: Some("hello world\n".into()), + exit_code: Some(0), + 
duration_ms: Some(12), + } + ); + assert_eq!( + turns[0].items[3], + ThreadItem::McpToolCall { + id: "mcp-1".into(), + server: "docs".into(), + tool: "lookup".into(), + status: McpToolCallStatus::Failed, + arguments: serde_json::json!({"id":"123"}), + result: None, + error: Some(McpToolCallError { + message: "boom".into(), + }), + duration_ms: Some(8), + } + ); + } + + #[test] + fn reconstructs_declined_exec_and_patch_items() { + let events = vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "run tools".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "exec-declined".into(), + process_id: Some("pid-2".into()), + turn_id: "turn-1".into(), + command: vec!["ls".into()], + cwd: PathBuf::from("/tmp"), + parsed_cmd: vec![ParsedCommand::Unknown { cmd: "ls".into() }], + source: ExecCommandSource::Agent, + interaction_input: None, + stdout: String::new(), + stderr: "exec command rejected by user".into(), + aggregated_output: "exec command rejected by user".into(), + exit_code: -1, + duration: Duration::ZERO, + formatted_output: String::new(), + status: CoreExecCommandStatus::Declined, + }), + EventMsg::PatchApplyEnd(PatchApplyEndEvent { + call_id: "patch-declined".into(), + turn_id: "turn-1".into(), + stdout: String::new(), + stderr: "patch rejected by user".into(), + success: false, + changes: [( + PathBuf::from("README.md"), + codex_protocol::protocol::FileChange::Add { + content: "hello\n".into(), + }, + )] + .into_iter() + .collect(), + status: CorePatchApplyStatus::Declined, + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].items.len(), 3); + assert_eq!( + turns[0].items[1], + ThreadItem::CommandExecution { + id: "exec-declined".into(), + command: "ls".into(), + cwd: PathBuf::from("/tmp"), + process_id: 
Some("pid-2".into()), + status: CommandExecutionStatus::Declined, + command_actions: vec![CommandAction::Unknown { + command: "ls".into(), + }], + aggregated_output: Some("exec command rejected by user".into()), + exit_code: Some(-1), + duration_ms: Some(0), + } + ); + assert_eq!( + turns[0].items[2], + ThreadItem::FileChange { + id: "patch-declined".into(), + changes: vec![FileUpdateChange { + path: "README.md".into(), + kind: PatchChangeKind::Add, + diff: "hello\n".into(), + }], + status: PatchApplyStatus::Declined, + } + ); + } + + #[test] + fn assigns_late_exec_completion_to_original_turn() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "first".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-b".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "second".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "exec-late".into(), + process_id: Some("pid-42".into()), + turn_id: "turn-a".into(), + command: vec!["echo".into(), "done".into()], + cwd: PathBuf::from("/tmp"), + parsed_cmd: vec![ParsedCommand::Unknown { + cmd: "echo done".into(), + }], + source: ExecCommandSource::Agent, + interaction_input: None, + stdout: "done\n".into(), + stderr: String::new(), + aggregated_output: "done\n".into(), + exit_code: 0, + duration: Duration::from_millis(5), + formatted_output: "done\n".into(), + status: CoreExecCommandStatus::Completed, + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: 
"turn-b".into(), + last_agent_message: None, + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 2); + assert_eq!(turns[0].id, "turn-a"); + assert_eq!(turns[1].id, "turn-b"); + assert_eq!(turns[0].items.len(), 2); + assert_eq!(turns[1].items.len(), 1); + assert_eq!( + turns[0].items[1], + ThreadItem::CommandExecution { + id: "exec-late".into(), + command: "echo done".into(), + cwd: PathBuf::from("/tmp"), + process_id: Some("pid-42".into()), + status: CommandExecutionStatus::Completed, + command_actions: vec![CommandAction::Unknown { + command: "echo done".into(), + }], + aggregated_output: Some("done\n".into()), + exit_code: Some(0), + duration_ms: Some(5), + } + ); + } + + #[test] + fn late_turn_complete_does_not_close_active_turn() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "first".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-b".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "second".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::AgentMessage(AgentMessageEvent { + message: "still in b".into(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-b".into(), + last_agent_message: None, + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let 
turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 2); + assert_eq!(turns[0].id, "turn-a"); + assert_eq!(turns[1].id, "turn-b"); + assert_eq!(turns[1].items.len(), 2); + } + + #[test] + fn late_turn_aborted_does_not_interrupt_active_turn() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "first".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-b".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "second".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some("turn-a".into()), + reason: TurnAbortReason::Replaced, + }), + EventMsg::AgentMessage(AgentMessageEvent { + message: "still in b".into(), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 2); + assert_eq!(turns[0].id, "turn-a"); + assert_eq!(turns[1].id, "turn-b"); + assert_eq!(turns[1].status, TurnStatus::Completed); + assert_eq!(turns[1].items.len(), 2); + } + #[test] fn preserves_compaction_only_turn() { let items = vec![ @@ -735,4 +1558,175 @@ mod tests { }] ); } + + #[test] + fn reconstructs_collab_resume_end_item() { + let events = vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "resume agent".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::CollabResumeEnd(codex_protocol::protocol::CollabResumeEndEvent { + call_id: "resume-1".into(), + 
sender_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000001") + .expect("valid sender thread id"), + receiver_thread_id: ThreadId::try_from("00000000-0000-0000-0000-000000000002") + .expect("valid receiver thread id"), + status: AgentStatus::Completed(None), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].items.len(), 2); + assert_eq!( + turns[0].items[1], + ThreadItem::CollabAgentToolCall { + id: "resume-1".into(), + tool: CollabAgentTool::ResumeAgent, + status: CollabAgentToolCallStatus::Completed, + sender_thread_id: "00000000-0000-0000-0000-000000000001".into(), + receiver_thread_ids: vec!["00000000-0000-0000-0000-000000000002".into()], + prompt: None, + agents_states: [( + "00000000-0000-0000-0000-000000000002".into(), + CollabAgentState { + status: crate::protocol::v2::CollabAgentStatus::Completed, + message: None, + }, + )] + .into_iter() + .collect(), + } + ); + } + + #[test] + fn rollback_failed_error_does_not_mark_turn_failed() { + let events = vec![ + EventMsg::UserMessage(UserMessageEvent { + message: "hello".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::AgentMessage(AgentMessageEvent { + message: "done".into(), + }), + EventMsg::Error(ErrorEvent { + message: "rollback failed".into(), + codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].status, TurnStatus::Completed); + assert_eq!(turns[0].error, None); + } + + #[test] + fn out_of_turn_error_does_not_create_or_fail_a_turn() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: 
Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "hello".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + EventMsg::Error(ErrorEvent { + message: "request-level failure".into(), + codex_error_info: Some(CodexErrorInfo::BadRequest), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!( + turns[0], + Turn { + id: "turn-a".into(), + status: TurnStatus::Completed, + error: None, + items: vec![ThreadItem::UserMessage { + id: "item-1".into(), + content: vec![UserInput::Text { + text: "hello".into(), + text_elements: Vec::new(), + }], + }], + } + ); + } + + #[test] + fn error_then_turn_complete_preserves_failed_status() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-a".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "hello".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::Error(ErrorEvent { + message: "stream failure".into(), + codex_error_info: Some(CodexErrorInfo::ResponseStreamDisconnected { + http_status_code: Some(502), + }), + }), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-a".into(), + last_agent_message: None, + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].id, "turn-a"); + assert_eq!(turns[0].status, TurnStatus::Failed); + assert_eq!( + turns[0].error, + Some(TurnError { + message: "stream failure".into(), + codex_error_info: Some( + crate::protocol::v2::CodexErrorInfo::ResponseStreamDisconnected { + 
http_status_code: Some(502), + } + ), + additional_details: None, + }) + ); + } } diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 69510df9f..7cff3e2de 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -29,7 +29,9 @@ use codex_protocol::protocol::AgentStatus as CoreAgentStatus; use codex_protocol::protocol::AskForApproval as CoreAskForApproval; use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo; use codex_protocol::protocol::CreditsSnapshot as CoreCreditsSnapshot; +use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus; use codex_protocol::protocol::NetworkAccess as CoreNetworkAccess; +use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus; use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot; use codex_protocol::protocol::RateLimitWindow as CoreRateLimitWindow; use codex_protocol::protocol::ReadOnlyAccess as CoreReadOnlyAccess; @@ -1422,6 +1424,11 @@ pub struct ThreadStartParams { #[experimental("thread/start.experimentalRawEvents")] #[serde(default)] pub experimental_raw_events: bool, + /// If true, persist additional rollout EventMsg variants required to + /// reconstruct a richer thread history on resume/fork/read. + #[experimental("thread/start.persistFullHistory")] + #[serde(default)] + pub persist_extended_history: bool, } #[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)] @@ -1503,6 +1510,11 @@ pub struct ThreadResumeParams { pub developer_instructions: Option, #[ts(optional = nullable)] pub personality: Option, + /// If true, persist additional rollout EventMsg variants required to + /// reconstruct a richer thread history on subsequent resume/fork/read. 
+ #[experimental("thread/resume.persistFullHistory")] + #[serde(default)] + pub persist_extended_history: bool, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -1556,6 +1568,11 @@ pub struct ThreadForkParams { pub base_instructions: Option, #[ts(optional = nullable)] pub developer_instructions: Option, + /// If true, persist additional rollout EventMsg variants required to + /// reconstruct a richer thread history on subsequent resume/fork/read. + #[experimental("thread/fork.persistFullHistory")] + #[serde(default)] + pub persist_extended_history: bool, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -2622,6 +2639,22 @@ pub enum CommandExecutionStatus { Declined, } +impl From for CommandExecutionStatus { + fn from(value: CoreExecCommandStatus) -> Self { + Self::from(&value) + } +} + +impl From<&CoreExecCommandStatus> for CommandExecutionStatus { + fn from(value: &CoreExecCommandStatus) -> Self { + match value { + CoreExecCommandStatus::Completed => CommandExecutionStatus::Completed, + CoreExecCommandStatus::Failed => CommandExecutionStatus::Failed, + CoreExecCommandStatus::Declined => CommandExecutionStatus::Declined, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -2662,6 +2695,22 @@ pub enum PatchApplyStatus { Declined, } +impl From for PatchApplyStatus { + fn from(value: CorePatchApplyStatus) -> Self { + Self::from(&value) + } +} + +impl From<&CorePatchApplyStatus> for PatchApplyStatus { + fn from(value: &CorePatchApplyStatus) -> Self { + match value { + CorePatchApplyStatus::Completed => PatchApplyStatus::Completed, + CorePatchApplyStatus::Failed => PatchApplyStatus::Failed, + CorePatchApplyStatus::Declined => PatchApplyStatus::Declined, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git 
a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index b6c7f9e4f..13c3fda1e 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -209,6 +209,8 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c { "method": "thread/started", "params": { "thread": { … } } } ``` +Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously. + ### Example: List threads (with pagination & filters) `thread/list` lets you render a history UI. Results default to `createdAt` (newest first) descending. Pass any combination of: diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 3892f8886..79170e64b 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -717,6 +717,10 @@ pub(crate) async fn apply_bespoke_event_handling( .await; }; + if !ev.affects_turn_status() { + return; + } + let turn_error = TurnError { message: ev.message, codex_error_info: ev.codex_error_info.map(V2CodexErrorInfo::from), @@ -887,11 +891,7 @@ pub(crate) async fn apply_bespoke_event_handling( // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. let item_id = patch_end_event.call_id.clone(); - let status = if patch_end_event.success { - PatchApplyStatus::Completed - } else { - PatchApplyStatus::Failed - }; + let status: PatchApplyStatus = (&patch_end_event.status).into(); let changes = convert_patch_changes(&patch_end_event.changes); complete_file_change_item( conversation_id, @@ -998,14 +998,11 @@ pub(crate) async fn apply_bespoke_event_handling( aggregated_output, exit_code, duration, + status, .. 
} = exec_command_end_event; - let status = if exit_code == 0 { - CommandExecutionStatus::Completed - } else { - CommandExecutionStatus::Failed - }; + let status: CommandExecutionStatus = (&status).into(); let command_actions = parsed_cmd .into_iter() .map(V2ParsedCommand::from) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index b986b649c..603f34b01 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1894,6 +1894,7 @@ impl CodexMessageProcessor { experimental_raw_events, personality, ephemeral, + persist_extended_history, } = params; let mut typesafe_overrides = self.build_thread_config_overrides( model, @@ -1953,7 +1954,7 @@ impl CodexMessageProcessor { match self .thread_manager - .start_thread_with_tools(config, core_dynamic_tools) + .start_thread_with_tools(config, core_dynamic_tools, persist_extended_history) .await { Ok(new_conv) => { @@ -2690,6 +2691,7 @@ impl CodexMessageProcessor { base_instructions, developer_instructions, personality, + persist_extended_history, } = params; let thread_history = if let Some(history) = history { @@ -2805,7 +2807,12 @@ impl CodexMessageProcessor { match self .thread_manager - .resume_thread_with_history(config, thread_history, self.auth_manager.clone()) + .resume_thread_with_history( + config, + thread_history, + self.auth_manager.clone(), + persist_extended_history, + ) .await { Ok(NewThread { @@ -2910,6 +2917,7 @@ impl CodexMessageProcessor { config: cli_overrides, base_instructions, developer_instructions, + persist_extended_history, } = params; let (rollout_path, source_thread_id) = if let Some(path) = path { @@ -3021,7 +3029,12 @@ impl CodexMessageProcessor { .. 
} = match self .thread_manager - .fork_thread(usize::MAX, config, rollout_path.clone()) + .fork_thread( + usize::MAX, + config, + rollout_path.clone(), + persist_extended_history, + ) .await { Ok(thread) => thread, @@ -3962,7 +3975,7 @@ impl CodexMessageProcessor { match self .thread_manager - .resume_thread_with_history(config, thread_history, self.auth_manager.clone()) + .resume_thread_with_history(config, thread_history, self.auth_manager.clone(), false) .await { Ok(NewThread { @@ -4162,7 +4175,7 @@ impl CodexMessageProcessor { .. } = match self .thread_manager - .fork_thread(usize::MAX, config, rollout_path.clone()) + .fork_thread(usize::MAX, config, rollout_path.clone(), false) .await { Ok(thread) => thread, @@ -5168,7 +5181,7 @@ impl CodexMessageProcessor { .. } = self .thread_manager - .fork_thread(usize::MAX, config, rollout_path) + .fork_thread(usize::MAX, config, rollout_path, false) .await .map_err(|err| JSONRPCErrorError { code: INTERNAL_ERROR_CODE, diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index b0164e86e..b6cd0d82a 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -50,7 +50,7 @@ impl AgentControl { let new_thread = match session_source { Some(session_source) => { state - .spawn_new_thread_with_source(config, self.clone(), session_source) + .spawn_new_thread_with_source(config, self.clone(), session_source, false) .await? 
} None => state.spawn_new_thread(config, self.clone()).await?, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index dbfa92ff4..647b2f760 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -196,6 +196,7 @@ use crate::rollout::RolloutRecorder; use crate::rollout::RolloutRecorderParams; use crate::rollout::map_session_init_error; use crate::rollout::metadata; +use crate::rollout::policy::EventPersistenceMode; use crate::shell; use crate::shell_snapshot::ShellSnapshot; use crate::skills::SkillError; @@ -284,6 +285,7 @@ impl Codex { session_source: SessionSource, agent_control: AgentControl, dynamic_tools: Vec, + persist_extended_history: bool, ) -> CodexResult { let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let (tx_event, rx_event) = async_channel::unbounded(); @@ -396,6 +398,7 @@ impl Codex { original_config_do_not_use: Arc::clone(&config), session_source, dynamic_tools, + persist_extended_history, }; // Generate a unique ID for the lifetime of this Codex session. @@ -733,6 +736,7 @@ pub(crate) struct SessionConfiguration { /// Source of the session (cli, vscode, exec, mcp, ...) 
session_source: SessionSource, dynamic_tools: Vec, + persist_extended_history: bool, } impl SessionConfiguration { @@ -984,12 +988,24 @@ impl Session { text: session_configuration.base_instructions.clone(), }, session_configuration.dynamic_tools.clone(), + if session_configuration.persist_extended_history { + EventPersistenceMode::Extended + } else { + EventPersistenceMode::Limited + }, ), ) } InitialHistory::Resumed(resumed_history) => ( resumed_history.conversation_id, - RolloutRecorderParams::resume(resumed_history.rollout_path.clone()), + RolloutRecorderParams::resume( + resumed_history.rollout_path.clone(), + if session_configuration.persist_extended_history { + EventPersistenceMode::Extended + } else { + EventPersistenceMode::Limited + }, + ), ), }; let state_builder = match &initial_history { @@ -6297,6 +6313,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, }; let mut state = SessionState::new(session_configuration); @@ -6387,6 +6404,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, }; let mut state = SessionState::new(session_configuration); @@ -6696,6 +6714,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, } } @@ -6748,6 +6767,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, }; let per_turn_config = Session::build_per_turn_config(&session_configuration); let model_info = ModelsManager::construct_model_info_offline_for_tests( @@ -6893,6 +6913,7 @@ mod tests { original_config_do_not_use: Arc::clone(&config), session_source: SessionSource::Exec, dynamic_tools: Vec::new(), + persist_extended_history: false, 
}; let per_turn_config = Session::build_per_turn_config(&session_configuration); let model_info = ModelsManager::construct_model_info_offline_for_tests( diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 504b876bb..1f4240f68 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -58,6 +58,7 @@ pub(crate) async fn run_codex_thread_interactive( SessionSource::SubAgent(SubAgentSource::Review), parent_session.services.agent_control.clone(), Vec::new(), + false, ) .await?; let codex = Arc::new(codex); diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 53ac2e0d8..6f075e7dc 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -124,6 +124,7 @@ pub use rollout::list::ThreadsPage; pub use rollout::list::parse_cursor; pub use rollout::list::read_head_for_summary; pub use rollout::list::read_session_meta_line; +pub use rollout::policy::EventPersistenceMode; pub use rollout::rollout_date_parts; pub use rollout::session_index::find_thread_names_by_ids; mod function_tool; diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 87db64fb5..dc3e111a6 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -2,12 +2,20 @@ use crate::protocol::EventMsg; use crate::protocol::RolloutItem; use codex_protocol::models::ResponseItem; -/// Whether a rollout `item` should be persisted in rollout files. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum EventPersistenceMode { + #[default] + Limited, + Extended, +} + +/// Whether a rollout `item` should be persisted in rollout files for the +/// provided persistence `mode`. 
#[inline] -pub(crate) fn is_persisted_response_item(item: &RolloutItem) -> bool { +pub(crate) fn is_persisted_response_item(item: &RolloutItem, mode: EventPersistenceMode) -> bool { match item { RolloutItem::ResponseItem(item) => should_persist_response_item(item), - RolloutItem::EventMsg(ev) => should_persist_event_msg(ev), + RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode), // Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns). RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => { true @@ -51,9 +59,33 @@ pub(crate) fn should_persist_response_item_for_memories(item: &ResponseItem) -> } } -/// Whether an `EventMsg` should be persisted in rollout files. +/// Whether an `EventMsg` should be persisted in rollout files for the +/// provided persistence `mode`. #[inline] -pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { +pub(crate) fn should_persist_event_msg(ev: &EventMsg, mode: EventPersistenceMode) -> bool { + match mode { + EventPersistenceMode::Limited => should_persist_event_msg_limited(ev), + EventPersistenceMode::Extended => should_persist_event_msg_extended(ev), + } +} + +fn should_persist_event_msg_limited(ev: &EventMsg) -> bool { + matches!( + event_msg_persistence_mode(ev), + Some(EventPersistenceMode::Limited) + ) +} + +fn should_persist_event_msg_extended(ev: &EventMsg) -> bool { + matches!( + event_msg_persistence_mode(ev), + Some(EventPersistenceMode::Limited) | Some(EventPersistenceMode::Extended) + ) +} + +/// Returns the minimum persistence mode that includes this event. +/// `None` means the event should never be persisted. 
+fn event_msg_persistence_mode(ev: &EventMsg) -> Option { match ev { EventMsg::UserMessage(_) | EventMsg::AgentMessage(_) @@ -67,15 +99,29 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::UndoCompleted(_) | EventMsg::TurnAborted(_) | EventMsg::TurnStarted(_) - | EventMsg::TurnComplete(_) => true, + | EventMsg::TurnComplete(_) => Some(EventPersistenceMode::Limited), EventMsg::ItemCompleted(event) => { // Plan items are derived from streaming tags and are not part of the // raw ResponseItem history, so we persist their completion to replay // them on resume without bloating rollouts with every item lifecycle. - matches!(event.item, codex_protocol::items::TurnItem::Plan(_)) + if matches!(event.item, codex_protocol::items::TurnItem::Plan(_)) { + Some(EventPersistenceMode::Limited) + } else { + None + } } EventMsg::Error(_) - | EventMsg::Warning(_) + | EventMsg::WebSearchEnd(_) + | EventMsg::ExecCommandEnd(_) + | EventMsg::PatchApplyEnd(_) + | EventMsg::McpToolCallEnd(_) + | EventMsg::ViewImageToolCall(_) + | EventMsg::CollabAgentSpawnEnd(_) + | EventMsg::CollabAgentInteractionEnd(_) + | EventMsg::CollabWaitingEnd(_) + | EventMsg::CollabCloseEnd(_) + | EventMsg::CollabResumeEnd(_) => Some(EventPersistenceMode::Extended), + EventMsg::Warning(_) | EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) | EventMsg::AgentReasoningRawContentDelta(_) @@ -84,13 +130,10 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::SessionConfigured(_) | EventMsg::ThreadNameUpdated(_) | EventMsg::McpToolCallBegin(_) - | EventMsg::McpToolCallEnd(_) | EventMsg::WebSearchBegin(_) - | EventMsg::WebSearchEnd(_) | EventMsg::ExecCommandBegin(_) | EventMsg::TerminalInteraction(_) | EventMsg::ExecCommandOutputDelta(_) - | EventMsg::ExecCommandEnd(_) | EventMsg::ExecApprovalRequest(_) | EventMsg::RequestUserInput(_) | EventMsg::DynamicToolCallRequest(_) @@ -99,7 +142,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool 
{ | EventMsg::BackgroundEvent(_) | EventMsg::StreamError(_) | EventMsg::PatchApplyBegin(_) - | EventMsg::PatchApplyEnd(_) | EventMsg::TurnDiff(_) | EventMsg::GetHistoryEntryResponse(_) | EventMsg::UndoStarted(_) @@ -112,7 +154,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::RemoteSkillDownloaded(_) | EventMsg::PlanUpdate(_) | EventMsg::ShutdownComplete - | EventMsg::ViewImageToolCall(_) | EventMsg::DeprecationNotice(_) | EventMsg::ItemStarted(_) | EventMsg::AgentMessageContentDelta(_) @@ -121,14 +162,9 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::ReasoningRawContentDelta(_) | EventMsg::SkillsUpdateAvailable | EventMsg::CollabAgentSpawnBegin(_) - | EventMsg::CollabAgentSpawnEnd(_) | EventMsg::CollabAgentInteractionBegin(_) - | EventMsg::CollabAgentInteractionEnd(_) | EventMsg::CollabWaitingBegin(_) - | EventMsg::CollabWaitingEnd(_) | EventMsg::CollabCloseBegin(_) - | EventMsg::CollabCloseEnd(_) - | EventMsg::CollabResumeBegin(_) - | EventMsg::CollabResumeEnd(_) => false, + | EventMsg::CollabResumeBegin(_) => None, } } diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 4e94d0356..d0df0828a 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -36,6 +36,7 @@ use super::list::get_threads_in_root; use super::list::parse_cursor; use super::list::parse_timestamp_uuid_from_filename; use super::metadata; +use super::policy::EventPersistenceMode; use super::policy::is_persisted_response_item; use crate::config::Config; use crate::default_client::originator; @@ -43,6 +44,9 @@ use crate::git_info::collect_git_info; use crate::path_utils; use crate::state_db; use crate::state_db::StateDbHandle; +use crate::truncate::TruncationPolicy; +use crate::truncate::truncate_text; +use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::ResumedHistory; use 
codex_protocol::protocol::RolloutItem; @@ -67,6 +71,7 @@ pub struct RolloutRecorder { tx: Sender, pub(crate) rollout_path: PathBuf, state_db: Option, + event_persistence_mode: EventPersistenceMode, } #[derive(Clone)] @@ -77,9 +82,11 @@ pub enum RolloutRecorderParams { source: SessionSource, base_instructions: BaseInstructions, dynamic_tools: Vec, + event_persistence_mode: EventPersistenceMode, }, Resume { path: PathBuf, + event_persistence_mode: EventPersistenceMode, }, } @@ -104,6 +111,7 @@ impl RolloutRecorderParams { source: SessionSource, base_instructions: BaseInstructions, dynamic_tools: Vec, + event_persistence_mode: EventPersistenceMode, ) -> Self { Self::Create { conversation_id, @@ -111,11 +119,42 @@ impl RolloutRecorderParams { source, base_instructions, dynamic_tools, + event_persistence_mode, } } - pub fn resume(path: PathBuf) -> Self { - Self::Resume { path } + pub fn resume(path: PathBuf, event_persistence_mode: EventPersistenceMode) -> Self { + Self::Resume { + path, + event_persistence_mode, + } + } +} + +const PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES: usize = 10_000; + +fn sanitize_rollout_item_for_persistence( + item: RolloutItem, + mode: EventPersistenceMode, +) -> RolloutItem { + if mode != EventPersistenceMode::Extended { + return item; + } + + match item { + RolloutItem::EventMsg(EventMsg::ExecCommandEnd(mut event)) => { + // Persist only a bounded aggregated summary of command output. + event.aggregated_output = truncate_text( + &event.aggregated_output, + TruncationPolicy::Bytes(PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES), + ); + // Drop unnecessary fields from rollout storage since aggregated_output is all we need. 
+ event.stdout.clear(); + event.stderr.clear(); + event.formatted_output.clear(); + RolloutItem::EventMsg(EventMsg::ExecCommandEnd(event)) + } + _ => item, } } @@ -322,58 +361,70 @@ impl RolloutRecorder { state_db_ctx: Option, state_builder: Option, ) -> std::io::Result { - let (file, deferred_log_file_info, rollout_path, meta) = match params { - RolloutRecorderParams::Create { - conversation_id, - forked_from_id, - source, - base_instructions, - dynamic_tools, - } => { - let log_file_info = precompute_log_file_info(config, conversation_id)?; - let path = log_file_info.path.clone(); - let session_id = log_file_info.conversation_id; - let started_at = log_file_info.timestamp; - - let timestamp_format: &[FormatItem] = format_description!( - "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" - ); - let timestamp = started_at - .to_offset(time::UtcOffset::UTC) - .format(timestamp_format) - .map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?; - - let session_meta = SessionMeta { - id: session_id, + let (file, deferred_log_file_info, rollout_path, meta, event_persistence_mode) = + match params { + RolloutRecorderParams::Create { + conversation_id, forked_from_id, - timestamp, - cwd: config.cwd.clone(), - originator: originator().value, - cli_version: env!("CARGO_PKG_VERSION").to_string(), source, - model_provider: Some(config.model_provider_id.clone()), - base_instructions: Some(base_instructions), - dynamic_tools: if dynamic_tools.is_empty() { - None - } else { - Some(dynamic_tools) - }, - }; + base_instructions, + dynamic_tools, + event_persistence_mode, + } => { + let log_file_info = precompute_log_file_info(config, conversation_id)?; + let path = log_file_info.path.clone(); + let session_id = log_file_info.conversation_id; + let started_at = log_file_info.timestamp; - (None, Some(log_file_info), path, Some(session_meta)) - } - RolloutRecorderParams::Resume { path } => ( - Some( - tokio::fs::OpenOptions::new() - .append(true) - 
.open(&path) - .await?, + let timestamp_format: &[FormatItem] = format_description!( + "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" + ); + let timestamp = started_at + .to_offset(time::UtcOffset::UTC) + .format(timestamp_format) + .map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?; + + let session_meta = SessionMeta { + id: session_id, + forked_from_id, + timestamp, + cwd: config.cwd.clone(), + originator: originator().value, + cli_version: env!("CARGO_PKG_VERSION").to_string(), + source, + model_provider: Some(config.model_provider_id.clone()), + base_instructions: Some(base_instructions), + dynamic_tools: if dynamic_tools.is_empty() { + None + } else { + Some(dynamic_tools) + }, + }; + + ( + None, + Some(log_file_info), + path, + Some(session_meta), + event_persistence_mode, + ) + } + RolloutRecorderParams::Resume { + path, + event_persistence_mode, + } => ( + Some( + tokio::fs::OpenOptions::new() + .append(true) + .open(&path) + .await?, + ), + None, + path, + None, + event_persistence_mode, ), - None, - path, - None, - ), - }; + }; // Clone the cwd for the spawned task to collect git info asynchronously let cwd = config.cwd.clone(); @@ -402,6 +453,7 @@ impl RolloutRecorder { tx, rollout_path, state_db: state_db_ctx, + event_persistence_mode, }) } @@ -419,8 +471,11 @@ impl RolloutRecorder { // Note that function calls may look a bit strange if they are // "fully qualified MCP tool calls," so we could consider // reformatting them in that case. 
- if is_persisted_response_item(item) { - filtered.push(item.clone()); + if is_persisted_response_item(item, self.event_persistence_mode) { + filtered.push(sanitize_rollout_item_for_persistence( + item.clone(), + self.event_persistence_mode, + )); } } if filtered.is_empty() { @@ -673,9 +728,7 @@ async fn rollout_writer( RolloutCmd::AddItems(items) => { let mut persisted_items = Vec::new(); for item in items { - if is_persisted_response_item(&item) { - persisted_items.push(item); - } + persisted_items.push(item); } if persisted_items.is_empty() { continue; @@ -1003,6 +1056,7 @@ mod tests { SessionSource::Exec, BaseInstructions::default(), Vec::new(), + EventPersistenceMode::Limited, ), None, None, diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index ef6c7180a..43950a8da 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -21,6 +21,7 @@ use crate::protocol::EventMsg; use crate::protocol::ExecCommandBeginEvent; use crate::protocol::ExecCommandEndEvent; use crate::protocol::ExecCommandSource; +use crate::protocol::ExecCommandStatus; use crate::protocol::SandboxPolicy; use crate::protocol::TurnStartedEvent; use crate::sandboxing::ExecRequest; @@ -207,6 +208,7 @@ pub(crate) async fn execute_user_shell_command( exit_code: -1, duration: Duration::ZERO, formatted_output: aborted_message, + status: ExecCommandStatus::Failed, }), ) .await; @@ -233,6 +235,11 @@ pub(crate) async fn execute_user_shell_command( &output, turn_context.truncation_policy, ), + status: if output.exit_code == 0 { + ExecCommandStatus::Completed + } else { + ExecCommandStatus::Failed + }, }), ) .await; @@ -272,6 +279,7 @@ pub(crate) async fn execute_user_shell_command( &exec_output, turn_context.truncation_policy, ), + status: ExecCommandStatus::Failed, }), ) .await; diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 8117f62ec..b13ef8c61 100644 --- 
a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -277,13 +277,15 @@ impl ThreadManager { } pub async fn start_thread(&self, config: Config) -> CodexResult { - self.start_thread_with_tools(config, Vec::new()).await + self.start_thread_with_tools(config, Vec::new(), false) + .await } pub async fn start_thread_with_tools( &self, config: Config, dynamic_tools: Vec, + persist_extended_history: bool, ) -> CodexResult { self.state .spawn_thread( @@ -292,6 +294,7 @@ impl ThreadManager { Arc::clone(&self.state.auth_manager), self.agent_control(), dynamic_tools, + persist_extended_history, ) .await } @@ -303,7 +306,7 @@ impl ThreadManager { auth_manager: Arc, ) -> CodexResult { let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?; - self.resume_thread_with_history(config, initial_history, auth_manager) + self.resume_thread_with_history(config, initial_history, auth_manager, false) .await } @@ -312,6 +315,7 @@ impl ThreadManager { config: Config, initial_history: InitialHistory, auth_manager: Arc, + persist_extended_history: bool, ) -> CodexResult { self.state .spawn_thread( @@ -320,6 +324,7 @@ impl ThreadManager { auth_manager, self.agent_control(), Vec::new(), + persist_extended_history, ) .await } @@ -349,6 +354,7 @@ impl ThreadManager { nth_user_message: usize, config: Config, path: PathBuf, + persist_extended_history: bool, ) -> CodexResult { let history = RolloutRecorder::get_rollout_history(&path).await?; let history = truncate_before_nth_user_message(history, nth_user_message); @@ -359,6 +365,7 @@ impl ThreadManager { Arc::clone(&self.state.auth_manager), self.agent_control(), Vec::new(), + persist_extended_history, ) .await } @@ -409,7 +416,7 @@ impl ThreadManagerState { config: Config, agent_control: AgentControl, ) -> CodexResult { - self.spawn_new_thread_with_source(config, agent_control, self.session_source.clone()) + self.spawn_new_thread_with_source(config, agent_control, self.session_source.clone(), 
false) .await } @@ -418,6 +425,7 @@ impl ThreadManagerState { config: Config, agent_control: AgentControl, session_source: SessionSource, + persist_extended_history: bool, ) -> CodexResult { self.spawn_thread_with_source( config, @@ -426,6 +434,7 @@ impl ThreadManagerState { agent_control, session_source, Vec::new(), + persist_extended_history, ) .await } @@ -445,6 +454,7 @@ impl ThreadManagerState { agent_control, session_source, Vec::new(), + false, ) .await } @@ -457,6 +467,7 @@ impl ThreadManagerState { auth_manager: Arc, agent_control: AgentControl, dynamic_tools: Vec, + persist_extended_history: bool, ) -> CodexResult { self.spawn_thread_with_source( config, @@ -465,10 +476,12 @@ impl ThreadManagerState { agent_control, self.session_source.clone(), dynamic_tools, + persist_extended_history, ) .await } + #[allow(clippy::too_many_arguments)] pub(crate) async fn spawn_thread_with_source( &self, config: Config, @@ -477,6 +490,7 @@ impl ThreadManagerState { agent_control: AgentControl, session_source: SessionSource, dynamic_tools: Vec, + persist_extended_history: bool, ) -> CodexResult { self.file_watcher.register_config(&config); let CodexSpawnOk { @@ -491,6 +505,7 @@ impl ThreadManagerState { session_source, agent_control, dynamic_tools, + persist_extended_history, ) .await?; self.finalize_thread_spawn(codex, thread_id).await diff --git a/codex-rs/core/src/tools/events.rs b/codex-rs/core/src/tools/events.rs index ba720918b..65972f5a5 100644 --- a/codex-rs/core/src/tools/events.rs +++ b/codex-rs/core/src/tools/events.rs @@ -9,9 +9,11 @@ use crate::protocol::EventMsg; use crate::protocol::ExecCommandBeginEvent; use crate::protocol::ExecCommandEndEvent; use crate::protocol::ExecCommandSource; +use crate::protocol::ExecCommandStatus; use crate::protocol::FileChange; use crate::protocol::PatchApplyBeginEvent; use crate::protocol::PatchApplyEndEvent; +use crate::protocol::PatchApplyStatus; use crate::protocol::TurnDiffEvent; use 
crate::tools::context::SharedTurnDiffTracker; use crate::tools::sandboxing::ToolError; @@ -56,6 +58,7 @@ pub(crate) enum ToolEventStage { pub(crate) enum ToolEventFailure { Output(ExecToolCallOutput), Message(String), + Rejected(String), } pub(crate) async fn emit_exec_command_begin( @@ -195,6 +198,11 @@ impl ToolEmitter { output.stdout.text.clone(), output.stderr.text.clone(), output.exit_code == 0, + if output.exit_code == 0 { + PatchApplyStatus::Completed + } else { + PatchApplyStatus::Failed + }, ) .await; } @@ -208,6 +216,11 @@ impl ToolEmitter { output.stdout.text.clone(), output.stderr.text.clone(), output.exit_code == 0, + if output.exit_code == 0 { + PatchApplyStatus::Completed + } else { + PatchApplyStatus::Failed + }, ) .await; } @@ -221,6 +234,21 @@ impl ToolEmitter { String::new(), (*message).to_string(), false, + PatchApplyStatus::Failed, + ) + .await; + } + ( + Self::ApplyPatch { changes, .. }, + ToolEventStage::Failure(ToolEventFailure::Rejected(message)), + ) => { + emit_patch_end( + ctx, + changes.clone(), + String::new(), + (*message).to_string(), + false, + PatchApplyStatus::Declined, ) .await; } @@ -301,6 +329,13 @@ impl ToolEmitter { Err(ToolError::Rejected(msg)) => { // Normalize common rejection messages for exec tools so tests and // users see a clear, consistent phrase. + // + // NOTE: ToolError::Rejected is currently used for both user-declined approvals + // and some operational/runtime rejection paths (for example setup failures). + // We intentionally map all of them through the "rejected" event path for now, + // which means a subset of non-user failures may be reported as Declined. + // + // TODO: We should add a new ToolError variant for user-declined approvals. let normalized = if msg == "rejected by user" { match self { Self::Shell { .. } | Self::UnifiedExec { .. 
} => { @@ -311,7 +346,7 @@ impl ToolEmitter { } else { msg }; - let event = ToolEventStage::Failure(ToolEventFailure::Message(normalized.clone())); + let event = ToolEventStage::Failure(ToolEventFailure::Rejected(normalized.clone())); let result = Err(FunctionCallError::RespondToModel(normalized)); (event, result) } @@ -357,6 +392,7 @@ struct ExecCommandResult { exit_code: i32, duration: Duration, formatted_output: String, + status: ExecCommandStatus, } async fn emit_exec_stage( @@ -386,6 +422,11 @@ async fn emit_exec_stage( exit_code: output.exit_code, duration: output.duration, formatted_output: format_exec_output_str(&output, ctx.turn.truncation_policy), + status: if output.exit_code == 0 { + ExecCommandStatus::Completed + } else { + ExecCommandStatus::Failed + }, }; emit_exec_end(ctx, exec_input, exec_result).await; } @@ -398,6 +439,20 @@ async fn emit_exec_stage( exit_code: -1, duration: Duration::ZERO, formatted_output: text, + status: ExecCommandStatus::Failed, + }; + emit_exec_end(ctx, exec_input, exec_result).await; + } + ToolEventStage::Failure(ToolEventFailure::Rejected(message)) => { + let text = message.to_string(); + let exec_result = ExecCommandResult { + stdout: String::new(), + stderr: text.clone(), + aggregated_output: text.clone(), + exit_code: -1, + duration: Duration::ZERO, + formatted_output: text, + status: ExecCommandStatus::Declined, }; emit_exec_end(ctx, exec_input, exec_result).await; } @@ -427,6 +482,7 @@ async fn emit_exec_end( exit_code: exec_result.exit_code, duration: exec_result.duration, formatted_output: exec_result.formatted_output, + status: exec_result.status, }), ) .await; @@ -438,6 +494,7 @@ async fn emit_patch_end( stdout: String, stderr: String, success: bool, + status: PatchApplyStatus, ) { ctx.session .send_event( @@ -449,6 +506,7 @@ async fn emit_patch_end( stderr, success, changes, + status, }), ) .await; diff --git a/codex-rs/core/src/tools/handlers/collab.rs b/codex-rs/core/src/tools/handlers/collab.rs index 
aef8e3cc7..07a21a3fc 100644 --- a/codex-rs/core/src/tools/handlers/collab.rs +++ b/codex-rs/core/src/tools/handlers/collab.rs @@ -1301,6 +1301,7 @@ mod tests { phase: None, })]), AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")), + false, ) .await .expect("start thread"); diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 325ad4829..ff90d4baf 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -1019,7 +1019,7 @@ async fn fork_thread( nth_user_message: usize, ) -> Arc { manager - .fork_thread(nth_user_message, config.clone(), path) + .fork_thread(nth_user_message, config.clone(), path, false) .await .expect("fork conversation") .thread diff --git a/codex-rs/core/tests/suite/fork_thread.rs b/codex-rs/core/tests/suite/fork_thread.rs index d5363cc0a..3d97400a9 100644 --- a/codex-rs/core/tests/suite/fork_thread.rs +++ b/codex-rs/core/tests/suite/fork_thread.rs @@ -110,7 +110,7 @@ async fn fork_thread_twice_drops_to_first_message() { thread: codex_fork1, .. } = thread_manager - .fork_thread(1, config_for_fork.clone(), base_path.clone()) + .fork_thread(1, config_for_fork.clone(), base_path.clone(), false) .await .expect("fork 1"); @@ -129,7 +129,7 @@ async fn fork_thread_twice_drops_to_first_message() { thread: codex_fork2, .. 
} = thread_manager - .fork_thread(0, config_for_fork.clone(), fork1_path.clone()) + .fork_thread(0, config_for_fork.clone(), fork1_path.clone(), false) .await .expect("fork 2"); diff --git a/codex-rs/core/tests/suite/permissions_messages.rs b/codex-rs/core/tests/suite/permissions_messages.rs index 5bb92114f..c3936742e 100644 --- a/codex-rs/core/tests/suite/permissions_messages.rs +++ b/codex-rs/core/tests/suite/permissions_messages.rs @@ -413,7 +413,7 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> { fork_config.approval_policy = Constrained::allow_any(AskForApproval::UnlessTrusted); let forked = initial .thread_manager - .fork_thread(usize::MAX, fork_config, rollout_path) + .fork_thread(usize::MAX, fork_config, rollout_path, false) .await?; forked .thread diff --git a/codex-rs/core/tests/suite/resume_warning.rs b/codex-rs/core/tests/suite/resume_warning.rs index 5bea0e1f0..d1746dbf8 100644 --- a/codex-rs/core/tests/suite/resume_warning.rs +++ b/codex-rs/core/tests/suite/resume_warning.rs @@ -68,7 +68,7 @@ async fn emits_warning_when_resumed_model_differs() { thread: conversation, .. 
} = thread_manager - .resume_thread_with_history(config, initial_history, auth_manager) + .resume_thread_with_history(config, initial_history, auth_manager, false) .await .expect("resume conversation"); diff --git a/codex-rs/core/tests/suite/rollout_list_find.rs b/codex-rs/core/tests/suite/rollout_list_find.rs index 059ca929e..d9f92b85b 100644 --- a/codex-rs/core/tests/suite/rollout_list_find.rs +++ b/codex-rs/core/tests/suite/rollout_list_find.rs @@ -4,6 +4,7 @@ use std::path::Path; use std::path::PathBuf; use chrono::Utc; +use codex_core::EventPersistenceMode; use codex_core::RolloutRecorder; use codex_core::RolloutRecorderParams; use codex_core::config::ConfigBuilder; @@ -171,6 +172,7 @@ async fn find_locates_rollout_file_written_by_recorder() -> std::io::Result<()> SessionSource::Exec, BaseInstructions::default(), Vec::new(), + EventPersistenceMode::Limited, ), None, None, diff --git a/codex-rs/core/tests/suite/unstable_features_warning.rs b/codex-rs/core/tests/suite/unstable_features_warning.rs index e82572abc..1cb07c1f4 100644 --- a/codex-rs/core/tests/suite/unstable_features_warning.rs +++ b/codex-rs/core/tests/suite/unstable_features_warning.rs @@ -39,7 +39,7 @@ async fn emits_warning_when_unstable_features_enabled_via_config() { thread: conversation, .. } = thread_manager - .resume_thread_with_history(config, InitialHistory::New, auth_manager) + .resume_thread_with_history(config, InitialHistory::New, auth_manager, false) .await .expect("spawn conversation"); @@ -77,7 +77,7 @@ async fn suppresses_warning_when_configured() { thread: conversation, .. 
} = thread_manager - .resume_thread_with_history(config, InitialHistory::New, auth_manager) + .resume_thread_with_history(config, InitialHistory::New, auth_manager, false) .await .expect("spawn conversation"); diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index e05883041..46cd61545 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -11,12 +11,14 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::ExecCommandSource; +use codex_core::protocol::ExecCommandStatus as CoreExecCommandStatus; use codex_core::protocol::FileChange; use codex_core::protocol::McpInvocation; use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; +use codex_core::protocol::PatchApplyStatus as CorePatchApplyStatus; use codex_core::protocol::SandboxPolicy; use codex_core::protocol::SessionConfiguredEvent; use codex_core::protocol::WarningEvent; @@ -901,6 +903,7 @@ fn exec_command_end_success_produces_completed_command_item() { exit_code: 0, duration: Duration::from_millis(5), formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, }), ); let out_ok = ep.collect_thread_events(&end_ok); @@ -988,6 +991,7 @@ fn command_execution_output_delta_updates_item_progress() { exit_code: 0, duration: Duration::from_millis(3), formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, }), ); let out_end = ep.collect_thread_events(&end); @@ -1061,6 +1065,7 @@ fn exec_command_end_failure_produces_failed_command_item() { exit_code: 1, duration: Duration::from_millis(2), formatted_output: String::new(), + status: CoreExecCommandStatus::Failed, }), ); let out_fail = 
ep.collect_thread_events(&end_fail); @@ -1102,6 +1107,7 @@ fn exec_command_end_without_begin_is_ignored() { exit_code: 0, duration: Duration::from_millis(1), formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, }), ); let out = ep.collect_thread_events(&end_only); @@ -1157,6 +1163,7 @@ fn patch_apply_success_produces_item_completed_patchapply() { stderr: String::new(), success: true, changes: changes.clone(), + status: CorePatchApplyStatus::Completed, }), ); let out_end = ep.collect_thread_events(&end); @@ -1228,6 +1235,7 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() { stderr: "failed to apply".to_string(), success: false, changes: changes.clone(), + status: CorePatchApplyStatus::Failed, }), ); let out_end = ep.collect_thread_events(&end); diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 40e981b93..6097c60e8 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1160,6 +1160,27 @@ pub enum CodexErrorInfo { Other, } +impl CodexErrorInfo { + /// Whether this error should mark the current turn as failed when replaying history. + pub fn affects_turn_status(&self) -> bool { + match self { + Self::ThreadRollbackFailed => false, + Self::ContextWindowExceeded + | Self::UsageLimitExceeded + | Self::ServerOverloaded + | Self::HttpConnectionFailed { .. } + | Self::ResponseStreamConnectionFailed { .. } + | Self::InternalServerError + | Self::Unauthorized + | Self::BadRequest + | Self::SandboxError + | Self::ResponseStreamDisconnected { .. } + | Self::ResponseTooManyFailedAttempts { .. } + | Self::Other => true, + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct RawResponseItemEvent { pub item: ResponseItem, @@ -1297,6 +1318,15 @@ pub struct ErrorEvent { pub codex_error_info: Option, } +impl ErrorEvent { + /// Whether this error should mark the current turn as failed when replaying history. 
+ pub fn affects_turn_status(&self) -> bool { + self.codex_error_info + .as_ref() + .is_none_or(CodexErrorInfo::affects_turn_status) + } +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct WarningEvent { pub message: String, @@ -2051,6 +2081,14 @@ pub enum ExecCommandSource { UnifiedExecInteraction, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +pub enum ExecCommandStatus { + Completed, + Failed, + Declined, +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct ExecCommandBeginEvent { /// Identifier so this can be paired with the ExecCommandEnd event. @@ -2112,6 +2150,8 @@ pub struct ExecCommandEndEvent { pub duration: Duration, /// Formatted output from the command, as seen by the model. pub formatted_output: String, + /// Completion status for this command execution. + pub status: ExecCommandStatus, } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] @@ -2235,6 +2275,16 @@ pub struct PatchApplyEndEvent { /// The changes that were applied (mirrors PatchApplyBeginEvent::changes). #[serde(default)] pub changes: HashMap, + /// Completion status for this patch application. 
+ pub status: PatchApplyStatus, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +pub enum PatchApplyStatus { + Completed, + Failed, + Declined, } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] @@ -2789,6 +2839,24 @@ mod tests { assert!(event.as_legacy_events(false).is_empty()); } + #[test] + fn rollback_failed_error_does_not_affect_turn_status() { + let event = ErrorEvent { + message: "rollback failed".into(), + codex_error_info: Some(CodexErrorInfo::ThreadRollbackFailed), + }; + assert!(!event.affects_turn_status()); + } + + #[test] + fn generic_error_affects_turn_status() { + let event = ErrorEvent { + message: "generic".into(), + codex_error_info: Some(CodexErrorInfo::Other), + }; + assert!(event.affects_turn_status()); + } + #[test] fn user_input_serialization_omits_final_output_json_schema_when_none() -> Result<()> { let op = Op::UserInput { diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 2f426570f..c700a3a84 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -1078,7 +1078,7 @@ impl App { SessionSelection::Fork(path) => { otel_manager.counter("codex.thread.fork", 1, &[("source", "cli_subcommand")]); let forked = thread_manager - .fork_thread(usize::MAX, config.clone(), path.clone()) + .fork_thread(usize::MAX, config.clone(), path.clone(), false) .await .wrap_err_with(|| { let path_display = path.display(); @@ -1463,7 +1463,7 @@ impl App { if path.exists() { match self .server - .fork_thread(usize::MAX, self.config.clone(), path.clone()) + .fork_thread(usize::MAX, self.config.clone(), path.clone(), false) .await { Ok(forked) => { diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index a126c5f98..e3dc0af34 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -36,6 +36,7 @@ use codex_core::protocol::ExecApprovalRequestEvent; use 
codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::ExecCommandSource; +use codex_core::protocol::ExecCommandStatus as CoreExecCommandStatus; use codex_core::protocol::ExecPolicyAmendment; use codex_core::protocol::ExitedReviewModeEvent; use codex_core::protocol::FileChange; @@ -46,6 +47,7 @@ use codex_core::protocol::McpStartupUpdateEvent; use codex_core::protocol::Op; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; +use codex_core::protocol::PatchApplyStatus as CorePatchApplyStatus; use codex_core::protocol::RateLimitWindow; use codex_core::protocol::ReviewRequest; use codex_core::protocol::ReviewTarget; @@ -2180,6 +2182,11 @@ fn end_exec( exit_code, duration: std::time::Duration::from_millis(5), formatted_output: aggregated, + status: if exit_code == 0 { + CoreExecCommandStatus::Completed + } else { + CoreExecCommandStatus::Failed + }, }), }); } @@ -2641,6 +2648,7 @@ async fn exec_end_without_begin_uses_event_command() { exit_code: 0, duration: std::time::Duration::from_millis(5), formatted_output: "done".to_string(), + status: CoreExecCommandStatus::Completed, }), }); @@ -5284,6 +5292,7 @@ async fn apply_patch_events_emit_history_cells() { stderr: String::new(), success: true, changes: end_changes, + status: CorePatchApplyStatus::Completed, }; chat.handle_codex_event(Event { id: "s1".into(), @@ -5511,6 +5520,7 @@ async fn apply_patch_full_flow_integration_like() { stderr: String::new(), success: true, changes: end_changes, + status: CorePatchApplyStatus::Completed, }), }); } @@ -6101,6 +6111,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() { exit_code: 0, duration: std::time::Duration::from_millis(16000), formatted_output: String::new(), + status: CoreExecCommandStatus::Completed, }), }); chat.handle_codex_event(Event {