This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8a4abf28ee919b4fcb4ea4916b1feb6762667869
Author: john-jac <[email protected]>
AuthorDate: Wed Mar 15 15:56:19 2023 -0700

    Update workday example (#30026)
    
    * Update workday example
    
    The benefit of this change is that (a) it provides a solution to a common
    use case for Airflow users, and (b) it separates the logic of determining
    whether to schedule on a given day from the common logic Airflow uses to
    request the next scheduled date/time.
    
    (cherry picked from commit 66b5f90f4536329ba1fe0e54e3f15ec98c1e2730)
---
 airflow/example_dags/plugins/workday.py    | 47 +++++++++++++++++-------------
 docs/apache-airflow/howto/timetable.rst    | 16 +++++-----
 tests/timetables/test_workday_timetable.py |  7 +++--
 3 files changed, 40 insertions(+), 30 deletions(-)

diff --git a/airflow/example_dags/plugins/workday.py 
b/airflow/example_dags/plugins/workday.py
index db68c29541..92368de0ae 100644
--- a/airflow/example_dags/plugins/workday.py
+++ b/airflow/example_dags/plugins/workday.py
@@ -18,9 +18,10 @@
 """Plugin to demonstrate timetable registration and accommodate example 
DAGs."""
 from __future__ import annotations
 
-# [START howto_timetable]
 from datetime import timedelta
 
+# [START howto_timetable]
+from pandas.tseries.holiday import USFederalHolidayCalendar
 from pendulum import UTC, Date, DateTime, Time
 
 from airflow.plugins_manager import AirflowPlugin
@@ -28,16 +29,25 @@ from airflow.timetables.base import DagRunInfo, 
DataInterval, TimeRestriction, T
 
 
 class AfterWorkdayTimetable(Timetable):
+    def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
+        cal = USFederalHolidayCalendar()
+        next_start = d
+        while True:
+            if next_start.weekday() in (5, 6):  # If next start is in the 
weekend go to next day
+                next_start = next_start + incr * timedelta(days=1)
+                continue
+            holidays = cal.holidays(start=next_start, 
end=next_start).to_pydatetime()
+            if next_start in holidays:  # If next start is a holiday go to 
next day
+                next_start = next_start + incr * timedelta(days=1)
+                continue
+            break
+        return next_start
 
     # [START howto_timetable_infer_manual_data_interval]
     def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
-        weekday = run_after.weekday()
-        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
-            days_since_friday = (run_after.weekday() - 4) % 7
-            delta = timedelta(days=days_since_friday)
-        else:  # Otherwise the interval is yesterday.
-            delta = timedelta(days=1)
-        start = DateTime.combine((run_after - delta).date(), 
Time.min).replace(tzinfo=UTC)
+        start = DateTime.combine((run_after - timedelta(days=1)).date(), 
Time.min).replace(tzinfo=UTC)
+        # Skip backwards over weekends and holidays to find last run
+        start = self.get_next_workday(start, incr=-1)
         return DataInterval(start=start, end=(start + timedelta(days=1)))
 
     # [END howto_timetable_infer_manual_data_interval]
@@ -51,12 +61,9 @@ class AfterWorkdayTimetable(Timetable):
     ) -> DagRunInfo | None:
         if last_automated_data_interval is not None:  # There was a previous 
run on the regular schedule.
             last_start = last_automated_data_interval.start
-            last_start_weekday = last_start.weekday()
-            if 0 <= last_start_weekday < 4:  # Last run on Monday through 
Thursday -- next is tomorrow.
-                delta = timedelta(days=1)
-            else:  # Last run on Friday -- skip to next Monday.
-                delta = timedelta(days=(7 - last_start_weekday))
-            next_start = DateTime.combine((last_start + delta).date(), 
Time.min).replace(tzinfo=UTC)
+            next_start = DateTime.combine((last_start + 
timedelta(days=1)).date(), Time.min).replace(
+                tzinfo=UTC
+            )
         else:  # This is the first ever run on the regular schedule.
             next_start = restriction.earliest
             if next_start is None:  # No start_date. Don't schedule.
@@ -66,12 +73,12 @@ class AfterWorkdayTimetable(Timetable):
                 next_start = max(next_start, DateTime.combine(Date.today(), 
Time.min).replace(tzinfo=UTC))
             elif next_start.time() != Time.min:
                 # If earliest does not fall on midnight, skip to the next day.
-                next_day = next_start.date() + timedelta(days=1)
-                next_start = DateTime.combine(next_day, 
Time.min).replace(tzinfo=UTC)
-            next_start_weekday = next_start.weekday()
-            if next_start_weekday in (5, 6):  # If next start is in the 
weekend, go to next Monday.
-                delta = timedelta(days=(7 - next_start_weekday))
-                next_start = next_start + delta
+                next_start = DateTime.combine(next_start.date() + 
timedelta(days=1), Time.min).replace(
+                    tzinfo=UTC
+                )
+        # Skip weekends and holidays
+        next_start = self.get_next_workday(next_start)
+
         if restriction.latest is not None and next_start > restriction.latest:
             return None  # Over the DAG's scheduled end; don't schedule.
         return DagRunInfo.interval(start=next_start, end=(next_start + 
timedelta(days=1)))
diff --git a/docs/apache-airflow/howto/timetable.rst 
b/docs/apache-airflow/howto/timetable.rst
index fac296377f..767bfa4f16 100644
--- a/docs/apache-airflow/howto/timetable.rst
+++ b/docs/apache-airflow/howto/timetable.rst
@@ -24,7 +24,8 @@ process data collected during the work day. The first 
intuitive answer to this
 would be ``schedule="0 0 * * 1-5"`` (midnight on Monday to Friday), but
 this means data collected on Friday will *not* be processed right after Friday
 ends, but on the next Monday, and that run's interval would be from midnight
-Friday to midnight *Monday*. What we want is:
+Friday to midnight *Monday*. Further, the above schedule string cannot skip
+processing on holidays. What we want is:
 
 * Schedule a run for each Monday, Tuesday, Wednesday, Thursday, and Friday. The
   run's data interval would cover from midnight of each day, to midnight of the
@@ -32,6 +33,7 @@ Friday to midnight *Monday*. What we want is:
 * Each run would be created right after the data interval ends. The run 
covering
   Monday happens on midnight Tuesday and so on. The run covering Friday happens
   on midnight Saturday. No runs happen on midnights Sunday and Monday.
+* Do not schedule a run on defined holidays.
 
 For simplicity, we will only deal with UTC datetimes in this example.
 
@@ -46,8 +48,8 @@ Timetable Registration
 ----------------------
 
 A timetable must be a subclass of :class:`~airflow.timetables.base.Timetable`,
-and be registered as a part of a :doc:`plugin 
</authoring-and-scheduling/plugins>`. The following is a
-skeleton for us to implement a new timetable:
+and be registered as a part of a :doc:`plugin 
</authoring-and-scheduling/plugins>`.
+The following is a skeleton for us to implement a new timetable:
 
 .. code-block:: python
 
@@ -144,10 +146,10 @@ how the DAG and its tasks specify the schedule, and 
contains three attributes:
     (usually after the end of the data interval).
 
 If there was a run scheduled previously, we should now schedule for the next
-weekday, i.e. plus one day if the previous run was on Monday through Thursday,
-or three days if it was on Friday. If there was not a previous scheduled run,
-however, we pick the next workday's midnight after ``restriction.earliest``
-(unless it *is* a workday's midnight; in which case it's used directly).
+non-holiday weekday by looping through subsequent days to find one that is not
+a Saturday, Sunday, or US federal holiday. If there was not a previous scheduled
+run, however, we pick the first non-holiday workday's midnight at or after
+``restriction.earliest``.
 ``restriction.catchup`` also needs to be considered---if it's ``False``, we
 can't schedule before the current time, even if ``start_date`` values are in 
the
 past. Finally, if our calculated data interval is later than
diff --git a/tests/timetables/test_workday_timetable.py 
b/tests/timetables/test_workday_timetable.py
index d630c3483a..d20f45d44c 100644
--- a/tests/timetables/test_workday_timetable.py
+++ b/tests/timetables/test_workday_timetable.py
@@ -29,7 +29,7 @@ from airflow.timetables.base import DagRunInfo, DataInterval, 
TimeRestriction, T
 START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE)  # This is a 
Saturday.
 
 WEEK_1_WEEKDAYS = [
-    pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE),
+    pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE),  # This is a US holiday
     pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE),
     pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE),
     pendulum.DateTime(2021, 9, 9, tzinfo=TIMEZONE),
@@ -62,9 +62,10 @@ def test_dag_run_info_interval(start: pendulum.DateTime, 
end: pendulum.DateTime)
 
 
 def test_first_schedule(timetable: Timetable, restriction: TimeRestriction):
-    """Since DAG starts on Saturday, the first ever run covers the next Monday 
and schedules on Tuesday."""
+    """Since DAG starts on Saturday, and the first Monday is a holiday,
+    the first ever run covers the next Tuesday and schedules on Wednesday."""
     next_info = timetable.next_dagrun_info(last_automated_data_interval=None, 
restriction=restriction)
-    assert next_info == DagRunInfo.interval(WEEK_1_WEEKDAYS[0], 
WEEK_1_WEEKDAYS[1])
+    assert next_info == DagRunInfo.interval(WEEK_1_WEEKDAYS[1], 
WEEK_1_WEEKDAYS[2])
 
 
 @pytest.mark.parametrize(

Reply via email to