This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3b51035b5b4a7823da043637e9db3362e6529658
Author: Wei Lee <[email protected]>
AuthorDate: Fri Jun 16 16:31:51 2023 +0800

    feat(jobs/triggerer_job_runner): add triggerer canceled log (#31757)
    
    Emit log message when trigger is cancelled
    
    Co-authored-by: Daniel Standish 
<[email protected]>
    Co-authored-by: Jed Cunningham 
<[email protected]>
    (cherry picked from commit a60429eadfffb5fb0f867c220a6cecf628692dcf)
---
 airflow/jobs/triggerer_job_runner.py |  9 +++++++++
 tests/jobs/test_triggerer_job.py     | 29 ++++++++++++++++++++++++++++-
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
index 7051dec5a0..7ca3fc2e94 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -40,6 +40,7 @@ from airflow.serialization.pydantic.job import JobPydantic
 from airflow.stats import Stats
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.typing_compat import TypedDict
+from airflow.utils import timezone
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.trigger_handler import (
@@ -615,6 +616,14 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 self.log.info("Trigger %s fired: %s", 
self.triggers[trigger_id]["name"], event)
                 self.triggers[trigger_id]["events"] += 1
                 self.events.append((trigger_id, event))
+        except asyncio.CancelledError as err:
+            timeout = trigger.task_instance.trigger_timeout
+            if timeout:
+                timeout = timeout.replace(tzinfo=timezone.utc) if not 
timeout.tzinfo else timeout
+                if timeout < timezone.utcnow():
+                    self.log.error("Trigger cancelled due to timeout")
+            self.log.error("Trigger cancelled; message=%s", err)
+            raise
         finally:
             # CancelledError will get injected when we're stopped - which is
             # fine, the cleanup process will understand that, but we want to
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index dd35dd4cc8..d0ce887a57 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -22,7 +22,7 @@ import datetime
 import importlib
 import time
 from threading import Thread
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 import pendulum
 import pytest
@@ -255,6 +255,33 @@ def test_trigger_lifecycle(session):
         job_runner.trigger_runner.stop = True
 
 
+class TestTriggerRunner:
+    @pytest.mark.asyncio
+    
@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging")
+    async def test_run_trigger_canceled(self, session) -> None:
+        trigger_runner = TriggerRunner()
+        trigger_runner.triggers = {1: {"task": MagicMock(), "name": 
"mock_name", "events": 0}}
+        mock_trigger = MagicMock()
+        mock_trigger.task_instance.trigger_timeout = None
+        mock_trigger.run.side_effect = asyncio.CancelledError()
+
+        with pytest.raises(asyncio.CancelledError):
+            await trigger_runner.run_trigger(1, mock_trigger)
+
+    @pytest.mark.asyncio
+    
@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging")
+    async def test_run_trigger_timeout(self, session, caplog) -> None:
+        trigger_runner = TriggerRunner()
+        trigger_runner.triggers = {1: {"task": MagicMock(), "name": 
"mock_name", "events": 0}}
+        mock_trigger = MagicMock()
+        mock_trigger.task_instance.trigger_timeout = timezone.utcnow() - 
datetime.timedelta(hours=1)
+        mock_trigger.run.side_effect = asyncio.CancelledError()
+
+        with pytest.raises(asyncio.CancelledError):
+            await trigger_runner.run_trigger(1, mock_trigger)
+        assert "Trigger cancelled due to timeout" in caplog.text
+
+
 def test_trigger_create_race_condition_18392(session, tmp_path):
     """
     This verifies the resolution of race condition documented in github issue 
#18392.

Reply via email to