This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bc3fc8c9fc Migrate Google azure_fileshare example DAG to new design AIP-47 (#24349)
bc3fc8c9fc is described below
commit bc3fc8c9fcb5d1291797aa5f4fc8da954573c694
Author: Chenglong Yan <[email protected]>
AuthorDate: Sun Jun 12 17:55:56 2022 +0800
Migrate Google azure_fileshare example DAG to new design AIP-47 (#24349)
related: #22430, #22447
---
.../operators/transfer/azure_fileshare_to_gcs.rst | 2 +-
.../test_azure_fileshare_to_gcs_system.py | 85 ----------------------
.../azure}/example_azure_fileshare_to_gcs.py | 44 +++++++++--
3 files changed, 40 insertions(+), 91 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/transfer/azure_fileshare_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/azure_fileshare_to_gcs.rst
index 9eac6e3d96..1ff6965d68 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/azure_fileshare_to_gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/azure_fileshare_to_gcs.rst
@@ -38,7 +38,7 @@ All parameters are described in the reference documentation - :class:`~airflow.p
An example operator call might look like this:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/azure/example_azure_fileshare_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_operator_azure_fileshare_to_gcs_basic]
diff --git a/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs_system.py
deleted file mode 100644
index 8253063be0..0000000000
--- a/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs_system.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import os
-
-import pytest
-
-from airflow.models import Connection
-from airflow.providers.google.cloud.example_dags.example_azure_fileshare_to_gcs import (
- AZURE_DIRECTORY_NAME,
- AZURE_SHARE_NAME,
- DEST_GCS_BUCKET,
-)
-from airflow.utils.session import create_session
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
-from tests.test_utils.azure_system_helpers import AzureSystemTest, provide_azure_fileshare
-from tests.test_utils.db import clear_db_connections
-from tests.test_utils.gcp_system_helpers import (
- CLOUD_DAG_FOLDER,
- GoogleSystemTest,
- provide_gcp_context,
- provide_gcs_bucket,
-)
-
-AZURE_LOGIN = os.environ.get('AZURE_LOGIN', 'default_login')
-AZURE_KEY = os.environ.get('AZURE_KEY', 'default_key')
-CONN_ID = 'azure_fileshare_default'
-AZURE_FILE_NAME = 'file.bin'
-
-
-@pytest.fixture
-def provide_azure_fileshare_with_directory():
- with create_session() as session:
- azure_fileshare_conn_id = Connection(
- conn_id=CONN_ID,
- conn_type='https',
- login=AZURE_LOGIN,
- password=AZURE_KEY,
- )
- session.add(azure_fileshare_conn_id)
-
- with provide_azure_fileshare(
- share_name=AZURE_SHARE_NAME,
- azure_fileshare_conn_id=CONN_ID,
- file_name=AZURE_FILE_NAME,
- directory=AZURE_DIRECTORY_NAME,
- ):
- yield
-
- clear_db_connections()
-
-
-@pytest.fixture
-def provide_gcs_bucket_basic():
- with provide_gcs_bucket(bucket_name=DEST_GCS_BUCKET):
- yield
-
-
-@pytest.mark.credential_file(GCP_GCS_KEY)
-@pytest.mark.system("google.cloud")
-class AzureFileShareToGCSOperatorExampleDAGsTest(GoogleSystemTest, AzureSystemTest):
- def setUp(self):
- super().setUp()
-
- @pytest.mark.usefixtures('provide_gcs_bucket_basic', 'provide_azure_fileshare_with_directory')
- @provide_gcp_context(GCP_GCS_KEY)
- def test_run_example_dag_azure_fileshare_to_gcs(self):
- self.run_dag('azure_fileshare_to_gcs_example', CLOUD_DAG_FOLDER)
-
- def tearDown(self):
- super().tearDown()
diff --git a/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py b/tests/system/providers/google/azure/example_azure_fileshare_to_gcs.py
similarity index 59%
rename from airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
rename to tests/system/providers/google/azure/example_azure_fileshare_to_gcs.py
index 680b43bd01..54671fb191 100644
--- a/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
+++ b/tests/system/providers/google/azure/example_azure_fileshare_to_gcs.py
@@ -18,15 +18,20 @@ import os
from datetime import datetime, timedelta
from airflow import DAG
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
-DEST_GCS_BUCKET = os.environ.get('GCP_GCS_BUCKET', 'gs://INVALID BUCKET NAME')
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = 'azure_fileshare_to_gcs_example'
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
AZURE_SHARE_NAME = os.environ.get('AZURE_SHARE_NAME', 'test-azure-share')
AZURE_DIRECTORY_NAME = "test-azure-dir"
-
with DAG(
- dag_id='azure_fileshare_to_gcs_example',
+ dag_id=DAG_ID,
default_args={
'owner': 'airflow',
'depends_on_past': False,
@@ -39,16 +44,45 @@ with DAG(
schedule_interval='@once',
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=['example'],
+ tags=['example', 'azure'],
) as dag:
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+ )
+
# [START howto_operator_azure_fileshare_to_gcs_basic]
sync_azure_files_with_gcs = AzureFileShareToGCSOperator(
task_id='sync_azure_files_with_gcs',
share_name=AZURE_SHARE_NAME,
- dest_gcs=DEST_GCS_BUCKET,
+ dest_gcs=BUCKET_NAME,
directory_name=AZURE_DIRECTORY_NAME,
replace=False,
gzip=True,
google_impersonation_chain=None,
)
# [END howto_operator_azure_fileshare_to_gcs_basic]
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ (
+ # TEST SETUP
+ create_bucket
+ # TEST BODY
+ >> sync_azure_files_with_gcs
+ # TEST TEARDOWN
+ >> delete_bucket
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)