Jackie-Jiang commented on code in PR #12883: URL: https://github.com/apache/pinot/pull/12883#discussion_r1571539714
########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java: ########## @@ -359,6 +361,20 @@ private void registerServiceStatusHandler() { new ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build())); } + private Set<String> getConsumingSegments(String tableName) { + Set<String> consumingSegments = new HashSet<>(); + IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName); + if (idealState.isEnabled() && TableNameBuilder.isRealtimeTableResource(tableName)) { Review Comment: (nit) Second check is not needed ########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java: ########## @@ -37,62 +38,81 @@ public abstract class IngestionBasedConsumptionStatusChecker { // constructor parameters protected final InstanceDataManager _instanceDataManager; - protected final Set<String> _consumingSegments; + protected final Map<String, Set<String>> _consumingSegments; + protected final Function<String, Set<String>> _consumingSegmentsSupplier; - // helper variable - private final Set<String> _caughtUpSegments = new HashSet<>(); + // helper variable, which is thread safe, as the method might be called from multiple threads when the health check + // endpoint is called by many probes. + private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet(); + /** + * Both consumingSegments and consumingSegmentsSupplier are provided as it can be costly to get consumingSegments + * via the supplier, so only use it when any missing segment is detected. 
+ */ public IngestionBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, - Set<String> consumingSegments) { + Map<String, Set<String>> consumingSegments, @Nullable Function<String, Set<String>> consumingSegmentsSupplier) { _instanceDataManager = instanceDataManager; - _consumingSegments = consumingSegments; + _consumingSegments = new ConcurrentHashMap<>(consumingSegments); + _consumingSegmentsSupplier = consumingSegmentsSupplier; } public int getNumConsumingSegmentsNotReachedIngestionCriteria() { - for (String segName : _consumingSegments) { - if (_caughtUpSegments.contains(segName)) { - continue; - } - TableDataManager tableDataManager = getTableDataManager(segName); + Set<String> tablesWithMissingSegment = new HashSet<>(); + for (Map.Entry<String, Set<String>> tableSegments : _consumingSegments.entrySet()) { + String tableNameWithType = tableSegments.getKey(); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); if (tableDataManager == null) { - _logger.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segName); + _logger.info("No tableDataManager for table: {}. Will check consumption status later", tableNameWithType); + tablesWithMissingSegment.add(tableNameWithType); continue; } - SegmentDataManager segmentDataManager = null; - try { - segmentDataManager = tableDataManager.acquireSegment(segName); - if (segmentDataManager == null) { - _logger.info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later", - segName); + for (String segName : tableSegments.getValue()) { + if (_caughtUpSegments.contains(segName)) { continue; } - if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) { - // There's a possibility that a consuming segment has converted to a committed segment. If that's the case, - // segment data manager will not be of type RealtimeSegmentDataManager. 
- _logger.info("Segment {} is already committed and is considered caught up.", segName); - _caughtUpSegments.add(segName); - continue; - } - - RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; - if (isSegmentCaughtUp(segName, rtSegmentDataManager)) { - _caughtUpSegments.add(segName); + SegmentDataManager segmentDataManager = null; + try { + segmentDataManager = tableDataManager.acquireSegment(segName); + if (segmentDataManager == null) { + _logger.info("No SegmentDataManager for segment: {}. Will check consumption status later", segName); + tablesWithMissingSegment.add(tableNameWithType); + continue; + } + if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) { + // There's a possibility that a consuming segment has converted to a committed segment. If that's the case, + // segment data manager will not be of type RealtimeSegmentDataManager. + _logger.info("Segment: {} is already committed and is considered caught up.", segName); + _caughtUpSegments.add(segName); + continue; + } + RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; + if (isSegmentCaughtUp(segName, rtSegmentDataManager)) { + _caughtUpSegments.add(segName); + } + } finally { + if (segmentDataManager != null) { + tableDataManager.releaseSegment(segmentDataManager); + } } - } finally { - if (segmentDataManager != null) { - tableDataManager.releaseSegment(segmentDataManager); + } + } + if (!tablesWithMissingSegment.isEmpty() && _consumingSegmentsSupplier != null) { + for (String tableName : tablesWithMissingSegment) { + Set<String> consumingSegments = _consumingSegmentsSupplier.apply(tableName); + if (consumingSegments == null || consumingSegments.isEmpty()) { + _consumingSegments.remove(tableName); + } else { + _consumingSegments.put(tableName, consumingSegments); } + _logger.info("Found missing segments in table: {}. 
Updated its consumingSegments: {}", tableName, + consumingSegments); } } - return _consumingSegments.size() - _caughtUpSegments.size(); + Set<String> currentConsumingSegments = new HashSet<>(); + _consumingSegments.forEach((k, v) -> currentConsumingSegments.addAll(v)); + _caughtUpSegments.retainAll(currentConsumingSegments); + return currentConsumingSegments.size() - _caughtUpSegments.size(); } protected abstract boolean isSegmentCaughtUp(String segmentName, RealtimeSegmentDataManager rtSegmentDataManager); - Review Comment: Nice! ########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java: ########## @@ -37,62 +38,81 @@ public abstract class IngestionBasedConsumptionStatusChecker { // constructor parameters protected final InstanceDataManager _instanceDataManager; - protected final Set<String> _consumingSegments; + protected final Map<String, Set<String>> _consumingSegments; + protected final Function<String, Set<String>> _consumingSegmentsSupplier; - // helper variable - private final Set<String> _caughtUpSegments = new HashSet<>(); + // helper variable, which is thread safe, as the method might be called from multiple threads when the health check + // endpoint is called by many probes. + private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet(); Review Comment: Do we need to make it thread safe? I feel we can simply synchronize on `getNumConsumingSegmentsNotReachedIngestionCriteria()`. 
There should be only one caller. ########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java: ########## @@ -359,6 +361,20 @@ private void registerServiceStatusHandler() { new ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build())); } + private Set<String> getConsumingSegments(String tableName) { Review Comment: (nit) ```suggestion private Set<String> getConsumingSegments(String realtimeTableName) { ``` ########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java: ########## @@ -37,9 +40,15 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum private final long _minFreshnessMs; private final long _idleTimeoutMs; - public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments, + public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, + Map<String, Set<String>> consumingSegments, long minFreshnessMs, long idleTimeoutMs) { + this(instanceDataManager, consumingSegments, null, minFreshnessMs, idleTimeoutMs); + } + + public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Review Comment: Since we are modifying the API anyway, maybe we should enforce a non-null supplier?
########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java: ########## @@ -37,62 +38,81 @@ public abstract class IngestionBasedConsumptionStatusChecker { // constructor parameters protected final InstanceDataManager _instanceDataManager; - protected final Set<String> _consumingSegments; + protected final Map<String, Set<String>> _consumingSegments; + protected final Function<String, Set<String>> _consumingSegmentsSupplier; - // helper variable - private final Set<String> _caughtUpSegments = new HashSet<>(); + // helper variable, which is thread safe, as the method might be called from multiple threads when the health check + // endpoint is called by many probes. + private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet(); Review Comment: We may also make `_caughtUpSegments` a map, and whenever a table has all segments caught up, we can remove the table from both maps. Currently, every check has to merge all tables' segments into one set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org