This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a1e417c4fe Convert SNS Sample DAG to System Test (#24384)
a1e417c4fe is described below
commit a1e417c4fe44979e5965442ce3ba6564494fcf63
Author: D. Ferruzzi <[email protected]>
AuthorDate: Sun Jun 12 02:53:16 2022 -0700
Convert SNS Sample DAG to System Test (#24384)
---
.../amazon/aws/example_dags/example_sns.py | 39 -----------
airflow/providers/amazon/aws/operators/sns.py | 4 +-
.../operators/sns.rst | 2 +-
tests/system/providers/amazon/aws/example_sns.py | 81 ++++++++++++++++++++++
4 files changed, 84 insertions(+), 42 deletions(-)
diff --git a/airflow/providers/amazon/aws/example_dags/example_sns.py
b/airflow/providers/amazon/aws/example_dags/example_sns.py
deleted file mode 100644
index 782156b14c..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_sns.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from datetime import datetime
-from os import environ
-
-from airflow import DAG
-from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
-
-SNS_TOPIC_ARN = environ.get('SNS_TOPIC_ARN',
'arn:aws:sns:us-west-2:123456789012:dummy-topic-name')
-
-with DAG(
- dag_id='example_sns',
- schedule_interval=None,
- start_date=datetime(2021, 1, 1),
- tags=['example'],
- catchup=False,
-) as dag:
-
- # [START howto_operator_sns_publish_operator]
- publish = SnsPublishOperator(
- task_id='publish_message',
- target_arn=SNS_TOPIC_ARN,
- message='This is a sample message sent to SNS via an Apache Airflow
DAG task.',
- )
- # [END howto_operator_sns_publish_operator]
diff --git a/airflow/providers/amazon/aws/operators/sns.py
b/airflow/providers/amazon/aws/operators/sns.py
index e916798d03..d9dd3c4531 100644
--- a/airflow/providers/amazon/aws/operators/sns.py
+++ b/airflow/providers/amazon/aws/operators/sns.py
@@ -42,7 +42,7 @@ class SnsPublishOperator(BaseOperator):
determined automatically)
"""
- template_fields: Sequence[str] = ('message', 'subject',
'message_attributes')
+ template_fields: Sequence[str] = ('target_arn', 'message', 'subject',
'message_attributes', 'aws_conn_id')
template_ext: Sequence[str] = ()
template_fields_renderers = {"message_attributes": "json"}
@@ -51,9 +51,9 @@ class SnsPublishOperator(BaseOperator):
*,
target_arn: str,
message: str,
- aws_conn_id: str = 'aws_default',
subject: Optional[str] = None,
message_attributes: Optional[dict] = None,
+ aws_conn_id: str = 'aws_default',
**kwargs,
):
super().__init__(**kwargs)
diff --git a/docs/apache-airflow-providers-amazon/operators/sns.rst
b/docs/apache-airflow-providers-amazon/operators/sns.rst
index a6001cdc39..caaa484527 100644
--- a/docs/apache-airflow-providers-amazon/operators/sns.rst
+++ b/docs/apache-airflow-providers-amazon/operators/sns.rst
@@ -43,7 +43,7 @@ Publish a message to an existing SNS topic
To publish a message to an Amazon SNS Topic you can use
:class:`~airflow.providers.amazon.aws.operators.sns.SnsPublishOperator`.
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_sns.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sns.py
:language: python
:dedent: 4
:start-after: [START howto_operator_sns_publish_operator]
diff --git a/tests/system/providers/amazon/aws/example_sns.py
b/tests/system/providers/amazon/aws/example_sns.py
new file mode 100644
index 0000000000..3231184e12
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_sns.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import set_env_id
+
+ENV_ID = set_env_id()
+DAG_ID = 'example_sns'
+
+SNS_TOPIC_NAME = f'{ENV_ID}-test-topic'
+
+
+@task
+def create_topic() -> str:
+ return boto3.client('sns').create_topic(Name=SNS_TOPIC_NAME)['TopicArn']
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_topic(topic_arn) -> None:
+ boto3.client('sns').delete_topic(TopicArn=topic_arn)
+
+
+with DAG(
+ dag_id=DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ tags=['example'],
+ catchup=False,
+) as dag:
+
+ create_sns_topic = create_topic()
+
+ # [START howto_operator_sns_publish_operator]
+ publish_message = SnsPublishOperator(
+ task_id='publish_message',
+ target_arn=create_sns_topic,
+ message='This is a sample message sent to SNS via an Apache Airflow
DAG task.',
+ )
+ # [END howto_operator_sns_publish_operator]
+
+ chain(
+ # TEST SETUP
+ create_sns_topic,
+ # TEST BODY
+ publish_message,
+ # TEST TEARDOWN
+ delete_topic(create_sns_topic),
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)