Copilot commented on code in PR #63702:
URL: https://github.com/apache/airflow/pull/63702#discussion_r3066499952
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1535,11 +1535,19 @@ def _are_premature_tis(
finished_tis: list[TI],
session: Session,
) -> tuple[bool, bool]:
+    # Do not pass the cached finished_tis to DepContext. The list was built
+    # at the start of the scheduling loop and may be stale when a concurrent
+    # API call (e.g. "mark task as success") clears downstream task states
+    # between the snapshot and this evaluation. Leaving finished_tis=None
+    # makes DepContext.ensure_finished_tis() re-query the database so the
+    # trigger-rule evaluation sees the latest committed states instead of
+    # incorrectly marking tasks as upstream_failed based on outdated data.
+    # See: https://github.com/apache/airflow/issues/63697
dep_context = DepContext(
flag_upstream_failed=True,
ignore_in_retry_period=True,
ignore_in_reschedule_period=True,
- finished_tis=finished_tis,
+ finished_tis=None,
)
Review Comment:
Setting `finished_tis=None` forces `DepContext.ensure_finished_tis()` to
issue a DB query, but this runs in the same SQLAlchemy session where the
original `finished_tis` snapshot was already loaded earlier in
`task_instance_scheduling_decisions()`. Because ORM identity-map caching can
keep previously-loaded `TaskInstance.state` values stale across re-queries,
this may still evaluate trigger rules against outdated states unless the
relevant instances are expired/refreshed (or the re-query uses
`populate_existing`). Consider expiring the cached TIs (or using a
populate-existing/refresh strategy) before dependency evaluation to guarantee
fresh states.
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -3562,3 +3562,81 @@ def test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, session, c
assert spans[0].name == f"dag_run.{dr.dag_id}"
else:
assert len(spans) == 0
+
+
[email protected]_test
[email protected]_if_database_isolation_mode
+def test_are_premature_tis_refreshes_finished_states(dag_maker, session):
+ """
+ Verify that _are_premature_tis does not use stale finished_tis cache.
+
+    Reproduces the race condition from https://github.com/apache/airflow/issues/63697
+    where the scheduler's cached finished_tis snapshot causes downstream tasks to be
+    permanently stuck in upstream_failed after a concurrent API call clears them.
Review Comment:
The test docstring includes a direct reference to an issue number/URL. In
this repo, test docstrings should describe behavior without embedding issue
numbers; consider moving the issue reference to a regular comment (or the PR
description) and keep the docstring focused on expected behavior.
```suggestion
This ensures the scheduler refreshes finished task instance states after
concurrent task clearing instead of incorrectly leaving downstream tasks in
upstream_failed.
```
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -3562,3 +3562,81 @@ def test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, session, c
assert spans[0].name == f"dag_run.{dr.dag_id}"
else:
assert len(spans) == 0
+
+
[email protected]_test
[email protected]_if_database_isolation_mode
+def test_are_premature_tis_refreshes_finished_states(dag_maker, session):
+ """
+ Verify that _are_premature_tis does not use stale finished_tis cache.
+
+    Reproduces the race condition from https://github.com/apache/airflow/issues/63697
+    where the scheduler's cached finished_tis snapshot causes downstream tasks to be
+    permanently stuck in upstream_failed after a concurrent API call clears them.
+ """
+ from sqlalchemy.orm import Session as SASession
Review Comment:
Importing `Session as SASession` inside the test function adds a new
in-function import. Please move this import to the module import section (or a
`TYPE_CHECKING` block if only for typing) to keep imports consistent and avoid
hidden import-time side effects.
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -3562,3 +3562,81 @@ def test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, session, c
assert spans[0].name == f"dag_run.{dr.dag_id}"
else:
assert len(spans) == 0
+
+
[email protected]_test
[email protected]_if_database_isolation_mode
+def test_are_premature_tis_refreshes_finished_states(dag_maker, session):
+ """
+ Verify that _are_premature_tis does not use stale finished_tis cache.
+
+    Reproduces the race condition from https://github.com/apache/airflow/issues/63697
+    where the scheduler's cached finished_tis snapshot causes downstream tasks to be
+    permanently stuck in upstream_failed after a concurrent API call clears them.
+ """
+ from sqlalchemy.orm import Session as SASession
+
+ with dag_maker("test_upstream_failed_race", session=session):
+ fail_task = EmptyOperator(task_id="fail_task")
+ t0 = EmptyOperator(task_id="t0")
+ t1 = EmptyOperator(task_id="t1")
+ t2 = EmptyOperator(task_id="t2")
+ fail_task >> t0 >> t1 >> t2
+
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+ tis = {ti.task_id: ti for ti in dr.task_instances}
+
+ # Step 1: fail_task fails, scheduler propagates upstream_failed to t0
+ tis["fail_task"].state = TaskInstanceState.FAILED
+ tis["t0"].state = TaskInstanceState.UPSTREAM_FAILED
+ session.flush()
+ session.commit()
+
+    # Step 2: Scheduler takes snapshot (simulated by building finished_tis list)
+    bind = session.get_bind()
+    scheduler_session: SASession = SASession(bind=bind)
+    try:
+        sched_dr = scheduler_session.get(DagRun, dr.id)
+        sched_dr.dag = dr.dag
+
+        stale_finished_tis = sched_dr.get_task_instances(state=State.finished, session=scheduler_session)
+ dag = sched_dr.get_dag()
+ for ti in stale_finished_tis:
+ ti.task = dag.get_task(ti.task_id)
+
+        # Step 3: API clears tasks in the primary session (concurrent connection)
+        session.expire_all()
+        api_tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+ api_tis["fail_task"].state = TaskInstanceState.SUCCESS
+ api_tis["t0"].state = None # cleared
+ session.flush()
+ session.commit()
+
+        # Step 4: Scheduler evaluates with stale cache
+        unfinished_tis = sched_dr.get_task_instances(state=State.unfinished, session=scheduler_session)
+        for ti in unfinished_tis:
+            ti.task = dag.get_task(ti.task_id)
+
+        # Call _are_premature_tis — the fix makes it ignore the stale finished_tis
+        # and re-query from DB, seeing the updated states
+ # and re-query from DB, seeing the updated states
+ sched_dr._are_premature_tis(
+ unfinished_tis=unfinished_tis,
+ finished_tis=stale_finished_tis,
+ session=scheduler_session,
+ )
+ scheduler_session.flush()
+ scheduler_session.commit()
+
+        # Step 5: Verify t1 was NOT incorrectly marked upstream_failed
+        session.expire_all()
+        final_states = {ti.task_id: ti.state for ti in dr.get_task_instances(session=session)}
+        assert final_states["fail_task"] == TaskInstanceState.SUCCESS
+        assert final_states["t0"] is None # cleared by API
+        # With the fix, t1 should NOT be upstream_failed because the scheduler
+        # re-reads finished_tis from DB and sees fail_task=SUCCESS, t0=None
+        assert final_states["t1"] != TaskInstanceState.UPSTREAM_FAILED, (
+            "t1 should not be upstream_failed — the scheduler should have re-read "
Review Comment:
The final assertion only checks that `t1` is not `UPSTREAM_FAILED`, which is
fairly loose and could let other incorrect terminal states slip through without
failing the test. Consider asserting the exact expected post-condition (e.g.
that `t1` remains cleared / in an unfinished state) to make this a stronger
regression test for the race.
```suggestion
        # Step 5: Verify t1 remains cleared/unfinished after the API clear
        session.expire_all()
        final_states = {ti.task_id: ti.state for ti in dr.get_task_instances(session=session)}
        assert final_states["fail_task"] == TaskInstanceState.SUCCESS
        assert final_states["t0"] is None # cleared by API
        # With the fix, t1 should remain unset because the scheduler re-reads
        # finished_tis from DB and sees fail_task=SUCCESS, t0=None.
        assert final_states["t1"] is None, (
            "t1 should remain cleared/unset — the scheduler should have re-read "
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]