fix(tui): restore remote resume and fork history (#14930)
## Problem
When the TUI connects to a **remote** app-server (via WebSocket), resume
and fork operations lost all conversation history.
`AppServerStartedThread` carried only the `SessionConfigured` event, not
the full `Thread` snapshot. After resume or fork, the chat transcript
was empty — prior turns were silently discarded.
A secondary issue: `primary_session_configured` was not cleared on
reset, causing stale session state after reconnection.
## Approach: TUI-side only, zero app-server changes
The app-server **already returns** the full `Thread` object (with
populated `turns: Vec<Turn>`) in its `ThreadStartResponse`,
`ThreadResumeResponse`, and `ThreadForkResponse`. The data was always
there — the TUI was simply throwing it away. The old
`AppServerStartedThread` struct only kept the `SessionConfiguredEvent`,
discarding the rich turn history that the server had already provided.
This PR fixes the problem entirely within `tui_app_server` (3 files
changed, 0 changes to `app-server`, `app-server-protocol`, or any other
crate). Rather than modifying the server to send history in a different
format or adding a new endpoint, the fix preserves the existing `Thread`
snapshot and replays it through the TUI's standard event pipeline —
making restored sessions indistinguishable from live ones.
## Solution
Add a **thread snapshot replay** path. When the server hands back a
`Thread` object (on start, resume, or fork),
`restore_started_app_server_thread` converts its historical turns into
the same core `Event` sequence the TUI already processes for live
interactions, then replays them into the event store so the chat widget
renders them.
Key changes:
- **`AppServerStartedThread` now carries the full `Thread`** —
`started_thread_from_{start,resume,fork}_response` clone the thread into
the struct alongside the existing `SessionConfiguredEvent`.
- **`thread_snapshot_events()`** walks the thread's turns and items,
producing `TurnStarted` → `ItemCompleted`* →
`TurnComplete`/`TurnAborted` event sequences that the TUI already knows
how to render.
- **`restore_started_app_server_thread()`** pushes the session event +
history events into the thread channel's store, activates the channel,
and replays the snapshot — used for initial startup, resume, and fork.
- **`primary_session_configured` cleared on reset** to prevent stale
session state after reconnection.
## Tradeoffs
- **`Thread` is cloned into `AppServerStartedThread`**: The full thread
snapshot (including all historical turns) is cloned at startup. For
long-lived threads this could be large, but it's a one-time cost and
avoids lifetime gymnastics with the response.
## Tests
- `restore_started_app_server_thread_replays_remote_history` —
end-to-end: constructs a `Thread` with one completed turn, restores it,
and asserts user/agent messages appear in the transcript.
- `bridges_thread_snapshot_turns_for_resume_restore` — unit: verifies
`thread_snapshot_events` produces the correct event sequence for
completed and interrupted turns.
## Test plan
- [ ] Verify `cargo check -p codex-tui-app-server` passes
- [ ] Verify `cargo test -p codex-tui-app-server` passes
- [ ] Manual: connect to a remote app-server, resume an existing thread,
confirm history renders in the chat widget
- [ ] Manual: fork a thread via remote, confirm prior turns appear
This commit is contained in:
parent
8e258eb3f5
commit
78e8ee4591
5 changed files with 958 additions and 231 deletions
|
|
@ -118,6 +118,7 @@ mod pending_interactive_replay;
|
|||
|
||||
use self::agent_navigation::AgentNavigationDirection;
|
||||
use self::agent_navigation::AgentNavigationState;
|
||||
use self::app_server_adapter::thread_snapshot_events;
|
||||
use self::app_server_requests::PendingAppServerRequests;
|
||||
use self::pending_interactive_replay::PendingInteractiveReplayState;
|
||||
|
||||
|
|
@ -2050,6 +2051,7 @@ impl App {
|
|||
self.active_thread_id = None;
|
||||
self.active_thread_rx = None;
|
||||
self.primary_thread_id = None;
|
||||
self.primary_session_configured = None;
|
||||
self.pending_primary_events.clear();
|
||||
self.pending_app_server_requests.clear();
|
||||
self.chat_widget.set_pending_thread_approvals(Vec::new());
|
||||
|
|
@ -2117,11 +2119,66 @@ impl App {
|
|||
let init = self.chatwidget_init_for_forked_or_resumed_thread(tui, self.config.clone());
|
||||
self.chat_widget = ChatWidget::new_with_app_event(init);
|
||||
self.reset_thread_event_state();
|
||||
self.enqueue_primary_event(Event {
|
||||
self.restore_started_app_server_thread(started).await
|
||||
}
|
||||
|
||||
/// Hydrate thread state from an `AppServerStartedThread` returned by the
|
||||
/// app-server start/resume/fork handshake.
|
||||
///
|
||||
/// This is the single path that every session-start variant funnels
|
||||
/// through. It performs four things in order:
|
||||
///
|
||||
/// 1. Converts the `Thread` snapshot into protocol-level `Event`s.
|
||||
/// 2. Builds a **lossless** replay snapshot from a temporary store so that
|
||||
/// the initial render sees all history even when the thread has more
|
||||
/// turns than the bounded channel capacity.
|
||||
/// 3. Pushes the same events into the real channel store for backtrack and
|
||||
/// navigation.
|
||||
/// 4. Activates the thread channel and replays the snapshot into the chat
|
||||
/// widget.
|
||||
async fn restore_started_app_server_thread(
|
||||
&mut self,
|
||||
started: AppServerStartedThread,
|
||||
) -> Result<()> {
|
||||
let session_configured = started.session_configured;
|
||||
let thread_id = session_configured.session_id;
|
||||
let session_event = Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::SessionConfigured(started.session_configured),
|
||||
})
|
||||
.await
|
||||
msg: EventMsg::SessionConfigured(session_configured.clone()),
|
||||
};
|
||||
let history_events =
|
||||
thread_snapshot_events(&started.thread, started.show_raw_agent_reasoning);
|
||||
let replay_snapshot = {
|
||||
let mut replay_store = ThreadEventStore::new(history_events.len().saturating_add(1));
|
||||
replay_store.push_event(session_event.clone());
|
||||
for event in &history_events {
|
||||
replay_store.push_event(event.clone());
|
||||
}
|
||||
replay_store.snapshot()
|
||||
};
|
||||
|
||||
self.primary_thread_id = Some(thread_id);
|
||||
self.primary_session_configured = Some(session_configured);
|
||||
self.upsert_agent_picker_thread(
|
||||
thread_id, /*agent_nickname*/ None, /*agent_role*/ None,
|
||||
/*is_closed*/ false,
|
||||
);
|
||||
|
||||
let store = {
|
||||
let channel = self.ensure_thread_channel(thread_id);
|
||||
Arc::clone(&channel.store)
|
||||
};
|
||||
{
|
||||
let mut store = store.lock().await;
|
||||
store.push_event(session_event);
|
||||
for event in history_events {
|
||||
store.push_event(event);
|
||||
}
|
||||
}
|
||||
|
||||
self.activate_thread_channel(thread_id).await;
|
||||
self.replay_thread_snapshot(replay_snapshot, /*resume_restored_queue*/ false);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn fresh_session_config(&self) -> Config {
|
||||
|
|
@ -2187,6 +2244,8 @@ impl App {
|
|||
snapshot: ThreadEventSnapshot,
|
||||
resume_restored_queue: bool,
|
||||
) {
|
||||
self.chat_widget
|
||||
.set_initial_user_message_submit_suppressed(/*suppressed*/ true);
|
||||
if let Some(event) = snapshot.session_configured {
|
||||
self.handle_codex_event_replay(event);
|
||||
}
|
||||
|
|
@ -2199,6 +2258,9 @@ impl App {
|
|||
}
|
||||
self.chat_widget
|
||||
.set_queue_autosend_suppressed(/*suppressed*/ false);
|
||||
self.chat_widget
|
||||
.set_initial_user_message_submit_suppressed(/*suppressed*/ false);
|
||||
self.chat_widget.submit_initial_user_message_if_pending();
|
||||
if resume_restored_queue {
|
||||
self.chat_widget.maybe_send_next_queued_input();
|
||||
}
|
||||
|
|
@ -2313,7 +2375,7 @@ impl App {
|
|||
let enhanced_keys_supported = tui.enhanced_keys_supported();
|
||||
let wait_for_initial_session_configured =
|
||||
Self::should_wait_for_initial_session(&session_selection);
|
||||
let (mut chat_widget, initial_session_configured) = match session_selection {
|
||||
let (mut chat_widget, initial_started_thread) = match session_selection {
|
||||
SessionSelection::StartFresh | SessionSelection::Exit => {
|
||||
let started = app_server.start_thread(&config).await?;
|
||||
let startup_tooltip_override =
|
||||
|
|
@ -2342,10 +2404,7 @@ impl App {
|
|||
status_line_invalid_items_warned: status_line_invalid_items_warned.clone(),
|
||||
session_telemetry: session_telemetry.clone(),
|
||||
};
|
||||
(
|
||||
ChatWidget::new_with_app_event(init),
|
||||
Some(started.session_configured),
|
||||
)
|
||||
(ChatWidget::new_with_app_event(init), started)
|
||||
}
|
||||
SessionSelection::Resume(target_session) => {
|
||||
let resumed = app_server
|
||||
|
|
@ -2378,10 +2437,7 @@ impl App {
|
|||
status_line_invalid_items_warned: status_line_invalid_items_warned.clone(),
|
||||
session_telemetry: session_telemetry.clone(),
|
||||
};
|
||||
(
|
||||
ChatWidget::new_with_app_event(init),
|
||||
Some(resumed.session_configured),
|
||||
)
|
||||
(ChatWidget::new_with_app_event(init), resumed)
|
||||
}
|
||||
SessionSelection::Fork(target_session) => {
|
||||
session_telemetry.counter(
|
||||
|
|
@ -2419,10 +2475,7 @@ impl App {
|
|||
status_line_invalid_items_warned: status_line_invalid_items_warned.clone(),
|
||||
session_telemetry: session_telemetry.clone(),
|
||||
};
|
||||
(
|
||||
ChatWidget::new_with_app_event(init),
|
||||
Some(forked.session_configured),
|
||||
)
|
||||
(ChatWidget::new_with_app_event(init), forked)
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -2474,13 +2527,8 @@ impl App {
|
|||
pending_primary_events: VecDeque::new(),
|
||||
pending_app_server_requests: PendingAppServerRequests::default(),
|
||||
};
|
||||
if let Some(session_configured) = initial_session_configured {
|
||||
app.enqueue_primary_event(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::SessionConfigured(session_configured),
|
||||
})
|
||||
app.restore_started_app_server_thread(initial_started_thread)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// On startup, if Agent mode (workspace-write) or ReadOnly is active, warn about world-writable dirs on Windows.
|
||||
#[cfg(target_os = "windows")]
|
||||
|
|
@ -4427,6 +4475,7 @@ mod tests {
|
|||
use crate::app_backtrack::BacktrackSelection;
|
||||
use crate::app_backtrack::BacktrackState;
|
||||
use crate::app_backtrack::user_count;
|
||||
use crate::app_server_session::AppServerStartedThread;
|
||||
use crate::chatwidget::tests::make_chatwidget_manual_with_sender;
|
||||
use crate::chatwidget::tests::set_chatgpt_auth;
|
||||
use crate::file_search::FileSearchManager;
|
||||
|
|
@ -4437,6 +4486,11 @@ mod tests {
|
|||
use crate::multi_agents::AgentPickerThreadEntry;
|
||||
use assert_matches::assert_matches;
|
||||
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
use codex_core::config::types::ModelAvailabilityNuxConfig;
|
||||
|
|
@ -6680,6 +6734,392 @@ guardian_approval = true
|
|||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_started_app_server_thread_replays_remote_history() -> Result<()> {
|
||||
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
|
||||
let thread_id = ThreadId::new();
|
||||
|
||||
app.restore_started_app_server_thread(AppServerStartedThread {
|
||||
thread: Thread {
|
||||
id: thread_id.to_string(),
|
||||
preview: "hello".to_string(),
|
||||
ephemeral: false,
|
||||
model_provider: "test-provider".to_string(),
|
||||
created_at: 0,
|
||||
updated_at: 0,
|
||||
status: ThreadStatus::Idle,
|
||||
path: None,
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
cli_version: "test".to_string(),
|
||||
source: SessionSource::Cli.into(),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
git_info: None,
|
||||
name: Some("restored".to_string()),
|
||||
turns: vec![Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "user-1".to_string(),
|
||||
content: vec![codex_app_server_protocol::UserInput::Text {
|
||||
text: "hello from remote".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "assistant-1".to_string(),
|
||||
text: "restored response".to_string(),
|
||||
phase: None,
|
||||
},
|
||||
],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
}],
|
||||
},
|
||||
session_configured: SessionConfiguredEvent {
|
||||
session_id: thread_id,
|
||||
forked_from_id: None,
|
||||
thread_name: Some("restored".to_string()),
|
||||
model: "gpt-test".to_string(),
|
||||
model_provider_id: "test-provider".to_string(),
|
||||
service_tier: None,
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: ApprovalsReviewer::User,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
reasoning_effort: None,
|
||||
history_log_id: 0,
|
||||
history_entry_count: 0,
|
||||
initial_messages: None,
|
||||
network_proxy: None,
|
||||
rollout_path: Some(PathBuf::new()),
|
||||
},
|
||||
show_raw_agent_reasoning: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
while let Ok(event) = app_event_rx.try_recv() {
|
||||
if let AppEvent::InsertHistoryCell(cell) = event {
|
||||
let cell: Arc<dyn HistoryCell> = cell.into();
|
||||
app.transcript_cells.push(cell);
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(app.primary_thread_id, Some(thread_id));
|
||||
assert_eq!(app.active_thread_id, Some(thread_id));
|
||||
|
||||
let user_messages: Vec<String> = app
|
||||
.transcript_cells
|
||||
.iter()
|
||||
.filter_map(|cell| {
|
||||
cell.as_any()
|
||||
.downcast_ref::<UserHistoryCell>()
|
||||
.map(|cell| cell.message.clone())
|
||||
})
|
||||
.collect();
|
||||
let agent_messages: Vec<String> = app
|
||||
.transcript_cells
|
||||
.iter()
|
||||
.filter_map(|cell| {
|
||||
cell.as_any()
|
||||
.downcast_ref::<AgentMessageCell>()
|
||||
.map(|cell| {
|
||||
cell.display_lines(80)
|
||||
.into_iter()
|
||||
.map(|line| line.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(user_messages, vec!["hello from remote".to_string()]);
|
||||
assert_eq!(agent_messages, vec!["• restored response".to_string()]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_started_app_server_thread_submits_initial_prompt_after_history_replay()
|
||||
-> Result<()> {
|
||||
let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await;
|
||||
let thread_id = ThreadId::new();
|
||||
app.chat_widget.set_initial_user_message_for_test(
|
||||
crate::chatwidget::create_initial_user_message(
|
||||
Some("resume prompt".to_string()),
|
||||
Vec::new(),
|
||||
Vec::new(),
|
||||
),
|
||||
);
|
||||
|
||||
app.restore_started_app_server_thread(AppServerStartedThread {
|
||||
thread: Thread {
|
||||
id: thread_id.to_string(),
|
||||
preview: "hello".to_string(),
|
||||
ephemeral: false,
|
||||
model_provider: "test-provider".to_string(),
|
||||
created_at: 0,
|
||||
updated_at: 0,
|
||||
status: ThreadStatus::Idle,
|
||||
path: None,
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
cli_version: "test".to_string(),
|
||||
source: SessionSource::Cli.into(),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
git_info: None,
|
||||
name: Some("restored".to_string()),
|
||||
turns: vec![Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "user-1".to_string(),
|
||||
content: vec![codex_app_server_protocol::UserInput::Text {
|
||||
text: "hello from remote".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "assistant-1".to_string(),
|
||||
text: "restored response".to_string(),
|
||||
phase: None,
|
||||
},
|
||||
],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
}],
|
||||
},
|
||||
session_configured: SessionConfiguredEvent {
|
||||
session_id: thread_id,
|
||||
forked_from_id: None,
|
||||
thread_name: Some("restored".to_string()),
|
||||
model: "gpt-test".to_string(),
|
||||
model_provider_id: "test-provider".to_string(),
|
||||
service_tier: None,
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: ApprovalsReviewer::User,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
reasoning_effort: None,
|
||||
history_log_id: 0,
|
||||
history_entry_count: 0,
|
||||
initial_messages: None,
|
||||
network_proxy: None,
|
||||
rollout_path: Some(PathBuf::new()),
|
||||
},
|
||||
show_raw_agent_reasoning: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
while let Ok(event) = app_event_rx.try_recv() {
|
||||
if let AppEvent::InsertHistoryCell(cell) = event {
|
||||
let cell: Arc<dyn HistoryCell> = cell.into();
|
||||
app.transcript_cells.push(cell);
|
||||
}
|
||||
}
|
||||
|
||||
let user_messages: Vec<String> = app
|
||||
.transcript_cells
|
||||
.iter()
|
||||
.filter_map(|cell| {
|
||||
cell.as_any()
|
||||
.downcast_ref::<UserHistoryCell>()
|
||||
.map(|cell| cell.message.clone())
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(
|
||||
user_messages,
|
||||
vec!["hello from remote".to_string(), "resume prompt".to_string()]
|
||||
);
|
||||
match next_user_turn_op(&mut op_rx) {
|
||||
Op::UserTurn { items, .. } => assert_eq!(
|
||||
items,
|
||||
vec![UserInput::Text {
|
||||
text: "resume prompt".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}]
|
||||
),
|
||||
other => panic!("expected resume prompt submission, got {other:?}"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_started_app_server_thread_replays_history_beyond_store_capacity() -> Result<()>
|
||||
{
|
||||
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
|
||||
let thread_id = ThreadId::new();
|
||||
let turn_count = THREAD_EVENT_CHANNEL_CAPACITY + 5;
|
||||
|
||||
let turns = (0..turn_count)
|
||||
.map(|index| Turn {
|
||||
id: format!("turn-{index}"),
|
||||
items: vec![ThreadItem::UserMessage {
|
||||
id: format!("user-{index}"),
|
||||
content: vec![codex_app_server_protocol::UserInput::Text {
|
||||
text: format!("message {index}"),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
}],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
})
|
||||
.collect();
|
||||
|
||||
app.restore_started_app_server_thread(AppServerStartedThread {
|
||||
thread: Thread {
|
||||
id: thread_id.to_string(),
|
||||
preview: "hello".to_string(),
|
||||
ephemeral: false,
|
||||
model_provider: "test-provider".to_string(),
|
||||
created_at: 0,
|
||||
updated_at: 0,
|
||||
status: ThreadStatus::Idle,
|
||||
path: None,
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
cli_version: "test".to_string(),
|
||||
source: SessionSource::Cli.into(),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
git_info: None,
|
||||
name: Some("restored".to_string()),
|
||||
turns,
|
||||
},
|
||||
session_configured: SessionConfiguredEvent {
|
||||
session_id: thread_id,
|
||||
forked_from_id: None,
|
||||
thread_name: Some("restored".to_string()),
|
||||
model: "gpt-test".to_string(),
|
||||
model_provider_id: "test-provider".to_string(),
|
||||
service_tier: None,
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: ApprovalsReviewer::User,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
reasoning_effort: None,
|
||||
history_log_id: 0,
|
||||
history_entry_count: 0,
|
||||
initial_messages: None,
|
||||
network_proxy: None,
|
||||
rollout_path: Some(PathBuf::new()),
|
||||
},
|
||||
show_raw_agent_reasoning: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
while let Ok(event) = app_event_rx.try_recv() {
|
||||
if let AppEvent::InsertHistoryCell(cell) = event {
|
||||
let cell: Arc<dyn HistoryCell> = cell.into();
|
||||
app.transcript_cells.push(cell);
|
||||
}
|
||||
}
|
||||
|
||||
let user_messages: Vec<String> = app
|
||||
.transcript_cells
|
||||
.iter()
|
||||
.filter_map(|cell| {
|
||||
cell.as_any()
|
||||
.downcast_ref::<UserHistoryCell>()
|
||||
.map(|cell| cell.message.clone())
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(user_messages.len(), turn_count);
|
||||
assert_eq!(user_messages.first().map(String::as_str), Some("message 0"));
|
||||
let last_message = format!("message {}", turn_count - 1);
|
||||
assert_eq!(
|
||||
user_messages.last().map(String::as_str),
|
||||
Some(last_message.as_str())
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_started_app_server_thread_replays_raw_reasoning_when_enabled() -> Result<()> {
|
||||
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
|
||||
let thread_id = ThreadId::new();
|
||||
|
||||
app.restore_started_app_server_thread(AppServerStartedThread {
|
||||
thread: Thread {
|
||||
id: thread_id.to_string(),
|
||||
preview: "hello".to_string(),
|
||||
ephemeral: false,
|
||||
model_provider: "test-provider".to_string(),
|
||||
created_at: 0,
|
||||
updated_at: 0,
|
||||
status: ThreadStatus::Idle,
|
||||
path: None,
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
cli_version: "test".to_string(),
|
||||
source: SessionSource::Cli.into(),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
git_info: None,
|
||||
name: Some("restored".to_string()),
|
||||
turns: vec![Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![ThreadItem::Reasoning {
|
||||
id: "reasoning-1".to_string(),
|
||||
summary: vec!["summary reasoning".to_string()],
|
||||
content: vec!["raw reasoning".to_string()],
|
||||
}],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
}],
|
||||
},
|
||||
session_configured: SessionConfiguredEvent {
|
||||
session_id: thread_id,
|
||||
forked_from_id: None,
|
||||
thread_name: Some("restored".to_string()),
|
||||
model: "gpt-test".to_string(),
|
||||
model_provider_id: "test-provider".to_string(),
|
||||
service_tier: None,
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: ApprovalsReviewer::User,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
reasoning_effort: None,
|
||||
history_log_id: 0,
|
||||
history_entry_count: 0,
|
||||
initial_messages: None,
|
||||
network_proxy: None,
|
||||
rollout_path: Some(PathBuf::new()),
|
||||
},
|
||||
show_raw_agent_reasoning: true,
|
||||
})
|
||||
.await?;
|
||||
|
||||
while let Ok(event) = app_event_rx.try_recv() {
|
||||
if let AppEvent::InsertHistoryCell(cell) = event {
|
||||
let cell: Arc<dyn HistoryCell> = cell.into();
|
||||
app.transcript_cells.push(cell);
|
||||
}
|
||||
}
|
||||
|
||||
let channel = app
|
||||
.thread_event_channels
|
||||
.get(&thread_id)
|
||||
.expect("restored thread channel should exist");
|
||||
let snapshot = channel.store.lock().await.snapshot();
|
||||
let replayed_raw_reasoning = snapshot.events.iter().any(|event| {
|
||||
matches!(
|
||||
&event.msg,
|
||||
EventMsg::AgentReasoningRawContent(raw) if raw.text == "raw reasoning"
|
||||
)
|
||||
});
|
||||
|
||||
assert!(
|
||||
replayed_raw_reasoning,
|
||||
"expected restored snapshot to keep raw reasoning event: {:?}",
|
||||
snapshot.events
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_event_store_tracks_active_turn_lifecycle() {
|
||||
let mut store = ThreadEventStore::new(8);
|
||||
|
|
|
|||
|
|
@ -19,7 +19,10 @@ use crate::app_server_session::status_account_display_from_auth_mode;
|
|||
use codex_app_server_client::AppServerEvent;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::items::AgentMessageContent;
|
||||
|
|
@ -48,6 +51,8 @@ use codex_protocol::protocol::ThreadNameUpdatedEvent;
|
|||
use codex_protocol::protocol::TokenCountEvent;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use serde_json::Value;
|
||||
|
|
@ -196,6 +201,31 @@ impl App {
|
|||
}
|
||||
}
|
||||
|
||||
/// Convert a `Thread` snapshot into a flat sequence of protocol `Event`s
|
||||
/// suitable for replaying into the TUI event store.
|
||||
///
|
||||
/// Each turn is expanded into `TurnStarted`, zero or more `ItemCompleted`,
|
||||
/// and a terminal event that matches the turn's `TurnStatus`. Returns an
|
||||
/// empty vec (with a warning log) if the thread ID is not a valid UUID.
|
||||
pub(super) fn thread_snapshot_events(
|
||||
thread: &Thread,
|
||||
show_raw_agent_reasoning: bool,
|
||||
) -> Vec<Event> {
|
||||
let Ok(thread_id) = ThreadId::from_string(&thread.id) else {
|
||||
tracing::warn!(
|
||||
thread_id = %thread.id,
|
||||
"ignoring app-server thread snapshot with invalid thread id"
|
||||
);
|
||||
return Vec::new();
|
||||
};
|
||||
|
||||
thread
|
||||
.turns
|
||||
.iter()
|
||||
.flat_map(|turn| turn_snapshot_events(thread_id, turn, show_raw_agent_reasoning))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn legacy_thread_event(params: Option<Value>) -> Option<(ThreadId, Event)> {
|
||||
let Value::Object(mut params) = params? else {
|
||||
return None;
|
||||
|
|
@ -286,16 +316,16 @@ fn server_notification_thread_events(
|
|||
}),
|
||||
}],
|
||||
)),
|
||||
ServerNotification::TurnCompleted(notification) => Some((
|
||||
ThreadId::from_string(¬ification.thread_id).ok()?,
|
||||
vec![Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: notification.turn.id,
|
||||
last_agent_message: None,
|
||||
}),
|
||||
}],
|
||||
)),
|
||||
ServerNotification::TurnCompleted(notification) => {
|
||||
let thread_id = ThreadId::from_string(¬ification.thread_id).ok()?;
|
||||
let mut events = Vec::new();
|
||||
append_terminal_turn_events(
|
||||
&mut events,
|
||||
¬ification.turn,
|
||||
/*include_failed_error*/ false,
|
||||
);
|
||||
Some((thread_id, events))
|
||||
}
|
||||
ServerNotification::ItemStarted(notification) => Some((
|
||||
ThreadId::from_string(¬ification.thread_id).ok()?,
|
||||
vec![Event {
|
||||
|
|
@ -303,7 +333,7 @@ fn server_notification_thread_events(
|
|||
msg: EventMsg::ItemStarted(ItemStartedEvent {
|
||||
thread_id: ThreadId::from_string(¬ification.thread_id).ok()?,
|
||||
turn_id: notification.turn_id,
|
||||
item: thread_item_to_core(notification.item)?,
|
||||
item: thread_item_to_core(¬ification.item)?,
|
||||
}),
|
||||
}],
|
||||
)),
|
||||
|
|
@ -314,7 +344,7 @@ fn server_notification_thread_events(
|
|||
msg: EventMsg::ItemCompleted(ItemCompletedEvent {
|
||||
thread_id: ThreadId::from_string(¬ification.thread_id).ok()?,
|
||||
turn_id: notification.turn_id,
|
||||
item: thread_item_to_core(notification.item)?,
|
||||
item: thread_item_to_core(¬ification.item)?,
|
||||
}),
|
||||
}],
|
||||
)),
|
||||
|
|
@ -418,36 +448,150 @@ fn token_usage_from_app_server(
|
|||
}
|
||||
}
|
||||
|
||||
fn thread_item_to_core(item: ThreadItem) -> Option<TurnItem> {
|
||||
/// Expand a single `Turn` into the event sequence the TUI would have
|
||||
/// observed if it had been connected for the turn's entire lifetime.
|
||||
///
|
||||
/// Snapshot replay keeps committed-item semantics for user / plan /
|
||||
/// agent-message items, while replaying the legacy events that still
|
||||
/// drive rendering for reasoning, web-search, image-generation, and
|
||||
/// context-compaction history cells.
|
||||
fn turn_snapshot_events(
|
||||
thread_id: ThreadId,
|
||||
turn: &Turn,
|
||||
show_raw_agent_reasoning: bool,
|
||||
) -> Vec<Event> {
|
||||
let mut events = vec![Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: turn.id.clone(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: ModeKind::default(),
|
||||
}),
|
||||
}];
|
||||
|
||||
for item in &turn.items {
|
||||
let Some(item) = thread_item_to_core(item) else {
|
||||
continue;
|
||||
};
|
||||
match item {
|
||||
TurnItem::UserMessage(_) | TurnItem::Plan(_) | TurnItem::AgentMessage(_) => {
|
||||
events.push(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::ItemCompleted(ItemCompletedEvent {
|
||||
thread_id,
|
||||
turn_id: turn.id.clone(),
|
||||
item,
|
||||
}),
|
||||
});
|
||||
}
|
||||
TurnItem::Reasoning(_)
|
||||
| TurnItem::WebSearch(_)
|
||||
| TurnItem::ImageGeneration(_)
|
||||
| TurnItem::ContextCompaction(_) => {
|
||||
events.extend(
|
||||
item.as_legacy_events(show_raw_agent_reasoning)
|
||||
.into_iter()
|
||||
.map(|msg| Event {
|
||||
id: String::new(),
|
||||
msg,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
append_terminal_turn_events(&mut events, turn, /*include_failed_error*/ true);
|
||||
|
||||
events
|
||||
}
|
||||
|
||||
/// Append the terminal event(s) for a turn based on its `TurnStatus`.
|
||||
///
|
||||
/// This function is shared between the live notification bridge
|
||||
/// (`TurnCompleted` handling) and the snapshot replay path so that both
|
||||
/// produce identical `EventMsg` sequences for the same turn status.
|
||||
///
|
||||
/// - `Completed` → `TurnComplete`
|
||||
/// - `Interrupted` → `TurnAborted { reason: Interrupted }`
|
||||
/// - `Failed` → `Error` (if present) then `TurnComplete`
|
||||
/// - `InProgress` → no events (the turn is still running)
|
||||
fn append_terminal_turn_events(events: &mut Vec<Event>, turn: &Turn, include_failed_error: bool) {
|
||||
match turn.status {
|
||||
TurnStatus::Completed => events.push(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn.id.clone(),
|
||||
last_agent_message: None,
|
||||
}),
|
||||
}),
|
||||
TurnStatus::Interrupted => events.push(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: Some(turn.id.clone()),
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
}),
|
||||
}),
|
||||
TurnStatus::Failed => {
|
||||
if include_failed_error && let Some(error) = &turn.error {
|
||||
events.push(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: error.message.clone(),
|
||||
codex_error_info: error
|
||||
.codex_error_info
|
||||
.clone()
|
||||
.and_then(app_server_codex_error_info_to_core),
|
||||
}),
|
||||
});
|
||||
}
|
||||
events.push(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn.id.clone(),
|
||||
last_agent_message: None,
|
||||
}),
|
||||
});
|
||||
}
|
||||
TurnStatus::InProgress => {
|
||||
// Preserve unfinished turns during snapshot replay without emitting completion events.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_item_to_core(item: &ThreadItem) -> Option<TurnItem> {
|
||||
match item {
|
||||
ThreadItem::UserMessage { id, content } => Some(TurnItem::UserMessage(UserMessageItem {
|
||||
id,
|
||||
id: id.clone(),
|
||||
content: content
|
||||
.into_iter()
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(codex_app_server_protocol::UserInput::into_core)
|
||||
.collect(),
|
||||
})),
|
||||
ThreadItem::AgentMessage { id, text, phase } => {
|
||||
Some(TurnItem::AgentMessage(AgentMessageItem {
|
||||
id,
|
||||
content: vec![AgentMessageContent::Text { text }],
|
||||
phase,
|
||||
id: id.clone(),
|
||||
content: vec![AgentMessageContent::Text { text: text.clone() }],
|
||||
phase: phase.clone(),
|
||||
}))
|
||||
}
|
||||
ThreadItem::Plan { id, text } => Some(TurnItem::Plan(PlanItem { id, text })),
|
||||
ThreadItem::Plan { id, text } => Some(TurnItem::Plan(PlanItem {
|
||||
id: id.clone(),
|
||||
text: text.clone(),
|
||||
})),
|
||||
ThreadItem::Reasoning {
|
||||
id,
|
||||
summary,
|
||||
content,
|
||||
} => Some(TurnItem::Reasoning(ReasoningItem {
|
||||
id,
|
||||
summary_text: summary,
|
||||
raw_content: content,
|
||||
id: id.clone(),
|
||||
summary_text: summary.clone(),
|
||||
raw_content: content.clone(),
|
||||
})),
|
||||
ThreadItem::WebSearch { id, query, action } => Some(TurnItem::WebSearch(WebSearchItem {
|
||||
id,
|
||||
query,
|
||||
action: app_server_web_search_action_to_core(action?)?,
|
||||
id: id.clone(),
|
||||
query: query.clone(),
|
||||
action: app_server_web_search_action_to_core(action.clone()?)?,
|
||||
})),
|
||||
ThreadItem::ImageGeneration {
|
||||
id,
|
||||
|
|
@ -455,14 +599,16 @@ fn thread_item_to_core(item: ThreadItem) -> Option<TurnItem> {
|
|||
revised_prompt,
|
||||
result,
|
||||
} => Some(TurnItem::ImageGeneration(ImageGenerationItem {
|
||||
id,
|
||||
status,
|
||||
revised_prompt,
|
||||
result,
|
||||
id: id.clone(),
|
||||
status: status.clone(),
|
||||
revised_prompt: revised_prompt.clone(),
|
||||
result: result.clone(),
|
||||
saved_path: None,
|
||||
})),
|
||||
ThreadItem::ContextCompaction { id } => {
|
||||
Some(TurnItem::ContextCompaction(ContextCompactionItem { id }))
|
||||
Some(TurnItem::ContextCompaction(ContextCompactionItem {
|
||||
id: id.clone(),
|
||||
}))
|
||||
}
|
||||
ThreadItem::CommandExecution { .. }
|
||||
| ThreadItem::FileChange { .. }
|
||||
|
|
@ -491,7 +637,9 @@ fn app_server_web_search_action_to_core(
|
|||
codex_app_server_protocol::WebSearchAction::FindInPage { url, pattern } => {
|
||||
Some(codex_protocol::models::WebSearchAction::FindInPage { url, pattern })
|
||||
}
|
||||
codex_app_server_protocol::WebSearchAction::Other => None,
|
||||
codex_app_server_protocol::WebSearchAction::Other => {
|
||||
Some(codex_protocol::models::WebSearchAction::Other)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -504,13 +652,19 @@ fn app_server_codex_error_info_to_core(
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::server_notification_thread_events;
|
||||
use super::thread_snapshot_events;
|
||||
use super::turn_snapshot_events;
|
||||
use codex_app_server_protocol::AgentMessageDeltaNotification;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnError;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::items::AgentMessageContent;
|
||||
|
|
@ -518,7 +672,11 @@ mod tests {
|
|||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::MessagePhase;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[test]
|
||||
fn bridges_completed_agent_messages_from_server_notifications() {
|
||||
|
|
@ -601,6 +759,74 @@ mod tests {
|
|||
assert_eq!(completed.last_agent_message, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bridges_interrupted_turn_completion_from_server_notifications() {
|
||||
let thread_id = "019cee8c-b993-7e33-88c0-014d4e62612d".to_string();
|
||||
let turn_id = "019cee8c-b9b4-7f10-a1b0-38caa876a012".to_string();
|
||||
|
||||
let (actual_thread_id, events) = server_notification_thread_events(
|
||||
ServerNotification::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: thread_id.clone(),
|
||||
turn: Turn {
|
||||
id: turn_id.clone(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Interrupted,
|
||||
error: None,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.expect("notification should bridge");
|
||||
|
||||
assert_eq!(
|
||||
actual_thread_id,
|
||||
ThreadId::from_string(&thread_id).expect("valid thread id")
|
||||
);
|
||||
let [event] = events.as_slice() else {
|
||||
panic!("expected one bridged event");
|
||||
};
|
||||
let EventMsg::TurnAborted(aborted) = &event.msg else {
|
||||
panic!("expected turn aborted event");
|
||||
};
|
||||
assert_eq!(aborted.turn_id.as_deref(), Some(turn_id.as_str()));
|
||||
assert_eq!(aborted.reason, TurnAbortReason::Interrupted);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bridges_failed_turn_completion_from_server_notifications() {
|
||||
let thread_id = "019cee8c-b993-7e33-88c0-014d4e62612d".to_string();
|
||||
let turn_id = "019cee8c-b9b4-7f10-a1b0-38caa876a012".to_string();
|
||||
|
||||
let (actual_thread_id, events) = server_notification_thread_events(
|
||||
ServerNotification::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: thread_id.clone(),
|
||||
turn: Turn {
|
||||
id: turn_id.clone(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Failed,
|
||||
error: Some(TurnError {
|
||||
message: "request failed".to_string(),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
additional_details: None,
|
||||
}),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.expect("notification should bridge");
|
||||
|
||||
assert_eq!(
|
||||
actual_thread_id,
|
||||
ThreadId::from_string(&thread_id).expect("valid thread id")
|
||||
);
|
||||
let [complete_event] = events.as_slice() else {
|
||||
panic!("expected turn completion only");
|
||||
};
|
||||
let EventMsg::TurnComplete(completed) = &complete_event.msg else {
|
||||
panic!("expected turn complete event");
|
||||
};
|
||||
assert_eq!(completed.turn_id, turn_id);
|
||||
assert_eq!(completed.last_agent_message, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bridges_text_deltas_from_server_notifications() {
|
||||
let thread_id = "019cee8c-b993-7e33-88c0-014d4e62612d".to_string();
|
||||
|
|
@ -642,4 +868,177 @@ mod tests {
|
|||
};
|
||||
assert_eq!(delta.delta, "Thinking");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bridges_thread_snapshot_turns_for_resume_restore() {
|
||||
let thread_id = ThreadId::new();
|
||||
let events = thread_snapshot_events(
|
||||
&Thread {
|
||||
id: thread_id.to_string(),
|
||||
preview: "hello".to_string(),
|
||||
ephemeral: false,
|
||||
model_provider: "openai".to_string(),
|
||||
created_at: 0,
|
||||
updated_at: 0,
|
||||
status: ThreadStatus::Idle,
|
||||
path: None,
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
cli_version: "test".to_string(),
|
||||
source: SessionSource::Cli.into(),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
git_info: None,
|
||||
name: Some("restore".to_string()),
|
||||
turns: vec![
|
||||
Turn {
|
||||
id: "turn-complete".to_string(),
|
||||
items: vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "user-1".to_string(),
|
||||
content: vec![codex_app_server_protocol::UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "assistant-1".to_string(),
|
||||
text: "hi".to_string(),
|
||||
phase: Some(MessagePhase::FinalAnswer),
|
||||
},
|
||||
],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
Turn {
|
||||
id: "turn-interrupted".to_string(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Interrupted,
|
||||
error: None,
|
||||
},
|
||||
Turn {
|
||||
id: "turn-failed".to_string(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Failed,
|
||||
error: Some(TurnError {
|
||||
message: "request failed".to_string(),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
additional_details: None,
|
||||
}),
|
||||
},
|
||||
],
|
||||
},
|
||||
/*show_raw_agent_reasoning*/ false,
|
||||
);
|
||||
|
||||
assert_eq!(events.len(), 9);
|
||||
assert!(matches!(events[0].msg, EventMsg::TurnStarted(_)));
|
||||
assert!(matches!(events[1].msg, EventMsg::ItemCompleted(_)));
|
||||
assert!(matches!(events[2].msg, EventMsg::ItemCompleted(_)));
|
||||
assert!(matches!(events[3].msg, EventMsg::TurnComplete(_)));
|
||||
assert!(matches!(events[4].msg, EventMsg::TurnStarted(_)));
|
||||
let EventMsg::TurnAborted(TurnAbortedEvent { turn_id, reason }) = &events[5].msg else {
|
||||
panic!("expected interrupted turn replay");
|
||||
};
|
||||
assert_eq!(turn_id.as_deref(), Some("turn-interrupted"));
|
||||
assert_eq!(*reason, TurnAbortReason::Interrupted);
|
||||
assert!(matches!(events[6].msg, EventMsg::TurnStarted(_)));
|
||||
let EventMsg::Error(error) = &events[7].msg else {
|
||||
panic!("expected failed turn error replay");
|
||||
};
|
||||
assert_eq!(error.message, "request failed");
|
||||
assert_eq!(
|
||||
error.codex_error_info,
|
||||
Some(codex_protocol::protocol::CodexErrorInfo::Other)
|
||||
);
|
||||
assert!(matches!(events[8].msg, EventMsg::TurnComplete(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bridges_non_message_snapshot_items_via_legacy_events() {
|
||||
let events = turn_snapshot_events(
|
||||
ThreadId::new(),
|
||||
&Turn {
|
||||
id: "turn-complete".to_string(),
|
||||
items: vec![
|
||||
ThreadItem::Reasoning {
|
||||
id: "reasoning-1".to_string(),
|
||||
summary: vec!["Need to inspect config".to_string()],
|
||||
content: vec!["hidden chain".to_string()],
|
||||
},
|
||||
ThreadItem::WebSearch {
|
||||
id: "search-1".to_string(),
|
||||
query: "ratatui stylize".to_string(),
|
||||
action: Some(codex_app_server_protocol::WebSearchAction::Other),
|
||||
},
|
||||
ThreadItem::ImageGeneration {
|
||||
id: "image-1".to_string(),
|
||||
status: "completed".to_string(),
|
||||
revised_prompt: Some("diagram".to_string()),
|
||||
result: "image.png".to_string(),
|
||||
},
|
||||
ThreadItem::ContextCompaction {
|
||||
id: "compact-1".to_string(),
|
||||
},
|
||||
],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
/*show_raw_agent_reasoning*/ false,
|
||||
);
|
||||
|
||||
assert_eq!(events.len(), 6);
|
||||
assert!(matches!(events[0].msg, EventMsg::TurnStarted(_)));
|
||||
let EventMsg::AgentReasoning(reasoning) = &events[1].msg else {
|
||||
panic!("expected reasoning replay");
|
||||
};
|
||||
assert_eq!(reasoning.text, "Need to inspect config");
|
||||
let EventMsg::WebSearchEnd(web_search) = &events[2].msg else {
|
||||
panic!("expected web search replay");
|
||||
};
|
||||
assert_eq!(web_search.call_id, "search-1");
|
||||
assert_eq!(web_search.query, "ratatui stylize");
|
||||
assert_eq!(
|
||||
web_search.action,
|
||||
codex_protocol::models::WebSearchAction::Other
|
||||
);
|
||||
let EventMsg::ImageGenerationEnd(image_generation) = &events[3].msg else {
|
||||
panic!("expected image generation replay");
|
||||
};
|
||||
assert_eq!(image_generation.call_id, "image-1");
|
||||
assert_eq!(image_generation.status, "completed");
|
||||
assert_eq!(image_generation.revised_prompt.as_deref(), Some("diagram"));
|
||||
assert_eq!(image_generation.result, "image.png");
|
||||
assert!(matches!(events[4].msg, EventMsg::ContextCompacted(_)));
|
||||
assert!(matches!(events[5].msg, EventMsg::TurnComplete(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bridges_raw_reasoning_snapshot_items_when_enabled() {
|
||||
let events = turn_snapshot_events(
|
||||
ThreadId::new(),
|
||||
&Turn {
|
||||
id: "turn-complete".to_string(),
|
||||
items: vec![ThreadItem::Reasoning {
|
||||
id: "reasoning-1".to_string(),
|
||||
summary: vec!["Need to inspect config".to_string()],
|
||||
content: vec!["hidden chain".to_string()],
|
||||
}],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
/*show_raw_agent_reasoning*/ true,
|
||||
);
|
||||
|
||||
assert_eq!(events.len(), 4);
|
||||
assert!(matches!(events[0].msg, EventMsg::TurnStarted(_)));
|
||||
let EventMsg::AgentReasoning(reasoning) = &events[1].msg else {
|
||||
panic!("expected reasoning replay");
|
||||
};
|
||||
assert_eq!(reasoning.text, "Need to inspect config");
|
||||
let EventMsg::AgentReasoningRawContent(raw_reasoning) = &events[2].msg else {
|
||||
panic!("expected raw reasoning replay");
|
||||
};
|
||||
assert_eq!(raw_reasoning.text, "hidden chain");
|
||||
assert!(matches!(events[3].msg, EventMsg::TurnComplete(_)));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,15 +55,6 @@ use codex_app_server_protocol::TurnSteerResponse;
|
|||
use codex_core::config::Config;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::items::AgentMessageContent;
|
||||
use codex_protocol::items::AgentMessageItem;
|
||||
use codex_protocol::items::ContextCompactionItem;
|
||||
use codex_protocol::items::ImageGenerationItem;
|
||||
use codex_protocol::items::PlanItem;
|
||||
use codex_protocol::items::ReasoningItem;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::items::UserMessageItem;
|
||||
use codex_protocol::items::WebSearchItem;
|
||||
use codex_protocol::openai_models::ModelAvailabilityNux;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
use codex_protocol::openai_models::ModelUpgrade;
|
||||
|
|
@ -73,8 +64,6 @@ use codex_protocol::protocol::ConversationAudioParams;
|
|||
use codex_protocol::protocol::ConversationStartParams;
|
||||
use codex_protocol::protocol::ConversationTextParams;
|
||||
use codex_protocol::protocol::CreditsSnapshot;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use codex_protocol::protocol::RateLimitWindow;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
|
|
@ -123,8 +112,18 @@ impl ThreadParamsMode {
|
|||
}
|
||||
}
|
||||
|
||||
/// Result of starting, resuming, or forking an app-server thread.
|
||||
///
|
||||
/// Carries the full `Thread` snapshot returned by the server alongside the
|
||||
/// derived `SessionConfiguredEvent`. The snapshot's `turns` are used by
|
||||
/// `App::restore_started_app_server_thread` to seed the event store and
|
||||
/// replay transcript history — this is the only source of prior-turn data
|
||||
/// for remote sessions, where historical websocket notifications are not
|
||||
/// re-sent after the handshake.
|
||||
pub(crate) struct AppServerStartedThread {
|
||||
pub(crate) thread: Thread,
|
||||
pub(crate) session_configured: SessionConfiguredEvent,
|
||||
pub(crate) show_raw_agent_reasoning: bool,
|
||||
}
|
||||
|
||||
impl AppServerSession {
|
||||
|
|
@ -267,7 +266,7 @@ impl AppServerSession {
|
|||
})
|
||||
.await
|
||||
.wrap_err("thread/start failed during TUI bootstrap")?;
|
||||
started_thread_from_start_response(&response)
|
||||
started_thread_from_start_response(response)
|
||||
}
|
||||
|
||||
pub(crate) async fn resume_thread(
|
||||
|
|
@ -289,7 +288,7 @@ impl AppServerSession {
|
|||
})
|
||||
.await
|
||||
.wrap_err("thread/resume failed during TUI bootstrap")?;
|
||||
started_thread_from_resume_response(&response, show_raw_agent_reasoning)
|
||||
started_thread_from_resume_response(response, show_raw_agent_reasoning)
|
||||
}
|
||||
|
||||
pub(crate) async fn fork_thread(
|
||||
|
|
@ -311,7 +310,7 @@ impl AppServerSession {
|
|||
})
|
||||
.await
|
||||
.wrap_err("thread/fork failed during TUI bootstrap")?;
|
||||
started_thread_from_fork_response(&response, show_raw_agent_reasoning)
|
||||
started_thread_from_fork_response(response, show_raw_agent_reasoning)
|
||||
}
|
||||
|
||||
fn thread_params_mode(&self) -> ThreadParamsMode {
|
||||
|
|
@ -836,46 +835,42 @@ fn thread_cwd_from_config(config: &Config, thread_params_mode: ThreadParamsMode)
|
|||
}
|
||||
|
||||
fn started_thread_from_start_response(
|
||||
response: &ThreadStartResponse,
|
||||
response: ThreadStartResponse,
|
||||
) -> Result<AppServerStartedThread> {
|
||||
let session_configured = session_configured_from_thread_start_response(response)
|
||||
let session_configured = session_configured_from_thread_start_response(&response)
|
||||
.map_err(color_eyre::eyre::Report::msg)?;
|
||||
Ok(AppServerStartedThread { session_configured })
|
||||
Ok(AppServerStartedThread {
|
||||
thread: response.thread,
|
||||
session_configured,
|
||||
show_raw_agent_reasoning: false,
|
||||
})
|
||||
}
|
||||
|
||||
fn started_thread_from_resume_response(
|
||||
response: &ThreadResumeResponse,
|
||||
response: ThreadResumeResponse,
|
||||
show_raw_agent_reasoning: bool,
|
||||
) -> Result<AppServerStartedThread> {
|
||||
let session_configured = session_configured_from_thread_resume_response(response)
|
||||
let session_configured = session_configured_from_thread_resume_response(&response)
|
||||
.map_err(color_eyre::eyre::Report::msg)?;
|
||||
let thread = response.thread;
|
||||
Ok(AppServerStartedThread {
|
||||
session_configured: SessionConfiguredEvent {
|
||||
initial_messages: thread_initial_messages(
|
||||
&session_configured.session_id,
|
||||
&response.thread.turns,
|
||||
show_raw_agent_reasoning,
|
||||
),
|
||||
..session_configured
|
||||
},
|
||||
thread,
|
||||
session_configured,
|
||||
show_raw_agent_reasoning,
|
||||
})
|
||||
}
|
||||
|
||||
fn started_thread_from_fork_response(
|
||||
response: &ThreadForkResponse,
|
||||
response: ThreadForkResponse,
|
||||
show_raw_agent_reasoning: bool,
|
||||
) -> Result<AppServerStartedThread> {
|
||||
let session_configured = session_configured_from_thread_fork_response(response)
|
||||
let session_configured = session_configured_from_thread_fork_response(&response)
|
||||
.map_err(color_eyre::eyre::Report::msg)?;
|
||||
let thread = response.thread;
|
||||
Ok(AppServerStartedThread {
|
||||
session_configured: SessionConfiguredEvent {
|
||||
initial_messages: thread_initial_messages(
|
||||
&session_configured.session_id,
|
||||
&response.thread.turns,
|
||||
show_raw_agent_reasoning,
|
||||
),
|
||||
..session_configured
|
||||
},
|
||||
thread,
|
||||
session_configured,
|
||||
show_raw_agent_reasoning,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -992,121 +987,6 @@ fn session_configured_from_thread_response(
|
|||
})
|
||||
}
|
||||
|
||||
fn thread_initial_messages(
|
||||
thread_id: &ThreadId,
|
||||
turns: &[codex_app_server_protocol::Turn],
|
||||
show_raw_agent_reasoning: bool,
|
||||
) -> Option<Vec<EventMsg>> {
|
||||
let events: Vec<EventMsg> = turns
|
||||
.iter()
|
||||
.flat_map(|turn| turn_initial_messages(thread_id, turn, show_raw_agent_reasoning))
|
||||
.collect();
|
||||
(!events.is_empty()).then_some(events)
|
||||
}
|
||||
|
||||
fn turn_initial_messages(
|
||||
thread_id: &ThreadId,
|
||||
turn: &codex_app_server_protocol::Turn,
|
||||
show_raw_agent_reasoning: bool,
|
||||
) -> Vec<EventMsg> {
|
||||
turn.items
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter_map(app_server_thread_item_to_core)
|
||||
.flat_map(|item| match item {
|
||||
TurnItem::UserMessage(item) => vec![item.as_legacy_event()],
|
||||
TurnItem::Plan(item) => vec![EventMsg::ItemCompleted(ItemCompletedEvent {
|
||||
thread_id: *thread_id,
|
||||
turn_id: turn.id.clone(),
|
||||
item: TurnItem::Plan(item),
|
||||
})],
|
||||
item => item.as_legacy_events(show_raw_agent_reasoning),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn app_server_thread_item_to_core(item: codex_app_server_protocol::ThreadItem) -> Option<TurnItem> {
|
||||
match item {
|
||||
codex_app_server_protocol::ThreadItem::UserMessage { id, content } => {
|
||||
Some(TurnItem::UserMessage(UserMessageItem {
|
||||
id,
|
||||
content: content
|
||||
.into_iter()
|
||||
.map(codex_app_server_protocol::UserInput::into_core)
|
||||
.collect(),
|
||||
}))
|
||||
}
|
||||
codex_app_server_protocol::ThreadItem::AgentMessage { id, text, phase } => {
|
||||
Some(TurnItem::AgentMessage(AgentMessageItem {
|
||||
id,
|
||||
content: vec![AgentMessageContent::Text { text }],
|
||||
phase,
|
||||
}))
|
||||
}
|
||||
codex_app_server_protocol::ThreadItem::Plan { id, text } => {
|
||||
Some(TurnItem::Plan(PlanItem { id, text }))
|
||||
}
|
||||
codex_app_server_protocol::ThreadItem::Reasoning {
|
||||
id,
|
||||
summary,
|
||||
content,
|
||||
} => Some(TurnItem::Reasoning(ReasoningItem {
|
||||
id,
|
||||
summary_text: summary,
|
||||
raw_content: content,
|
||||
})),
|
||||
codex_app_server_protocol::ThreadItem::WebSearch { id, query, action } => {
|
||||
Some(TurnItem::WebSearch(WebSearchItem {
|
||||
id,
|
||||
query,
|
||||
action: app_server_web_search_action_to_core(action?)?,
|
||||
}))
|
||||
}
|
||||
codex_app_server_protocol::ThreadItem::ImageGeneration {
|
||||
id,
|
||||
status,
|
||||
revised_prompt,
|
||||
result,
|
||||
} => Some(TurnItem::ImageGeneration(ImageGenerationItem {
|
||||
id,
|
||||
status,
|
||||
revised_prompt,
|
||||
result,
|
||||
saved_path: None,
|
||||
})),
|
||||
codex_app_server_protocol::ThreadItem::ContextCompaction { id } => {
|
||||
Some(TurnItem::ContextCompaction(ContextCompactionItem { id }))
|
||||
}
|
||||
codex_app_server_protocol::ThreadItem::CommandExecution { .. }
|
||||
| codex_app_server_protocol::ThreadItem::FileChange { .. }
|
||||
| codex_app_server_protocol::ThreadItem::McpToolCall { .. }
|
||||
| codex_app_server_protocol::ThreadItem::DynamicToolCall { .. }
|
||||
| codex_app_server_protocol::ThreadItem::CollabAgentToolCall { .. }
|
||||
| codex_app_server_protocol::ThreadItem::ImageView { .. }
|
||||
| codex_app_server_protocol::ThreadItem::EnteredReviewMode { .. }
|
||||
| codex_app_server_protocol::ThreadItem::ExitedReviewMode { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn app_server_web_search_action_to_core(
|
||||
action: codex_app_server_protocol::WebSearchAction,
|
||||
) -> Option<codex_protocol::models::WebSearchAction> {
|
||||
match action {
|
||||
codex_app_server_protocol::WebSearchAction::Search { query, queries } => {
|
||||
Some(codex_protocol::models::WebSearchAction::Search { query, queries })
|
||||
}
|
||||
codex_app_server_protocol::WebSearchAction::OpenPage { url } => {
|
||||
Some(codex_protocol::models::WebSearchAction::OpenPage { url })
|
||||
}
|
||||
codex_app_server_protocol::WebSearchAction::FindInPage { url, pattern } => {
|
||||
Some(codex_protocol::models::WebSearchAction::FindInPage { url, pattern })
|
||||
}
|
||||
codex_app_server_protocol::WebSearchAction::Other => {
|
||||
Some(codex_protocol::models::WebSearchAction::Other)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn app_server_rate_limit_snapshots_to_core(
|
||||
response: GetAccountRateLimitsResponse,
|
||||
) -> Vec<RateLimitSnapshot> {
|
||||
|
|
@ -1204,7 +1084,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn resume_response_restores_initial_messages_from_turn_items() {
|
||||
fn resume_response_relies_on_snapshot_replay_not_initial_messages() {
|
||||
let thread_id = ThreadId::new();
|
||||
let response = ThreadResumeResponse {
|
||||
thread: codex_app_server_protocol::Thread {
|
||||
|
|
@ -1254,29 +1134,11 @@ mod tests {
|
|||
};
|
||||
|
||||
let started =
|
||||
started_thread_from_resume_response(&response, /*show_raw_agent_reasoning*/ false)
|
||||
started_thread_from_resume_response(response, /*show_raw_agent_reasoning*/ false)
|
||||
.expect("resume response should map");
|
||||
let initial_messages = started
|
||||
.session_configured
|
||||
.initial_messages
|
||||
.expect("resume response should restore replay history");
|
||||
|
||||
assert_eq!(initial_messages.len(), 2);
|
||||
match &initial_messages[0] {
|
||||
EventMsg::UserMessage(event) => {
|
||||
assert_eq!(event.message, "hello from history");
|
||||
assert_eq!(event.images.as_ref(), Some(&Vec::new()));
|
||||
assert!(event.local_images.is_empty());
|
||||
assert!(event.text_elements.is_empty());
|
||||
}
|
||||
other => panic!("expected replayed user message, got {other:?}"),
|
||||
}
|
||||
match &initial_messages[1] {
|
||||
EventMsg::AgentMessage(event) => {
|
||||
assert_eq!(event.message, "assistant reply");
|
||||
assert_eq!(event.phase, None);
|
||||
}
|
||||
other => panic!("expected replayed agent message, got {other:?}"),
|
||||
}
|
||||
assert!(started.session_configured.initial_messages.is_none());
|
||||
assert!(!started.show_raw_agent_reasoning);
|
||||
assert_eq!(started.thread.turns.len(), 1);
|
||||
assert_eq!(started.thread.turns[0].items.len(), 2);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -719,6 +719,10 @@ pub(crate) struct ChatWidget {
|
|||
// When resuming an existing session (selected via resume picker), avoid an
|
||||
// immediate redraw on SessionConfigured to prevent a gratuitous UI flicker.
|
||||
suppress_session_configured_redraw: bool,
|
||||
// During snapshot restore, defer startup prompt submission until replayed
|
||||
// history has been rendered so resumed/forked prompts keep chronological
|
||||
// order.
|
||||
suppress_initial_user_message_submit: bool,
|
||||
// User messages queued while a turn is in progress
|
||||
queued_user_messages: VecDeque<UserMessage>,
|
||||
// Steers already submitted to core but not yet committed into history.
|
||||
|
|
@ -1427,7 +1431,11 @@ impl ChatWidget {
|
|||
self.prefetch_connectors();
|
||||
}
|
||||
if let Some(user_message) = self.initial_user_message.take() {
|
||||
self.submit_user_message(user_message);
|
||||
if self.suppress_initial_user_message_submit {
|
||||
self.initial_user_message = Some(user_message);
|
||||
} else {
|
||||
self.submit_user_message(user_message);
|
||||
}
|
||||
}
|
||||
if let Some(forked_from_id) = forked_from_id {
|
||||
self.emit_forked_thread_event(forked_from_id);
|
||||
|
|
@ -1437,6 +1445,21 @@ impl ChatWidget {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_initial_user_message_submit_suppressed(&mut self, suppressed: bool) {
|
||||
self.suppress_initial_user_message_submit = suppressed;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn set_initial_user_message_for_test(&mut self, user_message: Option<UserMessage>) {
|
||||
self.initial_user_message = user_message;
|
||||
}
|
||||
|
||||
pub(crate) fn submit_initial_user_message_if_pending(&mut self) {
|
||||
if let Some(user_message) = self.initial_user_message.take() {
|
||||
self.submit_user_message(user_message);
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_forked_thread_event(&self, forked_from_id: ThreadId) {
|
||||
let app_event_tx = self.app_event_tx.clone();
|
||||
let codex_home = self.config.codex_home.clone();
|
||||
|
|
@ -3624,6 +3647,7 @@ impl ChatWidget {
|
|||
show_welcome_banner: is_first_run,
|
||||
startup_tooltip_override,
|
||||
suppress_session_configured_redraw: false,
|
||||
suppress_initial_user_message_submit: false,
|
||||
pending_notification: None,
|
||||
quit_shortcut_expires_at: None,
|
||||
quit_shortcut_key: None,
|
||||
|
|
@ -3816,6 +3840,7 @@ impl ChatWidget {
|
|||
show_welcome_banner: false,
|
||||
startup_tooltip_override: None,
|
||||
suppress_session_configured_redraw: true,
|
||||
suppress_initial_user_message_submit: false,
|
||||
pending_notification: None,
|
||||
quit_shortcut_expires_at: None,
|
||||
quit_shortcut_key: None,
|
||||
|
|
|
|||
|
|
@ -1898,6 +1898,7 @@ async fn make_chatwidget_manual(
|
|||
submit_pending_steers_after_interrupt: false,
|
||||
queued_message_edit_binding: crate::key_hint::alt(KeyCode::Up),
|
||||
suppress_session_configured_redraw: false,
|
||||
suppress_initial_user_message_submit: false,
|
||||
pending_notification: None,
|
||||
quit_shortcut_expires_at: None,
|
||||
quit_shortcut_key: None,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue