This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e79895308a Respect `soft_fail` parameter in `S3KeysUnchangedSensor` and `S3KeySensor` (#34550)
e79895308a is described below

commit e79895308a81ed2a6e163c95d69e2db0da5c4c12
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Fri Sep 22 20:48:24 2023 +0530

    Respect `soft_fail` parameter in `S3KeysUnchangedSensor` and `S3KeySensor` (#34550)
---
 airflow/providers/amazon/aws/sensors/s3.py         | 14 ++++++++++--
 tests/providers/amazon/aws/sensors/test_s3_key.py  | 16 +++++++++++++-
 .../amazon/aws/sensors/test_s3_keys_unchanged.py   | 25 +++++++++++++++++++++-
 3 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py
index 654aca5bde..519aa49d6f 100644
--- a/airflow/providers/amazon/aws/sensors/s3.py
+++ b/airflow/providers/amazon/aws/sensors/s3.py
@@ -31,7 +31,7 @@ from airflow.configuration import conf
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
-from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.triggers.s3 import S3KeysUnchangedTrigger, S3KeyTrigger
 from airflow.sensors.base import BaseSensorOperator, poke_mode_only
@@ -176,6 +176,9 @@ class S3KeySensor(BaseSensorOperator):
                 self._defer()
 
         if event["status"] == "error":
+            # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+            if self.soft_fail:
+                raise AirflowSkipException(event["message"])
             raise AirflowException(event["message"])
         return None
 
@@ -297,10 +300,14 @@ class S3KeysUnchangedSensor(BaseSensorOperator):
                 )
                 return False
 
-            raise AirflowException(
+            # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+            message = (
                 f"Illegal behavior: objects were deleted in"
                 f" {os.path.join(self.bucket_name, self.prefix)} between pokes."
             )
+            if self.soft_fail:
+                raise AirflowSkipException(message)
+            raise AirflowException(message)
 
         if self.last_activity_time:
             self.inactivity_seconds = int((datetime.now() - self.last_activity_time).total_seconds())
@@ -360,5 +367,8 @@ class S3KeysUnchangedSensor(BaseSensorOperator):
         Relies on trigger to throw an exception, otherwise it assumes execution was successful.
         """
         if event and event["status"] == "error":
+            # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+            if self.soft_fail:
+                raise AirflowSkipException(event["message"])
             raise AirflowException(event["message"])
         return None
diff --git a/tests/providers/amazon/aws/sensors/test_s3_key.py b/tests/providers/amazon/aws/sensors/test_s3_key.py
index 7c6f435f53..ef32be43d2 100644
--- a/tests/providers/amazon/aws/sensors/test_s3_key.py
+++ b/tests/providers/amazon/aws/sensors/test_s3_key.py
@@ -21,7 +21,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowSkipException
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.models.variable import Variable
 from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
@@ -245,3 +245,17 @@ class TestS3KeySensor:
         assert (
             sensor.execute_complete(context={}, event={"status": "running", "files": [{"Size": 10}]}) is None
         )
+
+    @pytest.mark.parametrize(
+        "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
+    )
+    def test_fail_execute_complete(self, soft_fail, expected_exception):
+        op = S3KeySensor(
+            task_id="s3_key_sensor",
+            bucket_key=["s3://test_bucket/file*", "s3://test_bucket/*.zip"],
+            wildcard_match=True,
+        )
+        op.soft_fail = soft_fail
+        message = "error"
+        with pytest.raises(expected_exception, match=message):
+            op.execute_complete(context={}, event={"status": "error", "message": message})
diff --git a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py
index 726c4de7db..56caa9ecc1 100644
--- a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py
+++ b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py
@@ -23,7 +23,7 @@ from unittest import mock
 import pytest
 import time_machine
 
-from airflow.models.dag import DAG, AirflowException
+from airflow.models.dag import DAG, AirflowException, AirflowSkipException
 from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor
 
 TEST_DAG_ID = "unit_tests_aws_sensor"
@@ -114,3 +114,26 @@ class TestS3KeysUnchangedSensor:
         assert not self.sensor.poke(dict())
         time_machine.coordinates.shift(10)
         assert self.sensor.poke(dict())
+
+    @pytest.mark.parametrize(
+        "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
+    )
+    def test_fail_is_keys_unchanged(self, soft_fail, expected_exception):
+        op = S3KeysUnchangedSensor(task_id="sensor", bucket_name="test-bucket", prefix="test-prefix/path")
+        op.soft_fail = soft_fail
+        op.previous_objects = {"1", "2", "3"}
+        current_objects = {"1", "2"}
+        op.allow_delete = False
+        message = "Illegal behavior: objects were deleted in"
+        with pytest.raises(expected_exception, match=message):
+            op.is_keys_unchanged(current_objects=current_objects)
+
+    @pytest.mark.parametrize(
+        "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
+    )
+    def test_fail_execute_complete(self, soft_fail, expected_exception):
+        op = S3KeysUnchangedSensor(task_id="sensor", bucket_name="test-bucket", prefix="test-prefix/path")
+        op.soft_fail = soft_fail
+        message = "test message"
+        with pytest.raises(expected_exception, match=message):
+            op.execute_complete(context={}, event={"status": "error", "message": message})

Reply via email to