This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6611ff8  Add config for enabling realtime offset based consumption status checker (#7753)
6611ff8 is described below

commit 6611ff82f46ea837e6bd8edefd46ccc16d4e2002
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Mon Nov 15 12:05:32 2021 -0800

    Add config for enabling realtime offset based consumption status checker (#7753)
---
 .../apache/pinot/common/utils/ServiceStatus.java    | 21 ++++++++-------------
 .../server/starter/helix/BaseServerStarter.java     | 16 ++++++++++++----
 .../org/apache/pinot/spi/utils/CommonConstants.java |  3 +++
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index d50e399..45221fb 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -218,8 +218,6 @@ public class ServiceStatus {
     private final Supplier<Integer> 
_getNumConsumingSegmentsNotReachedTheirLatestOffset;
     String _statusDescription = STATUS_DESCRIPTION_INIT;
 
-    private boolean _consumptionNotYetCaughtUp = true;
-
     /**
      * Realtime consumption catchup service which adds a static wait time for 
consuming segments to catchup
      */
@@ -242,22 +240,19 @@ public class ServiceStatus {
         return _serviceStatus;
       }
       long now = System.currentTimeMillis();
-      int numConsumingSegmentsNotCaughtUp = 
_getNumConsumingSegmentsNotReachedTheirLatestOffset.get();
+      boolean isConsumingSegmentsCounterProvided = 
_getNumConsumingSegmentsNotReachedTheirLatestOffset != null;
+      int numConsumingSegmentsNotCaughtUp =
+          isConsumingSegmentsCounterProvided ? 
_getNumConsumingSegmentsNotReachedTheirLatestOffset.get() : -1;
       if (now >= _endWaitTime) {
         _statusDescription = String.format("Consuming segments status GOOD 
since %dms "
             + "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime, 
numConsumingSegmentsNotCaughtUp);
         return Status.GOOD;
       }
-      if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp > 0) {
-        // TODO: Once the performance of offset based consumption checker is 
validated:
-        //      - remove the log line
-        //      - uncomment the status & statusDescription lines
-        //      - remove variable _consumptionNotYetCaughtUp
-        _consumptionNotYetCaughtUp = false;
-        LOGGER.info("All consuming segments have reached their latest offsets! 
"
-            + "Finished {} msec earlier than time threshold.", _endWaitTime - 
now);
-//      _statusDescription = "Consuming segments status GOOD as all consuming 
segments have reached the latest offset";
-//      return Status.GOOD;
+      if (isConsumingSegmentsCounterProvided && 
numConsumingSegmentsNotCaughtUp == 0) {
+        _statusDescription = String.format(
+            "Consuming segments status GOOD as all consuming segments have 
reached the latest offset. "
+                + "Finished %d msec earlier than time threshold.", 
_endWaitTime - now);
+        return Status.GOOD;
       }
       _statusDescription =
           String.format("Waiting for consuming segments to catchup: 
numConsumingSegmentsNotCaughtUp=%d, "
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index fe010ba..aaa1e2a 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixAdmin;
@@ -221,6 +222,9 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     int realtimeConsumptionCatchupWaitMs = _serverConf
         
.getProperty(Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS,
             Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS);
+    boolean isOffsetBasedConsumptionStatusCheckerEnabled = _serverConf
+        
.getProperty(Server.CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER,
+            
Server.DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER);
 
     // collect all resources which have this instance in the ideal state
     List<String> resourcesToMonitor = new ArrayList<>();
@@ -265,12 +269,16 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
             _instanceId, resourcesToMonitor, minResourcePercentForStartup));
     boolean foundConsuming = !consumingSegments.isEmpty();
     if (checkRealtime && foundConsuming) {
-      OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
-          new 
OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), 
consumingSegments);
+      Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset = 
null;
+      if (isOffsetBasedConsumptionStatusCheckerEnabled) {
+        OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
+            new 
OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), 
consumingSegments);
+        getNumConsumingSegmentsNotReachedTheirLatestOffset =
+            
consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset;
+      }
       serviceStatusCallbackListBuilder.add(
           new 
ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, 
_helixClusterName,
-              _instanceId, realtimeConsumptionCatchupWaitMs,
-              
consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset));
+              _instanceId, realtimeConsumptionCatchupWaitMs, 
getNumConsumingSegmentsNotReachedTheirLatestOffset));
     }
     LOGGER.info("Registering service status handler");
     ServiceStatus.setServiceStatusCallback(_instanceId,
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1402417..58c175e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -296,6 +296,9 @@ public class CommonConstants {
     public static final String 
CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS =
         "pinot.server.starter.realtimeConsumptionCatchupWaitMs";
     public static final int 
DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS = 0;
+    public static final String 
CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER =
+        
"pinot.server.starter.enableRealtimeOffsetBasedConsumptionStatusChecker";
+    public static final boolean 
DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER = false;
 
     public static final String DEFAULT_READ_MODE = "mmap";
     // Whether to reload consuming segment on scheme update

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to