Copilot commented on code in PR #63702:
URL: https://github.com/apache/airflow/pull/63702#discussion_r3066499952


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1535,11 +1535,19 @@ def _are_premature_tis(
         finished_tis: list[TI],
         session: Session,
     ) -> tuple[bool, bool]:
+        # Do not pass the cached finished_tis to DepContext. The list was built
+        # at the start of the scheduling loop and may be stale when a 
concurrent
+        # API call (e.g. "mark task as success") clears downstream task states
+        # between the snapshot and this evaluation.  Leaving finished_tis=None
+        # makes DepContext.ensure_finished_tis() re-query the database so the
+        # trigger-rule evaluation sees the latest committed states instead of
+        # incorrectly marking tasks as upstream_failed based on outdated data.
+        # See: https://github.com/apache/airflow/issues/63697
         dep_context = DepContext(
             flag_upstream_failed=True,
             ignore_in_retry_period=True,
             ignore_in_reschedule_period=True,
-            finished_tis=finished_tis,
+            finished_tis=None,
         )

Review Comment:
   Setting `finished_tis=None` forces `DepContext.ensure_finished_tis()` to 
issue a DB query, but this runs in the same SQLAlchemy session where the 
original `finished_tis` snapshot was already loaded earlier in 
`task_instance_scheduling_decisions()`. Because ORM identity-map caching can 
keep previously-loaded `TaskInstance.state` values stale across re-queries, 
this may still evaluate trigger rules against outdated states unless the 
relevant instances are expired/refreshed (or the re-query uses 
`populate_existing`). Consider expiring the cached TIs (or using a 
populate-existing/refresh strategy) before dependency evaluation to guarantee 
fresh states.



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -3562,3 +3562,81 @@ def 
test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, session, c
             assert spans[0].name == f"dag_run.{dr.dag_id}"
         else:
             assert len(spans) == 0
+
+
[email protected]_test
[email protected]_if_database_isolation_mode
+def test_are_premature_tis_refreshes_finished_states(dag_maker, session):
+    """
+    Verify that _are_premature_tis does not use stale finished_tis cache.
+
+    Reproduces the race condition from 
https://github.com/apache/airflow/issues/63697
+    where the scheduler's cached finished_tis snapshot causes downstream tasks 
to be
+    permanently stuck in upstream_failed after a concurrent API call clears 
them.

Review Comment:
   The test docstring includes a direct reference to an issue number/URL. In 
this repo, test docstrings should describe behavior without embedding issue 
numbers; consider moving the issue reference to a regular comment (or the PR 
description) and keep the docstring focused on expected behavior.
   ```suggestion
       This ensures the scheduler refreshes finished task instance states after
       concurrent task clearing instead of incorrectly leaving downstream tasks 
in
       upstream_failed.
   ```



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -3562,3 +3562,81 @@ def 
test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, session, c
             assert spans[0].name == f"dag_run.{dr.dag_id}"
         else:
             assert len(spans) == 0
+
+
[email protected]_test
[email protected]_if_database_isolation_mode
+def test_are_premature_tis_refreshes_finished_states(dag_maker, session):
+    """
+    Verify that _are_premature_tis does not use stale finished_tis cache.
+
+    Reproduces the race condition from 
https://github.com/apache/airflow/issues/63697
+    where the scheduler's cached finished_tis snapshot causes downstream tasks 
to be
+    permanently stuck in upstream_failed after a concurrent API call clears 
them.
+    """
+    from sqlalchemy.orm import Session as SASession

Review Comment:
   Importing `Session as SASession` inside the test function adds a new 
in-function import. Please move this import to the module import section (or a 
`TYPE_CHECKING` block if only for typing) to keep imports consistent and avoid 
hidden import-time side effects.



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -3562,3 +3562,81 @@ def 
test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, session, c
             assert spans[0].name == f"dag_run.{dr.dag_id}"
         else:
             assert len(spans) == 0
+
+
[email protected]_test
[email protected]_if_database_isolation_mode
+def test_are_premature_tis_refreshes_finished_states(dag_maker, session):
+    """
+    Verify that _are_premature_tis does not use stale finished_tis cache.
+
+    Reproduces the race condition from 
https://github.com/apache/airflow/issues/63697
+    where the scheduler's cached finished_tis snapshot causes downstream tasks 
to be
+    permanently stuck in upstream_failed after a concurrent API call clears 
them.
+    """
+    from sqlalchemy.orm import Session as SASession
+
+    with dag_maker("test_upstream_failed_race", session=session):
+        fail_task = EmptyOperator(task_id="fail_task")
+        t0 = EmptyOperator(task_id="t0")
+        t1 = EmptyOperator(task_id="t1")
+        t2 = EmptyOperator(task_id="t2")
+        fail_task >> t0 >> t1 >> t2
+
+    dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+    tis = {ti.task_id: ti for ti in dr.task_instances}
+
+    # Step 1: fail_task fails, scheduler propagates upstream_failed to t0
+    tis["fail_task"].state = TaskInstanceState.FAILED
+    tis["t0"].state = TaskInstanceState.UPSTREAM_FAILED
+    session.flush()
+    session.commit()
+
+    # Step 2: Scheduler takes snapshot (simulated by building finished_tis 
list)
+    bind = session.get_bind()
+    scheduler_session: SASession = SASession(bind=bind)
+    try:
+        sched_dr = scheduler_session.get(DagRun, dr.id)
+        sched_dr.dag = dr.dag
+
+        stale_finished_tis = sched_dr.get_task_instances(state=State.finished, 
session=scheduler_session)
+        dag = sched_dr.get_dag()
+        for ti in stale_finished_tis:
+            ti.task = dag.get_task(ti.task_id)
+
+        # Step 3: API clears tasks in the primary session (concurrent 
connection)
+        session.expire_all()
+        api_tis = {ti.task_id: ti for ti in 
dr.get_task_instances(session=session)}
+        api_tis["fail_task"].state = TaskInstanceState.SUCCESS
+        api_tis["t0"].state = None  # cleared
+        session.flush()
+        session.commit()
+
+        # Step 4: Scheduler evaluates with stale cache
+        unfinished_tis = sched_dr.get_task_instances(state=State.unfinished, 
session=scheduler_session)
+        for ti in unfinished_tis:
+            ti.task = dag.get_task(ti.task_id)
+
+        # Call _are_premature_tis — the fix makes it ignore the stale 
finished_tis
+        # and re-query from DB, seeing the updated states
+        sched_dr._are_premature_tis(
+            unfinished_tis=unfinished_tis,
+            finished_tis=stale_finished_tis,
+            session=scheduler_session,
+        )
+        scheduler_session.flush()
+        scheduler_session.commit()
+
+        # Step 5: Verify t1 was NOT incorrectly marked upstream_failed
+        session.expire_all()
+        final_states = {ti.task_id: ti.state for ti in 
dr.get_task_instances(session=session)}
+        assert final_states["fail_task"] == TaskInstanceState.SUCCESS
+        assert final_states["t0"] is None  # cleared by API
+        # With the fix, t1 should NOT be upstream_failed because the scheduler
+        # re-reads finished_tis from DB and sees fail_task=SUCCESS, t0=None
+        assert final_states["t1"] != TaskInstanceState.UPSTREAM_FAILED, (
+            "t1 should not be upstream_failed — the scheduler should have 
re-read "

Review Comment:
   The final assertion only checks that `t1` is not `UPSTREAM_FAILED`, which is 
fairly loose and could let other incorrect terminal states slip through without 
failing the test. Consider asserting the exact expected post-condition (e.g. 
that `t1` remains cleared / in an unfinished state) to make this a stronger 
regression test for the race.
   ```suggestion
           # Step 5: Verify t1 remains cleared/unfinished after the API clear
           session.expire_all()
           final_states = {ti.task_id: ti.state for ti in 
dr.get_task_instances(session=session)}
           assert final_states["fail_task"] == TaskInstanceState.SUCCESS
           assert final_states["t0"] is None  # cleared by API
           # With the fix, t1 should remain unset because the scheduler re-reads
           # finished_tis from DB and sees fail_task=SUCCESS, t0=None.
           assert final_states["t1"] is None, (
               "t1 should remain cleared/unset — the scheduler should have 
re-read "
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to