This is an automated email from the ASF dual-hosted git repository. amestry pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 9854ceb ATLAS-4015: Add Re-indexing as JAVA_PATCH. 9854ceb is described below commit 9854ceb909401019c31b085d957b7200ba5e997d Author: Ashutosh Mestry <ames...@cloudera.com> AuthorDate: Tue Nov 10 09:47:00 2020 -0800 ATLAS-4015: Add Re-indexing as JAVA_PATCH. --- .../repository/graphdb/AtlasGraphManagement.java | 8 + .../graphdb/janus/AtlasJanusGraphManagement.java | 64 +++++++ .../java/org/apache/atlas/AtlasConfiguration.java | 3 +- .../repository/patches/AtlasPatchManager.java | 3 +- .../patches/ConcurrentPatchProcessor.java | 6 +- .../atlas/repository/patches/ReIndexPatch.java | 199 +++++++++++++++++++++ 6 files changed, 278 insertions(+), 5 deletions(-) diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java index f7d2e27..7e3b2f4 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java @@ -172,4 +172,12 @@ public interface AtlasGraphManagement { * Set consistency to ConsistencyModifier.LOCK for all vertex and edge indexes. */ void updateUniqueIndexesForConsistencyLock(); + + /*** + * Re-index elements. + * @param indexName: Name of the index that needs to be operated on. + * @param elements: Elements to be re-indexed. + * @throws Exception + */ + void reindex(String indexName, List<AtlasElement> elements) throws Exception; } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java index 2a2ef92..1cc7f8b 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java @@ -19,14 +19,22 @@ package org.apache.atlas.repository.graphdb.janus; import com.google.common.base.Preconditions; import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasElement; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Element; import org.janusgraph.core.Cardinality; import org.janusgraph.core.EdgeLabel; +import org.janusgraph.core.JanusGraphElement; import org.janusgraph.core.PropertyKey; import org.janusgraph.core.schema.*; import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder; +import org.janusgraph.diskstorage.BackendTransaction; +import org.janusgraph.diskstorage.indexing.IndexEntry; +import org.janusgraph.graphdb.database.IndexSerializer; +import org.janusgraph.graphdb.database.StandardJanusGraph; +import org.janusgraph.graphdb.database.management.ManagementSystem; import org.janusgraph.graphdb.internal.Token; import org.apache.atlas.repository.graphdb.AtlasCardinality; import org.apache.atlas.repository.graphdb.AtlasEdgeLabel; @@ -36,11 +44,16 @@ import org.apache.atlas.repository.graphdb.AtlasPropertyKey; import org.apache.commons.lang.StringUtils; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.janusgraph.graphdb.transaction.StandardJanusGraphTx; +import org.janusgraph.graphdb.types.IndexType; +import org.janusgraph.graphdb.types.MixedIndexType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -308,4 +321,55 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement { LOG.info("setConsistency: {}: {}: Done!", elementType.getSimpleName(), count); } } + + @Override + public void reindex(String indexName, List<AtlasElement> elements) throws Exception { + try { + JanusGraphIndex index = management.getGraphIndex(indexName); + if (index == null || !(management instanceof ManagementSystem) || !(graph.getGraph() instanceof StandardJanusGraph)) { + LOG.error("Could not retrieve index for name: {} ", indexName); + return; + } + + ManagementSystem managementSystem = (ManagementSystem) management; + IndexType indexType = managementSystem.getSchemaVertex(index).asIndexType(); + if (!(indexType instanceof MixedIndexType)) { + LOG.warn("Index: {}: Not of MixedIndexType ", indexName); + return; + } + + IndexSerializer indexSerializer = ((StandardJanusGraph) graph.getGraph()).getIndexSerializer(); + reindexElement(managementSystem, indexSerializer, (MixedIndexType) indexType, elements); + } catch (Exception exception) { + throw exception; + } finally { + management.commit(); + } + } + + private void reindexElement(ManagementSystem managementSystem, IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> elements) throws Exception { + Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new HashMap<>(); + StandardJanusGraphTx tx = managementSystem.getWrappedTx(); + BackendTransaction txHandle = tx.getTxHandle(); + + try { + JanusGraphElement janusGraphElement = null; + for (AtlasElement element : elements) { + try { + if (element == null || element.getWrappedElement() == null) { + continue; + } + + janusGraphElement = element.getWrappedElement(); + indexSerializer.reindexElement(janusGraphElement, indexType, documentsPerStore); + } catch (Exception e) { + LOG.warn("{}: Exception: {}:{}", indexType.getName(), e.getClass().getSimpleName(), e.getMessage()); + } + } + } finally { + if (txHandle != null) { + txHandle.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore); + } + } + } } diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 1c79158..2d3dfdf 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -73,7 +73,8 @@ public enum AtlasConfiguration { LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false), HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"), - STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true); + STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true), + REINDEX_PATCH_ENABLED("atlas.patch.reindex.enabled", false); private static final Configuration APPLICATION_PROPERTIES; diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java index b142a2a..478376b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java @@ -54,7 +54,8 @@ public class AtlasPatchManager { new ClassificationTextPatch(context), new FreeTextRequestHandlerPatch(context), new SuggestionsRequestHandlerPatch(context), - new IndexConsistencyPatch(context) + new IndexConsistencyPatch(context), + new ReIndexPatch(context) }; try { diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java index c6f0e64..e5dcb2e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java @@ -43,8 +43,8 @@ public abstract class ConcurrentPatchProcessor { private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize"; private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS"; private static final String WORKER_NAME_PREFIX = "patchWorkItem"; - private static final int NUM_WORKERS; - private static final int BATCH_SIZE; + public static final int NUM_WORKERS; + public static final int BATCH_SIZE; private final EntityGraphMapper entityGraphMapper; private final AtlasGraph graph; @@ -61,7 +61,7 @@ public abstract class ConcurrentPatchProcessor { numWorkers = config.getInt(NUM_WORKERS_PROPERTY, config.getInt(ATLAS_SOLR_SHARDS, 1) * 3); batchSize = config.getInt(BATCH_SIZE_PROPERTY, 300); - LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize); + LOG.info("ConcurrentPatchProcessor: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize); } catch (Exception e) { LOG.error("Error retrieving configuration.", e); } diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java new file mode 100644 index 0000000..a47a2cc --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.patches; + +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.pc.WorkItemBuilder; +import org.apache.atlas.pc.WorkItemConsumer; +import org.apache.atlas.pc.WorkItemManager; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasElement; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; + +public class ReIndexPatch extends AtlasPatchHandler { + private static final Logger LOG = LoggerFactory.getLogger(ReIndexPatch.class); + + private static final String PATCH_ID = "JAVA_PATCH_0000_006"; + private static final String PATCH_DESCRIPTION = "Performs reindex on all the indexes."; + + private final PatchContext context; + + public ReIndexPatch(PatchContext context) { + super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION); + this.context = context; + } + + @Override + public void apply() throws AtlasBaseException { + if (AtlasConfiguration.REINDEX_PATCH_ENABLED.getBoolean() == false) { + return; + } + + try { + LOG.info("ReIndexPatch: Starting..."); + ReindexPatchProcessor reindexPatchProcessor = new ReindexPatchProcessor(context); + + reindexPatchProcessor.repairVertices(); + reindexPatchProcessor.repairEdges(); + } catch (Exception exception) { + LOG.error("Error while reindexing.", exception); + } finally { + LOG.info("ReIndexPatch: Done!"); + } + + setStatus(UNKNOWN); + + LOG.info("ReIndexPatch.apply(): patchId={}, status={}", getPatchId(), getStatus()); + } + + public static class ReindexPatchProcessor { + private static String[] vertexIndexNames = new String[]{ Constants.VERTEX_INDEX, Constants.FULLTEXT_INDEX }; + private static String[] edgeIndexNames = new String[]{ Constants.EDGE_INDEX }; + private static String WORKER_PREFIX = "reindex"; + + private PatchContext context; + + public ReindexPatchProcessor(PatchContext context) { + this.context = context; + } + + public void repairVertices() { + repairElements(ReindexPatchProcessor::vertices, vertexIndexNames); + } + + public void repairEdges() { + repairElements(ReindexPatchProcessor::edges, edgeIndexNames); + } + + private void repairElements(BiConsumer<WorkItemManager, AtlasGraph> action, String[] indexNames) { + WorkItemManager manager = new WorkItemManager(new ReindexConsumerBuilder(context.getGraph(), indexNames), + WORKER_PREFIX, ConcurrentPatchProcessor.BATCH_SIZE, ConcurrentPatchProcessor.NUM_WORKERS, false); + + try { + LOG.info("repairElements.execute(): {}: Starting...", indexNames); + action.accept(manager, context.getGraph()); + manager.drain(); + } finally { + try { + manager.shutdown(); + } catch (InterruptedException e) { + LOG.error("repairEdges.execute(): interrupted during WorkItemManager shutdown.", e); + } + + LOG.info("repairElements.execute(): {}: Done!", indexNames); + } + } + + private static void edges(WorkItemManager manager, AtlasGraph graph) { + Iterable<Object> iterable = graph.getEdges(); + for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) { + manager.checkProduce(iter.next()); + } + } + + private static void vertices(WorkItemManager manager, AtlasGraph graph) { + Iterable<AtlasVertex> iterable = graph.getVertices(); + for (Iterator<AtlasVertex> iter = iterable.iterator(); iter.hasNext(); ) { + AtlasVertex vertex = iter.next(); + manager.checkProduce(vertex); + } + } + } + + private static class ReindexConsumerBuilder implements WorkItemBuilder<ReindexConsumer, AtlasElement> { + private AtlasGraph graph; + private String[] indexNames; + + public ReindexConsumerBuilder(AtlasGraph graph, String[] indexNames) { + this.graph = graph; + this.indexNames = indexNames; + } + + @Override + public ReindexConsumer build(BlockingQueue queue) { + return new ReindexConsumer(queue, this.graph, this.indexNames); + } + } + + private static class ReindexConsumer extends WorkItemConsumer<AtlasElement> { + private List<AtlasElement> list = new ArrayList(); + private AtlasGraph graph; + private String[] indexNames; + private final AtomicLong counter; + + public ReindexConsumer(BlockingQueue queue, AtlasGraph graph, String[] indexNames) { + super(queue); + this.graph = graph; + this.indexNames = indexNames; + this.counter = new AtomicLong(0); + } + + @Override + protected void doCommit() { + if (list.size() >= ConcurrentPatchProcessor.BATCH_SIZE) { + attemptCommit(); + } + } + + @Override + protected void commitDirty() { + attemptCommit(); + + LOG.info("Total: Commit: {}", counter.get()); + super.commitDirty(); + } + + private void attemptCommit() { + for (String indexName : indexNames) { + try { + this.graph.getManagementSystem().reindex(indexName, list); + } + catch (IllegalStateException e) { + LOG.error("IllegalStateException: Exception", e); + return; + } + catch (Exception exception) { + LOG.error("Exception: {}", indexName, exception); + } + } + + list.clear(); + LOG.info("Processed: {}", counter.get()); + } + + @Override + protected void processItem(AtlasElement item) { + counter.incrementAndGet(); + list.add(item); + commit(); + } + } +}