This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6611ff8 Add config for enabling realtime offset based consumption status checker (#7753) 6611ff8 is described below commit 6611ff82f46ea837e6bd8edefd46ccc16d4e2002 Author: Sajjad Moradi <moradi.saj...@gmail.com> AuthorDate: Mon Nov 15 12:05:32 2021 -0800 Add config for enabling realtime offset based consumption status checker (#7753) --- .../apache/pinot/common/utils/ServiceStatus.java | 21 ++++++++------------- .../server/starter/helix/BaseServerStarter.java | 16 ++++++++++++---- .../org/apache/pinot/spi/utils/CommonConstants.java | 3 +++ 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java index d50e399..45221fb 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java @@ -218,8 +218,6 @@ public class ServiceStatus { private final Supplier<Integer> _getNumConsumingSegmentsNotReachedTheirLatestOffset; String _statusDescription = STATUS_DESCRIPTION_INIT; - private boolean _consumptionNotYetCaughtUp = true; - /** * Realtime consumption catchup service which adds a static wait time for consuming segments to catchup */ @@ -242,22 +240,19 @@ public class ServiceStatus { return _serviceStatus; } long now = System.currentTimeMillis(); - int numConsumingSegmentsNotCaughtUp = _getNumConsumingSegmentsNotReachedTheirLatestOffset.get(); + boolean isConsumingSegmentsCounterProvided = _getNumConsumingSegmentsNotReachedTheirLatestOffset != null; + int numConsumingSegmentsNotCaughtUp = + isConsumingSegmentsCounterProvided ? _getNumConsumingSegmentsNotReachedTheirLatestOffset.get() : -1; if (now >= _endWaitTime) { _statusDescription = String.format("Consuming segments status GOOD since %dms " + "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime, numConsumingSegmentsNotCaughtUp); return Status.GOOD; } - if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp > 0) { - // TODO: Once the performance of offset based consumption checker is validated: - // - remove the log line - // - uncomment the status & statusDescription lines - // - remove variable _consumptionNotYetCaughtUp - _consumptionNotYetCaughtUp = false; - LOGGER.info("All consuming segments have reached their latest offsets! " - + "Finished {} msec earlier than time threshold.", _endWaitTime - now); -// _statusDescription = "Consuming segments status GOOD as all consuming segments have reached the latest offset"; -// return Status.GOOD; + if (isConsumingSegmentsCounterProvided && numConsumingSegmentsNotCaughtUp == 0) { + _statusDescription = String.format( + "Consuming segments status GOOD as all consuming segments have reached the latest offset. " + + "Finished %d msec earlier than time threshold.", _endWaitTime - now); + return Status.GOOD; } _statusDescription = String.format("Waiting for consuming segments to catchup: numConsumingSegmentsNotCaughtUp=%d, " diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index fe010ba..aaa1e2a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.helix.HelixAdmin; @@ -221,6 +222,9 @@ public abstract class BaseServerStarter implements ServiceStartable { int realtimeConsumptionCatchupWaitMs = _serverConf .getProperty(Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS, Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS); + boolean isOffsetBasedConsumptionStatusCheckerEnabled = _serverConf + .getProperty(Server.CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER, + Server.DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER); // collect all resources which have this instance in the ideal state List<String> resourcesToMonitor = new ArrayList<>(); @@ -265,12 +269,16 @@ public abstract class BaseServerStarter implements ServiceStartable { _instanceId, resourcesToMonitor, minResourcePercentForStartup)); boolean foundConsuming = !consumingSegments.isEmpty(); if (checkRealtime && foundConsuming) { - OffsetBasedConsumptionStatusChecker consumptionStatusChecker = - new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments); + Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset = null; + if (isOffsetBasedConsumptionStatusCheckerEnabled) { + OffsetBasedConsumptionStatusChecker consumptionStatusChecker = + new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments); + getNumConsumingSegmentsNotReachedTheirLatestOffset = + consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset; + } serviceStatusCallbackListBuilder.add( new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName, - _instanceId, realtimeConsumptionCatchupWaitMs, - consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset)); + _instanceId, realtimeConsumptionCatchupWaitMs, getNumConsumingSegmentsNotReachedTheirLatestOffset)); } LOGGER.info("Registering service status handler"); ServiceStatus.setServiceStatusCallback(_instanceId, diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 1402417..58c175e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -296,6 +296,9 @@ public class CommonConstants { public static final String CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS = "pinot.server.starter.realtimeConsumptionCatchupWaitMs"; public static final int DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS = 0; + public static final String CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER = + "pinot.server.starter.enableRealtimeOffsetBasedConsumptionStatusChecker"; + public static final boolean DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER = false; public static final String DEFAULT_READ_MODE = "mmap"; // Whether to reload consuming segment on scheme update --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org