This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 982502efaf Remove special handling of backfills in scheduler (#42678)
982502efaf is described below
commit 982502efaff13166d597f97e75357998393db423
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Oct 2 19:31:05 2024 -0700
Remove special handling of backfills in scheduler (#42678)
Before Airflow 3.0, the scheduler would completely ignore all backfill runs.
This PR removes that logic so that backfill runs are treated the same as
non-backfill runs. This is a baby step toward adding scheduling of backfill
dag runs into the scheduler.
---
airflow/jobs/scheduler_job_runner.py | 11 ++---------
airflow/models/dagrun.py | 4 ++--
tests/jobs/test_scheduler_job.py | 35 ++++++++++++++++++-----------------
3 files changed, 22 insertions(+), 28 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index de6ce5019b..21c871e951 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -342,14 +342,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
num_starved_tasks = len(starved_tasks)
num_starved_tasks_task_dagrun_concurrency =
len(starved_tasks_task_dagrun_concurrency)
- # Get task instances associated with scheduled
- # DagRuns which are not backfilled, in the given states,
- # and the dag is not paused
query = (
select(TI)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(TI.dag_run)
- .where(DR.run_type != DagRunType.BACKFILL_JOB, DR.state ==
DagRunState.RUNNING)
+ .where(DR.state == DagRunState.RUNNING)
.join(TI.dag_model)
.where(not_(DM.is_paused))
.where(TI.state == TaskInstanceState.SCHEDULED)
@@ -1020,7 +1017,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
.where(
DagModel.is_paused == expression.true(),
DagRun.state == DagRunState.RUNNING,
- DagRun.run_type != DagRunType.BACKFILL_JOB,
)
.having(DagRun.last_scheduling_decision <=
func.max(TI.updated_at))
.group_by(DagRun)
@@ -1838,10 +1834,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
.join(TI.queued_by_job)
.where(Job.state.is_distinct_from(JobState.RUNNING))
.join(TI.dag_run)
- .where(
- DagRun.run_type != DagRunType.BACKFILL_JOB,
- DagRun.state == DagRunState.RUNNING,
- )
+ .where(DagRun.state == DagRunState.RUNNING)
.options(load_only(TI.dag_id, TI.task_id, TI.run_id))
)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 4928c7fcbd..d1dbeaf41e 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -405,7 +405,7 @@ class DagRun(Base, LoggingMixin):
query = (
select(cls)
.with_hint(cls, "USE INDEX (idx_dag_run_running_dags)",
dialect_name="mysql")
- .where(cls.state == DagRunState.RUNNING, cls.run_type !=
DagRunType.BACKFILL_JOB)
+ .where(cls.state == DagRunState.RUNNING)
.join(DagModel, DagModel.dag_id == cls.dag_id)
.where(DagModel.is_paused == false(), DagModel.is_active == true())
.order_by(
@@ -450,7 +450,7 @@ class DagRun(Base, LoggingMixin):
)
query = (
select(cls)
- .where(cls.state == DagRunState.QUEUED, cls.run_type !=
DagRunType.BACKFILL_JOB)
+ .where(cls.state == DagRunState.QUEUED)
.join(DagModel, DagModel.dag_id == cls.dag_id)
.where(DagModel.is_paused == false(), DagModel.is_active == true())
.outerjoin(running_drs, running_drs.c.dag_id == DagRun.dag_id)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 97d84da9c4..c8b40f6af7 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -580,11 +580,11 @@ class TestSchedulerJob:
assert State.SCHEDULED == ti1.state
session.rollback()
- def test_execute_task_instances_backfill_tasks_wont_execute(self,
dag_maker):
+ def test_execute_task_instances_backfill_tasks_will_execute(self,
dag_maker):
        """
        Tests that backfill tasks will get executed.
        """
- dag_id =
"SchedulerJobTest.test_execute_task_instances_backfill_tasks_wont_execute"
+ dag_id =
"SchedulerJobTest.test_execute_task_instances_backfill_tasks_will_execute"
task_id_1 = "dummy_task"
with dag_maker(dag_id=dag_id):
@@ -606,7 +606,7 @@ class TestSchedulerJob:
self.job_runner._critical_section_enqueue_task_instances(session)
session.flush()
ti1.refresh_from_db()
- assert State.SCHEDULED == ti1.state
+ assert ti1.state == TaskInstanceState.QUEUED
session.rollback()
@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
@@ -696,24 +696,25 @@ class TestSchedulerJob:
self.job_runner = SchedulerJobRunner(job=scheduler_job,
subdir=os.devnull)
session = settings.Session()
- dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
- dr2 = dag_maker.create_dagrun_after(dr1,
run_type=DagRunType.BACKFILL_JOB, state=State.RUNNING)
+ dr_non_backfill =
dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+ dr_backfill = dag_maker.create_dagrun_after(
+ dr_non_backfill, run_type=DagRunType.BACKFILL_JOB,
state=State.RUNNING
+ )
+
+ ti_backfill = dr_backfill.get_task_instance(task1.task_id)
+ ti_non_backfill = dr_non_backfill.get_task_instance(task1.task_id)
- ti_backfill = dr2.get_task_instance(task1.task_id)
- ti_with_dagrun = dr1.get_task_instance(task1.task_id)
- # ti_with_paused
ti_backfill.state = State.SCHEDULED
- ti_with_dagrun.state = State.SCHEDULED
+ ti_non_backfill.state = State.SCHEDULED
- session.merge(dr2)
+ session.merge(dr_backfill)
session.merge(ti_backfill)
- session.merge(ti_with_dagrun)
+ session.merge(ti_non_backfill)
session.flush()
- res = self.job_runner._executable_task_instances_to_queued(max_tis=32,
session=session)
- assert 1 == len(res)
- res_keys = (x.key for x in res)
- assert ti_with_dagrun.key in res_keys
+ queued_tis =
self.job_runner._executable_task_instances_to_queued(max_tis=32,
session=session)
+ assert len(queued_tis) == 2
+ assert {x.key for x in queued_tis} == {ti_non_backfill.key,
ti_backfill.key}
session.rollback()
def test_find_executable_task_instances_pool(self, dag_maker):
@@ -2124,7 +2125,7 @@ class TestSchedulerJob:
assert ti.state == State.NONE
ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
- assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should
not be reset"
+ assert ti2.state == State.NONE, "Tasks run by Backfill Jobs should be
treated the same"
def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker,
mock_executors):
"""Test that with multiple executors configured tasks are sorted
correctly and handed off to the
@@ -5802,7 +5803,7 @@ class TestSchedulerJob:
session.flush()
(backfill_run,) = DagRun.find(dag_id=dag.dag_id,
run_type=DagRunType.BACKFILL_JOB, session=session)
- assert backfill_run.state == State.RUNNING
+ assert backfill_run.state == State.SUCCESS
def test_asset_orphaning(self, dag_maker, session):
asset1 = Asset(uri="ds1")