ashb commented on code in PR #53831:
URL: https://github.com/apache/airflow/pull/53831#discussion_r2283333349


##########
airflow-core/docs/administration-and-deployment/logging-monitoring/callbacks.rst:
##########
@@ -95,22 +101,43 @@ Before each task begins to execute, the ``task_execute_callback`` function will
 
     with DAG(
         dag_id="example_callback",
-        schedule=None,
-        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-        dagrun_timeout=datetime.timedelta(minutes=60),
-        catchup=False,
         on_success_callback=dag_success_alert,
         default_args={"on_execute_callback": task_execute_callback},
-        tags=["example"],
     ):
         task1 = EmptyOperator(task_id="task1", on_failure_callback=[task_failure_alert])
         task2 = EmptyOperator(task_id="task2")
         task3 = EmptyOperator(task_id="task3")
         task1 >> task2 >> task3
 
-.. note::
-    As of Airflow 2.6.0, callbacks now supports a list of callback functions, allowing users to specify multiple functions
-    to be executed in the desired event. Simply pass a list of callback functions to the callback args when defining your DAG/task
-    callbacks: e.g ``on_failure_callback=[callback_func_1, callback_func_2]``
-
 Full list of variables available in ``context`` in :doc:`docs <../../templates-ref>` and `code <https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/context.py>`_.
+
+
+Using Notifiers
+^^^^^^^^^^^^^^^
+
+You can use Notifiers in your ``DAG`` definition by passing it as an argument to the ``on_*_callbacks``.

Review Comment:
   ```suggestion
   You can use Notifiers in your Dag definition by passing it as an argument to the ``on_*_callbacks``.
   ```
   I think "Dag" is the casing we are using now? If not, at least don't put
it in a code literal.



##########
airflow-core/src/airflow/triggers/deadline.py:
##########
@@ -49,7 +49,21 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
         try:
             callback = import_string(self.callback_path)
-            result = await callback(**self.callback_kwargs)
+
+            # TODO: get airflow context
+            context: dict = {}
+
+            # If callback is an awaitable class, callback_kwargs will be used as parameters to the constructor
+            if hasattr(callback, "__await__"):
+                callback_instance = callback(**self.callback_kwargs)
+
+                # Since parameters cannot be passed in __await__, context is passed as an attribute
+                callback_instance.context = context

Review Comment:
   Was there a reason why we set it as an attribute, instead of passing it in 
as another kwarg on L58?
   
   ```suggestion
                   callback_instance = callback(**self.callback_kwargs, context=context)
   ```
   
   Not every class is writable/can have attributes added to it. I'm not sure 
which approach is actually more "developer friendly".
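
To make the writability concern concrete, here is a minimal sketch. Both classes are hypothetical and not part of Airflow: `SlottedCallback` declares `__slots__` without a `context` slot, so assigning the attribute after construction fails, while `KwargCallback` accepts `context` in its constructor. The trade-off is that the kwarg approach requires every callback class to accept a `context` parameter.

```python
import asyncio


class SlottedCallback:
    """Hypothetical awaitable callback with no instance __dict__."""

    __slots__ = ("message",)

    def __init__(self, message):
        self.message = message

    def __await__(self):
        async def _run():
            return self.message

        # Delegate to a coroutine so `await instance` works.
        return _run().__await__()


class KwargCallback:
    """Hypothetical awaitable callback that takes context in the constructor."""

    __slots__ = ("message", "context")

    def __init__(self, message, context=None):
        self.message = message
        self.context = context

    def __await__(self):
        async def _run():
            return self.message, self.context

        return _run().__await__()


async def _await(awaitable):
    return await awaitable


# Attribute approach: fails because __slots__ has no "context" entry.
try:
    slotted = SlottedCallback(message="deadline missed")
    slotted.context = {}
    attribute_failed = False
except AttributeError:
    attribute_failed = True

# Kwarg approach: works for any class whose __init__ accepts context.
kwarg_result = asyncio.run(
    _await(KwargCallback(message="deadline missed", context={"run_id": "r1"}))
)
```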



##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -142,32 +149,18 @@ def test_prune_deadlines(self, mock_session, conditions):
         else:
             mock_session.query.assert_not_called()
 
-    def test_orm(self):
-        deadline_orm = Deadline(
-            deadline_time=DEFAULT_DATE,
-            callback=TEST_ASYNC_CALLBACK,
-            dagrun_id=RUN_ID,
-        )
-
+    def test_orm(self, deadline_orm, dagrun):

Review Comment:
   Ditto here -- this doesn't seem to be really testing anything anymore.



##########
airflow-core/src/airflow/triggers/deadline.py:
##########
@@ -49,7 +49,21 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
         try:
             callback = import_string(self.callback_path)
-            result = await callback(**self.callback_kwargs)
+
+            # TODO: get airflow context
+            context: dict = {}
+
+            # If callback is an awaitable class, callback_kwargs will be used as parameters to the constructor
+            if hasattr(callback, "__await__"):
+                callback_instance = callback(**self.callback_kwargs)
+
+                # Since parameters cannot be passed in __await__, context is passed as an attribute
+                callback_instance.context = context

Review Comment:
   Do you have an example of what the callback class looks like that hits this 
code path?
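
For reference, a class would hit this branch if it defines `__await__`, since `hasattr(callback, "__await__")` is then true on the class itself, whereas a plain `async def` function has no such attribute. The sketch below is an illustration only: `ExampleAwaitableCallback`, `plain_async_callback`, and `run_callback` are hypothetical names, and the fallback branch is assumed to be the original `await callback(**kwargs)` call, which the hunk above does not show.

```python
import asyncio


class ExampleAwaitableCallback:
    """Hypothetical awaitable callback class; not part of Airflow."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.context = None  # set by the caller after construction

    async def _notify(self):
        return {"kwargs": self.kwargs, "context": self.context}

    def __await__(self):
        # Makes instances usable as `await instance`.
        return self._notify().__await__()


async def plain_async_callback(**kwargs):
    """Hypothetical coroutine-function callback (the pre-existing path)."""
    return kwargs


async def run_callback(callback, callback_kwargs):
    """Simplified stand-in for the dispatch logic in the diff."""
    context = {}
    if hasattr(callback, "__await__"):
        # Awaitable class: kwargs go to the constructor, context is attached.
        instance = callback(**callback_kwargs)
        instance.context = context
        return await instance
    # Assumed fallback: call the coroutine function directly.
    return await callback(**callback_kwargs)


class_result = asyncio.run(run_callback(ExampleAwaitableCallback, {"text": "late"}))
func_result = asyncio.run(run_callback(plain_async_callback, {"text": "late"}))
```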



##########
task-sdk/src/airflow/sdk/bases/notifier.py:
##########
@@ -33,11 +32,31 @@
 
 
 class BaseNotifier(LoggingMixin, Templater):
-    """BaseNotifier class for sending notifications."""
+    """
+    BaseNotifier class for sending notifications.
+
+    It can be used asynchronously (preferred) if `async_notify`is implemented and/or
+    synchronously if `notify` is implemented.
+
+    Currently, the DAG/Task state change callbacks run on the DAG Processor and only support sync usage.
+
+    Usage:

Review Comment:
   ```suggestion
       Usage::
       
   ```
   
   (Then it'll get rendered as a code block if these docs are ever rendered by
Sphinx.)



##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -87,19 +98,7 @@ def setup_method():
     def teardown_method():
         _clean_db()
 
-    def test_add_deadline(self, dagrun, session):
-        assert session.query(Deadline).count() == 0
-        deadline_orm = Deadline(
-            deadline_time=DEFAULT_DATE,
-            callback=TEST_ASYNC_CALLBACK,
-            dagrun_id=dagrun.id,
-        )
-
-        session.add(deadline_orm)
-        session.flush()
-
-        assert session.query(Deadline).count() == 1
-
+    def test_add_deadline(self, dagrun, deadline_orm, session):

Review Comment:
   Is this test worth keeping? Does it actually test anything useful?


