2026-02-11 16:21:13 -08:00
use crate ::outgoing_message ::ConnectionId ;
2026-02-11 12:20:54 -08:00
use crate ::outgoing_message ::ConnectionRequestId ;
2026-02-27 12:45:59 -08:00
use codex_app_server_protocol ::RequestId ;
2026-02-19 21:29:05 -08:00
use codex_app_server_protocol ::ThreadHistoryBuilder ;
use codex_app_server_protocol ::Turn ;
2026-02-11 12:20:54 -08:00
use codex_app_server_protocol ::TurnError ;
2026-02-11 16:21:13 -08:00
use codex_core ::CodexThread ;
2026-02-19 21:29:05 -08:00
use codex_core ::ThreadConfigSnapshot ;
2026-02-11 12:20:54 -08:00
use codex_protocol ::ThreadId ;
2026-02-19 21:29:05 -08:00
use codex_protocol ::protocol ::EventMsg ;
2026-02-11 12:20:54 -08:00
use std ::collections ::HashMap ;
use std ::collections ::HashSet ;
2026-02-19 21:29:05 -08:00
use std ::path ::PathBuf ;
2026-02-11 12:20:54 -08:00
use std ::sync ::Arc ;
2026-02-11 16:21:13 -08:00
use std ::sync ::Weak ;
2026-02-11 12:20:54 -08:00
use tokio ::sync ::Mutex ;
2026-02-19 21:29:05 -08:00
use tokio ::sync ::mpsc ;
2026-02-11 12:20:54 -08:00
use tokio ::sync ::oneshot ;
/// List of pending interrupt entries; each entry pairs a `ConnectionRequestId`
/// with the `ApiVersion` associated with it.
type PendingInterruptQueue = Vec<(
    ConnectionRequestId,
    crate::codex_message_processor::ApiVersion,
)>;
2026-02-19 21:29:05 -08:00
pub ( crate ) struct PendingThreadResumeRequest {
pub ( crate ) request_id : ConnectionRequestId ,
pub ( crate ) rollout_path : PathBuf ,
pub ( crate ) config_snapshot : ThreadConfigSnapshot ,
2026-03-04 17:16:43 -08:00
pub ( crate ) thread_summary : codex_app_server_protocol ::Thread ,
2026-02-19 21:29:05 -08:00
}
2026-02-27 12:45:59 -08:00
// ThreadListenerCommand is used to perform operations in the context of the thread listener, for serialization purposes.
2026-02-19 21:29:05 -08:00
pub ( crate ) enum ThreadListenerCommand {
2026-02-27 12:45:59 -08:00
// SendThreadResumeResponse is used to resume an already running thread by sending the thread's history to the client and atomically subscribing for new updates.
SendThreadResumeResponse ( Box < PendingThreadResumeRequest > ) ,
// ResolveServerRequest is used to notify the client that the request has been resolved.
// It is executed in the thread listener's context to ensure that the resolved notification is ordered with regard to the request itself.
ResolveServerRequest {
request_id : RequestId ,
completion_tx : oneshot ::Sender < ( ) > ,
} ,
2026-02-19 21:29:05 -08:00
}
2026-02-11 12:20:54 -08:00
/// Per-conversation accumulation of the latest states e.g. error message while a turn runs.
#[ derive(Default, Clone) ]
pub ( crate ) struct TurnSummary {
pub ( crate ) file_change_started : HashSet < String > ,
2026-02-17 17:55:57 -08:00
pub ( crate ) command_execution_started : HashSet < String > ,
2026-02-11 12:20:54 -08:00
pub ( crate ) last_error : Option < TurnError > ,
}
#[ derive(Default) ]
pub ( crate ) struct ThreadState {
pub ( crate ) pending_interrupts : PendingInterruptQueue ,
pub ( crate ) pending_rollbacks : Option < ConnectionRequestId > ,
pub ( crate ) turn_summary : TurnSummary ,
2026-02-11 16:21:13 -08:00
pub ( crate ) cancel_tx : Option < oneshot ::Sender < ( ) > > ,
pub ( crate ) experimental_raw_events : bool ,
2026-02-20 12:35:16 -08:00
pub ( crate ) listener_generation : u64 ,
2026-02-19 21:29:05 -08:00
listener_command_tx : Option < mpsc ::UnboundedSender < ThreadListenerCommand > > ,
current_turn_history : ThreadHistoryBuilder ,
2026-02-11 16:21:13 -08:00
listener_thread : Option < Weak < CodexThread > > ,
2026-02-11 12:20:54 -08:00
}
impl ThreadState {
2026-02-11 16:21:13 -08:00
pub ( crate ) fn listener_matches ( & self , conversation : & Arc < CodexThread > ) -> bool {
self . listener_thread
. as_ref ( )
. and_then ( Weak ::upgrade )
. is_some_and ( | existing | Arc ::ptr_eq ( & existing , conversation ) )
}
pub ( crate ) fn set_listener (
& mut self ,
cancel_tx : oneshot ::Sender < ( ) > ,
conversation : & Arc < CodexThread > ,
2026-02-20 12:35:16 -08:00
) -> ( mpsc ::UnboundedReceiver < ThreadListenerCommand > , u64 ) {
2026-02-11 16:21:13 -08:00
if let Some ( previous ) = self . cancel_tx . replace ( cancel_tx ) {
2026-02-11 12:20:54 -08:00
let _ = previous . send ( ( ) ) ;
}
2026-02-20 12:35:16 -08:00
self . listener_generation = self . listener_generation . wrapping_add ( 1 ) ;
2026-02-19 21:29:05 -08:00
let ( listener_command_tx , listener_command_rx ) = mpsc ::unbounded_channel ( ) ;
self . listener_command_tx = Some ( listener_command_tx ) ;
2026-02-11 16:21:13 -08:00
self . listener_thread = Some ( Arc ::downgrade ( conversation ) ) ;
2026-02-20 12:35:16 -08:00
( listener_command_rx , self . listener_generation )
2026-02-11 12:20:54 -08:00
}
2026-02-11 16:21:13 -08:00
pub ( crate ) fn clear_listener ( & mut self ) {
if let Some ( cancel_tx ) = self . cancel_tx . take ( ) {
2026-02-11 12:20:54 -08:00
let _ = cancel_tx . send ( ( ) ) ;
}
2026-02-19 21:29:05 -08:00
self . listener_command_tx = None ;
self . current_turn_history . reset ( ) ;
2026-02-11 16:21:13 -08:00
self . listener_thread = None ;
2026-02-11 12:20:54 -08:00
}
2026-02-11 16:21:13 -08:00
pub ( crate ) fn set_experimental_raw_events ( & mut self , enabled : bool ) {
self . experimental_raw_events = enabled ;
2026-02-11 12:20:54 -08:00
}
2026-02-19 21:29:05 -08:00
pub ( crate ) fn listener_command_tx (
& self ,
) -> Option < mpsc ::UnboundedSender < ThreadListenerCommand > > {
self . listener_command_tx . clone ( )
}
pub ( crate ) fn active_turn_snapshot ( & self ) -> Option < Turn > {
self . current_turn_history . active_turn_snapshot ( )
}
pub ( crate ) fn track_current_turn_event ( & mut self , event : & EventMsg ) {
self . current_turn_history . handle_event ( event ) ;
if ! self . current_turn_history . has_active_turn ( ) {
self . current_turn_history . reset ( ) ;
}
}
2026-02-11 12:20:54 -08:00
}
2026-02-27 17:40:08 -08:00
/// Per-thread record kept by the manager: the shared thread state plus the
/// set of connections subscribed to the thread.
struct ThreadEntry {
    /// Shared, lockable state of the thread.
    state: Arc<Mutex<ThreadState>>,
    /// Connections currently subscribed to the thread.
    connection_ids: HashSet<ConnectionId>,
}
impl Default for ThreadEntry {
    /// Builds an entry with a default `ThreadState` and no subscribed connections.
    fn default() -> Self {
        let fresh_state = ThreadState::default();
        Self {
            state: Arc::new(Mutex::new(fresh_state)),
            connection_ids: HashSet::default(),
        }
    }
}
2026-02-11 12:20:54 -08:00
#[ derive(Default) ]
2026-02-27 17:40:08 -08:00
struct ThreadStateManagerInner {
live_connections : HashSet < ConnectionId > ,
threads : HashMap < ThreadId , ThreadEntry > ,
2026-02-11 16:21:13 -08:00
thread_ids_by_connection : HashMap < ConnectionId , HashSet < ThreadId > > ,
2026-02-11 12:20:54 -08:00
}
2026-02-27 17:40:08 -08:00
/// Handle to the thread/connection bookkeeping.
///
/// Clones share the same inner state because it lives behind an `Arc`.
#[derive(Clone, Default)]
pub(crate) struct ThreadStateManager {
    /// Shared bookkeeping, guarded by an async mutex.
    state: Arc<Mutex<ThreadStateManagerInner>>,
}
2026-02-11 12:20:54 -08:00
impl ThreadStateManager {
pub ( crate ) fn new ( ) -> Self {
Self ::default ( )
}
2026-02-27 17:40:08 -08:00
pub ( crate ) async fn connection_initialized ( & self , connection_id : ConnectionId ) {
self . state
. lock ( )
. await
. live_connections
. insert ( connection_id ) ;
2026-02-11 12:20:54 -08:00
}
2026-02-27 17:40:08 -08:00
pub ( crate ) async fn subscribed_connection_ids ( & self , thread_id : ThreadId ) -> Vec < ConnectionId > {
let state = self . state . lock ( ) . await ;
state
. threads
. get ( & thread_id )
. map ( | thread_entry | thread_entry . connection_ids . iter ( ) . copied ( ) . collect ( ) )
. unwrap_or_default ( )
}
2026-02-11 16:21:13 -08:00
2026-02-27 17:40:08 -08:00
pub ( crate ) async fn thread_state ( & self , thread_id : ThreadId ) -> Arc < Mutex < ThreadState > > {
let mut state = self . state . lock ( ) . await ;
state . threads . entry ( thread_id ) . or_default ( ) . state . clone ( )
}
2026-02-11 16:21:13 -08:00
2026-02-27 17:40:08 -08:00
pub ( crate ) async fn remove_thread_state ( & self , thread_id : ThreadId ) {
let thread_state = {
let mut state = self . state . lock ( ) . await ;
let thread_state = state
. threads
. remove ( & thread_id )
. map ( | thread_entry | thread_entry . state ) ;
state . thread_ids_by_connection . retain ( | _ , thread_ids | {
thread_ids . remove ( & thread_id ) ;
! thread_ids . is_empty ( )
} ) ;
thread_state
} ;
if let Some ( thread_state ) = thread_state {
2026-02-21 21:33:33 -08:00
let mut thread_state = thread_state . lock ( ) . await ;
tracing ::debug! (
thread_id = % thread_id ,
listener_generation = thread_state . listener_generation ,
had_listener = thread_state . cancel_tx . is_some ( ) ,
had_active_turn = thread_state . active_turn_snapshot ( ) . is_some ( ) ,
" clearing thread listener during thread-state teardown "
) ;
thread_state . clear_listener ( ) ;
2026-02-11 12:20:54 -08:00
}
}
2026-03-16 16:42:43 +00:00
pub ( crate ) async fn clear_all_listeners ( & self ) {
let thread_states = {
let state = self . state . lock ( ) . await ;
state
. threads
. iter ( )
. map ( | ( thread_id , thread_entry ) | ( * thread_id , thread_entry . state . clone ( ) ) )
. collect ::< Vec < _ > > ( )
} ;
for ( thread_id , thread_state ) in thread_states {
let mut thread_state = thread_state . lock ( ) . await ;
tracing ::debug! (
thread_id = % thread_id ,
listener_generation = thread_state . listener_generation ,
had_listener = thread_state . cancel_tx . is_some ( ) ,
had_active_turn = thread_state . active_turn_snapshot ( ) . is_some ( ) ,
" clearing thread listener during app-server shutdown "
) ;
thread_state . clear_listener ( ) ;
}
}
2026-02-25 13:14:30 -08:00
pub ( crate ) async fn unsubscribe_connection_from_thread (
2026-02-27 17:40:08 -08:00
& self ,
2026-02-25 13:14:30 -08:00
thread_id : ThreadId ,
connection_id : ConnectionId ,
) -> bool {
{
2026-02-27 17:40:08 -08:00
let mut state = self . state . lock ( ) . await ;
if ! state . threads . contains_key ( & thread_id ) {
return false ;
}
2026-02-25 13:14:30 -08:00
2026-02-27 17:40:08 -08:00
if ! state
. thread_ids_by_connection
. get ( & connection_id )
. is_some_and ( | thread_ids | thread_ids . contains ( & thread_id ) )
{
return false ;
2026-02-25 13:14:30 -08:00
}
2026-02-27 17:40:08 -08:00
if let Some ( thread_ids ) = state . thread_ids_by_connection . get_mut ( & connection_id ) {
thread_ids . remove ( & thread_id ) ;
if thread_ids . is_empty ( ) {
state . thread_ids_by_connection . remove ( & connection_id ) ;
}
}
if let Some ( thread_entry ) = state . threads . get_mut ( & thread_id ) {
thread_entry . connection_ids . remove ( & connection_id ) ;
}
} ;
2026-02-25 13:14:30 -08:00
true
}
pub ( crate ) async fn has_subscribers ( & self , thread_id : ThreadId ) -> bool {
2026-02-27 17:40:08 -08:00
self . state
2026-02-25 13:14:30 -08:00
. lock ( )
. await
2026-02-27 17:40:08 -08:00
. threads
. get ( & thread_id )
. is_some_and ( | thread_entry | ! thread_entry . connection_ids . is_empty ( ) )
2026-02-25 13:14:30 -08:00
}
2026-02-27 17:40:08 -08:00
pub ( crate ) async fn try_ensure_connection_subscribed (
& self ,
2026-02-11 16:21:13 -08:00
thread_id : ThreadId ,
connection_id : ConnectionId ,
experimental_raw_events : bool ,
2026-02-27 17:40:08 -08:00
) -> Option < Arc < Mutex < ThreadState > > > {
let thread_state = {
let mut state = self . state . lock ( ) . await ;
if ! state . live_connections . contains ( & connection_id ) {
return None ;
}
state
. thread_ids_by_connection
. entry ( connection_id )
. or_default ( )
. insert ( thread_id ) ;
let thread_entry = state . threads . entry ( thread_id ) . or_default ( ) ;
thread_entry . connection_ids . insert ( connection_id ) ;
thread_entry . state . clone ( )
} ;
2026-02-11 16:21:13 -08:00
{
let mut thread_state_guard = thread_state . lock ( ) . await ;
if experimental_raw_events {
2026-03-16 16:48:15 -07:00
thread_state_guard . set_experimental_raw_events ( /* enabled */ true ) ;
2026-02-11 16:21:13 -08:00
}
}
2026-02-27 17:40:08 -08:00
Some ( thread_state )
2026-02-11 12:20:54 -08:00
}
2026-02-11 16:21:13 -08:00
2026-02-27 17:40:08 -08:00
pub ( crate ) async fn try_add_connection_to_thread (
& self ,
thread_id : ThreadId ,
connection_id : ConnectionId ,
) -> bool {
let mut state = self . state . lock ( ) . await ;
if ! state . live_connections . contains ( & connection_id ) {
return false ;
}
state
2026-02-19 21:29:05 -08:00
. thread_ids_by_connection
2026-02-27 17:40:08 -08:00
. entry ( connection_id )
. or_default ( )
. insert ( thread_id ) ;
state
. threads
. entry ( thread_id )
. or_default ( )
. connection_ids
. insert ( connection_id ) ;
true
}
pub ( crate ) async fn remove_connection ( & self , connection_id : ConnectionId ) {
let thread_states = {
let mut state = self . state . lock ( ) . await ;
state . live_connections . remove ( & connection_id ) ;
let thread_ids = state
. thread_ids_by_connection
. remove ( & connection_id )
. unwrap_or_default ( ) ;
for thread_id in & thread_ids {
if let Some ( thread_entry ) = state . threads . get_mut ( thread_id ) {
thread_entry . connection_ids . remove ( & connection_id ) ;
2026-02-19 21:29:05 -08:00
}
}
2026-02-27 17:40:08 -08:00
thread_ids
. into_iter ( )
. map ( | thread_id | {
(
thread_id ,
state
. threads
. get ( & thread_id )
. is_none_or ( | thread_entry | thread_entry . connection_ids . is_empty ( ) ) ,
state
. threads
. get ( & thread_id )
. map ( | thread_entry | thread_entry . state . clone ( ) ) ,
)
} )
. collect ::< Vec < _ > > ( )
} ;
2026-02-19 21:29:05 -08:00
2026-02-27 17:40:08 -08:00
for ( thread_id , no_subscribers , thread_state ) in thread_states {
if ! no_subscribers {
continue ;
2026-02-11 16:21:13 -08:00
}
2026-02-27 17:40:08 -08:00
let Some ( thread_state ) = thread_state else {
continue ;
} ;
let listener_generation = thread_state . lock ( ) . await . listener_generation ;
tracing ::debug! (
thread_id = % thread_id ,
connection_id = ? connection_id ,
listener_generation ,
" retaining thread listener after connection disconnect left zero subscribers "
) ;
2026-02-11 16:21:13 -08:00
}
}
2026-02-11 12:20:54 -08:00
}