This is an automated email from the ASF dual-hosted git repository. radhikakundam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 882954e29 ATLAS-4738: Dynamic Index Recovery issues and improvements 882954e29 is described below commit 882954e2969d6018d2bc8977f9dc18a6e3d1d5ce Author: radhikakundam <radhikakun...@apache.org> AuthorDate: Wed Mar 29 11:16:26 2023 -0700 ATLAS-4738: Dynamic Index Recovery issues and improvements Signed-off-by: radhikakundam <radhikakun...@apache.org> --- .../main/java/org/apache/atlas/AtlasClientV2.java | 18 +++ .../org/apache/atlas/repository/Constants.java | 9 +- .../graphdb/janus/AtlasJanusGraphDatabase.java | 5 +- .../graphdb/janus/AtlasJanusGraphManagement.java | 2 +- .../java/org/apache/atlas/AtlasConfiguration.java | 3 +- .../repository/graph/IndexRecoveryService.java | 121 +++++++++++++---- .../apache/atlas/web/rest/IndexRecoveryREST.java | 143 +++++++++++++++++++++ .../atlas/web/integration/IndexRecoveryRestIT.java | 54 ++++++++ 8 files changed, 318 insertions(+), 37 deletions(-) diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java index 7c8e875a1..6f97da192 100644 --- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java +++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java @@ -83,6 +83,7 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.InputStream; import java.io.InputStreamReader; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -137,6 +138,8 @@ public class AtlasClientV2 extends AtlasBaseClient { //Notification APIs private static final String NOTIFICATION_URI = BASE_URI + "v2/notification"; + //IndexRecovery APIs + private static final String INDEX_RECOVERY_URI = BASE_URI + "v2/indexrecovery"; public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) { super(baseUrl, basicAuthUserNamePassword); @@ -1031,6 +1034,18 @@ public class AtlasClientV2 extends AtlasBaseClient { callAPI(formatPathParameters(API_V2.POST_NOTIFICATIONS_TO_TOPIC, topic), (Class<?>) null, messages); } + public Map<String, String> getIndexRecoveryData() throws AtlasServiceException { + return callAPI(API_V2.GET_INDEX_RECOVERY_DATA, Map.class, null); + } + + public void startIndexRecovery(Instant startTime) throws AtlasServiceException { + MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl(); + + queryParams.add("startTime", startTime != null ? String.valueOf(startTime) : null); + + callAPI(API_V2.START_INDEX_RECOVERY, (Class<?>) null, queryParams); + } + @VisibleForTesting public API formatPathWithParameter(API api, String... params) { return formatPathParameters(api, params); @@ -1212,6 +1227,9 @@ public class AtlasClientV2 extends AtlasBaseClient { public static final API_V2 POST_NOTIFICATIONS_TO_TOPIC = new API_V2(NOTIFICATION_URI + "/topic/%s", HttpMethod.POST, Response.Status.NO_CONTENT); + public static final API_V2 GET_INDEX_RECOVERY_DATA = new API_V2(INDEX_RECOVERY_URI , HttpMethod.GET, Response.Status.OK); + public static final API_V2 START_INDEX_RECOVERY = new API_V2(INDEX_RECOVERY_URI + "/start", HttpMethod.POST, Response.Status.NO_CONTENT); + // labels APIs public static final API_V2 ADD_LABELS = new API_V2(ENTITY_API + "guid/%s/labels", HttpMethod.PUT, Response.Status.NO_CONTENT); public static final API_V2 ADD_LABELS_BY_UNIQUE_ATTRIBUTE = new API_V2(ENTITY_API + "uniqueAttribute/type/%s/labels", HttpMethod.PUT, Response.Status.NO_CONTENT); diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index c84e1b2d0..51b093284 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -233,10 +233,11 @@ public final class Constants { /** * Index Recovery vertex property keys. */ - public static final String INDEX_RECOVERY_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_"; - public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "name"); - public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime"); - public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime"); + public static final String INDEX_RECOVERY_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_"; + public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "name"); + public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime"); + public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime"); + public static final String PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "customTime"); public static final String SQOOP_SOURCE = "sqoop"; public static final String FALCON_SOURCE = "falcon"; diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java index cb3e8d993..115b681cc 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java @@ -21,6 +21,7 @@ package org.apache.atlas.repository.graphdb.janus; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasException; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.GraphDatabase; @@ -77,9 +78,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, public static final String INDEX_BACKEND_ES = "elasticsearch"; public static final String GRAPH_TX_LOG_CONF = "tx.log-tx"; public static final String GRAPH_TX_LOG_VERBOSE_CONF = "tx.recovery.verbose"; - public static final String SOLR_INDEX_TX_LOG_TTL_CONF = "write.ahead.log.ttl.in.hours"; public static final String GRAPH_TX_LOG_TTL_CONF = "log.tx.ttl"; - public static final long DEFAULT_GRAPH_TX_LOG_TTL = 72; //Hrs private static volatile AtlasJanusGraph atlasGraphInstance = null; private static volatile JanusGraph graphInstance; @@ -233,7 +232,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, public static void configureTxLogBasedIndexRecovery() { try { boolean recoveryEnabled = ApplicationProperties.get().getBoolean(INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY); - long ttl = ApplicationProperties.get().getLong(SOLR_INDEX_TX_LOG_TTL_CONF, DEFAULT_GRAPH_TX_LOG_TTL); + long ttl = AtlasConfiguration.SOLR_INDEX_TX_LOG_TTL_CONF.getLong(); Duration txLogTtlSecs = Duration.ofSeconds(Duration.ofHours(ttl).getSeconds()); Map<String, Object> properties = new HashMap<String, Object>() {{ diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java index e7de83005..b3807dfc4 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java @@ -434,7 +434,7 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement { TransactionRecovery txRecovery = (TransactionRecovery) txRecoveryObject; StandardJanusGraph janusGraph = (StandardJanusGraph) this.graph.getGraph(); - LOG.info("stopIndexRecovery: Index Client is unhealthy. Index recovery: Paused!"); + LOG.info("stopIndexRecovery: Index recovery: Paused!"); janusGraph.getBackend().getSystemTxLog().close(); diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 31ec605f3..58a2fa725 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -92,7 +92,8 @@ public enum AtlasConfiguration { TASKS_USE_ENABLED("atlas.tasks.enabled", true), SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1), UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true), - METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336); // 14 days default + METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336), // 14 days default + SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240); //10 days default private static final Configuration APPLICATION_PROPERTIES; diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java index caf31634b..9bcad5475 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java @@ -19,6 +19,7 @@ package org.apache.atlas.repository.graph; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; @@ -27,6 +28,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraphQuery; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; @@ -35,12 +37,16 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import java.text.SimpleDateFormat; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Iterator; import java.util.TimeZone; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.HashMap; +import java.util.Map; import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY; import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME; +import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME; import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME; import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_START_TIME; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; @@ -52,14 +58,14 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; private static final String INDEX_HEALTH_MONITOR_THREAD_NAME = "index-health-monitor"; private static final String SOLR_STATUS_CHECK_RETRY_INTERVAL = "atlas.graph.index.status.check.frequency"; - private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = "atlas.graph.index.recovery.start.time"; + private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = "atlas.index.recovery.start.time"; private static final long SOLR_STATUS_RETRY_DEFAULT_MS = 30000; // 30 secs default private final Thread indexHealthMonitor; - private final RecoveryInfoManagement recoveryInfoManagement; + public final RecoveryInfoManagement recoveryInfoManagement; private Configuration configuration; private boolean isIndexRecoveryEnabled; - private RecoveryThread recoveryThread; + public RecoveryThread recoveryThread; @Inject public IndexRecoveryService(Configuration config, AtlasGraph graph) { @@ -151,7 +157,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { } } - private static class RecoveryThread implements Runnable { + public static class RecoveryThread implements Runnable { private final AtlasGraph graph; private final RecoveryInfoManagement recoveryInfoManagement; private long indexStatusCheckRetryMillis; @@ -176,7 +182,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { while (shouldRun.get()) { try { - boolean isIdxHealthy = isIndexBackendHealthy(); + boolean isIdxHealthy = waitAndCheckIfIndexBackendHealthy(); if (this.txRecoveryObject == null && isIdxHealthy) { startMonitoring(); @@ -207,42 +213,68 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { } } - private boolean isIndexBackendHealthy() throws AtlasException, InterruptedException { + private boolean waitAndCheckIfIndexBackendHealthy() throws AtlasException, InterruptedException { Thread.sleep(indexStatusCheckRetryMillis); + return isIndexBackendHealthy(); + } + + public boolean isIndexBackendHealthy() throws AtlasException { return this.graph.getGraphIndexClient().isHealthy(); } + public void startMonitoringByUserRequest(Long startTime) { + startMonitoring(startTime); + } + private void startMonitoring() { - Long startTime = null; + startMonitoring(recoveryInfoManagement.getStartTime()); + } + + private void startMonitoring(Long startTime) { + if (startTime == null || startTime == 0L) { + LOG.error("Index Recovery requested without start time"); + return; + } try { - startTime = recoveryInfoManagement.getStartTime(); txRecoveryObject = this.graph.getManagementSystem().startIndexRecovery(startTime); printIndexRecoveryStats(); - } catch (Exception e) { - LOG.error("Index Recovery: Start: Error!", e); - } finally { + LOG.info("Index Recovery: Started! Recovery time: {}", Instant.ofEpochMilli(startTime)); + } catch (Exception e) { + LOG.error("Index Recovery with recovery time: {} failed", Instant.ofEpochMilli(startTime), e); } } + public void stopMonitoringByUserRequest() { + stopIndexRecovery(); + LOG.info("Index Recovery: Stopped!"); + } + private void stopMonitoring() { - Instant newStartTime = Instant.now().minusMillis(indexStatusCheckRetryMillis); + stopIndexRecoveryAndUpdateStartTime(); + } + + private void stopIndexRecoveryAndUpdateStartTime() { + Instant newStartTime = Instant.now().minusMillis(2 * indexStatusCheckRetryMillis); + + stopIndexRecovery(); + recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli()); + + LOG.info("Index Recovery: Stopped! Recovery time: {}", newStartTime); + } + private void stopIndexRecovery() { try { this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject); - recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli()); - printIndexRecoveryStats(); } catch (Exception e) { LOG.info("Index Recovery: Stopped! Error!", e); } finally { this.txRecoveryObject = null; - - LOG.info("Index Recovery: Stopped! Recovery time: {}", newStartTime); } } @@ -252,7 +284,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { } @VisibleForTesting - static class RecoveryInfoManagement { + public static class RecoveryInfoManagement { private static final String INDEX_RECOVERY_TYPE_NAME = "__solrIndexRecoveryInfo"; private final AtlasGraph graph; @@ -261,20 +293,46 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { this.graph = graph; } - public void updateStartTime(Long startTime) { + public void updateStartTime(long time) { + updateIndexRecoveryTime(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, time); + } + + public void updateCustomStartTime(long time) { + updateIndexRecoveryTime(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, time); + } + + public void updateIndexRecoveryTime(String timePropertyKey, long time) { + Map<String, String> indexRecoveryData = new HashMap<>(); + indexRecoveryData.put(timePropertyKey, String.valueOf(time)); + updateIndexRecoveryData(indexRecoveryData); + } + + public void updateIndexRecoveryData(Map<String, String> indexRecoveryData) { try { - Long prevStartTime = null; - AtlasVertex vertex = findVertex(); + Long startTime = NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_START_TIME)); + Long prevStartTime = NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME)); + Long customStartTime = NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME)); + boolean isStartTimeUpdated = startTime != null ? true : false; + AtlasVertex vertex = findVertex(); if (vertex == null) { vertex = graph.addVertex(); + setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME); } else { - prevStartTime = getStartTime(vertex); + prevStartTime = isStartTimeUpdated ? getStartTime(vertex) : prevStartTime; } - setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME); - setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime); - setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime); + if (startTime != null) { + setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime); + } + + if (prevStartTime != null) { + setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime); + } + + if (customStartTime != null) { + setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, customStartTime); + } } catch (Exception ex) { LOG.error("Error: Updating: {}!", ex); @@ -290,10 +348,12 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { } private Long getStartTime(AtlasVertex vertex) { + Long defaultStartTime = getStartTimeByTxLogTTL(); + if (vertex == null) { - LOG.warn("Vertex passed is NULL: Returned is 0"); + LOG.warn("Vertex passed is NULL: Returned is startTime by TTL {}", Instant.ofEpochMilli(defaultStartTime)); - return 0L; + return defaultStartTime; } Long startTime = 0L; @@ -304,10 +364,15 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { LOG.error("Error retrieving startTime", e); } - return startTime; + return startTime == null || startTime == 0L ? defaultStartTime : startTime; + } + + private Long getStartTimeByTxLogTTL() { + long ttl = AtlasConfiguration.SOLR_INDEX_TX_LOG_TTL_CONF.getLong(); + return Instant.now().minus(ttl, ChronoUnit.HOURS).toEpochMilli(); } - private AtlasVertex findVertex() { + public AtlasVertex findVertex() { AtlasGraphQuery query = graph.query().has(PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME); Iterator<AtlasVertex> results = query.vertices().iterator(); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/IndexRecoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/IndexRecoveryREST.java new file mode 100644 index 000000000..72916d261 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/rest/IndexRecoveryREST.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.web.rest; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.annotation.Timed; +import org.apache.atlas.authorize.AtlasAdminAccessRequest; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.graph.IndexRecoveryService; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.web.util.DateTimeHelper; +import org.apache.atlas.web.util.Servlets; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.format.annotation.DateTimeFormat; +import org.springframework.stereotype.Service; + +import javax.inject.Inject; +import javax.inject.Singleton; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.atlas.repository.Constants.*; + +@Path("v2/indexrecovery") +@Singleton +@Service +@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +public class IndexRecoveryREST { + private static final Logger LOG = LoggerFactory.getLogger(IndexRecoveryREST.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.IndexRecoveryREST"); + + private final IndexRecoveryService indexRecoveryService; + private final AtlasGraph graph; + + @Inject + IndexRecoveryREST(IndexRecoveryService indexRecoveryService, AtlasGraph graph) { + this.indexRecoveryService = indexRecoveryService; + this.graph = graph; + } + /** + * @return Future index recovery start time and previous recovery start time if applicable + * @HTTP 200 If Index recovery data exists for the given entity + * @HTTP 400 Bad query parameters + */ + @GET + @Timed + public Map<String, String> getIndexRecoveryData() { + + AtlasPerfTracer perf = null; + Long startTime = null; + Long prevTime = null; + Long customStartTime = null; + Map<String, String> indexRecoveryData = new HashMap<>(); + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "IndexRecoveryREST.getIndexRecoveryData()"); + } + + AtlasVertex indexRecoveryVertex = indexRecoveryService.recoveryInfoManagement.findVertex(); + if (indexRecoveryVertex != null) { + startTime = indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, Long.class); + prevTime = indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, Long.class); + customStartTime = indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, Long.class); + } + + indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_START_TIME), startTime != null ? Instant.ofEpochMilli(startTime).toString() : "Not applicable"); + indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME), prevTime != null ? Instant.ofEpochMilli(prevTime).toString() : "Not applicable"); + indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME), customStartTime != null ? Instant.ofEpochMilli(customStartTime).toString() : "Not applicable"); + + } finally { + AtlasPerfTracer.log(perf); + } + + return indexRecoveryData; + } + + @POST + @Path("/start") + public void startCustomIndexRecovery(@QueryParam("startTime") @DateTimeFormat(pattern = DateTimeHelper.ISO8601_FORMAT) + final String startTime) throws AtlasBaseException, AtlasException { + AtlasPerfTracer perf = null; + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "IndexRecoveryREST.getIndexRecoveryData()"); + } + + AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "to start dynamic index recovery by custom time"); + + if (startTime == null) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Index Recovery requested without start time"); + } + + if (!indexRecoveryService.recoveryThread.isIndexBackendHealthy()) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "Index recovery can not be started - Solr Health: Unhealthy"); + } + + long startTimeMilli = Instant.parse(startTime).toEpochMilli(); + + indexRecoveryService.recoveryThread.stopMonitoringByUserRequest(); + + indexRecoveryService.recoveryThread.startMonitoringByUserRequest(startTimeMilli); + + indexRecoveryService.recoveryInfoManagement.updateCustomStartTime(startTimeMilli); + + } finally { + AtlasPerfTracer.log(perf); + } + } + + public static String getPropertyKeyByRemovingPrefix(String propertyKey) { + return StringUtils.removeStart(propertyKey, INDEX_RECOVERY_PREFIX); + } +} \ No newline at end of file diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/IndexRecoveryRestIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/IndexRecoveryRestIT.java new file mode 100644 index 000000000..1aaa6959d --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/integration/IndexRecoveryRestIT.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.web.integration; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasServiceException; +import org.apache.commons.lang.StringUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.time.Instant; +import java.util.Map; + +import static org.apache.atlas.repository.Constants.*; + + +public class IndexRecoveryRestIT extends BaseResourceIT { + + @Test + public void startIndexRecovery() throws Exception { + Map<String, String> indexRecoveryDataBefore = atlasClientV2.getIndexRecoveryData(); + + try { + atlasClientV2.startIndexRecovery(null); + } catch (AtlasServiceException e) { + Assert.assertEquals(e.getStatus().getStatusCode(), AtlasErrorCode.BAD_REQUEST.getHttpCode().getStatusCode()); + } + + long now = System.currentTimeMillis(); + atlasClientV2.startIndexRecovery(Instant.ofEpochMilli(now)); + + Map<String, String> indexRecoveryDataAfter = atlasClientV2.getIndexRecoveryData(); + + String customTimeKey = StringUtils.removeStart(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, INDEX_RECOVERY_PREFIX); + Assert.assertNotEquals(indexRecoveryDataBefore.get(customTimeKey), indexRecoveryDataAfter.get(customTimeKey)); + Assert.assertEquals(Instant.ofEpochMilli(now).toString(), indexRecoveryDataAfter.get(customTimeKey)); + + } +}