ashb commented on code in PR #53831:
URL: https://github.com/apache/airflow/pull/53831#discussion_r2283333349
##########
airflow-core/docs/administration-and-deployment/logging-monitoring/callbacks.rst:
##########
@@ -95,22 +101,43 @@ Before each task begins to execute, the ``task_execute_callback`` function will
with DAG(
dag_id="example_callback",
- schedule=None,
- start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
- dagrun_timeout=datetime.timedelta(minutes=60),
- catchup=False,
on_success_callback=dag_success_alert,
default_args={"on_execute_callback": task_execute_callback},
- tags=["example"],
):
task1 = EmptyOperator(task_id="task1", on_failure_callback=[task_failure_alert])
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task1 >> task2 >> task3
-.. note::
- As of Airflow 2.6.0, callbacks now supports a list of callback functions, allowing users to specify multiple functions
- to be executed in the desired event. Simply pass a list of callback functions to the callback args when defining your DAG/task
- callbacks: e.g ``on_failure_callback=[callback_func_1, callback_func_2]``
-
Full list of variables available in ``context`` in :doc:`docs <../../templates-ref>` and `code <https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/context.py>`_.
+
+
+Using Notifiers
+^^^^^^^^^^^^^^^
+
+You can use Notifiers in your ``DAG`` definition by passing it as an argument to the ``on_*_callbacks``.
Review Comment:
```suggestion
You can use Notifiers in your Dag definition by passing it as an argument to the ``on_*_callbacks``.
```
I think this is the casing we are using now? Or if not that, at least don't put
it in a code block.
##########
airflow-core/src/airflow/triggers/deadline.py:
##########
@@ -49,7 +49,21 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
try:
callback = import_string(self.callback_path)
- result = await callback(**self.callback_kwargs)
+
+ # TODO: get airflow context
+ context: dict = {}
+
+ # If callback is an awaitable class, callback_kwargs will be used as parameters to the constructor
+ if hasattr(callback, "__await__"):
+ callback_instance = callback(**self.callback_kwargs)
+
+ # Since parameters cannot be passed in __await__, context is passed as an attribute
+ callback_instance.context = context
Review Comment:
Was there a reason why we set it as an attribute, instead of passing it in
as another kwarg on L58?
```suggestion
callback_instance = callback(**self.callback_kwargs, context=context)
```
Not every class is writable/can have attributes added to it. I'm not sure
which approach is actually more "developer friendly".
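
To illustrate the writability concern, here is a minimal sketch. `SlottedCallback`, `PlainCallback`, and `try_attach_context` are hypothetical names made up for this example, not code from the PR. A class that declares `__slots__` rejects `instance.context = ...` unless `context` is one of its slots, while passing `context` through the constructor still works:

```python
class PlainCallback:
    """Hypothetical ordinary class: instances have a __dict__, so new attributes are allowed."""


class SlottedCallback:
    """Hypothetical callback class that uses __slots__ and so has no instance __dict__."""

    # Only these attribute names may be set on an instance.
    __slots__ = ("message", "_context")

    def __init__(self, message, context=None):
        self.message = message
        # Context arrives as a constructor kwarg, so no later assignment is needed.
        self._context = context if context is not None else {}

    def get_context(self):
        return self._context


def try_attach_context(instance, context):
    """Return True if `instance.context = context` succeeds, False if the class forbids it."""
    try:
        instance.context = context
    except AttributeError:
        return False
    return True
```

With these definitions, `try_attach_context(PlainCallback(), {})` succeeds, `try_attach_context(SlottedCallback("hi"), {})` does not, and `SlottedCallback("hi", context={"k": 1})` carries the context either way. The trade-off is that the kwarg approach requires every callback class to accept a `context` parameter in its constructor.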
##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -142,32 +149,18 @@ def test_prune_deadlines(self, mock_session, conditions):
else:
mock_session.query.assert_not_called()
- def test_orm(self):
- deadline_orm = Deadline(
- deadline_time=DEFAULT_DATE,
- callback=TEST_ASYNC_CALLBACK,
- dagrun_id=RUN_ID,
- )
-
+ def test_orm(self, deadline_orm, dagrun):
Review Comment:
Ditto here -- this doesn't seem to be really testing anything anymore.
##########
airflow-core/src/airflow/triggers/deadline.py:
##########
@@ -49,7 +49,21 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
try:
callback = import_string(self.callback_path)
- result = await callback(**self.callback_kwargs)
+
+ # TODO: get airflow context
+ context: dict = {}
+
+ # If callback is an awaitable class, callback_kwargs will be used as parameters to the constructor
+ if hasattr(callback, "__await__"):
+ callback_instance = callback(**self.callback_kwargs)
+
+ # Since parameters cannot be passed in __await__, context is passed as an attribute
+ callback_instance.context = context
Review Comment:
Do you have an example of what the callback class looks like that hits this
code path?
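
For reference, my guess is that a class along these lines would take the `__await__` branch. This is only a sketch: `ExampleAsyncCallback`, `plain_callback`, and `run_callback` are hypothetical names, and the fallback branch for plain coroutine functions is an assumption, since the hunk above does not show it.

```python
import asyncio


class ExampleAsyncCallback:
    """Hypothetical awaitable callback class; not taken from the PR."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.context = {}

    async def _run(self):
        # Stand-in for real work such as sending a notification.
        return self.kwargs, self.context

    def __await__(self):
        # Delegate to the coroutine so that `await instance` works.
        return self._run().__await__()


async def plain_callback(**kwargs):
    """Hypothetical coroutine-function callback; it has no __await__ attribute itself."""
    return kwargs


async def run_callback(callback, callback_kwargs, context):
    """Mirror the branch in the diff; the else path is assumed, not shown in the hunk."""
    if hasattr(callback, "__await__"):
        instance = callback(**callback_kwargs)
        # __await__ takes no parameters, so context is attached as an attribute.
        instance.context = context
        return await instance
    return await callback(**callback_kwargs)
```

Here `asyncio.run(run_callback(ExampleAsyncCallback, {"text": "hi"}, {"dag_id": "d"}))` goes through the attribute-setting path, while `plain_callback` skips it because `hasattr(plain_callback, "__await__")` is false for a coroutine function.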
##########
task-sdk/src/airflow/sdk/bases/notifier.py:
##########
@@ -33,11 +32,31 @@
class BaseNotifier(LoggingMixin, Templater):
- """BaseNotifier class for sending notifications."""
+ """
+ BaseNotifier class for sending notifications.
+
+ It can be used asynchronously (preferred) if `async_notify`is implemented and/or
+ synchronously if `notify` is implemented.
+
+ Currently, the DAG/Task state change callbacks run on the DAG Processor and only support sync usage.
+
+ Usage:
Review Comment:
```suggestion
Usage::
```
(Then it'll get rendered as a code block if these docs are ever rendered by
Sphinx.)
##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -87,19 +98,7 @@ def setup_method():
def teardown_method():
_clean_db()
- def test_add_deadline(self, dagrun, session):
- assert session.query(Deadline).count() == 0
- deadline_orm = Deadline(
- deadline_time=DEFAULT_DATE,
- callback=TEST_ASYNC_CALLBACK,
- dagrun_id=dagrun.id,
- )
-
- session.add(deadline_orm)
- session.flush()
-
- assert session.query(Deadline).count() == 1
-
+ def test_add_deadline(self, dagrun, deadline_orm, session):
Review Comment:
Is this test worth keeping? Does it actually test anything useful?