klsince commented on code in PR #12883: URL: https://github.com/apache/pinot/pull/12883#discussion_r1573060376
########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java: ########## @@ -19,80 +19,107 @@ package org.apache.pinot.server.starter.helix; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; -import org.apache.pinot.common.utils.LLCSegmentName; +import java.util.function.Function; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class IngestionBasedConsumptionStatusChecker { protected final Logger _logger = LoggerFactory.getLogger(getClass()); - // constructor parameters - protected final InstanceDataManager _instanceDataManager; - protected final Set<String> _consumingSegments; - - // helper variable - private final Set<String> _caughtUpSegments = new HashSet<>(); + private final InstanceDataManager _instanceDataManager; + private final Map<String, Set<String>> _consumingSegmentsByTable; + private final Map<String, Set<String>> _caughtUpSegmentsByTable = new HashMap<>(); + private final Function<String, Set<String>> _consumingSegmentsSupplier; + /** + * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided as it can be costly to get + * consumingSegmentsByTable via the supplier, so only use it when any missing segment is detected. + */ public IngestionBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, - Set<String> consumingSegments) { + Map<String, Set<String>> consumingSegmentsByTable, Function<String, Set<String>> consumingSegmentsSupplier) { _instanceDataManager = instanceDataManager; - _consumingSegments = consumingSegments; + _consumingSegmentsByTable = consumingSegmentsByTable; + _consumingSegmentsSupplier = consumingSegmentsSupplier; } - public int getNumConsumingSegmentsNotReachedIngestionCriteria() { - for (String segName : _consumingSegments) { - if (_caughtUpSegments.contains(segName)) { - continue; - } - TableDataManager tableDataManager = getTableDataManager(segName); + // This might be called by multiple threads, thus synchronized to be correct. + public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria() { + Set<String> tablesWithMissingSegment = new HashSet<>(); + for (Map.Entry<String, Set<String>> tableSegments : _consumingSegmentsByTable.entrySet()) { + String tableNameWithType = tableSegments.getKey(); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); if (tableDataManager == null) { - _logger.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segName); + _logger.info("No tableDataManager for table: {}. Will check consumption status later", tableNameWithType); + tablesWithMissingSegment.add(tableNameWithType); continue; } - SegmentDataManager segmentDataManager = null; - try { - segmentDataManager = tableDataManager.acquireSegment(segName); - if (segmentDataManager == null) { - _logger.info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later", - segName); + Set<String> consumingSegments = tableSegments.getValue(); + Set<String> caughtUpSegments = _caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new HashSet<>()); + for (String segName : consumingSegments) { + if (caughtUpSegments.contains(segName)) { continue; } - if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) { - // There's a possibility that a consuming segment has converted to a committed segment. If that's the case, - // segment data manager will not be of type RealtimeSegmentDataManager. - _logger.info("Segment {} is already committed and is considered caught up.", segName); - _caughtUpSegments.add(segName); - continue; - } - - RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; - if (isSegmentCaughtUp(segName, rtSegmentDataManager)) { - _caughtUpSegments.add(segName); + SegmentDataManager segmentDataManager = null; + try { + segmentDataManager = tableDataManager.acquireSegment(segName); + if (segmentDataManager == null) { + _logger.info("No SegmentDataManager for segment: {}. Will check consumption status later", segName); + tablesWithMissingSegment.add(tableNameWithType); + continue; + } + if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) { + // There's a possibility that a consuming segment has converted to a committed segment. If that's the case, + // segment data manager will not be of type RealtimeSegmentDataManager. + _logger.info("Segment: {} is already committed and is considered caught up.", segName); + caughtUpSegments.add(segName); + continue; + } + RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager; + if (isSegmentCaughtUp(segName, rtSegmentDataManager)) { + caughtUpSegments.add(segName); + } + } finally { + if (segmentDataManager != null) { + tableDataManager.releaseSegment(segmentDataManager); + } } - } finally { - if (segmentDataManager != null) { - tableDataManager.releaseSegment(segmentDataManager); + } Review Comment: good point, but I'll remove the table while calculating the numLaggingSegments below, so that future checks skip the table whose consuming segments have all caught up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org