ashb commented on code in PR #53201:
URL: https://github.com/apache/airflow/pull/53201#discussion_r2213702719


##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -192,6 +194,15 @@ def fetch_trigger_ids_with_asset(cls, session: Session = NEW_SESSION) -> set[str
         query = select(asset_trigger_association_table.columns.trigger_id)
         return {trigger_id for trigger_id in session.scalars(query)}
 
+    @classmethod
+    @provide_session
+    def fetch_trigger_ids_with_deadline_callback(cls, session: Session = NEW_SESSION) -> set[str]:
+        """Fetch all the trigger IDs associated with a Deadline Alert."""
+        from airflow.models.deadline import Deadline
+
+        query = select(Deadline.trigger_id).where(Deadline.trigger_id.is_not(None))
+        return {trigger_id for trigger_id in session.scalars(query)}

Review Comment:
   Hmm, I'm not a fan of this pattern. We now have it in two places, so we
might need to think about a more general-purpose approach than adding another
special-cased column/relationship.



##########
airflow-core/src/airflow/triggers/deadline.py:
##########
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+from collections.abc import AsyncIterator
+from typing import Any
+
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.utils.module_loading import import_string
+
+logger = logging.getLogger(__name__)
+
+
+class DeadlineCallbackTrigger(BaseTrigger):
+    """Trigger that executes a deadline callback function asynchronously."""
+
+    def __init__(self, callback_path: str, callback_kwargs: dict[str, Any] | None = None):
+        super().__init__()
+        self.callback_path = callback_path
+        self.callback_kwargs = callback_kwargs or {}
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        return (
+            f"{type(self).__module__}.{type(self).__qualname__}",
+            {"callback_path": self.callback_path, "callback_kwargs": self.callback_kwargs},
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        callback = import_string(self.callback_path)

Review Comment:
   For this to run, there's a reasonable chance that we'll need to load the DAG
bundle to get the callback, right? I.e. callbacks are defined inside the DAG
file?
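
   The concern can be illustrated with a small standalone sketch. It assumes a
simplified dotted-path importer (`import_string_sketch`, a stand-in, not
Airflow's `import_string`) and uses a temporary directory plus `sys.path` as a
crude substitute for loading a DAG bundle; the module and callback names are
invented for the example.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def import_string_sketch(dotted_path: str):
    """Simplified stand-in for a dotted-path importer."""
    module_path, _, attr = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), attr)


def demo() -> tuple[bool, bool]:
    """Return (importable before, importable after) the directory is on sys.path."""
    with tempfile.TemporaryDirectory() as bundle_dir:
        # Pretend this file is a DAG file that also defines the callback.
        Path(bundle_dir, "my_dag_file_sketch.py").write_text(
            "async def on_deadline_missed(**kwargs):\n    return 'called'\n"
        )
        path = "my_dag_file_sketch.on_deadline_missed"

        try:
            import_string_sketch(path)
            found_before = True
        except ModuleNotFoundError:
            # The process has not "loaded the bundle", so the import fails.
            found_before = False

        sys.path.insert(0, bundle_dir)
        try:
            importlib.invalidate_caches()
            import_string_sketch(path)
            found_after = True
        finally:
            # Undo the changes so the demo leaves the interpreter clean.
            sys.path.remove(bundle_dir)
            sys.modules.pop("my_dag_file_sketch", None)
    return found_before, found_after
```

   The same failure would apply to any process that resolves the callback by
path without the defining file being importable there.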



##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -150,6 +160,36 @@ def prune_deadlines(cls, *, session: Session, conditions: dict[Column, Any]) ->
 
         return deleted_count
 
+    def handle_miss(self, session: Session):
+        """Handle a missed deadline by creating and starting a trigger to run the callback."""
+        callback_func = import_string(self.callback)
+        # TODO: Improve this check so that the callback doesn't need to be present on the scheduler
+        if not asyncio.iscoroutinefunction(callback_func):
+            # TODO: For sync callbacks, use executor instead of trigger
+            logger.error("Sync callbacks not supported yet: %s", self.callback)

Review Comment:
   Could this be detected at parse time instead of only when the callback is
being run?
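
   A parse-time check could look roughly like the sketch below, assuming the
callback is passed as a callable when the DAG file is parsed.
`DeadlineAlertSketch` and the two example callbacks are hypothetical and do
not reflect the actual class in the PR; the sketch uses
`inspect.iscoroutinefunction` from the standard library.

```python
import inspect
from typing import Any, Callable


class DeadlineAlertSketch:
    """Hypothetical stand-in for the object a DAG author builds at parse time."""

    def __init__(self, callback: Callable[..., Any]):
        # Reject sync callbacks while the DAG file is parsed,
        # rather than when the deadline is eventually missed.
        if not inspect.iscoroutinefunction(callback):
            raise TypeError(
                f"Deadline callback {callback.__qualname__!r} must be an async function"
            )
        # Store the dotted path, as the model in the diff stores a path string.
        self.callback_path = f"{callback.__module__}.{callback.__qualname__}"


async def notify_async(**kwargs: Any) -> None:
    """Example async callback; accepted."""


def notify_sync(**kwargs: Any) -> None:
    """Example sync callback; rejected at construction time."""
```

   Constructing `DeadlineAlertSketch(notify_sync)` raises `TypeError`
immediately, so the error would surface to the DAG author instead of being
logged on the scheduler later.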



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

