feat: agent controller (#8783)
Added an agent control plane that lets sessions spawn or message other conversations via `AgentControl`. `AgentBus` (core/src/agent/bus.rs) keeps track of the last known status of a conversation. ConversationManager now holds shared state behind an Arc so AgentControl keeps only a weak back-reference, the goal is just to avoid explicit cycle reference. Follow-ups: * Build a small tool in the TUI to be able to see every agent and send manual message to each of them * Handle approval requests in this TUI * Add tools to spawn/communicate between agents (see related design) * Define agent types
This commit is contained in:
parent
915352b10c
commit
1dd1355df3
10 changed files with 511 additions and 126 deletions
61
codex-rs/core/src/agent/bus.rs
Normal file
61
codex-rs/core/src/agent/bus.rs
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Status store for globally-tracked agents.
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct AgentBus {
|
||||
/// In-memory map of conversation id to the latest derived status.
|
||||
statuses: Arc<RwLock<HashMap<ConversationId, AgentStatus>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub(crate) enum AgentStatus {
|
||||
PendingInit,
|
||||
Running,
|
||||
Completed(Option<String>),
|
||||
Errored(String),
|
||||
Shutdown,
|
||||
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
|
||||
NotFound,
|
||||
}
|
||||
|
||||
impl AgentBus {
|
||||
/// Fetch the last known status for `agent_id`, returning `NotFound` if unseen.
|
||||
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
|
||||
pub(crate) async fn status(&self, agent_id: ConversationId) -> AgentStatus {
|
||||
let statuses = self.statuses.read().await;
|
||||
statuses
|
||||
.get(&agent_id)
|
||||
.cloned()
|
||||
.unwrap_or(AgentStatus::NotFound)
|
||||
}
|
||||
|
||||
/// Derive and record agent status from a single emitted event.
|
||||
pub(crate) async fn on_event(&self, conversation_id: ConversationId, msg: &EventMsg) {
|
||||
let next_status = match msg {
|
||||
EventMsg::TaskStarted(_) => Some(AgentStatus::Running),
|
||||
EventMsg::TaskComplete(ev) => {
|
||||
Some(AgentStatus::Completed(ev.last_agent_message.clone()))
|
||||
}
|
||||
EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))),
|
||||
EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())),
|
||||
EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown),
|
||||
_ => None,
|
||||
};
|
||||
if let Some(status) = next_status {
|
||||
self.record_status(&conversation_id, status).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Force-set the tracked status for an agent conversation.
|
||||
pub(crate) async fn record_status(
|
||||
&self,
|
||||
conversation_id: &ConversationId,
|
||||
status: AgentStatus,
|
||||
) {
|
||||
self.statuses.write().await.insert(*conversation_id, status);
|
||||
}
|
||||
}
|
||||
254
codex-rs/core/src/agent/control.rs
Normal file
254
codex-rs/core/src/agent/control.rs
Normal file
|
|
@ -0,0 +1,254 @@
|
|||
use crate::CodexConversation;
|
||||
use crate::agent::AgentBus;
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::conversation_manager::ConversationManagerState;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Weak;
|
||||
|
||||
/// Control-plane handle for multi-agent operations.
|
||||
/// `AgentControl` is held by each session (via `SessionServices`). It provides capability to
|
||||
/// spawn new agents and the inter-agent communication layer.
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct AgentControl {
|
||||
/// Weak handle back to the global conversation registry/state.
|
||||
/// This is `Weak` to avoid reference cycles and shadow persistence of the form
|
||||
/// `ConversationManagerState -> CodexConversation -> Session -> SessionServices -> ConversationManagerState`.
|
||||
manager: Weak<ConversationManagerState>,
|
||||
/// Shared agent status store updated from emitted events.
|
||||
pub(crate) bus: AgentBus,
|
||||
}
|
||||
|
||||
impl AgentControl {
|
||||
/// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
|
||||
pub(crate) fn new(manager: Weak<ConversationManagerState>, bus: AgentBus) -> Self {
|
||||
Self { manager, bus }
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
|
||||
/// Spawn a new agent conversation and submit the initial prompt.
|
||||
///
|
||||
/// If `headless` is true, a background drain task is spawned to prevent unbounded event growth
|
||||
/// of the channel queue when there is no client actively reading the conversation events.
|
||||
pub(crate) async fn spawn_agent(
|
||||
&self,
|
||||
config: crate::config::Config,
|
||||
prompt: String,
|
||||
headless: bool,
|
||||
) -> CodexResult<ConversationId> {
|
||||
let state = self.upgrade()?;
|
||||
let new_conversation = state.spawn_new_conversation(config, self.clone()).await?;
|
||||
|
||||
self.bus
|
||||
.record_status(&new_conversation.conversation_id, AgentStatus::PendingInit)
|
||||
.await;
|
||||
|
||||
if headless {
|
||||
spawn_headless_drain(
|
||||
Arc::clone(&new_conversation.conversation),
|
||||
new_conversation.conversation_id,
|
||||
self.clone(),
|
||||
);
|
||||
}
|
||||
|
||||
self.send_prompt(new_conversation.conversation_id, prompt)
|
||||
.await?;
|
||||
|
||||
self.bus
|
||||
.record_status(&new_conversation.conversation_id, AgentStatus::Running)
|
||||
.await;
|
||||
|
||||
Ok(new_conversation.conversation_id)
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
|
||||
/// Send a `user` prompt to an existing agent conversation.
|
||||
pub(crate) async fn send_prompt(
|
||||
&self,
|
||||
agent_id: ConversationId,
|
||||
prompt: String,
|
||||
) -> CodexResult<String> {
|
||||
let state = self.upgrade()?;
|
||||
state
|
||||
.send_op(
|
||||
agent_id,
|
||||
Op::UserInput {
|
||||
items: vec![UserInput::Text { text: prompt }],
|
||||
final_output_json_schema: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn upgrade(&self) -> CodexResult<Arc<ConversationManagerState>> {
|
||||
self.manager.upgrade().ok_or_else(|| {
|
||||
CodexErr::UnsupportedOperation("conversation manager dropped".to_string())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// When an agent is spawned "headless" (no UI/view attached), there may be no consumer polling
|
||||
/// `CodexConversation::next_event()`. The underlying event channel is unbounded, so the producer can
|
||||
/// accumulate events indefinitely. This drain task prevents that memory growth by polling and
|
||||
/// discarding events until shutdown.
|
||||
fn spawn_headless_drain(
|
||||
conversation: Arc<CodexConversation>,
|
||||
conversation_id: ConversationId,
|
||||
agent_control: AgentControl,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match conversation.next_event().await {
|
||||
Ok(event) => {
|
||||
if matches!(event.msg, EventMsg::ShutdownComplete) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
agent_control
|
||||
.bus
|
||||
.record_status(&conversation_id, AgentStatus::Errored(err.to_string()))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::TaskCompleteEvent;
|
||||
use codex_protocol::protocol::TaskStartedEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_prompt_errors_when_manager_dropped() {
|
||||
let control = AgentControl::default();
|
||||
let err = control
|
||||
.send_prompt(ConversationId::new(), "hello".to_string())
|
||||
.await
|
||||
.expect_err("send_prompt should fail without a manager");
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"unsupported operation: conversation manager dropped"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_status_persists_to_bus() {
|
||||
let control = AgentControl::default();
|
||||
let conversation_id = ConversationId::new();
|
||||
|
||||
control
|
||||
.bus
|
||||
.record_status(&conversation_id, AgentStatus::PendingInit)
|
||||
.await;
|
||||
|
||||
let got = control.bus.status(conversation_id).await;
|
||||
assert_eq!(got, AgentStatus::PendingInit);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn on_event_updates_status_from_task_started() {
|
||||
let control = AgentControl::default();
|
||||
let conversation_id = ConversationId::new();
|
||||
|
||||
control
|
||||
.bus
|
||||
.on_event(
|
||||
conversation_id,
|
||||
&EventMsg::TaskStarted(TaskStartedEvent {
|
||||
model_context_window: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let got = control.bus.status(conversation_id).await;
|
||||
assert_eq!(got, AgentStatus::Running);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn on_event_updates_status_from_task_complete() {
|
||||
let control = AgentControl::default();
|
||||
let conversation_id = ConversationId::new();
|
||||
|
||||
control
|
||||
.bus
|
||||
.on_event(
|
||||
conversation_id,
|
||||
&EventMsg::TaskComplete(TaskCompleteEvent {
|
||||
last_agent_message: Some("done".to_string()),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected = AgentStatus::Completed(Some("done".to_string()));
|
||||
let got = control.bus.status(conversation_id).await;
|
||||
assert_eq!(got, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn on_event_updates_status_from_error() {
|
||||
let control = AgentControl::default();
|
||||
let conversation_id = ConversationId::new();
|
||||
|
||||
control
|
||||
.bus
|
||||
.on_event(
|
||||
conversation_id,
|
||||
&EventMsg::Error(ErrorEvent {
|
||||
message: "boom".to_string(),
|
||||
codex_error_info: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected = AgentStatus::Errored("boom".to_string());
|
||||
let got = control.bus.status(conversation_id).await;
|
||||
assert_eq!(got, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn on_event_updates_status_from_turn_aborted() {
|
||||
let control = AgentControl::default();
|
||||
let conversation_id = ConversationId::new();
|
||||
|
||||
control
|
||||
.bus
|
||||
.on_event(
|
||||
conversation_id,
|
||||
&EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected = AgentStatus::Errored("Interrupted".to_string());
|
||||
let got = control.bus.status(conversation_id).await;
|
||||
assert_eq!(got, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn on_event_updates_status_from_shutdown_complete() {
|
||||
let control = AgentControl::default();
|
||||
let conversation_id = ConversationId::new();
|
||||
|
||||
control
|
||||
.bus
|
||||
.on_event(conversation_id, &EventMsg::ShutdownComplete)
|
||||
.await;
|
||||
|
||||
let got = control.bus.status(conversation_id).await;
|
||||
assert_eq!(got, AgentStatus::Shutdown);
|
||||
}
|
||||
}
|
||||
6
codex-rs/core/src/agent/mod.rs
Normal file
6
codex-rs/core/src/agent/mod.rs
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
pub(crate) mod bus;
|
||||
pub(crate) mod control;
|
||||
|
||||
pub(crate) use bus::AgentBus;
|
||||
pub(crate) use bus::AgentStatus;
|
||||
pub(crate) use control::AgentControl;
|
||||
|
|
@ -8,6 +8,7 @@ use std::sync::atomic::Ordering;
|
|||
|
||||
use crate::AuthManager;
|
||||
use crate::SandboxState;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::client_common::REVIEW_PROMPT;
|
||||
use crate::compact;
|
||||
use crate::compact::run_inline_auto_compact_task;
|
||||
|
|
@ -207,13 +208,14 @@ fn maybe_push_chat_wire_api_deprecation(
|
|||
|
||||
impl Codex {
|
||||
/// Spawn a new [`Codex`] and initialize the session.
|
||||
pub async fn spawn(
|
||||
pub(crate) async fn spawn(
|
||||
config: Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
conversation_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
agent_control: AgentControl,
|
||||
) -> CodexResult<CodexSpawnOk> {
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
|
|
@ -284,6 +286,7 @@ impl Codex {
|
|||
conversation_history,
|
||||
session_source_clone,
|
||||
skills_manager,
|
||||
agent_control,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
|
|
@ -549,6 +552,7 @@ impl Session {
|
|||
initial_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
agent_control: AgentControl,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
debug!(
|
||||
"Configuring session: model={}; provider={:?}",
|
||||
|
|
@ -670,6 +674,7 @@ impl Session {
|
|||
models_manager: Arc::clone(&models_manager),
|
||||
tool_approvals: Mutex::new(ApprovalStore::default()),
|
||||
skills_manager,
|
||||
agent_control,
|
||||
};
|
||||
|
||||
let sess = Arc::new(Session {
|
||||
|
|
@ -995,6 +1000,11 @@ impl Session {
|
|||
}
|
||||
|
||||
pub(crate) async fn send_event_raw(&self, event: Event) {
|
||||
self.services
|
||||
.agent_control
|
||||
.bus
|
||||
.on_event(self.conversation_id, &event.msg)
|
||||
.await;
|
||||
// Persist the event into rollout (recorder filters as needed)
|
||||
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
|
||||
self.persist_rollout_items(&rollout_items).await;
|
||||
|
|
@ -3225,6 +3235,7 @@ mod tests {
|
|||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
|
||||
let models_manager = Arc::new(ModelsManager::new(auth_manager.clone()));
|
||||
let agent_control = AgentControl::default();
|
||||
let exec_policy = ExecPolicyManager::default();
|
||||
let model = ModelsManager::get_model_offline(config.model.as_deref());
|
||||
let session_configuration = SessionConfiguration {
|
||||
|
|
@ -3271,6 +3282,7 @@ mod tests {
|
|||
models_manager: Arc::clone(&models_manager),
|
||||
tool_approvals: Mutex::new(ApprovalStore::default()),
|
||||
skills_manager,
|
||||
agent_control,
|
||||
};
|
||||
|
||||
let turn_context = Session::make_turn_context(
|
||||
|
|
@ -3312,6 +3324,7 @@ mod tests {
|
|||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
|
||||
let models_manager = Arc::new(ModelsManager::new(auth_manager.clone()));
|
||||
let agent_control = AgentControl::default();
|
||||
let exec_policy = ExecPolicyManager::default();
|
||||
let model = ModelsManager::get_model_offline(config.model.as_deref());
|
||||
let session_configuration = SessionConfiguration {
|
||||
|
|
@ -3358,6 +3371,7 @@ mod tests {
|
|||
models_manager: Arc::clone(&models_manager),
|
||||
tool_approvals: Mutex::new(ApprovalStore::default()),
|
||||
skills_manager,
|
||||
agent_control,
|
||||
};
|
||||
|
||||
let turn_context = Arc::new(Session::make_turn_context(
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ pub(crate) async fn run_codex_conversation_interactive(
|
|||
Arc::clone(&parent_session.services.skills_manager),
|
||||
initial_history.unwrap_or(InitialHistory::New),
|
||||
SessionSource::SubAgent(SubAgentSource::Review),
|
||||
parent_session.services.agent_control.clone(),
|
||||
)
|
||||
.await?;
|
||||
let codex = Arc::new(codex);
|
||||
|
|
|
|||
|
|
@ -3,6 +3,8 @@ use crate::AuthManager;
|
|||
use crate::CodexAuth;
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
use crate::ModelProviderInfo;
|
||||
use crate::agent::AgentBus;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::codex::Codex;
|
||||
use crate::codex::CodexSpawnOk;
|
||||
use crate::codex::INITIAL_SUBMIT_ID;
|
||||
|
|
@ -21,6 +23,7 @@ use codex_protocol::items::TurnItem;
|
|||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use std::collections::HashMap;
|
||||
|
|
@ -41,24 +44,36 @@ pub struct NewConversation {
|
|||
/// [`ConversationManager`] is responsible for creating conversations and
|
||||
/// maintaining them in memory.
|
||||
pub struct ConversationManager {
|
||||
state: Arc<ConversationManagerState>,
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
_test_codex_home_guard: Option<TempDir>,
|
||||
}
|
||||
|
||||
/// Shared, `Arc`-owned state for [`ConversationManager`]. This `Arc` is required to have a single
|
||||
/// `Arc` reference that can be downgraded to by `AgentControl` while preventing every single
|
||||
/// function to require an `Arc<&Self>`.
|
||||
pub(crate) struct ConversationManagerState {
|
||||
conversations: Arc<RwLock<HashMap<ConversationId, Arc<CodexConversation>>>>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
session_source: SessionSource,
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
_test_codex_home_guard: Option<TempDir>,
|
||||
agent_bus: AgentBus,
|
||||
}
|
||||
|
||||
impl ConversationManager {
|
||||
pub fn new(auth_manager: Arc<AuthManager>, session_source: SessionSource) -> Self {
|
||||
let skills_manager = Arc::new(SkillsManager::new(auth_manager.codex_home().to_path_buf()));
|
||||
Self {
|
||||
conversations: Arc::new(RwLock::new(HashMap::new())),
|
||||
auth_manager: auth_manager.clone(),
|
||||
session_source,
|
||||
models_manager: Arc::new(ModelsManager::new(auth_manager)),
|
||||
skills_manager,
|
||||
state: Arc::new(ConversationManagerState {
|
||||
conversations: Arc::new(RwLock::new(HashMap::new())),
|
||||
models_manager: Arc::new(ModelsManager::new(auth_manager.clone())),
|
||||
skills_manager: Arc::new(SkillsManager::new(
|
||||
auth_manager.codex_home().to_path_buf(),
|
||||
)),
|
||||
auth_manager,
|
||||
session_source,
|
||||
agent_bus: AgentBus::default(),
|
||||
}),
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
_test_codex_home_guard: None,
|
||||
}
|
||||
|
|
@ -83,40 +98,166 @@ impl ConversationManager {
|
|||
provider: ModelProviderInfo,
|
||||
codex_home: PathBuf,
|
||||
) -> Self {
|
||||
let auth_manager = crate::AuthManager::from_auth_for_testing_with_home(auth, codex_home);
|
||||
let skills_manager = Arc::new(SkillsManager::new(auth_manager.codex_home().to_path_buf()));
|
||||
let auth_manager = AuthManager::from_auth_for_testing_with_home(auth, codex_home);
|
||||
Self {
|
||||
conversations: Arc::new(RwLock::new(HashMap::new())),
|
||||
auth_manager: auth_manager.clone(),
|
||||
session_source: SessionSource::Exec,
|
||||
models_manager: Arc::new(ModelsManager::with_provider(auth_manager, provider)),
|
||||
skills_manager,
|
||||
state: Arc::new(ConversationManagerState {
|
||||
conversations: Arc::new(RwLock::new(HashMap::new())),
|
||||
models_manager: Arc::new(ModelsManager::with_provider(
|
||||
auth_manager.clone(),
|
||||
provider,
|
||||
)),
|
||||
skills_manager: Arc::new(SkillsManager::new(
|
||||
auth_manager.codex_home().to_path_buf(),
|
||||
)),
|
||||
auth_manager,
|
||||
session_source: SessionSource::Exec,
|
||||
agent_bus: AgentBus::default(),
|
||||
}),
|
||||
_test_codex_home_guard: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn session_source(&self) -> SessionSource {
|
||||
self.session_source.clone()
|
||||
self.state.session_source.clone()
|
||||
}
|
||||
|
||||
pub fn skills_manager(&self) -> Arc<SkillsManager> {
|
||||
self.skills_manager.clone()
|
||||
self.state.skills_manager.clone()
|
||||
}
|
||||
|
||||
pub fn get_models_manager(&self) -> Arc<ModelsManager> {
|
||||
self.state.models_manager.clone()
|
||||
}
|
||||
|
||||
pub async fn list_models(&self, config: &Config) -> Vec<ModelPreset> {
|
||||
self.state.models_manager.list_models(config).await
|
||||
}
|
||||
|
||||
pub async fn get_conversation(
|
||||
&self,
|
||||
conversation_id: ConversationId,
|
||||
) -> CodexResult<Arc<CodexConversation>> {
|
||||
self.state.get_conversation(conversation_id).await
|
||||
}
|
||||
|
||||
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
|
||||
self.state
|
||||
.spawn_conversation(
|
||||
config,
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn resume_conversation_from_rollout(
|
||||
&self,
|
||||
config: Config,
|
||||
rollout_path: PathBuf,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewConversation> {
|
||||
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
|
||||
self.resume_conversation_with_history(config, initial_history, auth_manager)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn resume_conversation_with_history(
|
||||
&self,
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewConversation> {
|
||||
self.state
|
||||
.spawn_conversation(config, initial_history, auth_manager, self.agent_control())
|
||||
.await
|
||||
}
|
||||
|
||||
/// Removes the conversation from the manager's internal map, though the conversation is stored
|
||||
/// as `Arc<CodexConversation>`, it is possible that other references to it exist elsewhere.
|
||||
/// Returns the conversation if the conversation was found and removed.
|
||||
pub async fn remove_conversation(
|
||||
&self,
|
||||
conversation_id: &ConversationId,
|
||||
) -> Option<Arc<CodexConversation>> {
|
||||
self.state
|
||||
.conversations
|
||||
.write()
|
||||
.await
|
||||
.remove(conversation_id)
|
||||
}
|
||||
|
||||
/// Fork an existing conversation by taking messages up to the given position (not including
|
||||
/// the message at the given position) and starting a new conversation with identical
|
||||
/// configuration (unless overridden by the caller's `config`). The new conversation will have
|
||||
/// a fresh id.
|
||||
pub async fn fork_conversation(
|
||||
&self,
|
||||
nth_user_message: usize,
|
||||
config: Config,
|
||||
path: PathBuf,
|
||||
) -> CodexResult<NewConversation> {
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
let history = truncate_before_nth_user_message(history, nth_user_message);
|
||||
self.state
|
||||
.spawn_conversation(
|
||||
config,
|
||||
history,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn agent_control(&self) -> AgentControl {
|
||||
AgentControl::new(Arc::downgrade(&self.state), self.state.agent_bus.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl ConversationManagerState {
|
||||
pub(crate) async fn get_conversation(
|
||||
&self,
|
||||
conversation_id: ConversationId,
|
||||
) -> CodexResult<Arc<CodexConversation>> {
|
||||
let conversations = self.conversations.read().await;
|
||||
conversations
|
||||
.get(&conversation_id)
|
||||
.cloned()
|
||||
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
|
||||
}
|
||||
|
||||
pub(crate) async fn send_op(
|
||||
&self,
|
||||
conversation_id: ConversationId,
|
||||
op: Op,
|
||||
) -> CodexResult<String> {
|
||||
self.get_conversation(conversation_id)
|
||||
.await?
|
||||
.submit(op)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
|
||||
pub(crate) async fn spawn_new_conversation(
|
||||
&self,
|
||||
config: Config,
|
||||
agent_control: AgentControl,
|
||||
) -> CodexResult<NewConversation> {
|
||||
self.spawn_conversation(
|
||||
config,
|
||||
self.auth_manager.clone(),
|
||||
self.models_manager.clone(),
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.auth_manager),
|
||||
agent_control,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn spawn_conversation(
|
||||
pub(crate) async fn spawn_conversation(
|
||||
&self,
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
agent_control: AgentControl,
|
||||
) -> CodexResult<NewConversation> {
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
|
|
@ -124,10 +265,11 @@ impl ConversationManager {
|
|||
} = Codex::spawn(
|
||||
config,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
self.skills_manager.clone(),
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.models_manager),
|
||||
Arc::clone(&self.skills_manager),
|
||||
initial_history,
|
||||
self.session_source.clone(),
|
||||
agent_control,
|
||||
)
|
||||
.await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
|
|
@ -138,9 +280,6 @@ impl ConversationManager {
|
|||
codex: Codex,
|
||||
conversation_id: ConversationId,
|
||||
) -> CodexResult<NewConversation> {
|
||||
// The first event must be `SessionInitialized`. Validate and forward it
|
||||
// to the caller so that they can display it in the conversation
|
||||
// history.
|
||||
let event = codex.next_event().await?;
|
||||
let session_configured = match event {
|
||||
Event {
|
||||
|
|
@ -167,100 +306,6 @@ impl ConversationManager {
|
|||
session_configured,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_conversation(
|
||||
&self,
|
||||
conversation_id: ConversationId,
|
||||
) -> CodexResult<Arc<CodexConversation>> {
|
||||
let conversations = self.conversations.read().await;
|
||||
conversations
|
||||
.get(&conversation_id)
|
||||
.cloned()
|
||||
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
|
||||
}
|
||||
|
||||
pub async fn resume_conversation_from_rollout(
|
||||
&self,
|
||||
config: Config,
|
||||
rollout_path: PathBuf,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewConversation> {
|
||||
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
|
||||
self.resume_conversation_with_history(config, initial_history, auth_manager)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn resume_conversation_with_history(
|
||||
&self,
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewConversation> {
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
} = Codex::spawn(
|
||||
config,
|
||||
auth_manager,
|
||||
self.models_manager.clone(),
|
||||
self.skills_manager.clone(),
|
||||
initial_history,
|
||||
self.session_source.clone(),
|
||||
)
|
||||
.await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
}
|
||||
|
||||
/// Removes the conversation from the manager's internal map, though the
|
||||
/// conversation is stored as `Arc<CodexConversation>`, it is possible that
|
||||
/// other references to it exist elsewhere. Returns the conversation if the
|
||||
/// conversation was found and removed.
|
||||
pub async fn remove_conversation(
|
||||
&self,
|
||||
conversation_id: &ConversationId,
|
||||
) -> Option<Arc<CodexConversation>> {
|
||||
self.conversations.write().await.remove(conversation_id)
|
||||
}
|
||||
|
||||
/// Fork an existing conversation by taking messages up to the given position
|
||||
/// (not including the message at the given position) and starting a new
|
||||
/// conversation with identical configuration (unless overridden by the
|
||||
/// caller's `config`). The new conversation will have a fresh id.
|
||||
pub async fn fork_conversation(
|
||||
&self,
|
||||
nth_user_message: usize,
|
||||
config: Config,
|
||||
path: PathBuf,
|
||||
) -> CodexResult<NewConversation> {
|
||||
// Compute the prefix up to the cut point.
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
let history = truncate_before_nth_user_message(history, nth_user_message);
|
||||
|
||||
// Spawn a new conversation with the computed initial history.
|
||||
let auth_manager = self.auth_manager.clone();
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
} = Codex::spawn(
|
||||
config,
|
||||
auth_manager,
|
||||
self.models_manager.clone(),
|
||||
self.skills_manager.clone(),
|
||||
history,
|
||||
self.session_source.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
}
|
||||
|
||||
pub async fn list_models(&self, config: &Config) -> Vec<ModelPreset> {
|
||||
self.models_manager.list_models(config).await
|
||||
}
|
||||
|
||||
pub fn get_models_manager(&self) -> Arc<ModelsManager> {
|
||||
self.models_manager.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a prefix of `items` obtained by cutting strictly before the nth user message
|
||||
|
|
@ -345,14 +390,13 @@ mod tests {
|
|||
},
|
||||
ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
call_id: "c1".to_string(),
|
||||
name: "tool".to_string(),
|
||||
arguments: "{}".to_string(),
|
||||
call_id: "c1".to_string(),
|
||||
},
|
||||
assistant_msg("a4"),
|
||||
];
|
||||
|
||||
// Wrap as InitialHistory::Forked with response items only.
|
||||
let initial: Vec<RolloutItem> = items
|
||||
.iter()
|
||||
.cloned()
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ pub mod codex;
|
|||
mod codex_conversation;
|
||||
mod compact_remote;
|
||||
pub use codex_conversation::CodexConversation;
|
||||
mod agent;
|
||||
mod codex_delegate;
|
||||
mod command_safety;
|
||||
pub mod config;
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use std::sync::Arc;
|
|||
|
||||
use crate::AuthManager;
|
||||
use crate::RolloutRecorder;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::exec_policy::ExecPolicyManager;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::models_manager::manager::ModelsManager;
|
||||
|
|
@ -28,4 +29,5 @@ pub(crate) struct SessionServices {
|
|||
pub(crate) otel_manager: OtelManager,
|
||||
pub(crate) tool_approvals: Mutex<ApprovalStore>,
|
||||
pub(crate) skills_manager: Arc<SkillsManager>,
|
||||
pub(crate) agent_control: AgentControl,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -143,6 +143,7 @@ impl TestCodexBuilder {
|
|||
config.model_provider.clone(),
|
||||
config.codex_home.clone(),
|
||||
);
|
||||
let conversation_manager = Arc::new(conversation_manager);
|
||||
|
||||
let new_conversation = match resume_from {
|
||||
Some(path) => {
|
||||
|
|
@ -164,7 +165,7 @@ impl TestCodexBuilder {
|
|||
config,
|
||||
codex: new_conversation.conversation,
|
||||
session_configured: new_conversation.session_configured,
|
||||
conversation_manager: Arc::new(conversation_manager),
|
||||
conversation_manager,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -442,7 +442,8 @@ where
|
|||
|
||||
mutate_config(&mut config);
|
||||
|
||||
let conversation_manager = Arc::new(ConversationManager::with_models_provider(auth, provider));
|
||||
let conversation_manager = ConversationManager::with_models_provider(auth, provider);
|
||||
let conversation_manager = Arc::new(conversation_manager);
|
||||
|
||||
let new_conversation = conversation_manager
|
||||
.new_conversation(config.clone())
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue