core: reuse parent shell snapshot for thread-spawn subagents (#13052)
## Summary
- reuse the parent shell snapshot when spawning/forking/resuming
`SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })` sessions
- plumb inherited snapshot through `AgentControl -> ThreadManager ->
Codex::spawn -> SessionConfiguration`
- skip shell snapshot refresh on cwd updates for thread-spawn subagents
so inherited snapshots are not replaced
## Why
- avoids per-subagent shell snapshot creation and cleanup work
- keeps thread-spawn subagents on the parent snapshot path, matching the
intended parent/child snapshot model
## Validation
- `just fmt` (in `codex-rs`)
- `cargo test -p codex-core --no-run`
- `cargo test -p codex-core spawn_agent -- --nocapture`
- `cargo test -p codex-core --test all
suite::agent_jobs::spawn_agents_on_csv_runs_and_exports`
## Notes
- full `cargo test -p codex-core --test all` was left running separately
for broader verification
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
parent
2a5bcc053f
commit
c2e126f92a
4 changed files with 91 additions and 10 deletions
|
|
@ -7,6 +7,7 @@ use crate::find_thread_path_by_id_str;
|
|||
use crate::rollout::RolloutRecorder;
|
||||
use crate::session_prefix::format_subagent_context_line;
|
||||
use crate::session_prefix::format_subagent_notification_message;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::state_db;
|
||||
use crate::thread_manager::ThreadManagerState;
|
||||
use codex_protocol::ThreadId;
|
||||
|
|
@ -83,6 +84,9 @@ impl AgentControl {
|
|||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
let inherited_shell_snapshot = self
|
||||
.inherited_shell_snapshot_for_source(&state, session_source.as_ref())
|
||||
.await;
|
||||
let session_source = match session_source {
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
|
|
@ -161,6 +165,7 @@ impl AgentControl {
|
|||
self.clone(),
|
||||
session_source,
|
||||
false,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
|
|
@ -171,6 +176,7 @@ impl AgentControl {
|
|||
session_source,
|
||||
false,
|
||||
None,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
|
|
@ -235,6 +241,9 @@ impl AgentControl {
|
|||
other => other,
|
||||
};
|
||||
let notification_source = session_source.clone();
|
||||
let inherited_shell_snapshot = self
|
||||
.inherited_shell_snapshot_for_source(&state, Some(&session_source))
|
||||
.await;
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
|
||||
.await?
|
||||
|
|
@ -246,6 +255,7 @@ impl AgentControl {
|
|||
rollout_path,
|
||||
self.clone(),
|
||||
session_source,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await?;
|
||||
reservation.commit(resumed_thread.thread_id);
|
||||
|
|
@ -431,6 +441,22 @@ impl AgentControl {
|
|||
.upgrade()
|
||||
.ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string()))
|
||||
}
|
||||
|
||||
async fn inherited_shell_snapshot_for_source(
|
||||
&self,
|
||||
state: &Arc<ThreadManagerState>,
|
||||
session_source: Option<&SessionSource>,
|
||||
) -> Option<Arc<ShellSnapshot>> {
|
||||
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id, ..
|
||||
})) = session_source
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let parent_thread = state.get_thread(*parent_thread_id).await.ok()?;
|
||||
parent_thread.codex.session.user_shell().shell_snapshot()
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
|
|
|||
|
|
@ -345,6 +345,7 @@ impl Codex {
|
|||
dynamic_tools: Vec<DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<CodexSpawnOk> {
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
|
|
@ -475,6 +476,7 @@ impl Codex {
|
|||
session_source,
|
||||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
inherited_shell_snapshot,
|
||||
};
|
||||
|
||||
// Generate a unique ID for the lifetime of this Codex session.
|
||||
|
|
@ -865,6 +867,7 @@ pub(crate) struct SessionConfiguration {
|
|||
session_source: SessionSource,
|
||||
dynamic_tools: Vec<DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
}
|
||||
|
||||
impl SessionConfiguration {
|
||||
|
|
@ -1383,13 +1386,19 @@ impl Session {
|
|||
};
|
||||
// Create the mutable state for the Session.
|
||||
let shell_snapshot_tx = if config.features.enabled(Feature::ShellSnapshot) {
|
||||
ShellSnapshot::start_snapshotting(
|
||||
config.codex_home.clone(),
|
||||
conversation_id,
|
||||
session_configuration.cwd.clone(),
|
||||
&mut default_shell,
|
||||
otel_manager.clone(),
|
||||
)
|
||||
if let Some(snapshot) = session_configuration.inherited_shell_snapshot.clone() {
|
||||
let (tx, rx) = watch::channel(Some(snapshot));
|
||||
default_shell.shell_snapshot = rx;
|
||||
tx
|
||||
} else {
|
||||
ShellSnapshot::start_snapshotting(
|
||||
config.codex_home.clone(),
|
||||
conversation_id,
|
||||
session_configuration.cwd.clone(),
|
||||
&mut default_shell,
|
||||
otel_manager.clone(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
let (tx, rx) = watch::channel(None);
|
||||
default_shell.shell_snapshot = rx;
|
||||
|
|
@ -1978,6 +1987,7 @@ impl Session {
|
|||
previous_cwd: &Path,
|
||||
next_cwd: &Path,
|
||||
codex_home: &Path,
|
||||
session_source: &SessionSource,
|
||||
) {
|
||||
if previous_cwd == next_cwd {
|
||||
return;
|
||||
|
|
@ -1987,6 +1997,13 @@ impl Session {
|
|||
return;
|
||||
}
|
||||
|
||||
if matches!(
|
||||
session_source,
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
ShellSnapshot::refresh_snapshot(
|
||||
codex_home.to_path_buf(),
|
||||
self.conversation_id,
|
||||
|
|
@ -2008,10 +2025,16 @@ impl Session {
|
|||
let previous_cwd = state.session_configuration.cwd.clone();
|
||||
let next_cwd = updated.cwd.clone();
|
||||
let codex_home = updated.codex_home.clone();
|
||||
let session_source = updated.session_source.clone();
|
||||
state.session_configuration = updated;
|
||||
drop(state);
|
||||
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(&previous_cwd, &next_cwd, &codex_home);
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(
|
||||
&previous_cwd,
|
||||
&next_cwd,
|
||||
&codex_home,
|
||||
&session_source,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -2027,7 +2050,13 @@ impl Session {
|
|||
sub_id: String,
|
||||
updates: SessionSettingsUpdate,
|
||||
) -> ConstraintResult<Arc<TurnContext>> {
|
||||
let (session_configuration, sandbox_policy_changed, previous_cwd, codex_home) = {
|
||||
let (
|
||||
session_configuration,
|
||||
sandbox_policy_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
) = {
|
||||
let mut state = self.state.lock().await;
|
||||
match state.session_configuration.clone().apply(&updates) {
|
||||
Ok(next) => {
|
||||
|
|
@ -2035,8 +2064,15 @@ impl Session {
|
|||
let sandbox_policy_changed =
|
||||
state.session_configuration.sandbox_policy != next.sandbox_policy;
|
||||
let codex_home = next.codex_home.clone();
|
||||
let session_source = next.session_source.clone();
|
||||
state.session_configuration = next.clone();
|
||||
(next, sandbox_policy_changed, previous_cwd, codex_home)
|
||||
(
|
||||
next,
|
||||
sandbox_policy_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
)
|
||||
}
|
||||
Err(err) => {
|
||||
drop(state);
|
||||
|
|
@ -2057,6 +2093,7 @@ impl Session {
|
|||
&previous_cwd,
|
||||
&session_configuration.cwd,
|
||||
&codex_home,
|
||||
&session_source,
|
||||
);
|
||||
|
||||
Ok(self
|
||||
|
|
@ -7667,6 +7704,7 @@ mod tests {
|
|||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
|
||||
let mut state = SessionState::new(session_configuration);
|
||||
|
|
@ -7760,6 +7798,7 @@ mod tests {
|
|||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
|
||||
let mut state = SessionState::new(session_configuration);
|
||||
|
|
@ -8072,6 +8111,7 @@ mod tests {
|
|||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -8126,6 +8166,7 @@ mod tests {
|
|||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
|
||||
let (tx_event, _rx_event) = async_channel::unbounded();
|
||||
|
|
@ -8216,6 +8257,7 @@ mod tests {
|
|||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
let per_turn_config = Session::build_per_turn_config(&session_configuration);
|
||||
let model_info = ModelsManager::construct_model_info_offline_for_tests(
|
||||
|
|
@ -8383,6 +8425,7 @@ mod tests {
|
|||
session_source: SessionSource::Exec,
|
||||
dynamic_tools,
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
let per_turn_config = Session::build_per_turn_config(&session_configuration);
|
||||
let model_info = ModelsManager::construct_model_info_offline_for_tests(
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
|||
Vec::new(),
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let codex = Arc::new(codex);
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ use crate::protocol::EventMsg;
|
|||
use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::truncation;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::skills::SkillsManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
|
|
@ -479,6 +480,7 @@ impl ThreadManagerState {
|
|||
self.session_source.clone(),
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -490,6 +492,7 @@ impl ThreadManagerState {
|
|||
session_source: SessionSource,
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread_with_source(
|
||||
config,
|
||||
|
|
@ -500,6 +503,7 @@ impl ThreadManagerState {
|
|||
Vec::new(),
|
||||
persist_extended_history,
|
||||
metrics_service_name,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -510,6 +514,7 @@ impl ThreadManagerState {
|
|||
rollout_path: PathBuf,
|
||||
agent_control: AgentControl,
|
||||
session_source: SessionSource,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
|
||||
self.spawn_thread_with_source(
|
||||
|
|
@ -521,6 +526,7 @@ impl ThreadManagerState {
|
|||
Vec::new(),
|
||||
false,
|
||||
None,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -532,6 +538,7 @@ impl ThreadManagerState {
|
|||
agent_control: AgentControl,
|
||||
session_source: SessionSource,
|
||||
persist_extended_history: bool,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread_with_source(
|
||||
config,
|
||||
|
|
@ -542,6 +549,7 @@ impl ThreadManagerState {
|
|||
Vec::new(),
|
||||
persist_extended_history,
|
||||
None,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -567,6 +575,7 @@ impl ThreadManagerState {
|
|||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
metrics_service_name,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -582,6 +591,7 @@ impl ThreadManagerState {
|
|||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let watch_registration = self
|
||||
.file_watcher
|
||||
|
|
@ -602,6 +612,7 @@ impl ThreadManagerState {
|
|||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
metrics_service_name,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await?;
|
||||
self.finalize_thread_spawn(codex, thread_id, watch_registration)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue