Copilot commented on code in PR #63977:
URL: https://github.com/apache/airflow/pull/63977#discussion_r3025332400


##########
providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py:
##########
@@ -215,8 +215,10 @@ def __init__(
                 f"Expected str, datetime.datetime, or None for parameter 
'logical_date'. Got {type(logical_date).__name__}"
             )
 
-        if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS:
-            raise NotImplementedError("Setting `fail_when_dag_is_paused` not 
yet supported for Airflow 3.x")
+        if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS and not 
AIRFLOW_V_3_2_PLUS:
+            raise NotImplementedError(
+                "Setting `fail_when_dag_is_paused` is only supported for 
Airflow 3.2 and above"

Review Comment:
   The operator docstring currently states “If the dag to trigger is paused, 
DagIsPaused will be raised” for `fail_when_dag_is_paused`, but for Airflow 
3.0/3.1 it now raises `NotImplementedError` during initialization. Please 
update the parameter documentation to reflect the version-dependent behavior 
(e.g., supported on Airflow 2.x and Airflow 3.2+ only) so API users don’t get 
surprised by the init-time exception.
   ```suggestion
                   "Setting `fail_when_dag_is_paused` is only supported for 
Airflow 2.x and Airflow 3.2+ "
                   "(it is not available on Airflow 3.0 and 3.1)."
   ```



##########
providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py:
##########
@@ -306,6 +310,24 @@ def 
test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self):
                 fail_when_dag_is_paused=True,
             )
 
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Test only for Airflow 
3.2+")
+    def 
test_trigger_dag_run_with_fail_when_dag_is_paused_raises_dag_is_paused(self):
+        from airflow.providers.standard.operators.trigger_dagrun import 
DagIsPaused
+        from airflow.sdk.types import RuntimeTaskInstanceProtocol
+
+        ti = mock.Mock(spec=RuntimeTaskInstanceProtocol)
+        ti.get_dag.return_value = mock.Mock(is_paused=True)
+
+        task = TriggerDagRunOperator(
+            task_id="test_task",
+            trigger_dag_id=TRIGGERED_DAG_ID,
+            conf={"foo": "bar"},
+            fail_when_dag_is_paused=True,

Review Comment:
   This test is intended to validate the paused-DAG behavior, but it currently 
leaves `openlineage_inject_parent_info` enabled (default `True`), which can 
pull in unrelated logic and potentially require additional `ti` attributes on 
the mock. To make the test more focused and less brittle, set 
`openlineage_inject_parent_info=False` in the operator instantiation for this 
test.
   ```suggestion
               fail_when_dag_is_paused=True,
               openlineage_inject_parent_info=False,
   ```



##########
providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py:
##########
@@ -306,6 +310,24 @@ def 
test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self):
                 fail_when_dag_is_paused=True,
             )
 
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Test only for Airflow 
3.2+")
+    def 
test_trigger_dag_run_with_fail_when_dag_is_paused_raises_dag_is_paused(self):

Review Comment:
   The new Airflow 3.2+ coverage only asserts the paused path. Please add a 
companion test for the non-paused case (e.g., `is_paused=False`) that asserts 
`DagIsPaused` is not raised and execution proceeds to the expected Airflow 3 
deferral/trigger behavior (likely raising `DagRunTriggerException`). This will 
help ensure the pause check doesn’t incorrectly block triggering on 3.2+.



##########
providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py:
##########
@@ -296,6 +291,19 @@ def _trigger_dag_af_3(self, context, run_id, 
parsed_logical_date):
 
         raise DagRunTriggerException(**kwargs_accepted)
 
+    def _raise_if_trigger_dag_is_paused(self, context: Context) -> None:
+        if AIRFLOW_V_3_0_PLUS:
+            dag = context["ti"].get_dag(dag_id=self.trigger_dag_id)
+            if dag.is_paused:
+                raise DagIsPaused(dag_id=self.trigger_dag_id)
+            return

Review Comment:
   `_raise_if_trigger_dag_is_paused()` uses the Airflow 3 Task SDK branch for 
all `AIRFLOW_V_3_0_PLUS`, but the feature is explicitly unsupported on 3.0/3.1 
(enforced in `__init__`). To keep this helper consistent and robust (e.g., if 
it’s called from another path in the future), consider gating this branch on 
`AIRFLOW_V_3_2_PLUS` (and either fall back to the DB-based check or raise the 
same `NotImplementedError`), rather than only `AIRFLOW_V_3_0_PLUS`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to