This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new dc9e268  ATLAS-4015: Add Re-indexing as JAVA_PATCH.
dc9e268 is described below

commit dc9e268966fa27df400e827fd6e57a5098d7d478
Author: Ashutosh Mestry <ames...@cloudera.com>
AuthorDate: Tue Nov 10 09:47:00 2020 -0800

    ATLAS-4015: Add Re-indexing as JAVA_PATCH.
---
 .../repository/graphdb/AtlasGraphManagement.java   |   8 +
 .../graphdb/janus/AtlasJanusGraphManagement.java   |  64 +++++++
 .../java/org/apache/atlas/AtlasConfiguration.java  |   3 +-
 .../repository/patches/AtlasPatchManager.java      |   3 +-
 .../patches/ConcurrentPatchProcessor.java          |   6 +-
 .../atlas/repository/patches/ReIndexPatch.java     | 199 +++++++++++++++++++++
 6 files changed, 278 insertions(+), 5 deletions(-)

diff --git 
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
 
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
index f7d2e27..7e3b2f4 100644
--- 
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
+++ 
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
@@ -172,4 +172,12 @@ public interface AtlasGraphManagement {
      * Set consistency to ConsistencyModifier.LOCK for all vertex and edge 
indexes.
      */
     void updateUniqueIndexesForConsistencyLock();
+
+    /**
+     * Re-index elements.
+     *
+     * @param indexName name of the index that needs to be operated on.
+     * @param elements elements to be re-indexed.
+     * @throws Exception if the implementation fails while re-indexing.
+     */
+    void reindex(String indexName, List<AtlasElement> elements) throws 
Exception;
 }
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index 2a2ef92..1cc7f8b 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -19,14 +19,22 @@ package org.apache.atlas.repository.graphdb.janus;
 
 import com.google.common.base.Preconditions;
 import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasElement;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.janusgraph.core.Cardinality;
 import org.janusgraph.core.EdgeLabel;
+import org.janusgraph.core.JanusGraphElement;
 import org.janusgraph.core.PropertyKey;
 import org.janusgraph.core.schema.*;
 import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
+import org.janusgraph.diskstorage.BackendTransaction;
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.janusgraph.graphdb.database.IndexSerializer;
+import org.janusgraph.graphdb.database.StandardJanusGraph;
+import org.janusgraph.graphdb.database.management.ManagementSystem;
 import org.janusgraph.graphdb.internal.Token;
 import org.apache.atlas.repository.graphdb.AtlasCardinality;
 import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
@@ -36,11 +44,16 @@ import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
 import org.apache.commons.lang.StringUtils;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
+import org.janusgraph.graphdb.types.IndexType;
+import org.janusgraph.graphdb.types.MixedIndexType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -308,4 +321,55 @@ public class AtlasJanusGraphManagement implements 
AtlasGraphManagement {
             LOG.info("setConsistency: {}: {}: Done!", 
elementType.getSimpleName(), count);
         }
     }
+
+    @Override
+    public void reindex(String indexName, List<AtlasElement> elements) throws 
Exception {
+        try {
+            JanusGraphIndex index = management.getGraphIndex(indexName);
+            if (index == null || !(management instanceof ManagementSystem) || 
!(graph.getGraph() instanceof StandardJanusGraph)) {
+                LOG.error("Could not retrieve index for name: {} ", indexName);
+                return;
+            }
+
+            ManagementSystem managementSystem = (ManagementSystem) management;
+            IndexType indexType = 
managementSystem.getSchemaVertex(index).asIndexType();
+            if (!(indexType instanceof MixedIndexType)) {
+                LOG.warn("Index: {}: Not of MixedIndexType ", indexName);
+                return;
+            }
+
+            IndexSerializer indexSerializer = ((StandardJanusGraph) 
graph.getGraph()).getIndexSerializer();
+            reindexElement(managementSystem, indexSerializer, (MixedIndexType) 
indexType, elements);
+        } catch (Exception exception) {
+            throw exception;
+        } finally {
+            management.commit();
+        }
+    }
+
+    private void reindexElement(ManagementSystem managementSystem, 
IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> 
elements) throws Exception {
+        Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new 
HashMap<>();
+        StandardJanusGraphTx tx = managementSystem.getWrappedTx();
+        BackendTransaction txHandle = tx.getTxHandle();
+
+        try {
+            JanusGraphElement janusGraphElement = null;
+            for (AtlasElement element : elements) {
+                try {
+                    if (element == null || element.getWrappedElement() == 
null) {
+                        continue;
+                    }
+
+                    janusGraphElement = element.getWrappedElement();
+                    indexSerializer.reindexElement(janusGraphElement, 
indexType, documentsPerStore);
+                } catch (Exception e) {
+                    LOG.warn("{}: Exception: {}:{}", indexType.getName(), 
e.getClass().getSimpleName(), e.getMessage());
+                }
+            }
+        } finally {
+            if (txHandle != null) {
+                
txHandle.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore);
+            }
+        }
+    }
 }
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java 
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 1c79158..2d3dfdf 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -73,7 +73,8 @@ public enum AtlasConfiguration {
     LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false),
 
     HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"),
-    
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled",
 true);
+    
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled",
 true),
+    REINDEX_PATCH_ENABLED("atlas.patch.reindex.enabled", false);
 
     private static final Configuration APPLICATION_PROPERTIES;
 
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
 
b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
index b142a2a..478376b 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
@@ -54,7 +54,8 @@ public class AtlasPatchManager {
                 new ClassificationTextPatch(context),
                 new FreeTextRequestHandlerPatch(context),
                 new SuggestionsRequestHandlerPatch(context),
-                new IndexConsistencyPatch(context)
+                new IndexConsistencyPatch(context),
+                new ReIndexPatch(context)
         };
 
         try {
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
 
b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
index c6f0e64..e5dcb2e 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
@@ -43,8 +43,8 @@ public abstract class ConcurrentPatchProcessor {
     private static final String BATCH_SIZE_PROPERTY  = "atlas.patch.batchSize";
     private static final String ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
     private static final String WORKER_NAME_PREFIX   = "patchWorkItem";
-    private static final int    NUM_WORKERS;
-    private static final int    BATCH_SIZE;
+    public static final int    NUM_WORKERS;
+    public static final int    BATCH_SIZE;
 
     private final EntityGraphMapper        entityGraphMapper;
     private final AtlasGraph               graph;
@@ -61,7 +61,7 @@ public abstract class ConcurrentPatchProcessor {
             numWorkers = config.getInt(NUM_WORKERS_PROPERTY, 
config.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
             batchSize  = config.getInt(BATCH_SIZE_PROPERTY, 300);
 
-            LOG.info("UniqueAttributePatch: {}={}, {}={}", 
NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
+            LOG.info("ConcurrentPatchProcessor: {}={}, {}={}", 
NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
         } catch (Exception e) {
             LOG.error("Error retrieving configuration.", e);
         }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
 
b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
new file mode 100644
index 0000000..a47a2cc
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
+
+public class ReIndexPatch extends AtlasPatchHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReIndexPatch.class);
+
+    private static final String PATCH_ID = "JAVA_PATCH_0000_006";
+    private static final String PATCH_DESCRIPTION = "Performs reindex on all 
the indexes.";
+
+    private final PatchContext context;
+
+    public ReIndexPatch(PatchContext context) {
+        super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
+        this.context = context;
+    }
+
+    @Override
+    public void apply() throws AtlasBaseException {
+        if (AtlasConfiguration.REINDEX_PATCH_ENABLED.getBoolean() == false) {
+            return;
+        }
+
+        try {
+            LOG.info("ReIndexPatch: Starting...");
+            ReindexPatchProcessor reindexPatchProcessor = new 
ReindexPatchProcessor(context);
+
+            reindexPatchProcessor.repairVertices();
+            reindexPatchProcessor.repairEdges();
+        } catch (Exception exception) {
+            LOG.error("Error while reindexing.", exception);
+        } finally {
+            LOG.info("ReIndexPatch: Done!");
+        }
+
+        setStatus(UNKNOWN);
+
+        LOG.info("ReIndexPatch.apply(): patchId={}, status={}", getPatchId(), 
getStatus());
+    }
+
+    public static class ReindexPatchProcessor {
+        private static String[] vertexIndexNames = new String[]{ 
Constants.VERTEX_INDEX, Constants.FULLTEXT_INDEX };
+        private static String[] edgeIndexNames = new String[]{ 
Constants.EDGE_INDEX };
+        private static String WORKER_PREFIX = "reindex";
+
+        private PatchContext context;
+
+        public ReindexPatchProcessor(PatchContext context) {
+            this.context = context;
+        }
+
+        public void repairVertices() {
+            repairElements(ReindexPatchProcessor::vertices, vertexIndexNames);
+        }
+
+        public void repairEdges() {
+            repairElements(ReindexPatchProcessor::edges, edgeIndexNames);
+        }
+
+        private void repairElements(BiConsumer<WorkItemManager, AtlasGraph> 
action, String[] indexNames) {
+            WorkItemManager manager = new WorkItemManager(new 
ReindexConsumerBuilder(context.getGraph(), indexNames),
+                    WORKER_PREFIX, ConcurrentPatchProcessor.BATCH_SIZE, 
ConcurrentPatchProcessor.NUM_WORKERS, false);
+
+            try {
+                LOG.info("repairElements.execute(): {}: Starting...", 
indexNames);
+                action.accept(manager, context.getGraph());
+                manager.drain();
+            } finally {
+                try {
+                    manager.shutdown();
+                } catch (InterruptedException e) {
+                    LOG.error("repairEdges.execute(): interrupted during 
WorkItemManager shutdown.", e);
+                }
+
+                LOG.info("repairElements.execute(): {}: Done!", indexNames);
+            }
+        }
+
+        private static void edges(WorkItemManager manager, AtlasGraph graph) {
+            Iterable<Object> iterable = graph.getEdges();
+            for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); 
) {
+                manager.checkProduce(iter.next());
+            }
+        }
+
+        private static void vertices(WorkItemManager manager, AtlasGraph 
graph) {
+            Iterable<AtlasVertex> iterable = graph.getVertices();
+            for (Iterator<AtlasVertex> iter = iterable.iterator(); 
iter.hasNext(); ) {
+                AtlasVertex vertex = iter.next();
+                manager.checkProduce(vertex);
+            }
+        }
+    }
+
+    private static class ReindexConsumerBuilder implements 
WorkItemBuilder<ReindexConsumer, AtlasElement> {
+        private AtlasGraph graph;
+        private String[] indexNames;
+
+        public ReindexConsumerBuilder(AtlasGraph graph, String[] indexNames) {
+            this.graph = graph;
+            this.indexNames = indexNames;
+        }
+
+        @Override
+        public ReindexConsumer build(BlockingQueue queue) {
+            return new ReindexConsumer(queue, this.graph, this.indexNames);
+        }
+    }
+
+    private static class ReindexConsumer extends 
WorkItemConsumer<AtlasElement> {
+        private List<AtlasElement> list = new ArrayList();
+        private AtlasGraph graph;
+        private String[] indexNames;
+        private final AtomicLong counter;
+
+        public ReindexConsumer(BlockingQueue queue, AtlasGraph graph, String[] 
indexNames) {
+            super(queue);
+            this.graph = graph;
+            this.indexNames = indexNames;
+            this.counter = new AtomicLong(0);
+        }
+
+        @Override
+        protected void doCommit() {
+            if (list.size() >= ConcurrentPatchProcessor.BATCH_SIZE) {
+                attemptCommit();
+            }
+        }
+
+        @Override
+        protected void commitDirty() {
+            attemptCommit();
+
+            LOG.info("Total: Commit: {}", counter.get());
+            super.commitDirty();
+        }
+
+        private void attemptCommit() {
+            for (String indexName : indexNames) {
+                try {
+                    this.graph.getManagementSystem().reindex(indexName, list);
+                }
+                catch (IllegalStateException e) {
+                    LOG.error("IllegalStateException: Exception", e);
+                    return;
+                }
+                catch (Exception exception) {
+                    LOG.error("Exception: {}", indexName, exception);
+                }
+            }
+
+            list.clear();
+            LOG.info("Processed: {}", counter.get());
+        }
+
+        @Override
+        protected void processItem(AtlasElement item) {
+            counter.incrementAndGet();
+            list.add(item);
+            commit();
+        }
+    }
+}

Reply via email to