This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 982502efaf Remove special handling of backfills in scheduler (#42678)
982502efaf is described below

commit 982502efaff13166d597f97e75357998393db423
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Oct 2 19:31:05 2024 -0700

    Remove special handling of backfills in scheduler (#42678)
    
    Before Airflow 3.0, the scheduler would completely ignore all backfill 
runs. This PR removes that logic so that backfill runs are treated the same 
as non-backfill runs. This is a first small step toward moving the scheduling 
of backfill dag runs into the scheduler itself.
---
 airflow/jobs/scheduler_job_runner.py | 11 ++---------
 airflow/models/dagrun.py             |  4 ++--
 tests/jobs/test_scheduler_job.py     | 35 ++++++++++++++++++-----------------
 3 files changed, 22 insertions(+), 28 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index de6ce5019b..21c871e951 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -342,14 +342,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             num_starved_tasks = len(starved_tasks)
             num_starved_tasks_task_dagrun_concurrency = 
len(starved_tasks_task_dagrun_concurrency)
 
-            # Get task instances associated with scheduled
-            # DagRuns which are not backfilled, in the given states,
-            # and the dag is not paused
             query = (
                 select(TI)
                 .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
                 .join(TI.dag_run)
-                .where(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == 
DagRunState.RUNNING)
+                .where(DR.state == DagRunState.RUNNING)
                 .join(TI.dag_model)
                 .where(not_(DM.is_paused))
                 .where(TI.state == TaskInstanceState.SCHEDULED)
@@ -1020,7 +1017,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 .where(
                     DagModel.is_paused == expression.true(),
                     DagRun.state == DagRunState.RUNNING,
-                    DagRun.run_type != DagRunType.BACKFILL_JOB,
                 )
                 .having(DagRun.last_scheduling_decision <= 
func.max(TI.updated_at))
                 .group_by(DagRun)
@@ -1838,10 +1834,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         .join(TI.queued_by_job)
                         .where(Job.state.is_distinct_from(JobState.RUNNING))
                         .join(TI.dag_run)
-                        .where(
-                            DagRun.run_type != DagRunType.BACKFILL_JOB,
-                            DagRun.state == DagRunState.RUNNING,
-                        )
+                        .where(DagRun.state == DagRunState.RUNNING)
                         .options(load_only(TI.dag_id, TI.task_id, TI.run_id))
                     )
 
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 4928c7fcbd..d1dbeaf41e 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -405,7 +405,7 @@ class DagRun(Base, LoggingMixin):
         query = (
             select(cls)
             .with_hint(cls, "USE INDEX (idx_dag_run_running_dags)", 
dialect_name="mysql")
-            .where(cls.state == DagRunState.RUNNING, cls.run_type != 
DagRunType.BACKFILL_JOB)
+            .where(cls.state == DagRunState.RUNNING)
             .join(DagModel, DagModel.dag_id == cls.dag_id)
             .where(DagModel.is_paused == false(), DagModel.is_active == true())
             .order_by(
@@ -450,7 +450,7 @@ class DagRun(Base, LoggingMixin):
         )
         query = (
             select(cls)
-            .where(cls.state == DagRunState.QUEUED, cls.run_type != 
DagRunType.BACKFILL_JOB)
+            .where(cls.state == DagRunState.QUEUED)
             .join(DagModel, DagModel.dag_id == cls.dag_id)
             .where(DagModel.is_paused == false(), DagModel.is_active == true())
             .outerjoin(running_drs, running_drs.c.dag_id == DagRun.dag_id)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 97d84da9c4..c8b40f6af7 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -580,11 +580,11 @@ class TestSchedulerJob:
         assert State.SCHEDULED == ti1.state
         session.rollback()
 
-    def test_execute_task_instances_backfill_tasks_wont_execute(self, 
dag_maker):
+    def test_execute_task_instances_backfill_tasks_will_execute(self, 
dag_maker):
         """
         Tests that backfill tasks will get executed now that the scheduler no longer special-cases backfill runs.
         """
-        dag_id = 
"SchedulerJobTest.test_execute_task_instances_backfill_tasks_wont_execute"
+        dag_id = 
"SchedulerJobTest.test_execute_task_instances_backfill_tasks_will_execute"
         task_id_1 = "dummy_task"
 
         with dag_maker(dag_id=dag_id):
@@ -606,7 +606,7 @@ class TestSchedulerJob:
         self.job_runner._critical_section_enqueue_task_instances(session)
         session.flush()
         ti1.refresh_from_db()
-        assert State.SCHEDULED == ti1.state
+        assert ti1.state == TaskInstanceState.QUEUED
         session.rollback()
 
     @conf_vars({("scheduler", "standalone_dag_processor"): "False"})
@@ -696,24 +696,25 @@ class TestSchedulerJob:
         self.job_runner = SchedulerJobRunner(job=scheduler_job, 
subdir=os.devnull)
         session = settings.Session()
 
-        dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-        dr2 = dag_maker.create_dagrun_after(dr1, 
run_type=DagRunType.BACKFILL_JOB, state=State.RUNNING)
+        dr_non_backfill = 
dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+        dr_backfill = dag_maker.create_dagrun_after(
+            dr_non_backfill, run_type=DagRunType.BACKFILL_JOB, 
state=State.RUNNING
+        )
+
+        ti_backfill = dr_backfill.get_task_instance(task1.task_id)
+        ti_non_backfill = dr_non_backfill.get_task_instance(task1.task_id)
 
-        ti_backfill = dr2.get_task_instance(task1.task_id)
-        ti_with_dagrun = dr1.get_task_instance(task1.task_id)
-        # ti_with_paused
         ti_backfill.state = State.SCHEDULED
-        ti_with_dagrun.state = State.SCHEDULED
+        ti_non_backfill.state = State.SCHEDULED
 
-        session.merge(dr2)
+        session.merge(dr_backfill)
         session.merge(ti_backfill)
-        session.merge(ti_with_dagrun)
+        session.merge(ti_non_backfill)
         session.flush()
 
-        res = self.job_runner._executable_task_instances_to_queued(max_tis=32, 
session=session)
-        assert 1 == len(res)
-        res_keys = (x.key for x in res)
-        assert ti_with_dagrun.key in res_keys
+        queued_tis = 
self.job_runner._executable_task_instances_to_queued(max_tis=32, 
session=session)
+        assert len(queued_tis) == 2
+        assert {x.key for x in queued_tis} == {ti_non_backfill.key, 
ti_backfill.key}
         session.rollback()
 
     def test_find_executable_task_instances_pool(self, dag_maker):
@@ -2124,7 +2125,7 @@ class TestSchedulerJob:
         assert ti.state == State.NONE
 
         ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
-        assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should 
not be reset"
+        assert ti2.state == State.NONE, "Tasks run by Backfill Jobs should be 
treated the same"
 
     def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, 
mock_executors):
         """Test that with multiple executors configured tasks are sorted 
correctly and handed off to the
@@ -5802,7 +5803,7 @@ class TestSchedulerJob:
         session.flush()
 
         (backfill_run,) = DagRun.find(dag_id=dag.dag_id, 
run_type=DagRunType.BACKFILL_JOB, session=session)
-        assert backfill_run.state == State.RUNNING
+        assert backfill_run.state == State.SUCCESS
 
     def test_asset_orphaning(self, dag_maker, session):
         asset1 = Asset(uri="ds1")

Reply via email to