From 22f4113ac15673fe7e70f9651fa9f19df739d2a1 Mon Sep 17 00:00:00 2001 From: joeytrasatti-openai Date: Wed, 4 Mar 2026 17:16:43 -0800 Subject: [PATCH] Preserve persisted thread git info in resume (#13504) ## Summary - ensure `thread.resume` reuses the stored `gitInfo` instead of rebuilding it from the live working tree - persist and apply thread git metadata through the resume flow and add a regression test covering branch mismatch cases ## Testing - Not run (not requested) --- .../app-server/src/codex_message_processor.rs | 247 ++++++++++++------ codex-rs/app-server/src/thread_state.rs | 1 + .../tests/suite/v2/thread_resume.rs | 202 ++++++++++++++ 3 files changed, 366 insertions(+), 84 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 4ad15a960..2f3660239 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2701,7 +2701,13 @@ impl CodexMessageProcessor { } }; - let db_summary = read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await; + let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok(); + let loaded_thread_state_db = loaded_thread.as_ref().and_then(|thread| thread.state_db()); + let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() { + read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_uuid).await + } else { + read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await + }; let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone()); if rollout_path.is_none() || include_turns { rollout_path = @@ -2755,7 +2761,7 @@ impl CodexMessageProcessor { } } } else { - let Ok(thread) = self.thread_manager.get_thread(thread_uuid).await else { + let Some(thread) = loaded_thread else { self.send_invalid_request_error( request_id, format!("thread not loaded: {thread_uuid}"), @@ -2960,6 +2966,7 @@ impl CodexMessageProcessor { }; let fallback_model_provider = config.model_provider_id.clone(); + let response_history = thread_history.clone(); match self .thread_manager @@ -2973,8 +2980,8 @@ impl CodexMessageProcessor { { Ok(NewThread { thread_id, + thread, session_configured, - .. }) => { let SessionConfiguredEvent { rollout_path, .. } = session_configured; let Some(rollout_path) = rollout_path else { @@ -3000,9 +3007,11 @@ impl CodexMessageProcessor { ); let Some(mut thread) = self - .load_thread_from_rollout_or_send_internal( + .load_thread_from_resume_source_or_send_internal( request_id.clone(), thread_id, + thread.as_ref(), + &response_history, rollout_path.as_path(), fallback_model_provider.as_str(), ) @@ -3157,6 +3166,20 @@ impl CodexMessageProcessor { mismatch_details.join("; ") ); } + let thread_summary = match load_thread_summary_for_rollout( + &self.config, + existing_thread_id, + rollout_path.as_path(), + config_snapshot.model_provider_id.as_str(), + ) + .await + { + Ok(thread) => thread, + Err(message) => { + self.send_internal_error(request_id, message).await; + return true; + } + }; let listener_command_tx = { let thread_state = thread_state.lock().await; @@ -3177,8 +3200,9 @@ impl CodexMessageProcessor { let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse( Box::new(crate::thread_state::PendingThreadResumeRequest { request_id: request_id.clone(), - rollout_path, + rollout_path: rollout_path.clone(), config_snapshot, + thread_summary, }), ); if listener_command_tx.send(command).is_err() { @@ -3276,45 +3300,61 @@ impl CodexMessageProcessor { } } - async fn load_thread_from_rollout_or_send_internal( + async fn load_thread_from_resume_source_or_send_internal( &self, request_id: ConnectionRequestId, thread_id: ThreadId, + thread: &CodexThread, + thread_history: &InitialHistory, rollout_path: &Path, fallback_provider: &str, ) -> Option { - let mut thread = match read_summary_from_rollout(rollout_path, fallback_provider).await { - Ok(summary) => summary_to_thread(summary), - Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_id}: {err}", - rollout_path.display() - ), + let thread = match thread_history { + InitialHistory::Resumed(resumed) => { + load_thread_summary_for_rollout( + &self.config, + resumed.conversation_id, + resumed.rollout_path.as_path(), + fallback_provider, ) - .await; + .await + } + InitialHistory::Forked(items) => { + let config_snapshot = thread.config_snapshot().await; + let mut thread = build_thread_from_snapshot( + thread_id, + &config_snapshot, + Some(rollout_path.into()), + ); + thread.preview = preview_from_rollout_items(items); + Ok(thread) + } + InitialHistory::New => Err(format!( + "failed to build resume response for thread {thread_id}: initial history missing" + )), + }; + let mut thread = match thread { + Ok(thread) => thread, + Err(message) => { + self.send_internal_error(request_id, message).await; return None; } }; - match read_rollout_items_from_rollout(rollout_path).await { - Ok(items) => { - thread.turns = build_turns_from_rollout_items(&items); - self.attach_thread_name(thread_id, &mut thread).await; - Some(thread) - } - Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_id}: {err}", - rollout_path.display() - ), - ) - .await; - None - } + thread.id = thread_id.to_string(); + thread.path = Some(rollout_path.to_path_buf()); + let history_items = thread_history.get_rollout_items(); + if let Err(message) = populate_resume_turns( + &mut thread, + ResumeTurnSource::HistoryItems(&history_items), + None, + ) + .await + { + self.send_internal_error(request_id, message).await; + return None; } + self.attach_thread_name(thread_id, &mut thread).await; + Some(thread) } async fn attach_thread_name(&self, thread_id: ThreadId, thread: &mut Thread) { @@ -6300,29 +6340,26 @@ async fn handle_pending_thread_resume_request( let request_id = pending.request_id; let connection_id = request_id.connection_id; - let mut thread = match load_thread_for_running_resume_response( - conversation_id, - pending.rollout_path.as_path(), - pending.config_snapshot.model_provider_id.as_str(), + let mut thread = pending.thread_summary; + if let Err(message) = populate_resume_turns( + &mut thread, + ResumeTurnSource::RolloutPath(pending.rollout_path.as_path()), active_turn.as_ref(), ) .await { - Ok(thread) => thread, - Err(message) => { - outgoing - .send_error( - request_id, - JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message, - data: None, - }, - ) - .await; - return; - } - }; + outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message, + data: None, + }, + ) + .await; + return; + } has_in_progress_turn = has_in_progress_turn || thread @@ -6372,6 +6409,38 @@ async fn handle_pending_thread_resume_request( .await; } +enum ResumeTurnSource<'a> { + RolloutPath(&'a Path), + HistoryItems(&'a [RolloutItem]), +} + +async fn populate_resume_turns( + thread: &mut Thread, + turn_source: ResumeTurnSource<'_>, + active_turn: Option<&Turn>, +) -> std::result::Result<(), String> { + let mut turns = match turn_source { + ResumeTurnSource::RolloutPath(rollout_path) => { + read_rollout_items_from_rollout(rollout_path) + .await + .map(|items| build_turns_from_rollout_items(&items)) + .map_err(|err| { + format!( + "failed to load rollout `{}` for thread {}: {err}", + rollout_path.display(), + thread.id + ) + })? + } + ResumeTurnSource::HistoryItems(items) => build_turns_from_rollout_items(items), + }; + if let Some(active_turn) = active_turn { + merge_turn_history_with_active_turn(&mut turns, active_turn.clone()); + } + thread.turns = turns; + Ok(()) +} + async fn resolve_pending_server_request( conversation_id: ThreadId, thread_state_manager: &ThreadStateManager, @@ -6397,38 +6466,6 @@ async fn resolve_pending_server_request( .await; } -async fn load_thread_for_running_resume_response( - conversation_id: ThreadId, - rollout_path: &Path, - fallback_provider: &str, - active_turn: Option<&Turn>, -) -> std::result::Result { - let mut thread = read_summary_from_rollout(rollout_path, fallback_provider) - .await - .map(summary_to_thread) - .map_err(|err| { - format!( - "failed to load rollout `{}` for thread {conversation_id}: {err}", - rollout_path.display() - ) - })?; - - let mut turns = read_rollout_items_from_rollout(rollout_path) - .await - .map(|items| build_turns_from_rollout_items(&items)) - .map_err(|err| { - format!( - "failed to load rollout `{}` for thread {conversation_id}: {err}", - rollout_path.display() - ) - })?; - if let Some(active_turn) = active_turn { - merge_turn_history_with_active_turn(&mut turns, active_turn.clone()); - } - thread.turns = turns; - Ok(thread) -} - fn merge_turn_history_with_active_turn(turns: &mut Vec, active_turn: Turn) { turns.retain(|turn| turn.id != active_turn.id); turns.push(active_turn); @@ -7026,6 +7063,48 @@ fn map_git_info(git_info: &CoreGitInfo) -> ConversationGitInfo { } } +async fn load_thread_summary_for_rollout( + config: &Config, + thread_id: ThreadId, + rollout_path: &Path, + fallback_provider: &str, +) -> std::result::Result { + let mut thread = read_summary_from_rollout(rollout_path, fallback_provider) + .await + .map(summary_to_thread) + .map_err(|err| { + format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ) + })?; + if let Some(summary) = read_summary_from_state_db_by_thread_id(config, thread_id).await { + merge_mutable_thread_metadata(&mut thread, summary_to_thread(summary)); + } + Ok(thread) +} + +fn merge_mutable_thread_metadata(thread: &mut Thread, persisted_thread: Thread) { + thread.git_info = persisted_thread.git_info; +} + +fn preview_from_rollout_items(items: &[RolloutItem]) -> String { + items + .iter() + .find_map(|item| match item { + RolloutItem::ResponseItem(item) => match codex_core::parse_turn_item(item) { + Some(codex_protocol::items::TurnItem::UserMessage(user)) => Some(user.message()), + _ => None, + }, + _ => None, + }) + .map(|preview| match preview.find(USER_MESSAGE_BEGIN) { + Some(idx) => preview[idx + USER_MESSAGE_BEGIN.len()..].trim().to_string(), + None => preview, + }) + .unwrap_or_default() +} + fn with_thread_spawn_agent_metadata( source: codex_protocol::protocol::SessionSource, agent_nickname: Option, diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 915cc0fd7..5176fe133 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -26,6 +26,7 @@ pub(crate) struct PendingThreadResumeRequest { pub(crate) request_id: ConnectionRequestId, pub(crate) rollout_path: PathBuf, pub(crate) config_snapshot: ThreadConfigSnapshot, + pub(crate) thread_summary: codex_app_server_protocol::Thread, } // ThreadListenerCommand is used to perform operations in the context of the thread listener, for serialization purposes. diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 9628a7816..ca85a0ca4 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -23,6 +23,8 @@ use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::SessionSource; use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams; +use codex_app_server_protocol::ThreadMetadataUpdateParams; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; @@ -32,19 +34,27 @@ use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput; +use codex_protocol::ThreadId; use codex_protocol::config_types::Personality; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::SessionMeta; +use codex_protocol::protocol::SessionMetaLine; +use codex_protocol::protocol::SessionSource as RolloutSessionSource; use codex_protocol::user_input::ByteRange; use codex_protocol::user_input::TextElement; +use codex_state::StateRuntime; use core_test_support::responses; use core_test_support::skip_if_no_network; use pretty_assertions::assert_eq; +use serde_json::json; use std::fs::FileTimes; use std::path::Path; use std::path::PathBuf; +use std::process::Command; use tempfile::TempDir; use tokio::time::timeout; +use uuid::Uuid; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals."; @@ -170,6 +180,198 @@ async fn thread_resume_returns_rollout_history() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_resume_prefers_persisted_git_metadata_for_local_threads() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + let config_toml = codex_home.path().join("config.toml"); + std::fs::write( + &config_toml, + format!( + r#" +model = "gpt-5.2-codex" +approval_policy = "never" +sandbox_mode = "read-only" + +model_provider = "mock_provider" + +[features] +personality = true +sqlite = true + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"#, + server.uri() + ), + )?; + + let repo_path = codex_home.path().join("repo"); + std::fs::create_dir_all(&repo_path)?; + assert!( + Command::new("git") + .args(["init"]) + .arg(&repo_path) + .status()? + .success() + ); + assert!( + Command::new("git") + .current_dir(&repo_path) + .args(["checkout", "-B", "master"]) + .status()? + .success() + ); + assert!( + Command::new("git") + .current_dir(&repo_path) + .args(["config", "user.name", "Test User"]) + .status()? + .success() + ); + assert!( + Command::new("git") + .current_dir(&repo_path) + .args(["config", "user.email", "test@example.com"]) + .status()? + .success() + ); + std::fs::write(repo_path.join("README.md"), "test\n")?; + assert!( + Command::new("git") + .current_dir(&repo_path) + .args(["add", "README.md"]) + .status()? + .success() + ); + assert!( + Command::new("git") + .current_dir(&repo_path) + .args(["commit", "-m", "initial"]) + .status()? + .success() + ); + let head_branch = Command::new("git") + .current_dir(&repo_path) + .args(["branch", "--show-current"]) + .output()?; + assert_eq!( + String::from_utf8(head_branch.stdout)?.trim(), + "master", + "test repo should stay on master to verify resume ignores live HEAD" + ); + + let thread_id = Uuid::new_v4().to_string(); + let conversation_id = ThreadId::from_string(&thread_id)?; + let rollout_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &thread_id); + let rollout_dir = rollout_path.parent().expect("rollout parent directory"); + std::fs::create_dir_all(rollout_dir)?; + let session_meta = SessionMeta { + id: conversation_id, + forked_from_id: None, + timestamp: "2025-01-05T12:00:00Z".to_string(), + cwd: repo_path.clone(), + originator: "codex".to_string(), + cli_version: "0.0.0".to_string(), + source: RolloutSessionSource::Cli, + agent_nickname: None, + agent_role: None, + model_provider: Some("mock_provider".to_string()), + base_instructions: None, + dynamic_tools: None, + memory_mode: None, + }; + std::fs::write( + &rollout_path, + [ + json!({ + "timestamp": "2025-01-05T12:00:00Z", + "type": "session_meta", + "payload": serde_json::to_value(SessionMetaLine { + meta: session_meta, + git: None, + })?, + }) + .to_string(), + json!({ + "timestamp": "2025-01-05T12:00:00Z", + "type": "response_item", + "payload": { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "Saved user message"}] + } + }) + .to_string(), + json!({ + "timestamp": "2025-01-05T12:00:00Z", + "type": "event_msg", + "payload": { + "type": "user_message", + "message": "Saved user message", + "kind": "plain" + } + }) + .to_string(), + ] + .join("\n") + + "\n", + )?; + let state_db = StateRuntime::init( + codex_home.path().to_path_buf(), + "mock_provider".into(), + None, + ) + .await?; + state_db.mark_backfill_complete(None).await?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let update_id = mcp + .send_thread_metadata_update_request(ThreadMetadataUpdateParams { + thread_id: thread_id.clone(), + git_info: Some(ThreadMetadataGitInfoUpdateParams { + sha: None, + branch: Some(Some("feature/pr-branch".to_string())), + origin_url: None, + }), + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(update_id)), + ) + .await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id, + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; + + assert_eq!( + thread + .git_info + .as_ref() + .and_then(|git| git.branch.as_deref()), + Some("feature/pr-branch") + ); + + Ok(()) +} + #[tokio::test] async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await;