Copilot commented on code in PR #64521:
URL: https://github.com/apache/airflow/pull/64521#discussion_r3025332213


##########
tests/datasets/test_manager.py:
##########
@@ -228,3 +228,32 @@ def test_create_datasets_notifies_dataset_listener(self, session):
         # Ensure the listener was notified
         assert len(dataset_listener.created) == 1
         assert dataset_listener.created[0].uri == dsm.uri
+
+    @pytest.mark.skip_if_database_isolation_mode
+    def test_register_dataset_change_queues_stale_dag(self, session, mock_task_instance):
+        dsem = DatasetManager()
+
+        dsm = DatasetModel(uri="test_dataset_uri_3")
+        session.add(dsm)
+
+        # Setup a dag that is STALE but NOT PAUSED
+        # We want stale dags to still receive updates
+        stale_dag = DagModel(dag_id="stale_dag", is_active=False, is_paused=False)
+        session.add(stale_dag)
+
+        # Link the Stale Dag to the Asset
+        dsm.scheduled_dags = [DagScheduleDatasetReference(dag_id=stale_dag.dag_id)]

Review Comment:
   `DatasetModel` doesn’t have a `scheduled_dags` relationship (and assigning
to it won’t populate `DatasetModel.consuming_dags`), so this test won’t
actually link the stale DAG to the dataset and the queue count assertion will
fail. Use
`dsm.consuming_dags = [DagScheduleDatasetReference(dag_id=stale_dag.dag_id)]`
(matching the other tests in this file) and update the nearby comment to refer
to “dataset” (not “asset”).
   ```suggestion
           # Link the stale DAG to the dataset
           dsm.consuming_dags = [DagScheduleDatasetReference(dag_id=stale_dag.dag_id)]
   ```
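
To illustrate why the original assignment would fail quietly instead of raising: setting an attribute that is not the relationship simply creates an ordinary instance attribute, and the collection the manager reads stays empty. The sketch below uses a plain-Python stand-in (`FakeDatasetModel` is hypothetical, not Airflow's model), on the assumption that a SQLAlchemy declarative instance behaves the same way for unmapped attribute names.

```python
class FakeDatasetModel:
    """Hypothetical stand-in for DatasetModel; not Airflow's real class."""

    def __init__(self, uri):
        self.uri = uri
        # Stand-in for the relationship that register_dataset_change iterates over.
        self.consuming_dags = []


dsm = FakeDatasetModel(uri="test_dataset_uri_3")

# Assigning to a name that is not the relationship raises no error...
dsm.scheduled_dags = ["stale_dag"]

# ...but the collection the manager actually reads is still empty,
# so no DAG would be queued for this dataset.
linked = list(dsm.consuming_dags)
```

With the suggested change, the reference would land in `consuming_dags` and the stale DAG would be visible to the manager.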



##########
airflow/datasets/manager.py:
##########
@@ -114,7 +114,7 @@ def register_dataset_change(
         session.add(dataset_event)
 
         dags_to_queue_from_dataset = {
-            ref.dag for ref in dataset_model.consuming_dags if ref.dag.is_active and not ref.dag.is_paused
+            ref.dag for ref in dataset_model.consuming_dags if not ref.dag.is_paused
         }

Review Comment:
   Removing the `ref.dag.is_active` filter changes behavior for DAGs with 
`is_active=False` (previously no `DatasetDagRunQueue` row was created; now one 
will be). There’s an existing expectation that disabled consumer DAGs don’t get 
queued (e.g. `tests/jobs/test_scheduler_job.py` 
`test_no_create_dag_runs_when_dag_disabled`, param `{"is_active": False}`), so 
this PR should update those tests/behavior to match the new semantics (queue 
even when inactive so it can be picked up once the DAG becomes healthy/active 
again).
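
The semantic difference can be sketched in isolation. This is a minimal illustration under stated assumptions: `FakeDag`, `dags_to_queue_old`, and `dags_to_queue_new` are hypothetical names that only mirror the two filter expressions in the diff, and nothing here touches Airflow's models or database.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeDag:
    """Hypothetical stand-in carrying only the two flags the filter reads."""

    dag_id: str
    is_active: bool
    is_paused: bool


def dags_to_queue_old(dags):
    # Filter before this PR: the DAG must be active and not paused.
    return {d.dag_id for d in dags if d.is_active and not d.is_paused}


def dags_to_queue_new(dags):
    # Filter after this PR: only paused DAGs are excluded.
    return {d.dag_id for d in dags if not d.is_paused}


consumers = [
    FakeDag("healthy", is_active=True, is_paused=False),
    FakeDag("stale", is_active=False, is_paused=False),
    FakeDag("paused", is_active=True, is_paused=True),
]

old = dags_to_queue_old(consumers)
new = dags_to_queue_new(consumers)

# DAGs that gain a queue entry only under the new filter.
newly_queued = new - old
```

Only the inactive, unpaused DAG changes outcome, which is the case the existing `{"is_active": False}` test parameter exercises.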



