This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-6-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3b51035b5b4a7823da043637e9db3362e6529658 Author: Wei Lee <[email protected]> AuthorDate: Fri Jun 16 16:31:51 2023 +0800 feat(jobs/triggerer_job_runner): add triggerer canceled log (#31757) Emit log message when trigger is cancelled Co-authored-by: Daniel Standish <[email protected]> Co-authored-by: Jed Cunningham <[email protected]> (cherry picked from commit a60429eadfffb5fb0f867c220a6cecf628692dcf) --- airflow/jobs/triggerer_job_runner.py | 9 +++++++++ tests/jobs/test_triggerer_job.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 7051dec5a0..7ca3fc2e94 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -40,6 +40,7 @@ from airflow.serialization.pydantic.job import JobPydantic from airflow.stats import Stats from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.typing_compat import TypedDict +from airflow.utils import timezone from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.log.trigger_handler import ( @@ -615,6 +616,14 @@ class TriggerRunner(threading.Thread, LoggingMixin): self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event) self.triggers[trigger_id]["events"] += 1 self.events.append((trigger_id, event)) + except asyncio.CancelledError as err: + timeout = trigger.task_instance.trigger_timeout + if timeout: + timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout + if timeout < timezone.utcnow(): + self.log.error("Trigger cancelled due to timeout") + self.log.error("Trigger cancelled; message=%s", err) + raise finally: # CancelledError will get injected when we're stopped - which is # fine, the cleanup process will understand that, but we want to diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index dd35dd4cc8..d0ce887a57 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -22,7 +22,7 @@ import datetime import importlib import time from threading import Thread -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pendulum import pytest @@ -255,6 +255,33 @@ def test_trigger_lifecycle(session): job_runner.trigger_runner.stop = True +class TestTriggerRunner: + @pytest.mark.asyncio + @patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging") + async def test_run_trigger_canceled(self, session) -> None: + trigger_runner = TriggerRunner() + trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}} + mock_trigger = MagicMock() + mock_trigger.task_instance.trigger_timeout = None + mock_trigger.run.side_effect = asyncio.CancelledError() + + with pytest.raises(asyncio.CancelledError): + await trigger_runner.run_trigger(1, mock_trigger) + + @pytest.mark.asyncio + @patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging") + async def test_run_trigger_timeout(self, session, caplog) -> None: + trigger_runner = TriggerRunner() + trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}} + mock_trigger = MagicMock() + mock_trigger.task_instance.trigger_timeout = timezone.utcnow() - datetime.timedelta(hours=1) + mock_trigger.run.side_effect = asyncio.CancelledError() + + with pytest.raises(asyncio.CancelledError): + await trigger_runner.run_trigger(1, mock_trigger) + assert "Trigger cancelled due to timeout" in caplog.text + + def test_trigger_create_race_condition_18392(session, tmp_path): """ This verifies the resolution of race condition documented in github issue #18392.
