This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e79895308a Respect `soft_fail` parameter in `S3KeysUnchangedSensor` and `S3KeySensor` (#34550)
e79895308a is described below
commit e79895308a81ed2a6e163c95d69e2db0da5c4c12
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Fri Sep 22 20:48:24 2023 +0530
Respect `soft_fail` parameter in `S3KeysUnchangedSensor` and `S3KeySensor` (#34550)
---
airflow/providers/amazon/aws/sensors/s3.py | 14 ++++++++++--
tests/providers/amazon/aws/sensors/test_s3_key.py | 16 +++++++++++++-
.../amazon/aws/sensors/test_s3_keys_unchanged.py | 25 +++++++++++++++++++++-
3 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py
index 654aca5bde..519aa49d6f 100644
--- a/airflow/providers/amazon/aws/sensors/s3.py
+++ b/airflow/providers/amazon/aws/sensors/s3.py
@@ -31,7 +31,7 @@ from airflow.configuration import conf
if TYPE_CHECKING:
from airflow.utils.context import Context
-from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.triggers.s3 import S3KeysUnchangedTrigger, S3KeyTrigger
from airflow.sensors.base import BaseSensorOperator, poke_mode_only
@@ -176,6 +176,9 @@ class S3KeySensor(BaseSensorOperator):
self._defer()
if event["status"] == "error":
+ # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+ if self.soft_fail:
+ raise AirflowSkipException(event["message"])
raise AirflowException(event["message"])
return None
@@ -297,10 +300,14 @@ class S3KeysUnchangedSensor(BaseSensorOperator):
)
return False
- raise AirflowException(
+ # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+ message = (
f"Illegal behavior: objects were deleted in"
f" {os.path.join(self.bucket_name, self.prefix)} between pokes."
)
+ if self.soft_fail:
+ raise AirflowSkipException(message)
+ raise AirflowException(message)
if self.last_activity_time:
self.inactivity_seconds = int((datetime.now() - self.last_activity_time).total_seconds())
@@ -360,5 +367,8 @@ class S3KeysUnchangedSensor(BaseSensorOperator):
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event and event["status"] == "error":
+ # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+ if self.soft_fail:
+ raise AirflowSkipException(event["message"])
raise AirflowException(event["message"])
return None
diff --git a/tests/providers/amazon/aws/sensors/test_s3_key.py b/tests/providers/amazon/aws/sensors/test_s3_key.py
index 7c6f435f53..ef32be43d2 100644
--- a/tests/providers/amazon/aws/sensors/test_s3_key.py
+++ b/tests/providers/amazon/aws/sensors/test_s3_key.py
@@ -21,7 +21,7 @@ from unittest import mock
import pytest
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import DAG, DagRun, TaskInstance
from airflow.models.variable import Variable
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
@@ -245,3 +245,17 @@ class TestS3KeySensor:
assert (
sensor.execute_complete(context={}, event={"status": "running", "files": [{"Size": 10}]}) is None
)
+
+ @pytest.mark.parametrize(
+ "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
+ )
+ def test_fail_execute_complete(self, soft_fail, expected_exception):
+ op = S3KeySensor(
+ task_id="s3_key_sensor",
+ bucket_key=["s3://test_bucket/file*", "s3://test_bucket/*.zip"],
+ wildcard_match=True,
+ )
+ op.soft_fail = soft_fail
+ message = "error"
+ with pytest.raises(expected_exception, match=message):
+ op.execute_complete(context={}, event={"status": "error", "message": message})
diff --git a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py
index 726c4de7db..56caa9ecc1 100644
--- a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py
+++ b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py
@@ -23,7 +23,7 @@ from unittest import mock
import pytest
import time_machine
-from airflow.models.dag import DAG, AirflowException
+from airflow.models.dag import DAG, AirflowException, AirflowSkipException
from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor
TEST_DAG_ID = "unit_tests_aws_sensor"
@@ -114,3 +114,26 @@ class TestS3KeysUnchangedSensor:
assert not self.sensor.poke(dict())
time_machine.coordinates.shift(10)
assert self.sensor.poke(dict())
+
+ @pytest.mark.parametrize(
+ "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
+ )
+ def test_fail_is_keys_unchanged(self, soft_fail, expected_exception):
+ op = S3KeysUnchangedSensor(task_id="sensor", bucket_name="test-bucket", prefix="test-prefix/path")
+ op.soft_fail = soft_fail
+ op.previous_objects = {"1", "2", "3"}
+ current_objects = {"1", "2"}
+ op.allow_delete = False
+ message = "Illegal behavior: objects were deleted in"
+ with pytest.raises(expected_exception, match=message):
+ op.is_keys_unchanged(current_objects=current_objects)
+
+ @pytest.mark.parametrize(
+ "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
+ )
+ def test_fail_execute_complete(self, soft_fail, expected_exception):
+ op = S3KeysUnchangedSensor(task_id="sensor", bucket_name="test-bucket", prefix="test-prefix/path")
+ op.soft_fail = soft_fail
+ message = "test message"
+ with pytest.raises(expected_exception, match=message):
+ op.execute_complete(context={}, event={"status": "error", "message": message})