Jackie-Jiang commented on code in PR #12883:
URL: https://github.com/apache/pinot/pull/12883#discussion_r1571539714


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java:
##########
@@ -359,6 +361,20 @@ private void registerServiceStatusHandler() {
         new 
ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
   }
 
+  private Set<String> getConsumingSegments(String tableName) {
+    Set<String> consumingSegments = new HashSet<>();
+    IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+    if (idealState.isEnabled() && 
TableNameBuilder.isRealtimeTableResource(tableName)) {

Review Comment:
   (nit) The second check (`TableNameBuilder.isRealtimeTableResource(tableName)`) is not needed.



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -37,62 +38,81 @@ public abstract class 
IngestionBasedConsumptionStatusChecker {
 
   // constructor parameters
   protected final InstanceDataManager _instanceDataManager;
-  protected final Set<String> _consumingSegments;
+  protected final Map<String, Set<String>> _consumingSegments;
+  protected final Function<String, Set<String>> _consumingSegmentsSupplier;
 
-  // helper variable
-  private final Set<String> _caughtUpSegments = new HashSet<>();
+  // helper variable, which is thread safe, as the method might be called from 
multiple threads when the health check
+  // endpoint is called by many probes.
+  private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet();
 
+  /**
+   * Both consumingSegments and consumingSegmentsSupplier are provided as it 
can be costly to get consumingSegments
+   * via the supplier, so only use it when any missing segment is detected.
+   */
   public IngestionBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
-      Set<String> consumingSegments) {
+      Map<String, Set<String>> consumingSegments, @Nullable Function<String, 
Set<String>> consumingSegmentsSupplier) {
     _instanceDataManager = instanceDataManager;
-    _consumingSegments = consumingSegments;
+    _consumingSegments = new ConcurrentHashMap<>(consumingSegments);
+    _consumingSegmentsSupplier = consumingSegmentsSupplier;
   }
 
   public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
-    for (String segName : _consumingSegments) {
-      if (_caughtUpSegments.contains(segName)) {
-        continue;
-      }
-      TableDataManager tableDataManager = getTableDataManager(segName);
+    Set<String> tablesWithMissingSegment = new HashSet<>();
+    for (Map.Entry<String, Set<String>> tableSegments : 
_consumingSegments.entrySet()) {
+      String tableNameWithType = tableSegments.getKey();
+      TableDataManager tableDataManager = 
_instanceDataManager.getTableDataManager(tableNameWithType);
       if (tableDataManager == null) {
-        _logger.info("TableDataManager is not yet setup for segment {}. Will 
check consumption status later", segName);
+        _logger.info("No tableDataManager for table: {}. Will check 
consumption status later", tableNameWithType);
+        tablesWithMissingSegment.add(tableNameWithType);
         continue;
       }
-      SegmentDataManager segmentDataManager = null;
-      try {
-        segmentDataManager = tableDataManager.acquireSegment(segName);
-        if (segmentDataManager == null) {
-          _logger.info("SegmentDataManager is not yet setup for segment {}. 
Will check consumption status later",
-              segName);
+      for (String segName : tableSegments.getValue()) {
+        if (_caughtUpSegments.contains(segName)) {
           continue;
         }
-        if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
-          // There's a possibility that a consuming segment has converted to a 
committed segment. If that's the case,
-          // segment data manager will not be of type 
RealtimeSegmentDataManager.
-          _logger.info("Segment {} is already committed and is considered 
caught up.", segName);
-          _caughtUpSegments.add(segName);
-          continue;
-        }
-
-        RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
-        if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
-          _caughtUpSegments.add(segName);
+        SegmentDataManager segmentDataManager = null;
+        try {
+          segmentDataManager = tableDataManager.acquireSegment(segName);
+          if (segmentDataManager == null) {
+            _logger.info("No SegmentDataManager for segment: {}. Will check 
consumption status later", segName);
+            tablesWithMissingSegment.add(tableNameWithType);
+            continue;
+          }
+          if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+            // There's a possibility that a consuming segment has converted to 
a committed segment. If that's the case,
+            // segment data manager will not be of type 
RealtimeSegmentDataManager.
+            _logger.info("Segment: {} is already committed and is considered 
caught up.", segName);
+            _caughtUpSegments.add(segName);
+            continue;
+          }
+          RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
+          if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+            _caughtUpSegments.add(segName);
+          }
+        } finally {
+          if (segmentDataManager != null) {
+            tableDataManager.releaseSegment(segmentDataManager);
+          }
         }
-      } finally {
-        if (segmentDataManager != null) {
-          tableDataManager.releaseSegment(segmentDataManager);
+      }
+    }
+    if (!tablesWithMissingSegment.isEmpty() && _consumingSegmentsSupplier != 
null) {
+      for (String tableName : tablesWithMissingSegment) {
+        Set<String> consumingSegments = 
_consumingSegmentsSupplier.apply(tableName);
+        if (consumingSegments == null || consumingSegments.isEmpty()) {
+          _consumingSegments.remove(tableName);
+        } else {
+          _consumingSegments.put(tableName, consumingSegments);
         }
+        _logger.info("Found missing segments in table: {}. Updated its 
consumingSegments: {}", tableName,
+            consumingSegments);
       }
     }
-    return _consumingSegments.size() - _caughtUpSegments.size();
+    Set<String> currentConsumingSegments = new HashSet<>();
+    _consumingSegments.forEach((k, v) -> currentConsumingSegments.addAll(v));
+    _caughtUpSegments.retainAll(currentConsumingSegments);
+    return currentConsumingSegments.size() - _caughtUpSegments.size();
   }
 
   protected abstract boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager);
-

Review Comment:
   Nice!



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -37,62 +38,81 @@ public abstract class 
IngestionBasedConsumptionStatusChecker {
 
   // constructor parameters
   protected final InstanceDataManager _instanceDataManager;
-  protected final Set<String> _consumingSegments;
+  protected final Map<String, Set<String>> _consumingSegments;
+  protected final Function<String, Set<String>> _consumingSegmentsSupplier;
 
-  // helper variable
-  private final Set<String> _caughtUpSegments = new HashSet<>();
+  // helper variable, which is thread safe, as the method might be called from 
multiple threads when the health check
+  // endpoint is called by many probes.
+  private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet();

Review Comment:
   Do we need to make it thread-safe? I think we can simply make
   `getNumConsumingSegmentsNotReachedIngestionCriteria()` `synchronized` instead, since there should be only one caller.



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java:
##########
@@ -359,6 +361,20 @@ private void registerServiceStatusHandler() {
         new 
ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
   }
 
+  private Set<String> getConsumingSegments(String tableName) {

Review Comment:
   (nit)
   ```suggestion
     private Set<String> getConsumingSegments(String realtimeTableName) {
   ```



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java:
##########
@@ -37,9 +40,15 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
   private final long _minFreshnessMs;
   private final long _idleTimeoutMs;
 
-  public FreshnessBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager, Set<String> consumingSegments,
+  public FreshnessBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
+      Map<String, Set<String>> consumingSegments, long minFreshnessMs, long 
idleTimeoutMs) {
+    this(instanceDataManager, consumingSegments, null, minFreshnessMs, 
idleTimeoutMs);
+  }
+
+  public FreshnessBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,

Review Comment:
   Since we are modifying the API anyway, should we enforce a non-null supplier (i.e. drop the overload that passes `null`)?



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -37,62 +38,81 @@ public abstract class 
IngestionBasedConsumptionStatusChecker {
 
   // constructor parameters
   protected final InstanceDataManager _instanceDataManager;
-  protected final Set<String> _consumingSegments;
+  protected final Map<String, Set<String>> _consumingSegments;
+  protected final Function<String, Set<String>> _consumingSegmentsSupplier;
 
-  // helper variable
-  private final Set<String> _caughtUpSegments = new HashSet<>();
+  // helper variable, which is thread safe, as the method might be called from 
multiple threads when the health check
+  // endpoint is called by many probes.
+  private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet();

Review Comment:
   We could also make `_caughtUpSegments` a map keyed by table, and whenever all segments of a table have caught up,
   remove that table from both maps. Currently, every check has to merge the segments of all tables into a single set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to