Preserve persisted thread git info in resume (#13504)

## Summary
- ensure `thread.resume` reuses the stored `gitInfo` instead of
rebuilding it from the live working tree
- persist and apply thread git metadata through the resume flow and add
a regression test covering branch mismatch cases

## Testing
- Not run (not requested)
This commit is contained in:
joeytrasatti-openai 2026-03-04 17:16:43 -08:00 committed by GitHub
parent 95aad8719f
commit 22f4113ac1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 366 additions and 84 deletions

View file

@ -2701,7 +2701,13 @@ impl CodexMessageProcessor {
}
};
let db_summary = read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await;
let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
let loaded_thread_state_db = loaded_thread.as_ref().and_then(|thread| thread.state_db());
let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() {
read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_uuid).await
} else {
read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await
};
let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone());
if rollout_path.is_none() || include_turns {
rollout_path =
@ -2755,7 +2761,7 @@ impl CodexMessageProcessor {
}
}
} else {
let Ok(thread) = self.thread_manager.get_thread(thread_uuid).await else {
let Some(thread) = loaded_thread else {
self.send_invalid_request_error(
request_id,
format!("thread not loaded: {thread_uuid}"),
@ -2960,6 +2966,7 @@ impl CodexMessageProcessor {
};
let fallback_model_provider = config.model_provider_id.clone();
let response_history = thread_history.clone();
match self
.thread_manager
@ -2973,8 +2980,8 @@ impl CodexMessageProcessor {
{
Ok(NewThread {
thread_id,
thread,
session_configured,
..
}) => {
let SessionConfiguredEvent { rollout_path, .. } = session_configured;
let Some(rollout_path) = rollout_path else {
@ -3000,9 +3007,11 @@ impl CodexMessageProcessor {
);
let Some(mut thread) = self
.load_thread_from_rollout_or_send_internal(
.load_thread_from_resume_source_or_send_internal(
request_id.clone(),
thread_id,
thread.as_ref(),
&response_history,
rollout_path.as_path(),
fallback_model_provider.as_str(),
)
@ -3157,6 +3166,20 @@ impl CodexMessageProcessor {
mismatch_details.join("; ")
);
}
let thread_summary = match load_thread_summary_for_rollout(
&self.config,
existing_thread_id,
rollout_path.as_path(),
config_snapshot.model_provider_id.as_str(),
)
.await
{
Ok(thread) => thread,
Err(message) => {
self.send_internal_error(request_id, message).await;
return true;
}
};
let listener_command_tx = {
let thread_state = thread_state.lock().await;
@ -3177,8 +3200,9 @@ impl CodexMessageProcessor {
let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse(
Box::new(crate::thread_state::PendingThreadResumeRequest {
request_id: request_id.clone(),
rollout_path,
rollout_path: rollout_path.clone(),
config_snapshot,
thread_summary,
}),
);
if listener_command_tx.send(command).is_err() {
@ -3276,45 +3300,61 @@ impl CodexMessageProcessor {
}
}
async fn load_thread_from_rollout_or_send_internal(
async fn load_thread_from_resume_source_or_send_internal(
&self,
request_id: ConnectionRequestId,
thread_id: ThreadId,
thread: &CodexThread,
thread_history: &InitialHistory,
rollout_path: &Path,
fallback_provider: &str,
) -> Option<Thread> {
let mut thread = match read_summary_from_rollout(rollout_path, fallback_provider).await {
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
let thread = match thread_history {
InitialHistory::Resumed(resumed) => {
load_thread_summary_for_rollout(
&self.config,
resumed.conversation_id,
resumed.rollout_path.as_path(),
fallback_provider,
)
.await;
.await
}
InitialHistory::Forked(items) => {
let config_snapshot = thread.config_snapshot().await;
let mut thread = build_thread_from_snapshot(
thread_id,
&config_snapshot,
Some(rollout_path.into()),
);
thread.preview = preview_from_rollout_items(items);
Ok(thread)
}
InitialHistory::New => Err(format!(
"failed to build resume response for thread {thread_id}: initial history missing"
)),
};
let mut thread = match thread {
Ok(thread) => thread,
Err(message) => {
self.send_internal_error(request_id, message).await;
return None;
}
};
match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
self.attach_thread_name(thread_id, &mut thread).await;
Some(thread)
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
None
}
thread.id = thread_id.to_string();
thread.path = Some(rollout_path.to_path_buf());
let history_items = thread_history.get_rollout_items();
if let Err(message) = populate_resume_turns(
&mut thread,
ResumeTurnSource::HistoryItems(&history_items),
None,
)
.await
{
self.send_internal_error(request_id, message).await;
return None;
}
self.attach_thread_name(thread_id, &mut thread).await;
Some(thread)
}
async fn attach_thread_name(&self, thread_id: ThreadId, thread: &mut Thread) {
@ -6300,29 +6340,26 @@ async fn handle_pending_thread_resume_request(
let request_id = pending.request_id;
let connection_id = request_id.connection_id;
let mut thread = match load_thread_for_running_resume_response(
conversation_id,
pending.rollout_path.as_path(),
pending.config_snapshot.model_provider_id.as_str(),
let mut thread = pending.thread_summary;
if let Err(message) = populate_resume_turns(
&mut thread,
ResumeTurnSource::RolloutPath(pending.rollout_path.as_path()),
active_turn.as_ref(),
)
.await
{
Ok(thread) => thread,
Err(message) => {
outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message,
data: None,
},
)
.await;
return;
}
};
outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message,
data: None,
},
)
.await;
return;
}
has_in_progress_turn = has_in_progress_turn
|| thread
@ -6372,6 +6409,38 @@ async fn handle_pending_thread_resume_request(
.await;
}
enum ResumeTurnSource<'a> {
RolloutPath(&'a Path),
HistoryItems(&'a [RolloutItem]),
}
async fn populate_resume_turns(
thread: &mut Thread,
turn_source: ResumeTurnSource<'_>,
active_turn: Option<&Turn>,
) -> std::result::Result<(), String> {
let mut turns = match turn_source {
ResumeTurnSource::RolloutPath(rollout_path) => {
read_rollout_items_from_rollout(rollout_path)
.await
.map(|items| build_turns_from_rollout_items(&items))
.map_err(|err| {
format!(
"failed to load rollout `{}` for thread {}: {err}",
rollout_path.display(),
thread.id
)
})?
}
ResumeTurnSource::HistoryItems(items) => build_turns_from_rollout_items(items),
};
if let Some(active_turn) = active_turn {
merge_turn_history_with_active_turn(&mut turns, active_turn.clone());
}
thread.turns = turns;
Ok(())
}
async fn resolve_pending_server_request(
conversation_id: ThreadId,
thread_state_manager: &ThreadStateManager,
@ -6397,38 +6466,6 @@ async fn resolve_pending_server_request(
.await;
}
async fn load_thread_for_running_resume_response(
conversation_id: ThreadId,
rollout_path: &Path,
fallback_provider: &str,
active_turn: Option<&Turn>,
) -> std::result::Result<Thread, String> {
let mut thread = read_summary_from_rollout(rollout_path, fallback_provider)
.await
.map(summary_to_thread)
.map_err(|err| {
format!(
"failed to load rollout `{}` for thread {conversation_id}: {err}",
rollout_path.display()
)
})?;
let mut turns = read_rollout_items_from_rollout(rollout_path)
.await
.map(|items| build_turns_from_rollout_items(&items))
.map_err(|err| {
format!(
"failed to load rollout `{}` for thread {conversation_id}: {err}",
rollout_path.display()
)
})?;
if let Some(active_turn) = active_turn {
merge_turn_history_with_active_turn(&mut turns, active_turn.clone());
}
thread.turns = turns;
Ok(thread)
}
fn merge_turn_history_with_active_turn(turns: &mut Vec<Turn>, active_turn: Turn) {
turns.retain(|turn| turn.id != active_turn.id);
turns.push(active_turn);
@ -7026,6 +7063,48 @@ fn map_git_info(git_info: &CoreGitInfo) -> ConversationGitInfo {
}
}
async fn load_thread_summary_for_rollout(
config: &Config,
thread_id: ThreadId,
rollout_path: &Path,
fallback_provider: &str,
) -> std::result::Result<Thread, String> {
let mut thread = read_summary_from_rollout(rollout_path, fallback_provider)
.await
.map(summary_to_thread)
.map_err(|err| {
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
)
})?;
if let Some(summary) = read_summary_from_state_db_by_thread_id(config, thread_id).await {
merge_mutable_thread_metadata(&mut thread, summary_to_thread(summary));
}
Ok(thread)
}
fn merge_mutable_thread_metadata(thread: &mut Thread, persisted_thread: Thread) {
thread.git_info = persisted_thread.git_info;
}
fn preview_from_rollout_items(items: &[RolloutItem]) -> String {
items
.iter()
.find_map(|item| match item {
RolloutItem::ResponseItem(item) => match codex_core::parse_turn_item(item) {
Some(codex_protocol::items::TurnItem::UserMessage(user)) => Some(user.message()),
_ => None,
},
_ => None,
})
.map(|preview| match preview.find(USER_MESSAGE_BEGIN) {
Some(idx) => preview[idx + USER_MESSAGE_BEGIN.len()..].trim().to_string(),
None => preview,
})
.unwrap_or_default()
}
fn with_thread_spawn_agent_metadata(
source: codex_protocol::protocol::SessionSource,
agent_nickname: Option<String>,

View file

@ -26,6 +26,7 @@ pub(crate) struct PendingThreadResumeRequest {
pub(crate) request_id: ConnectionRequestId,
pub(crate) rollout_path: PathBuf,
pub(crate) config_snapshot: ThreadConfigSnapshot,
pub(crate) thread_summary: codex_app_server_protocol::Thread,
}
// ThreadListenerCommand is used to perform operations in the context of the thread listener, for serialization purposes.

View file

@ -23,6 +23,8 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams;
use codex_app_server_protocol::ThreadMetadataUpdateParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
@ -32,19 +34,27 @@ use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_protocol::ThreadId;
use codex_protocol::config_types::Personality;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource as RolloutSessionSource;
use codex_protocol::user_input::ByteRange;
use codex_protocol::user_input::TextElement;
use codex_state::StateRuntime;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::fs::FileTimes;
use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
@ -170,6 +180,198 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_resume_prefers_persisted_git_metadata_for_local_threads() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
let config_toml = codex_home.path().join("config.toml");
std::fs::write(
&config_toml,
format!(
r#"
model = "gpt-5.2-codex"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[features]
personality = true
sqlite = true
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#,
server.uri()
),
)?;
let repo_path = codex_home.path().join("repo");
std::fs::create_dir_all(&repo_path)?;
assert!(
Command::new("git")
.args(["init"])
.arg(&repo_path)
.status()?
.success()
);
assert!(
Command::new("git")
.current_dir(&repo_path)
.args(["checkout", "-B", "master"])
.status()?
.success()
);
assert!(
Command::new("git")
.current_dir(&repo_path)
.args(["config", "user.name", "Test User"])
.status()?
.success()
);
assert!(
Command::new("git")
.current_dir(&repo_path)
.args(["config", "user.email", "test@example.com"])
.status()?
.success()
);
std::fs::write(repo_path.join("README.md"), "test\n")?;
assert!(
Command::new("git")
.current_dir(&repo_path)
.args(["add", "README.md"])
.status()?
.success()
);
assert!(
Command::new("git")
.current_dir(&repo_path)
.args(["commit", "-m", "initial"])
.status()?
.success()
);
let head_branch = Command::new("git")
.current_dir(&repo_path)
.args(["branch", "--show-current"])
.output()?;
assert_eq!(
String::from_utf8(head_branch.stdout)?.trim(),
"master",
"test repo should stay on master to verify resume ignores live HEAD"
);
let thread_id = Uuid::new_v4().to_string();
let conversation_id = ThreadId::from_string(&thread_id)?;
let rollout_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &thread_id);
let rollout_dir = rollout_path.parent().expect("rollout parent directory");
std::fs::create_dir_all(rollout_dir)?;
let session_meta = SessionMeta {
id: conversation_id,
forked_from_id: None,
timestamp: "2025-01-05T12:00:00Z".to_string(),
cwd: repo_path.clone(),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source: RolloutSessionSource::Cli,
agent_nickname: None,
agent_role: None,
model_provider: Some("mock_provider".to_string()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
};
std::fs::write(
&rollout_path,
[
json!({
"timestamp": "2025-01-05T12:00:00Z",
"type": "session_meta",
"payload": serde_json::to_value(SessionMetaLine {
meta: session_meta,
git: None,
})?,
})
.to_string(),
json!({
"timestamp": "2025-01-05T12:00:00Z",
"type": "response_item",
"payload": {
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "Saved user message"}]
}
})
.to_string(),
json!({
"timestamp": "2025-01-05T12:00:00Z",
"type": "event_msg",
"payload": {
"type": "user_message",
"message": "Saved user message",
"kind": "plain"
}
})
.to_string(),
]
.join("\n")
+ "\n",
)?;
let state_db = StateRuntime::init(
codex_home.path().to_path_buf(),
"mock_provider".into(),
None,
)
.await?;
state_db.mark_backfill_complete(None).await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let update_id = mcp
.send_thread_metadata_update_request(ThreadMetadataUpdateParams {
thread_id: thread_id.clone(),
git_info: Some(ThreadMetadataGitInfoUpdateParams {
sha: None,
branch: Some(Some("feature/pr-branch".to_string())),
origin_url: None,
}),
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(update_id)),
)
.await??;
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id,
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(
thread
.git_info
.as_ref()
.and_then(|git| git.branch.as_deref()),
Some("feature/pr-branch")
);
Ok(())
}
#[tokio::test]
async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;