This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new c90f3d0 ATLAS-3825: updated import business-metadata to set business attribute values on appropriate entities c90f3d0 is described below commit c90f3d022af699c986029313a447edf3a4aec251 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Wed Jun 3 16:44:51 2020 -0700 ATLAS-3825: updated import business-metadata to set business attribute values on appropriate entities (cherry picked from commit 86ba342fe6baee08c6aaa20e50de4c3b8e2ae873) --- .../atlas/bulkimport/BulkImportResponse.java | 4 + .../store/graph/v2/AtlasEntityStoreV2.java | 220 ++++++++++----------- 2 files changed, 110 insertions(+), 114 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java index 0ee54e9..047d497 100644 --- a/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java +++ b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java @@ -95,6 +95,10 @@ public class BulkImportResponse { this(parentObjectName, childObjectName, importStatus, "",-1); } + public ImportInfo( ImportStatus importStatus, String remarks) { + this("","", importStatus, remarks, -1); + } + public ImportInfo( ImportStatus importStatus, String remarks, Integer rowNumber) { this("","", importStatus, remarks, rowNumber); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 89076c1..bf1629c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -41,7 +41,6 @@ import org.apache.atlas.model.instance.AtlasEntityHeaders; import org.apache.atlas.model.instance.AtlasObjectId; import 
org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; -import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; @@ -1551,28 +1550,33 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { @GraphTransaction public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException { BulkImportResponse ret = new BulkImportResponse(); + try { if (StringUtils.isBlank(fileName)) { throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName); } - List<String[]> fileData = FileUtils.readFileData(fileName, inputStream); + List<String[]> fileData = FileUtils.readFileData(fileName, inputStream); Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret); - for (Map.Entry<String, AtlasEntity> entry : attributesToAssociate.entrySet()) { - AtlasEntity entity = entry.getValue(); - try{ + for (AtlasEntity entity : attributesToAssociate.values()) { + try { addOrUpdateBusinessAttributes(entity.getGuid(), entity.getBusinessAttributes(), true); - BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString()); + + BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString()); + ret.setSuccessImportInfoList(successImportInfo); - }catch(Exception e){ - LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity "+entity.getAttribute(Constants.QUALIFIED_NAME).toString()); - BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), 
entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage()); - ret.setFailedImportInfoList(failedImportInfo); + }catch (Exception e) { + LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity " + entity.getGuid()); + + BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage()); + + ret.getFailedImportInfoList().add(failedImportInfo); } } } catch (IOException e) { - LOG.error("An Exception occurred while uploading the file : "+e.getMessage()); + LOG.error("An Exception occurred while uploading the file {}", fileName, e); + throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e); } @@ -1580,105 +1584,133 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { } private Map<String, AtlasEntity> getBusinessMetadataDefList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException { - Map<String, AtlasEntity> ret = new HashMap<>(); - Map<String, Map<String, Object>> newBMAttributes = new HashMap<>(); - Map<String, AtlasVertex> vertexCache = new HashMap<>(); - Map<String, Object> attribute = new HashMap<>(); + Map<String, AtlasEntity> ret = new HashMap<>(); + Map<String, AtlasVertex> vertexCache = new HashMap<>(); + List<String> failedMsgList = new ArrayList<>(); for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) { - List<String> failedTermMsgList = new ArrayList<>(); - AtlasEntity atlasEntity = new AtlasEntity(); String[] record = fileData.get(lineIndex); - if (missingFieldsCheck(record, bulkImportResponse, lineIndex+1)) { + + boolean missingFields = record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX || + StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) || + StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) || + 
StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) || + StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]); + + if (missingFields){ + failedMsgList.add("Line #" + (lineIndex + 1) + ": missing fields. " + Arrays.toString(record)); + + continue; + } + + String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX]; + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + if (entityType == null) { + failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid entity-type '" + typeName + "'"); + continue; } - String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX]; - String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]; - String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]; + + String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]; + String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]; String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]; - String uniqueAttrName = Constants.QUALIFIED_NAME; + String uniqueAttrName = AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME; + if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) { - uniqueAttrName = typeName+"."+record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX]; + uniqueAttrName = record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX]; } - if (validateTypeName(typeName, bulkImportResponse, lineIndex+1)) { + AtlasAttribute uniqueAttribute = entityType.getAttribute(uniqueAttrName); + + if (uniqueAttribute == null) { + failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' not found in entity-type '" + typeName + "'"); + continue; } - String vertexKey = typeName + "_" + uniqueAttrName + "_" + uniqueAttrValue; - AtlasVertex atlasVertex = vertexCache.get(vertexKey); - if (atlasVertex == null) { - atlasVertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttrName, uniqueAttrValue); - } + if (!uniqueAttribute.getAttributeDef().getIsUnique()) { + 
failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' is not an unique attribute in entity-type '" + typeName + "'"); - if (atlasVertex == null) { - LOG.error("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1)); - failedTermMsgList.add("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1)); + continue; } - vertexCache.put(vertexKey, atlasVertex); - String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + "."); - if (validateBMAttributeName(uniqueAttrValue,bmAttribute,bulkImportResponse,lineIndex+1)) { - continue; + String vertexKey = uniqueAttribute.getVertexPropertyName() + "_" + uniqueAttrValue; + AtlasVertex vertex = vertexCache.get(vertexKey); + + if (vertex == null) { + vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttribute.getVertexUniquePropertyName(), uniqueAttrValue); + + if (vertex == null) { + failedMsgList.add("Line #" + (lineIndex + 1) + ": no " + typeName + " entity found with " + uniqueAttrName + "=" + uniqueAttrValue); + + continue; + } + + vertexCache.put(vertexKey, vertex); } - String bMName = businessAttributeName[0]; - String bMAttributeName = businessAttributeName[1]; - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); - if (validateBMAttribute(uniqueAttrValue, businessAttributeName, entityType, bulkImportResponse,lineIndex+1)) { + AtlasBusinessAttribute businessAttribute = entityType.getBusinesAAttribute(bmAttribute); + + if (businessAttribute == null) { + failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid business attribute name '" + bmAttribute + "'"); + continue; } - AtlasBusinessAttribute atlasBusinessAttribute = entityType.getBusinessAttributes().get(bMName).get(bMAttributeName); - if (atlasBusinessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) { - AtlasArrayType arrayType = (AtlasArrayType) 
atlasBusinessAttribute.getAttributeType(); - List attributeValueData; + final Object attrValue; + + if (businessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) { + AtlasArrayType arrayType = (AtlasArrayType) businessAttribute.getAttributeType(); + List arrayValue; - if(arrayType.getElementType() instanceof AtlasEnumType){ - attributeValueData = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedTermMsgList, lineIndex+1); - }else{ - attributeValueData = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedTermMsgList, lineIndex+1); + if (arrayType.getElementType() instanceof AtlasEnumType) { + arrayValue = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedMsgList, lineIndex+1); + } else { + arrayValue = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedMsgList, lineIndex+1); } - attribute.put(bmAttribute, attributeValueData); + + attrValue = arrayValue; } else { - attribute.put(bmAttribute, bmAttributeValue); + attrValue = bmAttributeValue; } - if(failedMsgCheck(uniqueAttrValue,bmAttribute, failedTermMsgList, bulkImportResponse, lineIndex+1)) { - continue; - } + if (ret.containsKey(vertexKey)) { + AtlasEntity entity = ret.get(vertexKey); - if(ret.containsKey(vertexKey)) { - atlasEntity = ret.get(vertexKey); - atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute)); - ret.put(vertexKey, atlasEntity); + entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue); } else { - String guid = GraphHelper.getGuid(atlasVertex); - atlasEntity.setGuid(guid); - atlasEntity.setTypeName(typeName); - atlasEntity.setAttribute(Constants.QUALIFIED_NAME,uniqueAttrValue); - newBMAttributes = entityRetriever.getBusinessMetadata(atlasVertex) != null ? 
entityRetriever.getBusinessMetadata(atlasVertex) : newBMAttributes; - atlasEntity.setBusinessAttributes(newBMAttributes); - atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute)); - ret.put(vertexKey, atlasEntity); + AtlasEntity entity = new AtlasEntity(); + String guid = GraphHelper.getGuid(vertex); + Map<String, Map<String, Object>> businessAttributes = entityRetriever.getBusinessMetadata(vertex); + + entity.setGuid(guid); + entity.setTypeName(typeName); + entity.setAttribute(uniqueAttribute.getName(), uniqueAttrValue); + + if (businessAttributes == null) { + businessAttributes = new HashMap<>(); + } + + entity.setBusinessAttributes(businessAttributes); + entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue); + + ret.put(vertexKey, entity); } } - return ret; - } - private boolean validateTypeName(String typeName, BulkImportResponse bulkImportResponse, int lineIndex) { - boolean ret = false; + for (String failedMsg : failedMsgList) { + LOG.error(failedMsg); + + BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedMsg); - if(!typeRegistry.getAllEntityDefNames().contains(typeName)){ - ret = true; - LOG.error("Invalid entity-type: " + typeName + " at line #" + lineIndex); - String failedTermMsgs = "Invalid entity-type: " + typeName + " at line #" + lineIndex; - BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex); bulkImportResponse.getFailedImportInfoList().add(importInfo); } + return ret; } + private List assignMultipleValues(String bmAttributeValues, String elementTypeName, List failedTermMsgList, int lineIndex) { String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER); @@ -1731,44 +1763,4 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { } return 
missingFieldsCheck; } - - private boolean validateBMAttributeName(String uniqueAttrValue, String bmAttribute, BulkImportResponse bulkImportResponse, int lineIndex) { - boolean ret = false; - String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + "."); - if(businessAttributeName.length < 2){ - LOG.error("Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex); - String failedTermMsgs = "Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex; - BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex); - bulkImportResponse.getFailedImportInfoList().add(importInfo); - ret = true; - } - return ret; - } - - private boolean validateBMAttribute(String uniqueAttrValue,String[] businessAttributeName, AtlasEntityType entityType, BulkImportResponse bulkImportResponse, int lineIndex) { - boolean ret = false; - String bMName = businessAttributeName[0]; - String bMAttributeName = businessAttributeName[1]; - - if(entityType.getBusinessAttributes(bMName) == null || - entityType.getBusinessAttributes(bMName).get(bMAttributeName) == null){ - ret = true; - LOG.error("Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex); - String failedTermMsgs = "Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex; - BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bMName+"."+bMAttributeName, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex); - bulkImportResponse.getFailedImportInfoList().add(importInfo); - } - return ret; - } - - private boolean failedMsgCheck(String uniqueAttrValue, String bmAttribute, List<String> failedTermMsgList,BulkImportResponse bulkImportResponse,int lineIndex) { - boolean ret = false; 
- if(!failedTermMsgList.isEmpty()){ - ret = true; - String failedTermMsg = StringUtils.join(failedTermMsgList, "\n"); - BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsg, lineIndex); - bulkImportResponse.getFailedImportInfoList().add(importInfo); - } - return ret; - } }