This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new edce53582c fix: DagRuns with UPSTREAM_FAILED tasks get stuck in the backfill. (#36954)
edce53582c is described below
commit edce53582c7548cdfd6bbe9fd7970c8f4def4155
Author: Aleksey Kirilishin <[email protected]>
AuthorDate: Fri Jan 26 22:41:42 2024 +0300
fix: DagRuns with UPSTREAM_FAILED tasks get stuck in the backfill. (#36954)
---
airflow/jobs/backfill_job_runner.py | 8 +++--
.../test_backfill_with_upstream_failed_task.py | 36 ++++++++++++++++++++++
tests/jobs/test_backfill_job.py | 30 +++++++++++++++++-
3 files changed, 70 insertions(+), 4 deletions(-)
diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index a18cc1d1ae..7e57979ccd 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -106,7 +106,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
failed: set[TaskInstanceKey] = attr.ib(factory=set)
not_ready: set[TaskInstanceKey] = attr.ib(factory=set)
deadlocked: set[TaskInstance] = attr.ib(factory=set)
- active_runs: list[DagRun] = attr.ib(factory=list)
+ active_runs: set[DagRun] = attr.ib(factory=set)
executed_dag_run_dates: set[pendulum.DateTime] = attr.ib(factory=set)
finished_runs: int = 0
total_runs: int = 0
@@ -518,6 +518,8 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
ti_status.running.pop(key)
# Reset the failed task in backfill to scheduled state
ti.set_state(TaskInstanceState.SCHEDULED, session=session)
+ if ti.dag_run not in ti_status.active_runs:
+ ti_status.active_runs.add(ti.dag_run)
else:
# Default behaviour which works for subdag.
if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
@@ -738,7 +740,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
session.commit()
# update dag run state
- _dag_runs = ti_status.active_runs[:]
+ _dag_runs = ti_status.active_runs.copy()
for run in _dag_runs:
run.update_state(session=session)
if run.state in State.finished_dr_states:
@@ -840,7 +842,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
dag_run = self._get_dag_run(dagrun_info, dag, session=session)
if dag_run is not None:
tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session)
- ti_status.active_runs.append(dag_run)
+ ti_status.active_runs.add(dag_run)
ti_status.to_run.update(tis_map or {})
processed_dag_run_dates = self._process_backfill_task_instances(
diff --git a/tests/dags/test_backfill_with_upstream_failed_task.py b/tests/dags/test_backfill_with_upstream_failed_task.py
new file mode 100644
index 0000000000..d2cb6353bf
--- /dev/null
+++ b/tests/dags/test_backfill_with_upstream_failed_task.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+
+from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+
+dag = DAG(
+ dag_id="test_backfill_with_upstream_failed_task",
+ default_args={"retries": 0, "start_date": datetime.datetime(2010, 1, 1)},
+ schedule="0 0 * * *",
+)
+
+failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)
+downstream_task = BashOperator(task_id="downstream_task", bash_command="echo 1", dag=dag)
+downstream_task.set_upstream(failing_task)
+
+if __name__ == "__main__":
+ dag.cli()
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 88ce758b57..0802f11aa9 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1916,7 +1916,7 @@ class TestBackfillJob:
executor = MockExecutor()
ti_status = BackfillJobRunner._DagRunTaskStatus()
- ti_status.active_runs.append(dr)
+ ti_status.active_runs.add(dr)
ti_status.to_run = {ti.key: ti for ti in dr.task_instances}
job = Job(executor=executor)
@@ -2103,3 +2103,31 @@ class TestBackfillJob:
assert dag_run.state == DagRunState.FAILED
dag.clear()
+
+ def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
+ self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py"))
+ dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")
+
+ # We have to use the "fake" version of perform_heartbeat due to the 'is_unit_test' check in
+ # the original one. However, instead of using the original version of perform_heartbeat,
+ # we can simply wait for a LocalExecutor's worker cycle. The approach with sleep works well now,
+ # but it can be replaced with checking the state of the LocalTaskJob.
+ def fake_perform_heartbeat(*args, **kwargs):
+ import time
+
+ time.sleep(1)
+
+ with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat):
+ job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
+ job_runner = BackfillJobRunner(
+ job=job,
+ dag=dag,
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ rerun_failed_tasks=True,
+ )
+ with pytest.raises(BackfillUnfinished):
+ run_job(job=job, execute_callable=job_runner._execute)
+
+ dr: DagRun = dag.get_last_dagrun()
+ assert dr.state == State.FAILED