This is an automated email from the ASF dual-hosted git repository. radhikakundam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
     new 1443f25a4 ATLAS-4746: hive_process and hive_process_execution (lineage) being generated for simple DML UPDATE queries run via hive
1443f25a4 is described below

commit 1443f25a46784dfa63892daa74744b19b5c2a71e
Author: radhikakundam <radhikakun...@apache.org>
AuthorDate: Wed May 17 10:40:21 2023 -0700

    ATLAS-4746: hive_process and hive_process_execution (lineage) being generated for simple DML UPDATE queries run via hive

    Signed-off-by: radhikakundam <radhikakun...@apache.org>
---
 .../atlas/hive/hook/events/CreateHiveProcess.java | 70 +++++++++++++++-------
 1 file changed, 47 insertions(+), 23 deletions(-)

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index bc2c91a25..fed4ece41 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -41,6 +41,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -72,28 +73,21 @@ public class CreateHiveProcess extends BaseHiveEvent {
         if (!skipProcess()) {
             List<AtlasEntity> inputs = new ArrayList<>();
             List<AtlasEntity> outputs = new ArrayList<>();
-            Set<String> processedInputNames = new HashSet<>();
-            Set<String> processedOutputNames = new HashSet<>();
+            Set<String> processedNames = new HashSet<>();

             ret = new AtlasEntitiesWithExtInfo();

+            Map<String, Entity> inputByQualifiedName = new HashMap<>();
+            Map<String, Entity> outputByQualifiedName = new HashMap<>();
+
             if (getInputs() != null) {
                 for (ReadEntity input : getInputs()) {
                     String qualifiedName = getQualifiedName(input);

-                    if (qualifiedName == null || !processedInputNames.add(qualifiedName)) {
+                    if (qualifiedName == null) {
                         continue;
                     }
-
-                    AtlasEntity entity = getInputOutputEntity(input, ret, skipTempTables);
-
-                    if (!input.isDirect()) {
-                        continue;
-                    }
-
-                    if (entity != null) {
-                        inputs.add(entity);
-                    }
+                    inputByQualifiedName.put(qualifiedName, input);
                 }
             }

@@ -101,27 +95,53 @@ public class CreateHiveProcess extends BaseHiveEvent {
                 for (WriteEntity output : getOutputs()) {
                     String qualifiedName = getQualifiedName(output);

-                    if (qualifiedName == null || !processedOutputNames.add(qualifiedName)) {
+                    if (qualifiedName == null) {
                         continue;
                     }
+                    outputByQualifiedName.put(qualifiedName, output);
+                }
+            }

-                    AtlasEntity entity = getInputOutputEntity(output, ret, skipTempTables);
+            for (String outputQualifiedName : outputByQualifiedName.keySet()) {
+                WriteEntity output = (WriteEntity) outputByQualifiedName.get(outputQualifiedName);
+                AtlasEntity entity = getInputOutputEntity(output, ret, skipTempTables);

-                    if (entity != null) {
-                        outputs.add(entity);
-                    }
+                if (checkIfOnlySelfLineagePossible(outputQualifiedName, inputByQualifiedName) || !processedNames.add(outputQualifiedName)) {
+                    continue;
+                }

-                    if (isDdlOperation(entity)) {
+                if (entity != null) {
+                    outputs.add(entity);
+                }

-                        AtlasEntity ddlEntity = createHiveDDLEntity(entity);
+                if (isDdlOperation(entity)) {

-                        if (ddlEntity != null) {
-                            ret.addEntity(ddlEntity);
-                        }
+                    AtlasEntity ddlEntity = createHiveDDLEntity(entity);
+
+                    if (ddlEntity != null) {
+                        ret.addEntity(ddlEntity);
                     }
                 }
             }

+            for (String inputQualifiedName : inputByQualifiedName.keySet()) {
+                ReadEntity input = (ReadEntity) inputByQualifiedName.get(inputQualifiedName);
+
+                if (!processedNames.add(inputQualifiedName)) {
+                    continue;
+                }
+
+                AtlasEntity entity = getInputOutputEntity(input, ret, skipTempTables);
+
+                if (!input.isDirect()) {
+                    continue;
+                }
+
+                if (entity != null) {
+                    inputs.add(entity);
+                }
+            }
+
             boolean skipProcess = inputs.isEmpty() && outputs.isEmpty();

             if (!skipProcess) {
@@ -151,6 +171,10 @@ public class CreateHiveProcess extends BaseHiveEvent {
         return ret;
     }

+    private boolean checkIfOnlySelfLineagePossible(String outputQualifiedName, Map<String, Entity> inputByQualifiedName) {
+        return inputByQualifiedName.size() == 1 && inputByQualifiedName.containsKey(outputQualifiedName);
+    }
+
     private void processColumnLineage(AtlasEntity hiveProcess, AtlasEntitiesWithExtInfo entities) {
         LineageInfo lineageInfo = getLineageInfo();
