somandal commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2013047959
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +793,79 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
     // rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentSummary consumingSegmentSummary =
+        isOfflineTable ? null : new RebalanceSummaryResult.ConsumingSegmentSummary(
+            consumingSegmentsToBeMoved, maxBytesToCatchUpForConsumingSegments, bytesToCatchUpForServers);
     RebalanceSummaryResult.SegmentInfo segmentInfo =
         new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved, maxSegmentsAddedToServer,
             averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
-            replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+            replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentSummary);
     LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of bytes to catch up for each consuming
+   * segment. Returns a map from segment name to the number of bytes to catch up for that consuming segment. Return
+   * null if failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsBytesToCatchUp(String tableNameWithType) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }
+    Map<String, Integer> segmentToBytesToCatchUp = new HashMap<>();
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+          consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+      for (Map.Entry<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> entry
+          : consumingSegmentsInfoMap._segmentToConsumingInfoMap.entrySet()) {
+        String segmentName = entry.getKey();
+        List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList = entry.getValue();
+        SegmentZKMetadata segmentZKMetadata =
+            ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType,
+                segmentName);
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        if (consumingSegmentInfoList != null) {
+          String startOffset = segmentZKMetadata.getStartOffset();
+          if (startOffset == null) {
+            LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+            return null;
+          }
+          if (!consumingSegmentInfoList.isEmpty()) {
+            // this value should be the same regardless of which server the consuming segment info is from, use the
+            // first in the list here
+            int bytesToCatchUp = consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()

Review Comment:
   I am not that familiar with this code, but how is this calculating `bytes` rather than the number of offsets to catch up? It looks to me like you take the start offset and subtract it from this offset?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -243,6 +244,8 @@ private enum LineageUpdateType {
   private final LineageManager _lineageManager;
   private final RebalancePreChecker _rebalancePreChecker;
   private TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+  private HttpClientConnectionManager _connectionManager;

Review Comment:
   As mentioned, let's not create this here and pass it in; instead, create it directly in the code where it is needed.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -124,10 +131,15 @@ public class TableRebalancer {
   private final ControllerMetrics _controllerMetrics;
   private final RebalancePreChecker _rebalancePreChecker;
   private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+  private final HttpClientConnectionManager _connectionManager;

Review Comment:
   Let's not pass this in if it can be avoided. In the `DefaultPreChecker` we create a connection manager, and I would recommend doing the same here:
   ```
   try (PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager()) {
     ...
     ...
   }
   ```
   Also recommend using a try-with-resources type block, as mentioned above, so that the connection manager is automatically closed when it is no longer needed.
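   For illustration, a minimal sketch of that suggestion (not the final implementation): it assumes TableRebalancer's existing `_executorService`, `_pinotHelixResourceManager`, and `LOGGER` fields, reuses the `ConsumingSegmentInfoReader` constructor and 30s timeout from the diff above, and relies on `PoolingHttpClientConnectionManager` being `Closeable`; the helper name is made up:
   ```
   // Sketch only: build the connection manager where it is used and let try-with-resources close it,
   // instead of storing one as a TableRebalancer field.
   private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap fetchConsumingSegmentsInfo(String tableNameWithType) {
     try (PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager()) {
       ConsumingSegmentInfoReader reader =
           new ConsumingSegmentInfoReader(_executorService, connectionManager, _pinotHelixResourceManager);
       // Same call and timeout as getConsumingSegmentsBytesToCatchUp() in this PR
       return reader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
     } catch (Exception e) {
       LOGGER.warn("Failed to fetch consuming segment info for table: {}", tableNameWithType, e);
       return null;
     }
   }
   ```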
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -693,6 +738,21 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       if (segmentsAdded > 0) {
         serversGettingNewSegments.add(server);
       }
+      if (!isOfflineTable) {
+        if (bytesToCatchUpForServers != null) {
+          bytesToCatchUpForServers.put(server, 0);
+        }
+        for (String segment : newSegmentSet) {
+          if (consumingSegments.contains(segment)) {
+            consumingSegmentsToBeMoved++;
+            if (bytesToCatchUpForSegments != null) {

Review Comment:
   Can we track the scenario where `bytesToCatchUpForSegments` is null, so that from the summary we can easily tell whether the information is accurate or not?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,45 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }

+  public static class ConsumingSegmentSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Integer _maxBytesConsumingSegmentsToCatchUp;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Map<String, Integer> _bytesConsumingSegmentsToCatchUpPerServer;

Review Comment:
   I would also add the oldest consuming segment age per server here, if possible.
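   Purely illustrative sketch of how that could be surfaced in `ConsumingSegmentSummary`; the field and getter names are hypothetical and not part of this PR:
   ```
   // Hypothetical field: age of the oldest consuming segment (current time minus segment creation time)
   // that is moving to each server; left null when the information could not be computed.
   @JsonInclude(JsonInclude.Include.NON_NULL)
   private final Map<String, Long> _oldestConsumingSegmentAgePerServerMs;

   public Map<String, Long> getOldestConsumingSegmentAgePerServerMs() {
     return _oldestConsumingSegmentAgePerServerMs;
   }
   ```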
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +793,79 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
     // rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentSummary consumingSegmentSummary =
+        isOfflineTable ? null : new RebalanceSummaryResult.ConsumingSegmentSummary(
+            consumingSegmentsToBeMoved, maxBytesToCatchUpForConsumingSegments, bytesToCatchUpForServers);
     RebalanceSummaryResult.SegmentInfo segmentInfo =
         new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved, maxSegmentsAddedToServer,
             averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
-            replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+            replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentSummary);
     LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of bytes to catch up for each consuming
+   * segment. Returns a map from segment name to the number of bytes to catch up for that consuming segment. Return
+   * null if failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsBytesToCatchUp(String tableNameWithType) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }

Review Comment:
   Can we have entry and exit logs for this function? My concern is that since this also fetches ZK metadata, it can get expensive if there are lots of segments. The logs can help identify how long this calculation is taking.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -693,6 +738,21 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       if (segmentsAdded > 0) {
         serversGettingNewSegments.add(server);
       }
+      if (!isOfflineTable) {
+        if (bytesToCatchUpForServers != null) {
+          bytesToCatchUpForServers.put(server, 0);
+        }

Review Comment:
   Let's create an entry here only if this server has new segments added. Move this into the for loop below, within the `if (bytesToCatchUpForSegments != null) {` condition.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -693,6 +738,21 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       if (segmentsAdded > 0) {
         serversGettingNewSegments.add(server);
       }
+      if (!isOfflineTable) {
+        if (bytesToCatchUpForServers != null) {
+          bytesToCatchUpForServers.put(server, 0);
+        }
+        for (String segment : newSegmentSet) {
+          if (consumingSegments.contains(segment)) {
+            consumingSegmentsToBeMoved++;
+            if (bytesToCatchUpForSegments != null) {
+              int bytesToCatchUp = bytesToCatchUpForSegments.getOrDefault(segment, 0);
+              maxBytesToCatchUpForConsumingSegments = Math.max(maxBytesToCatchUpForConsumingSegments, bytesToCatchUp);
+              bytesToCatchUpForServers.put(server, bytesToCatchUpForServers.get(server) + bytesToCatchUp);

Review Comment:
   Is it possible to use putIfAbsent / computeIfAbsent here to initialize to 0?
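   For example, `Map.merge` (same idea as `computeIfAbsent`) would drop the separate `put(server, 0)` initialization entirely; a sketch using the variable names from the diff:
   ```
   // Sketch: accumulate the per-server catch-up total without pre-initializing the entry to 0.
   // If the server has no entry yet, merge() inserts bytesToCatchUp; otherwise it adds it to the existing value.
   bytesToCatchUpForServers.merge(server, bytesToCatchUp, Integer::sum);
   ```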
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,45 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }

+  public static class ConsumingSegmentSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Integer _maxBytesConsumingSegmentsToCatchUp;

Review Comment:
   Can we also track the difference in time (i.e. current time - segment creation time)? We can have multiple scenarios:
   - Segment was created a long time ago but has very few events
   - Segment was created recently but has many events
   - Segment was created a long time ago and has many events
   - Segment was created recently and has very few events

   It might be good to get the above perspective.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1965,6 +1969,10 @@ public void registerTableSizeReader(TableSizeReader tableSizeReader) {
     _tableSizeReader = tableSizeReader;
   }

+  public void registerConnectionManager(HttpClientConnectionManager connectionManager) {

Review Comment:
   Remove this after you modify the code to create the connection manager directly.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,45 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }

+  public static class ConsumingSegmentSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Integer _maxBytesConsumingSegmentsToCatchUp;

Review Comment:
   Also, I thought we had discussed capturing the top 10 (or top X) segments and dumping their offset difference and time difference. Here I see you've only captured the max?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,45 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }

+  public static class ConsumingSegmentSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Integer _maxBytesConsumingSegmentsToCatchUp;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Map<String, Integer> _bytesConsumingSegmentsToCatchUpPerServer;

Review Comment:
   Again, is bytes correct here, or should it be the number of events (offsets)?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -664,6 +686,29 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
     Set<String> serversGettingNewSegments = new HashSet<>();
     Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>();
     int segmentsNotMoved = 0;
+    Set<String> consumingSegments = new HashSet<>();
+    boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
+    Integer consumingSegmentsToBeMoved = null;
+    Integer maxBytesToCatchUpForConsumingSegments = null;
+    Map<String, Integer> bytesToCatchUpForSegments = null;
+    Map<String, Integer> bytesToCatchUpForServers = null;
+    if (!isOfflineTable) {
+      consumingSegmentsToBeMoved = 0;
+      bytesToCatchUpForSegments = getConsumingSegmentsBytesToCatchUp(tableNameWithType);

Review Comment:
   This is an expensive operation, and we don't really need to calculate this for REALTIME tables if consuming segments aren't to be moved. Can we only run this when we know for sure that CONSUMING segments are even involved in the rebalance (i.e. make sure at least 1 consuming segment is moving)?
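   A rough sketch of that idea using the variables from the diff; `consumingSegmentsBeingMoved` is a hypothetical set that would be collected while diffing the current and target assignments, before any per-segment offset lookups:
   ```
   // Sketch: only pay for the per-segment ZK metadata / offset lookups when at least one
   // CONSUMING segment actually moves in this rebalance; otherwise leave the map null.
   Map<String, Integer> bytesToCatchUpForSegments = null;
   if (!isOfflineTable && !consumingSegmentsBeingMoved.isEmpty()) {
     bytesToCatchUpForSegments = getConsumingSegmentsBytesToCatchUp(tableNameWithType);
   }
   ```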
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org