This is an automated email from the ASF dual-hosted git repository. mandarambawane pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new ab3257e7b ATLAS-5006 : Atlas incremental export takes too much time (almost equivalent to bootstrap export with the entire data) (#312) ab3257e7b is described below commit ab3257e7ba6548099639132e37c954986e49ea03 Author: mandarambawane <mandar.ambaw...@freestoneinfotech.com> AuthorDate: Mon Apr 14 14:55:13 2025 +0530 ATLAS-5006 : Atlas incremental export takes too much time (almost equivalent to bootstrap export with the entire data) (#312) --- .../atlas/repository/graphdb/AtlasGraph.java | 8 +++ .../repository/graphdb/janus/AtlasJanusGraph.java | 12 ++++ .../atlas/repository/impexp/ExportService.java | 83 +++++++++++++++++++++- .../store/graph/v2/EntityGraphRetriever.java | 4 ++ 4 files changed, 105 insertions(+), 2 deletions(-) diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java index c1f7109ab..1c3e94924 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java @@ -27,6 +27,7 @@ import javax.script.ScriptException; import java.io.IOException; import java.io.OutputStream; +import java.util.List; import java.util.Map; import java.util.Set; @@ -359,4 +360,11 @@ public interface AtlasGraph<V, E> { * @throws AtlasException when error encountered in creating the client. */ AtlasGraphIndexClient getGraphIndexClient() throws AtlasException; + + /** + * + * @param vertex + * @return + */ + List<AtlasVertex> getAllEdgesVertices(AtlasVertex vertex); } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index eeca84f54..fb29e4cec 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -78,6 +78,7 @@ import javax.script.ScriptException; import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; @@ -419,6 +420,17 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE return this.janusGraph; } + @Override + public List<AtlasVertex> getAllEdgesVertices(AtlasVertex vertex) { + GraphTraversal gt = V(vertex.getId()).both(); + List<AtlasVertex> resultList = new ArrayList<>(); + while (gt.hasNext()) { + Vertex v = (Vertex) gt.next(); + resultList.add(GraphDbObjectFactory.createVertex(this, v)); + } + return resultList; + } + public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterable<? extends Vertex> it) { return Iterables.transform(it, (Function<Vertex, AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>) input -> GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input)); } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index 54b5aba97..8ccc63c1b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -33,7 +33,10 @@ import org.apache.atlas.model.typedef.AtlasEnumDef; import org.apache.atlas.model.typedef.AtlasRelationshipDef; import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.util.UniqueList; import org.apache.atlas.type.AtlasTypeRegistry; @@ -56,6 +59,8 @@ import java.util.Set; import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED; import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL; import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL; +import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; @Component public class ExportService { @@ -69,6 +74,7 @@ public class ExportService { private final GlossaryService glossaryService; private final AuditsWriter auditsWriter; private ExportTypeProcessor exportTypeProcessor; + private static final String ATLAS_TYPE_HIVE_DB = "hive_db"; @Inject public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph, AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator, GlossaryService glossaryService) { @@ -239,6 +245,9 @@ public class ExportService { entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName())); for (String guid : entityGuids) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid); + String typeName = GraphHelper.getTypeName(vertex); + context.startingEntityType = typeName; processEntityGuid(guid, context); } @@ -283,13 +292,72 @@ public class ExportService { return; } - AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); + if (context.fetchType == ExportFetchType.INCREMENTAL && context.startingEntityType.equals(ATLAS_TYPE_HIVE_DB) && !context.skipLineage) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid); - processEntity(entityWithExtInfo, context); + processVertex(context, vertex, guid); + } else { + AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); + + processEntity(entityWithExtInfo, context); + } LOG.debug("<== processEntityGuid({})", guid); } + public void processVertex(ExportContext context, AtlasVertex vertex, String guid) throws AtlasBaseException { + if (MapUtils.isNotEmpty(context.termsGlossary)) { + addGlossaryEntities(context); + } + + addVertex(vertex, guid, context); + + context.guidsProcessed.add(guid); + + extractConnectedVertices(vertex, context); + } + + public void extractConnectedVertices(AtlasVertex vertex, ExportContext context) { + List<AtlasVertex> connectedVertices = entityGraphRetriever.findAllConnectedVertices(vertex); + + if (CollectionUtils.isNotEmpty(connectedVertices)) { + for (AtlasVertex e : connectedVertices) { + String typeName = GraphHelper.getTypeName(e); + + if (typeRegistry.getEntityTypeByName(typeName) != null) { + String guid = AtlasGraphUtilsV2.getEncodedProperty(e, GUID_PROPERTY_KEY, String.class); + + if (!context.guidsProcessed.contains(guid)) { + context.guidsToProcess.add(guid); + } + } + } + } + } + + private void addVertex(AtlasVertex vertex, String guid, ExportContext context) throws AtlasBaseException { + if (context.sink.hasEntity(guid)) { + return; + } + + LOG.info("export: Guid in process: {}", guid); + if (context.doesTimestampQualify(vertex)) { + AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); + exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context); + context.addToSink(entityWithExtInfo); + + context.result.incrementMeticsCounter(String.format("entity:%s", entityWithExtInfo.getEntity().getTypeName())); + if (entityWithExtInfo.getReferredEntities() != null) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName())); + } + } + + context.result.incrementMeticsCounter("entity:withExtInfo"); + } + context.reportProgress(); + } + private void addGlossaryEntities(ExportContext context) { try { for (String termGuid : context.termsGlossary.keySet()) { @@ -403,6 +471,7 @@ public class ExportService { boolean isSkipConnectedFetch; private int progressReportCount; + public String startingEntityType; ExportContext(AtlasExportResult result, ZipSink sink) { this.result = result; @@ -442,6 +511,7 @@ public class ExportService { guidsToProcess.clear(); guidsProcessed.clear(); guidDirection.clear(); + startingEntityType = null; } public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) { @@ -470,6 +540,15 @@ public class ExportService { return changeMarker <= entity.getUpdateTime().getTime(); } + public boolean doesTimestampQualify(AtlasVertex vertex) { + if (fetchType != ExportFetchType.INCREMENTAL) { + return true; + } + + Long updatedTime = AtlasGraphUtilsV2.getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class); + return changeMarker <= updatedTime; + } + public boolean getSkipLineage() { return skipLineage; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 7f98da30e..a6aa1695f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -482,6 +482,10 @@ public class EntityGraphRetriever { return ret; } + public List<AtlasVertex> findAllConnectedVertices(AtlasVertex vertex) { + return graph.getAllEdgesVertices(vertex); + } + public Map<String, Object> getEntityUniqueAttribute(AtlasVertex entityVertex) throws AtlasBaseException { Map<String, Object> ret = null; String typeName = AtlasGraphUtilsV2.getTypeName(entityVertex);