This is an automated email from the ASF dual-hosted git repository.

mandarambawane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new ab3257e7b ATLAS-5006 : Atlas incremental export takes too much time (almost equivalent to bootstrap export with the entire data) (#312)
ab3257e7b is described below

commit ab3257e7ba6548099639132e37c954986e49ea03
Author: mandarambawane <mandar.ambaw...@freestoneinfotech.com>
AuthorDate: Mon Apr 14 14:55:13 2025 +0530

    ATLAS-5006 : Atlas incremental export takes too much time (almost equivalent to bootstrap export with the entire data) (#312)
---
 .../atlas/repository/graphdb/AtlasGraph.java       |  8 +++
 .../repository/graphdb/janus/AtlasJanusGraph.java  | 12 ++++
 .../atlas/repository/impexp/ExportService.java     | 83 +++++++++++++++++++++-
 .../store/graph/v2/EntityGraphRetriever.java       |  4 ++
 4 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
index c1f7109ab..1c3e94924 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
@@ -27,6 +27,7 @@ import javax.script.ScriptException;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -359,4 +360,11 @@ public interface AtlasGraph<V, E> {
      * @throws AtlasException when error encountered in creating the client.
      */
     AtlasGraphIndexClient getGraphIndexClient() throws AtlasException;
+
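+    /**
+     * Returns the vertices connected to the given vertex by an edge, in either direction.
+     * @param vertex the vertex whose neighbouring vertices are to be fetched
+     * @return list of vertices adjacent to {@code vertex}
+     */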
+    List<AtlasVertex> getAllEdgesVertices(AtlasVertex vertex);
 }
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index eeca84f54..fb29e4cec 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -78,6 +78,7 @@ import javax.script.ScriptException;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -419,6 +420,17 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
         return this.janusGraph;
     }
 
+    @Override
+    public List<AtlasVertex> getAllEdgesVertices(AtlasVertex vertex) {
+        GraphTraversal gt = V(vertex.getId()).both();
+        List<AtlasVertex> resultList = new ArrayList<>();
+        while (gt.hasNext()) {
+            Vertex v = (Vertex) gt.next();
+            resultList.add(GraphDbObjectFactory.createVertex(this, v));
+        }
+        return resultList;
+    }
+
     public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> 
wrapVertices(Iterable<? extends Vertex> it) {
         return Iterables.transform(it, (Function<Vertex, 
AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>) input -> 
GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input));
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 54b5aba97..8ccc63c1b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -33,7 +33,10 @@ import org.apache.atlas.model.typedef.AtlasEnumDef;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.repository.util.UniqueList;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -56,6 +59,8 @@ import java.util.Set;
 import static 
org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
 import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
 import static 
org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
+import static 
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
 
 @Component
 public class ExportService {
@@ -69,6 +74,7 @@ public class ExportService {
     private final GlossaryService                 glossaryService;
     private final AuditsWriter                    auditsWriter;
     private       ExportTypeProcessor             exportTypeProcessor;
+    private static final String                   ATLAS_TYPE_HIVE_DB = 
"hive_db";
 
     @Inject
     public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph 
graph, AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator, 
GlossaryService glossaryService) {
@@ -239,6 +245,9 @@ public class ExportService {
             
entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName()));
 
             for (String guid : entityGuids) {
+                AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
+                String typeName = GraphHelper.getTypeName(vertex);
+                context.startingEntityType = typeName;
                 processEntityGuid(guid, context);
             }
 
@@ -283,13 +292,72 @@ public class ExportService {
             return;
         }
 
-        AtlasEntityWithExtInfo entityWithExtInfo = 
entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+        if (context.fetchType == ExportFetchType.INCREMENTAL && 
context.startingEntityType.equals(ATLAS_TYPE_HIVE_DB) && !context.skipLineage) {
+            AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
 
-        processEntity(entityWithExtInfo, context);
+            processVertex(context, vertex, guid);
+        } else {
+            AtlasEntityWithExtInfo entityWithExtInfo = 
entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+
+            processEntity(entityWithExtInfo, context);
+        }
 
         LOG.debug("<== processEntityGuid({})", guid);
     }
 
+    public void processVertex(ExportContext context, AtlasVertex vertex, 
String guid) throws AtlasBaseException {
+        if (MapUtils.isNotEmpty(context.termsGlossary)) {
+            addGlossaryEntities(context);
+        }
+
+        addVertex(vertex, guid, context);
+
+        context.guidsProcessed.add(guid);
+
+        extractConnectedVertices(vertex, context);
+    }
+
+    public void extractConnectedVertices(AtlasVertex vertex, ExportContext 
context) {
+        List<AtlasVertex> connectedVertices = 
entityGraphRetriever.findAllConnectedVertices(vertex);
+
+        if (CollectionUtils.isNotEmpty(connectedVertices)) {
+            for (AtlasVertex e : connectedVertices) {
+                String typeName = GraphHelper.getTypeName(e);
+
+                if (typeRegistry.getEntityTypeByName(typeName) != null) {
+                    String guid = AtlasGraphUtilsV2.getEncodedProperty(e, 
GUID_PROPERTY_KEY, String.class);
+
+                    if (!context.guidsProcessed.contains(guid)) {
+                        context.guidsToProcess.add(guid);
+                    }
+                }
+            }
+        }
+    }
+
+    private void addVertex(AtlasVertex vertex, String guid, ExportContext 
context) throws AtlasBaseException {
+        if (context.sink.hasEntity(guid)) {
+            return;
+        }
+
+        LOG.info("export: Guid in process: {}", guid);
+        if (context.doesTimestampQualify(vertex)) {
+            AtlasEntityWithExtInfo entityWithExtInfo = 
entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+            exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), 
context);
+            context.addToSink(entityWithExtInfo);
+
+            context.result.incrementMeticsCounter(String.format("entity:%s", 
entityWithExtInfo.getEntity().getTypeName()));
+            if (entityWithExtInfo.getReferredEntities() != null) {
+                for (AtlasEntity e : 
entityWithExtInfo.getReferredEntities().values()) {
+                    
context.result.incrementMeticsCounter(String.format("entity:%s", 
e.getTypeName()));
+                }
+            }
+
+            context.result.incrementMeticsCounter("entity:withExtInfo");
+        }
+        context.reportProgress();
+    }
+
     private void addGlossaryEntities(ExportContext context) {
         try {
             for (String termGuid : context.termsGlossary.keySet()) {
@@ -403,6 +471,7 @@ public class ExportService {
 
         boolean isSkipConnectedFetch;
         private int progressReportCount;
+        public String startingEntityType;
 
         ExportContext(AtlasExportResult result, ZipSink sink) {
             this.result = result;
@@ -442,6 +511,7 @@ public class ExportService {
             guidsToProcess.clear();
             guidsProcessed.clear();
             guidDirection.clear();
+            startingEntityType = null;
         }
 
         public void addToBeProcessed(boolean isSuperTypeProcess, String guid, 
TraversalDirection direction) {
@@ -470,6 +540,15 @@ public class ExportService {
             return changeMarker <= entity.getUpdateTime().getTime();
         }
 
+        public boolean doesTimestampQualify(AtlasVertex vertex) {
+            if (fetchType != ExportFetchType.INCREMENTAL) {
+                return true;
+            }
+
+            Long updatedTime = AtlasGraphUtilsV2.getEncodedProperty(vertex, 
MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
+            return changeMarker <= updatedTime;
+        }
+
         public boolean getSkipLineage() {
             return skipLineage;
         }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 7f98da30e..a6aa1695f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -482,6 +482,10 @@ public class EntityGraphRetriever {
         return ret;
     }
 
+    public List<AtlasVertex> findAllConnectedVertices(AtlasVertex vertex) {
+        return graph.getAllEdgesVertices(vertex);
+    }
+
     public Map<String, Object> getEntityUniqueAttribute(AtlasVertex 
entityVertex) throws AtlasBaseException {
         Map<String, Object> ret        = null;
         String              typeName   = 
AtlasGraphUtilsV2.getTypeName(entityVertex);

Reply via email to