From 188f79afeedf95f100aeaaf0b665255edaac74d9 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 6 Jan 2026 19:44:39 +0000 Subject: [PATCH] feat: drop agent bus and store the agent status in codex directly (#8788) --- codex-rs/core/src/agent/bus.rs | 61 --------- codex-rs/core/src/agent/control.rs | 147 ++++++---------------- codex-rs/core/src/agent/mod.rs | 6 +- codex-rs/core/src/agent/status.rs | 15 +++ codex-rs/core/src/codex.rs | 29 ++++- codex-rs/core/src/codex_conversation.rs | 5 + codex-rs/core/src/codex_delegate.rs | 4 + codex-rs/core/src/conversation_manager.rs | 6 +- codex-rs/protocol/src/protocol.rs | 20 +++ 9 files changed, 113 insertions(+), 180 deletions(-) delete mode 100644 codex-rs/core/src/agent/bus.rs create mode 100644 codex-rs/core/src/agent/status.rs diff --git a/codex-rs/core/src/agent/bus.rs b/codex-rs/core/src/agent/bus.rs deleted file mode 100644 index 0b15f415f..000000000 --- a/codex-rs/core/src/agent/bus.rs +++ /dev/null @@ -1,61 +0,0 @@ -use codex_protocol::ConversationId; -use codex_protocol::protocol::EventMsg; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::RwLock; - -/// Status store for globally-tracked agents. -#[derive(Clone, Default)] -pub(crate) struct AgentBus { - /// In-memory map of conversation id to the latest derived status. - statuses: Arc>>, -} - -#[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) enum AgentStatus { - PendingInit, - Running, - Completed(Option), - Errored(String), - Shutdown, - #[allow(dead_code)] // Used by upcoming multi-agent tooling. - NotFound, -} - -impl AgentBus { - /// Fetch the last known status for `agent_id`, returning `NotFound` if unseen. - #[allow(dead_code)] // Used by upcoming multi-agent tooling. - pub(crate) async fn status(&self, agent_id: ConversationId) -> AgentStatus { - let statuses = self.statuses.read().await; - statuses - .get(&agent_id) - .cloned() - .unwrap_or(AgentStatus::NotFound) - } - - /// Derive and record agent status from a single emitted event. - pub(crate) async fn on_event(&self, conversation_id: ConversationId, msg: &EventMsg) { - let next_status = match msg { - EventMsg::TaskStarted(_) => Some(AgentStatus::Running), - EventMsg::TaskComplete(ev) => { - Some(AgentStatus::Completed(ev.last_agent_message.clone())) - } - EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))), - EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())), - EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown), - _ => None, - }; - if let Some(status) = next_status { - self.record_status(&conversation_id, status).await; - } - } - - /// Force-set the tracked status for an agent conversation. - pub(crate) async fn record_status( - &self, - conversation_id: &ConversationId, - status: AgentStatus, - ) { - self.statuses.write().await.insert(*conversation_id, status); - } -} diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 181051b46..468f580a2 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -1,5 +1,4 @@ use crate::CodexConversation; -use crate::agent::AgentBus; use crate::agent::AgentStatus; use crate::conversation_manager::ConversationManagerState; use crate::error::CodexErr; @@ -20,14 +19,12 @@ pub(crate) struct AgentControl { /// This is `Weak` to avoid reference cycles and shadow persistence of the form /// `ConversationManagerState -> CodexConversation -> Session -> SessionServices -> ConversationManagerState`. manager: Weak, - /// Shared agent status store updated from emitted events. - pub(crate) bus: AgentBus, } impl AgentControl { /// Construct a new `AgentControl` that can spawn/message agents via the given manager state. - pub(crate) fn new(manager: Weak, bus: AgentBus) -> Self { - Self { manager, bus } + pub(crate) fn new(manager: Weak) -> Self { + Self { manager } } #[allow(dead_code)] // Used by upcoming multi-agent tooling. @@ -44,25 +41,13 @@ impl AgentControl { let state = self.upgrade()?; let new_conversation = state.spawn_new_conversation(config, self.clone()).await?; - self.bus - .record_status(&new_conversation.conversation_id, AgentStatus::PendingInit) - .await; - if headless { - spawn_headless_drain( - Arc::clone(&new_conversation.conversation), - new_conversation.conversation_id, - self.clone(), - ); + spawn_headless_drain(Arc::clone(&new_conversation.conversation)); } self.send_prompt(new_conversation.conversation_id, prompt) .await?; - self.bus - .record_status(&new_conversation.conversation_id, AgentStatus::Running) - .await; - Ok(new_conversation.conversation_id) } @@ -85,6 +70,19 @@ impl AgentControl { .await } + #[allow(dead_code)] // Used by upcoming multi-agent tooling. + /// Fetch the last known status for `agent_id`, returning `NotFound` when unavailable. + pub(crate) async fn get_status(&self, agent_id: ConversationId) -> AgentStatus { + let Ok(state) = self.upgrade() else { + // No agent available if upgrade fails. + return AgentStatus::NotFound; + }; + let Ok(conversation) = state.get_conversation(agent_id).await else { + return AgentStatus::NotFound; + }; + conversation.agent_status().await + } + fn upgrade(&self) -> CodexResult> { self.manager.upgrade().ok_or_else(|| { CodexErr::UnsupportedOperation("conversation manager dropped".to_string()) @@ -96,11 +94,7 @@ impl AgentControl { /// `CodexConversation::next_event()`. The underlying event channel is unbounded, so the producer can /// accumulate events indefinitely. This drain task prevents that memory growth by polling and /// discarding events until shutdown. -fn spawn_headless_drain( - conversation: Arc, - conversation_id: ConversationId, - agent_control: AgentControl, -) { +fn spawn_headless_drain(conversation: Arc) { tokio::spawn(async move { loop { match conversation.next_event().await { @@ -110,10 +104,7 @@ fn spawn_headless_drain( } } Err(err) => { - agent_control - .bus - .record_status(&conversation_id, AgentStatus::Errored(err.to_string())) - .await; + tracing::warn!("failed to receive event from agent: {err:?}"); break; } } @@ -124,6 +115,7 @@ fn spawn_headless_drain( #[cfg(test)] mod tests { use super::*; + use crate::agent::agent_status_from_event; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::TaskCompleteEvent; use codex_protocol::protocol::TaskStartedEvent; @@ -145,110 +137,53 @@ mod tests { } #[tokio::test] - async fn record_status_persists_to_bus() { + async fn get_status_returns_not_found_without_manager() { let control = AgentControl::default(); - let conversation_id = ConversationId::new(); - - control - .bus - .record_status(&conversation_id, AgentStatus::PendingInit) - .await; - - let got = control.bus.status(conversation_id).await; - assert_eq!(got, AgentStatus::PendingInit); + let got = control.get_status(ConversationId::new()).await; + assert_eq!(got, AgentStatus::NotFound); } #[tokio::test] async fn on_event_updates_status_from_task_started() { - let control = AgentControl::default(); - let conversation_id = ConversationId::new(); - - control - .bus - .on_event( - conversation_id, - &EventMsg::TaskStarted(TaskStartedEvent { - model_context_window: None, - }), - ) - .await; - - let got = control.bus.status(conversation_id).await; - assert_eq!(got, AgentStatus::Running); + let status = agent_status_from_event(&EventMsg::TaskStarted(TaskStartedEvent { + model_context_window: None, + })); + assert_eq!(status, Some(AgentStatus::Running)); } #[tokio::test] async fn on_event_updates_status_from_task_complete() { - let control = AgentControl::default(); - let conversation_id = ConversationId::new(); - - control - .bus - .on_event( - conversation_id, - &EventMsg::TaskComplete(TaskCompleteEvent { - last_agent_message: Some("done".to_string()), - }), - ) - .await; - + let status = agent_status_from_event(&EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: Some("done".to_string()), + })); let expected = AgentStatus::Completed(Some("done".to_string())); - let got = control.bus.status(conversation_id).await; - assert_eq!(got, expected); + assert_eq!(status, Some(expected)); } #[tokio::test] async fn on_event_updates_status_from_error() { - let control = AgentControl::default(); - let conversation_id = ConversationId::new(); - - control - .bus - .on_event( - conversation_id, - &EventMsg::Error(ErrorEvent { - message: "boom".to_string(), - codex_error_info: None, - }), - ) - .await; + let status = agent_status_from_event(&EventMsg::Error(ErrorEvent { + message: "boom".to_string(), + codex_error_info: None, + })); let expected = AgentStatus::Errored("boom".to_string()); - let got = control.bus.status(conversation_id).await; - assert_eq!(got, expected); + assert_eq!(status, Some(expected)); } #[tokio::test] async fn on_event_updates_status_from_turn_aborted() { - let control = AgentControl::default(); - let conversation_id = ConversationId::new(); - - control - .bus - .on_event( - conversation_id, - &EventMsg::TurnAborted(TurnAbortedEvent { - reason: TurnAbortReason::Interrupted, - }), - ) - .await; + let status = agent_status_from_event(&EventMsg::TurnAborted(TurnAbortedEvent { + reason: TurnAbortReason::Interrupted, + })); let expected = AgentStatus::Errored("Interrupted".to_string()); - let got = control.bus.status(conversation_id).await; - assert_eq!(got, expected); + assert_eq!(status, Some(expected)); } #[tokio::test] async fn on_event_updates_status_from_shutdown_complete() { - let control = AgentControl::default(); - let conversation_id = ConversationId::new(); - - control - .bus - .on_event(conversation_id, &EventMsg::ShutdownComplete) - .await; - - let got = control.bus.status(conversation_id).await; - assert_eq!(got, AgentStatus::Shutdown); + let status = agent_status_from_event(&EventMsg::ShutdownComplete); + assert_eq!(status, Some(AgentStatus::Shutdown)); } } diff --git a/codex-rs/core/src/agent/mod.rs b/codex-rs/core/src/agent/mod.rs index 29eb9577a..d6348b38b 100644 --- a/codex-rs/core/src/agent/mod.rs +++ b/codex-rs/core/src/agent/mod.rs @@ -1,6 +1,6 @@ -pub(crate) mod bus; pub(crate) mod control; +pub(crate) mod status; -pub(crate) use bus::AgentBus; -pub(crate) use bus::AgentStatus; +pub(crate) use codex_protocol::protocol::AgentStatus; pub(crate) use control::AgentControl; +pub(crate) use status::agent_status_from_event; diff --git a/codex-rs/core/src/agent/status.rs b/codex-rs/core/src/agent/status.rs new file mode 100644 index 000000000..f5345a291 --- /dev/null +++ b/codex-rs/core/src/agent/status.rs @@ -0,0 +1,15 @@ +use codex_protocol::protocol::AgentStatus; +use codex_protocol::protocol::EventMsg; + +/// Derive the next agent status from a single emitted event. +/// Returns `None` when the event does not affect status tracking. +pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option { + match msg { + EventMsg::TaskStarted(_) => Some(AgentStatus::Running), + EventMsg::TaskComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())), + EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))), + EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())), + EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown), + _ => None, + } +} diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 2d73b47f5..e0729b57a 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -9,6 +9,8 @@ use std::sync::atomic::Ordering; use crate::AuthManager; use crate::SandboxState; use crate::agent::AgentControl; +use crate::agent::AgentStatus; +use crate::agent::agent_status_from_event; use crate::client_common::REVIEW_PROMPT; use crate::compact; use crate::compact::run_inline_auto_compact_task; @@ -168,6 +170,8 @@ pub struct Codex { pub(crate) next_id: AtomicU64, pub(crate) tx_sub: Sender, pub(crate) rx_event: Receiver, + // Last known status of the agent. + pub(crate) agent_status: Arc>, } /// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`], @@ -275,6 +279,7 @@ impl Codex { // Generate a unique ID for the lifetime of this Codex session. let session_source_clone = session_configuration.session_source.clone(); + let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit)); let session = Session::new( session_configuration, @@ -283,6 +288,7 @@ impl Codex { models_manager.clone(), exec_policy, tx_event.clone(), + Arc::clone(&agent_status), conversation_history, session_source_clone, skills_manager, @@ -301,6 +307,7 @@ impl Codex { next_id: AtomicU64::new(0), tx_sub, rx_event, + agent_status, }; Ok(CodexSpawnOk { @@ -338,6 +345,11 @@ impl Codex { .map_err(|_| CodexErr::InternalAgentDied)?; Ok(event) } + + pub(crate) async fn agent_status(&self) -> AgentStatus { + let status = self.agent_status.read().await; + status.clone() + } } /// Context for an initialized model agent @@ -346,6 +358,7 @@ impl Codex { pub(crate) struct Session { conversation_id: ConversationId, tx_event: Sender, + agent_status: Arc>, state: Mutex, /// The set of enabled features should be invariant for the lifetime of the /// session. @@ -549,6 +562,7 @@ impl Session { models_manager: Arc, exec_policy: ExecPolicyManager, tx_event: Sender, + agent_status: Arc>, initial_history: InitialHistory, session_source: SessionSource, skills_manager: Arc, @@ -680,6 +694,7 @@ impl Session { let sess = Arc::new(Session { conversation_id, tx_event: tx_event.clone(), + agent_status: Arc::clone(&agent_status), state: Mutex::new(state), features: config.features.clone(), active_turn: Mutex::new(None), @@ -1000,11 +1015,11 @@ impl Session { } pub(crate) async fn send_event_raw(&self, event: Event) { - self.services - .agent_control - .bus - .on_event(self.conversation_id, &event.msg) - .await; + // Record the last known agent status. + if let Some(status) = agent_status_from_event(&event.msg) { + let mut guard = self.agent_status.write().await; + *guard = status; + } // Persist the event into rollout (recorder filters as needed) let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())]; self.persist_rollout_items(&rollout_items).await; @@ -3237,6 +3252,7 @@ mod tests { let models_manager = Arc::new(ModelsManager::new(auth_manager.clone())); let agent_control = AgentControl::default(); let exec_policy = ExecPolicyManager::default(); + let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit)); let model = ModelsManager::get_model_offline(config.model.as_deref()); let session_configuration = SessionConfiguration { provider: config.model_provider.clone(), @@ -3299,6 +3315,7 @@ mod tests { let session = Session { conversation_id, tx_event, + agent_status: Arc::clone(&agent_status), state: Mutex::new(state), features: config.features.clone(), active_turn: Mutex::new(None), @@ -3326,6 +3343,7 @@ mod tests { let models_manager = Arc::new(ModelsManager::new(auth_manager.clone())); let agent_control = AgentControl::default(); let exec_policy = ExecPolicyManager::default(); + let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit)); let model = ModelsManager::get_model_offline(config.model.as_deref()); let session_configuration = SessionConfiguration { provider: config.model_provider.clone(), @@ -3388,6 +3406,7 @@ mod tests { let session = Arc::new(Session { conversation_id, tx_event, + agent_status: Arc::clone(&agent_status), state: Mutex::new(state), features: config.features.clone(), active_turn: Mutex::new(None), diff --git a/codex-rs/core/src/codex_conversation.rs b/codex-rs/core/src/codex_conversation.rs index 5bb9c97c5..723a4cf4f 100644 --- a/codex-rs/core/src/codex_conversation.rs +++ b/codex-rs/core/src/codex_conversation.rs @@ -1,3 +1,4 @@ +use crate::agent::AgentStatus; use crate::codex::Codex; use crate::error::Result as CodexResult; use crate::protocol::Event; @@ -33,6 +34,10 @@ impl CodexConversation { self.codex.next_event().await } + pub async fn agent_status(&self) -> AgentStatus { + self.codex.agent_status().await + } + pub fn rollout_path(&self) -> PathBuf { self.rollout_path.clone() } diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 10d45190d..d7458f1ac 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -87,6 +87,7 @@ pub(crate) async fn run_codex_conversation_interactive( next_id: AtomicU64::new(0), tx_sub: tx_ops, rx_event: rx_sub, + agent_status: Arc::clone(&codex.agent_status), }) } @@ -128,6 +129,7 @@ pub(crate) async fn run_codex_conversation_one_shot( // Bridge events so we can observe completion and shut down automatically. let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let ops_tx = io.tx_sub.clone(); + let agent_status = Arc::clone(&io.agent_status); let io_for_bridge = io; tokio::spawn(async move { while let Ok(event) = io_for_bridge.next_event().await { @@ -159,6 +161,7 @@ pub(crate) async fn run_codex_conversation_one_shot( next_id: AtomicU64::new(0), rx_event: rx_bridge, tx_sub: tx_closed, + agent_status, }) } @@ -373,6 +376,7 @@ mod tests { next_id: AtomicU64::new(0), tx_sub, rx_event: rx_events, + agent_status: Default::default(), }); let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index 1092a488b..c5bb9a175 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -3,7 +3,6 @@ use crate::AuthManager; use crate::CodexAuth; #[cfg(any(test, feature = "test-support"))] use crate::ModelProviderInfo; -use crate::agent::AgentBus; use crate::agent::AgentControl; use crate::codex::Codex; use crate::codex::CodexSpawnOk; @@ -58,7 +57,6 @@ pub(crate) struct ConversationManagerState { models_manager: Arc, skills_manager: Arc, session_source: SessionSource, - agent_bus: AgentBus, } impl ConversationManager { @@ -72,7 +70,6 @@ impl ConversationManager { )), auth_manager, session_source, - agent_bus: AgentBus::default(), }), #[cfg(any(test, feature = "test-support"))] _test_codex_home_guard: None, @@ -111,7 +108,6 @@ impl ConversationManager { )), auth_manager, session_source: SessionSource::Exec, - agent_bus: AgentBus::default(), }), _test_codex_home_guard: None, } @@ -210,7 +206,7 @@ impl ConversationManager { } fn agent_control(&self) -> AgentControl { - AgentControl::new(Arc::downgrade(&self.state), self.state.agent_bus.clone()) + AgentControl::new(Arc::downgrade(&self.state)) } } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index f9369de9c..7fb3d62d2 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -672,6 +672,26 @@ pub enum EventMsg { ReasoningRawContentDelta(ReasoningRawContentDeltaEvent), } +/// Agent lifecycle status, derived from emitted events. +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS, Default)] +#[serde(rename_all = "snake_case")] +#[ts(rename_all = "snake_case")] +pub enum AgentStatus { + /// Agent is waiting for initialization. + #[default] + PendingInit, + /// Agent is currently running. + Running, + /// Agent is done. Contains the final assistant message. + Completed(Option), + /// Agent encountered an error. + Errored(String), + /// Agent has been shutdowned. + Shutdown, + /// Agent is not found. + NotFound, +} + /// Codex errors that we expose to clients. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)] #[serde(rename_all = "snake_case")]