mcvsubbu commented on a change in pull request #4222: Add startup/shutdown checks for HelixServerStarter
URL: https://github.com/apache/incubator-pinot/pull/4222#discussion_r286101047
 
 

 ##########
 File path: pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
 ##########
 @@ -352,135 +287,160 @@ public void stop() {
     _adminApiApplication.stop();
     setShuttingDownStatus(true);
 
-    // Total waiting time should include max query time.
-    final long endTime = _maxShutdownWaitTimeMs + System.currentTimeMillis();
-    if 
(_helixServerConfig.getBoolean(CommonConstants.Server.CONFIG_OF_ENABLE_SHUTDOWN_DELAY,
 true)) {
-      Uninterruptibles.sleepUninterruptibly(_maxQueryTimeMs, 
TimeUnit.MILLISECONDS);
+    long endTimeMs = startTimeMs + 
_serverConf.getLong(CONFIG_OF_SHUTDOWN_TIMEOUT_MS, DEFAULT_SHUTDOWN_TIMEOUT_MS);
+    if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, 
DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) {
+      shutdownQueryCheck(endTimeMs);
     }
-    waitUntilNoIncomingQueries(System.currentTimeMillis(), endTime);
     _helixManager.disconnect();
     _serverInstance.shutDown();
-    waitUntilNoOnlineResources(System.currentTimeMillis(), endTime);
-  }
-
-  private void waitUntilNoIncomingQueries(long startTime, final long endTime) {
-    if (startTime >= endTime) {
-      LOGGER.warn("Skip waiting until no incoming queries.");
-      return;
+    if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK, 
DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) {
+      shutdownResourceCheck(endTimeMs);
     }
-    LOGGER.info("Waiting upto {}ms until Pinot server doesn't receive any 
incoming queries...", (endTime - startTime));
-    long currentTime = startTime;
+  }
 
-    while (currentTime < endTime) {
-      if (noIncomingQueries(currentTime)) {
-        LOGGER.info("No incoming query within {}ms. Total waiting Time: {}ms", 
_checkIntervalTimeMs,
-            (currentTime - startTime));
-        return;
+  /**
+   * When shutting down the server, drains the queries and waits for all the 
existing queries to be finished.
+   *
+   * @param endTimeMs Timeout for the check
+   */
+  private void shutdownQueryCheck(long endTimeMs) {
+    LOGGER.info("Starting shutdown query check");
+    long startTimeMs = System.currentTimeMillis();
+
+    long maxQueryTimeMs = 
_serverConf.getLong(CONFIG_OF_QUERY_EXECUTOR_TIMEOUT, 
DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+    long noQueryThresholdMs = 
_serverConf.getLong(CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs);
+
+    // Drain queries
+    boolean queriesDrained = false;
+    long currentTimeMs;
+    while ((currentTimeMs = System.currentTimeMillis()) < endTimeMs) {
+      long latestQueryTimeMs = _serverInstance.getLatestQueryTime();
+      if (currentTimeMs >= latestQueryTimeMs + noQueryThresholdMs) {
+        LOGGER.info("Finished draining queries (no query received within {}ms) 
after {}ms",
+            currentTimeMs - latestQueryTimeMs, currentTimeMs - startTimeMs);
+        queriesDrained = true;
+        break;
       }
-
       try {
-        Thread.sleep(Math.min(_maxQueryTimeMs, (endTime - currentTime)));
+        Thread.sleep(Math.min(noQueryThresholdMs - latestQueryTimeMs, 
endTimeMs - currentTimeMs));
       } catch (InterruptedException e) {
-        LOGGER.error("Interrupted when waiting for Pinot server not to receive 
any queries.", e);
+        LOGGER.error("Got interrupted while draining queries", e);
         Thread.currentThread().interrupt();
-        return;
+        break;
+      }
+    }
+    if (queriesDrained) {
+      // Ensure all the existing queries are finished
+      long latestQueryFinishTimeMs = _serverInstance.getLatestQueryTime() + 
maxQueryTimeMs;
+      if (latestQueryFinishTimeMs > currentTimeMs) {
+        try {
+          Thread.sleep(latestQueryFinishTimeMs - currentTimeMs);
+        } catch (InterruptedException e) {
+          LOGGER.warn("Got interrupted while waiting for all the existing 
queries to be finished", e);
+        }
       }
-      currentTime = System.currentTimeMillis();
+    } else {
+      LOGGER.warn("Failed to drain queries within {}ms", 
System.currentTimeMillis() - startTimeMs);
     }
-    LOGGER.error("Reach timeout when waiting for no incoming queries! Max 
waiting time: {}ms", _maxShutdownWaitTimeMs);
   }
 
   /**
-   * Init a helix spectator to watch the external view updates.
+   * When shutting down the server, waits for all the resources turn OFFLINE 
(all partitions served by the server are
+   * neither ONLINE or CONSUMING).
+   *
+   * @param endTimeMs Timeout for the check
    */
-  private void waitUntilNoOnlineResources(long startTime, final long endTime) {
-    if (startTime >= endTime) {
-      LOGGER.warn("Skip waiting until no online resources.");
+  private void shutdownResourceCheck(long endTimeMs) {
+    LOGGER.info("Starting shutdown resource check");
+    long startTimeMs = System.currentTimeMillis();
+
+    if (startTimeMs >= endTimeMs) {
+      LOGGER.warn("Skipping shutdown resource check because shutdown timeout 
is already reached");
       return;
     }
-    LOGGER.info("Waiting upto {}ms until no online resources...", (endTime - 
startTime));
 
-    // Initialize a helix spectator.
-    HelixManager spectatorManager =
-        HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, 
InstanceType.SPECTATOR, _zkServers);
+    HelixAdmin helixAdmin = null;
     try {
-      spectatorManager.connect();
-
-      Set<String> resources = 
fetchLatestTableResources(spectatorManager.getClusterManagmentTool());
+      helixAdmin = new ZKHelixAdmin(_zkServers);
+
+      // Monitor all enabled table resources that the server serves
+      Set<String> resourcesToMonitor = new HashSet<>();
+      for (String resourceName : 
helixAdmin.getResourcesInCluster(_helixClusterName)) {
+        if (TableNameBuilder.isTableResource(resourceName)) {
+          IdealState idealState = 
helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
+          if (idealState == null || !idealState.isEnabled()) {
+            continue;
+          }
+          for (String partition : idealState.getPartitionSet()) {
+            if (idealState.getInstanceSet(partition).contains(_instanceId)) {
+              resourcesToMonitor.add(resourceName);
+              break;
+            }
+          }
+        }
+      }
 
-      long currentTime = startTime;
-      while (currentTime < endTime) {
-        if (noOnlineResources(spectatorManager, resources)) {
-          LOGGER.info("No online resource within {}ms. Total waiting Time: 
{}ms", _checkIntervalTimeMs,
-              (currentTime - startTime));
+      long checkIntervalMs = _serverConf
+          .getLong(CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS, 
DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS);
+      while (System.currentTimeMillis() < endTimeMs) {
+        Iterator<String> iterator = resourcesToMonitor.iterator();
+        while (iterator.hasNext()) {
+          if (isResourceOffline(helixAdmin, iterator.next())) {
+            iterator.remove();
+          } else {
+            // Do not check remaining resources if one resource is not OFFLINE
+            break;
+          }
+        }
+        if (resourcesToMonitor.isEmpty()) {
+          LOGGER.info("All resources are OFFLINE after {}ms", 
System.currentTimeMillis() - startTimeMs);
           return;
         }
-
         try {
-          Thread.sleep(Math.min(_checkIntervalTimeMs, (endTime - 
currentTime)));
+          Thread.sleep(Math.min(checkIntervalMs, endTimeMs - 
System.currentTimeMillis()));
         } catch (InterruptedException e) {
-          LOGGER.error("Interrupted when waiting for no online resources.", e);
+          LOGGER.warn("Got interrupted while waiting for all resources 
OFFLINE", e);
 
 Review comment:
   We should return here if we get interrupted.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to