Remove legacy app-server notification handling from tui_app_server (#15390)
As part of moving the TUI onto the app server, we added some temporary handling of some legacy events. We've confirmed that these do not need to be supported, so this PR removes this support from the tui_app_server, allowing for additional simplifications in follow-on PRs. These events are needed only for very old rollouts. None of the other app server clients (IDE extension or app) support these either. ## Summary - stop translating legacy `codex/event/*` notifications inside `tui_app_server` - remove the TUI-side legacy warning and rollback buffering/replay paths that were only fed by those notifications - keep the lower-level app-server and app-server-client legacy event plumbing intact so PR #15106 can rebase on top and handle the remaining exec/lower-layer migration separately
This commit is contained in:
parent
0d9bb8ea58
commit
b0236501e2
3 changed files with 12 additions and 481 deletions
|
|
@ -464,8 +464,6 @@ enum ThreadBufferedEvent {
|
|||
Notification(ServerNotification),
|
||||
Request(ServerRequest),
|
||||
HistoryEntryResponse(GetHistoryEntryResponseEvent),
|
||||
LegacyWarning(String),
|
||||
LegacyRollback { num_turns: u32 },
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
@ -474,7 +472,6 @@ struct ThreadEventStore {
|
|||
turns: Vec<Turn>,
|
||||
buffer: VecDeque<ThreadBufferedEvent>,
|
||||
pending_interactive_replay: PendingInteractiveReplayState,
|
||||
pending_local_legacy_rollbacks: VecDeque<u32>,
|
||||
active_turn_id: Option<String>,
|
||||
input_state: Option<ThreadInputState>,
|
||||
capacity: usize,
|
||||
|
|
@ -483,10 +480,7 @@ struct ThreadEventStore {
|
|||
|
||||
impl ThreadEventStore {
|
||||
fn event_survives_session_refresh(event: &ThreadBufferedEvent) -> bool {
|
||||
matches!(
|
||||
event,
|
||||
ThreadBufferedEvent::Request(_) | ThreadBufferedEvent::LegacyWarning(_)
|
||||
)
|
||||
matches!(event, ThreadBufferedEvent::Request(_))
|
||||
}
|
||||
|
||||
fn new(capacity: usize) -> Self {
|
||||
|
|
@ -495,7 +489,6 @@ impl ThreadEventStore {
|
|||
turns: Vec::new(),
|
||||
buffer: VecDeque::new(),
|
||||
pending_interactive_replay: PendingInteractiveReplayState::default(),
|
||||
pending_local_legacy_rollbacks: VecDeque::new(),
|
||||
active_turn_id: None,
|
||||
input_state: None,
|
||||
capacity,
|
||||
|
|
@ -521,7 +514,6 @@ impl ThreadEventStore {
|
|||
}
|
||||
|
||||
fn set_turns(&mut self, turns: Vec<Turn>) {
|
||||
self.pending_local_legacy_rollbacks.clear();
|
||||
self.active_turn_id = turns
|
||||
.iter()
|
||||
.rev()
|
||||
|
|
@ -578,37 +570,6 @@ impl ThreadEventStore {
|
|||
self.active_turn_id = None;
|
||||
}
|
||||
|
||||
fn note_local_thread_rollback(&mut self, num_turns: u32) {
|
||||
self.pending_local_legacy_rollbacks.push_back(num_turns);
|
||||
while self.pending_local_legacy_rollbacks.len() > self.capacity {
|
||||
self.pending_local_legacy_rollbacks.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
fn consume_pending_local_legacy_rollback(&mut self, num_turns: u32) -> bool {
|
||||
match self.pending_local_legacy_rollbacks.front() {
|
||||
Some(pending_num_turns) if *pending_num_turns == num_turns => {
|
||||
self.pending_local_legacy_rollbacks.pop_front();
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_legacy_thread_rollback(&mut self, num_turns: u32) {
|
||||
let num_turns = usize::try_from(num_turns).unwrap_or(usize::MAX);
|
||||
if num_turns >= self.turns.len() {
|
||||
self.turns.clear();
|
||||
} else {
|
||||
self.turns
|
||||
.truncate(self.turns.len().saturating_sub(num_turns));
|
||||
}
|
||||
self.buffer.clear();
|
||||
self.pending_interactive_replay = PendingInteractiveReplayState::default();
|
||||
self.pending_local_legacy_rollbacks.clear();
|
||||
self.active_turn_id = None;
|
||||
}
|
||||
|
||||
fn snapshot(&self) -> ThreadEventSnapshot {
|
||||
ThreadEventSnapshot {
|
||||
session: self.session.clone(),
|
||||
|
|
@ -623,9 +584,7 @@ impl ThreadEventStore {
|
|||
.pending_interactive_replay
|
||||
.should_replay_snapshot_request(request),
|
||||
ThreadBufferedEvent::Notification(_)
|
||||
| ThreadBufferedEvent::HistoryEntryResponse(_)
|
||||
| ThreadBufferedEvent::LegacyWarning(_)
|
||||
| ThreadBufferedEvent::LegacyRollback { .. } => true,
|
||||
| ThreadBufferedEvent::HistoryEntryResponse(_) => true,
|
||||
})
|
||||
.cloned()
|
||||
.collect(),
|
||||
|
|
@ -2283,50 +2242,6 @@ impl App {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn enqueue_thread_legacy_warning(
|
||||
&mut self,
|
||||
thread_id: ThreadId,
|
||||
message: String,
|
||||
) -> Result<()> {
|
||||
let (sender, store) = {
|
||||
let channel = self.ensure_thread_channel(thread_id);
|
||||
(channel.sender.clone(), Arc::clone(&channel.store))
|
||||
};
|
||||
|
||||
let should_send = {
|
||||
let mut guard = store.lock().await;
|
||||
guard
|
||||
.buffer
|
||||
.push_back(ThreadBufferedEvent::LegacyWarning(message.clone()));
|
||||
if guard.buffer.len() > guard.capacity
|
||||
&& let Some(removed) = guard.buffer.pop_front()
|
||||
&& let ThreadBufferedEvent::Request(request) = &removed
|
||||
{
|
||||
guard
|
||||
.pending_interactive_replay
|
||||
.note_evicted_server_request(request);
|
||||
}
|
||||
guard.active
|
||||
};
|
||||
|
||||
if should_send {
|
||||
match sender.try_send(ThreadBufferedEvent::LegacyWarning(message)) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(event)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = sender.send(event).await {
|
||||
tracing::warn!("thread {thread_id} event channel closed: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
tracing::warn!("thread {thread_id} event channel closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn enqueue_thread_history_entry_response(
|
||||
&mut self,
|
||||
thread_id: ThreadId,
|
||||
|
|
@ -2371,64 +2286,6 @@ impl App {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn enqueue_thread_legacy_rollback(
|
||||
&mut self,
|
||||
thread_id: ThreadId,
|
||||
num_turns: u32,
|
||||
) -> Result<()> {
|
||||
let (sender, store) = {
|
||||
let channel = self.ensure_thread_channel(thread_id);
|
||||
(channel.sender.clone(), Arc::clone(&channel.store))
|
||||
};
|
||||
|
||||
let should_send = {
|
||||
let mut guard = store.lock().await;
|
||||
if guard.consume_pending_local_legacy_rollback(num_turns) {
|
||||
false
|
||||
} else {
|
||||
guard.apply_legacy_thread_rollback(num_turns);
|
||||
guard.active
|
||||
}
|
||||
};
|
||||
|
||||
if should_send {
|
||||
match sender.try_send(ThreadBufferedEvent::LegacyRollback { num_turns }) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(event)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = sender.send(event).await {
|
||||
tracing::warn!("thread {thread_id} event channel closed: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
tracing::warn!("thread {thread_id} event channel closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn enqueue_primary_thread_legacy_warning(&mut self, message: String) -> Result<()> {
|
||||
if let Some(thread_id) = self.primary_thread_id {
|
||||
return self.enqueue_thread_legacy_warning(thread_id, message).await;
|
||||
}
|
||||
self.pending_primary_events
|
||||
.push_back(ThreadBufferedEvent::LegacyWarning(message));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn enqueue_primary_thread_legacy_rollback(&mut self, num_turns: u32) -> Result<()> {
|
||||
if let Some(thread_id) = self.primary_thread_id {
|
||||
return self
|
||||
.enqueue_thread_legacy_rollback(thread_id, num_turns)
|
||||
.await;
|
||||
}
|
||||
self.pending_primary_events
|
||||
.push_back(ThreadBufferedEvent::LegacyRollback { num_turns });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn enqueue_primary_thread_session(
|
||||
&mut self,
|
||||
session: ThreadSessionState,
|
||||
|
|
@ -2466,14 +2323,6 @@ impl App {
|
|||
self.enqueue_thread_history_entry_response(thread_id, event)
|
||||
.await?;
|
||||
}
|
||||
ThreadBufferedEvent::LegacyWarning(message) => {
|
||||
self.enqueue_thread_legacy_warning(thread_id, message)
|
||||
.await?;
|
||||
}
|
||||
ThreadBufferedEvent::LegacyRollback { num_turns } => {
|
||||
self.enqueue_thread_legacy_rollback(thread_id, num_turns)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.chat_widget
|
||||
|
|
@ -4769,7 +4618,6 @@ impl App {
|
|||
if let Some(channel) = self.thread_event_channels.get(&thread_id) {
|
||||
let mut store = channel.store.lock().await;
|
||||
store.apply_thread_rollback(response);
|
||||
store.note_local_thread_rollback(num_turns);
|
||||
}
|
||||
if self.active_thread_id == Some(thread_id)
|
||||
&& let Some(mut rx) = self.active_thread_rx.take()
|
||||
|
|
@ -4814,13 +4662,6 @@ impl App {
|
|||
ThreadBufferedEvent::HistoryEntryResponse(event) => {
|
||||
self.chat_widget.handle_history_entry_response(event);
|
||||
}
|
||||
ThreadBufferedEvent::LegacyWarning(message) => {
|
||||
self.chat_widget.add_warning_message(message);
|
||||
}
|
||||
ThreadBufferedEvent::LegacyRollback { num_turns } => {
|
||||
self.handle_backtrack_rollback_succeeded(num_turns);
|
||||
self.chat_widget.handle_thread_rolled_back();
|
||||
}
|
||||
}
|
||||
if needs_refresh {
|
||||
self.refresh_status_line();
|
||||
|
|
@ -4838,13 +4679,6 @@ impl App {
|
|||
ThreadBufferedEvent::HistoryEntryResponse(event) => {
|
||||
self.chat_widget.handle_history_entry_response(event)
|
||||
}
|
||||
ThreadBufferedEvent::LegacyWarning(message) => {
|
||||
self.chat_widget.add_warning_message(message);
|
||||
}
|
||||
ThreadBufferedEvent::LegacyRollback { num_turns } => {
|
||||
self.handle_backtrack_rollback_succeeded(num_turns);
|
||||
self.chat_widget.handle_thread_rolled_back();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -5335,6 +5169,7 @@ mod tests {
|
|||
use codex_app_server_protocol::AdditionalPermissionProfile;
|
||||
use codex_app_server_protocol::AgentMessageDeltaNotification;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::NetworkApprovalContext as AppServerNetworkApprovalContext;
|
||||
use codex_app_server_protocol::NetworkApprovalProtocol as AppServerNetworkApprovalProtocol;
|
||||
use codex_app_server_protocol::NetworkPolicyAmendment as AppServerNetworkPolicyAmendment;
|
||||
|
|
@ -5935,33 +5770,6 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn replay_thread_snapshot_replays_legacy_warning_history() {
|
||||
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
|
||||
|
||||
app.replay_thread_snapshot(
|
||||
ThreadEventSnapshot {
|
||||
session: None,
|
||||
turns: Vec::new(),
|
||||
events: vec![ThreadBufferedEvent::LegacyWarning(
|
||||
"legacy warning message".to_string(),
|
||||
)],
|
||||
input_state: None,
|
||||
},
|
||||
false,
|
||||
);
|
||||
|
||||
let mut saw_warning = false;
|
||||
while let Ok(event) = app_event_rx.try_recv() {
|
||||
if let AppEvent::InsertHistoryCell(cell) = event {
|
||||
let transcript = lines_to_single_string(&cell.transcript_lines(80));
|
||||
saw_warning |= transcript.contains("legacy warning message");
|
||||
}
|
||||
}
|
||||
|
||||
assert!(saw_warning, "expected replayed legacy warning history cell");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn replay_only_thread_keeps_restored_queue_visible() {
|
||||
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
|
||||
|
|
@ -7308,87 +7116,6 @@ guardian_approval = true
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn legacy_warning_eviction_clears_pending_interactive_replay_state() -> Result<()> {
|
||||
let mut app = make_test_app().await;
|
||||
let thread_id = ThreadId::new();
|
||||
let channel = ThreadEventChannel::new(1);
|
||||
{
|
||||
let mut store = channel.store.lock().await;
|
||||
store.push_request(exec_approval_request(
|
||||
thread_id,
|
||||
"turn-approval",
|
||||
"call-approval",
|
||||
None,
|
||||
));
|
||||
assert_eq!(store.has_pending_thread_approvals(), true);
|
||||
}
|
||||
app.thread_event_channels.insert(thread_id, channel);
|
||||
|
||||
app.enqueue_thread_legacy_warning(thread_id, "legacy warning".to_string())
|
||||
.await?;
|
||||
|
||||
let store = app
|
||||
.thread_event_channels
|
||||
.get(&thread_id)
|
||||
.expect("thread store should exist")
|
||||
.store
|
||||
.lock()
|
||||
.await;
|
||||
assert_eq!(store.has_pending_thread_approvals(), false);
|
||||
let snapshot = store.snapshot();
|
||||
assert_eq!(snapshot.events.len(), 1);
|
||||
assert!(matches!(
|
||||
snapshot.events.first(),
|
||||
Some(ThreadBufferedEvent::LegacyWarning(message)) if message == "legacy warning"
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn legacy_thread_rollback_trims_inactive_thread_snapshot_state() -> Result<()> {
|
||||
let mut app = make_test_app().await;
|
||||
let thread_id = ThreadId::new();
|
||||
let session = test_thread_session(thread_id, PathBuf::from("/tmp/project"));
|
||||
let turns = vec![
|
||||
test_turn("turn-1", TurnStatus::Completed, Vec::new()),
|
||||
test_turn("turn-2", TurnStatus::Completed, Vec::new()),
|
||||
];
|
||||
let channel = ThreadEventChannel::new_with_session(4, session, turns);
|
||||
{
|
||||
let mut store = channel.store.lock().await;
|
||||
store.push_request(exec_approval_request(
|
||||
thread_id,
|
||||
"turn-approval",
|
||||
"call-approval",
|
||||
None,
|
||||
));
|
||||
assert_eq!(store.has_pending_thread_approvals(), true);
|
||||
}
|
||||
app.thread_event_channels.insert(thread_id, channel);
|
||||
|
||||
app.enqueue_thread_legacy_rollback(thread_id, 1).await?;
|
||||
|
||||
let store = app
|
||||
.thread_event_channels
|
||||
.get(&thread_id)
|
||||
.expect("thread store should exist")
|
||||
.store
|
||||
.lock()
|
||||
.await;
|
||||
assert_eq!(
|
||||
store.turns,
|
||||
vec![test_turn("turn-1", TurnStatus::Completed, Vec::new())]
|
||||
);
|
||||
assert_eq!(store.has_pending_thread_approvals(), false);
|
||||
let snapshot = store.snapshot();
|
||||
assert_eq!(snapshot.turns, store.turns);
|
||||
assert!(snapshot.events.is_empty());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn inactive_thread_started_notification_initializes_replay_session() -> Result<()> {
|
||||
let mut app = make_test_app().await;
|
||||
|
|
@ -8108,16 +7835,6 @@ guardian_approval = true
|
|||
assert_eq!(store.has_pending_thread_approvals(), false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_event_store_consumes_matching_local_legacy_rollback_once() {
|
||||
let mut store = ThreadEventStore::new(8);
|
||||
store.note_local_thread_rollback(2);
|
||||
|
||||
assert!(store.consume_pending_local_legacy_rollback(2));
|
||||
assert!(!store.consume_pending_local_legacy_rollback(2));
|
||||
assert!(!store.consume_pending_local_legacy_rollback(1));
|
||||
}
|
||||
|
||||
fn next_user_turn_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Op>) -> Op {
|
||||
let mut seen = Vec::new();
|
||||
while let Ok(op) = op_rx.try_recv() {
|
||||
|
|
@ -9076,8 +8793,13 @@ guardian_approval = true
|
|||
let (tx, rx) = mpsc::channel(8);
|
||||
app.active_thread_id = Some(thread_id);
|
||||
app.active_thread_rx = Some(rx);
|
||||
tx.send(ThreadBufferedEvent::LegacyWarning(
|
||||
"stale warning".to_string(),
|
||||
tx.send(ThreadBufferedEvent::Notification(
|
||||
ServerNotification::ConfigWarning(ConfigWarningNotification {
|
||||
summary: "stale warning".to_string(),
|
||||
details: None,
|
||||
path: None,
|
||||
range: None,
|
||||
}),
|
||||
))
|
||||
.await
|
||||
.expect("event should queue");
|
||||
|
|
@ -9115,62 +8837,6 @@ guardian_approval = true
|
|||
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn local_rollback_response_suppresses_matching_legacy_rollback() {
|
||||
let mut app = make_test_app().await;
|
||||
let thread_id = ThreadId::new();
|
||||
let session = test_thread_session(thread_id, PathBuf::from("/tmp/project"));
|
||||
let initial_turns = vec![
|
||||
test_turn("turn-1", TurnStatus::Completed, Vec::new()),
|
||||
test_turn("turn-2", TurnStatus::Completed, Vec::new()),
|
||||
];
|
||||
app.thread_event_channels.insert(
|
||||
thread_id,
|
||||
ThreadEventChannel::new_with_session(8, session, initial_turns),
|
||||
);
|
||||
|
||||
app.handle_thread_rollback_response(
|
||||
thread_id,
|
||||
1,
|
||||
&ThreadRollbackResponse {
|
||||
thread: Thread {
|
||||
id: thread_id.to_string(),
|
||||
preview: String::new(),
|
||||
ephemeral: false,
|
||||
model_provider: "openai".to_string(),
|
||||
created_at: 0,
|
||||
updated_at: 0,
|
||||
status: codex_app_server_protocol::ThreadStatus::Idle,
|
||||
path: None,
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: SessionSource::Cli.into(),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
git_info: None,
|
||||
name: None,
|
||||
turns: vec![test_turn("turn-1", TurnStatus::Completed, Vec::new())],
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
app.enqueue_thread_legacy_rollback(thread_id, 1)
|
||||
.await
|
||||
.expect("legacy rollback should not fail");
|
||||
|
||||
let store = app
|
||||
.thread_event_channels
|
||||
.get(&thread_id)
|
||||
.expect("thread channel")
|
||||
.store
|
||||
.lock()
|
||||
.await;
|
||||
let snapshot = store.snapshot();
|
||||
assert_eq!(snapshot.turns.len(), 1);
|
||||
assert!(snapshot.events.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn new_session_requests_shutdown_for_previous_conversation() {
|
||||
let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await;
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ use codex_app_server_client::AppServerEvent;
|
|||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
|
|
@ -106,16 +105,9 @@ use codex_protocol::protocol::TurnAbortedEvent;
|
|||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use serde_json::Value;
|
||||
#[cfg(test)]
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
enum LegacyThreadNotification {
|
||||
Warning(String),
|
||||
Rollback { num_turns: u32 },
|
||||
}
|
||||
|
||||
impl App {
|
||||
pub(super) async fn handle_app_server_event(
|
||||
&mut self,
|
||||
|
|
@ -133,37 +125,8 @@ impl App {
|
|||
self.handle_server_notification_event(app_server_client, notification)
|
||||
.await;
|
||||
}
|
||||
AppServerEvent::LegacyNotification(notification) => {
|
||||
if let Some((thread_id, legacy_notification)) =
|
||||
legacy_thread_notification(notification)
|
||||
{
|
||||
let result = match legacy_notification {
|
||||
LegacyThreadNotification::Warning(message) => {
|
||||
if self.primary_thread_id == Some(thread_id)
|
||||
|| self.primary_thread_id.is_none()
|
||||
{
|
||||
self.enqueue_primary_thread_legacy_warning(message).await
|
||||
} else {
|
||||
self.enqueue_thread_legacy_warning(thread_id, message).await
|
||||
}
|
||||
}
|
||||
LegacyThreadNotification::Rollback { num_turns } => {
|
||||
if self.primary_thread_id == Some(thread_id)
|
||||
|| self.primary_thread_id.is_none()
|
||||
{
|
||||
self.enqueue_primary_thread_legacy_rollback(num_turns).await
|
||||
} else {
|
||||
self.enqueue_thread_legacy_rollback(thread_id, num_turns)
|
||||
.await
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Err(err) = result {
|
||||
tracing::warn!("failed to enqueue app-server legacy notification: {err}");
|
||||
}
|
||||
} else {
|
||||
tracing::debug!("ignoring legacy app-server notification in tui_app_server");
|
||||
}
|
||||
AppServerEvent::LegacyNotification(_) => {
|
||||
tracing::debug!("ignoring legacy app-server notification in tui_app_server");
|
||||
}
|
||||
AppServerEvent::ServerRequest(request) => {
|
||||
if let ServerRequest::ChatgptAuthTokensRefresh { request_id, params } = request {
|
||||
|
|
@ -567,48 +530,6 @@ pub(super) fn thread_snapshot_events(
|
|||
.collect()
|
||||
}
|
||||
|
||||
fn legacy_thread_notification(
|
||||
notification: JSONRPCNotification,
|
||||
) -> Option<(ThreadId, LegacyThreadNotification)> {
|
||||
let method = notification
|
||||
.method
|
||||
.strip_prefix("codex/event/")
|
||||
.unwrap_or(¬ification.method);
|
||||
|
||||
let Value::Object(mut params) = notification.params? else {
|
||||
return None;
|
||||
};
|
||||
let thread_id = params
|
||||
.remove("conversationId")
|
||||
.and_then(|value| serde_json::from_value::<String>(value).ok())
|
||||
.and_then(|value| ThreadId::from_string(&value).ok())?;
|
||||
let msg = params.get("msg").and_then(Value::as_object)?;
|
||||
|
||||
match method {
|
||||
"warning" => {
|
||||
let message = msg
|
||||
.get("type")
|
||||
.and_then(Value::as_str)
|
||||
.zip(msg.get("message"))
|
||||
.and_then(|(kind, message)| (kind == "warning").then_some(message))
|
||||
.and_then(Value::as_str)
|
||||
.map(ToOwned::to_owned)?;
|
||||
Some((thread_id, LegacyThreadNotification::Warning(message)))
|
||||
}
|
||||
"thread_rolled_back" => {
|
||||
let num_turns = msg
|
||||
.get("type")
|
||||
.and_then(Value::as_str)
|
||||
.zip(msg.get("num_turns"))
|
||||
.and_then(|(kind, num_turns)| (kind == "thread_rolled_back").then_some(num_turns))
|
||||
.and_then(Value::as_u64)
|
||||
.and_then(|num_turns| u32::try_from(num_turns).ok())?;
|
||||
Some((thread_id, LegacyThreadNotification::Rollback { num_turns }))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn server_notification_thread_events(
|
||||
notification: ServerNotification,
|
||||
|
|
@ -1289,9 +1210,7 @@ fn app_server_codex_error_info_to_core(
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::LegacyThreadNotification;
|
||||
use super::command_execution_started_event;
|
||||
use super::legacy_thread_notification;
|
||||
use super::server_notification_thread_events;
|
||||
use super::thread_snapshot_events;
|
||||
use super::turn_snapshot_events;
|
||||
|
|
@ -1303,7 +1222,6 @@ mod tests {
|
|||
use codex_app_server_protocol::CommandExecutionStatus;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ItemStartedNotification;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::Thread;
|
||||
|
|
@ -1324,57 +1242,8 @@ mod tests {
|
|||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[test]
|
||||
fn legacy_warning_notification_extracts_thread_id_and_message() {
|
||||
let thread_id = ThreadId::new();
|
||||
let warning = legacy_thread_notification(JSONRPCNotification {
|
||||
method: "codex/event/warning".to_string(),
|
||||
params: Some(json!({
|
||||
"conversationId": thread_id.to_string(),
|
||||
"id": "event-1",
|
||||
"msg": {
|
||||
"type": "warning",
|
||||
"message": "legacy warning message",
|
||||
},
|
||||
})),
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
warning,
|
||||
Some((
|
||||
thread_id,
|
||||
LegacyThreadNotification::Warning("legacy warning message".to_string())
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn legacy_thread_rollback_notification_extracts_thread_id_and_turn_count() {
|
||||
let thread_id = ThreadId::new();
|
||||
let rollback = legacy_thread_notification(JSONRPCNotification {
|
||||
method: "codex/event/thread_rolled_back".to_string(),
|
||||
params: Some(json!({
|
||||
"conversationId": thread_id.to_string(),
|
||||
"id": "event-1",
|
||||
"msg": {
|
||||
"type": "thread_rolled_back",
|
||||
"num_turns": 2,
|
||||
},
|
||||
})),
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
rollback,
|
||||
Some((
|
||||
thread_id,
|
||||
LegacyThreadNotification::Rollback { num_turns: 2 }
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bridges_completed_agent_messages_from_server_notifications() {
|
||||
let thread_id = "019cee8c-b993-7e33-88c0-014d4e62612d".to_string();
|
||||
|
|
|
|||
|
|
@ -9548,10 +9548,6 @@ impl ChatWidget {
|
|||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(crate) fn add_warning_message(&mut self, message: String) {
|
||||
self.on_warning(message);
|
||||
}
|
||||
|
||||
fn add_app_server_stub_message(&mut self, feature: &str) {
|
||||
warn!(feature, "stubbed unsupported app-server TUI feature");
|
||||
self.add_error_message(format!("{feature}: {APP_SERVER_TUI_STUB_MESSAGE}"));
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue