Lee-W commented on code in PR #53727:
URL: https://github.com/apache/airflow/pull/53727#discussion_r2266271491


##########
airflow-core/docs/core-concepts/dags.rst:
##########
@@ -829,3 +829,40 @@ if it fails for ``N`` number of times consecutively.
 we can also provide and override these configuration from DAG argument:
 
 - ``max_consecutive_failed_dag_runs``: Overrides 
:ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.
+
+Deadline Alerts
+---------------
+
+.. versionadded:: 3.1
+
+Deadline Alerts allow you to set time thresholds for your DAG runs and 
automatically respond when those

Review Comment:
   ```suggestion
   Deadline Alerts allow you to set time thresholds for your Dag runs and automatically respond when those
   ```



##########
airflow-core/docs/core-concepts/dags.rst:
##########
@@ -829,3 +829,40 @@ if it fails for ``N`` number of times consecutively.
 we can also provide and override these configuration from DAG argument:
 
 - ``max_consecutive_failed_dag_runs``: Overrides 
:ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.
+
+Deadline Alerts
+---------------
+
+.. versionadded:: 3.1
+
+Deadline Alerts allow you to set time thresholds for your DAG runs and 
automatically respond when those
+thresholds are exceeded. You can set deadlines relative to a fixed datetime, 
use one of the available
+calculated references (like DAG queue time or start time), or implement your 
own custom reference.
+When a deadline is exceeded, it triggers a callback which can notify you or 
take other actions.
+
+Here's a simple example using the existing email Notifier:
+
+.. code-block:: python
+
+    from datetime import timedelta
+    from airflow import DAG
+    from airflow.providers.smtp.notifications.smtp import SmtpNotifier
+    from airflow.sdk.definitions.deadline import DeadlineAlert, 
DeadlineReference
+
+    with DAG(
+        dag_id="email_deadline",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=30),
+            callback=SmtpNotifier(
+                to="[email protected]",
+                subject="[Alert] DAG {{ dag.dag_id }} exceeded time threshold",

Review Comment:
   ```suggestion
                   subject="[Alert] Dag {{ dag.dag_id }} exceeded time threshold",
   ```



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically

Review Comment:
   ```suggestion
   the old SLA.  Deadline Alerts allow you to set time thresholds for your Dag runs and automatically
   ```



##########
airflow-core/docs/core-concepts/dags.rst:
##########
@@ -829,3 +829,40 @@ if it fails for ``N`` number of times consecutively.
 we can also provide and override these configuration from DAG argument:
 
 - ``max_consecutive_failed_dag_runs``: Overrides 
:ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.
+
+Deadline Alerts
+---------------
+
+.. versionadded:: 3.1
+
+Deadline Alerts allow you to set time thresholds for your DAG runs and 
automatically respond when those
+thresholds are exceeded. You can set deadlines relative to a fixed datetime, 
use one of the available
+calculated references (like DAG queue time or start time), or implement your 
own custom reference.

Review Comment:
   ```suggestion
   calculated references (like Dag queue time or start time), or implement your own custom reference.
   ```



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run

Review Comment:
   ```suggestion
   * ``reference``: When to start counting from
   * ``interval``: How far before or after the reference point to trigger the alert
   * ``callback``: What to do when the deadline is exceeded
   ```
   
   looks more like 3 mandatory args but we can modify callback
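
   For reference, the relationship between ``reference``, ``interval`` and the resulting deadline can be sketched with plain ``datetime`` arithmetic. This is only an illustration of the two timelines shown in this how-to, not Airflow's implementation; ``compute_deadline`` and the calendar date are hypothetical.

```python
from datetime import datetime, timedelta


def compute_deadline(reference: datetime, interval: timedelta) -> datetime:
    """Hypothetical helper: the deadline is the reference plus the interval."""
    return reference + interval


# First timeline in the how-to: queued at 00:03, interval of 15 minutes.
queued_at = datetime(2025, 1, 1, 0, 3)  # the date itself is arbitrary
after_queue = compute_deadline(queued_at, timedelta(minutes=15))

# Second timeline: fixed reference at 10:00 with a negative interval,
# so the deadline falls before the reference.
fixed_reference = datetime(2025, 1, 1, 10, 0)
before_fixed = compute_deadline(fixed_reference, timedelta(minutes=-30))

print(after_queue.strftime("%H:%M"))
print(before_fixed.strftime("%H:%M"))
```

   The two values correspond to the ``Deadline`` columns of the timelines (00:18 and 09:30).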



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:

Review Comment:
   ```suggestion
   Creating a Deadline Alert requires 3 mandatory and 1 optional parameter:
   ```
   
   I feel it might be more readable this way 🤔 



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example DAG implementation. If the DAG has not finished 15 minutes 
after it was queued, send a Slack message:
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, 
DeadlineReference
+    from airflow.providers.slack.notifications.slack_webhook import 
SlackWebhookNotifier
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    with DAG(
+        dag_id="deadline_alert_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|-----------|---------|-----------|--------|
+        Scheduled    Queued    Started    Deadline
+         00:00       00:03      00:05      00:18
+
+Using Built-in References
+-------------------------
+
+Airflow provides several built-in reference points that you can use with 
DeadlineAlert:
+
+``DeadlineReference.DAGRUN_QUEUED_AT``
+    Measures time from when the DagRun was queued. Useful for monitoring 
resource constraints.
+
+``DeadlineReference.DAGRUN_LOGICAL_DATE``
+    References when the DAG run was scheduled to start. For example, setting 
an interval of
+    ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't 
completed 15 minutes
+    after it was scheduled to start, regardless of when (or if) it actually 
began executing.
+    Useful for ensuring scheduled DAGs complete before their next scheduled 
run.
+
+``DeadlineReference.FIXED_DATETIME``
+    Specifies a fixed point in time. Useful when DAGs must complete by a 
specific time.
+
+Here's an example using a fixed datetime:
+
+.. code-block:: python
+
+    tomorrow_at_ten = datetime.combine(datetime.now().date() + 
timedelta(days=1), time(10, 0))
+
+    with DAG(
+        dag_id="fixed_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
+            interval=timedelta(minutes=-30),  # Alert 30 minutes before the 
reference.
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|----------|---------|------------|--------|
+         Queued     Start    Deadline    Reference
+         09:15      09:17     09:30       10:00
+
+.. note::
+    Note that since the interval is a negative value, the deadline is before 
the reference in this case.
+
+Using Callbacks
+---------------
+
+When a deadline is exceeded, the callback is executed. You can use an existing 
:doc:`Notifier </howto/notifications>`
+or create a custom callback function.  A callback must be either an 
:class:`~airflow.sdk.definitions.deadline.AsyncCallback`
+or a :class:`~airflow.sdk.definitions.deadline.SyncCallback`.
+
+Using Built-in Notifiers
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here's an example using the Slack Notifier if the DagRun has not finished 
within 30 minutes of it being queued:

Review Comment:
   ```suggestion
   Here's an example using the Slack Notifier if the Dag run has not finished within 30 minutes of it being queued:
   ```



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run

Review Comment:
   ```suggestion
   * Callback Kwargs: Optional values to pass to the Callback when it is run
   ```



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example DAG implementation. If the DAG has not finished 15 minutes 
after it was queued, send a Slack message:
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, 
DeadlineReference
+    from airflow.providers.slack.notifications.slack_webhook import 
SlackWebhookNotifier
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    with DAG(
+        dag_id="deadline_alert_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|-----------|---------|-----------|--------|
+        Scheduled    Queued    Started    Deadline
+         00:00       00:03      00:05      00:18
+
+Using Built-in References
+-------------------------
+
+Airflow provides several built-in reference points that you can use with 
DeadlineAlert:
+
+``DeadlineReference.DAGRUN_QUEUED_AT``
+    Measures time from when the DagRun was queued. Useful for monitoring 
resource constraints.
+
+``DeadlineReference.DAGRUN_LOGICAL_DATE``
+    References when the DAG run was scheduled to start. For example, setting 
an interval of
+    ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't 
completed 15 minutes
+    after it was scheduled to start, regardless of when (or if) it actually 
began executing.
+    Useful for ensuring scheduled DAGs complete before their next scheduled 
run.
+
+``DeadlineReference.FIXED_DATETIME``
+    Specifies a fixed point in time. Useful when DAGs must complete by a 
specific time.
+
+Here's an example using a fixed datetime:
+
+.. code-block:: python
+
+    tomorrow_at_ten = datetime.combine(datetime.now().date() + 
timedelta(days=1), time(10, 0))
+
+    with DAG(
+        dag_id="fixed_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
+            interval=timedelta(minutes=-30),  # Alert 30 minutes before the 
reference.
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|----------|---------|------------|--------|
+         Queued     Start    Deadline    Reference
+         09:15      09:17     09:30       10:00
+
+.. note::
+    Note that since the interval is a negative value, the deadline is before 
the reference in this case.
+
+Using Callbacks
+---------------
+
+When a deadline is exceeded, the callback is executed. You can use an existing 
:doc:`Notifier </howto/notifications>`
+or create a custom callback function.  A callback must be either an 
:class:`~airflow.sdk.definitions.deadline.AsyncCallback`
+or a :class:`~airflow.sdk.definitions.deadline.SyncCallback`.
+
+Using Built-in Notifiers
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here's an example using the Slack Notifier if the DagRun has not finished 
within 30 minutes of it being queued:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="slack_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=30),
+            callback=AsyncCallback(
+                SlackNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+Creating Custom Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can create custom callbacks for more complex handling. The 
``callback_kwargs`` specified in

Review Comment:
   This `callback_kwargs` is a bit confusing to me as it's not the actual 
parameter name.
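
   For context, the custom callback example in this file passes the values through the ``kwargs`` argument of the callback wrapper, and the function receives them as keyword arguments. A plain-Python sketch of that hand-off, with no Airflow imports; ``invoke`` is a hypothetical stand-in for whatever runs the callback, not an Airflow API:

```python
def custom_synchronous_callback(**kwargs):
    """Build the message that the docs example prints."""
    return (
        f"Deadline exceeded for Dag {kwargs.get('dag_id')}! "
        f"Alert type: {kwargs.get('alert_type')}"
    )


def invoke(callback, kwargs=None):
    # Hypothetical stand-in: unpack the optional dict into keyword arguments.
    return callback(**(kwargs or {}))


message = invoke(
    custom_synchronous_callback,
    kwargs={"alert_type": "time_exceeded", "dag_id": "custom_deadline_alert"},
)
print(message)
```

   Naming the dict ``kwargs`` in the prose as well would match what the example actually passes.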



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example DAG implementation. If the DAG has not finished 15 minutes 
after it was queued, send a Slack message:
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, 
DeadlineReference
+    from airflow.providers.slack.notifications.slack_webhook import 
SlackWebhookNotifier
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    with DAG(
+        dag_id="deadline_alert_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|-----------|---------|-----------|--------|
+        Scheduled    Queued    Started    Deadline
+         00:00       00:03      00:05      00:18
+
+Using Built-in References
+-------------------------
+
+Airflow provides several built-in reference points that you can use with 
DeadlineAlert:
+
+``DeadlineReference.DAGRUN_QUEUED_AT``
+    Measures time from when the DagRun was queued. Useful for monitoring 
resource constraints.
+
+``DeadlineReference.DAGRUN_LOGICAL_DATE``
+    References when the DAG run was scheduled to start. For example, setting 
an interval of
+    ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't 
completed 15 minutes
+    after it was scheduled to start, regardless of when (or if) it actually 
began executing.
+    Useful for ensuring scheduled DAGs complete before their next scheduled 
run.
+
+``DeadlineReference.FIXED_DATETIME``
+    Specifies a fixed point in time. Useful when DAGs must complete by a 
specific time.
+
+Here's an example using a fixed datetime:
+
+.. code-block:: python
+
+    tomorrow_at_ten = datetime.combine(datetime.now().date() + 
timedelta(days=1), time(10, 0))
+
+    with DAG(
+        dag_id="fixed_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
+            interval=timedelta(minutes=-30),  # Alert 30 minutes before the 
reference.
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|----------|---------|------------|--------|
+         Queued     Start    Deadline    Reference
+         09:15      09:17     09:30       10:00
+
+.. note::
+    Note that since the interval is a negative value, the deadline is before 
the reference in this case.
+
+Using Callbacks
+---------------
+
+When a deadline is exceeded, the callback is executed. You can use an existing 
:doc:`Notifier </howto/notifications>`
+or create a custom callback function.  A callback must be either an 
:class:`~airflow.sdk.definitions.deadline.AsyncCallback`
+or a :class:`~airflow.sdk.definitions.deadline.SyncCallback`.
+
+Using Built-in Notifiers
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here's an example using the Slack Notifier if the DagRun has not finished 
within 30 minutes of it being queued:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="slack_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=30),
+            callback=AsyncCallback(
+                SlackNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+Creating Custom Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can create custom callbacks for more complex handling. The 
``callback_kwargs`` specified in
+the ``DeadlineAlert`` are passed to the callback function, if any are 
provided.  **Synchronous callbacks**
+(standard python methods) can be defined in the dag bundle and are run in the 
Executor.  **Asynchronous
+callbacks** must be defined somewhere in the Triggerer's system path.
+
+.. note::
+    Regarding Async Custom Deadline callbacks:
+
+    * Async callbacks are executed by the Triggerer, so users must ensure they 
are importable by the Triggerer.
+    * One easy way to do this is to place the callback as a top-level method 
in a new file in the plugins folder.
+    * The Triggerer will need to be restarted when a callback is added or 
changed in order to reload the file.
+
+A **custom synchronous callback** might look like this:
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import DeadlineAlert, 
DeadlineReference, SyncCallback
+
+
+    def custom_synchronous_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get('dag_id')}!")
+        print(f"Alert type: {kwargs.get('alert_type')}")
+        # Additional custom handling here
+
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=SyncCallback(
+                custom_synchronous_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+A **custom asynchronous callback** is only slightly more work.  Note in the 
following example that
+the custom callback code is placed in a separate file, and must be imported in 
the DAG file.

Review Comment:
   ```suggestion
   the custom callback code is placed in a separate file, and must be imported in the Dag file.
   ```



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example DAG implementation. If the DAG has not finished 15 minutes 
after it was queued, send a Slack message:
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, 
DeadlineReference
+    from airflow.providers.slack.notifications.slack_webhook import 
SlackWebhookNotifier
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    with DAG(
+        dag_id="deadline_alert_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|-----------|---------|-----------|--------|
+        Scheduled    Queued    Started    Deadline
+         00:00       00:03      00:05      00:18
+
+Using Built-in References
+-------------------------
+
+Airflow provides several built-in reference points that you can use with 
DeadlineAlert:
+
+``DeadlineReference.DAGRUN_QUEUED_AT``
+    Measures time from when the DagRun was queued. Useful for monitoring 
resource constraints.
+
+``DeadlineReference.DAGRUN_LOGICAL_DATE``
+    References when the DAG run was scheduled to start. For example, setting 
an interval of
+    ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't 
completed 15 minutes
+    after it was scheduled to start, regardless of when (or if) it actually 
began executing.
+    Useful for ensuring scheduled DAGs complete before their next scheduled 
run.
+
+``DeadlineReference.FIXED_DATETIME``
+    Specifies a fixed point in time. Useful when DAGs must complete by a 
specific time.
+
+Here's an example using a fixed datetime:
+
+.. code-block:: python
+
+    tomorrow_at_ten = datetime.combine(datetime.now().date() + 
timedelta(days=1), time(10, 0))
+
+    with DAG(
+        dag_id="fixed_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
+            interval=timedelta(minutes=-30),  # Alert 30 minutes before the 
reference.
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|----------|---------|------------|--------|
+         Queued     Start    Deadline    Reference
+         09:15      09:17     09:30       10:00
+
+.. note::
+    Note that since the interval is a negative value, the deadline is before 
the reference in this case.
+
+Using Callbacks
+---------------
+
+When a deadline is exceeded, the callback is executed. You can use an existing 
:doc:`Notifier </howto/notifications>`
+or create a custom callback function.  A callback must be either an 
:class:`~airflow.sdk.definitions.deadline.AsyncCallback`
+or a :class:`~airflow.sdk.definitions.deadline.SyncCallback`.
+
+Using Built-in Notifiers
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here's an example using the Slack Notifier if the DagRun has not finished 
within 30 minutes of it being queued:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="slack_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=30),
+            callback=AsyncCallback(
+                SlackNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+Creating Custom Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can create custom callbacks for more complex handling. The 
``callback_kwargs`` specified in
+the ``DeadlineAlert`` are passed to the callback function, if any are 
provided.  **Synchronous callbacks**
+(standard python methods) can be defined in the dag bundle and are run in the 
Executor.  **Asynchronous
+callbacks** must be defined somewhere in the Triggerer's system path.
+
+.. note::
+    Regarding Async Custom Deadline callbacks:
+
+    * Async callbacks are executed by the Triggerer, so users must ensure they 
are importable by the Triggerer.
+    * One easy way to do this is to place the callback as a top-level method 
in a new file in the plugins folder.
+    * The Triggerer will need to be restarted when a callback is added or 
changed in order to reload the file.
+
+A **custom synchronous callback** might look like this:
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import DeadlineAlert, 
DeadlineReference, SyncCallback
+
+
+    def custom_synchronous_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get("dag_id")}!")
+        print(f"Alert type: {kwargs.get("alert_type")}")
+        # Additional custom handling here
+
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=SyncCallback(
+                custom_synchronous_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+A **custom asynchronous callback** is only slightly more work.  Note in the 
following example that
+the custom callback code is placed in a separate file, and must be imported in 
the DAG file.
+
+.. code-block:: python
+
+    # Place this method in `/files/plugins/deadline_callbacks.py`
+    async def custom_async_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get("dag_id")}!")
+        print(f"Alert type: {kwargs.get("alert_type")}")
+        # Additional custom handling here
+
+
+    # Place this in a dag file

Review Comment:
   ```suggestion
   
   .. code-block:: python
   
       # Place this in a dag file
   ```
   
   Maybe split this into two code blocks, one for the callback file and one for the Dag file?



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example DAG implementation. If the DAG has not finished 15 minutes 
after it was queued, send a Slack message:
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, 
DeadlineReference
+    from airflow.providers.slack.notifications.slack_webhook import 
SlackWebhookNotifier
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    with DAG(
+        dag_id="deadline_alert_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|-----------|---------|-----------|--------|
+        Scheduled    Queued    Started    Deadline
+         00:00       00:03      00:05      00:18
+
+Using Built-in References
+-------------------------
+
+Airflow provides several built-in reference points that you can use with 
DeadlineAlert:
+
+``DeadlineReference.DAGRUN_QUEUED_AT``
+    Measures time from when the DagRun was queued. Useful for monitoring 
resource constraints.
+
+``DeadlineReference.DAGRUN_LOGICAL_DATE``
+    References when the DAG run was scheduled to start. For example, setting 
an interval of
+    ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't 
completed 15 minutes
+    after it was scheduled to start, regardless of when (or if) it actually 
began executing.
+    Useful for ensuring scheduled DAGs complete before their next scheduled 
run.
+
+``DeadlineReference.FIXED_DATETIME``
+    Specifies a fixed point in time. Useful when DAGs must complete by a 
specific time.
+
+Here's an example using a fixed datetime:
+
+.. code-block:: python
+
+    tomorrow_at_ten = datetime.combine(datetime.now().date() + 
timedelta(days=1), time(10, 0))
+
+    with DAG(
+        dag_id="fixed_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
+            interval=timedelta(minutes=-30),  # Alert 30 minutes before the 
reference.
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|----------|---------|------------|--------|
+         Queued     Start    Deadline    Reference
+         09:15      09:17     09:30       10:00
+
+.. note::
+    Note that since the interval is a negative value, the deadline is before 
the reference in this case.
+
+Using Callbacks
+---------------
+
+When a deadline is exceeded, the callback is executed. You can use an existing 
:doc:`Notifier </howto/notifications>`
+or create a custom callback function.  A callback must be either an 
:class:`~airflow.sdk.definitions.deadline.AsyncCallback`
+or a :class:`~airflow.sdk.definitions.deadline.SyncCallback`.
+
+Using Built-in Notifiers
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here's an example using the Slack Notifier if the DagRun has not finished 
within 30 minutes of it being queued:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="slack_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=30),
+            callback=AsyncCallback(
+                SlackNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+Creating Custom Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can create custom callbacks for more complex handling. The 
``callback_kwargs`` specified in
+the ``DeadlineAlert`` are passed to the callback function, if any are 
provided.  **Synchronous callbacks**
+(standard python methods) can be defined in the dag bundle and are run in the 
Executor.  **Asynchronous
+callbacks** must be defined somewhere in the Triggerer's system path.
+
+.. note::
+    Regarding Async Custom Deadline callbacks:
+
+    * Async callbacks are executed by the Triggerer, so users must ensure they 
are importable by the Triggerer.
+    * One easy way to do this is to place the callback as a top-level method 
in a new file in the plugins folder.
+    * The Triggerer will need to be restarted when a callback is added or 
changed in order to reload the file.
+
+A **custom synchronous callback** might look like this:
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import DeadlineAlert, 
DeadlineReference, SyncCallback
+
+
+    def custom_synchronous_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get("dag_id")}!")
+        print(f"Alert type: {kwargs.get("alert_type")}")
+        # Additional custom handling here
+
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=SyncCallback(
+                custom_synchronous_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+A **custom asynchronous callback** is only slightly more work.  Note in the 
following example that
+the custom callback code is placed in a separate file, and must be imported in 
the DAG file.
+
+.. code-block:: python
+
+    # Place this method in `/files/plugins/deadline_callbacks.py`
+    async def custom_async_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get("dag_id")}!")
+        print(f"Alert type: {kwargs.get("alert_type")}")
+        # Additional custom handling here
+
+
+    # Place this in a dag file
+    from datetime import timedelta
+
+    from deadline_callbacks import custom_async_callback
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, 
DeadlineReference
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                custom_async_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+
+Deadline Calculation
+^^^^^^^^^^^^^^^^^^^^
+
+A deadline's trigger time is calculated by adding the ``interval`` to the 
datetime returned by
+the ``reference``. For ``FIXED_DATETIME`` references, negative intervals can 
be particularly
+useful to trigger the callback *before* the reference time.
+
+For example:
+
+.. code-block:: python
+
+    next_meeting = datetime(2025, 06, 26, 9, 30)

Review Comment:
   ```suggestion
       next_meeting = datetime(2025, 6, 26, 9, 30)
   ```
   
   This is a syntax error: Python 3 rejects decimal integer literals with a leading zero, such as `06`.
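
For reference, a minimal sketch that checks both spellings with the standard library's `ast.parse`. The `parses` helper is hypothetical and only here for illustration; it is not part of the PR:

```python
import ast


def parses(source: str) -> bool:
    """Return True if `source` is syntactically valid Python."""
    try:
        ast.parse(source)
    except SyntaxError:
        return False
    return True


# The literal from the docs example: `06` has a leading zero.
print(parses("datetime(2025, 06, 26, 9, 30)"))  # False
# The suggested replacement.
print(parses("datetime(2025, 6, 26, 9, 30)"))  # True
```

Only the source text is parsed, so nothing needs to be imported from `datetime` for the check.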



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example DAG implementation. If the DAG has not finished 15 minutes 
after it was queued, send a Slack message:
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, 
DeadlineReference
+    from airflow.providers.slack.notifications.slack_webhook import 
SlackWebhookNotifier
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    with DAG(
+        dag_id="deadline_alert_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|-----------|---------|-----------|--------|
+        Scheduled    Queued    Started    Deadline
+         00:00       00:03      00:05      00:18
+
+Using Built-in References
+-------------------------
+
+Airflow provides several built-in reference points that you can use with 
DeadlineAlert:
+
+``DeadlineReference.DAGRUN_QUEUED_AT``
+    Measures time from when the DagRun was queued. Useful for monitoring 
resource constraints.
+
+``DeadlineReference.DAGRUN_LOGICAL_DATE``
+    References when the DAG run was scheduled to start. For example, setting 
an interval of
+    ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't 
completed 15 minutes
+    after it was scheduled to start, regardless of when (or if) it actually 
began executing.
+    Useful for ensuring scheduled DAGs complete before their next scheduled 
run.
+
+``DeadlineReference.FIXED_DATETIME``
+    Specifies a fixed point in time. Useful when DAGs must complete by a 
specific time.
+
+Here's an example using a fixed datetime:
+
+.. code-block:: python
+
+    tomorrow_at_ten = datetime.combine(datetime.now().date() + 
timedelta(days=1), time(10, 0))
+
+    with DAG(
+        dag_id="fixed_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
+            interval=timedelta(minutes=-30),  # Alert 30 minutes before the 
reference.
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|----------|---------|------------|--------|
+         Queued     Start    Deadline    Reference
+         09:15      09:17     09:30       10:00
+
+.. note::
+    Note that since the interval is a negative value, the deadline is before 
the reference in this case.
+
+Using Callbacks
+---------------
+
+When a deadline is exceeded, the callback is executed. You can use an existing 
:doc:`Notifier </howto/notifications>`
+or create a custom callback function.  A callback must be either an 
:class:`~airflow.sdk.definitions.deadline.AsyncCallback`
+or a :class:`~airflow.sdk.definitions.deadline.SyncCallback`.
+
+Using Built-in Notifiers
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here's an example using the Slack Notifier if the DagRun has not finished 
within 30 minutes of it being queued:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="slack_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=30),
+            callback=AsyncCallback(
+                SlackNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 
minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+Creating Custom Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can create custom callbacks for more complex handling. The 
``callback_kwargs`` specified in
+the ``DeadlineAlert`` are passed to the callback function, if any are 
provided.  **Synchronous callbacks**
+(standard python methods) can be defined in the dag bundle and are run in the 
Executor.  **Asynchronous
+callbacks** must be defined somewhere in the Triggerer's system path.
+
+.. note::
+    Regarding Async Custom Deadline callbacks:
+
+    * Async callbacks are executed by the Triggerer, so users must ensure they 
are importable by the Triggerer.
+    * One easy way to do this is to place the callback as a top-level method 
in a new file in the plugins folder.
+    * The Triggerer will need to be restarted when a callback is added or 
changed in order to reload the file.
+
+A **custom synchronous callback** might look like this:
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import DeadlineAlert, 
DeadlineReference, SyncCallback
+
+
+    def custom_synchronous_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get("dag_id")}!")
+        print(f"Alert type: {kwargs.get("alert_type")}")
+        # Additional custom handling here
+
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=SyncCallback(
+                custom_synchronous_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+A **custom asynchronous callback** is only slightly more work.  Note in the 
following example that
+the custom callback code is placed in a separate file, and must be imported in 
the DAG file.
+
+.. code-block:: python
+
+    # Place this method in `/files/plugins/deadline_callbacks.py`
+    async def custom_async_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get("dag_id")}!")
+        print(f"Alert type: {kwargs.get("alert_type")}")
+        # Additional custom handling here
+
+
+    # Place this in a dag file
+    from datetime import timedelta
+
+    from deadline_callbacks import custom_async_callback
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, 
DeadlineReference
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                custom_async_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+
+Deadline Calculation
+^^^^^^^^^^^^^^^^^^^^
+
+A deadline's trigger time is calculated by adding the ``interval`` to the 
datetime returned by
+the ``reference``. For ``FIXED_DATETIME`` references, negative intervals can 
be particularly
+useful to trigger the callback *before* the reference time.
+
+For example:
+
+.. code-block:: python
+
+    next_meeting = datetime(2025, 06, 26, 9, 30)
+
+    DeadlineAlert(
+        reference=DeadlineReference.FIXED_DATETIME(next_meeting),
+        interval=timedelta(hours=-2),
+        callback=notify_team,
+    )
+
+This will trigger the alert 2 hours before the next meeting starts.
+
+For ``DAGRUN_LOGICAL_DATE``, the interval is typically positive, setting a 
deadline relative
+to when the DAG was scheduled to run. Here's an example:
+
+.. code-block:: python
+
+    DeadlineAlert(
+        reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
+        interval=timedelta(hours=1),
+        callback=notify_team,
+    )
+
+In this case, if a DAG is scheduled to run daily at midnight, the deadline 
would be triggered

Review Comment:
   ```suggestion
   In this case, if a Dag is scheduled to run daily at midnight, the deadline 
would be triggered
   ```



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example DAG implementation. If the DAG has not finished 15 minutes 
after it was queued, send a Slack message:
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
+    from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    with DAG(
+        dag_id="deadline_alert_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'deadline_alert_example' still running 15 minutes after it was queued.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|-----------|---------|-----------|--------|
+        Scheduled    Queued    Started    Deadline
+         00:00       00:03      00:05      00:18
+
+Using Built-in References
+-------------------------
+
+Airflow provides several built-in reference points that you can use with 
DeadlineAlert:
+
+``DeadlineReference.DAGRUN_QUEUED_AT``
+    Measures time from when the DagRun was queued. Useful for monitoring 
resource constraints.
+
+``DeadlineReference.DAGRUN_LOGICAL_DATE``
+    References when the DAG run was scheduled to start. For example, setting 
an interval of
+    ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't 
completed 15 minutes
+    after it was scheduled to start, regardless of when (or if) it actually 
began executing.
+    Useful for ensuring scheduled DAGs complete before their next scheduled 
run.
+
+``DeadlineReference.FIXED_DATETIME``
+    Specifies a fixed point in time. Useful when DAGs must complete by a 
specific time.
+
+Here's an example using a fixed datetime:
+
+.. code-block:: python
+
+    from datetime import datetime, time, timedelta
+
+    tomorrow_at_ten = datetime.combine(datetime.now().date() + timedelta(days=1), time(10, 0))
+
+    with DAG(
+        dag_id="fixed_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
+            interval=timedelta(minutes=-30),  # Alert 30 minutes before the reference.
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'fixed_deadline_alert' has not finished 30 minutes before the fixed reference time.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|----------|---------|------------|--------|
+         Queued     Start    Deadline    Reference
+         09:15      09:17     09:30       10:00
+
+.. note::
+    Since the interval is negative, the deadline falls before the reference in this case.
+
+Using Callbacks
+---------------
+
+When a deadline is exceeded, the callback is executed. You can use an existing 
:doc:`Notifier </howto/notifications>`
+or create a custom callback function.  A callback must be either an 
:class:`~airflow.sdk.definitions.deadline.AsyncCallback`
+or a :class:`~airflow.sdk.definitions.deadline.SyncCallback`.
+
+Using Built-in Notifiers
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here's an example using the Slack Notifier if the DagRun has not finished 
within 30 minutes of it being queued:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="slack_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=30),
+            callback=AsyncCallback(
+                SlackNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+Creating Custom Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can create custom callbacks for more complex handling. The 
``callback_kwargs`` specified in
+the ``DeadlineAlert`` are passed to the callback function, if any are 
provided.  **Synchronous callbacks**
+(standard python methods) can be defined in the dag bundle and are run in the 
Executor.  **Asynchronous
+callbacks** must be defined somewhere in the Triggerer's system path.
+
+.. note::
+    Regarding Async Custom Deadline callbacks:
+
+    * Async callbacks are executed by the Triggerer, so users must ensure they 
are importable by the Triggerer.
+    * One easy way to do this is to place the callback as a top-level method 
in a new file in the plugins folder.
+    * The Triggerer will need to be restarted when a callback is added or 
changed in order to reload the file.
+
+A **custom synchronous callback** might look like this:
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, SyncCallback
+
+
+    def custom_synchronous_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get('dag_id')}!")
+        print(f"Alert type: {kwargs.get('alert_type')}")
+        # Additional custom handling here
+
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=SyncCallback(
+                custom_synchronous_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+A **custom asynchronous callback** is only slightly more work.  Note in the 
following example that
+the custom callback code is placed in a separate file, and must be imported in 
the DAG file.
+
+.. code-block:: python
+
+    # Place this method in `/files/plugins/deadline_callbacks.py`
+    async def custom_async_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get('dag_id')}!")
+        print(f"Alert type: {kwargs.get('alert_type')}")
+        # Additional custom handling here
+
+
+    # Place this in a dag file
+    from datetime import timedelta
+
+    from deadline_callbacks import custom_async_callback
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                custom_async_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+
+Deadline Calculation
+^^^^^^^^^^^^^^^^^^^^
+
+A deadline's trigger time is calculated by adding the ``interval`` to the 
datetime returned by
+the ``reference``. For ``FIXED_DATETIME`` references, negative intervals can 
be particularly
+useful to trigger the callback *before* the reference time.
+
+For example:
+
+.. code-block:: python
+
+    next_meeting = datetime(2025, 6, 26, 9, 30)
+
+    DeadlineAlert(
+        reference=DeadlineReference.FIXED_DATETIME(next_meeting),
+        interval=timedelta(hours=-2),
+        callback=notify_team,
+    )
+
+This will trigger the alert 2 hours before the next meeting starts.
+
+For ``DAGRUN_LOGICAL_DATE``, the interval is typically positive, setting a 
deadline relative
+to when the DAG was scheduled to run. Here's an example:

Review Comment:
   ```suggestion
   to when the Dag was scheduled to run. Here's an example:
   ```
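
Related to the custom callbacks quoted in this hunk: both the synchronous and the asynchronous variant receive the configured kwargs as keyword arguments. The sketch below shows that calling pattern with plain Python and `asyncio`, without Airflow. The function names are hypothetical, and calling the callbacks directly like this is only an illustration, not how the Executor or Triggerer invokes them. It formats the message with intermediate variables, which avoids nesting double quotes inside an f-string (a syntax error before Python 3.12).

```python
import asyncio


def format_alert(**kwargs) -> str:
    """Build the alert message from the kwargs handed to the callback."""
    dag_id = kwargs.get("dag_id")
    alert_type = kwargs.get("alert_type")
    return f"Deadline exceeded for Dag {dag_id} (alert type: {alert_type})"


def sync_callback(**kwargs) -> str:
    """Synchronous variant: an ordinary function."""
    return format_alert(**kwargs)


async def async_callback(**kwargs) -> str:
    """Asynchronous variant: a coroutine function that must be awaited."""
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return format_alert(**kwargs)


# Same kwargs as in the quoted documentation examples.
callback_kwargs = {"alert_type": "time_exceeded", "dag_id": "custom_deadline_alert"}

sync_message = sync_callback(**callback_kwargs)
async_message = asyncio.run(async_callback(**callback_kwargs))

print(sync_message)
print(async_message)
```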



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example DAG implementation. If the DAG has not finished 15 minutes 
after it was queued, send a Slack message:
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
+    from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    with DAG(
+        dag_id="deadline_alert_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'deadline_alert_example' still running 15 minutes after it was queued.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|-----------|---------|-----------|--------|
+        Scheduled    Queued    Started    Deadline
+         00:00       00:03      00:05      00:18
+
+Using Built-in References
+-------------------------
+
+Airflow provides several built-in reference points that you can use with 
DeadlineAlert:
+
+``DeadlineReference.DAGRUN_QUEUED_AT``
+    Measures time from when the DagRun was queued. Useful for monitoring 
resource constraints.
+
+``DeadlineReference.DAGRUN_LOGICAL_DATE``
+    References when the DAG run was scheduled to start. For example, setting 
an interval of
+    ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't 
completed 15 minutes
+    after it was scheduled to start, regardless of when (or if) it actually 
began executing.
+    Useful for ensuring scheduled DAGs complete before their next scheduled 
run.
+
+``DeadlineReference.FIXED_DATETIME``
+    Specifies a fixed point in time. Useful when DAGs must complete by a 
specific time.
+
+Here's an example using a fixed datetime:
+
+.. code-block:: python
+
+    from datetime import datetime, time, timedelta
+
+    tomorrow_at_ten = datetime.combine(datetime.now().date() + timedelta(days=1), time(10, 0))
+
+    with DAG(
+        dag_id="fixed_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
+            interval=timedelta(minutes=-30),  # Alert 30 minutes before the reference.
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'fixed_deadline_alert' has not finished 30 minutes before the fixed reference time.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|----------|---------|------------|--------|
+         Queued     Start    Deadline    Reference
+         09:15      09:17     09:30       10:00
+
+.. note::
+    Since the interval is negative, the deadline falls before the reference in this case.
+
+Using Callbacks
+---------------
+
+When a deadline is exceeded, the callback is executed. You can use an existing 
:doc:`Notifier </howto/notifications>`
+or create a custom callback function.  A callback must be either an 
:class:`~airflow.sdk.definitions.deadline.AsyncCallback`
+or a :class:`~airflow.sdk.definitions.deadline.SyncCallback`.
+
+Using Built-in Notifiers
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here's an example using the Slack Notifier if the DagRun has not finished 
within 30 minutes of it being queued:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="slack_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=30),
+            callback=AsyncCallback(
+                SlackNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "DAG 'slack_deadline_alert' still running after 30 minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+Creating Custom Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can create custom callbacks for more complex handling. The 
``callback_kwargs`` specified in
+the ``DeadlineAlert`` are passed to the callback function, if any are 
provided.  **Synchronous callbacks**
+(standard python methods) can be defined in the dag bundle and are run in the 
Executor.  **Asynchronous
+callbacks** must be defined somewhere in the Triggerer's system path.
+
+.. note::
+    Regarding Async Custom Deadline callbacks:
+
+    * Async callbacks are executed by the Triggerer, so users must ensure they 
are importable by the Triggerer.
+    * One easy way to do this is to place the callback as a top-level method 
in a new file in the plugins folder.
+    * The Triggerer will need to be restarted when a callback is added or 
changed in order to reload the file.
+
+A **custom synchronous callback** might look like this:
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, SyncCallback
+
+
+    def custom_synchronous_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get('dag_id')}!")
+        print(f"Alert type: {kwargs.get('alert_type')}")
+        # Additional custom handling here
+
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=SyncCallback(
+                custom_synchronous_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+A **custom asynchronous callback** is only slightly more work.  Note in the 
following example that
+the custom callback code is placed in a separate file, and must be imported in 
the DAG file.
+
+.. code-block:: python
+
+    # Place this method in `/files/plugins/deadline_callbacks.py`
+    async def custom_async_callback(**kwargs):
+        """Handle deadline violation with custom logic."""
+        print(f"Deadline exceeded for DAG {kwargs.get('dag_id')}!")
+        print(f"Alert type: {kwargs.get('alert_type')}")
+        # Additional custom handling here
+
+
+    # Place this in a dag file
+    from datetime import timedelta
+
+    from deadline_callbacks import custom_async_callback
+
+    from airflow import DAG
+    from airflow.providers.standard.operators.empty import EmptyOperator
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
+
+    with DAG(
+        dag_id="custom_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                custom_async_callback,
+                kwargs={"alert_type": "time_exceeded", "dag_id": 
"custom_deadline_alert"},
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+
+Deadline Calculation
+^^^^^^^^^^^^^^^^^^^^
+
+A deadline's trigger time is calculated by adding the ``interval`` to the 
datetime returned by
+the ``reference``. For ``FIXED_DATETIME`` references, negative intervals can 
be particularly
+useful to trigger the callback *before* the reference time.
+
+For example:
+
+.. code-block:: python
+
+    next_meeting = datetime(2025, 6, 26, 9, 30)
+
+    DeadlineAlert(
+        reference=DeadlineReference.FIXED_DATETIME(next_meeting),
+        interval=timedelta(hours=-2),
+        callback=notify_team,
+    )
+
+This will trigger the alert 2 hours before the next meeting starts.
+
+For ``DAGRUN_LOGICAL_DATE``, the interval is typically positive, setting a 
deadline relative
+to when the DAG was scheduled to run. Here's an example:
+
+.. code-block:: python
+
+    DeadlineAlert(
+        reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
+        interval=timedelta(hours=1),
+        callback=notify_team,
+    )
+
+In this case, if a DAG is scheduled to run daily at midnight, the deadline would be triggered
+if the DAG hasn't completed by 1:00 AM. This is useful for ensuring that scheduled jobs complete

Review Comment:
   ```suggestion
   if the Dag hasn't completed by 1:00 AM. This is useful for ensuring that scheduled jobs complete
   ```



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the 
next evolution of
+the old SLA.  Deadline Alerts allow you to set time thresholds for your DAG 
runs and automatically
+respond when those thresholds are exceeded. You can set up Deadline Alerts by 
choosing a built-in
+reference point, setting an interval, and defining a response using either 
Airflow's Notifiers or
+a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory and one optional parameter:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: What to do when the deadline is exceeded
+* Callback Kwargs:  Optional values to pass to the Callback when it is run
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example DAG implementation. If the DAG has not finished 15 minutes after it was queued, send a Slack message:

Review Comment:
   ```suggestion
   Below is an example Dag implementation. If the Dag has not finished 15 minutes after it was queued, send a Slack message:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
