This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c90f3d0  ATLAS-3825: updated import business-metadata to set business attribute values on appropriate entities
c90f3d0 is described below

commit c90f3d022af699c986029313a447edf3a4aec251
Author: Madhan Neethiraj <mad...@apache.org>
AuthorDate: Wed Jun 3 16:44:51 2020 -0700

    ATLAS-3825: updated import business-metadata to set business attribute values on appropriate entities
    
    (cherry picked from commit 86ba342fe6baee08c6aaa20e50de4c3b8e2ae873)
---
 .../atlas/bulkimport/BulkImportResponse.java       |   4 +
 .../store/graph/v2/AtlasEntityStoreV2.java         | 220 ++++++++++-----------
 2 files changed, 110 insertions(+), 114 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
index 0ee54e9..047d497 100644
--- a/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
+++ b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
@@ -95,6 +95,10 @@ public class BulkImportResponse {
             this(parentObjectName, childObjectName, importStatus, "",-1);
         }
 
+        public ImportInfo( ImportStatus importStatus, String remarks) {
+            this("","", importStatus, remarks, -1);
+        }
+
         public ImportInfo( ImportStatus importStatus, String remarks, Integer 
rowNumber) {
             this("","", importStatus, remarks, rowNumber);
         }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 89076c1..bf1629c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -41,7 +41,6 @@ import org.apache.atlas.model.instance.AtlasEntityHeaders;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
-import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -1551,28 +1550,33 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
     @GraphTransaction
     public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream 
inputStream, String fileName) throws AtlasBaseException {
         BulkImportResponse ret = new BulkImportResponse();
+
         try {
             if (StringUtils.isBlank(fileName)) {
                 throw new 
AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName);
             }
-            List<String[]> fileData = FileUtils.readFileData(fileName, 
inputStream);
 
+            List<String[]>           fileData              = 
FileUtils.readFileData(fileName, inputStream);
             Map<String, AtlasEntity> attributesToAssociate = 
getBusinessMetadataDefList(fileData, ret);
 
-            for (Map.Entry<String, AtlasEntity> entry : 
attributesToAssociate.entrySet()) {
-                AtlasEntity entity = entry.getValue();
-                try{
+            for (AtlasEntity entity : attributesToAssociate.values()) {
+                try {
                     addOrUpdateBusinessAttributes(entity.getGuid(), 
entity.getBusinessAttributes(), true);
-                    BulkImportResponse.ImportInfo successImportInfo = new 
BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(),
 entity.getBusinessAttributes().toString());
+
+                    BulkImportResponse.ImportInfo successImportInfo = new 
BulkImportResponse.ImportInfo(entity.getGuid(), 
entity.getBusinessAttributes().toString());
+
                     ret.setSuccessImportInfoList(successImportInfo);
-                }catch(Exception e){
-                    LOG.error("Error occurred while updating BusinessMetadata 
Attributes for Entity 
"+entity.getAttribute(Constants.QUALIFIED_NAME).toString());
-                    BulkImportResponse.ImportInfo failedImportInfo = new 
BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(),
 entity.getBusinessAttributes().toString(), 
BulkImportResponse.ImportStatus.FAILED, e.getMessage());
-                    ret.setFailedImportInfoList(failedImportInfo);
+                }catch (Exception e) {
+                    LOG.error("Error occurred while updating BusinessMetadata 
Attributes for Entity " + entity.getGuid());
+
+                    BulkImportResponse.ImportInfo failedImportInfo = new 
BulkImportResponse.ImportInfo(entity.getGuid(), 
entity.getBusinessAttributes().toString(), 
BulkImportResponse.ImportStatus.FAILED, e.getMessage());
+
+                    ret.getFailedImportInfoList().add(failedImportInfo);
                 }
             }
         } catch (IOException e) {
-            LOG.error("An Exception occurred while uploading the file : 
"+e.getMessage());
+            LOG.error("An Exception occurred while uploading the file {}", 
fileName, e);
+
             throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e);
         }
 
@@ -1580,105 +1584,133 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
     }
 
     private Map<String, AtlasEntity> getBusinessMetadataDefList(List<String[]> 
fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException {
-        Map<String, AtlasEntity> ret = new HashMap<>();
-        Map<String, Map<String, Object>> newBMAttributes = new HashMap<>();
-        Map<String, AtlasVertex> vertexCache = new HashMap<>();
-        Map<String, Object> attribute = new HashMap<>();
+        Map<String, AtlasEntity> ret           = new HashMap<>();
+        Map<String, AtlasVertex> vertexCache   = new HashMap<>();
+        List<String>             failedMsgList = new ArrayList<>();
 
         for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) {
-            List<String> failedTermMsgList = new ArrayList<>();
-            AtlasEntity atlasEntity = new AtlasEntity();
             String[] record = fileData.get(lineIndex);
-            if (missingFieldsCheck(record, bulkImportResponse, lineIndex+1)) {
+
+            boolean missingFields = record.length < 
FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX ||
+                                    
StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) ||
+                                    
StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) ||
+                                    
StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) ||
+                                    
StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]);
+
+            if (missingFields){
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": missing 
fields. " + Arrays.toString(record));
+
+                continue;
+            }
+
+            String          typeName   = 
record[FileUtils.TYPENAME_COLUMN_INDEX];
+            AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(typeName);
+
+            if (entityType == null) {
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid 
entity-type '" + typeName + "'");
+
                 continue;
             }
-            String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX];
-            String uniqueAttrValue = 
record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX];
-            String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX];
+
+            String uniqueAttrValue  = 
record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX];
+            String bmAttribute      = 
record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX];
             String bmAttributeValue = 
record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX];
-            String uniqueAttrName = Constants.QUALIFIED_NAME;
+            String uniqueAttrName   = AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
+
             if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) {
-                uniqueAttrName = 
typeName+"."+record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX];
+                uniqueAttrName = 
record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX];
             }
 
-            if (validateTypeName(typeName, bulkImportResponse, lineIndex+1)) {
+            AtlasAttribute uniqueAttribute = 
entityType.getAttribute(uniqueAttrName);
+
+            if (uniqueAttribute == null) {
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" 
+ uniqueAttrName + "' not found in entity-type '" + typeName + "'");
+
                 continue;
             }
 
-            String vertexKey = typeName + "_" + uniqueAttrName + "_" + 
uniqueAttrValue;
-            AtlasVertex atlasVertex = vertexCache.get(vertexKey);
-            if (atlasVertex == null) {
-                atlasVertex = 
AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, 
uniqueAttrName, uniqueAttrValue);
-            }
+            if (!uniqueAttribute.getAttributeDef().getIsUnique()) {
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" 
+ uniqueAttrName + "' is not an unique attribute in entity-type '" + typeName + 
"'");
 
-            if (atlasVertex == null) {
-                LOG.error("Provided UniqueAttributeValue is not valid : " + 
uniqueAttrValue + " at line #" + (lineIndex + 1));
-                failedTermMsgList.add("Provided UniqueAttributeValue is not 
valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
+                continue;
             }
 
-            vertexCache.put(vertexKey, atlasVertex);
-            String[] businessAttributeName = 
bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
-            if 
(validateBMAttributeName(uniqueAttrValue,bmAttribute,bulkImportResponse,lineIndex+1))
 {
-                continue;
+            String      vertexKey = uniqueAttribute.getVertexPropertyName() + 
"_" + uniqueAttrValue;
+            AtlasVertex vertex    = vertexCache.get(vertexKey);
+
+            if (vertex == null) {
+                vertex = 
AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, 
uniqueAttribute.getVertexUniquePropertyName(), uniqueAttrValue);
+
+                if (vertex == null) {
+                    failedMsgList.add("Line #" + (lineIndex + 1) + ": no " + 
typeName + " entity found with " + uniqueAttrName + "=" + uniqueAttrValue);
+
+                    continue;
+                }
+
+                vertexCache.put(vertexKey, vertex);
             }
 
-            String bMName = businessAttributeName[0];
-            String bMAttributeName = businessAttributeName[1];
-            AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(typeName);
-            if (validateBMAttribute(uniqueAttrValue, businessAttributeName, 
entityType, bulkImportResponse,lineIndex+1)) {
+            AtlasBusinessAttribute businessAttribute = 
entityType.getBusinesAAttribute(bmAttribute);
+
+            if (businessAttribute == null) {
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid 
business attribute name '" + bmAttribute + "'");
+
                 continue;
             }
 
-            AtlasBusinessAttribute atlasBusinessAttribute = 
entityType.getBusinessAttributes().get(bMName).get(bMAttributeName);
-            if (atlasBusinessAttribute.getAttributeType().getTypeCategory() == 
TypeCategory.ARRAY) {
-                AtlasArrayType arrayType = (AtlasArrayType) 
atlasBusinessAttribute.getAttributeType();
-                List attributeValueData;
+            final Object attrValue;
+
+            if (businessAttribute.getAttributeType().getTypeCategory() == 
TypeCategory.ARRAY) {
+                AtlasArrayType arrayType = (AtlasArrayType) 
businessAttribute.getAttributeType();
+                List           arrayValue;
 
-                if(arrayType.getElementType() instanceof AtlasEnumType){
-                    attributeValueData = 
AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) 
arrayType.getElementType(), failedTermMsgList, lineIndex+1);
-                }else{
-                    attributeValueData = 
assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), 
failedTermMsgList, lineIndex+1);
+                if (arrayType.getElementType() instanceof AtlasEnumType) {
+                    arrayValue = 
AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) 
arrayType.getElementType(), failedMsgList, lineIndex+1);
+                } else {
+                    arrayValue = assignMultipleValues(bmAttributeValue, 
arrayType.getElementTypeName(), failedMsgList, lineIndex+1);
                 }
-                attribute.put(bmAttribute, attributeValueData);
+
+                attrValue = arrayValue;
             } else {
-                attribute.put(bmAttribute, bmAttributeValue);
+                attrValue = bmAttributeValue;
             }
 
-            if(failedMsgCheck(uniqueAttrValue,bmAttribute, failedTermMsgList, 
bulkImportResponse, lineIndex+1)) {
-                continue;
-            }
+            if (ret.containsKey(vertexKey)) {
+                AtlasEntity entity = ret.get(vertexKey);
 
-            if(ret.containsKey(vertexKey)) {
-                atlasEntity = ret.get(vertexKey);
-                atlasEntity.setBusinessAttribute(bMName, bMAttributeName, 
attribute.get(bmAttribute));
-                ret.put(vertexKey, atlasEntity);
+                
entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), 
businessAttribute.getName(), attrValue);
             } else {
-                String guid = GraphHelper.getGuid(atlasVertex);
-                atlasEntity.setGuid(guid);
-                atlasEntity.setTypeName(typeName);
-                
atlasEntity.setAttribute(Constants.QUALIFIED_NAME,uniqueAttrValue);
-                newBMAttributes = 
entityRetriever.getBusinessMetadata(atlasVertex) != null ? 
entityRetriever.getBusinessMetadata(atlasVertex) : newBMAttributes;
-                atlasEntity.setBusinessAttributes(newBMAttributes);
-                atlasEntity.setBusinessAttribute(bMName, bMAttributeName, 
attribute.get(bmAttribute));
-                ret.put(vertexKey, atlasEntity);
+                AtlasEntity                      entity             = new 
AtlasEntity();
+                String                           guid               = 
GraphHelper.getGuid(vertex);
+                Map<String, Map<String, Object>> businessAttributes = 
entityRetriever.getBusinessMetadata(vertex);
+
+                entity.setGuid(guid);
+                entity.setTypeName(typeName);
+                entity.setAttribute(uniqueAttribute.getName(), 
uniqueAttrValue);
+
+                if (businessAttributes == null) {
+                    businessAttributes = new HashMap<>();
+                }
+
+                entity.setBusinessAttributes(businessAttributes);
+                
entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), 
businessAttribute.getName(), attrValue);
+
+                ret.put(vertexKey, entity);
             }
         }
-        return ret;
-    }
 
-    private boolean validateTypeName(String typeName, BulkImportResponse 
bulkImportResponse, int lineIndex) {
-        boolean ret = false;
+        for (String failedMsg : failedMsgList) {
+            LOG.error(failedMsg);
+
+            BulkImportResponse.ImportInfo importInfo = new 
BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, 
failedMsg);
 
-        if(!typeRegistry.getAllEntityDefNames().contains(typeName)){
-            ret = true;
-            LOG.error("Invalid entity-type: " + typeName + " at line #" + 
lineIndex);
-            String failedTermMsgs = "Invalid entity-type: " + typeName + " at 
line #" + lineIndex;
-            BulkImportResponse.ImportInfo importInfo = new 
BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, 
failedTermMsgs, lineIndex);
             bulkImportResponse.getFailedImportInfoList().add(importInfo);
         }
+
         return ret;
     }
 
+
     private List assignMultipleValues(String bmAttributeValues, String 
elementTypeName, List failedTermMsgList, int lineIndex) {
 
         String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + 
FileUtils.PIPE_CHARACTER);
@@ -1731,44 +1763,4 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
         }
         return missingFieldsCheck;
     }
-
-    private boolean validateBMAttributeName(String uniqueAttrValue, String 
bmAttribute, BulkImportResponse bulkImportResponse, int lineIndex) {
-        boolean ret = false;
-        String[] businessAttributeName = 
bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
-        if(businessAttributeName.length < 2){
-            LOG.error("Provided businessAttributeName is not in proper format 
: " + bmAttribute + " at line #" + lineIndex);
-            String failedTermMsgs = "Provided businessAttributeName is not in 
proper format : " + bmAttribute + " at line #" + lineIndex;
-            BulkImportResponse.ImportInfo importInfo = new 
BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute,  
BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
-            bulkImportResponse.getFailedImportInfoList().add(importInfo);
-            ret = true;
-        }
-        return ret;
-    }
-
-    private boolean validateBMAttribute(String uniqueAttrValue,String[] 
businessAttributeName, AtlasEntityType entityType, BulkImportResponse 
bulkImportResponse, int lineIndex) {
-        boolean ret = false;
-        String bMName = businessAttributeName[0];
-        String bMAttributeName = businessAttributeName[1];
-
-        if(entityType.getBusinessAttributes(bMName) == null ||
-                entityType.getBusinessAttributes(bMName).get(bMAttributeName) 
== null){
-            ret = true;
-            LOG.error("Provided businessAttributeName is not valid : " + 
bMName+"."+bMAttributeName + " at line #" + lineIndex);
-            String failedTermMsgs = "Provided businessAttributeName is not 
valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex;
-            BulkImportResponse.ImportInfo importInfo = new 
BulkImportResponse.ImportInfo(uniqueAttrValue, bMName+"."+bMAttributeName, 
BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
-            bulkImportResponse.getFailedImportInfoList().add(importInfo);
-        }
-        return ret;
-    }
-
-    private boolean failedMsgCheck(String uniqueAttrValue, String bmAttribute, 
List<String> failedTermMsgList,BulkImportResponse bulkImportResponse,int 
lineIndex) {
-        boolean ret = false;
-        if(!failedTermMsgList.isEmpty()){
-            ret = true;
-            String failedTermMsg = StringUtils.join(failedTermMsgList, "\n");
-            BulkImportResponse.ImportInfo importInfo = new 
BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, 
BulkImportResponse.ImportStatus.FAILED, failedTermMsg, lineIndex);
-            bulkImportResponse.getFailedImportInfoList().add(importInfo);
-        }
-        return ret;
-    }
 }

Reply via email to