This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8a4abf28ee919b4fcb4ea4916b1feb6762667869 Author: john-jac <[email protected]> AuthorDate: Wed Mar 15 15:56:19 2023 -0700 Update workday example (#30026) * Update workday example Benefit of this change is that a) it provides a solution to a common use case for Airflow users, and b) it separates out the logic of determining whether to schedule on a given day from the common logic used by Airflow to request what the next scheduled date/time is. (cherry picked from commit 66b5f90f4536329ba1fe0e54e3f15ec98c1e2730) --- airflow/example_dags/plugins/workday.py | 47 +++++++++++++++++------------- docs/apache-airflow/howto/timetable.rst | 16 +++++----- tests/timetables/test_workday_timetable.py | 7 +++-- 3 files changed, 40 insertions(+), 30 deletions(-) diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py index db68c29541..92368de0ae 100644 --- a/airflow/example_dags/plugins/workday.py +++ b/airflow/example_dags/plugins/workday.py @@ -18,9 +18,10 @@ """Plugin to demonstrate timetable registration and accommodate example DAGs.""" from __future__ import annotations -# [START howto_timetable] from datetime import timedelta +# [START howto_timetable] +from pandas.tseries.holiday import USFederalHolidayCalendar from pendulum import UTC, Date, DateTime, Time from airflow.plugins_manager import AirflowPlugin @@ -28,16 +29,25 @@ from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, T class AfterWorkdayTimetable(Timetable): + def get_next_workday(self, d: DateTime, incr=1) -> DateTime: + cal = USFederalHolidayCalendar() + next_start = d + while True: + if next_start.weekday() in (5, 6): # If next start is in the weekend go to next day + next_start = next_start + incr * timedelta(days=1) + continue + holidays = cal.holidays(start=next_start, end=next_start).to_pydatetime() + if next_start in holidays: # If next start is a holiday go to next day + next_start = next_start + incr * timedelta(days=1) + continue + break + return next_start # [START howto_timetable_infer_manual_data_interval] def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval: - weekday = run_after.weekday() - if weekday in (0, 6): # Monday and Sunday -- interval is last Friday. - days_since_friday = (run_after.weekday() - 4) % 7 - delta = timedelta(days=days_since_friday) - else: # Otherwise the interval is yesterday. - delta = timedelta(days=1) - start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC) + start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC) + # Skip backwards over weekends and holidays to find last run + start = self.get_next_workday(start, incr=-1) return DataInterval(start=start, end=(start + timedelta(days=1))) # [END howto_timetable_infer_manual_data_interval] @@ -51,12 +61,9 @@ class AfterWorkdayTimetable(Timetable): ) -> DagRunInfo | None: if last_automated_data_interval is not None: # There was a previous run on the regular schedule. last_start = last_automated_data_interval.start - last_start_weekday = last_start.weekday() - if 0 <= last_start_weekday < 4: # Last run on Monday through Thursday -- next is tomorrow. - delta = timedelta(days=1) - else: # Last run on Friday -- skip to next Monday. - delta = timedelta(days=(7 - last_start_weekday)) - next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC) + next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min).replace( + tzinfo=UTC + ) else: # This is the first ever run on the regular schedule. next_start = restriction.earliest if next_start is None: # No start_date. Don't schedule. @@ -66,12 +73,12 @@ class AfterWorkdayTimetable(Timetable): next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) elif next_start.time() != Time.min: # If earliest does not fall on midnight, skip to the next day. - next_day = next_start.date() + timedelta(days=1) - next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC) - next_start_weekday = next_start.weekday() - if next_start_weekday in (5, 6): # If next start is in the weekend, go to next Monday. - delta = timedelta(days=(7 - next_start_weekday)) - next_start = next_start + delta + next_start = DateTime.combine(next_start.date() + timedelta(days=1), Time.min).replace( + tzinfo=UTC + ) + # Skip weekends and holidays + next_start = self.get_next_workday(next_start) + if restriction.latest is not None and next_start > restriction.latest: return None # Over the DAG's scheduled end; don't schedule. return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1))) diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index fac296377f..767bfa4f16 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -24,7 +24,8 @@ process data collected during the work day. The first intuitive answer to this would be ``schedule="0 0 * * 1-5"`` (midnight on Monday to Friday), but this means data collected on Friday will *not* be processed right after Friday ends, but on the next Monday, and that run's interval would be from midnight -Friday to midnight *Monday*. What we want is: +Friday to midnight *Monday*. Further, the above schedule string cannot skip +processing on holidays. What we want is: * Schedule a run for each Monday, Tuesday, Wednesday, Thursday, and Friday. The run's data interval would cover from midnight of each day, to midnight of the @@ -32,6 +33,7 @@ Friday to midnight *Monday*. What we want is: * Each run would be created right after the data interval ends. The run covering Monday happens on midnight Tuesday and so on. The run covering Friday happens on midnight Saturday. No runs happen on midnights Sunday and Monday. +* Do not schedule a run on defined holidays. For simplicity, we will only deal with UTC datetimes in this example. @@ -46,8 +48,8 @@ Timetable Registration ---------------------- A timetable must be a subclass of :class:`~airflow.timetables.base.Timetable`, -and be registered as a part of a :doc:`plugin </authoring-and-scheduling/plugins>`. The following is a -skeleton for us to implement a new timetable: +and be registered as a part of a :doc:`plugin </authoring-and-scheduling/plugins>`. +The following is a skeleton for us to implement a new timetable: .. code-block:: python @@ -144,10 +146,10 @@ how the DAG and its tasks specify the schedule, and contains three attributes: (usually after the end of the data interval). If there was a run scheduled previously, we should now schedule for the next -weekday, i.e. plus one day if the previous run was on Monday through Thursday, -or three days if it was on Friday. If there was not a previous scheduled run, -however, we pick the next workday's midnight after ``restriction.earliest`` -(unless it *is* a workday's midnight; in which case it's used directly). +non-holiday weekday by looping through subsequent days to find one that is not +a Saturday, Sunday, or US holiday. If there was not a previous scheduled run, +however, we pick the next non-holiday workday's midnight after +``restriction.earliest``. ``restriction.catchup`` also needs to be considered---if it's ``False``, we can't schedule before the current time, even if ``start_date`` values are in the past. Finally, if our calculated data interval is later than diff --git a/tests/timetables/test_workday_timetable.py b/tests/timetables/test_workday_timetable.py index d630c3483a..d20f45d44c 100644 --- a/tests/timetables/test_workday_timetable.py +++ b/tests/timetables/test_workday_timetable.py @@ -29,7 +29,7 @@ from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, T START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) # This is a Saturday. WEEK_1_WEEKDAYS = [ - pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE), + pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE), # This is a US holiday pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE), pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE), pendulum.DateTime(2021, 9, 9, tzinfo=TIMEZONE), @@ -62,9 +62,10 @@ def test_dag_run_info_interval(start: pendulum.DateTime, end: pendulum.DateTime) def test_first_schedule(timetable: Timetable, restriction: TimeRestriction): - """Since DAG starts on Saturday, the first ever run covers the next Monday and schedules on Tuesday.""" + """Since DAG starts on Saturday, and the first Monday is a holiday, + the first ever run covers the next Tuesday and schedules on Wednesday.""" next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) - assert next_info == DagRunInfo.interval(WEEK_1_WEEKDAYS[0], WEEK_1_WEEKDAYS[1]) + assert next_info == DagRunInfo.interval(WEEK_1_WEEKDAYS[1], WEEK_1_WEEKDAYS[2]) @pytest.mark.parametrize(
