This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c01daf8119 Do not let EventsTimetable schedule past events if catchup=False (#36134)
c01daf8119 is described below
commit c01daf811925816e9ae09b78c37b9ff8d87ce691
Author: Aleksey Kirilishin <[email protected]>
AuthorDate: Wed Jan 10 19:29:02 2024 +0300
Do not let EventsTimetable schedule past events if catchup=False (#36134)
* Fix the EventsTimetable schedules past events bug
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/timetables/events.py | 41 +++++++++++++-------
tests/timetables/test_events_timetable.py | 64 +++++++++++++++++++++++++++++--
2 files changed, 88 insertions(+), 17 deletions(-)
diff --git a/airflow/timetables/events.py b/airflow/timetables/events.py
index 1998b12d46..c8fd65c2a9 100644
--- a/airflow/timetables/events.py
+++ b/airflow/timetables/events.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Iterable
import pendulum
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+from airflow.utils import timezone
if TYPE_CHECKING:
from pendulum import DateTime
@@ -58,10 +59,13 @@ class EventsTimetable(Timetable):
self.event_dates.sort()
self.restrict_to_events = restrict_to_events
if description is None:
- self.description = (
- f"{len(self.event_dates)} Events between {self.event_dates[0]} and {self.event_dates[-1]}"
- )
- self._summary = f"{len(self.event_dates)} Events"
+ if self.event_dates:
+ self.description = (
+ f"{len(self.event_dates)} events between {self.event_dates[0]} and {self.event_dates[-1]}"
+ )
+ else:
+ self.description = "No events"
+ self._summary = f"{len(self.event_dates)} events"
else:
self._summary = description
self.description = description
@@ -79,22 +83,31 @@ class EventsTimetable(Timetable):
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
- if last_automated_data_interval is None:
- next_event = self.event_dates[0]
+ earliest = restriction.earliest
+ if not restriction.catchup:
+ current_time = timezone.utcnow()
+ if earliest is None or current_time > earliest:
+ earliest = pendulum.instance(current_time)
+
+ for next_event in self.event_dates:
+ if earliest and next_event < earliest:
+ continue
+ if last_automated_data_interval and next_event <= last_automated_data_interval.end:
+ continue
+ break
else:
- future_dates = itertools.dropwhile(
- lambda when: when <= last_automated_data_interval.end, # type: ignore
- self.event_dates,
- )
- next_event = next(future_dates, None) # type: ignore
- if next_event is None:
- return None
+ # We need to return None if self.event_dates is empty or,
+ # if not empty, when no suitable event can be found.
+ return None
+
+ if restriction.latest is not None and next_event > restriction.latest:
+ return None
return DagRunInfo.exact(next_event)
def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
# If Timetable not restricted to events, run for the time specified
- if not self.restrict_to_events:
+ if not self.restrict_to_events or not self.event_dates:
return DataInterval.exact(run_after)
# If restricted to events, run for the most recent past event
diff --git a/tests/timetables/test_events_timetable.py b/tests/timetables/test_events_timetable.py
index e743000f07..39d1fd3431 100644
--- a/tests/timetables/test_events_timetable.py
+++ b/tests/timetables/test_events_timetable.py
@@ -19,12 +19,14 @@ from __future__ import annotations
import pendulum
import pytest
+import time_machine
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.events import EventsTimetable
from airflow.utils.timezone import utc
-START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
+BEFORE_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
+START_DATE = pendulum.DateTime(2021, 9, 7, tzinfo=utc)
EVENT_DATES = [
pendulum.DateTime(2021, 9, 6, tzinfo=utc),
@@ -93,7 +95,7 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict
Test that when using strict event dates, manual runs before the first
event have the first event's date
as the start interval
"""
- manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=START_DATE)
+ manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=BEFORE_DATE)
expected_data_interval = DataInterval.exact(EVENT_DATES[0])
assert expected_data_interval == manual_run_data_interval
@@ -101,8 +103,15 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict
@pytest.mark.parametrize(
"last_automated_data_interval, expected_next_info",
[
+ pytest.param(None, DagRunInfo.interval(START_DATE, START_DATE)),
+ pytest.param(
+ DataInterval(EVENT_DATES_SORTED[0], EVENT_DATES_SORTED[0]),
+ DagRunInfo.interval(START_DATE, START_DATE),
+ ),
+ ]
+ + [
pytest.param(DataInterval(day1, day1), DagRunInfo.interval(day2, day2))
- for day1, day2 in zip(EVENT_DATES_SORTED, EVENT_DATES_SORTED[1:])
+ for day1, day2 in zip(EVENT_DATES_SORTED[1:], EVENT_DATES_SORTED[2:])
]
+ [pytest.param(DataInterval(EVENT_DATES_SORTED[-1], EVENT_DATES_SORTED[-1]), None)],
)
@@ -118,3 +127,52 @@ def test_subsequent_weekday_schedule(
restriction=restriction,
)
assert next_info == expected_next_info
+
+
+@pytest.mark.parametrize(
+ "current_date",
+ [
+ pytest.param(pendulum.DateTime(2021, 9, 1, tzinfo=utc), id="when-current-date-is-before-first-event"),
+ pytest.param(pendulum.DateTime(2021, 9, 8, tzinfo=utc), id="when-current-date-is-in-the-middle"),
+ pytest.param(pendulum.DateTime(2021, 12, 9, tzinfo=utc), id="when-current-date-is-after-last-event"),
+ ],
+)
+@pytest.mark.parametrize(
+ "last_automated_data_interval",
+ [
+ pytest.param(None, id="first-run"),
+ pytest.param(DataInterval(start=BEFORE_DATE, end=BEFORE_DATE), id="subsequent-run"),
+ ],
+)
+def test_no_catchup_first_starts(
+ last_automated_data_interval: DataInterval | None,
+ current_date,
+ unrestricted_timetable: Timetable,
+) -> None:
+ # we don't use the last_automated_data_interval here because it's always less than the first event
+ expected_date = max(current_date, START_DATE, EVENT_DATES_SORTED[0])
+ expected_info = None
+ if expected_date <= EVENT_DATES_SORTED[-1]:
+ expected_info = DagRunInfo.interval(start=expected_date, end=expected_date)
+
+ with time_machine.travel(current_date):
+ next_info = unrestricted_timetable.next_dagrun_info(
+ last_automated_data_interval=last_automated_data_interval,
+ restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False),
+ )
+ assert next_info == expected_info
+
+
+def test_empty_timetable() -> None:
+ empty_timetable = EventsTimetable(event_dates=[])
+ next_info = empty_timetable.next_dagrun_info(
+ last_automated_data_interval=None,
+ restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False),
+ )
+ assert next_info is None
+
+
+def test_empty_timetable_manual_run() -> None:
+ empty_timetable = EventsTimetable(event_dates=[])
+ manual_run_data_interval = empty_timetable.infer_manual_data_interval(run_after=START_DATE)
+ assert manual_run_data_interval == DataInterval(start=START_DATE, end=START_DATE)