feat: drop agent bus and store the agent status in codex directly (#8788)

This commit is contained in:
jif-oai 2026-01-06 19:44:39 +00:00 committed by GitHub
parent a0b2d03302
commit 188f79afee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 113 additions and 180 deletions

View file

@ -1,61 +0,0 @@
use codex_protocol::ConversationId;
use codex_protocol::protocol::EventMsg;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Status store for globally-tracked agents.
#[derive(Clone, Default)]
pub(crate) struct AgentBus {
/// In-memory map of conversation id to the latest derived status.
statuses: Arc<RwLock<HashMap<ConversationId, AgentStatus>>>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum AgentStatus {
PendingInit,
Running,
Completed(Option<String>),
Errored(String),
Shutdown,
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
NotFound,
}
impl AgentBus {
/// Fetch the last known status for `agent_id`, returning `NotFound` if unseen.
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
pub(crate) async fn status(&self, agent_id: ConversationId) -> AgentStatus {
let statuses = self.statuses.read().await;
statuses
.get(&agent_id)
.cloned()
.unwrap_or(AgentStatus::NotFound)
}
/// Derive and record agent status from a single emitted event.
pub(crate) async fn on_event(&self, conversation_id: ConversationId, msg: &EventMsg) {
let next_status = match msg {
EventMsg::TaskStarted(_) => Some(AgentStatus::Running),
EventMsg::TaskComplete(ev) => {
Some(AgentStatus::Completed(ev.last_agent_message.clone()))
}
EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))),
EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())),
EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown),
_ => None,
};
if let Some(status) = next_status {
self.record_status(&conversation_id, status).await;
}
}
/// Force-set the tracked status for an agent conversation.
pub(crate) async fn record_status(
&self,
conversation_id: &ConversationId,
status: AgentStatus,
) {
self.statuses.write().await.insert(*conversation_id, status);
}
}

View file

@ -1,5 +1,4 @@
use crate::CodexConversation;
use crate::agent::AgentBus;
use crate::agent::AgentStatus;
use crate::conversation_manager::ConversationManagerState;
use crate::error::CodexErr;
@ -20,14 +19,12 @@ pub(crate) struct AgentControl {
/// This is `Weak` to avoid reference cycles and shadow persistence of the form
/// `ConversationManagerState -> CodexConversation -> Session -> SessionServices -> ConversationManagerState`.
manager: Weak<ConversationManagerState>,
/// Shared agent status store updated from emitted events.
pub(crate) bus: AgentBus,
}
impl AgentControl {
/// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
pub(crate) fn new(manager: Weak<ConversationManagerState>, bus: AgentBus) -> Self {
Self { manager, bus }
pub(crate) fn new(manager: Weak<ConversationManagerState>) -> Self {
Self { manager }
}
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
@ -44,25 +41,13 @@ impl AgentControl {
let state = self.upgrade()?;
let new_conversation = state.spawn_new_conversation(config, self.clone()).await?;
self.bus
.record_status(&new_conversation.conversation_id, AgentStatus::PendingInit)
.await;
if headless {
spawn_headless_drain(
Arc::clone(&new_conversation.conversation),
new_conversation.conversation_id,
self.clone(),
);
spawn_headless_drain(Arc::clone(&new_conversation.conversation));
}
self.send_prompt(new_conversation.conversation_id, prompt)
.await?;
self.bus
.record_status(&new_conversation.conversation_id, AgentStatus::Running)
.await;
Ok(new_conversation.conversation_id)
}
@ -85,6 +70,19 @@ impl AgentControl {
.await
}
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
/// Fetch the last known status for `agent_id`, returning `NotFound` when unavailable.
pub(crate) async fn get_status(&self, agent_id: ConversationId) -> AgentStatus {
let Ok(state) = self.upgrade() else {
// No agent available if upgrade fails.
return AgentStatus::NotFound;
};
let Ok(conversation) = state.get_conversation(agent_id).await else {
return AgentStatus::NotFound;
};
conversation.agent_status().await
}
fn upgrade(&self) -> CodexResult<Arc<ConversationManagerState>> {
self.manager.upgrade().ok_or_else(|| {
CodexErr::UnsupportedOperation("conversation manager dropped".to_string())
@ -96,11 +94,7 @@ impl AgentControl {
/// `CodexConversation::next_event()`. The underlying event channel is unbounded, so the producer can
/// accumulate events indefinitely. This drain task prevents that memory growth by polling and
/// discarding events until shutdown.
fn spawn_headless_drain(
conversation: Arc<CodexConversation>,
conversation_id: ConversationId,
agent_control: AgentControl,
) {
fn spawn_headless_drain(conversation: Arc<CodexConversation>) {
tokio::spawn(async move {
loop {
match conversation.next_event().await {
@ -110,10 +104,7 @@ fn spawn_headless_drain(
}
}
Err(err) => {
agent_control
.bus
.record_status(&conversation_id, AgentStatus::Errored(err.to_string()))
.await;
tracing::warn!("failed to receive event from agent: {err:?}");
break;
}
}
@ -124,6 +115,7 @@ fn spawn_headless_drain(
#[cfg(test)]
mod tests {
use super::*;
use crate::agent::agent_status_from_event;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::TaskCompleteEvent;
use codex_protocol::protocol::TaskStartedEvent;
@ -145,110 +137,53 @@ mod tests {
}
#[tokio::test]
async fn record_status_persists_to_bus() {
async fn get_status_returns_not_found_without_manager() {
let control = AgentControl::default();
let conversation_id = ConversationId::new();
control
.bus
.record_status(&conversation_id, AgentStatus::PendingInit)
.await;
let got = control.bus.status(conversation_id).await;
assert_eq!(got, AgentStatus::PendingInit);
let got = control.get_status(ConversationId::new()).await;
assert_eq!(got, AgentStatus::NotFound);
}
#[tokio::test]
async fn on_event_updates_status_from_task_started() {
let control = AgentControl::default();
let conversation_id = ConversationId::new();
control
.bus
.on_event(
conversation_id,
&EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
)
.await;
let got = control.bus.status(conversation_id).await;
assert_eq!(got, AgentStatus::Running);
let status = agent_status_from_event(&EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}));
assert_eq!(status, Some(AgentStatus::Running));
}
#[tokio::test]
async fn on_event_updates_status_from_task_complete() {
let control = AgentControl::default();
let conversation_id = ConversationId::new();
control
.bus
.on_event(
conversation_id,
&EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: Some("done".to_string()),
}),
)
.await;
let status = agent_status_from_event(&EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: Some("done".to_string()),
}));
let expected = AgentStatus::Completed(Some("done".to_string()));
let got = control.bus.status(conversation_id).await;
assert_eq!(got, expected);
assert_eq!(status, Some(expected));
}
#[tokio::test]
async fn on_event_updates_status_from_error() {
let control = AgentControl::default();
let conversation_id = ConversationId::new();
control
.bus
.on_event(
conversation_id,
&EventMsg::Error(ErrorEvent {
message: "boom".to_string(),
codex_error_info: None,
}),
)
.await;
let status = agent_status_from_event(&EventMsg::Error(ErrorEvent {
message: "boom".to_string(),
codex_error_info: None,
}));
let expected = AgentStatus::Errored("boom".to_string());
let got = control.bus.status(conversation_id).await;
assert_eq!(got, expected);
assert_eq!(status, Some(expected));
}
#[tokio::test]
async fn on_event_updates_status_from_turn_aborted() {
let control = AgentControl::default();
let conversation_id = ConversationId::new();
control
.bus
.on_event(
conversation_id,
&EventMsg::TurnAborted(TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
)
.await;
let status = agent_status_from_event(&EventMsg::TurnAborted(TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}));
let expected = AgentStatus::Errored("Interrupted".to_string());
let got = control.bus.status(conversation_id).await;
assert_eq!(got, expected);
assert_eq!(status, Some(expected));
}
#[tokio::test]
async fn on_event_updates_status_from_shutdown_complete() {
let control = AgentControl::default();
let conversation_id = ConversationId::new();
control
.bus
.on_event(conversation_id, &EventMsg::ShutdownComplete)
.await;
let got = control.bus.status(conversation_id).await;
assert_eq!(got, AgentStatus::Shutdown);
let status = agent_status_from_event(&EventMsg::ShutdownComplete);
assert_eq!(status, Some(AgentStatus::Shutdown));
}
}

View file

@ -1,6 +1,6 @@
pub(crate) mod bus;
pub(crate) mod control;
pub(crate) mod status;
pub(crate) use bus::AgentBus;
pub(crate) use bus::AgentStatus;
pub(crate) use codex_protocol::protocol::AgentStatus;
pub(crate) use control::AgentControl;
pub(crate) use status::agent_status_from_event;

View file

@ -0,0 +1,15 @@
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::EventMsg;
/// Derive the next agent status from a single emitted event.
/// Returns `None` when the event does not affect status tracking.
pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option<AgentStatus> {
match msg {
EventMsg::TaskStarted(_) => Some(AgentStatus::Running),
EventMsg::TaskComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())),
EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))),
EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())),
EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown),
_ => None,
}
}

View file

@ -9,6 +9,8 @@ use std::sync::atomic::Ordering;
use crate::AuthManager;
use crate::SandboxState;
use crate::agent::AgentControl;
use crate::agent::AgentStatus;
use crate::agent::agent_status_from_event;
use crate::client_common::REVIEW_PROMPT;
use crate::compact;
use crate::compact::run_inline_auto_compact_task;
@ -168,6 +170,8 @@ pub struct Codex {
pub(crate) next_id: AtomicU64,
pub(crate) tx_sub: Sender<Submission>,
pub(crate) rx_event: Receiver<Event>,
// Last known status of the agent.
pub(crate) agent_status: Arc<RwLock<AgentStatus>>,
}
/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
@ -275,6 +279,7 @@ impl Codex {
// Generate a unique ID for the lifetime of this Codex session.
let session_source_clone = session_configuration.session_source.clone();
let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit));
let session = Session::new(
session_configuration,
@ -283,6 +288,7 @@ impl Codex {
models_manager.clone(),
exec_policy,
tx_event.clone(),
Arc::clone(&agent_status),
conversation_history,
session_source_clone,
skills_manager,
@ -301,6 +307,7 @@ impl Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event,
agent_status,
};
Ok(CodexSpawnOk {
@ -338,6 +345,11 @@ impl Codex {
.map_err(|_| CodexErr::InternalAgentDied)?;
Ok(event)
}
pub(crate) async fn agent_status(&self) -> AgentStatus {
let status = self.agent_status.read().await;
status.clone()
}
}
/// Context for an initialized model agent
@ -346,6 +358,7 @@ impl Codex {
pub(crate) struct Session {
conversation_id: ConversationId,
tx_event: Sender<Event>,
agent_status: Arc<RwLock<AgentStatus>>,
state: Mutex<SessionState>,
/// The set of enabled features should be invariant for the lifetime of the
/// session.
@ -549,6 +562,7 @@ impl Session {
models_manager: Arc<ModelsManager>,
exec_policy: ExecPolicyManager,
tx_event: Sender<Event>,
agent_status: Arc<RwLock<AgentStatus>>,
initial_history: InitialHistory,
session_source: SessionSource,
skills_manager: Arc<SkillsManager>,
@ -680,6 +694,7 @@ impl Session {
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
agent_status: Arc::clone(&agent_status),
state: Mutex::new(state),
features: config.features.clone(),
active_turn: Mutex::new(None),
@ -1000,11 +1015,11 @@ impl Session {
}
pub(crate) async fn send_event_raw(&self, event: Event) {
self.services
.agent_control
.bus
.on_event(self.conversation_id, &event.msg)
.await;
// Record the last known agent status.
if let Some(status) = agent_status_from_event(&event.msg) {
let mut guard = self.agent_status.write().await;
*guard = status;
}
// Persist the event into rollout (recorder filters as needed)
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
self.persist_rollout_items(&rollout_items).await;
@ -3237,6 +3252,7 @@ mod tests {
let models_manager = Arc::new(ModelsManager::new(auth_manager.clone()));
let agent_control = AgentControl::default();
let exec_policy = ExecPolicyManager::default();
let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit));
let model = ModelsManager::get_model_offline(config.model.as_deref());
let session_configuration = SessionConfiguration {
provider: config.model_provider.clone(),
@ -3299,6 +3315,7 @@ mod tests {
let session = Session {
conversation_id,
tx_event,
agent_status: Arc::clone(&agent_status),
state: Mutex::new(state),
features: config.features.clone(),
active_turn: Mutex::new(None),
@ -3326,6 +3343,7 @@ mod tests {
let models_manager = Arc::new(ModelsManager::new(auth_manager.clone()));
let agent_control = AgentControl::default();
let exec_policy = ExecPolicyManager::default();
let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit));
let model = ModelsManager::get_model_offline(config.model.as_deref());
let session_configuration = SessionConfiguration {
provider: config.model_provider.clone(),
@ -3388,6 +3406,7 @@ mod tests {
let session = Arc::new(Session {
conversation_id,
tx_event,
agent_status: Arc::clone(&agent_status),
state: Mutex::new(state),
features: config.features.clone(),
active_turn: Mutex::new(None),

View file

@ -1,3 +1,4 @@
use crate::agent::AgentStatus;
use crate::codex::Codex;
use crate::error::Result as CodexResult;
use crate::protocol::Event;
@ -33,6 +34,10 @@ impl CodexConversation {
self.codex.next_event().await
}
pub async fn agent_status(&self) -> AgentStatus {
self.codex.agent_status().await
}
pub fn rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}

View file

@ -87,6 +87,7 @@ pub(crate) async fn run_codex_conversation_interactive(
next_id: AtomicU64::new(0),
tx_sub: tx_ops,
rx_event: rx_sub,
agent_status: Arc::clone(&codex.agent_status),
})
}
@ -128,6 +129,7 @@ pub(crate) async fn run_codex_conversation_one_shot(
// Bridge events so we can observe completion and shut down automatically.
let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let ops_tx = io.tx_sub.clone();
let agent_status = Arc::clone(&io.agent_status);
let io_for_bridge = io;
tokio::spawn(async move {
while let Ok(event) = io_for_bridge.next_event().await {
@ -159,6 +161,7 @@ pub(crate) async fn run_codex_conversation_one_shot(
next_id: AtomicU64::new(0),
rx_event: rx_bridge,
tx_sub: tx_closed,
agent_status,
})
}
@ -373,6 +376,7 @@ mod tests {
next_id: AtomicU64::new(0),
tx_sub,
rx_event: rx_events,
agent_status: Default::default(),
});
let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await;

View file

@ -3,7 +3,6 @@ use crate::AuthManager;
use crate::CodexAuth;
#[cfg(any(test, feature = "test-support"))]
use crate::ModelProviderInfo;
use crate::agent::AgentBus;
use crate::agent::AgentControl;
use crate::codex::Codex;
use crate::codex::CodexSpawnOk;
@ -58,7 +57,6 @@ pub(crate) struct ConversationManagerState {
models_manager: Arc<ModelsManager>,
skills_manager: Arc<SkillsManager>,
session_source: SessionSource,
agent_bus: AgentBus,
}
impl ConversationManager {
@ -72,7 +70,6 @@ impl ConversationManager {
)),
auth_manager,
session_source,
agent_bus: AgentBus::default(),
}),
#[cfg(any(test, feature = "test-support"))]
_test_codex_home_guard: None,
@ -111,7 +108,6 @@ impl ConversationManager {
)),
auth_manager,
session_source: SessionSource::Exec,
agent_bus: AgentBus::default(),
}),
_test_codex_home_guard: None,
}
@ -210,7 +206,7 @@ impl ConversationManager {
}
fn agent_control(&self) -> AgentControl {
AgentControl::new(Arc::downgrade(&self.state), self.state.agent_bus.clone())
AgentControl::new(Arc::downgrade(&self.state))
}
}

View file

@ -672,6 +672,26 @@ pub enum EventMsg {
ReasoningRawContentDelta(ReasoningRawContentDeltaEvent),
}
/// Agent lifecycle status, derived from emitted events.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS, Default)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum AgentStatus {
/// Agent is waiting for initialization.
#[default]
PendingInit,
/// Agent is currently running.
Running,
/// Agent is done. Contains the final assistant message.
Completed(Option<String>),
/// Agent encountered an error.
Errored(String),
/// Agent has been shutdowned.
Shutdown,
/// Agent is not found.
NotFound,
}
/// Codex errors that we expose to clients.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "snake_case")]