This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5c507f9f66574e47e897224030b9444d7d0d8b95 Author: Rahul Vats <[email protected]> AuthorDate: Wed May 14 16:26:44 2025 +0530 Filter query to update the dag_run table with backfill details, using a condition on dag_id (#50577) (cherry picked from commit b299c431631d7cc88ce7919c8ed481e3ed3cc38d) --- airflow-core/src/airflow/models/backfill.py | 7 +++++-- airflow-core/tests/unit/models/test_backfill.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py index 640b915a086..311c7b3f6bd 100644 --- a/airflow-core/src/airflow/models/backfill.py +++ b/airflow-core/src/airflow/models/backfill.py @@ -299,7 +299,10 @@ def _create_backfill_dag_run( return lock = session.execute( with_row_locks( - query=select(DagRun).where(DagRun.logical_date == info.logical_date), + query=select(DagRun).where( + DagRun.logical_date == info.logical_date, + DagRun.dag_id == dag.dag_id, + ), session=session, skip_locked=True, ) @@ -403,7 +406,7 @@ def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal): # Update backfill_id and run_type in DagRun table session.execute( update(DagRun) - .where(DagRun.logical_date == info.logical_date) + .where(DagRun.logical_date == info.logical_date, DagRun.dag_id == dag.dag_id) .values( backfill_id=backfill_id, run_type=DagRunType.BACKFILL_JOB, diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py index 2c52c51f429..d5e86cb949b 100644 --- a/airflow-core/tests/unit/models/test_backfill.py +++ b/airflow-core/tests/unit/models/test_backfill.py @@ -284,7 +284,7 @@ def test_reprocess_behavior(reprocess_behavior, num_in_b, exc_reasons, dag_maker query = ( select(DagRun) .join(BackfillDagRun.dag_run) - .where(BackfillDagRun.backfill_id == b.id) + .where(BackfillDagRun.backfill_id == b.id, DagRun.dag_id == dag.dag_id) .order_by(BackfillDagRun.sort_ordinal) ) # these are all the dag runs that are part of this backfill
