This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b22e3c1fcd5 Fix Dataplex Data Quality partial update (#44262)
b22e3c1fcd5 is described below

commit b22e3c1fcd5d92238f0c187c8338c11bdae73acb
Author: Amir Mor <[email protected]>
AuthorDate: Sat Nov 23 01:01:15 2024 +0200

    Fix Dataplex Data Quality partial update (#44262)
    
    * 44012 - Update index.rst
    
    * Fix Dataplex Data Quality Task partial update
    
    When we try to update a Dataplex data quality task using the 
DataplexCreateOrUpdateDataQualityScanOperator, it first tries to create the 
task, and only if that fails with an AlreadyExists exception does it try to 
update the task. However, if you want to provide partial parameters to the 
update (rather than replacing the entire data scan properties), it fails with 
AirflowException `Error creating Data Quality scan` because it is missing 
mandatory parameters in the DataScan, and will never upd [...]
    
    I've added a check: if update_mask is not None, first try to perform the 
update, and only if it is None, try to create the task.
    I also moved the update section into a private function so it can be 
reused both by this check and, later, when doing a full update of the task.
    
    * add empty line for lint
    
    * add test to verify update when update_mask is not None
    
    ---------
    
    Co-authored-by: Amir Mor <[email protected]>
---
 .../providers/google/cloud/operators/dataplex.py   | 65 ++++++++++++----------
 .../tests/google/cloud/operators/test_dataplex.py  | 22 ++++++++
 2 files changed, 57 insertions(+), 30 deletions(-)

diff --git a/providers/src/airflow/providers/google/cloud/operators/dataplex.py 
b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
index 04edc10795f..f77c648f20e 100644
--- a/providers/src/airflow/providers/google/cloud/operators/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
@@ -686,39 +686,44 @@ class 
DataplexCreateOrUpdateDataQualityScanOperator(GoogleCloudBaseOperator):
             impersonation_chain=self.impersonation_chain,
         )
 
-        self.log.info("Creating Dataplex Data Quality scan %s", 
self.data_scan_id)
-        try:
-            operation = hook.create_data_scan(
-                project_id=self.project_id,
-                region=self.region,
-                data_scan_id=self.data_scan_id,
-                body=self.body,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            hook.wait_for_operation(timeout=self.timeout, operation=operation)
-            self.log.info("Dataplex Data Quality scan %s created 
successfully!", self.data_scan_id)
-        except AlreadyExists:
-            self.log.info("Dataplex Data Quality scan already exists: %s", 
{self.data_scan_id})
-
-            operation = hook.update_data_scan(
-                project_id=self.project_id,
-                region=self.region,
-                data_scan_id=self.data_scan_id,
-                body=self.body,
-                update_mask=self.update_mask,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            hook.wait_for_operation(timeout=self.timeout, operation=operation)
-            self.log.info("Dataplex Data Quality scan %s updated 
successfully!", self.data_scan_id)
-        except GoogleAPICallError as e:
-            raise AirflowException(f"Error creating Data Quality scan 
{self.data_scan_id}", e)
+        if self.update_mask is not None:
+            self._update_data_scan(hook)
+        else:
+            self.log.info("Creating Dataplex Data Quality scan %s", 
self.data_scan_id)
+            try:
+                operation = hook.create_data_scan(
+                    project_id=self.project_id,
+                    region=self.region,
+                    data_scan_id=self.data_scan_id,
+                    body=self.body,
+                    retry=self.retry,
+                    timeout=self.timeout,
+                    metadata=self.metadata,
+                )
+                hook.wait_for_operation(timeout=self.timeout, 
operation=operation)
+                self.log.info("Dataplex Data Quality scan %s created 
successfully!", self.data_scan_id)
+            except AlreadyExists:
+                self._update_data_scan(hook)
+            except GoogleAPICallError as e:
+                raise AirflowException(f"Error creating Data Quality scan 
{self.data_scan_id}", e)
 
         return self.data_scan_id
 
+    def _update_data_scan(self, hook: DataplexHook):
+        self.log.info("Dataplex Data Quality scan already exists: %s", 
{self.data_scan_id})
+        operation = hook.update_data_scan(
+            project_id=self.project_id,
+            region=self.region,
+            data_scan_id=self.data_scan_id,
+            body=self.body,
+            update_mask=self.update_mask,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        hook.wait_for_operation(timeout=self.timeout, operation=operation)
+        self.log.info("Dataplex Data Quality scan %s updated successfully!", 
self.data_scan_id)
+
 
 class DataplexGetDataQualityScanOperator(GoogleCloudBaseOperator):
     """
diff --git a/providers/tests/google/cloud/operators/test_dataplex.py 
b/providers/tests/google/cloud/operators/test_dataplex.py
index 67c9b8ca10f..1eec9008e2c 100644
--- a/providers/tests/google/cloud/operators/test_dataplex.py
+++ b/providers/tests/google/cloud/operators/test_dataplex.py
@@ -672,6 +672,18 @@ class TestDataplexCreateDataQualityScanOperator:
             api_version=API_VERSION,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
+        update_operator = DataplexCreateOrUpdateDataQualityScanOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            body={},
+            update_mask={},
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        update_operator.execute(context=mock.MagicMock())
         hook_mock.return_value.create_data_scan.assert_called_once_with(
             project_id=PROJECT_ID,
             region=REGION,
@@ -681,6 +693,16 @@ class TestDataplexCreateDataQualityScanOperator:
             timeout=None,
             metadata=(),
         )
+        hook_mock.return_value.update_data_scan.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            body={},
+            update_mask={},
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
 
 
 class TestDataplexCreateDataProfileScanOperator:

Reply via email to