This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 1310598 ATLAS-4274: Non matching relation are created via bulk import 1310598 is described below commit 1310598af79ea3a815972cde18d50be252337856 Author: sidmishra <sidmis...@cloudera.com> AuthorDate: Sat May 8 09:52:23 2021 -0700 ATLAS-4274: Non matching relation are created via bulk import Signed-off-by: Sarath Subramanian <sar...@apache.org> (cherry picked from commit 821b4c441153505db7f0ba9c5670623c527af989) --- .../org/apache/atlas/glossary/GlossaryService.java | 3 + .../apache/atlas/glossary/GlossaryTermUtils.java | 205 ++++++++++++++++++--- 2 files changed, 185 insertions(+), 23 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java index 73217de..9c84598 100644 --- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java +++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java @@ -1118,6 +1118,7 @@ public class GlossaryService { } } + @GraphTransaction public BulkImportResponse importGlossaryData(InputStream inputStream, String fileName) throws AtlasBaseException { BulkImportResponse ret = new BulkImportResponse(); @@ -1161,6 +1162,8 @@ public class GlossaryService { private void updateGlossaryTermsRelation(List<AtlasGlossaryTerm> glossaryTerms, BulkImportResponse bulkImportResponse) { for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) { + glossaryTermUtils.updateGlossaryTermRelations(glossaryTerm); + if (glossaryTerm.hasTerms()) { String glossaryTermName = glossaryTerm.getName(); String glossaryName = getGlossaryName(glossaryTerm); diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java index 553d3d0..fa02c8e 100644 --- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java +++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java @@ -66,7 +66,9 @@ public class GlossaryTermUtils extends GlossaryUtils { private static final int INDEX_FOR_GLOSSARY_AT_RECORD = 0; private static final int INDEX_FOR_TERM_AT_RECORD = 1; - private static final ThreadLocal<Map<String, String>> glossaryNameGuidCacheForImport = ThreadLocal.withInitial(() -> new LinkedHashMap<>()); + private static final ThreadLocal<Map<String, String>> glossaryNameGuidCache = ThreadLocal.withInitial(() -> new LinkedHashMap<>()); + private static final ThreadLocal<Map<String, Integer>> glossaryTermOrderCache = ThreadLocal.withInitial(() -> new HashMap<>()); + private static final ThreadLocal<Map<String, String>> glossaryTermQNameGuidCache = ThreadLocal.withInitial(() -> new HashMap<>()); protected GlossaryTermUtils(AtlasRelationshipStore relationshipStore, AtlasTypeRegistry typeRegistry, DataAccess dataAccess) { super(relationshipStore, typeRegistry, dataAccess); @@ -150,7 +152,9 @@ public class GlossaryTermUtils extends GlossaryUtils { } public void clearImportCache() { - glossaryNameGuidCacheForImport.get().clear(); + glossaryNameGuidCache.get().clear(); + glossaryTermOrderCache.get().clear(); + glossaryTermQNameGuidCache.get().clear(); } private boolean isRelationshipGuidSame(AtlasRelatedObjectId storeObject, AtlasRelatedObjectId relatedObjectId) { @@ -560,7 +564,7 @@ public class GlossaryTermUtils extends GlossaryUtils { } else { glossaryName = record[INDEX_FOR_GLOSSARY_AT_RECORD]; - String glossaryGuid = glossaryNameGuidCacheForImport.get().get(glossaryName); + String glossaryGuid = glossaryNameGuidCache.get().get(glossaryName); if (StringUtils.isEmpty(glossaryGuid)) { glossaryGuid = getGlossaryGUIDFromGraphDB(glossaryName); @@ -569,13 +573,15 @@ public class GlossaryTermUtils extends GlossaryUtils { glossaryGuid = createGlossary(glossaryName, failedTermMsgs); } - glossaryNameGuidCacheForImport.get().put(glossaryName, glossaryGuid); + glossaryNameGuidCache.get().put(glossaryName, glossaryGuid); } if (StringUtils.isNotEmpty(glossaryGuid)) { glossaryTerm = populateGlossaryTermObject(failedTermMsgs, record, glossaryGuid, false); - glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryGuid)); + glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryName)); + + glossaryTermOrderCache.get().put(glossaryTerm.getQualifiedName(), rowCount); glossaryTerms.add(glossaryTerm); } @@ -604,12 +610,12 @@ public class GlossaryTermUtils extends GlossaryUtils { if (ArrayUtils.isNotEmpty(record) && StringUtils.isNotBlank(record[INDEX_FOR_GLOSSARY_AT_RECORD])) { AtlasGlossaryTerm glossaryTerm = new AtlasGlossaryTerm(); String glossaryName = record[INDEX_FOR_GLOSSARY_AT_RECORD]; - String glossaryGuid = glossaryNameGuidCacheForImport.get().get(glossaryName); + String glossaryGuid = glossaryNameGuidCache.get().get(glossaryName); if (StringUtils.isNotEmpty(glossaryGuid)) { glossaryTerm = populateGlossaryTermObject(failedTermMsgs, record, glossaryGuid, true); - glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryGuid)); + glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryName)); glossaryTerms.add(glossaryTerm); } @@ -656,6 +662,19 @@ public class GlossaryTermUtils extends GlossaryUtils { return String.join(", ", ret); } + public void updateGlossaryTermRelations(AtlasGlossaryTerm updatedGlossaryTerm) { + if (glossaryTermQNameGuidCache.get().containsKey(updatedGlossaryTerm.getQualifiedName())) { + try { + AtlasGlossaryTerm glossaryTermFromDB = getGlossaryTem(glossaryTermQNameGuidCache.get().get(updatedGlossaryTerm.getQualifiedName())); + copyRelations(updatedGlossaryTerm, glossaryTermFromDB); + } catch (AtlasBaseException e) { + if (DEBUG_ENABLED) { + LOG.debug("Error occurred while loading glossary Term", e); + } + } + } + } + protected Map getMapValue(String csvRecord, List<String> failedTermMsgs, boolean populateRelations) { Map ret = null; @@ -702,21 +721,28 @@ public class GlossaryTermUtils extends GlossaryUtils { AtlasVertex vertex = null; String dataArray[] = data.split(FileUtils.ESCAPE_CHARACTER + FileUtils.COLON_CHARACTER); - if ((dataArray.length % 2) == 0) { + if (dataArray.length == 2) { + String relatedTermQualifiedName = dataArray[1] + invalidNameChars[0] + dataArray[0]; + String currTermQualifiedName = termName + invalidNameChars[0] + glossaryName; + vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME, - GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME + invalidNameChars[1] + QUALIFIED_NAME_ATTR, dataArray[1] + invalidNameChars[0] + dataArray[0]); - } else { - failedTermMsgs.add("Either incorrect data specified for Term or Term does not exist : " +termName); - } + GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME + invalidNameChars[1] + QUALIFIED_NAME_ATTR, relatedTermQualifiedName); + + if (vertex != null) { + String glossaryTermGuid = AtlasGraphUtilsV2.getIdFromVertex(vertex); - if (vertex != null) { - String glossaryTermGuid = AtlasGraphUtilsV2.getIdFromVertex(vertex); - relatedTermHeader = new AtlasRelatedTermHeader(); - relatedTermHeader.setTermGuid(glossaryTermGuid); - ret.add(relatedTermHeader); + relatedTermHeader = new AtlasRelatedTermHeader(); + relatedTermHeader.setTermGuid(glossaryTermGuid); + + cacheRelatedTermQNameGuid(currTermQualifiedName, relatedTermQualifiedName, glossaryTermGuid); + + ret.add(relatedTermHeader); + } else { + failedTermMsgs.add("The provided Reference " + dataArray[1] + "@" + dataArray[0] + + " does not exist at Atlas referred at record with TermName : " + termName + " and GlossaryName : " + glossaryName); + } } else { - failedTermMsgs.add("The provided Reference " + dataArray[1] + "@" + dataArray[0] + - " does not exist at Atlas referred at record with TermName : " + termName + " and GlossaryName : " + glossaryName); + failedTermMsgs.add("Incorrect relation data specified for the term : " + termName + "@" + glossaryName); } } } @@ -759,6 +785,8 @@ public class GlossaryTermUtils extends GlossaryUtils { ret.setSynonyms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null); + ret.setReplacedBy((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null); + ret.setValidValues((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null); ret.setReplacementTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null); @@ -782,10 +810,8 @@ public class GlossaryTermUtils extends GlossaryUtils { return ret; } - private String getGlossaryTermQualifiedName(String glossaryTermName, String glossaryGuid) throws AtlasBaseException { - AtlasGlossary glossary = dataAccess.load(getGlossarySkeleton(glossaryGuid)); - - return glossaryTermName + "@" + glossary.getQualifiedName(); + private String getGlossaryTermQualifiedName(String glossaryTermName, String glossaryName) throws AtlasBaseException { + return glossaryTermName + "@" + glossaryName; } private String getGlossaryGUIDFromGraphDB(String glossaryName) { @@ -812,4 +838,137 @@ public class GlossaryTermUtils extends GlossaryUtils { return ret; } + + private void cacheRelatedTermQNameGuid(String currTermQualifiedName, String relatedTermQualifiedName, String termGuid) { + if (!glossaryTermQNameGuidCache.get().containsKey(relatedTermQualifiedName) && + glossaryTermOrderCache.get().containsKey(currTermQualifiedName) && + glossaryTermOrderCache.get().containsKey(relatedTermQualifiedName) && + glossaryTermOrderCache.get().get(currTermQualifiedName) < glossaryTermOrderCache.get().get(relatedTermQualifiedName)) { + glossaryTermQNameGuidCache.get().put(relatedTermQualifiedName, termGuid); + } + } + + private AtlasGlossaryTerm getGlossaryTem(String termGuid) throws AtlasBaseException { + AtlasGlossaryTerm ret = null; + + if (DEBUG_ENABLED) { + LOG.debug("==> GlossaryTemUtils.getGlossaryTem({})", termGuid); + } + + if (!Objects.isNull(termGuid)) { + AtlasGlossaryTerm atlasGlossary = getAtlasGlossaryTermSkeleton(termGuid); + ret = dataAccess.load(atlasGlossary); + + if (DEBUG_ENABLED) { + LOG.debug("<== GlossaryTemUtils.getGlossaryTem() : {}", ret); + } + } + return ret; + } + + private void copyRelations(AtlasGlossaryTerm toGlossaryTerm, AtlasGlossaryTerm fromGlossaryTerm) { + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getSeeAlso())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getSeeAlso())) { + toGlossaryTerm.getSeeAlso().addAll(fromGlossaryTerm.getSeeAlso()); + } else { + toGlossaryTerm.setSeeAlso(fromGlossaryTerm.getSeeAlso()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getSynonyms())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getSynonyms())) { + toGlossaryTerm.getSynonyms().addAll(fromGlossaryTerm.getSynonyms()); + } else { + toGlossaryTerm.setSynonyms(fromGlossaryTerm.getSynonyms()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getAntonyms())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getAntonyms())) { + toGlossaryTerm.getAntonyms().addAll(fromGlossaryTerm.getAntonyms()); + } else { + toGlossaryTerm.setAntonyms(fromGlossaryTerm.getAntonyms()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getPreferredTerms())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getPreferredTerms())) { + toGlossaryTerm.getPreferredTerms().addAll(fromGlossaryTerm.getPreferredTerms()); + } else { + toGlossaryTerm.setPreferredTerms(fromGlossaryTerm.getPreferredTerms()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getPreferredToTerms())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getPreferredToTerms())) { + toGlossaryTerm.getPreferredToTerms().addAll(fromGlossaryTerm.getPreferredToTerms()); + } else { + toGlossaryTerm.setPreferredToTerms(fromGlossaryTerm.getPreferredToTerms()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getReplacementTerms())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getReplacementTerms())) { + toGlossaryTerm.getReplacementTerms().addAll(fromGlossaryTerm.getReplacementTerms()); + } else { + toGlossaryTerm.setReplacementTerms(fromGlossaryTerm.getReplacementTerms()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getReplacedBy())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getReplacedBy())) { + toGlossaryTerm.getReplacedBy().addAll(fromGlossaryTerm.getReplacedBy()); + } else { + toGlossaryTerm.setReplacedBy(fromGlossaryTerm.getReplacedBy()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getTranslationTerms())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getTranslationTerms())) { + toGlossaryTerm.getTranslationTerms().addAll(fromGlossaryTerm.getTranslationTerms()); + } else { + toGlossaryTerm.setTranslationTerms(fromGlossaryTerm.getTranslationTerms()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getTranslatedTerms())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getTranslatedTerms())) { + toGlossaryTerm.getTranslatedTerms().addAll(fromGlossaryTerm.getTranslatedTerms()); + } else { + toGlossaryTerm.setTranslatedTerms(fromGlossaryTerm.getTranslatedTerms()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getIsA())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getIsA())) { + toGlossaryTerm.getIsA().addAll(fromGlossaryTerm.getIsA()); + } else { + toGlossaryTerm.setIsA(fromGlossaryTerm.getIsA()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getClassifies())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getClassifies())) { + toGlossaryTerm.getClassifies().addAll(fromGlossaryTerm.getClassifies()); + } else { + toGlossaryTerm.setClassifies(fromGlossaryTerm.getClassifies()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getValidValues())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getValidValues())) { + toGlossaryTerm.getValidValues().addAll(fromGlossaryTerm.getValidValues()); + } else { + toGlossaryTerm.setValidValues(fromGlossaryTerm.getValidValues()); + } + } + + if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getValidValuesFor())) { + if (CollectionUtils.isNotEmpty(toGlossaryTerm.getValidValuesFor())) { + toGlossaryTerm.getValidValuesFor().addAll(fromGlossaryTerm.getValidValuesFor()); + } else { + toGlossaryTerm.setValidValuesFor(fromGlossaryTerm.getValidValuesFor()); + } + } + } }