This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-11-test by this push:
new 5cc47247114 fix: Skip dataset-triggered dags without SerializedDagModel (#63546)
5cc47247114 is described below
commit 5cc47247114cf34820f66cdcb21117f8df30ed1c
Author: Leonardo Soares <[email protected]>
AuthorDate: Fri Apr 10 05:28:59 2026 -0300
fix: Skip dataset-triggered dags without SerializedDagModel (#63546)
Co-authored-by: Wei Lee <[email protected]>
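
A minimal sketch of the guarded behaviour from the caller's side (the dag id
and the session fixture here are hypothetical; the call and its return shape
match the tests added below):

    # Sketch only: "orphan_dag" stands for any dag_id that has a
    # DatasetDagRunQueue row but no SerializedDagModel row.
    _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
    assert "orphan_dag" not in dataset_triggered_dag_info
    # Its DDRQ row is intentionally left in place for a later scheduler pass.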
---
airflow/models/dag.py | 27 +++++++++++-
tests/models/test_dag.py | 111 +++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 137 insertions(+), 1 deletion(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 6b3744db138..741b28120df 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -4069,6 +4069,11 @@ class DagModel(Base):
         This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
         you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
         transaction is committed it will be unlocked.
+
+        For dataset-triggered scheduling, Dags that have ``DatasetDagRunQueue`` rows but no matching
+        ``SerializedDagModel`` row are omitted from the returned ``dataset_triggered_dag_info`` until
+        serialization exists; DDRQs are **not** deleted here so the scheduler can re-evaluate on a
+        later run.
         """
         from airflow.models.serialized_dag import SerializedDagModel
 
@@ -4094,13 +4099,33 @@ class DagModel(Base):
         ser_dags = session.scalars(
             select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
         ).all()
+        ser_dag_ids = {s.dag_id for s in ser_dags}
+        missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+        if missing_from_serialized:
+            log.info(
+                "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table."
+                " — skipping Dag run creation: %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del by_dag[dag_id]
+                del dag_statuses[dag_id]
 
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
-            if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
+            dataset_condition = ser_dag.dag.timetable.dataset_condition
+            if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses):
                 del by_dag[dag_id]
                 del dag_statuses[dag_id]
+            else:
+                log.debug(
+                    "Dataset condition satisfied: dag_id=%s, condition=%s, ddrq_uris=%s, ddrq_count=%d",
+                    dag_id,
+                    dataset_condition,
+                    sorted(statuses.keys()),
+                    len(statuses),
+                )
         del dag_statuses
         dataset_triggered_dag_info = {}
         for dag_id, records in by_dag.items():
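
The core of the dag.py change above is a plain set difference between the dag
ids that have queued dataset events and the ids that actually have a serialized
representation. A minimal, self-contained sketch of that filtering pattern
(plain dicts and sets stand in for by_dag, dag_statuses, and the
SerializedDagModel query result):

    # Drop queued dags whose ids have no serialized counterpart. Only the
    # in-memory candidate set shrinks; the queue rows themselves are untouched.
    by_dag = {"ghost_a": ["ddrq1"], "ok_dag": ["ddrq2"]}
    dag_statuses = {"ghost_a": {"uri_a": True}, "ok_dag": {"uri_b": True}}
    ser_dag_ids = {"ok_dag"}  # ids found in the serialized_dag table

    missing_from_serialized = set(by_dag) - ser_dag_ids
    for dag_id in sorted(missing_from_serialized):
        del by_dag[dag_id]
        del dag_statuses[dag_id]

    assert set(by_dag) == {"ok_dag"}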
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 5f721b61d26..e13da06ab1a 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3030,6 +3030,7 @@ class TestDagModel:
             max_active_runs=1,
             schedule=[dataset],
             start_date=pendulum.now().add(days=-2),
+            serialized=True,
         ) as dag:
             EmptyOperator(task_id="dummy")
 
@@ -3064,6 +3065,115 @@ class TestDagModel:
         dag_models = query.all()
         assert dag_models == [dag_model]
 
+    def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog):
+        """DDRQ rows for a Dag without SerializedDagModel must be skipped (no dataset_triggered info).
+
+        Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later
+        heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from
+        the in-memory candidate set; it does not delete ORM rows).
+        """
+        orphan_dag_id = "ddr_q_no_serialized_dag"
+        session.add(DatasetModel(uri="dataset_for_orphan_ddrq"))
+        session.flush()
+        dataset_id = session.query(DatasetModel.id).filter_by(uri="dataset_for_orphan_ddrq").scalar()
+        session.add(
+            DagModel(
+                dag_id=orphan_dag_id,
+                max_active_tasks=1,
+                has_task_concurrency_limits=False,
+                next_dagrun=timezone.datetime(2038, 1, 1),
+                next_dagrun_create_after=timezone.datetime(2038, 1, 2),
+                is_active=True,
+                has_import_errors=False,
+                is_paused=False,
+            )
+        )
+        session.flush()
+        session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id))
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert orphan_dag_id not in dataset_triggered_dag_info
+        assert (
+            "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table."
+            in caplog.text
+        )
+        assert orphan_dag_id in caplog.text
+        assert (
+            session.query(DatasetDagRunQueue)
+            .filter(DatasetDagRunQueue.target_dag_id == orphan_dag_id)
+            .count()
+            == 1
+        )
+
+    def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog):
+        """When multiple dags lack SerializedDagModel, the log message lists the dag_ids in sorted order."""
+        session.add_all(
+            [
+                DatasetModel(uri="ds_ghost_z"),
+                DatasetModel(uri="ds_ghost_a"),
+            ]
+        )
+        session.flush()
+        ds_z_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_z").scalar()
+        ds_a_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_a").scalar()
+        far = timezone.datetime(2038, 1, 1)
+        far_after = timezone.datetime(2038, 1, 2)
+        session.add_all(
+            [
+                DagModel(
+                    dag_id="ghost_z",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+                DagModel(
+                    dag_id="ghost_a",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+            ]
+        )
+        session.flush()
+
+        session.add_all(
+            [
+                DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"),
+                DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"),
+            ]
+        )
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert "ghost_a" not in dataset_triggered_dag_info
+        assert "ghost_z" not in dataset_triggered_dag_info
+        msg = next(
+            r.message
+            for r in caplog.records
+            if "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table."
+            in r.message
+        )
+        assert msg.index("ghost_a") < msg.index("ghost_z")
+        assert (
+            session.query(DatasetDagRunQueue)
+            .filter(DatasetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z")))
+            .count()
+            == 2
+        )
+
     def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session):
         # link dataset_alias hello_alias to dataset hello
         dataset_model = DatasetModel(uri="hello")
@@ -3078,6 +3188,7 @@ class TestDagModel:
             max_active_runs=1,
             schedule=[DatasetAlias(name="hello_alias")],
             start_date=pendulum.now().add(days=-2),
+            serialized=True,
         ):
             EmptyOperator(task_id="dummy")
 
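
The two new tests should be runnable in isolation from an Airflow development
checkout with a standard pytest keyword filter, for example (environment setup
via the usual breeze or virtualenv workflow is assumed):

    pytest tests/models/test_dag.py -k "dags_needing_dagruns"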