feat: do not enqueue phase 2 if not necessary (#12344)
This commit is contained in:
parent
5a30cd3f92
commit
fd67aba114
2 changed files with 118 additions and 9 deletions
|
|
@ -2069,7 +2069,7 @@ WHERE kind = 'memory_stage1'
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mark_stage1_job_succeeded_no_output_tracks_watermark_without_persisting_output() {
|
||||
async fn mark_stage1_job_succeeded_no_output_skips_phase2_when_output_was_already_absent() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
|
|
@ -2122,23 +2122,129 @@ WHERE kind = 'memory_stage1'
|
|||
.expect("claim stage1 up-to-date");
|
||||
assert_eq!(up_to_date, Stage1JobClaimOutcome::SkippedUpToDate);
|
||||
|
||||
let global_job_row_count = sqlx::query("SELECT COUNT(*) AS count FROM jobs WHERE kind = ?")
|
||||
.bind("memory_consolidate_global")
|
||||
.fetch_one(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("load phase2 job row count")
|
||||
.try_get::<i64, _>("count")
|
||||
.expect("phase2 job row count");
|
||||
assert_eq!(
|
||||
global_job_row_count, 0,
|
||||
"no-output without an existing stage1 output should not enqueue phase2"
|
||||
);
|
||||
|
||||
let claim_phase2 = runtime
|
||||
.try_claim_global_phase2_job(owner, 3600)
|
||||
.await
|
||||
.expect("claim phase2");
|
||||
assert_eq!(
|
||||
claim_phase2,
|
||||
Phase2JobClaimOutcome::SkippedNotDirty,
|
||||
"phase2 should remain clean when no-output deleted nothing"
|
||||
);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mark_stage1_job_succeeded_no_output_enqueues_phase2_when_deleting_output() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id,
|
||||
codex_home.join("workspace"),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread");
|
||||
|
||||
let first_claim = runtime
|
||||
.try_claim_stage1_job(thread_id, owner, 100, 3600, 64)
|
||||
.await
|
||||
.expect("claim initial stage1");
|
||||
let first_token = match first_claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected initial stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(thread_id, first_token.as_str(), 100, "raw", "sum", None)
|
||||
.await
|
||||
.expect("mark initial stage1 succeeded"),
|
||||
"initial stage1 success should create stage1 output"
|
||||
);
|
||||
|
||||
let phase2_claim = runtime
|
||||
.try_claim_global_phase2_job(owner, 3600)
|
||||
.await
|
||||
.expect("claim phase2 after initial output");
|
||||
let (phase2_token, phase2_input_watermark) = match phase2_claim {
|
||||
Phase2JobClaimOutcome::Claimed {
|
||||
ownership_token,
|
||||
input_watermark,
|
||||
} => (ownership_token, input_watermark),
|
||||
other => panic!("unexpected phase2 claim after initial output: {other:?}"),
|
||||
};
|
||||
assert_eq!(phase2_input_watermark, 100);
|
||||
assert!(
|
||||
runtime
|
||||
.mark_global_phase2_job_succeeded(phase2_token.as_str(), phase2_input_watermark)
|
||||
.await
|
||||
.expect("mark initial phase2 succeeded"),
|
||||
"initial phase2 success should clear global dirty state"
|
||||
);
|
||||
|
||||
let no_output_claim = runtime
|
||||
.try_claim_stage1_job(thread_id, owner_b, 101, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1 for no-output delete");
|
||||
let no_output_token = match no_output_claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected no-output stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded_no_output(thread_id, no_output_token.as_str())
|
||||
.await
|
||||
.expect("mark stage1 no-output after existing output"),
|
||||
"no-output should succeed when deleting an existing stage1 output"
|
||||
);
|
||||
|
||||
let output_row_count =
|
||||
sqlx::query("SELECT COUNT(*) AS count FROM stage1_outputs WHERE thread_id = ?")
|
||||
.bind(thread_id.to_string())
|
||||
.fetch_one(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("load stage1 output count after delete")
|
||||
.try_get::<i64, _>("count")
|
||||
.expect("stage1 output count");
|
||||
assert_eq!(output_row_count, 0);
|
||||
|
||||
let claim_phase2 = runtime
|
||||
.try_claim_global_phase2_job(owner, 3600)
|
||||
.await
|
||||
.expect("claim phase2 after no-output deletion");
|
||||
let (phase2_token, phase2_input_watermark) = match claim_phase2 {
|
||||
Phase2JobClaimOutcome::Claimed {
|
||||
ownership_token,
|
||||
input_watermark,
|
||||
} => (ownership_token, input_watermark),
|
||||
other => panic!("unexpected phase2 claim outcome after no-output success: {other:?}"),
|
||||
other => panic!("unexpected phase2 claim after no-output deletion: {other:?}"),
|
||||
};
|
||||
assert_eq!(phase2_input_watermark, 100);
|
||||
assert_eq!(phase2_input_watermark, 101);
|
||||
assert!(
|
||||
runtime
|
||||
.mark_global_phase2_job_succeeded(phase2_token.as_str(), phase2_input_watermark,)
|
||||
.mark_global_phase2_job_succeeded(phase2_token.as_str(), phase2_input_watermark)
|
||||
.await
|
||||
.expect("mark phase2 succeeded after no-output")
|
||||
.expect("mark phase2 succeeded after no-output delete")
|
||||
);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
|
|
|
|||
|
|
@ -498,7 +498,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
|
|||
/// - sets `status='done'` and `last_success_watermark = input_watermark`
|
||||
/// - deletes any existing `stage1_outputs` row for the thread
|
||||
/// - enqueues/advances the global phase-2 job watermark using the claimed
|
||||
/// `input_watermark`
|
||||
/// `input_watermark` only when deleting an existing `stage1_outputs` row
|
||||
pub async fn mark_stage1_job_succeeded_no_output(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
|
|
@ -548,7 +548,7 @@ WHERE kind = ? AND job_key = ? AND ownership_token = ?
|
|||
.await?
|
||||
.try_get::<i64, _>("input_watermark")?;
|
||||
|
||||
sqlx::query(
|
||||
let deleted_rows = sqlx::query(
|
||||
r#"
|
||||
DELETE FROM stage1_outputs
|
||||
WHERE thread_id = ?
|
||||
|
|
@ -556,9 +556,12 @@ WHERE thread_id = ?
|
|||
)
|
||||
.bind(thread_id.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
.await?
|
||||
.rows_affected();
|
||||
|
||||
enqueue_global_consolidation_with_executor(&mut *tx, source_updated_at).await?;
|
||||
if deleted_rows > 0 {
|
||||
enqueue_global_consolidation_with_executor(&mut *tx, source_updated_at).await?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(true)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue