This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 55723a3 ATLAS-4191: Improve Bulk Glossary Import to support import with relations 55723a3 is described below commit 55723a3b749afb5f98ce5cac972a51368d63dc37 Author: sidmishra <sidmis...@cloudera.com> AuthorDate: Wed Apr 7 09:57:50 2021 -0700 ATLAS-4191: Improve Bulk Glossary Import to support import with relations Signed-off-by: Sarath Subramanian <sar...@apache.org> (cherry picked from commit ff662427e54f675200646f52eb13db6084ef4ef3) --- .../main/java/org/apache/atlas/AtlasErrorCode.java | 1 + .../atlas/model/glossary/AtlasGlossaryTerm.java | 16 +++ .../org/apache/atlas/glossary/GlossaryService.java | 95 ++++++++++-------- .../apache/atlas/glossary/GlossaryTermUtils.java | 109 ++++++++++++--------- .../store/graph/v2/AtlasEntityStoreV2.java | 34 +++---- .../main/java/org/apache/atlas/util/FileUtils.java | 44 +++++---- .../apache/atlas/glossary/GlossaryServiceTest.java | 29 +++++- .../template_with_circular_relationship.csv | 4 + .../src/test/resources/excelFiles/invalid_xls.xls | 5 + 9 files changed, 205 insertions(+), 132 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 884f81f..773fae2 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -171,6 +171,7 @@ public enum AtlasErrorCode { INVALID_BUSINESS_ATTRIBUTES_IMPORT_DATA(400, "ATLAS-400-00-099","The uploaded file was not processed due to following errors : {0}"), ATTRIBUTE_NAME_INVALID_CHARS(400, "ATLAS-400-00-09A", "{0}: invalid name. Attribute names must begin with a letter followed by a sequence of letters, numbers, or '_' characters"), NO_DATA_FOUND(400, "ATLAS-400-00-09B", "No data found in the uploaded file"), + NOT_VALID_FILE(400, "ATLAS-400-00-09C", "Invalid {0} file"), UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"), // All Not found enums go here diff --git a/intg/src/main/java/org/apache/atlas/model/glossary/AtlasGlossaryTerm.java b/intg/src/main/java/org/apache/atlas/model/glossary/AtlasGlossaryTerm.java index 8d0b7c5..4fa1538 100644 --- a/intg/src/main/java/org/apache/atlas/model/glossary/AtlasGlossaryTerm.java +++ b/intg/src/main/java/org/apache/atlas/model/glossary/AtlasGlossaryTerm.java @@ -286,6 +286,22 @@ public class AtlasGlossaryTerm extends AtlasGlossaryBaseObject { hasTerms = true; } + public boolean containAnyRelation() { + return (CollectionUtils.isNotEmpty(getTranslationTerms()) || + CollectionUtils.isNotEmpty(getValidValuesFor()) || + CollectionUtils.isNotEmpty(getSynonyms()) || + CollectionUtils.isNotEmpty(getReplacedBy()) || + CollectionUtils.isNotEmpty(getValidValues()) || + CollectionUtils.isNotEmpty(getReplacementTerms()) || + CollectionUtils.isNotEmpty(getSeeAlso()) || + CollectionUtils.isNotEmpty(getTranslatedTerms()) || + CollectionUtils.isNotEmpty(getIsA()) || + CollectionUtils.isNotEmpty(getAntonyms()) || + CollectionUtils.isNotEmpty(getClassifies()) || + CollectionUtils.isNotEmpty(getPreferredToTerms()) || + CollectionUtils.isNotEmpty(getPreferredTerms())); + } + @JsonIgnore public String toAuditString() { AtlasGlossaryTerm t = new AtlasGlossaryTerm(); diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java index d156700..b12c0b8 100644 --- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java +++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java @@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.inject.Inject; -import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; @@ -394,8 +393,12 @@ public class GlossaryService { return ret; } - @GraphTransaction public AtlasGlossaryTerm updateTerm(AtlasGlossaryTerm atlasGlossaryTerm) throws AtlasBaseException { + return updateTerm(atlasGlossaryTerm, true); + } + + @GraphTransaction + public AtlasGlossaryTerm updateTerm(AtlasGlossaryTerm atlasGlossaryTerm, boolean ignoreUpdateIfTermExists) throws AtlasBaseException { if (DEBUG_ENABLED) { LOG.debug("==> GlossaryService.updateTerm({})", atlasGlossaryTerm); } @@ -412,9 +415,12 @@ public class GlossaryService { throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); } - String qualifiedName = getDuplicateGlossaryRelatedTerm(atlasGlossaryTerm); - if (StringUtils.isNotEmpty(qualifiedName)) { - throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, qualifiedName); + if (ignoreUpdateIfTermExists) { + String qualifiedName = getDuplicateGlossaryRelatedTerm(atlasGlossaryTerm); + + if (StringUtils.isNotEmpty(qualifiedName)) { + throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, qualifiedName); + } } AtlasGlossaryTerm storeObject = dataAccess.load(atlasGlossaryTerm); @@ -1041,7 +1047,6 @@ public class GlossaryService { } private String getDuplicateGlossaryRelatedTerm(AtlasGlossaryTerm atlasGlossaryTerm) throws AtlasBaseException { - Map<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> relatedTermsMap = atlasGlossaryTerm.getRelatedTerms(); for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> relatedTermsMapEntry : relatedTermsMap.entrySet()) { Set<AtlasRelatedTermHeader> termHeaders = relatedTermsMapEntry.getValue(); @@ -1113,59 +1118,65 @@ public class GlossaryService { public BulkImportResponse importGlossaryData(InputStream inputStream, String fileName) throws AtlasBaseException { BulkImportResponse ret = new BulkImportResponse(); - try { - if (StringUtils.isBlank(fileName)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_FILE_TYPE, fileName); - } - List<String[]> fileData = FileUtils.readFileData(fileName, inputStream); - List<AtlasGlossaryTerm> glossaryTerms = glossaryTermUtils.getGlossaryTermDataList(fileData, ret); + if (StringUtils.isBlank(fileName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_FILE_TYPE, fileName); + } - for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) { - String glossaryTermName = glossaryTerm.getName(); - String glossaryName = getGlossaryName(glossaryTerm); + List<String[]> fileData = FileUtils.readFileData(fileName, inputStream); - try { - createTerm(glossaryTerm); - ret.addToSuccessImportInfoList(new ImportInfo(glossaryName, glossaryTermName)); - } catch (AtlasBaseException e) { - LOG.error("Error while importing glossary term {}", glossaryTermName); + List<AtlasGlossaryTerm> glossaryTermsWithoutRelations = glossaryTermUtils.getGlossaryTermDataList(fileData, ret); + createGlossaryTerms(glossaryTermsWithoutRelations, ret); - ret.addToFailedImportInfoList(new ImportInfo(glossaryName, glossaryTermName, FAILED, e.getMessage())); - } - } - } catch (IOException e) { - throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, fileName); - } + List<AtlasGlossaryTerm> glossaryTermsWithRelations = glossaryTermUtils.getGlossaryTermDataList(fileData, ret, true); + updateGlossaryTermsRelation(glossaryTermsWithRelations, ret); return ret; } - private String getGlossaryName(AtlasGlossaryTerm glossaryTerm) { - String ret = ""; - String glossaryTermQName = glossaryTerm.getQualifiedName(); + private void createGlossaryTerms(List<AtlasGlossaryTerm> glossaryTerms, BulkImportResponse bulkImportResponse) { + for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) { + String glossaryTermName = glossaryTerm.getName(); + String glossaryName = getGlossaryName(glossaryTerm); - if (StringUtils.isNotBlank(glossaryTermQName)){ - String[] glossaryQnameSplit = glossaryTermQName.split("@"); + try { + createTerm(glossaryTerm); - ret = (glossaryQnameSplit.length == 2) ? glossaryQnameSplit[1] : ""; - } + bulkImportResponse.addToSuccessImportInfoList(new ImportInfo(glossaryName, glossaryTermName)); + } catch (AtlasBaseException e) { + LOG.error(AtlasErrorCode.FAILED_TO_CREATE_GLOSSARY_TERM.toString(), glossaryTermName, e); - return ret; + bulkImportResponse.addToFailedImportInfoList(new ImportInfo(glossaryName, glossaryTermName, FAILED, e.getMessage())); + } + } } - private List<AtlasGlossaryTerm> createGlossaryTerms(List<AtlasGlossaryTerm> glossaryTerms) throws AtlasBaseException { - List<AtlasGlossaryTerm> ret = new ArrayList<>(); - + private void updateGlossaryTermsRelation(List<AtlasGlossaryTerm> glossaryTerms, BulkImportResponse bulkImportResponse) { for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) { - try { - ret.add(createTerm(glossaryTerm)); - } catch (AtlasBaseException e) { - if (!e.getAtlasErrorCode().equals(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS)) { - throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_CREATE_GLOSSARY_TERM, glossaryTerm.getName()); + if (glossaryTerm.containAnyRelation()) { + String glossaryTermName = glossaryTerm.getName(); + String glossaryName = getGlossaryName(glossaryTerm); + + try { + updateTerm(glossaryTerm, false); + } catch (AtlasBaseException e) { + LOG.error(AtlasErrorCode.FAILED_TO_CREATE_GLOSSARY_TERM.toString(), glossaryTermName, e); + + bulkImportResponse.addToFailedImportInfoList(new ImportInfo(glossaryName, glossaryTermName, FAILED, e.getMessage())); } } } + } + + private String getGlossaryName(AtlasGlossaryTerm glossaryTerm) { + String ret = ""; + String glossaryTermQName = glossaryTerm.getQualifiedName(); + + if (StringUtils.isNotBlank(glossaryTermQName)){ + String[] glossaryQnameSplit = glossaryTermQName.split("@"); + + ret = (glossaryQnameSplit.length == 2) ? glossaryQnameSplit[1] : ""; + } return ret; } diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java index 647dd3b..80b09a3 100644 --- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java +++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java @@ -61,6 +61,8 @@ public class GlossaryTermUtils extends GlossaryUtils { private static final Logger LOG = LoggerFactory.getLogger(GlossaryTermUtils.class); private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled(); + Map<String, String> glossaryNameGuidCacheForImport = new HashMap<>(); + protected GlossaryTermUtils(AtlasRelationshipStore relationshipStore, AtlasTypeRegistry typeRegistry, DataAccess dataAccess) { super(relationshipStore, typeRegistry, dataAccess); } @@ -534,53 +536,60 @@ public class GlossaryTermUtils extends GlossaryUtils { } protected List<AtlasGlossaryTerm> getGlossaryTermDataList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException { - List<AtlasGlossaryTerm> glossaryTerms = new ArrayList<>(); - Map<String, String> glossaryNameCache = new HashMap<>(); - int rowCount = 1; + return getGlossaryTermDataList(fileData, bulkImportResponse, false); + } + + protected List<AtlasGlossaryTerm> getGlossaryTermDataList(List<String[]> fileData, BulkImportResponse bulkImportResponse, boolean processRelations) throws AtlasBaseException { + List<AtlasGlossaryTerm> glossaryTerms = new ArrayList<>(); + int rowCount = 1; for (String[] record : fileData) { List<String> failedTermMsgs = new ArrayList<>(); AtlasGlossaryTerm glossaryTerm = new AtlasGlossaryTerm(); + String glossaryName = StringUtils.EMPTY; if ((record.length < 1) || StringUtils.isBlank(record[0])) { LOG.error("The GlossaryName is blank for the record : ", Arrays.toString(record)); failedTermMsgs.add("The GlossaryName is blank for the record : " + Arrays.toString(record)); - } - - String glossaryName = record[0]; - String glossaryGuid; - - if (glossaryNameCache.get(glossaryName) != null) { - glossaryGuid = glossaryNameCache.get(glossaryName); - } else { - AtlasVertex vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(GlossaryUtils.ATLAS_GLOSSARY_TYPENAME, GlossaryUtils.ATLAS_GLOSSARY_TYPENAME + "." + QUALIFIED_NAME_ATTR, glossaryName); + String glossaryGuid; + glossaryName = record[0]; - glossaryGuid = (vertex != null) ? AtlasGraphUtilsV2.getIdFromVertex(vertex) : null; - } + if (glossaryNameGuidCacheForImport.get(glossaryName) != null) { + glossaryGuid = glossaryNameGuidCacheForImport.get(glossaryName); - if (glossaryGuid == null) { - if (GlossaryService.isNameInvalid(glossaryName)) { - LOG.error("The provided Glossary Name is invalid : " + glossaryName); - failedTermMsgs.add("The provided Glossary Name is invalid : " + glossaryName); } else { - AtlasGlossary glossary = new AtlasGlossary(); - glossary.setQualifiedName(glossaryName); - glossary.setName(glossaryName); + AtlasVertex vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(GlossaryUtils.ATLAS_GLOSSARY_TYPENAME, GlossaryUtils.ATLAS_GLOSSARY_TYPENAME + "." + QUALIFIED_NAME_ATTR, glossaryName); - glossary = dataAccess.save(glossary); - glossaryGuid = glossary.getGuid(); + glossaryGuid = (vertex != null) ? AtlasGraphUtilsV2.getIdFromVertex(vertex) : null; + } + + if (glossaryGuid == null) { + if (GlossaryService.isNameInvalid(glossaryName)) { + LOG.error("The provided Glossary Name is invalid : " + glossaryName); + failedTermMsgs.add("The provided Glossary Name is invalid : " + glossaryName); + } else { + AtlasGlossary glossary = new AtlasGlossary(); + glossary.setQualifiedName(glossaryName); + glossary.setName(glossaryName); + + glossary = dataAccess.save(glossary); + glossaryGuid = glossary.getGuid(); + } } - } - if (glossaryGuid != null) { - glossaryNameCache.put(glossaryName, glossaryGuid); - glossaryTerm = populateGlossaryTermObject(failedTermMsgs, record, glossaryGuid); + if (glossaryGuid != null) { + glossaryNameGuidCacheForImport.put(glossaryName, glossaryGuid); + + glossaryTerm = populateGlossaryTermObject(failedTermMsgs, record, glossaryGuid, processRelations); + + glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryGuid)); + + glossaryTerms.add(glossaryTerm); + } } - if (failedTermMsgs.size() == 0) { - glossaryTerms.add(glossaryTerm); - } else { + if (failedTermMsgs.size() > 0) { String failedTermMsg = StringUtils.join(failedTermMsgs, "\n"); String glossaryTermName = glossaryTerm.getName(); @@ -683,14 +692,12 @@ public class GlossaryTermUtils extends GlossaryUtils { dataArray[1] + FileUtils.COLON_CHARACTER + dataArray[0] + " for record with TermName : " + termName + " and GlossaryName : " + glossaryName); } } - - return ret; } return ret; } - protected AtlasGlossaryTerm populateGlossaryTermObject(List<String> failedTermMsgList, String[] record, String glossaryGuid) { + protected AtlasGlossaryTerm populateGlossaryTermObject(List<String> failedTermMsgList, String[] record, String glossaryGuid, boolean populateRelations) { AtlasGlossaryTerm ret = new AtlasGlossaryTerm(); int i = 0; int length = record.length; @@ -712,35 +719,43 @@ public class GlossaryTermUtils extends GlossaryUtils { ret.setAdditionalAttributes(((length > ++i) ? (Map<String, Object>) getMapValue(record[i], failedTermMsgList) : null)); - ret.setTranslationTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setAnchor(new AtlasGlossaryHeader(glossaryGuid)); - ret.setValidValuesFor((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + if (populateRelations) { + ret.setTranslationTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setSynonyms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setValidValuesFor((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setReplacedBy((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setSynonyms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setValidValues((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setReplacedBy((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setReplacementTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setValidValues((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setSeeAlso((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setReplacementTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setTranslatedTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setSeeAlso((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setIsA((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setTranslatedTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setAnchor(new AtlasGlossaryHeader(glossaryGuid)); + ret.setIsA((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setAntonyms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setAntonyms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setClassifies((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setClassifies((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setPreferredToTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setPreferredToTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); - ret.setPreferredTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + ret.setPreferredTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[0], failedTermMsgList) : null); + } } return ret; } + + private String getGlossaryTermQualifiedName(String glossaryTermName, String glossaryGuid) throws AtlasBaseException { + AtlasGlossary glossary = dataAccess.load(getGlossarySkeleton(glossaryGuid)); + + return glossaryTermName + "@" + glossary.getQualifiedName(); + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 9f143d4..c133920 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -1527,32 +1527,26 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException { BulkImportResponse ret = new BulkImportResponse(); - try { - if (StringUtils.isBlank(fileName)) { - throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName); - } + if (StringUtils.isBlank(fileName)) { + throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName); + } - List<String[]> fileData = FileUtils.readFileData(fileName, inputStream); - Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret); + List<String[]> fileData = FileUtils.readFileData(fileName, inputStream); + Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret); - for (AtlasEntity entity : attributesToAssociate.values()) { - Map<String, Map<String, Object>> businessAttributes = entity.getBusinessAttributes(); - String guid = entity.getGuid(); + for (AtlasEntity entity : attributesToAssociate.values()) { + Map<String, Map<String, Object>> businessAttributes = entity.getBusinessAttributes(); + String guid = entity.getGuid(); - try { - addOrUpdateBusinessAttributes(guid, businessAttributes, true); + try { + addOrUpdateBusinessAttributes(guid, businessAttributes, true); - ret.addToSuccessImportInfoList(new ImportInfo(guid, businessAttributes.toString())); - }catch (Exception e) { - LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity " + guid); + ret.addToSuccessImportInfoList(new ImportInfo(guid, businessAttributes.toString())); + } catch (Exception e) { + LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity " + guid); - ret.addToFailedImportInfoList(new ImportInfo(guid, businessAttributes.toString(), FAILED, e.getMessage())); - } + ret.addToFailedImportInfoList(new ImportInfo(guid, businessAttributes.toString(), FAILED, e.getMessage())); } - } catch (IOException e) { - LOG.error("An Exception occurred while uploading the file {}", fileName, e); - - throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e); } return ret; diff --git a/repository/src/main/java/org/apache/atlas/util/FileUtils.java b/repository/src/main/java/org/apache/atlas/util/FileUtils.java index 57df9ab..ce1e0fe 100644 --- a/repository/src/main/java/org/apache/atlas/util/FileUtils.java +++ b/repository/src/main/java/org/apache/atlas/util/FileUtils.java @@ -53,7 +53,7 @@ public class FileUtils { public static final int BM_ATTR_VALUE_COLUMN_INDEX = 3; public static final int UNIQUE_ATTR_NAME_COLUMN_INDEX = 4; - public static List<String[]> readFileData(String fileName, InputStream inputStream) throws IOException, AtlasBaseException { + public static List<String[]> readFileData(String fileName, InputStream inputStream) throws AtlasBaseException { List<String[]> ret; String extension = FilenameUtils.getExtension(fileName); @@ -72,7 +72,7 @@ public class FileUtils { return ret; } - public static List<String[]> readCSV(InputStream inputStream) throws IOException, AtlasBaseException { + public static List<String[]> readCSV(InputStream inputStream) throws AtlasBaseException { List<String[]> ret = new ArrayList<>(); try (CSVReader csvReader = new CSVReader(new InputStreamReader(inputStream))) { @@ -89,36 +89,40 @@ public class FileUtils { ret.add(data); } } - } catch (CsvValidationException e) { - throw new AtlasBaseException(AtlasErrorCode.NO_DATA_FOUND, e); + } catch (CsvValidationException | IOException e) { + throw new AtlasBaseException(AtlasErrorCode.NOT_VALID_FILE, CSV.name()); } return ret; } - public static List<String[]> readExcel(InputStream inputStream, String extension) throws IOException { + public static List<String[]> readExcel(InputStream inputStream, String extension) throws AtlasBaseException { List<String[]> ret = new ArrayList<>(); - Workbook excelBook = extension.equalsIgnoreCase(XLS.name()) ? new HSSFWorkbook(inputStream) : new XSSFWorkbook(inputStream); - Sheet excelSheet = excelBook.getSheetAt(0); - Iterator itr = excelSheet.rowIterator(); - Row headerRow = (Row) itr.next(); - if (isRowEmpty(headerRow)) { - return ret; - } + try (Workbook excelBook = extension.equalsIgnoreCase(XLS.name()) ? new HSSFWorkbook(inputStream) : new XSSFWorkbook(inputStream)) { + Sheet excelSheet = excelBook.getSheetAt(0); + Iterator itr = excelSheet.rowIterator(); + Row headerRow = (Row) itr.next(); - while (itr.hasNext()) { - Row row = (Row) itr.next(); + if (isRowEmpty(headerRow)) { + return ret; + } - if (!isRowEmpty(row)) { - String[] data = new String[row.getLastCellNum()]; + while (itr.hasNext()) { + Row row = (Row) itr.next(); - for (int i = 0; i < row.getLastCellNum(); i++) { - data[i] = (row.getCell(i) != null) ? row.getCell(i).getStringCellValue().trim() : null; - } + if (!isRowEmpty(row)) { + String[] data = new String[row.getLastCellNum()]; - ret.add(data); + for (int i = 0; i < row.getLastCellNum(); i++) { + data[i] = (row.getCell(i) != null) ? row.getCell(i).getStringCellValue().trim() : null; + } + + ret.add(data); + } } + } catch (IOException e) { + throw new AtlasBaseException(AtlasErrorCode.NOT_VALID_FILE, XLS.name()); } return ret; diff --git a/repository/src/test/java/org/apache/atlas/glossary/GlossaryServiceTest.java b/repository/src/test/java/org/apache/atlas/glossary/GlossaryServiceTest.java index 371b942..27717c3 100644 --- a/repository/src/test/java/org/apache/atlas/glossary/GlossaryServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/glossary/GlossaryServiceTest.java @@ -98,9 +98,9 @@ public class GlossaryServiceTest { public static Object[][] getGlossaryTermsProvider() { return new Object[][]{ // offset, limit, expected - {0, -1, 6}, + {0, -1, 7}, {0, 2, 2}, - {2, 5, 4}, + {2, 6, 5}, }; } @@ -949,6 +949,14 @@ public class GlossaryServiceTest { assertNotNull(bulkImportResponse1); assertEquals(bulkImportResponse1.getSuccessImportInfoList().size(), 1); + + // With circular dependent relations + InputStream inputStream2 = getFile(CSV_FILES,"template_with_circular_relationship.csv"); + BulkImportResponse bulkImportResponse2 = glossaryService.importGlossaryData(inputStream2,"template_with_circular_relationship.csv"); + + assertNotNull(bulkImportResponse2); + assertEquals(bulkImportResponse2.getSuccessImportInfoList().size(), 3); + assertEquals(bulkImportResponse2.getFailedImportInfoList().size(), 0); } catch (AtlasBaseException e){ fail("The GlossaryTerm should have been created "+e); } @@ -967,6 +975,18 @@ public class GlossaryServiceTest { } @Test + public void testInvalidFileException() { + InputStream inputStream = getFile(EXCEL_FILES, "invalid_xls.xls"); + + try { + BulkImportResponse bulkImportResponse = glossaryService.importGlossaryData(inputStream, "invalid_xls.xls"); + fail("Error occurred : Failed to recognize the invalid xls file."); + } catch (AtlasBaseException e) { + assertEquals(e.getMessage(),"Invalid XLS file"); + } + } + + @Test public void testFileExtension() throws IOException { InputStream inputStream = getFile(CSV_FILES, "incorrectEXT.py"); final String userDir = System.getProperty("user.dir"); @@ -987,6 +1007,9 @@ public class GlossaryServiceTest { try { BulkImportResponse bulkImportResponse = glossaryService.importGlossaryData(inputStream, "incorrectFile.csv"); + assertEquals(bulkImportResponse.getSuccessImportInfoList().size(),1); + + //Due to invalid Relation we get Failed message even the import succeeded for the term assertEquals(bulkImportResponse.getFailedImportInfoList().size(),1); } catch (AtlasBaseException e) { fail("The incorrect file exception should have handled "+e); @@ -1015,4 +1038,4 @@ public class GlossaryServiceTest { return startPath + "/src/test/resources/" + fileName; } } -} \ No newline at end of file +} diff --git a/repository/src/test/resources/csvFiles/template_with_circular_relationship.csv b/repository/src/test/resources/csvFiles/template_with_circular_relationship.csv new file mode 100644 index 0000000..7f2948d --- /dev/null +++ b/repository/src/test/resources/csvFiles/template_with_circular_relationship.csv @@ -0,0 +1,4 @@ +GlossaryName, TermName, ShortDescription, LongDescription, Examples, Abbreviation, Usage, AdditionalAttributes, TranslationTerms, ValidValuesFor, Synonyms, ReplacedBy, ValidValues, ReplacementTerms, SeeAlso, TranslatedTerms, IsA, Antonyms, Classifies, PreferredToTerms, PreferredTerms +Industry,Fact,More specific sales-related information,,,,,,,,,,,,Industry:Vertical|Industry:Horizontal,,,,,, +Industry,Vertical,Specific industry to which we sell products or services,,,,,,,,,,,,Industry:Fact,,,,,, +Industry,Horizontal,Not industry-specific,,,,,,,,,,,,Industry:Fact,,,,,, \ No newline at end of file diff --git a/repository/src/test/resources/excelFiles/invalid_xls.xls b/repository/src/test/resources/excelFiles/invalid_xls.xls new file mode 100644 index 0000000..67d513c --- /dev/null +++ b/repository/src/test/resources/excelFiles/invalid_xls.xls @@ -0,0 +1,5 @@ +GlossaryName, TermName, ShortDescription, LongDescription, Examples, Abbreviation, Usage, AdditionalAttributes, TranslationTerms, ValidValuesFor, Synonyms, ReplacedBy, ValidValues, ReplacementTerms, SeeAlso, TranslatedTerms, IsA, Antonyms, Classifies, PreferredToTerms, PreferredTerms +Industry1,Fact,More specific sales-related information,,,,,,,,,,,,,,,,,, +Industry1,Vertical,Specific industry to which we sell products or services,,,,,,,,,,,,Industry1:Fact,,,,,, +Industry1,Horizontal,Not industry-specific,,,,,,,,,,,,Industry1:Fact,,,,,, +Industry1,Testing,Testing,,,,,,,,,,,,Industry:Fact,,,,,, \ No newline at end of file