Jackie-Jiang commented on a change in pull request #4218: Add 
RealtimeConsumptionCatchupServiceCallback
URL: https://github.com/apache/incubator-pinot/pull/4218#discussion_r285292815
 
 

 ##########
 File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
 ##########
 @@ -127,34 +129,129 @@ public String getStatusDescription() {
       return statusDescription.toString();
     }
   }
+  
+  public static abstract class BaseServiceStatusCallback implements 
ServiceStatusCallback {
+    private final String _clusterName;
+    final String _instanceName;
+    private final HelixAdmin _helixAdmin;
+    private final HelixDataAccessor _helixDataAccessor;
+
+    String _statusDescription = STATUS_DESCRIPTION_INIT;
+
+    BaseServiceStatusCallback(HelixManager helixManager, String clusterName, 
String instanceName) {
+      _helixAdmin = helixManager.getClusterManagmentTool();
+      _helixDataAccessor = helixManager.getHelixDataAccessor();
+      _clusterName = clusterName;
+      _instanceName = instanceName;
+    }
+
+    @Override
+    public synchronized String getStatusDescription() {
+      return _statusDescription;
+    }
+
+    protected IdealState getResourceIdealState(String resourceName) {
+      return _helixAdmin.getResourceIdealState(_clusterName, resourceName);
+    }
+
+    protected List<String> getResourcesInCluster() {
+      return _helixAdmin.getResourcesInCluster(_clusterName);
+    }
+
+    protected ExternalView getResourceExternalView(String resourceName) {
+      return _helixAdmin.getResourceExternalView(_clusterName, resourceName);
+    }
+
+    protected CurrentState getCurrentState(String resourceName) {
+      PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
+      LiveInstance liveInstance = 
_helixDataAccessor.getProperty(keyBuilder.liveInstance(_instanceName));
+      String sessionId = liveInstance.getSessionId();
+      return 
_helixDataAccessor.getProperty(keyBuilder.currentState(_instanceName, 
sessionId, resourceName));
+    }
+  }
+
+  /**
+   * Service status callback that checks whether realtime consumption has 
caught up
+   * TODO: In this initial version, we are simply adding a configurable static 
wait time
+   * This can be made smarter:
+   * 1) Keep track of average consumption rate for table in server stats
+   * 2) Monitor consumption rate during startup, report GOOD when it 
stabilizes to average rate
+   * 3) Monitor consumption rate during startup, report GOOD if it is idle
+   */
+  public static class RealtimeConsumptionCatchupServiceStatusCallback extends 
BaseServiceStatusCallback {
+
+    private final Map<String, List<String>> _tableToConsumingSegmentsMap;
+    private long _endWaitTime = 0;
+
+    /**
+     * Realtime consumption catchup service which adds a static wait time for
+     */
+    public RealtimeConsumptionCatchupServiceStatusCallback(HelixManager 
helixManager, String clusterName, String instanceName,
+        long realtimeConsumptionCatchupWaitTimeSeconds) {
+      super(helixManager, clusterName, instanceName);
+
+      _tableToConsumingSegmentsMap = new HashMap<>();
+      for (String resourceName : getResourcesInCluster()) {
+        if (TableNameBuilder.isRealtimeTableResource(resourceName)) {
+
+          IdealState idealState = getResourceIdealState(resourceName);
+          if (idealState.isEnabled()) {
+            List<String> consumingSegments = new ArrayList<>();
+            for (String segmentName : idealState.getPartitionSet()) {
+              Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(segmentName);
+              if 
(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING.equals(
+                  instanceStateMap.get(instanceName))) {
+                consumingSegments.add(segmentName);
+              }
+            }
+            if (!consumingSegments.isEmpty()) {
+              _tableToConsumingSegmentsMap.put(resourceName, 
consumingSegments);
+            }
+          }
+        }
+      }
+
+      if (_tableToConsumingSegmentsMap.isEmpty()) {
+        LOGGER.info(
+            "No consuming segments to monitor on instance. Setting realtime 
consumption catchup wait time to 0ms");
 
 Review comment:
   Directly set the service status to GOOD instead of relying on the 
_endWaitTime

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to