mcvsubbu commented on a change in pull request #4222: Add startup/shutdown checks for HelixServerStarter URL: https://github.com/apache/incubator-pinot/pull/4222#discussion_r286101047
########## File path: pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java ########## @@ -352,135 +287,160 @@ public void stop() { _adminApiApplication.stop(); setShuttingDownStatus(true); - // Total waiting time should include max query time. - final long endTime = _maxShutdownWaitTimeMs + System.currentTimeMillis(); - if (_helixServerConfig.getBoolean(CommonConstants.Server.CONFIG_OF_ENABLE_SHUTDOWN_DELAY, true)) { - Uninterruptibles.sleepUninterruptibly(_maxQueryTimeMs, TimeUnit.MILLISECONDS); + long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_SHUTDOWN_TIMEOUT_MS, DEFAULT_SHUTDOWN_TIMEOUT_MS); + if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) { + shutdownQueryCheck(endTimeMs); } - waitUntilNoIncomingQueries(System.currentTimeMillis(), endTime); _helixManager.disconnect(); _serverInstance.shutDown(); - waitUntilNoOnlineResources(System.currentTimeMillis(), endTime); - } - - private void waitUntilNoIncomingQueries(long startTime, final long endTime) { - if (startTime >= endTime) { - LOGGER.warn("Skip waiting until no incoming queries."); - return; + if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK, DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) { + shutdownResourceCheck(endTimeMs); } - LOGGER.info("Waiting upto {}ms until Pinot server doesn't receive any incoming queries...", (endTime - startTime)); - long currentTime = startTime; + } - while (currentTime < endTime) { - if (noIncomingQueries(currentTime)) { - LOGGER.info("No incoming query within {}ms. Total waiting Time: {}ms", _checkIntervalTimeMs, - (currentTime - startTime)); - return; + /** + * When shutting down the server, drains the queries and waits for all the existing queries to be finished. 
+ * + * @param endTimeMs Timeout for the check + */ + private void shutdownQueryCheck(long endTimeMs) { + LOGGER.info("Starting shutdown query check"); + long startTimeMs = System.currentTimeMillis(); + + long maxQueryTimeMs = _serverConf.getLong(CONFIG_OF_QUERY_EXECUTOR_TIMEOUT, DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS); + long noQueryThresholdMs = _serverConf.getLong(CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs); + + // Drain queries + boolean queriesDrained = false; + long currentTimeMs; + while ((currentTimeMs = System.currentTimeMillis()) < endTimeMs) { + long latestQueryTimeMs = _serverInstance.getLatestQueryTime(); + if (currentTimeMs >= latestQueryTimeMs + noQueryThresholdMs) { + LOGGER.info("Finished draining queries (no query received within {}ms) after {}ms", + currentTimeMs - latestQueryTimeMs, currentTimeMs - startTimeMs); + queriesDrained = true; + break; } - try { - Thread.sleep(Math.min(_maxQueryTimeMs, (endTime - currentTime))); + Thread.sleep(Math.min(noQueryThresholdMs - latestQueryTimeMs, endTimeMs - currentTimeMs)); } catch (InterruptedException e) { - LOGGER.error("Interrupted when waiting for Pinot server not to receive any queries.", e); + LOGGER.error("Got interrupted while draining queries", e); Thread.currentThread().interrupt(); - return; + break; + } + } + if (queriesDrained) { + // Ensure all the existing queries are finished + long latestQueryFinishTimeMs = _serverInstance.getLatestQueryTime() + maxQueryTimeMs; + if (latestQueryFinishTimeMs > currentTimeMs) { + try { + Thread.sleep(latestQueryFinishTimeMs - currentTimeMs); + } catch (InterruptedException e) { + LOGGER.warn("Got interrupted while waiting for all the existing queries to be finished", e); + } } - currentTime = System.currentTimeMillis(); + } else { + LOGGER.warn("Failed to drain queries within {}ms", System.currentTimeMillis() - startTimeMs); } - LOGGER.error("Reach timeout when waiting for no incoming queries! 
Max waiting time: {}ms", _maxShutdownWaitTimeMs); } /** - * Init a helix spectator to watch the external view updates. + * When shutting down the server, waits for all the resources turn OFFLINE (all partitions served by the server are + * neither ONLINE or CONSUMING). + * + * @param endTimeMs Timeout for the check */ - private void waitUntilNoOnlineResources(long startTime, final long endTime) { - if (startTime >= endTime) { - LOGGER.warn("Skip waiting until no online resources."); + private void shutdownResourceCheck(long endTimeMs) { + LOGGER.info("Starting shutdown resource check"); + long startTimeMs = System.currentTimeMillis(); + + if (startTimeMs >= endTimeMs) { + LOGGER.warn("Skipping shutdown resource check because shutdown timeout is already reached"); return; } - LOGGER.info("Waiting upto {}ms until no online resources...", (endTime - startTime)); - // Initialize a helix spectator. - HelixManager spectatorManager = - HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.SPECTATOR, _zkServers); + HelixAdmin helixAdmin = null; try { - spectatorManager.connect(); - - Set<String> resources = fetchLatestTableResources(spectatorManager.getClusterManagmentTool()); + helixAdmin = new ZKHelixAdmin(_zkServers); + + // Monitor all enabled table resources that the server serves + Set<String> resourcesToMonitor = new HashSet<>(); + for (String resourceName : helixAdmin.getResourcesInCluster(_helixClusterName)) { + if (TableNameBuilder.isTableResource(resourceName)) { + IdealState idealState = helixAdmin.getResourceIdealState(_helixClusterName, resourceName); + if (idealState == null || !idealState.isEnabled()) { + continue; + } + for (String partition : idealState.getPartitionSet()) { + if (idealState.getInstanceSet(partition).contains(_instanceId)) { + resourcesToMonitor.add(resourceName); + break; + } + } + } + } - long currentTime = startTime; - while (currentTime < endTime) { - if (noOnlineResources(spectatorManager, resources)) { - 
LOGGER.info("No online resource within {}ms. Total waiting Time: {}ms", _checkIntervalTimeMs, - (currentTime - startTime)); + long checkIntervalMs = _serverConf + .getLong(CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS, DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS); + while (System.currentTimeMillis() < endTimeMs) { + Iterator<String> iterator = resourcesToMonitor.iterator(); + while (iterator.hasNext()) { + if (isResourceOffline(helixAdmin, iterator.next())) { + iterator.remove(); + } else { + // Do not check remaining resources if one resource is not OFFLINE + break; + } + } + if (resourcesToMonitor.isEmpty()) { + LOGGER.info("All resources are OFFLINE after {}ms", System.currentTimeMillis() - startTimeMs); return; } - try { - Thread.sleep(Math.min(_checkIntervalTimeMs, (endTime - currentTime))); + Thread.sleep(Math.min(checkIntervalMs, endTimeMs - System.currentTimeMillis())); } catch (InterruptedException e) { - LOGGER.error("Interrupted when waiting for no online resources.", e); + LOGGER.warn("Got interrupted while waiting for all resources OFFLINE", e); Review comment: We should return here if we got interrupted. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org