somandal commented on code in PR #15368: URL: https://github.com/apache/pinot/pull/15368#discussion_r2029082992
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -806,6 +856,172 @@ private List<String> getServerTag(String serverName) { return instanceConfig.getTags(); } + private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(TableConfig tableConfig, + Map<String, Set<String>> newServersToConsumingSegmentMap) { + String tableNameWithType = tableConfig.getTableName(); + if (newServersToConsumingSegmentMap.isEmpty()) { + return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(), + new HashMap<>()); + } + int numConsumingSegmentsToBeMoved = + newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum); + Set<String> uniqueConsumingSegments = + newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()); + Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>(); + uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment, + ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment))); + Map<String, Integer> consumingSegmentsOffsetsToCatchUp = + getConsumingSegmentsOffsetsToCatchUp(tableConfig, consumingSegmentZKmetadata); + Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata); + + Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN; + Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer> + consumingSegmentSummaryPerServer = new HashMap<>(); + if (consumingSegmentsOffsetsToCatchUp != null) { + consumingSegmentsOffsetsToCatchUpTopN = + getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp, TOP_N_IN_CONSUMING_SEGMENT_SUMMARY); + newServersToConsumingSegmentMap.forEach((server, segments) -> { + int totalOffsetsToCatchUp = + segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum(); + consumingSegmentSummaryPerServer.put(server, + new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer( + segments.size(), totalOffsetsToCatchUp)); + }); + } else { + consumingSegmentsOffsetsToCatchUpTopN = null; + newServersToConsumingSegmentMap.forEach((server, segments) -> { + consumingSegmentSummaryPerServer.put(server, + new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer( + segments.size(), -1)); + }); + } + + Map<String, Integer> consumingSegmentsOldestTopN = + consumingSegmentsAge == null ? null + : getTopNConsumingSegmentWithValue(consumingSegmentsAge, TOP_N_IN_CONSUMING_SEGMENT_SUMMARY); + + return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved, + newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN, + consumingSegmentSummaryPerServer); + } + + private static Map<String, Integer> getTopNConsumingSegmentWithValue( + Map<String, Integer> consumingSegmentsWithValue, @Nullable Integer topN) { Review Comment: left a comment on that to ask why just for my understanding, but I'm okay keeping this as is -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org