diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index dd2018750..da196ef2c 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -12,6 +12,7 @@ use crate::bottom_pane::SelectionViewParams; use crate::bottom_pane::popup_consts::standard_popup_hint_line; use crate::chatwidget::ChatWidget; use crate::chatwidget::ExternalEditorState; +use crate::chatwidget::ThreadInputState; use crate::cwd_prompt::CwdPromptAction; use crate::diff_render::DiffSummary; use crate::exec_command::strip_bash_lc_and_escape; @@ -254,6 +255,7 @@ struct SessionSummary { struct ThreadEventSnapshot { session_configured: Option, events: Vec, + input_state: Option, } #[derive(Debug)] @@ -262,6 +264,7 @@ struct ThreadEventStore { buffer: VecDeque, user_message_ids: HashSet, pending_interactive_replay: PendingInteractiveReplayState, + input_state: Option, capacity: usize, active: bool, } @@ -273,6 +276,7 @@ impl ThreadEventStore { buffer: VecDeque::new(), user_message_ids: HashSet::new(), pending_interactive_replay: PendingInteractiveReplayState::default(), + input_state: None, capacity, active: false, } @@ -342,6 +346,7 @@ impl ThreadEventStore { }) .cloned() .collect(), + input_state: self.input_state.clone(), } } @@ -917,13 +922,15 @@ impl App { let Some(active_id) = self.active_thread_id else { return; }; - let Some(receiver) = self.active_thread_rx.take() else { - return; - }; + let input_state = self.chat_widget.capture_thread_input_state(); if let Some(channel) = self.thread_event_channels.get_mut(&active_id) { + let receiver = self.active_thread_rx.take(); let mut store = channel.store.lock().await; store.active = false; - channel.receiver = Some(receiver); + store.input_state = input_state; + if let Some(receiver) = receiver { + channel.receiver = Some(receiver); + } } } @@ -1335,7 +1342,7 @@ impl App { self.chat_widget = ChatWidget::new_with_op_sender(init, codex_op_tx); self.reset_for_thread_switch(tui)?; - self.replay_thread_snapshot(snapshot); + self.replay_thread_snapshot(snapshot, !is_replay_only); if is_replay_only { self.chat_widget.add_info_message( format!("Agent thread {thread_id} is closed. Replaying saved transcript."), @@ -1473,13 +1480,24 @@ impl App { (active_thread_id != primary_thread_id).then_some((active_thread_id, primary_thread_id)) } - fn replay_thread_snapshot(&mut self, snapshot: ThreadEventSnapshot) { + fn replay_thread_snapshot( + &mut self, + snapshot: ThreadEventSnapshot, + resume_restored_queue: bool, + ) { if let Some(event) = snapshot.session_configured { self.handle_codex_event_replay(event); } + self.chat_widget.set_queue_autosend_suppressed(true); + self.chat_widget + .restore_thread_input_state(snapshot.input_state); for event in snapshot.events { self.handle_codex_event_replay(event); } + self.chat_widget.set_queue_autosend_suppressed(false); + if resume_restored_queue { + self.chat_widget.maybe_send_next_queued_input(); + } self.refresh_status_line(); } @@ -3667,7 +3685,12 @@ mod tests { use codex_core::config::types::ModelAvailabilityNuxConfig; use codex_otel::OtelManager; use codex_protocol::ThreadId; + use codex_protocol::config_types::CollaborationMode; + use codex_protocol::config_types::CollaborationModeMask; + use codex_protocol::config_types::ModeKind; + use codex_protocol::config_types::Settings; use codex_protocol::openai_models::ModelAvailabilityNux; + use codex_protocol::protocol::AgentMessageDeltaEvent; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; @@ -3675,6 +3698,10 @@ mod tests { use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::ThreadRolledBackEvent; + use codex_protocol::protocol::TurnAbortReason; + use codex_protocol::protocol::TurnAbortedEvent; + use codex_protocol::protocol::TurnCompleteEvent; + use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use codex_protocol::user_input::TextElement; use codex_protocol::user_input::UserInput; @@ -3977,6 +4004,755 @@ mod tests { Ok(()) } + #[tokio::test] + async fn replay_thread_snapshot_restores_draft_and_queued_input() { + let mut app = make_test_app().await; + let thread_id = ThreadId::new(); + app.thread_event_channels.insert( + thread_id, + ThreadEventChannel::new_with_session_configured( + THREAD_EVENT_CHANNEL_CAPACITY, + Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }, + ), + ); + app.activate_thread_channel(thread_id).await; + + app.chat_widget + .apply_external_edit("draft prompt".to_string()); + app.chat_widget.submit_user_message_with_mode( + "queued follow-up".to_string(), + CollaborationModeMask { + name: "Default".to_string(), + mode: None, + model: None, + reasoning_effort: None, + developer_instructions: None, + }, + ); + let expected_input_state = app + .chat_widget + .capture_thread_input_state() + .expect("expected thread input state"); + + app.store_active_thread_receiver().await; + + let snapshot = { + let channel = app + .thread_event_channels + .get(&thread_id) + .expect("thread channel should exist"); + let store = channel.store.lock().await; + assert_eq!(store.input_state, Some(expected_input_state)); + store.snapshot() + }; + + let (chat_widget, _app_event_tx, _rx, mut new_op_rx) = + make_chatwidget_manual_with_sender().await; + app.chat_widget = chat_widget; + + app.replay_thread_snapshot(snapshot, true); + + assert_eq!(app.chat_widget.composer_text_with_pending(), "draft prompt"); + assert!(app.chat_widget.queued_user_message_texts().is_empty()); + match next_user_turn_op(&mut new_op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: "queued follow-up".to_string(), + text_elements: Vec::new(), + }] + ), + other => panic!("expected queued follow-up submission, got {other:?}"), + } + } + + #[tokio::test] + async fn replayed_turn_complete_submits_restored_queued_follow_up() { + let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + let session_configured = Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }; + app.chat_widget + .handle_codex_event(session_configured.clone()); + app.chat_widget.handle_codex_event(Event { + id: "turn-started".to_string(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + }); + app.chat_widget.handle_codex_event(Event { + id: "agent-delta".to_string(), + msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { + delta: "streaming".to_string(), + }), + }); + app.chat_widget + .apply_external_edit("queued follow-up".to_string()); + app.chat_widget + .handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); + let input_state = app + .chat_widget + .capture_thread_input_state() + .expect("expected queued follow-up state"); + + let (chat_widget, _app_event_tx, _rx, mut new_op_rx) = + make_chatwidget_manual_with_sender().await; + app.chat_widget = chat_widget; + app.chat_widget.handle_codex_event(session_configured); + while new_op_rx.try_recv().is_ok() {} + app.replay_thread_snapshot( + ThreadEventSnapshot { + session_configured: None, + events: vec![Event { + id: "turn-complete".to_string(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), + last_agent_message: None, + }), + }], + input_state: Some(input_state), + }, + true, + ); + + match next_user_turn_op(&mut new_op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: "queued follow-up".to_string(), + text_elements: Vec::new(), + }] + ), + other => panic!("expected queued follow-up submission, got {other:?}"), + } + } + + #[tokio::test] + async fn replay_only_thread_keeps_restored_queue_visible() { + let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + let session_configured = Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }; + app.chat_widget + .handle_codex_event(session_configured.clone()); + app.chat_widget.handle_codex_event(Event { + id: "turn-started".to_string(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + }); + app.chat_widget.handle_codex_event(Event { + id: "agent-delta".to_string(), + msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { + delta: "streaming".to_string(), + }), + }); + app.chat_widget + .apply_external_edit("queued follow-up".to_string()); + app.chat_widget + .handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); + let input_state = app + .chat_widget + .capture_thread_input_state() + .expect("expected queued follow-up state"); + + let (chat_widget, _app_event_tx, _rx, mut new_op_rx) = + make_chatwidget_manual_with_sender().await; + app.chat_widget = chat_widget; + app.chat_widget.handle_codex_event(session_configured); + while new_op_rx.try_recv().is_ok() {} + + app.replay_thread_snapshot( + ThreadEventSnapshot { + session_configured: None, + events: vec![Event { + id: "turn-complete".to_string(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), + last_agent_message: None, + }), + }], + input_state: Some(input_state), + }, + false, + ); + + assert_eq!( + app.chat_widget.queued_user_message_texts(), + vec!["queued follow-up".to_string()] + ); + assert!( + new_op_rx.try_recv().is_err(), + "replay-only threads should not auto-submit restored queue" + ); + } + + #[tokio::test] + async fn replay_thread_snapshot_keeps_queue_when_running_state_only_comes_from_snapshot() { + let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + let session_configured = Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }; + app.chat_widget + .handle_codex_event(session_configured.clone()); + app.chat_widget.handle_codex_event(Event { + id: "turn-started".to_string(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + }); + app.chat_widget.handle_codex_event(Event { + id: "agent-delta".to_string(), + msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { + delta: "streaming".to_string(), + }), + }); + app.chat_widget + .apply_external_edit("queued follow-up".to_string()); + app.chat_widget + .handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); + let input_state = app + .chat_widget + .capture_thread_input_state() + .expect("expected queued follow-up state"); + + let (chat_widget, _app_event_tx, _rx, mut new_op_rx) = + make_chatwidget_manual_with_sender().await; + app.chat_widget = chat_widget; + app.chat_widget.handle_codex_event(session_configured); + while new_op_rx.try_recv().is_ok() {} + + app.replay_thread_snapshot( + ThreadEventSnapshot { + session_configured: None, + events: vec![], + input_state: Some(input_state), + }, + true, + ); + + assert_eq!( + app.chat_widget.queued_user_message_texts(), + vec!["queued follow-up".to_string()] + ); + assert!( + new_op_rx.try_recv().is_err(), + "restored queue should stay queued when replay did not prove the turn finished" + ); + } + + #[tokio::test] + async fn replay_thread_snapshot_does_not_submit_queue_before_replay_catches_up() { + let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + let session_configured = Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }; + app.chat_widget + .handle_codex_event(session_configured.clone()); + app.chat_widget.handle_codex_event(Event { + id: "turn-started".to_string(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + }); + app.chat_widget.handle_codex_event(Event { + id: "agent-delta".to_string(), + msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { + delta: "streaming".to_string(), + }), + }); + app.chat_widget + .apply_external_edit("queued follow-up".to_string()); + app.chat_widget + .handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); + let input_state = app + .chat_widget + .capture_thread_input_state() + .expect("expected queued follow-up state"); + + let (chat_widget, _app_event_tx, _rx, mut new_op_rx) = + make_chatwidget_manual_with_sender().await; + app.chat_widget = chat_widget; + app.chat_widget.handle_codex_event(session_configured); + while new_op_rx.try_recv().is_ok() {} + + app.replay_thread_snapshot( + ThreadEventSnapshot { + session_configured: None, + events: vec![ + Event { + id: "older-turn-complete".to_string(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-0".to_string(), + last_agent_message: None, + }), + }, + Event { + id: "latest-turn-started".to_string(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + }, + ], + input_state: Some(input_state), + }, + true, + ); + + assert!( + new_op_rx.try_recv().is_err(), + "queued follow-up should stay queued until the latest turn completes" + ); + assert_eq!( + app.chat_widget.queued_user_message_texts(), + vec!["queued follow-up".to_string()] + ); + + app.chat_widget.handle_codex_event(Event { + id: "latest-turn-complete".to_string(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), + last_agent_message: None, + }), + }); + + match next_user_turn_op(&mut new_op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: "queued follow-up".to_string(), + text_elements: Vec::new(), + }] + ), + other => panic!("expected queued follow-up submission, got {other:?}"), + } + } + + #[tokio::test] + async fn replay_thread_snapshot_restores_pending_pastes_for_submit() { + let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + app.thread_event_channels.insert( + thread_id, + ThreadEventChannel::new_with_session_configured( + THREAD_EVENT_CHANNEL_CAPACITY, + Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }, + ), + ); + app.activate_thread_channel(thread_id).await; + + let large = "x".repeat(1005); + app.chat_widget.handle_paste(large.clone()); + let expected_input_state = app + .chat_widget + .capture_thread_input_state() + .expect("expected thread input state"); + + app.store_active_thread_receiver().await; + + let snapshot = { + let channel = app + .thread_event_channels + .get(&thread_id) + .expect("thread channel should exist"); + let store = channel.store.lock().await; + assert_eq!(store.input_state, Some(expected_input_state)); + store.snapshot() + }; + + let (chat_widget, _app_event_tx, _rx, mut new_op_rx) = + make_chatwidget_manual_with_sender().await; + app.chat_widget = chat_widget; + app.replay_thread_snapshot(snapshot, true); + + assert_eq!(app.chat_widget.composer_text_with_pending(), large); + + app.chat_widget + .handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); + + match next_user_turn_op(&mut new_op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: large, + text_elements: Vec::new(), + }] + ), + other => panic!("expected restored paste submission, got {other:?}"), + } + } + + #[tokio::test] + async fn replay_thread_snapshot_restores_collaboration_mode_for_draft_submit() { + let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + let session_configured = Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }; + app.chat_widget + .handle_codex_event(session_configured.clone()); + app.chat_widget + .set_reasoning_effort(Some(ReasoningEffortConfig::High)); + app.chat_widget + .set_collaboration_mask(CollaborationModeMask { + name: "Plan".to_string(), + mode: Some(ModeKind::Plan), + model: Some("gpt-restored".to_string()), + reasoning_effort: Some(Some(ReasoningEffortConfig::High)), + developer_instructions: None, + }); + app.chat_widget + .apply_external_edit("draft prompt".to_string()); + let input_state = app + .chat_widget + .capture_thread_input_state() + .expect("expected draft input state"); + + let (chat_widget, _app_event_tx, _rx, mut new_op_rx) = + make_chatwidget_manual_with_sender().await; + app.chat_widget = chat_widget; + app.chat_widget.handle_codex_event(session_configured); + app.chat_widget + .set_reasoning_effort(Some(ReasoningEffortConfig::Low)); + app.chat_widget + .set_collaboration_mask(CollaborationModeMask { + name: "Default".to_string(), + mode: Some(ModeKind::Default), + model: Some("gpt-replacement".to_string()), + reasoning_effort: Some(Some(ReasoningEffortConfig::Low)), + developer_instructions: None, + }); + while new_op_rx.try_recv().is_ok() {} + + app.replay_thread_snapshot( + ThreadEventSnapshot { + session_configured: None, + events: vec![], + input_state: Some(input_state), + }, + true, + ); + app.chat_widget + .handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); + + match next_user_turn_op(&mut new_op_rx) { + Op::UserTurn { + items, + model, + effort, + collaboration_mode, + .. + } => { + assert_eq!( + items, + vec![UserInput::Text { + text: "draft prompt".to_string(), + text_elements: Vec::new(), + }] + ); + assert_eq!(model, "gpt-restored".to_string()); + assert_eq!(effort, Some(ReasoningEffortConfig::High)); + assert_eq!( + collaboration_mode, + Some(CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: "gpt-restored".to_string(), + reasoning_effort: Some(ReasoningEffortConfig::High), + developer_instructions: None, + }, + }) + ); + } + other => panic!("expected restored draft submission, got {other:?}"), + } + } + + #[tokio::test] + async fn replay_thread_snapshot_restores_collaboration_mode_without_input() { + let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + let session_configured = Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }; + app.chat_widget + .handle_codex_event(session_configured.clone()); + app.chat_widget + .set_reasoning_effort(Some(ReasoningEffortConfig::High)); + app.chat_widget + .set_collaboration_mask(CollaborationModeMask { + name: "Plan".to_string(), + mode: Some(ModeKind::Plan), + model: Some("gpt-restored".to_string()), + reasoning_effort: Some(Some(ReasoningEffortConfig::High)), + developer_instructions: None, + }); + let input_state = app + .chat_widget + .capture_thread_input_state() + .expect("expected collaboration-only input state"); + + let (chat_widget, _app_event_tx, _rx, _new_op_rx) = + make_chatwidget_manual_with_sender().await; + app.chat_widget = chat_widget; + app.chat_widget.handle_codex_event(session_configured); + app.chat_widget + .set_reasoning_effort(Some(ReasoningEffortConfig::Low)); + app.chat_widget + .set_collaboration_mask(CollaborationModeMask { + name: "Default".to_string(), + mode: Some(ModeKind::Default), + model: Some("gpt-replacement".to_string()), + reasoning_effort: Some(Some(ReasoningEffortConfig::Low)), + developer_instructions: None, + }); + + app.replay_thread_snapshot( + ThreadEventSnapshot { + session_configured: None, + events: vec![], + input_state: Some(input_state), + }, + true, + ); + + assert_eq!( + app.chat_widget.active_collaboration_mode_kind(), + ModeKind::Plan + ); + assert_eq!(app.chat_widget.current_model(), "gpt-restored"); + assert_eq!( + app.chat_widget.current_reasoning_effort(), + Some(ReasoningEffortConfig::High) + ); + } + + #[tokio::test] + async fn replayed_interrupted_turn_restores_queued_input_to_composer() { + let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + let session_configured = Event { + id: "session-configured".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: None, + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }), + }; + app.chat_widget + .handle_codex_event(session_configured.clone()); + app.chat_widget.handle_codex_event(Event { + id: "turn-started".to_string(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + }); + app.chat_widget.handle_codex_event(Event { + id: "agent-delta".to_string(), + msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { + delta: "streaming".to_string(), + }), + }); + app.chat_widget + .apply_external_edit("queued follow-up".to_string()); + app.chat_widget + .handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); + let input_state = app + .chat_widget + .capture_thread_input_state() + .expect("expected queued follow-up state"); + + let (chat_widget, _app_event_tx, _rx, mut new_op_rx) = + make_chatwidget_manual_with_sender().await; + app.chat_widget = chat_widget; + app.chat_widget.handle_codex_event(session_configured); + while new_op_rx.try_recv().is_ok() {} + + app.replay_thread_snapshot( + ThreadEventSnapshot { + session_configured: None, + events: vec![Event { + id: "turn-aborted".to_string(), + msg: EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some("turn-1".to_string()), + reason: TurnAbortReason::ReviewEnded, + }), + }], + input_state: Some(input_state), + }, + true, + ); + + assert_eq!( + app.chat_widget.composer_text_with_pending(), + "queued follow-up" + ); + assert!(app.chat_widget.queued_user_message_texts().is_empty()); + assert!( + new_op_rx.try_recv().is_err(), + "replayed interrupted turns should restore queued input for editing, not submit it" + ); + } + #[tokio::test] async fn open_agent_picker_keeps_missing_threads_for_replay() -> Result<()> { let mut app = make_test_app().await; @@ -4564,6 +5340,17 @@ mod tests { ) } + fn next_user_turn_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver) -> Op { + let mut seen = Vec::new(); + while let Ok(op) = op_rx.try_recv() { + if matches!(op, Op::UserTurn { .. }) { + return op; + } + seen.push(format!("{op:?}")); + } + panic!("expected UserTurn op, saw: {seen:?}"); + } + fn test_otel_manager(config: &Config, model: &str) -> OtelManager { let model_info = codex_core::test_support::construct_model_info_offline(model, config); OtelManager::new( diff --git a/codex-rs/tui/src/bottom_pane/mod.rs b/codex-rs/tui/src/bottom_pane/mod.rs index 74862df99..1e6e6fce0 100644 --- a/codex-rs/tui/src/bottom_pane/mod.rs +++ b/codex-rs/tui/src/bottom_pane/mod.rs @@ -579,6 +579,10 @@ impl BottomPane { self.composer.current_text_with_pending() } + pub(crate) fn composer_pending_pastes(&self) -> Vec<(String, String)> { + self.composer.pending_pastes() + } + pub(crate) fn apply_external_edit(&mut self, text: String) { self.composer.apply_external_edit(text); self.request_redraw(); @@ -604,6 +608,11 @@ impl BottomPane { urls } + pub(crate) fn set_composer_pending_pastes(&mut self, pending_pastes: Vec<(String, String)>) { + self.composer.set_pending_pastes(pending_pastes); + self.request_redraw(); + } + /// Update the status indicator header (defaults to "Working") and details below it. /// /// Passing `None` clears any existing details. No-ops if the status indicator is not active. diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 840505f36..fb9404a08 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -607,6 +607,7 @@ pub(crate) struct ChatWidget { retry_status_header: Option, // Set when commentary output completes; once stream queues go idle we restore the status row. pending_status_indicator_restore: bool, + suppress_queue_autosend: bool, thread_id: Option, thread_name: Option, forked_from: Option, @@ -721,6 +722,7 @@ pub(crate) struct ActiveCellTranscriptKey { pub(crate) animation_tick: Option, } +#[derive(Debug, Clone, PartialEq)] pub(crate) struct UserMessage { text: String, local_images: Vec, @@ -734,6 +736,36 @@ pub(crate) struct UserMessage { mention_bindings: Vec, } +#[derive(Debug, Clone, PartialEq, Default)] +struct ThreadComposerState { + text: String, + local_images: Vec, + remote_image_urls: Vec, + text_elements: Vec, + mention_bindings: Vec, + pending_pastes: Vec<(String, String)>, +} + +impl ThreadComposerState { + fn has_content(&self) -> bool { + !self.text.is_empty() + || !self.local_images.is_empty() + || !self.remote_image_urls.is_empty() + || !self.text_elements.is_empty() + || !self.mention_bindings.is_empty() + || !self.pending_pastes.is_empty() + } +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct ThreadInputState { + composer: Option, + queued_user_messages: VecDeque, + current_collaboration_mode: CollaborationMode, + active_collaboration_mask: Option, + agent_turn_running: bool, +} + impl From for UserMessage { fn from(text: String) -> Self { Self { @@ -2021,6 +2053,80 @@ impl ChatWidget { ); } + pub(crate) fn capture_thread_input_state(&self) -> Option { + let composer = ThreadComposerState { + text: self.bottom_pane.composer_text(), + text_elements: self.bottom_pane.composer_text_elements(), + local_images: self.bottom_pane.composer_local_images(), + remote_image_urls: self.bottom_pane.remote_image_urls(), + mention_bindings: self.bottom_pane.composer_mention_bindings(), + pending_pastes: self.bottom_pane.composer_pending_pastes(), + }; + Some(ThreadInputState { + composer: composer.has_content().then_some(composer), + queued_user_messages: self.queued_user_messages.clone(), + current_collaboration_mode: self.current_collaboration_mode.clone(), + active_collaboration_mask: self.active_collaboration_mask.clone(), + agent_turn_running: self.agent_turn_running, + }) + } + + pub(crate) fn restore_thread_input_state(&mut self, input_state: Option) { + if let Some(input_state) = input_state { + self.current_collaboration_mode = input_state.current_collaboration_mode; + self.active_collaboration_mask = input_state.active_collaboration_mask; + self.agent_turn_running = input_state.agent_turn_running; + self.update_collaboration_mode_indicator(); + self.refresh_model_display(); + if let Some(composer) = input_state.composer { + let local_image_paths = composer + .local_images + .into_iter() + .map(|img| img.path) + .collect(); + self.set_remote_image_urls(composer.remote_image_urls); + self.bottom_pane.set_composer_text_with_mention_bindings( + composer.text, + composer.text_elements, + local_image_paths, + composer.mention_bindings, + ); + self.bottom_pane + .set_composer_pending_pastes(composer.pending_pastes); + } else { + self.set_remote_image_urls(Vec::new()); + self.bottom_pane.set_composer_text_with_mention_bindings( + String::new(), + Vec::new(), + Vec::new(), + Vec::new(), + ); + self.bottom_pane.set_composer_pending_pastes(Vec::new()); + } + self.queued_user_messages = input_state.queued_user_messages; + } else { + self.agent_turn_running = false; + self.set_remote_image_urls(Vec::new()); + self.bottom_pane.set_composer_text_with_mention_bindings( + String::new(), + Vec::new(), + Vec::new(), + Vec::new(), + ); + self.bottom_pane.set_composer_pending_pastes(Vec::new()); + self.queued_user_messages.clear(); + } + self.turn_sleep_inhibitor + .set_turn_running(self.agent_turn_running); + self.update_task_running_state(); + self.refresh_queued_user_messages(); + self.request_redraw(); + } + + pub(crate) fn set_queue_autosend_suppressed(&mut self, suppressed: bool) { + self.suppress_queue_autosend = suppressed; + } + fn on_plan_update(&mut self, update: UpdatePlanArgs) { self.saw_plan_update_this_turn = true; self.add_to_history(history_cell::new_plan_update(update)); @@ -2977,6 +3083,7 @@ impl ChatWidget { current_status_header: String::from("Working"), retry_status_header: None, pending_status_indicator_restore: false, + suppress_queue_autosend: false, thread_id: None, thread_name: None, forked_from: None, @@ -3158,6 +3265,7 @@ impl ChatWidget { current_status_header: String::from("Working"), retry_status_header: None, pending_status_indicator_restore: false, + suppress_queue_autosend: false, thread_id: None, thread_name: None, forked_from: None, @@ -3328,6 +3436,7 @@ impl ChatWidget { current_status_header: String::from("Working"), retry_status_header: None, pending_status_indicator_restore: false, + suppress_queue_autosend: false, thread_id: None, thread_name: None, forked_from: None, @@ -4881,7 +4990,10 @@ impl ChatWidget { } // If idle and there are queued inputs, submit exactly one to start the next turn. - fn maybe_send_next_queued_input(&mut self) { + pub(crate) fn maybe_send_next_queued_input(&mut self) { + if self.suppress_queue_autosend { + return; + } if self.bottom_pane.is_task_running() { return; } @@ -7827,6 +7939,14 @@ impl ChatWidget { self.bottom_pane.remote_image_urls() } + #[cfg(test)] + pub(crate) fn queued_user_message_texts(&self) -> Vec { + self.queued_user_messages + .iter() + .map(|message| message.text.clone()) + .collect() + } + #[cfg(test)] pub(crate) fn pending_thread_approvals(&self) -> &[String] { self.bottom_pane.pending_thread_approvals() diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 2861be570..b9167d211 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -1805,6 +1805,7 @@ async fn make_chatwidget_manual( current_status_header: String::from("Working"), retry_status_header: None, pending_status_indicator_restore: false, + suppress_queue_autosend: false, thread_id: None, thread_name: None, forked_from: None, @@ -3367,6 +3368,30 @@ async fn empty_enter_during_task_does_not_queue() { assert!(chat.queued_user_messages.is_empty()); } +#[tokio::test] +async fn restore_thread_input_state_syncs_sleep_inhibitor_state() { + let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await; + chat.set_feature_enabled(Feature::PreventIdleSleep, true); + + chat.restore_thread_input_state(Some(ThreadInputState { + composer: None, + queued_user_messages: VecDeque::new(), + current_collaboration_mode: chat.current_collaboration_mode.clone(), + active_collaboration_mask: chat.active_collaboration_mask.clone(), + agent_turn_running: true, + })); + + assert!(chat.agent_turn_running); + assert!(chat.turn_sleep_inhibitor.is_turn_running()); + assert!(chat.bottom_pane.is_task_running()); + + chat.restore_thread_input_state(None); + + assert!(!chat.agent_turn_running); + assert!(!chat.turn_sleep_inhibitor.is_turn_running()); + assert!(!chat.bottom_pane.is_task_running()); +} + #[tokio::test] async fn alt_up_edits_most_recent_queued_message() { let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await; diff --git a/codex-rs/utils/sleep-inhibitor/src/lib.rs b/codex-rs/utils/sleep-inhibitor/src/lib.rs index 8c9c5a9cc..0b3d2c9a3 100644 --- a/codex-rs/utils/sleep-inhibitor/src/lib.rs +++ b/codex-rs/utils/sleep-inhibitor/src/lib.rs @@ -29,6 +29,7 @@ use windows_inhibitor as imp; #[derive(Debug)] pub struct SleepInhibitor { enabled: bool, + turn_running: bool, platform: imp::SleepInhibitor, } @@ -36,12 +37,14 @@ impl SleepInhibitor { pub fn new(enabled: bool) -> Self { Self { enabled, + turn_running: false, platform: imp::SleepInhibitor::new(), } } /// Update the active turn state; turns sleep prevention on/off as needed. pub fn set_turn_running(&mut self, turn_running: bool) { + self.turn_running = turn_running; if !self.enabled { self.release(); return; @@ -61,6 +64,11 @@ impl SleepInhibitor { fn release(&mut self) { self.platform.release(); } + + /// Return the latest turn-running state requested by the caller. + pub fn is_turn_running(&self) -> bool { + self.turn_running + } } #[cfg(test)] @@ -71,14 +79,18 @@ mod tests { fn sleep_inhibitor_toggles_without_panicking() { let mut inhibitor = SleepInhibitor::new(true); inhibitor.set_turn_running(true); + assert!(inhibitor.is_turn_running()); inhibitor.set_turn_running(false); + assert!(!inhibitor.is_turn_running()); } #[test] fn sleep_inhibitor_disabled_does_not_panic() { let mut inhibitor = SleepInhibitor::new(false); inhibitor.set_turn_running(true); + assert!(inhibitor.is_turn_running()); inhibitor.set_turn_running(false); + assert!(!inhibitor.is_turn_running()); } #[test]