vincbeck commented on code in PR #53727: URL: https://github.com/apache/airflow/pull/53727#discussion_r2243578854
########## airflow-core/docs/howto/deadline-alerts.rst: ########## @@ -0,0 +1,255 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Deadline Alerts +=============== + +The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the next evolution of +the old SLA. Deadline Alerts allow you to set time thresholds for your DAG runs and automatically +respond when those thresholds are exceeded. You can set up Deadline Alerts by choosing a built-in +reference point, setting an interval, and defining a response using either Airflow's Notifiers or +a custom callback function. + +Creating a Deadline Alert +------------------------- + +To create a Deadline Alert, you'll need to specify three components: + +* A reference: When to start counting from +* An interval: How far before or after the reference point to trigger the alert +* A callback: What to do when the deadline is exceeded + +Here is how Deadlines are calculated: + +:: + + [Reference] ------ [Interval] ------> [Deadline] + ^ ^ + | | + Start time Trigger point + +Here's an example DAG implementation. If the DAG has not finished 15 minutes after it was queued, send an email: + +.. 
code-block:: python + + from datetime import datetime, timedelta + from airflow import DAG + from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference + from airflow.providers.smtp.notifications.smtp import SmtpNotifier + from airflow.providers.standard.operators.empty import EmptyOperator + + with DAG( + dag_id="deadline_alert_example", + deadline=DeadlineAlert( + reference=DeadlineReference.DAGRUN_QUEUED_AT, + interval=timedelta(minutes=15), + callback=SmtpNotifier( + to="[email protected]", + subject="[Alert] DAG {{ dag.dag_id }} exceeded time threshold", + html_content="The DAG has been running for more than 15 minutes since being queued.", + ), + ), + ): + EmptyOperator(task_id="example_task") + +The timeline for this example would look like this: + +:: + + |------|-----------|---------|-----------|--------| + Scheduled Queued Started Deadline + 00:00 00:03 00:05 00:18 Review Comment: It is 15 minutes right? Other than that, I love this section, it makes it super easy to understand the concept of deadlines! ```suggestion 00:00 00:03 00:05 00:15 ``` ########## airflow-core/docs/core-concepts/dags.rst: ########## @@ -829,3 +829,40 @@ if it fails for ``N`` number of times consecutively. we can also provide and override these configuration from DAG argument: - ``max_consecutive_failed_dag_runs``: Overrides :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`. + +Deadline Alerts +--------------- + +.. versionadded:: 3.1 + +Deadline Alerts allow you to set time thresholds for your DAG runs and automatically respond when those +thresholds are exceeded. You can set deadlines relative to a fixed datetime, use one of the available +calculated references (like DAG queue time or start time), or implement your own custom reference. +When a deadline is exceeded, it triggers a callback which can notify you or take other actions. 
+ +Here's a simple example using the existing email Notifier: Review Comment: Unless you have good reason to keep it capitalized? ```suggestion Here's a simple example using the existing email notifier: ``` ########## airflow-core/docs/howto/deadline-alerts.rst: ########## @@ -0,0 +1,255 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Deadline Alerts +=============== + +The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the next evolution of +the old SLA. Deadline Alerts allow you to set time thresholds for your DAG runs and automatically +respond when those thresholds are exceeded. You can set up Deadline Alerts by choosing a built-in +reference point, setting an interval, and defining a response using either Airflow's Notifiers or +a custom callback function. 
+ +Creating a Deadline Alert +------------------------- + +To create a Deadline Alert, you'll need to specify three components: + +* A reference: When to start counting from +* An interval: How far before or after the reference point to trigger the alert +* A callback: What to do when the deadline is exceeded + +Here is how Deadlines are calculated: + +:: + + [Reference] ------ [Interval] ------> [Deadline] + ^ ^ + | | + Start time Trigger point + +Here's an example DAG implementation. If the DAG has not finished 15 minutes after it was queued, send an email: + +.. code-block:: python + + from datetime import datetime, timedelta + from airflow import DAG + from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference + from airflow.providers.smtp.notifications.smtp import SmtpNotifier + from airflow.providers.standard.operators.empty import EmptyOperator + + with DAG( + dag_id="deadline_alert_example", + deadline=DeadlineAlert( + reference=DeadlineReference.DAGRUN_QUEUED_AT, + interval=timedelta(minutes=15), + callback=SmtpNotifier( + to="[email protected]", + subject="[Alert] DAG {{ dag.dag_id }} exceeded time threshold", + html_content="The DAG has been running for more than 15 minutes since being queued.", + ), + ), + ): + EmptyOperator(task_id="example_task") + +The timeline for this example would look like this: + +:: + + |------|-----------|---------|-----------|--------| + Scheduled Queued Started Deadline + 00:00 00:03 00:05 00:18 + +Using Built-in References +------------------------- + +Airflow provides several built-in reference points that you can use with DeadlineAlert: + +``DeadlineReference.DAGRUN_QUEUED_AT`` + Measures time from when the DagRun was queued. Useful for monitoring resource constraints. + +``DeadlineReference.DAGRUN_LOGICAL_DATE`` + References when the DAG run was scheduled to start. 
For example, setting an interval of + ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't completed 15 minutes + after it was scheduled to start, regardless of when (or if) it actually began executing. + Useful for ensuring scheduled DAGs complete before their next scheduled run. + +``DeadlineReference.FIXED_DATETIME`` + Specifies a fixed point in time. Useful when DAGs must complete by a specific time. + +Here's an example using a fixed datetime: + +.. code-block:: python + + tomorrow_at_ten = datetime.combine(datetime.now().date() + timedelta(days=1), time(10, 0)) + + with DAG( + dag_id="fixed_deadline_alert", + deadline=DeadlineAlert( + reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten), + interval=timedelta(minutes=-30), # Alert 30 minutes before the deadline + callback=SmtpNotifier( + to="[email protected]", + subject="Report will be late", + html_content="The report will not be ready 30 minutes before the deadline.", + ), + ), + ): + EmptyOperator(task_id="example_task") + +The timeline for this example would look like this: + +:: + + |------|----------|---------|----------|--------| + Deadline Queued Start Reference + 08:00 09:15 09:17 10:00 Review Comment: Is this timeline right? It does not seem to match the description above ########## airflow-core/docs/howto/deadline-alerts.rst: ########## @@ -0,0 +1,255 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Deadline Alerts +=============== + +The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the next evolution of +the old SLA. Deadline Alerts allow you to set time thresholds for your DAG runs and automatically +respond when those thresholds are exceeded. You can set up Deadline Alerts by choosing a built-in +reference point, setting an interval, and defining a response using either Airflow's Notifiers or +a custom callback function. + +Creating a Deadline Alert +------------------------- + +To create a Deadline Alert, you'll need to specify three components: + +* A reference: When to start counting from +* An interval: How far before or after the reference point to trigger the alert +* A callback: What to do when the deadline is exceeded + +Here is how Deadlines are calculated: + +:: + + [Reference] ------ [Interval] ------> [Deadline] + ^ ^ + | | + Start time Trigger point + +Here's an example DAG implementation. If the DAG has not finished 15 minutes after it was queued, send an email: + +.. 
code-block:: python + + from datetime import datetime, timedelta + from airflow import DAG + from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference + from airflow.providers.smtp.notifications.smtp import SmtpNotifier + from airflow.providers.standard.operators.empty import EmptyOperator + + with DAG( + dag_id="deadline_alert_example", + deadline=DeadlineAlert( + reference=DeadlineReference.DAGRUN_QUEUED_AT, + interval=timedelta(minutes=15), + callback=SmtpNotifier( + to="[email protected]", + subject="[Alert] DAG {{ dag.dag_id }} exceeded time threshold", + html_content="The DAG has been running for more than 15 minutes since being queued.", + ), + ), + ): + EmptyOperator(task_id="example_task") + +The timeline for this example would look like this: + +:: + + |------|-----------|---------|-----------|--------| + Scheduled Queued Started Deadline + 00:00 00:03 00:05 00:18 + +Using Built-in References +------------------------- + +Airflow provides several built-in reference points that you can use with DeadlineAlert: + +``DeadlineReference.DAGRUN_QUEUED_AT`` + Measures time from when the DagRun was queued. Useful for monitoring resource constraints. + +``DeadlineReference.DAGRUN_LOGICAL_DATE`` + References when the DAG run was scheduled to start. For example, setting an interval of + ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't completed 15 minutes + after it was scheduled to start, regardless of when (or if) it actually began executing. + Useful for ensuring scheduled DAGs complete before their next scheduled run. + +``DeadlineReference.FIXED_DATETIME`` + Specifies a fixed point in time. Useful when DAGs must complete by a specific time. + +Here's an example using a fixed datetime: + +.. 
code-block:: python + + tomorrow_at_ten = datetime.combine(datetime.now().date() + timedelta(days=1), time(10, 0)) + + with DAG( + dag_id="fixed_deadline_alert", + deadline=DeadlineAlert( + reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten), + interval=timedelta(minutes=-30), # Alert 30 minutes before the deadline Review Comment: ```suggestion interval=timedelta(minutes=-30), # Alert 30 minutes before the reference ``` ########## airflow-core/docs/howto/deadline-alerts.rst: ########## @@ -0,0 +1,255 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Deadline Alerts +=============== + +The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the next evolution of +the old SLA. Deadline Alerts allow you to set time thresholds for your DAG runs and automatically Review Comment: I think it has been brought up that we should not compare Airflow versions in docs; I am just wondering whether mentioning "the old SLA" goes against that. ########## airflow-core/docs/howto/deadline-alerts.rst: ########## @@ -0,0 +1,255 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Deadline Alerts +=============== + +The :class:`~airflow.sdk.definitions.deadline.DeadlineAlert` feature is the next evolution of +the old SLA. Deadline Alerts allow you to set time thresholds for your DAG runs and automatically +respond when those thresholds are exceeded. You can set up Deadline Alerts by choosing a built-in +reference point, setting an interval, and defining a response using either Airflow's Notifiers or +a custom callback function. + +Creating a Deadline Alert +------------------------- + +To create a Deadline Alert, you'll need to specify three components: + +* A reference: When to start counting from +* An interval: How far before or after the reference point to trigger the alert +* A callback: What to do when the deadline is exceeded + +Here is how Deadlines are calculated: + +:: + + [Reference] ------ [Interval] ------> [Deadline] + ^ ^ + | | + Start time Trigger point + +Here's an example DAG implementation. If the DAG has not finished 15 minutes after it was queued, send an email: + +.. 
code-block:: python + + from datetime import datetime, timedelta + from airflow import DAG + from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference + from airflow.providers.smtp.notifications.smtp import SmtpNotifier + from airflow.providers.standard.operators.empty import EmptyOperator + + with DAG( + dag_id="deadline_alert_example", + deadline=DeadlineAlert( + reference=DeadlineReference.DAGRUN_QUEUED_AT, + interval=timedelta(minutes=15), + callback=SmtpNotifier( + to="[email protected]", + subject="[Alert] DAG {{ dag.dag_id }} exceeded time threshold", + html_content="The DAG has been running for more than 15 minutes since being queued.", + ), + ), + ): + EmptyOperator(task_id="example_task") + +The timeline for this example would look like this: + +:: + + |------|-----------|---------|-----------|--------| + Scheduled Queued Started Deadline + 00:00 00:03 00:05 00:18 + +Using Built-in References +------------------------- + +Airflow provides several built-in reference points that you can use with DeadlineAlert: + +``DeadlineReference.DAGRUN_QUEUED_AT`` + Measures time from when the DagRun was queued. Useful for monitoring resource constraints. + +``DeadlineReference.DAGRUN_LOGICAL_DATE`` + References when the DAG run was scheduled to start. For example, setting an interval of + ``timedelta(minutes=15)`` would trigger the alert if the DAG hasn't completed 15 minutes + after it was scheduled to start, regardless of when (or if) it actually began executing. + Useful for ensuring scheduled DAGs complete before their next scheduled run. + +``DeadlineReference.FIXED_DATETIME`` + Specifies a fixed point in time. Useful when DAGs must complete by a specific time. + +Here's an example using a fixed datetime: + +.. 
code-block:: python + + tomorrow_at_ten = datetime.combine(datetime.now().date() + timedelta(days=1), time(10, 0)) + + with DAG( + dag_id="fixed_deadline_alert", + deadline=DeadlineAlert( + reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten), + interval=timedelta(minutes=-30), # Alert 30 minutes before the deadline + callback=SmtpNotifier( + to="[email protected]", + subject="Report will be late", + html_content="The report will not be ready 30 minutes before the deadline.", + ), + ), + ): + EmptyOperator(task_id="example_task") + +The timeline for this example would look like this: + +:: + + |------|----------|---------|----------|--------| + Deadline Queued Start Reference + 08:00 09:15 09:17 10:00 + +Using Callbacks +--------------- + +When a deadline is exceeded, the callback is executed. You can use any async :doc:`Notifier </howto/notifications>` +or create a custom callback function. + +Using Built-in Notifiers +^^^^^^^^^^^^^^^^^^^^^^^^ + +Here's an example using the Slack notifier if the DagRun has not finished within 30 minutes of it being queued: + +.. code-block:: python + + with DAG( + dag_id="slack_deadline_alert", + deadline=DeadlineAlert( + reference=DeadlineReference.DAGRUN_QUEUED_AT, + interval=timedelta(minutes=15), Review Comment: ```suggestion interval=timedelta(minutes=30), ```
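A quick way to sanity-check the two timelines questioned in the comments above is plain `datetime` arithmetic. This is only a sketch: it assumes, going by the `[Reference] ------ [Interval] ------> [Deadline]` diagram in the quoted docs, that a deadline is the reference time plus the interval. `compute_deadline` is a hypothetical helper for illustration, not an Airflow API, and the calendar dates are arbitrary.

```python
from datetime import datetime, timedelta


def compute_deadline(reference: datetime, interval: timedelta) -> datetime:
    """Hypothetical helper (not Airflow code): deadline = reference + interval.

    A negative interval places the deadline before the reference.
    """
    return reference + interval


# First example in the docs: DAGRUN_QUEUED_AT reference, run queued at 00:03,
# interval of 15 minutes.
queued_at = datetime(2025, 1, 1, 0, 3)
queued_deadline = compute_deadline(queued_at, timedelta(minutes=15))
print(queued_deadline.strftime("%H:%M"))  # 00:18

# Second example in the docs: FIXED_DATETIME reference at 10:00,
# interval of -30 minutes.
fixed_reference = datetime(2025, 1, 2, 10, 0)
fixed_deadline = compute_deadline(fixed_reference, timedelta(minutes=-30))
print(fixed_deadline.strftime("%H:%M"))  # 09:30
```

If that reading of the diagram is right, 00:18 in the first timeline follows from the 00:03 queue time (00:15 would only apply if the interval were counted from the scheduled time), and the deadline in the second timeline would fall at 09:30 rather than 08:00.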
