This is an automated email from the ASF dual-hosted git repository.

radhikakundam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 882954e29 ATLAS-4738: Dynamic Index Recovery issues and improvements
882954e29 is described below

commit 882954e2969d6018d2bc8977f9dc18a6e3d1d5ce
Author: radhikakundam <radhikakun...@apache.org>
AuthorDate: Wed Mar 29 11:16:26 2023 -0700

    ATLAS-4738: Dynamic Index Recovery issues and improvements
    
    Signed-off-by: radhikakundam <radhikakun...@apache.org>
---
 .../main/java/org/apache/atlas/AtlasClientV2.java  |  18 +++
 .../org/apache/atlas/repository/Constants.java     |   9 +-
 .../graphdb/janus/AtlasJanusGraphDatabase.java     |   5 +-
 .../graphdb/janus/AtlasJanusGraphManagement.java   |   2 +-
 .../java/org/apache/atlas/AtlasConfiguration.java  |   3 +-
 .../repository/graph/IndexRecoveryService.java     | 121 +++++++++++++----
 .../apache/atlas/web/rest/IndexRecoveryREST.java   | 143 +++++++++++++++++++++
 .../atlas/web/integration/IndexRecoveryRestIT.java |  54 ++++++++
 8 files changed, 318 insertions(+), 37 deletions(-)

diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 7c8e875a1..6f97da192 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -83,6 +83,7 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -137,6 +138,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
     //Notification APIs
     private static final String NOTIFICATION_URI         = BASE_URI + 
"v2/notification";
 
+    //IndexRecovery APIs
+    private static final String INDEX_RECOVERY_URI = BASE_URI + 
"v2/indexrecovery";
 
     public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) 
{
         super(baseUrl, basicAuthUserNamePassword);
@@ -1031,6 +1034,18 @@ public class AtlasClientV2 extends AtlasBaseClient {
         callAPI(formatPathParameters(API_V2.POST_NOTIFICATIONS_TO_TOPIC, 
topic), (Class<?>) null, messages);
     }
 
+    public Map<String, String> getIndexRecoveryData() throws 
AtlasServiceException {
+        return callAPI(API_V2.GET_INDEX_RECOVERY_DATA, Map.class, null);
+    }
+
+    public void startIndexRecovery(Instant startTime) throws 
AtlasServiceException {
+        MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
+
+        queryParams.add("startTime", startTime != null ? 
String.valueOf(startTime) : null);
+
+        callAPI(API_V2.START_INDEX_RECOVERY, (Class<?>) null, queryParams);
+    }
+
     @VisibleForTesting
     public API formatPathWithParameter(API api, String... params) {
         return formatPathParameters(api, params);
@@ -1212,6 +1227,9 @@ public class AtlasClientV2 extends AtlasBaseClient {
 
         public static final API_V2 POST_NOTIFICATIONS_TO_TOPIC     = new 
API_V2(NOTIFICATION_URI + "/topic/%s", HttpMethod.POST, 
Response.Status.NO_CONTENT);
 
+        public static final API_V2 GET_INDEX_RECOVERY_DATA = new 
API_V2(INDEX_RECOVERY_URI , HttpMethod.GET, Response.Status.OK);
+        public static final API_V2 START_INDEX_RECOVERY    = new 
API_V2(INDEX_RECOVERY_URI + "/start", HttpMethod.POST, 
Response.Status.NO_CONTENT);
+
         // labels APIs
         public static final API_V2 ADD_LABELS                        = new 
API_V2(ENTITY_API + "guid/%s/labels", HttpMethod.PUT, 
Response.Status.NO_CONTENT);
         public static final API_V2 ADD_LABELS_BY_UNIQUE_ATTRIBUTE    = new 
API_V2(ENTITY_API + "uniqueAttribute/type/%s/labels", HttpMethod.PUT, 
Response.Status.NO_CONTENT);
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index c84e1b2d0..51b093284 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -233,10 +233,11 @@ public final class Constants {
     /**
      * Index Recovery vertex property keys.
      */
-    public static final String INDEX_RECOVERY_PREFIX                  = 
INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_";
-    public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME       = 
encodePropertyKey(INDEX_RECOVERY_PREFIX + "name");
-    public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = 
encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
-    public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME  = 
encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
+    public static final String INDEX_RECOVERY_PREFIX                   = 
INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_";
+    public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME        = 
encodePropertyKey(INDEX_RECOVERY_PREFIX + "name");
+    public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME  = 
encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
+    public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME   = 
encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
+    public static final String PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME = 
encodePropertyKey(INDEX_RECOVERY_PREFIX + "customTime");
 
     public static final String SQOOP_SOURCE       = "sqoop";
     public static final String FALCON_SOURCE      = "falcon";
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index cb3e8d993..115b681cc 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -21,6 +21,7 @@ package org.apache.atlas.repository.graphdb.janus;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.GraphDatabase;
@@ -77,9 +78,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
     public static final String INDEX_BACKEND_ES           = "elasticsearch";
     public static final String GRAPH_TX_LOG_CONF          = "tx.log-tx";
     public static final String GRAPH_TX_LOG_VERBOSE_CONF  = 
"tx.recovery.verbose";
-    public static final String SOLR_INDEX_TX_LOG_TTL_CONF = 
"write.ahead.log.ttl.in.hours";
     public static final String GRAPH_TX_LOG_TTL_CONF      = "log.tx.ttl";
-    public static final long   DEFAULT_GRAPH_TX_LOG_TTL   = 72; //Hrs
 
     private static volatile AtlasJanusGraph atlasGraphInstance = null;
     private static volatile JanusGraph graphInstance;
@@ -233,7 +232,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
     public static void configureTxLogBasedIndexRecovery() {
         try {
             boolean  recoveryEnabled = 
ApplicationProperties.get().getBoolean(INDEX_RECOVERY_CONF, 
DEFAULT_INDEX_RECOVERY);
-            long     ttl             = 
ApplicationProperties.get().getLong(SOLR_INDEX_TX_LOG_TTL_CONF, 
DEFAULT_GRAPH_TX_LOG_TTL);
+            long     ttl             = 
AtlasConfiguration.SOLR_INDEX_TX_LOG_TTL_CONF.getLong();
             Duration txLogTtlSecs    = 
Duration.ofSeconds(Duration.ofHours(ttl).getSeconds());
 
             Map<String, Object> properties = new HashMap<String, Object>() {{
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index e7de83005..b3807dfc4 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -434,7 +434,7 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement {
                 TransactionRecovery txRecovery = (TransactionRecovery) 
txRecoveryObject;
                 StandardJanusGraph  janusGraph = (StandardJanusGraph) 
this.graph.getGraph();
 
-                LOG.info("stopIndexRecovery: Index Client is unhealthy. Index 
recovery: Paused!");
+                LOG.info("stopIndexRecovery: Index recovery: Paused!");
 
                 janusGraph.getBackend().getSystemTxLog().close();
 
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 31ec605f3..58a2fa725 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -92,7 +92,8 @@ public enum AtlasConfiguration {
     TASKS_USE_ENABLED("atlas.tasks.enabled", true),
     SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
     UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true),
-    METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336); // 14 days 
default
+    METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336), // 14 days 
default
+       SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240); //10 
days default
 
     private static final Configuration APPLICATION_PROPERTIES;
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
index caf31634b..9bcad5475 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -19,6 +19,7 @@ package org.apache.atlas.repository.graph;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
@@ -27,6 +28,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
@@ -35,12 +37,16 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.text.SimpleDateFormat;
 import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Iterator;
 import java.util.TimeZone;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
 import static 
org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME;
+import static 
org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME;
 import static 
org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME;
 import static 
org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_START_TIME;
 import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
@@ -52,14 +58,14 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
     private static final String DATE_FORMAT                               = 
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
     private static final String INDEX_HEALTH_MONITOR_THREAD_NAME          = 
"index-health-monitor";
     private static final String SOLR_STATUS_CHECK_RETRY_INTERVAL          = 
"atlas.graph.index.status.check.frequency";
-    private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = 
"atlas.graph.index.recovery.start.time";
+    private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = 
"atlas.index.recovery.start.time";
     private static final long   SOLR_STATUS_RETRY_DEFAULT_MS              = 
30000; // 30 secs default
 
     private final Thread                 indexHealthMonitor;
-    private final RecoveryInfoManagement recoveryInfoManagement;
+    public  final RecoveryInfoManagement recoveryInfoManagement;
     private       Configuration          configuration;
     private       boolean                isIndexRecoveryEnabled;
-    private       RecoveryThread         recoveryThread;
+    public        RecoveryThread         recoveryThread;
 
     @Inject
     public IndexRecoveryService(Configuration config, AtlasGraph graph) {
@@ -151,7 +157,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
         }
     }
 
-    private static class RecoveryThread implements Runnable {
+    public static class RecoveryThread implements Runnable {
         private final AtlasGraph             graph;
         private final RecoveryInfoManagement recoveryInfoManagement;
         private       long                   indexStatusCheckRetryMillis;
@@ -176,7 +182,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
 
             while (shouldRun.get()) {
                 try {
-                    boolean isIdxHealthy = isIndexBackendHealthy();
+                    boolean isIdxHealthy = waitAndCheckIfIndexBackendHealthy();
 
                     if (this.txRecoveryObject == null && isIdxHealthy) {
                         startMonitoring();
@@ -207,42 +213,68 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
             }
         }
 
-        private boolean isIndexBackendHealthy() throws AtlasException, 
InterruptedException {
+        private boolean waitAndCheckIfIndexBackendHealthy() throws 
AtlasException, InterruptedException {
             Thread.sleep(indexStatusCheckRetryMillis);
 
+            return isIndexBackendHealthy();
+        }
+
+        public boolean isIndexBackendHealthy() throws AtlasException {
             return this.graph.getGraphIndexClient().isHealthy();
         }
 
+        public void startMonitoringByUserRequest(Long startTime) {
+            startMonitoring(startTime);
+        }
+
         private void startMonitoring() {
-            Long startTime = null;
+            startMonitoring(recoveryInfoManagement.getStartTime());
+        }
+
+        private void startMonitoring(Long startTime) {
+            if (startTime == null || startTime == 0L) {
+                LOG.error("Index Recovery requested without start time");
+                return;
+            }
 
             try {
-                startTime        = recoveryInfoManagement.getStartTime();
                 txRecoveryObject = 
this.graph.getManagementSystem().startIndexRecovery(startTime);
 
                 printIndexRecoveryStats();
-            } catch (Exception e) {
-                LOG.error("Index Recovery: Start: Error!", e);
-            } finally {
+
                 LOG.info("Index Recovery: Started! Recovery time: {}", 
Instant.ofEpochMilli(startTime));
+            } catch (Exception e) {
+                LOG.error("Index Recovery with recovery time: {} failed", 
Instant.ofEpochMilli(startTime), e);
             }
         }
 
+        public void stopMonitoringByUserRequest() {
+            stopIndexRecovery();
+            LOG.info("Index Recovery: Stopped!");
+        }
+
         private void stopMonitoring() {
-            Instant newStartTime = 
Instant.now().minusMillis(indexStatusCheckRetryMillis);
+            stopIndexRecoveryAndUpdateStartTime();
+        }
+
+        private void stopIndexRecoveryAndUpdateStartTime() {
+            Instant newStartTime = Instant.now().minusMillis(2 * 
indexStatusCheckRetryMillis);
+
+            stopIndexRecovery();
+            
recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());
+
+            LOG.info("Index Recovery: Stopped! Recovery time: {}", 
newStartTime);
+        }
 
+        private void stopIndexRecovery() {
             try {
                 
this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject);
 
-                
recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());
-
                 printIndexRecoveryStats();
             } catch (Exception e) {
                 LOG.info("Index Recovery: Stopped! Error!", e);
             } finally {
                 this.txRecoveryObject = null;
-
-                LOG.info("Index Recovery: Stopped! Recovery time: {}", 
newStartTime);
             }
         }
 
@@ -252,7 +284,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
     }
 
     @VisibleForTesting
-    static class RecoveryInfoManagement {
+    public static class RecoveryInfoManagement {
         private static final String INDEX_RECOVERY_TYPE_NAME = 
"__solrIndexRecoveryInfo";
 
         private final AtlasGraph graph;
@@ -261,20 +293,46 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
             this.graph = graph;
         }
 
-        public void updateStartTime(Long startTime) {
+        public void updateStartTime(long time) {
+            updateIndexRecoveryTime(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, 
time);
+        }
+
+        public void updateCustomStartTime(long time) {
+            updateIndexRecoveryTime(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, 
time);
+        }
+
+        public void updateIndexRecoveryTime(String timePropertyKey, long time) 
{
+            Map<String, String> indexRecoveryData = new HashMap<>();
+            indexRecoveryData.put(timePropertyKey, String.valueOf(time));
+            updateIndexRecoveryData(indexRecoveryData);
+        }
+
+        public void updateIndexRecoveryData(Map<String, String> 
indexRecoveryData) {
             try {
-                Long        prevStartTime = null;
-                AtlasVertex vertex        = findVertex();
+                Long        startTime          = 
NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_START_TIME));
+                Long        prevStartTime      = 
NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME));
+                Long        customStartTime    = 
NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME));
+                boolean     isStartTimeUpdated = startTime != null ? true : 
false;
 
+                AtlasVertex vertex = findVertex();
                 if (vertex == null) {
                     vertex = graph.addVertex();
+                    setEncodedProperty(vertex, 
PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
                 } else {
-                    prevStartTime = getStartTime(vertex);
+                    prevStartTime = isStartTimeUpdated ? getStartTime(vertex) 
: prevStartTime;
                 }
 
-                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME, 
INDEX_RECOVERY_TYPE_NAME);
-                setEncodedProperty(vertex, 
PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime);
-                setEncodedProperty(vertex, 
PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime);
+                if (startTime != null) {
+                    setEncodedProperty(vertex, 
PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime);
+                }
+
+                if (prevStartTime != null) {
+                    setEncodedProperty(vertex, 
PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime);
+                }
+
+                if (customStartTime != null) {
+                    setEncodedProperty(vertex, 
PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, customStartTime);
+                }
 
             } catch (Exception ex) {
                 LOG.error("Error: Updating: {}!", ex);
@@ -290,10 +348,12 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
         }
 
         private Long getStartTime(AtlasVertex vertex) {
+            Long defaultStartTime = getStartTimeByTxLogTTL();
+
             if (vertex == null) {
-                LOG.warn("Vertex passed is NULL: Returned is 0");
+                LOG.warn("Vertex passed is NULL: Returned is startTime by TTL 
{}", Instant.ofEpochMilli(defaultStartTime));
 
-                return 0L;
+                return defaultStartTime;
             }
 
             Long startTime = 0L;
@@ -304,10 +364,15 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
                 LOG.error("Error retrieving startTime", e);
             }
 
-            return startTime;
+            return startTime == null || startTime == 0L ? defaultStartTime : 
startTime;
+        }
+
+        private Long getStartTimeByTxLogTTL() {
+            long ttl = AtlasConfiguration.SOLR_INDEX_TX_LOG_TTL_CONF.getLong();
+            return Instant.now().minus(ttl, ChronoUnit.HOURS).toEpochMilli();
         }
 
-        private AtlasVertex findVertex() {
+        public AtlasVertex findVertex() {
             AtlasGraphQuery       query   = 
graph.query().has(PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
             Iterator<AtlasVertex> results = query.vertices().iterator();
 
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/IndexRecoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/IndexRecoveryREST.java
new file mode 100644
index 000000000..72916d261
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/IndexRecoveryREST.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.rest;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.annotation.Timed;
+import org.apache.atlas.authorize.AtlasAdminAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.graph.IndexRecoveryService;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.atlas.web.util.DateTimeHelper;
+import org.apache.atlas.web.util.Servlets;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.format.annotation.DateTimeFormat;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.repository.Constants.*;
+
+@Path("v2/indexrecovery")
+@Singleton
+@Service
+@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+public class IndexRecoveryREST {
+    private static final Logger LOG      = 
LoggerFactory.getLogger(IndexRecoveryREST.class);
+    private static final Logger PERF_LOG = 
AtlasPerfTracer.getPerfLogger("rest.IndexRecoveryREST");
+
+    private final IndexRecoveryService indexRecoveryService;
+    private final AtlasGraph           graph;
+
+    @Inject
+    IndexRecoveryREST(IndexRecoveryService indexRecoveryService, AtlasGraph 
graph) {
+        this.indexRecoveryService = indexRecoveryService;
+        this.graph                = graph;
+    }
+    /**
+     * @return Future index recovery start time and previous recovery start 
time if applicable
+     * @HTTP 200 If Index recovery data exists for the given entity
+     * @HTTP 400 Bad query parameters
+     */
+    @GET
+    @Timed
+    public Map<String, String> getIndexRecoveryData() {
+
+        AtlasPerfTracer     perf              = null;
+        Long                startTime         = null;
+        Long                prevTime          = null;
+        Long                customStartTime   = null;
+        Map<String, String> indexRecoveryData = new HashMap<>();
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"IndexRecoveryREST.getIndexRecoveryData()");
+            }
+
+            AtlasVertex indexRecoveryVertex = 
indexRecoveryService.recoveryInfoManagement.findVertex();
+            if (indexRecoveryVertex != null) {
+                startTime        = 
indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, 
Long.class);
+                prevTime         = 
indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, 
Long.class);
+                customStartTime  = 
indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, 
Long.class);
+            }
+
+            
indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_START_TIME),
 startTime != null ? Instant.ofEpochMilli(startTime).toString() : "Not 
applicable");
+            
indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME),
 prevTime != null ? Instant.ofEpochMilli(prevTime).toString() : "Not 
applicable");
+            
indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME),
 customStartTime != null ? Instant.ofEpochMilli(customStartTime).toString() : 
"Not applicable");
+
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+
+        return indexRecoveryData;
+    }
+
+    @POST
+    @Path("/start")
+    public void startCustomIndexRecovery(@QueryParam("startTime") 
@DateTimeFormat(pattern = DateTimeHelper.ISO8601_FORMAT)
+                                                            final String 
startTime) throws AtlasBaseException, AtlasException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"IndexRecoveryREST.getIndexRecoveryData()");
+            }
+
+            AtlasAuthorizationUtils.verifyAccess(new 
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "to start dynamic index 
recovery by custom time");
+
+            if (startTime == null) {
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, 
"Index Recovery requested without start time");
+            }
+
+            if (!indexRecoveryService.recoveryThread.isIndexBackendHealthy()) {
+                throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, 
"Index recovery can not be started - Solr Health: Unhealthy");
+            }
+
+            long startTimeMilli = Instant.parse(startTime).toEpochMilli();
+
+            indexRecoveryService.recoveryThread.stopMonitoringByUserRequest();
+
+            
indexRecoveryService.recoveryThread.startMonitoringByUserRequest(startTimeMilli);
+
+            
indexRecoveryService.recoveryInfoManagement.updateCustomStartTime(startTimeMilli);
+
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
+    public static String getPropertyKeyByRemovingPrefix(String propertyKey) {
+        return StringUtils.removeStart(propertyKey, INDEX_RECOVERY_PREFIX);
+    }
+}
\ No newline at end of file
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/IndexRecoveryRestIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/IndexRecoveryRestIT.java
new file mode 100644
index 000000000..1aaa6959d
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/IndexRecoveryRestIT.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.integration;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang.StringUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.time.Instant;
+import java.util.Map;
+
+import static org.apache.atlas.repository.Constants.*;
+
+
+public class IndexRecoveryRestIT extends BaseResourceIT {
+
+    @Test
+    public void startIndexRecovery() throws Exception {
+        Map<String, String> indexRecoveryDataBefore = 
atlasClientV2.getIndexRecoveryData();
+
+        try {
+            atlasClientV2.startIndexRecovery(null);
+        } catch (AtlasServiceException e) {
+            Assert.assertEquals(e.getStatus().getStatusCode(), 
AtlasErrorCode.BAD_REQUEST.getHttpCode().getStatusCode());
+        }
+
+        long now = System.currentTimeMillis();
+        atlasClientV2.startIndexRecovery(Instant.ofEpochMilli(now));
+
+        Map<String, String> indexRecoveryDataAfter = 
atlasClientV2.getIndexRecoveryData();
+
+        String customTimeKey = 
StringUtils.removeStart(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, 
INDEX_RECOVERY_PREFIX);
+        Assert.assertNotEquals(indexRecoveryDataBefore.get(customTimeKey), 
indexRecoveryDataAfter.get(customTimeKey));
+        Assert.assertEquals(Instant.ofEpochMilli(now).toString(), 
indexRecoveryDataAfter.get(customTimeKey));
+
+    }
+}

Reply via email to