This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a1e417c4fe Convert SNS Sample DAG to System Test (#24384)
a1e417c4fe is described below

commit a1e417c4fe44979e5965442ce3ba6564494fcf63
Author: D. Ferruzzi <[email protected]>
AuthorDate: Sun Jun 12 02:53:16 2022 -0700

    Convert SNS Sample DAG to System Test (#24384)
---
 .../amazon/aws/example_dags/example_sns.py         | 39 -----------
 airflow/providers/amazon/aws/operators/sns.py      |  4 +-
 .../operators/sns.rst                              |  2 +-
 tests/system/providers/amazon/aws/example_sns.py   | 81 ++++++++++++++++++++++
 4 files changed, 84 insertions(+), 42 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_sns.py b/airflow/providers/amazon/aws/example_dags/example_sns.py
deleted file mode 100644
index 782156b14c..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_sns.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from datetime import datetime
-from os import environ
-
-from airflow import DAG
-from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
-
-SNS_TOPIC_ARN = environ.get('SNS_TOPIC_ARN', 'arn:aws:sns:us-west-2:123456789012:dummy-topic-name')
-
-with DAG(
-    dag_id='example_sns',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    tags=['example'],
-    catchup=False,
-) as dag:
-
-    # [START howto_operator_sns_publish_operator]
-    publish = SnsPublishOperator(
-        task_id='publish_message',
-        target_arn=SNS_TOPIC_ARN,
-        message='This is a sample message sent to SNS via an Apache Airflow DAG task.',
-    )
-    # [END howto_operator_sns_publish_operator]
diff --git a/airflow/providers/amazon/aws/operators/sns.py b/airflow/providers/amazon/aws/operators/sns.py
index e916798d03..d9dd3c4531 100644
--- a/airflow/providers/amazon/aws/operators/sns.py
+++ b/airflow/providers/amazon/aws/operators/sns.py
@@ -42,7 +42,7 @@ class SnsPublishOperator(BaseOperator):
         determined automatically)
     """
 
-    template_fields: Sequence[str] = ('message', 'subject', 'message_attributes')
+    template_fields: Sequence[str] = ('target_arn', 'message', 'subject', 'message_attributes', 'aws_conn_id')
     template_ext: Sequence[str] = ()
     template_fields_renderers = {"message_attributes": "json"}
 
@@ -51,9 +51,9 @@ class SnsPublishOperator(BaseOperator):
         *,
         target_arn: str,
         message: str,
-        aws_conn_id: str = 'aws_default',
         subject: Optional[str] = None,
         message_attributes: Optional[dict] = None,
+        aws_conn_id: str = 'aws_default',
         **kwargs,
     ):
         super().__init__(**kwargs)
diff --git a/docs/apache-airflow-providers-amazon/operators/sns.rst b/docs/apache-airflow-providers-amazon/operators/sns.rst
index a6001cdc39..caaa484527 100644
--- a/docs/apache-airflow-providers-amazon/operators/sns.rst
+++ b/docs/apache-airflow-providers-amazon/operators/sns.rst
@@ -43,7 +43,7 @@ Publish a message to an existing SNS topic
 To publish a message to an Amazon SNS Topic you can use
 :class:`~airflow.providers.amazon.aws.operators.sns.SnsPublishOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_sns.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sns.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sns_publish_operator]
diff --git a/tests/system/providers/amazon/aws/example_sns.py b/tests/system/providers/amazon/aws/example_sns.py
new file mode 100644
index 0000000000..3231184e12
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_sns.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import set_env_id
+
+ENV_ID = set_env_id()
+DAG_ID = 'example_sns'
+
+SNS_TOPIC_NAME = f'{ENV_ID}-test-topic'
+
+
+@task
+def create_topic() -> str:
+    return boto3.client('sns').create_topic(Name=SNS_TOPIC_NAME)['TopicArn']
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_topic(topic_arn) -> None:
+    boto3.client('sns').delete_topic(TopicArn=topic_arn)
+
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+
+    create_sns_topic = create_topic()
+
+    # [START howto_operator_sns_publish_operator]
+    publish_message = SnsPublishOperator(
+        task_id='publish_message',
+        target_arn=create_sns_topic,
+        message='This is a sample message sent to SNS via an Apache Airflow DAG task.',
+    )
+    # [END howto_operator_sns_publish_operator]
+
+    chain(
+        # TEST SETUP
+        create_sns_topic,
+        # TEST BODY
+        publish_message,
+        # TEST TEARDOWN
+        delete_topic(create_sns_topic),
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

Reply via email to