diff --git a/codex-rs/tui_app_server/src/app.rs b/codex-rs/tui_app_server/src/app.rs index 171df6927..d4569ae7a 100644 --- a/codex-rs/tui_app_server/src/app.rs +++ b/codex-rs/tui_app_server/src/app.rs @@ -464,8 +464,6 @@ enum ThreadBufferedEvent { Notification(ServerNotification), Request(ServerRequest), HistoryEntryResponse(GetHistoryEntryResponseEvent), - LegacyWarning(String), - LegacyRollback { num_turns: u32 }, } #[derive(Debug)] @@ -474,7 +472,6 @@ struct ThreadEventStore { turns: Vec, buffer: VecDeque, pending_interactive_replay: PendingInteractiveReplayState, - pending_local_legacy_rollbacks: VecDeque, active_turn_id: Option, input_state: Option, capacity: usize, @@ -483,10 +480,7 @@ struct ThreadEventStore { impl ThreadEventStore { fn event_survives_session_refresh(event: &ThreadBufferedEvent) -> bool { - matches!( - event, - ThreadBufferedEvent::Request(_) | ThreadBufferedEvent::LegacyWarning(_) - ) + matches!(event, ThreadBufferedEvent::Request(_)) } fn new(capacity: usize) -> Self { @@ -495,7 +489,6 @@ impl ThreadEventStore { turns: Vec::new(), buffer: VecDeque::new(), pending_interactive_replay: PendingInteractiveReplayState::default(), - pending_local_legacy_rollbacks: VecDeque::new(), active_turn_id: None, input_state: None, capacity, @@ -521,7 +514,6 @@ impl ThreadEventStore { } fn set_turns(&mut self, turns: Vec) { - self.pending_local_legacy_rollbacks.clear(); self.active_turn_id = turns .iter() .rev() @@ -578,37 +570,6 @@ impl ThreadEventStore { self.active_turn_id = None; } - fn note_local_thread_rollback(&mut self, num_turns: u32) { - self.pending_local_legacy_rollbacks.push_back(num_turns); - while self.pending_local_legacy_rollbacks.len() > self.capacity { - self.pending_local_legacy_rollbacks.pop_front(); - } - } - - fn consume_pending_local_legacy_rollback(&mut self, num_turns: u32) -> bool { - match self.pending_local_legacy_rollbacks.front() { - Some(pending_num_turns) if *pending_num_turns == num_turns => { - self.pending_local_legacy_rollbacks.pop_front(); - true - } - _ => false, - } - } - - fn apply_legacy_thread_rollback(&mut self, num_turns: u32) { - let num_turns = usize::try_from(num_turns).unwrap_or(usize::MAX); - if num_turns >= self.turns.len() { - self.turns.clear(); - } else { - self.turns - .truncate(self.turns.len().saturating_sub(num_turns)); - } - self.buffer.clear(); - self.pending_interactive_replay = PendingInteractiveReplayState::default(); - self.pending_local_legacy_rollbacks.clear(); - self.active_turn_id = None; - } - fn snapshot(&self) -> ThreadEventSnapshot { ThreadEventSnapshot { session: self.session.clone(), @@ -623,9 +584,7 @@ impl ThreadEventStore { .pending_interactive_replay .should_replay_snapshot_request(request), ThreadBufferedEvent::Notification(_) - | ThreadBufferedEvent::HistoryEntryResponse(_) - | ThreadBufferedEvent::LegacyWarning(_) - | ThreadBufferedEvent::LegacyRollback { .. } => true, + | ThreadBufferedEvent::HistoryEntryResponse(_) => true, }) .cloned() .collect(), @@ -2283,50 +2242,6 @@ impl App { Ok(()) } - async fn enqueue_thread_legacy_warning( - &mut self, - thread_id: ThreadId, - message: String, - ) -> Result<()> { - let (sender, store) = { - let channel = self.ensure_thread_channel(thread_id); - (channel.sender.clone(), Arc::clone(&channel.store)) - }; - - let should_send = { - let mut guard = store.lock().await; - guard - .buffer - .push_back(ThreadBufferedEvent::LegacyWarning(message.clone())); - if guard.buffer.len() > guard.capacity - && let Some(removed) = guard.buffer.pop_front() - && let ThreadBufferedEvent::Request(request) = &removed - { - guard - .pending_interactive_replay - .note_evicted_server_request(request); - } - guard.active - }; - - if should_send { - match sender.try_send(ThreadBufferedEvent::LegacyWarning(message)) { - Ok(()) => {} - Err(TrySendError::Full(event)) => { - tokio::spawn(async move { - if let Err(err) = sender.send(event).await { - tracing::warn!("thread {thread_id} event channel closed: {err}"); - } - }); - } - Err(TrySendError::Closed(_)) => { - tracing::warn!("thread {thread_id} event channel closed"); - } - } - } - Ok(()) - } - async fn enqueue_thread_history_entry_response( &mut self, thread_id: ThreadId, @@ -2371,64 +2286,6 @@ impl App { Ok(()) } - async fn enqueue_thread_legacy_rollback( - &mut self, - thread_id: ThreadId, - num_turns: u32, - ) -> Result<()> { - let (sender, store) = { - let channel = self.ensure_thread_channel(thread_id); - (channel.sender.clone(), Arc::clone(&channel.store)) - }; - - let should_send = { - let mut guard = store.lock().await; - if guard.consume_pending_local_legacy_rollback(num_turns) { - false - } else { - guard.apply_legacy_thread_rollback(num_turns); - guard.active - } - }; - - if should_send { - match sender.try_send(ThreadBufferedEvent::LegacyRollback { num_turns }) { - Ok(()) => {} - Err(TrySendError::Full(event)) => { - tokio::spawn(async move { - if let Err(err) = sender.send(event).await { - tracing::warn!("thread {thread_id} event channel closed: {err}"); - } - }); - } - Err(TrySendError::Closed(_)) => { - tracing::warn!("thread {thread_id} event channel closed"); - } - } - } - Ok(()) - } - - async fn enqueue_primary_thread_legacy_warning(&mut self, message: String) -> Result<()> { - if let Some(thread_id) = self.primary_thread_id { - return self.enqueue_thread_legacy_warning(thread_id, message).await; - } - self.pending_primary_events - .push_back(ThreadBufferedEvent::LegacyWarning(message)); - Ok(()) - } - - async fn enqueue_primary_thread_legacy_rollback(&mut self, num_turns: u32) -> Result<()> { - if let Some(thread_id) = self.primary_thread_id { - return self - .enqueue_thread_legacy_rollback(thread_id, num_turns) - .await; - } - self.pending_primary_events - .push_back(ThreadBufferedEvent::LegacyRollback { num_turns }); - Ok(()) - } - async fn enqueue_primary_thread_session( &mut self, session: ThreadSessionState, @@ -2466,14 +2323,6 @@ impl App { self.enqueue_thread_history_entry_response(thread_id, event) .await?; } - ThreadBufferedEvent::LegacyWarning(message) => { - self.enqueue_thread_legacy_warning(thread_id, message) - .await?; - } - ThreadBufferedEvent::LegacyRollback { num_turns } => { - self.enqueue_thread_legacy_rollback(thread_id, num_turns) - .await?; - } } } self.chat_widget @@ -4769,7 +4618,6 @@ impl App { if let Some(channel) = self.thread_event_channels.get(&thread_id) { let mut store = channel.store.lock().await; store.apply_thread_rollback(response); - store.note_local_thread_rollback(num_turns); } if self.active_thread_id == Some(thread_id) && let Some(mut rx) = self.active_thread_rx.take() @@ -4814,13 +4662,6 @@ impl App { ThreadBufferedEvent::HistoryEntryResponse(event) => { self.chat_widget.handle_history_entry_response(event); } - ThreadBufferedEvent::LegacyWarning(message) => { - self.chat_widget.add_warning_message(message); - } - ThreadBufferedEvent::LegacyRollback { num_turns } => { - self.handle_backtrack_rollback_succeeded(num_turns); - self.chat_widget.handle_thread_rolled_back(); - } } if needs_refresh { self.refresh_status_line(); @@ -4838,13 +4679,6 @@ impl App { ThreadBufferedEvent::HistoryEntryResponse(event) => { self.chat_widget.handle_history_entry_response(event) } - ThreadBufferedEvent::LegacyWarning(message) => { - self.chat_widget.add_warning_message(message); - } - ThreadBufferedEvent::LegacyRollback { num_turns } => { - self.handle_backtrack_rollback_succeeded(num_turns); - self.chat_widget.handle_thread_rolled_back(); - } } } @@ -5335,6 +5169,7 @@ mod tests { use codex_app_server_protocol::AdditionalPermissionProfile; use codex_app_server_protocol::AgentMessageDeltaNotification; use codex_app_server_protocol::CommandExecutionRequestApprovalParams; + use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::NetworkApprovalContext as AppServerNetworkApprovalContext; use codex_app_server_protocol::NetworkApprovalProtocol as AppServerNetworkApprovalProtocol; use codex_app_server_protocol::NetworkPolicyAmendment as AppServerNetworkPolicyAmendment; @@ -5935,33 +5770,6 @@ mod tests { } } - #[tokio::test] - async fn replay_thread_snapshot_replays_legacy_warning_history() { - let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; - - app.replay_thread_snapshot( - ThreadEventSnapshot { - session: None, - turns: Vec::new(), - events: vec![ThreadBufferedEvent::LegacyWarning( - "legacy warning message".to_string(), - )], - input_state: None, - }, - false, - ); - - let mut saw_warning = false; - while let Ok(event) = app_event_rx.try_recv() { - if let AppEvent::InsertHistoryCell(cell) = event { - let transcript = lines_to_single_string(&cell.transcript_lines(80)); - saw_warning |= transcript.contains("legacy warning message"); - } - } - - assert!(saw_warning, "expected replayed legacy warning history cell"); - } - #[tokio::test] async fn replay_only_thread_keeps_restored_queue_visible() { let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; @@ -7308,87 +7116,6 @@ guardian_approval = true Ok(()) } - #[tokio::test] - async fn legacy_warning_eviction_clears_pending_interactive_replay_state() -> Result<()> { - let mut app = make_test_app().await; - let thread_id = ThreadId::new(); - let channel = ThreadEventChannel::new(1); - { - let mut store = channel.store.lock().await; - store.push_request(exec_approval_request( - thread_id, - "turn-approval", - "call-approval", - None, - )); - assert_eq!(store.has_pending_thread_approvals(), true); - } - app.thread_event_channels.insert(thread_id, channel); - - app.enqueue_thread_legacy_warning(thread_id, "legacy warning".to_string()) - .await?; - - let store = app - .thread_event_channels - .get(&thread_id) - .expect("thread store should exist") - .store - .lock() - .await; - assert_eq!(store.has_pending_thread_approvals(), false); - let snapshot = store.snapshot(); - assert_eq!(snapshot.events.len(), 1); - assert!(matches!( - snapshot.events.first(), - Some(ThreadBufferedEvent::LegacyWarning(message)) if message == "legacy warning" - )); - - Ok(()) - } - - #[tokio::test] - async fn legacy_thread_rollback_trims_inactive_thread_snapshot_state() -> Result<()> { - let mut app = make_test_app().await; - let thread_id = ThreadId::new(); - let session = test_thread_session(thread_id, PathBuf::from("/tmp/project")); - let turns = vec![ - test_turn("turn-1", TurnStatus::Completed, Vec::new()), - test_turn("turn-2", TurnStatus::Completed, Vec::new()), - ]; - let channel = ThreadEventChannel::new_with_session(4, session, turns); - { - let mut store = channel.store.lock().await; - store.push_request(exec_approval_request( - thread_id, - "turn-approval", - "call-approval", - None, - )); - assert_eq!(store.has_pending_thread_approvals(), true); - } - app.thread_event_channels.insert(thread_id, channel); - - app.enqueue_thread_legacy_rollback(thread_id, 1).await?; - - let store = app - .thread_event_channels - .get(&thread_id) - .expect("thread store should exist") - .store - .lock() - .await; - assert_eq!( - store.turns, - vec![test_turn("turn-1", TurnStatus::Completed, Vec::new())] - ); - assert_eq!(store.has_pending_thread_approvals(), false); - let snapshot = store.snapshot(); - assert_eq!(snapshot.turns, store.turns); - assert!(snapshot.events.is_empty()); - - Ok(()) - } - #[tokio::test] async fn inactive_thread_started_notification_initializes_replay_session() -> Result<()> { let mut app = make_test_app().await; @@ -8108,16 +7835,6 @@ guardian_approval = true assert_eq!(store.has_pending_thread_approvals(), false); } - #[test] - fn thread_event_store_consumes_matching_local_legacy_rollback_once() { - let mut store = ThreadEventStore::new(8); - store.note_local_thread_rollback(2); - - assert!(store.consume_pending_local_legacy_rollback(2)); - assert!(!store.consume_pending_local_legacy_rollback(2)); - assert!(!store.consume_pending_local_legacy_rollback(1)); - } - fn next_user_turn_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver) -> Op { let mut seen = Vec::new(); while let Ok(op) = op_rx.try_recv() { @@ -9076,8 +8793,13 @@ guardian_approval = true let (tx, rx) = mpsc::channel(8); app.active_thread_id = Some(thread_id); app.active_thread_rx = Some(rx); - tx.send(ThreadBufferedEvent::LegacyWarning( - "stale warning".to_string(), + tx.send(ThreadBufferedEvent::Notification( + ServerNotification::ConfigWarning(ConfigWarningNotification { + summary: "stale warning".to_string(), + details: None, + path: None, + range: None, + }), )) .await .expect("event should queue"); @@ -9115,62 +8837,6 @@ guardian_approval = true assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty))); } - #[tokio::test] - async fn local_rollback_response_suppresses_matching_legacy_rollback() { - let mut app = make_test_app().await; - let thread_id = ThreadId::new(); - let session = test_thread_session(thread_id, PathBuf::from("/tmp/project")); - let initial_turns = vec![ - test_turn("turn-1", TurnStatus::Completed, Vec::new()), - test_turn("turn-2", TurnStatus::Completed, Vec::new()), - ]; - app.thread_event_channels.insert( - thread_id, - ThreadEventChannel::new_with_session(8, session, initial_turns), - ); - - app.handle_thread_rollback_response( - thread_id, - 1, - &ThreadRollbackResponse { - thread: Thread { - id: thread_id.to_string(), - preview: String::new(), - ephemeral: false, - model_provider: "openai".to_string(), - created_at: 0, - updated_at: 0, - status: codex_app_server_protocol::ThreadStatus::Idle, - path: None, - cwd: PathBuf::from("/tmp/project"), - cli_version: "0.0.0".to_string(), - source: SessionSource::Cli.into(), - agent_nickname: None, - agent_role: None, - git_info: None, - name: None, - turns: vec![test_turn("turn-1", TurnStatus::Completed, Vec::new())], - }, - }, - ) - .await; - - app.enqueue_thread_legacy_rollback(thread_id, 1) - .await - .expect("legacy rollback should not fail"); - - let store = app - .thread_event_channels - .get(&thread_id) - .expect("thread channel") - .store - .lock() - .await; - let snapshot = store.snapshot(); - assert_eq!(snapshot.turns.len(), 1); - assert!(snapshot.events.is_empty()); - } - #[tokio::test] async fn new_session_requests_shutdown_for_previous_conversation() { let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await; diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index c144e36db..26aff5ae3 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -21,7 +21,6 @@ use codex_app_server_client::AppServerEvent; use codex_app_server_protocol::AuthMode; use codex_app_server_protocol::ChatgptAuthTokensRefreshParams; use codex_app_server_protocol::JSONRPCErrorError; -use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; @@ -106,16 +105,9 @@ use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; #[cfg(test)] use codex_protocol::protocol::TurnStartedEvent; -use serde_json::Value; #[cfg(test)] use std::time::Duration; -#[derive(Debug, PartialEq, Eq)] -enum LegacyThreadNotification { - Warning(String), - Rollback { num_turns: u32 }, -} - impl App { pub(super) async fn handle_app_server_event( &mut self, @@ -133,37 +125,8 @@ impl App { self.handle_server_notification_event(app_server_client, notification) .await; } - AppServerEvent::LegacyNotification(notification) => { - if let Some((thread_id, legacy_notification)) = - legacy_thread_notification(notification) - { - let result = match legacy_notification { - LegacyThreadNotification::Warning(message) => { - if self.primary_thread_id == Some(thread_id) - || self.primary_thread_id.is_none() - { - self.enqueue_primary_thread_legacy_warning(message).await - } else { - self.enqueue_thread_legacy_warning(thread_id, message).await - } - } - LegacyThreadNotification::Rollback { num_turns } => { - if self.primary_thread_id == Some(thread_id) - || self.primary_thread_id.is_none() - { - self.enqueue_primary_thread_legacy_rollback(num_turns).await - } else { - self.enqueue_thread_legacy_rollback(thread_id, num_turns) - .await - } - } - }; - if let Err(err) = result { - tracing::warn!("failed to enqueue app-server legacy notification: {err}"); - } - } else { - tracing::debug!("ignoring legacy app-server notification in tui_app_server"); - } + AppServerEvent::LegacyNotification(_) => { + tracing::debug!("ignoring legacy app-server notification in tui_app_server"); } AppServerEvent::ServerRequest(request) => { if let ServerRequest::ChatgptAuthTokensRefresh { request_id, params } = request { @@ -567,48 +530,6 @@ pub(super) fn thread_snapshot_events( .collect() } -fn legacy_thread_notification( - notification: JSONRPCNotification, -) -> Option<(ThreadId, LegacyThreadNotification)> { - let method = notification - .method - .strip_prefix("codex/event/") - .unwrap_or(¬ification.method); - - let Value::Object(mut params) = notification.params? else { - return None; - }; - let thread_id = params - .remove("conversationId") - .and_then(|value| serde_json::from_value::(value).ok()) - .and_then(|value| ThreadId::from_string(&value).ok())?; - let msg = params.get("msg").and_then(Value::as_object)?; - - match method { - "warning" => { - let message = msg - .get("type") - .and_then(Value::as_str) - .zip(msg.get("message")) - .and_then(|(kind, message)| (kind == "warning").then_some(message)) - .and_then(Value::as_str) - .map(ToOwned::to_owned)?; - Some((thread_id, LegacyThreadNotification::Warning(message))) - } - "thread_rolled_back" => { - let num_turns = msg - .get("type") - .and_then(Value::as_str) - .zip(msg.get("num_turns")) - .and_then(|(kind, num_turns)| (kind == "thread_rolled_back").then_some(num_turns)) - .and_then(Value::as_u64) - .and_then(|num_turns| u32::try_from(num_turns).ok())?; - Some((thread_id, LegacyThreadNotification::Rollback { num_turns })) - } - _ => None, - } -} - #[cfg(test)] fn server_notification_thread_events( notification: ServerNotification, @@ -1289,9 +1210,7 @@ fn app_server_codex_error_info_to_core( #[cfg(test)] mod tests { - use super::LegacyThreadNotification; use super::command_execution_started_event; - use super::legacy_thread_notification; use super::server_notification_thread_events; use super::thread_snapshot_events; use super::turn_snapshot_events; @@ -1303,7 +1222,6 @@ mod tests { use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; - use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::Thread; @@ -1324,57 +1242,8 @@ mod tests { use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use pretty_assertions::assert_eq; - use serde_json::json; use std::path::PathBuf; - #[test] - fn legacy_warning_notification_extracts_thread_id_and_message() { - let thread_id = ThreadId::new(); - let warning = legacy_thread_notification(JSONRPCNotification { - method: "codex/event/warning".to_string(), - params: Some(json!({ - "conversationId": thread_id.to_string(), - "id": "event-1", - "msg": { - "type": "warning", - "message": "legacy warning message", - }, - })), - }); - - assert_eq!( - warning, - Some(( - thread_id, - LegacyThreadNotification::Warning("legacy warning message".to_string()) - )) - ); - } - - #[test] - fn legacy_thread_rollback_notification_extracts_thread_id_and_turn_count() { - let thread_id = ThreadId::new(); - let rollback = legacy_thread_notification(JSONRPCNotification { - method: "codex/event/thread_rolled_back".to_string(), - params: Some(json!({ - "conversationId": thread_id.to_string(), - "id": "event-1", - "msg": { - "type": "thread_rolled_back", - "num_turns": 2, - }, - })), - }); - - assert_eq!( - rollback, - Some(( - thread_id, - LegacyThreadNotification::Rollback { num_turns: 2 } - )) - ); - } - #[test] fn bridges_completed_agent_messages_from_server_notifications() { let thread_id = "019cee8c-b993-7e33-88c0-014d4e62612d".to_string(); diff --git a/codex-rs/tui_app_server/src/chatwidget.rs b/codex-rs/tui_app_server/src/chatwidget.rs index 0d53ae3f7..82cd0d99b 100644 --- a/codex-rs/tui_app_server/src/chatwidget.rs +++ b/codex-rs/tui_app_server/src/chatwidget.rs @@ -9548,10 +9548,6 @@ impl ChatWidget { self.request_redraw(); } - pub(crate) fn add_warning_message(&mut self, message: String) { - self.on_warning(message); - } - fn add_app_server_stub_message(&mut self, feature: &str) { warn!(feature, "stubbed unsupported app-server TUI feature"); self.add_error_message(format!("{feature}: {APP_SERVER_TUI_STUB_MESSAGE}"));