J-HowHuang commented on code in PR #15368: URL: https://github.com/apache/pinot/pull/15368#discussion_r2017652432
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St serversGettingNewSegments, serverSegmentChangeInfoMap); // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how // rebalance time can vary with number of segments added + RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary = + isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap); RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved, maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes, - replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas); + replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary); LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId); return new RebalanceSummaryResult(serverInfo, segmentInfo); } + private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType, + Map<String, Set<String>> newServersToConsumingSegmentMap) { + int numConsumingSegmentsToBeMoved = + newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum); + Set<String> uniqueConsumingSegments = + newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()); + Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>(); + uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment, + ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment))); + Map<String, Integer> consumingSegmentsOffsetsToCatchUp = + getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata); + Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata); + + Map<String, Integer> topTenOffset; + Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer> consumingSegmentSummaryPerServer = + new HashMap<>(); + if (consumingSegmentsOffsetsToCatchUp != null) { + topTenOffset = new LinkedHashMap<>(); + consumingSegmentsOffsetsToCatchUp.entrySet() + .stream() + .sorted( + Collections.reverseOrder(Map.Entry.comparingByValue())) + .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY) + .forEach(entry -> topTenOffset.put(entry.getKey(), entry.getValue())); + newServersToConsumingSegmentMap.forEach((server, segments) -> { + int totalOffsetsToCatchUp = + segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum(); + consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer( + segments.size(), totalOffsetsToCatchUp)); + }); + } else { + topTenOffset = null; + newServersToConsumingSegmentMap.forEach((server, segments) -> { + consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer( + segments.size(), null)); + }); + } + + Map<String, Integer> oldestTenSegment; + oldestTenSegment = new LinkedHashMap<>(); + consumingSegmentsAge.entrySet() + .stream() + .sorted( + Map.Entry.comparingByValue()) + .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY) + .forEach(entry -> oldestTenSegment.put(entry.getKey(), entry.getValue())); + + return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved, + newServersToConsumingSegmentMap.size(), topTenOffset, oldestTenSegment, consumingSegmentSummaryPerServer); + } + + private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType, Review Comment: This does not filter the top, it gets all and without sorting -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org