This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 527b948856 [misc] Replace XOR `^` conditions by `exactly_one` helper in providers (#27858)
527b948856 is described below
commit 527b948856584320f74d385f58477af79506834d
Author: Andrey Anshin <[email protected]>
AuthorDate: Sat Dec 3 19:11:21 2022 +0300
[misc] Replace XOR `^` conditions by `exactly_one` helper in providers (#27858)
---
airflow/providers/amazon/aws/hooks/base_aws.py | 3 +-
airflow/providers/amazon/aws/operators/emr.py | 3 +-
airflow/providers/amazon/aws/operators/s3.py | 5 +-
.../google/cloud/operators/cloud_build.py | 3 +-
airflow/providers/slack/hooks/slack.py | 3 +-
.../amazon/aws/operators/test_emr_add_steps.py | 21 +++++-
.../amazon/aws/operators/test_s3_object.py | 74 ++++++++++++----------
.../google/cloud/operators/test_cloud_build.py | 6 +-
8 files changed, 73 insertions(+), 45 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py
index b902d102cb..ec06eb22e5 100644
--- a/airflow/providers/amazon/aws/hooks/base_aws.py
+++ b/airflow/providers/amazon/aws/hooks/base_aws.py
@@ -47,6 +47,7 @@ from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.utils.connection_wrapper import
AwsConnectionWrapper
+from airflow.utils.helpers import exactly_one
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.secrets_masker import mask_secret
@@ -493,7 +494,7 @@ class AwsGenericHook(BaseHook, Generic[BaseAwsConnection]):
:return: boto3.client or boto3.resource
"""
- if not ((not self.client_type) ^ (not self.resource_type)):
+ if not exactly_one(self.client_type, self.resource_type):
raise ValueError(
f"Either client_type={self.client_type!r} or "
f"resource_type={self.resource_type!r} must be provided, not both."
diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py
index 63659e8188..7395e42351 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -26,6 +26,7 @@ from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook,
EmrServerlessHook
from airflow.providers.amazon.aws.links.emr import EmrClusterLink
+from airflow.utils.helpers import exactly_one
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -71,7 +72,7 @@ class EmrAddStepsOperator(BaseOperator):
wait_for_completion: bool = False,
**kwargs,
):
- if not (job_flow_id is None) ^ (job_flow_name is None):
+ if not exactly_one(job_flow_id is None, job_flow_name is None):
raise AirflowException("Exactly one of job_flow_id or job_flow_name must be specified.")
super().__init__(**kwargs)
cluster_states = cluster_states or []
diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index 9173208401..d748da67bb 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -26,6 +26,7 @@ from typing import TYPE_CHECKING, Sequence
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.helpers import exactly_one
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -463,11 +464,11 @@ class S3DeleteObjectsOperator(BaseOperator):
self.aws_conn_id = aws_conn_id
self.verify = verify
- if not bool(keys is None) ^ bool(prefix is None):
+ if not exactly_one(prefix is None, keys is None):
raise AirflowException("Either keys or prefix should be set.")
def execute(self, context: Context):
- if not bool(self.keys is None) ^ bool(self.prefix is None):
+ if not exactly_one(self.keys is None, self.prefix is None):
raise AirflowException("Either keys or prefix should be set.")
if isinstance(self.keys, (list, str)) and not bool(self.keys):
diff --git a/airflow/providers/google/cloud/operators/cloud_build.py b/airflow/providers/google/cloud/operators/cloud_build.py
index 33d8f40c88..b64e368c3d 100644
--- a/airflow/providers/google/cloud/operators/cloud_build.py
+++ b/airflow/providers/google/cloud/operators/cloud_build.py
@@ -40,6 +40,7 @@ from airflow.providers.google.cloud.links.cloud_build import (
from airflow.providers.google.cloud.triggers.cloud_build import
CloudBuildCreateBuildTrigger
from airflow.providers.google.common.consts import
GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
from airflow.utils import yaml
+from airflow.utils.helpers import exactly_one
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -971,7 +972,7 @@ class BuildProcessor:
self.build = deepcopy(build)
def _verify_source(self) -> None:
- if not (("storage_source" in self.build["source"]) ^ ("repo_source" in
self.build["source"])):
+ if not exactly_one("storage_source" in self.build["source"],
"repo_source" in self.build["source"]):
raise AirflowException(
"The source could not be determined. Please choose one data source from: "
"storage_source and repo_source."
diff --git a/airflow/providers/slack/hooks/slack.py b/airflow/providers/slack/hooks/slack.py
index 2bf9bd2ace..61b6d2faa0 100644
--- a/airflow/providers/slack/hooks/slack.py
+++ b/airflow/providers/slack/hooks/slack.py
@@ -30,6 +30,7 @@ from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.providers.slack.utils import ConnectionExtraConfig
+from airflow.utils.helpers import exactly_one
from airflow.utils.log.secrets_masker import mask_secret
if TYPE_CHECKING:
@@ -268,7 +269,7 @@ class SlackHook(BaseHook):
- `Slack API files.upload method
<https://api.slack.com/methods/files.upload>`_
- `File types <https://api.slack.com/types/file#file_types>`_
"""
- if not ((not file) ^ (not content)):
+ if not exactly_one(file, content):
raise ValueError("Either `file` or `content` must be provided, not both.")
elif file:
file = Path(file)
diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py
index 657d27aebc..5b5f51030b 100644
--- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py
+++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py
@@ -19,7 +19,6 @@ from __future__ import annotations
import json
import os
-import unittest
from datetime import timedelta
from unittest.mock import MagicMock, call, patch
@@ -41,7 +40,7 @@ TEMPLATE_SEARCHPATH = os.path.join(
)
-class TestEmrAddStepsOperator(unittest.TestCase):
+class TestEmrAddStepsOperator:
# When
_config = [
{
@@ -54,7 +53,7 @@ class TestEmrAddStepsOperator(unittest.TestCase):
}
]
- def setUp(self):
+ def setup_method(self):
self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
# Mock out the emr_client (moto has incorrect response)
@@ -79,6 +78,22 @@ class TestEmrAddStepsOperator(unittest.TestCase):
assert self.operator.job_flow_id == "j-8989898989"
assert self.operator.aws_conn_id == "aws_default"
+ @pytest.mark.parametrize(
+ "job_flow_id, job_flow_name",
+ [
+ pytest.param("j-8989898989", "test_cluster", id="both-specified"),
+ pytest.param(None, None, id="both-none"),
+ ],
+ )
+ def test_validate_mutually_exclusive_args(self, job_flow_id,
job_flow_name):
+ error_message = r"Exactly one of job_flow_id or job_flow_name must be specified\."
+ with pytest.raises(AirflowException, match=error_message):
+ EmrAddStepsOperator(
+ task_id="test_validate_mutually_exclusive_args",
+ job_flow_id=job_flow_id,
+ job_flow_name=job_flow_name,
+ )
+
def test_render_template(self):
dag_run = DagRun(dag_id=self.operator.dag.dag_id,
execution_date=DEFAULT_DATE, run_id="test")
ti = TaskInstance(task=self.operator)
diff --git a/tests/providers/amazon/aws/operators/test_s3_object.py b/tests/providers/amazon/aws/operators/test_s3_object.py
index 0d94664ce7..884137ec1f 100644
--- a/tests/providers/amazon/aws/operators/test_s3_object.py
+++ b/tests/providers/amazon/aws/operators/test_s3_object.py
@@ -22,6 +22,7 @@ import unittest
from unittest import mock
import boto3
+import pytest
from moto import mock_s3
from airflow import AirflowException
@@ -95,8 +96,8 @@ class TestS3CopyObjectOperator(unittest.TestCase):
assert objects_in_dest_bucket["Contents"][0]["Key"] == self.dest_key
-class TestS3DeleteObjectsOperator(unittest.TestCase):
- @mock_s3
+@mock_s3
+class TestS3DeleteObjectsOperator:
def test_s3_delete_single_object(self):
bucket = "testbucket"
key = "path/data.txt"
@@ -116,7 +117,6 @@ class TestS3DeleteObjectsOperator(unittest.TestCase):
# There should be no object found in the bucket created earlier
assert "Contents" not in conn.list_objects(Bucket=bucket, Prefix=key)
- @mock_s3
def test_s3_delete_multiple_objects(self):
bucket = "testbucket"
key_pattern = "path/data"
@@ -139,7 +139,6 @@ class TestS3DeleteObjectsOperator(unittest.TestCase):
# There should be no object found in the bucket created earlier
assert "Contents" not in conn.list_objects(Bucket=bucket,
Prefix=key_pattern)
- @mock_s3
def test_s3_delete_prefix(self):
bucket = "testbucket"
key_pattern = "path/data"
@@ -162,7 +161,6 @@ class TestS3DeleteObjectsOperator(unittest.TestCase):
# There should be no object found in the bucket created earlier
assert "Contents" not in conn.list_objects(Bucket=bucket,
Prefix=key_pattern)
- @mock_s3
def test_s3_delete_empty_list(self):
bucket = "testbucket"
key_of_test = "path/data.txt"
@@ -185,7 +183,6 @@ class TestS3DeleteObjectsOperator(unittest.TestCase):
# the object found should be consistent with dest_key specified earlier
assert objects_in_dest_bucket["Contents"][0]["Key"] == key_of_test
- @mock_s3
def test_s3_delete_empty_string(self):
bucket = "testbucket"
key_of_test = "path/data.txt"
@@ -208,36 +205,32 @@ class TestS3DeleteObjectsOperator(unittest.TestCase):
# the object found should be consistent with dest_key specified earlier
assert objects_in_dest_bucket["Contents"][0]["Key"] == key_of_test
- @mock_s3
- def test_assert_s3_both_keys_and_prifix_given(self):
- bucket = "testbucket"
- keys = "path/data.txt"
- key_pattern = "path/data"
-
- conn = boto3.client("s3")
- conn.create_bucket(Bucket=bucket)
- conn.upload_fileobj(Bucket=bucket, Key=keys,
Fileobj=io.BytesIO(b"input"))
-
- # The object should be detected before the DELETE action is tested
- objects_in_dest_bucket = conn.list_objects(Bucket=bucket, Prefix=keys)
- assert len(objects_in_dest_bucket["Contents"]) == 1
- assert objects_in_dest_bucket["Contents"][0]["Key"] == keys
- with self.assertRaises(AirflowException):
- op = S3DeleteObjectsOperator(
- task_id="test_assert_s3_both_keys_and_prifix_given",
- bucket=bucket,
+ @pytest.mark.parametrize(
+ "keys, prefix",
+ [
+ pytest.param("path/data.txt", "path/data",
id="single-key-and-prefix"),
+ pytest.param(["path/data.txt"], "path/data",
id="multiple-keys-and-prefix"),
+ pytest.param(None, None, id="both-none"),
+ ],
+ )
+ def test_validate_keys_and_prefix_in_constructor(self, keys, prefix):
+ with pytest.raises(AirflowException, match=r"Either keys or prefix should be set\."):
+ S3DeleteObjectsOperator(
+ task_id="test_validate_keys_and_prefix_in_constructor",
+ bucket="foo-bar-bucket",
keys=keys,
- prefix=key_pattern,
+ prefix=prefix,
)
- op.execute(None)
-
- # The object found in the bucket created earlier should still be there
- assert len(objects_in_dest_bucket["Contents"]) == 1
- # the object found should be consistent with dest_key specified earlier
- assert objects_in_dest_bucket["Contents"][0]["Key"] == keys
- @mock_s3
- def test_assert_s3_no_keys_or_prifix_given(self):
+ @pytest.mark.parametrize(
+ "keys, prefix",
+ [
+ pytest.param("path/data.txt", "path/data",
id="single-key-and-prefix"),
+ pytest.param(["path/data.txt"], "path/data",
id="multiple-keys-and-prefix"),
+ pytest.param(None, None, id="both-none"),
+ ],
+ )
+ def test_validate_keys_and_prefix_in_execute(self, keys, prefix):
bucket = "testbucket"
key_of_test = "path/data.txt"
@@ -245,13 +238,24 @@ class TestS3DeleteObjectsOperator(unittest.TestCase):
conn.create_bucket(Bucket=bucket)
conn.upload_fileobj(Bucket=bucket, Key=key_of_test,
Fileobj=io.BytesIO(b"input"))
+ # Set valid values for constructor, and change them later for emulate rendering template
+ op = S3DeleteObjectsOperator(
+ task_id="test_validate_keys_and_prefix_in_execute",
+ bucket=bucket,
+ keys="keys-exists",
+ prefix=None,
+ )
+ op.keys = keys
+ op.prefix = prefix
+
# The object should be detected before the DELETE action is tested
objects_in_dest_bucket = conn.list_objects(Bucket=bucket,
Prefix=key_of_test)
assert len(objects_in_dest_bucket["Contents"]) == 1
assert objects_in_dest_bucket["Contents"][0]["Key"] == key_of_test
- with self.assertRaises(AirflowException):
- op =
S3DeleteObjectsOperator(task_id="test_assert_s3_no_keys_or_prifix_given",
bucket=bucket)
+
+ with pytest.raises(AirflowException, match=r"Either keys or prefix should be set\."):
op.execute(None)
+
# The object found in the bucket created earlier should still be there
assert len(objects_in_dest_bucket["Contents"]) == 1
# the object found should be consistent with dest_key specified earlier
diff --git a/tests/providers/google/cloud/operators/test_cloud_build.py b/tests/providers/google/cloud/operators/test_cloud_build.py
index e80dcd1fb0..0ec514267e 100644
--- a/tests/providers/google/cloud/operators/test_cloud_build.py
+++ b/tests/providers/google/cloud/operators/test_cloud_build.py
@@ -304,9 +304,13 @@ class TestCloudBuildOperator(TestCase):
class TestBuildProcessor(TestCase):
def test_verify_source(self):
- with pytest.raises(AirflowException, match="The source could not be determined."):
+ error_message = r"The source could not be determined."
+ with pytest.raises(AirflowException, match=error_message):
BuildProcessor(build={"source": {"storage_source": {},
"repo_source": {}}}).process_body()
+ with pytest.raises(AirflowException, match=error_message):
+ BuildProcessor(build={"source": {}}).process_body()
+
@parameterized.expand(
[
(