This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b22e3c1fcd5 Fix Dataplex Data Quality partial update (#44262)
b22e3c1fcd5 is described below
commit b22e3c1fcd5d92238f0c187c8338c11bdae73acb
Author: Amir Mor <[email protected]>
AuthorDate: Sat Nov 23 01:01:15 2024 +0200
Fix Dataplex Data Quality partial update (#44262)
* 44012 - Update index.rst
* Fix Dataplex Data Quality Task partial update
When we try to update dataplex data quality task using the
DataplexCreateOrUpdateDataQualityScanOperator, it will first try to create the
task, and only if it fails with AlreadyExists exception, it will try to update
the task, but if you want to provide a partial parameters to the update (and
not to replace the entire data scan properties), it will fail with
AirflowException `Error creating Data Quality scan` because its missing
mandatory parameters in the DataScan, and will never upd [...]
I've added a check: if update_mask is not None, first try to do the
update; only if it is None, try to create the task.
Also moved the update section into a private function to reuse it in this
check, and later if we try to do a full update of the task.
* add empty line for lint
* add test to verify update when update_mask is not none
---------
Co-authored-by: Amir Mor <[email protected]>
---
.../providers/google/cloud/operators/dataplex.py | 65 ++++++++++++----------
.../tests/google/cloud/operators/test_dataplex.py | 22 ++++++++
2 files changed, 57 insertions(+), 30 deletions(-)
diff --git a/providers/src/airflow/providers/google/cloud/operators/dataplex.py
b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
index 04edc10795f..f77c648f20e 100644
--- a/providers/src/airflow/providers/google/cloud/operators/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
@@ -686,39 +686,44 @@ class
DataplexCreateOrUpdateDataQualityScanOperator(GoogleCloudBaseOperator):
impersonation_chain=self.impersonation_chain,
)
- self.log.info("Creating Dataplex Data Quality scan %s",
self.data_scan_id)
- try:
- operation = hook.create_data_scan(
- project_id=self.project_id,
- region=self.region,
- data_scan_id=self.data_scan_id,
- body=self.body,
- retry=self.retry,
- timeout=self.timeout,
- metadata=self.metadata,
- )
- hook.wait_for_operation(timeout=self.timeout, operation=operation)
- self.log.info("Dataplex Data Quality scan %s created
successfully!", self.data_scan_id)
- except AlreadyExists:
- self.log.info("Dataplex Data Quality scan already exists: %s",
{self.data_scan_id})
-
- operation = hook.update_data_scan(
- project_id=self.project_id,
- region=self.region,
- data_scan_id=self.data_scan_id,
- body=self.body,
- update_mask=self.update_mask,
- retry=self.retry,
- timeout=self.timeout,
- metadata=self.metadata,
- )
- hook.wait_for_operation(timeout=self.timeout, operation=operation)
- self.log.info("Dataplex Data Quality scan %s updated
successfully!", self.data_scan_id)
- except GoogleAPICallError as e:
- raise AirflowException(f"Error creating Data Quality scan
{self.data_scan_id}", e)
+ if self.update_mask is not None:
+ self._update_data_scan(hook)
+ else:
+ self.log.info("Creating Dataplex Data Quality scan %s",
self.data_scan_id)
+ try:
+ operation = hook.create_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ body=self.body,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout,
operation=operation)
+ self.log.info("Dataplex Data Quality scan %s created
successfully!", self.data_scan_id)
+ except AlreadyExists:
+ self._update_data_scan(hook)
+ except GoogleAPICallError as e:
+ raise AirflowException(f"Error creating Data Quality scan
{self.data_scan_id}", e)
return self.data_scan_id
+ def _update_data_scan(self, hook: DataplexHook):
+ self.log.info("Dataplex Data Quality scan already exists: %s",
{self.data_scan_id})
+ operation = hook.update_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ body=self.body,
+ update_mask=self.update_mask,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex Data Quality scan %s updated successfully!",
self.data_scan_id)
+
class DataplexGetDataQualityScanOperator(GoogleCloudBaseOperator):
"""
diff --git a/providers/tests/google/cloud/operators/test_dataplex.py
b/providers/tests/google/cloud/operators/test_dataplex.py
index 67c9b8ca10f..1eec9008e2c 100644
--- a/providers/tests/google/cloud/operators/test_dataplex.py
+++ b/providers/tests/google/cloud/operators/test_dataplex.py
@@ -672,6 +672,18 @@ class TestDataplexCreateDataQualityScanOperator:
api_version=API_VERSION,
impersonation_chain=IMPERSONATION_CHAIN,
)
+ update_operator = DataplexCreateOrUpdateDataQualityScanOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ body={},
+ update_mask={},
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ update_operator.execute(context=mock.MagicMock())
hook_mock.return_value.create_data_scan.assert_called_once_with(
project_id=PROJECT_ID,
region=REGION,
@@ -681,6 +693,16 @@ class TestDataplexCreateDataQualityScanOperator:
timeout=None,
metadata=(),
)
+ hook_mock.return_value.update_data_scan.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ body={},
+ update_mask={},
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
class TestDataplexCreateDataProfileScanOperator: