This is an automated email from the ASF dual-hosted git repository.

radhikakundam pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new cd1c3d026 ATLAS-4746: hive_process and hive_process_execution 
(lineage) being generated for simple DML UPDATE queries run via hive
cd1c3d026 is described below

commit cd1c3d02694fe60a749f0597580dd43808e3961f
Author: radhikakundam <radhikakun...@apache.org>
AuthorDate: Wed May 17 10:40:21 2023 -0700

    ATLAS-4746: hive_process and hive_process_execution (lineage) being 
generated for simple DML UPDATE queries run via hive
    
    Signed-off-by: radhikakundam <radhikakun...@apache.org>
    (cherry picked from commit 1443f25a46784dfa63892daa74744b19b5c2a71e)
---
 .../atlas/hive/hook/events/CreateHiveProcess.java  | 70 +++++++++++++++-------
 1 file changed, 47 insertions(+), 23 deletions(-)

diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index bc2c91a25..fed4ece41 100644
--- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -41,6 +41,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -72,28 +73,21 @@ public class CreateHiveProcess extends BaseHiveEvent {
         if (!skipProcess()) {
             List<AtlasEntity> inputs              = new ArrayList<>();
             List<AtlasEntity> outputs             = new ArrayList<>();
-            Set<String>       processedInputNames = new HashSet<>();
-            Set<String>       processedOutputNames = new HashSet<>();
+            Set<String>       processedNames      = new HashSet<>();
 
             ret = new AtlasEntitiesWithExtInfo();
 
+            Map<String, Entity> inputByQualifiedName = new HashMap<>();
+            Map<String, Entity> outputByQualifiedName = new HashMap<>();
+
             if (getInputs() != null) {
                 for (ReadEntity input : getInputs()) {
                     String qualifiedName = getQualifiedName(input);
 
-                    if (qualifiedName == null || 
!processedInputNames.add(qualifiedName)) {
+                    if (qualifiedName == null) {
                         continue;
                     }
-
-                    AtlasEntity entity = getInputOutputEntity(input, ret, 
skipTempTables);
-
-                    if (!input.isDirect()) {
-                        continue;
-                    }
-
-                    if (entity != null) {
-                        inputs.add(entity);
-                    }
+                    inputByQualifiedName.put(qualifiedName, input);
                 }
             }
 
@@ -101,27 +95,53 @@ public class CreateHiveProcess extends BaseHiveEvent {
                 for (WriteEntity output : getOutputs()) {
                     String qualifiedName = getQualifiedName(output);
 
-                    if (qualifiedName == null || 
!processedOutputNames.add(qualifiedName)) {
+                    if (qualifiedName == null) {
                         continue;
                     }
+                    outputByQualifiedName.put(qualifiedName, output);
+                }
+            }
 
-                    AtlasEntity entity = getInputOutputEntity(output, ret, 
skipTempTables);
+            for (String outputQualifiedName : outputByQualifiedName.keySet()) {
+                WriteEntity output = (WriteEntity) 
outputByQualifiedName.get(outputQualifiedName);
+                AtlasEntity entity = getInputOutputEntity(output, ret, 
skipTempTables);
 
-                    if (entity != null) {
-                        outputs.add(entity);
-                    }
+                if (checkIfOnlySelfLineagePossible(outputQualifiedName, 
inputByQualifiedName) || !processedNames.add(outputQualifiedName)) {
+                    continue;
+                }
 
-                    if (isDdlOperation(entity)) {
+                if (entity != null) {
+                    outputs.add(entity);
+                }
 
-                        AtlasEntity ddlEntity = createHiveDDLEntity(entity);
+                if (isDdlOperation(entity)) {
 
-                        if (ddlEntity != null) {
-                            ret.addEntity(ddlEntity);
-                        }
+                    AtlasEntity ddlEntity = createHiveDDLEntity(entity);
+
+                    if (ddlEntity != null) {
+                        ret.addEntity(ddlEntity);
                     }
                 }
             }
 
+            for (String inputQualifiedName : inputByQualifiedName.keySet()) {
+                ReadEntity input   = (ReadEntity) 
inputByQualifiedName.get(inputQualifiedName);
+
+                if (!processedNames.add(inputQualifiedName)) {
+                    continue;
+                }
+
+                AtlasEntity entity = getInputOutputEntity(input, ret, 
skipTempTables);
+
+                if (!input.isDirect()) {
+                    continue;
+                }
+
+                if (entity != null) {
+                    inputs.add(entity);
+                }
+            }
+
             boolean skipProcess = inputs.isEmpty() && outputs.isEmpty();
 
             if (!skipProcess) {
@@ -151,6 +171,10 @@ public class CreateHiveProcess extends BaseHiveEvent {
         return ret;
     }
 
+    private boolean checkIfOnlySelfLineagePossible(String outputQualifiedName, 
Map<String, Entity> inputByQualifiedName) {
+        return inputByQualifiedName.size() == 1 && 
inputByQualifiedName.containsKey(outputQualifiedName);
+    }
+
     private void processColumnLineage(AtlasEntity hiveProcess, 
AtlasEntitiesWithExtInfo entities) {
         LineageInfo lineageInfo = getLineageInfo();
 

Reply via email to