Copilot commented on code in PR #64521:
URL: https://github.com/apache/airflow/pull/64521#discussion_r3025332213
##########
tests/datasets/test_manager.py:
##########
@@ -228,3 +228,32 @@ def test_create_datasets_notifies_dataset_listener(self, session):
# Ensure the listener was notified
assert len(dataset_listener.created) == 1
assert dataset_listener.created[0].uri == dsm.uri
+
+ @pytest.mark.skip_if_database_isolation_mode
+ def test_register_dataset_change_queues_stale_dag(self, session, mock_task_instance):
+ dsem = DatasetManager()
+
+ dsm = DatasetModel(uri="test_dataset_uri_3")
+ session.add(dsm)
+
+ # Setup a dag that is STALE but NOT PAUSED
+ # We want stale dags to still receive updates
+ stale_dag = DagModel(dag_id="stale_dag", is_active=False, is_paused=False)
+ session.add(stale_dag)
+
+ # Link the Stale Dag to the Asset
+ dsm.scheduled_dags = [DagScheduleDatasetReference(dag_id=stale_dag.dag_id)]
Review Comment:
`DatasetModel` doesn’t have a `scheduled_dags` relationship (and assigning
to it won’t populate `DatasetModel.consuming_dags`), so this test won’t
actually link the stale DAG to the dataset and the queue count assertion will
fail. Use `dsm.consuming_dags =
[DagScheduleDatasetReference(dag_id=stale_dag.dag_id)]` (matching the other
tests in this file) and update the nearby comment to refer to “dataset” (not
“asset”).
```suggestion
# Link the stale DAG to the dataset
dsm.consuming_dags = [DagScheduleDatasetReference(dag_id=stale_dag.dag_id)]
```
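To illustrate why the original assignment is silently ineffective, here is a
minimal sketch using a plain Python stand-in. `FakeDatasetModel` and
`count_consumers` are hypothetical names, not Airflow APIs, and the sketch
assumes the real model accepts assignment to an unmapped attribute without
raising, as ordinary Python objects do.

```python
class FakeDatasetModel:
    """Hypothetical stand-in for a model that only exposes `consuming_dags`."""

    def __init__(self, uri):
        self.uri = uri
        self.consuming_dags = []


def count_consumers(dataset):
    # Stand-in for code that only ever reads `consuming_dags`.
    return len(dataset.consuming_dags)


dsm = FakeDatasetModel("test_dataset_uri_3")

# Assigning to a name the consumer never reads raises no error,
# but it also links nothing.
dsm.scheduled_dags = ["stale_dag"]
wrong_count = count_consumers(dsm)

# Assigning to the attribute that is actually read creates the link.
dsm.consuming_dags = ["stale_dag"]
right_count = count_consumers(dsm)

print(wrong_count, right_count)
```

With the wrong attribute name the consumer count stays at zero, which is why
an assertion on the queue count would fail.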
##########
airflow/datasets/manager.py:
##########
@@ -114,7 +114,7 @@ def register_dataset_change(
session.add(dataset_event)
dags_to_queue_from_dataset = {
- ref.dag for ref in dataset_model.consuming_dags if ref.dag.is_active and not ref.dag.is_paused
+ ref.dag for ref in dataset_model.consuming_dags if not ref.dag.is_paused
}
Review Comment:
Removing the `ref.dag.is_active` filter changes behavior for DAGs with
`is_active=False`: previously no `DatasetDagRunQueue` row was created for
them, and now one will be. Existing tests expect that disabled consumer DAGs
are not queued (e.g. `tests/jobs/test_scheduler_job.py`
`test_no_create_dag_runs_when_dag_disabled`, param `{"is_active": False}`), so
this PR should update those tests (or the behavior) to match the new
semantics: queue even when the DAG is inactive, so the run can be picked up
once the DAG becomes healthy/active again.
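The behavioral difference can be sketched with plain stand-ins. `FakeDag`,
`FakeRef`, and the two helper functions are hypothetical names used only for
illustration; only the two filter expressions are taken from the diff above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeDag:
    """Hypothetical stand-in for DagModel (frozen, so it is hashable)."""
    dag_id: str
    is_active: bool
    is_paused: bool


@dataclass(frozen=True)
class FakeRef:
    """Hypothetical stand-in for a consuming-DAG reference."""
    dag: FakeDag


def dags_to_queue_old(refs):
    # Filter before this PR: DAG must be active and not paused.
    return {ref.dag for ref in refs if ref.dag.is_active and not ref.dag.is_paused}


def dags_to_queue_new(refs):
    # Filter after this PR: DAG only needs to be unpaused.
    return {ref.dag for ref in refs if not ref.dag.is_paused}


refs = [
    FakeRef(FakeDag("healthy", is_active=True, is_paused=False)),
    FakeRef(FakeDag("stale", is_active=False, is_paused=False)),
    FakeRef(FakeDag("paused", is_active=True, is_paused=True)),
]

old_ids = sorted(dag.dag_id for dag in dags_to_queue_old(refs))
new_ids = sorted(dag.dag_id for dag in dags_to_queue_new(refs))
print(old_ids)  # ['healthy']
print(new_ids)  # ['healthy', 'stale']
```

Only the inactive, unpaused DAG changes outcome, which is the case the
existing `{"is_active": False}` parameter covers.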