diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 3635bcb86..f027cac6e 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -2069,7 +2069,7 @@ WHERE kind = 'memory_stage1' } #[tokio::test] - async fn mark_stage1_job_succeeded_no_output_tracks_watermark_without_persisting_output() { + async fn mark_stage1_job_succeeded_no_output_skips_phase2_when_output_was_already_absent() { let codex_home = unique_temp_dir(); let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) .await @@ -2122,23 +2122,129 @@ WHERE kind = 'memory_stage1' .expect("claim stage1 up-to-date"); assert_eq!(up_to_date, Stage1JobClaimOutcome::SkippedUpToDate); + let global_job_row_count = sqlx::query("SELECT COUNT(*) AS count FROM jobs WHERE kind = ?") + .bind("memory_consolidate_global") + .fetch_one(runtime.pool.as_ref()) + .await + .expect("load phase2 job row count") + .try_get::("count") + .expect("phase2 job row count"); + assert_eq!( + global_job_row_count, 0, + "no-output without an existing stage1 output should not enqueue phase2" + ); + let claim_phase2 = runtime .try_claim_global_phase2_job(owner, 3600) .await .expect("claim phase2"); + assert_eq!( + claim_phase2, + Phase2JobClaimOutcome::SkippedNotDirty, + "phase2 should remain clean when no-output deleted nothing" + ); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn mark_stage1_job_succeeded_no_output_enqueues_phase2_when_deleting_output() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); + let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id"); + let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id"); + runtime + .upsert_thread(&test_thread_metadata( + &codex_home, + thread_id, + codex_home.join("workspace"), + )) + .await + .expect("upsert thread"); + + let first_claim = runtime + .try_claim_stage1_job(thread_id, owner, 100, 3600, 64) + .await + .expect("claim initial stage1"); + let first_token = match first_claim { + Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected initial stage1 claim outcome: {other:?}"), + }; + assert!( + runtime + .mark_stage1_job_succeeded(thread_id, first_token.as_str(), 100, "raw", "sum", None) + .await + .expect("mark initial stage1 succeeded"), + "initial stage1 success should create stage1 output" + ); + + let phase2_claim = runtime + .try_claim_global_phase2_job(owner, 3600) + .await + .expect("claim phase2 after initial output"); + let (phase2_token, phase2_input_watermark) = match phase2_claim { + Phase2JobClaimOutcome::Claimed { + ownership_token, + input_watermark, + } => (ownership_token, input_watermark), + other => panic!("unexpected phase2 claim after initial output: {other:?}"), + }; + assert_eq!(phase2_input_watermark, 100); + assert!( + runtime + .mark_global_phase2_job_succeeded(phase2_token.as_str(), phase2_input_watermark) + .await + .expect("mark initial phase2 succeeded"), + "initial phase2 success should clear global dirty state" + ); + + let no_output_claim = runtime + .try_claim_stage1_job(thread_id, owner_b, 101, 3600, 64) + .await + .expect("claim stage1 for no-output delete"); + let no_output_token = match no_output_claim { + Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected no-output stage1 claim outcome: {other:?}"), + }; + assert!( + runtime + .mark_stage1_job_succeeded_no_output(thread_id, no_output_token.as_str()) + .await + .expect("mark stage1 no-output after existing output"), + "no-output should succeed when deleting an existing stage1 output" + ); + + let output_row_count = + sqlx::query("SELECT COUNT(*) AS count FROM stage1_outputs WHERE thread_id = ?") + .bind(thread_id.to_string()) + .fetch_one(runtime.pool.as_ref()) + .await + .expect("load stage1 output count after delete") + .try_get::("count") + .expect("stage1 output count"); + assert_eq!(output_row_count, 0); + + let claim_phase2 = runtime + .try_claim_global_phase2_job(owner, 3600) + .await + .expect("claim phase2 after no-output deletion"); let (phase2_token, phase2_input_watermark) = match claim_phase2 { Phase2JobClaimOutcome::Claimed { ownership_token, input_watermark, } => (ownership_token, input_watermark), - other => panic!("unexpected phase2 claim outcome after no-output success: {other:?}"), + other => panic!("unexpected phase2 claim after no-output deletion: {other:?}"), }; - assert_eq!(phase2_input_watermark, 100); + assert_eq!(phase2_input_watermark, 101); assert!( runtime - .mark_global_phase2_job_succeeded(phase2_token.as_str(), phase2_input_watermark,) + .mark_global_phase2_job_succeeded(phase2_token.as_str(), phase2_input_watermark) .await - .expect("mark phase2 succeeded after no-output") + .expect("mark phase2 succeeded after no-output delete") ); let _ = tokio::fs::remove_dir_all(codex_home).await; diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index f2a50e3c9..bad5679ea 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -498,7 +498,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at /// - sets `status='done'` and `last_success_watermark = input_watermark` /// - deletes any existing `stage1_outputs` row for the thread /// - enqueues/advances the global phase-2 job watermark using the claimed - /// `input_watermark` + /// `input_watermark` only when deleting an existing `stage1_outputs` row pub async fn mark_stage1_job_succeeded_no_output( &self, thread_id: ThreadId, @@ -548,7 +548,7 @@ WHERE kind = ? AND job_key = ? AND ownership_token = ? .await? .try_get::("input_watermark")?; - sqlx::query( + let deleted_rows = sqlx::query( r#" DELETE FROM stage1_outputs WHERE thread_id = ? @@ -556,9 +556,12 @@ WHERE thread_id = ? ) .bind(thread_id.as_str()) .execute(&mut *tx) - .await?; + .await? + .rows_affected(); - enqueue_global_consolidation_with_executor(&mut *tx, source_updated_at).await?; + if deleted_rows > 0 { + enqueue_global_consolidation_with_executor(&mut *tx, source_updated_at).await?; + } tx.commit().await?; Ok(true)