Copilot commented on code in PR #64109:
URL: https://github.com/apache/airflow/pull/64109#discussion_r3066479270
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -679,9 +679,27 @@ def get_queued_dag_runs_to_set_running(cls, session:
Session) -> ScalarResult[Da
.subquery()
)
+ available_dagruns_rn = (
+ select(
+ DagRun.dag_id,
+ DagRun.id,
+ func.row_number()
+ .over(partition_by=[DagRun.dag_id, DagRun.backfill_id],
order_by=DagRun.logical_date)
Review Comment:
The window `order_by` uses only `DagRun.logical_date`, which may not define a
total ordering (ties between rows can occur), making the chosen dagruns
nondeterministic across DB engines/plans and potentially causing flaky
behavior. Add a stable tie-breaker (e.g., also order by `DagRun.id` or
`DagRun.run_after`) so row-number assignment is deterministic within each
`(dag_id, backfill_id)` partition.
```suggestion
.over(
partition_by=[DagRun.dag_id, DagRun.backfill_id],
order_by=[DagRun.logical_date, DagRun.id],
)
```
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5881,41 +5969,55 @@ def _running_counts():
EmptyOperator(task_id="mytask")
dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED,
state=State.QUEUED)
- for _ in range(9):
+ for _ in range(29):
dr = dag_maker.create_dagrun_after(dr,
run_type=DagRunType.SCHEDULED, state=State.QUEUED)
# initial state -- nothing is running
assert dag1_non_b_running == 0
assert dag1_b_running == 0
assert total_running == 0
- assert session.scalar(select(func.count(DagRun.id))) == 46
+ assert session.scalar(select(func.count(DagRun.id))) == 66
assert session.scalar(select(func.count()).where(DagRun.dag_id ==
dag1_dag_id)) == 36
# now let's run it once
self.job_runner._start_queued_dagruns(session)
session.flush()
# after running the scheduler one time, observe that only one dag run
is started
- # this is because there are 30 runs for dag 1 so neither the backfills
nor
+ # and 3 backfill dagruns are started
+ # this is because there are 30 dags, most of which get filtered due to
max_active_runs
+ # and so due to the default dagruns to examine, we look at the first
20 dags which CAN be run
+ # according to the max_active_runs parameter, meaning 3 backfill runs
will start, 1 non backfill and
+ # all dagruns of dag2
# any runs for dag2 get started
assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20
dag1_non_b_running, dag1_b_running, total_running = _running_counts()
assert dag1_non_b_running == 1
- assert dag1_b_running == 0
- assert total_running == 1
- assert session.scalar(select(func.count()).select_from(DagRun)) == 46
+ assert dag1_b_running == 3
+ assert total_running == 20
+ assert session.scalar(select(func.count()).select_from(DagRun)) == 66
assert session.scalar(select(func.count()).where(DagRun.dag_id ==
dag1_dag_id)) == 36
+ # now we finish all lower priority backfill tasks, and observe new
higher priority tasks are started
+ session.execute(
+ update(DagRun)
+ .where(DagRun.dag_id == "test_dag2", DagRun.state ==
DagRunState.RUNNING)
+ .values(state=DagRunState.SUCCESS)
+ )
+ session.commit()
Review Comment:
Calling `session.commit()` inside a unit test can break transactional test
isolation (fixtures that rely on nested transactions/rollbacks) and is usually
unnecessary here, since the updated rows are read again within the same session.
Prefer removing the `commit()` entirely, or replacing it with a `flush()`, so
the state transition is visible without finalizing the transaction.
```suggestion
```
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5881,41 +5969,55 @@ def _running_counts():
EmptyOperator(task_id="mytask")
dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED,
state=State.QUEUED)
- for _ in range(9):
+ for _ in range(29):
dr = dag_maker.create_dagrun_after(dr,
run_type=DagRunType.SCHEDULED, state=State.QUEUED)
# initial state -- nothing is running
assert dag1_non_b_running == 0
assert dag1_b_running == 0
assert total_running == 0
- assert session.scalar(select(func.count(DagRun.id))) == 46
+ assert session.scalar(select(func.count(DagRun.id))) == 66
assert session.scalar(select(func.count()).where(DagRun.dag_id ==
dag1_dag_id)) == 36
# now let's run it once
self.job_runner._start_queued_dagruns(session)
session.flush()
# after running the scheduler one time, observe that only one dag run
is started
- # this is because there are 30 runs for dag 1 so neither the backfills
nor
+ # and 3 backfill dagruns are started
+ # this is because there are 30 dags, most of which get filtered due to
max_active_runs
+ # and so due to the default dagruns to examine, we look at the first
20 dags which CAN be run
+ # according to the max_active_runs parameter, meaning 3 backfill runs
will start, 1 non backfill and
+ # all dagruns of dag2
# any runs for dag2 get started
Review Comment:
These explanatory comments repeatedly say 'dags' where they appear to mean
'dagruns' (e.g., '30 dags', 'first 20 dags'), which makes the rationale hard to
follow. Clarifying the terminology here (dag vs dagrun) would prevent
misunderstanding when debugging scheduler selection behavior.
```suggestion
# this is because there are 30 queued dagruns, many of which get
filtered because their DAGs
# have already reached max_active_runs
# and so due to the default dagruns-to-examine limit, we look at the
first 20 dagruns that CAN be run
# according to the max_active_runs parameter, meaning 3 backfill
runs will start, 1 non-backfill,
# and all runnable dagruns for dag2
```
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -3290,6 +3290,94 @@ def
test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, ses
dag_runs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(dag_runs) == 2
+ def test_runs_are_not_starved_by_max_active_runs_limit(self, dag_maker,
session):
+ """
+ Test that dagruns are not starved by max_active_runs
+ """
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ dag_ids = ["dag1", "dag2", "dag3"]
+
+ max_active_runs = 3
+
+ for dag_id in dag_ids:
+ with dag_maker(
+ dag_id=dag_id,
+ max_active_runs=max_active_runs,
+ session=session,
+ catchup=True,
+ schedule=timedelta(seconds=60),
+ start_date=DEFAULT_DATE,
+ ):
+ # Need to use something that doesn't immediately get marked as
success by the scheduler
+ BashOperator(task_id="task", bash_command="true")
+
+ dag_run = dag_maker.create_dagrun(
+ state=State.QUEUED, session=session,
run_type=DagRunType.SCHEDULED
+ )
+
+ for _ in range(50):
+ # create a bunch of dagruns in queued state, to make sure they
are filtered by max_active_runs
+ dag_run = dag_maker.create_dagrun_after(
+ dag_run, run_type=DagRunType.SCHEDULED, state=State.QUEUED
+ )
+
+ self.job_runner._start_queued_dagruns(session)
+ session.flush()
+
+ running_dagrun_count = session.scalar(
+ select(func.count()).select_from(DagRun).where(DagRun.state ==
DagRunState.RUNNING)
+ )
+
+ assert running_dagrun_count == max_active_runs * len(dag_ids)
+
+ def
test_no_more_dagruns_are_set_to_running_when_max_active_runs_exceeded(self,
dag_maker, session):
+ """
+ Test that dagruns are not moved to running if there are more than the
max_active_runs running dagruns
+ """
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ max_active_runs = 1
+ with dag_maker(
+ dag_id="test_dag",
+ max_active_runs=max_active_runs,
+ session=session,
+ catchup=True,
+ schedule=timedelta(seconds=60),
+ start_date=DEFAULT_DATE,
+ ):
+ # Need to use something that doesn't immediately get marked as
success by the scheduler
+ BashOperator(task_id="task", bash_command="true")
+
+ dag_run = dag_maker.create_dagrun(state=State.RUNNING,
session=session, run_type=DagRunType.SCHEDULED)
+
+ for _ in range(5):
+ # create a bunch of dagruns in queued state, to make sure they are
filtered by max_active_runs
Review Comment:
The comment says these are created in `queued` state, but the code sets
`state=State.RUNNING`. If the intent is to pre-fill/exceed `max_active_runs`
with RUNNING dagruns (which makes sense for this test), update the comment to
match the actual setup to avoid confusion for future maintainers.
```suggestion
# create a bunch of dagruns in running state, to exceed
max_active_runs
```
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -679,9 +679,27 @@ def get_queued_dag_runs_to_set_running(cls, session:
Session) -> ScalarResult[Da
.subquery()
)
+ available_dagruns_rn = (
+ select(
+ DagRun.dag_id,
+ DagRun.id,
+ func.row_number()
+ .over(partition_by=[DagRun.dag_id, DagRun.backfill_id],
order_by=DagRun.logical_date)
+ .label("rn"),
+ )
Review Comment:
This computes `row_number()` across *all* queued dagruns before applying
later eligibility filters (DagModel/Backfill joins, paused checks, etc.). On
large installations with many queued dagruns, that full-table window can become
a costly bottleneck. Consider pushing more predicates/joins into the same
subquery/CTE used for the window (so the window runs only on eligible
candidates), or otherwise narrowing the queued set prior to the window
calculation.
```suggestion
)
.join(
DagModel,
and_(
DagModel.dag_id == DagRun.dag_id,
DagModel.is_paused == false(),
DagModel.is_stale == false(),
),
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]