This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-11-test by this push:
     new 5cc47247114 fix: Skip dataset-triggered dags without SerializedDagModel (#63546)
5cc47247114 is described below

commit 5cc47247114cf34820f66cdcb21117f8df30ed1c
Author: Leonardo Soares <[email protected]>
AuthorDate: Fri Apr 10 05:28:59 2026 -0300

    fix: Skip dataset-triggered dags without SerializedDagModel (#63546)
    
    Co-authored-by: Wei Lee <[email protected]>
---
 airflow/models/dag.py    |  27 +++++++++++-
 tests/models/test_dag.py | 111 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 137 insertions(+), 1 deletion(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 6b3744db138..741b28120df 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -4069,6 +4069,11 @@ class DagModel(Base):
         This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
         you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
         transaction is committed it will be unlocked.
+
+        For dataset-triggered scheduling, Dags that have ``DatasetDagRunQueue`` rows but no matching
+        ``SerializedDagModel`` row are omitted from the returned ``dataset_triggered_dag_info`` until
+        serialization exists; DDRQs are **not** deleted here so the scheduler can re-evaluate on a
+        later run.
         """
         from airflow.models.serialized_dag import SerializedDagModel
 
@@ -4094,13 +4099,33 @@ class DagModel(Base):
         ser_dags = session.scalars(
             select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
         ).all()
+        ser_dag_ids = {s.dag_id for s in ser_dags}
+        missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+        if missing_from_serialized:
+            log.info(
+                "Dags have queued dataset events (DDRQs), but are not found in 
the serialized_dag table."
+                " — skipping Dag run creation: %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del by_dag[dag_id]
+                del dag_statuses[dag_id]
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
+            dataset_condition = ser_dag.dag.timetable.dataset_condition
 
-            if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
+            if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses):
                 del by_dag[dag_id]
                 del dag_statuses[dag_id]
+            else:
+                log.debug(
+                    "Dataset condition satisfied: dag_id=%s, condition=%s, 
ddrq_uris=%s, ddrq_count=%d",
+                    dag_id,
+                    dataset_condition,
+                    sorted(statuses.keys()),
+                    len(statuses),
+                )
         del dag_statuses
         dataset_triggered_dag_info = {}
         for dag_id, records in by_dag.items():
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 5f721b61d26..e13da06ab1a 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3030,6 +3030,7 @@ class TestDagModel:
             max_active_runs=1,
             schedule=[dataset],
             start_date=pendulum.now().add(days=-2),
+            serialized=True,
         ) as dag:
             EmptyOperator(task_id="dummy")
 
@@ -3064,6 +3065,115 @@ class TestDagModel:
         dag_models = query.all()
         assert dag_models == [dag_model]
 
+    def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog):
+        """DDRQ rows for a Dag without SerializedDagModel must be skipped (no dataset_triggered info).
+
+        Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later
+        heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from
+        the in-memory candidate set; it does not delete ORM rows).
+        """
+        orphan_dag_id = "ddr_q_no_serialized_dag"
+        session.add(DatasetModel(uri="dataset_for_orphan_ddrq"))
+        session.flush()
+        dataset_id = session.query(DatasetModel.id).filter_by(uri="dataset_for_orphan_ddrq").scalar()
+        session.add(
+            DagModel(
+                dag_id=orphan_dag_id,
+                max_active_tasks=1,
+                has_task_concurrency_limits=False,
+                next_dagrun=timezone.datetime(2038, 1, 1),
+                next_dagrun_create_after=timezone.datetime(2038, 1, 2),
+                is_active=True,
+                has_import_errors=False,
+                is_paused=False,
+            )
+        )
+        session.flush()
+        session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id))
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert orphan_dag_id not in dataset_triggered_dag_info
+        assert (
+            "Dags have queued dataset events (DDRQs), but are not found in the 
serialized_dag table."
+            in caplog.text
+        )
+        assert orphan_dag_id in caplog.text
+        assert (
+            session.query(DatasetDagRunQueue)
+            .filter(DatasetDagRunQueue.target_dag_id == orphan_dag_id)
+            .count()
+            == 1
+        )
+
+    def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog):
+        """When multiple dags lack SerializedDagModel, the warning lists 
dag_ids sorted."""
+        session.add_all(
+            [
+                DatasetModel(uri="ds_ghost_z"),
+                DatasetModel(uri="ds_ghost_a"),
+            ]
+        )
+        session.flush()
+        ds_z_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_z").scalar()
+        ds_a_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_a").scalar()
+        far = timezone.datetime(2038, 1, 1)
+        far_after = timezone.datetime(2038, 1, 2)
+        session.add_all(
+            [
+                DagModel(
+                    dag_id="ghost_z",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+                DagModel(
+                    dag_id="ghost_a",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+            ]
+        )
+        session.flush()
+
+        session.add_all(
+            [
+                DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"),
+                DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"),
+            ]
+        )
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert "ghost_a" not in dataset_triggered_dag_info
+        assert "ghost_z" not in dataset_triggered_dag_info
+        msg = next(
+            r.message
+            for r in caplog.records
+            if "Dags have queued dataset events (DDRQs), but are not found in 
the serialized_dag table."
+            in r.message
+        )
+        assert msg.index("ghost_a") < msg.index("ghost_z")
+        assert (
+            session.query(DatasetDagRunQueue)
+            .filter(DatasetDagRunQueue.target_dag_id.in_(("ghost_a", 
"ghost_z")))
+            .count()
+            == 2
+        )
+
     def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session):
         # link dataset_alias hello_alias to dataset hello
         dataset_model = DatasetModel(uri="hello")
@@ -3078,6 +3188,7 @@ class TestDagModel:
             max_active_runs=1,
             schedule=[DatasetAlias(name="hello_alias")],
             start_date=pendulum.now().add(days=-2),
+            serialized=True,
         ):
             EmptyOperator(task_id="dummy")
 

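For context, the skip logic in the dag.py hunk can be paraphrased as a small
standalone Python sketch. This is illustrative only: the
filter_missing_serialized helper and the example data shapes below are
hypothetical and not part of the Airflow codebase; the real logic operates on
ORM rows inside DagModel.dags_needing_dagruns.

    import logging

    log = logging.getLogger("airflow.models.dag")


    def filter_missing_serialized(by_dag, dag_statuses, ser_dag_ids):
        """Drop in-memory candidates whose serialized Dag is absent.

        Mirrors the hunk above: entries are removed from the candidate dicts
        only; DatasetDagRunQueue rows are deliberately left untouched so a
        later scheduler loop can re-evaluate them once serialization exists.
        """
        missing_from_serialized = set(by_dag) - set(ser_dag_ids)
        if missing_from_serialized:
            log.info(
                "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table."
                " — skipping Dag run creation: %s",
                sorted(missing_from_serialized),
            )
            for dag_id in missing_from_serialized:
                del by_dag[dag_id]
                del dag_statuses[dag_id]


    # Hypothetical example: "ghost" has a queued dataset event but no
    # serialized Dag, so it is dropped from both dicts; "ok_dag" survives.
    by_dag = {"ok_dag": ["event-1"], "ghost": ["event-2"]}
    dag_statuses = {"ok_dag": {"s3://bucket/key": True}, "ghost": {"s3://other": True}}
    filter_missing_serialized(by_dag, dag_statuses, ser_dag_ids={"ok_dag"})
    assert list(by_dag) == ["ok_dag"]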