J-HowHuang commented on code in PR #16136: URL: https://github.com/apache/pinot/pull/16136#discussion_r2175625928
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java: ########## @@ -65,4 +215,449 @@ public void setJobId(String jobId) { public void setRebalanceTableResults(Map<String, RebalanceResult> rebalanceTableResults) { _rebalanceTableResults = rebalanceTableResults; } + + /** + * Aggregated pre-check result that provides table-level pre-check status counts and message mappings + */ + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class AggregatedPrecheckResult { + private final int _tablesPassedCount; + private final int _tablesWarnedCount; + private final int _tablesErroredCount; + private final Map<String, String> _passedTables; + private final Map<String, String> _warnedTables; + private final Map<String, String> _erroredTables; + + public AggregatedPrecheckResult(int tablesPassedCount, int tablesWarnedCount, int tablesErroredCount, + Map<String, String> passedTables, Map<String, String> warnedTables, + Map<String, String> erroredTables) { + _tablesPassedCount = tablesPassedCount; + _tablesWarnedCount = tablesWarnedCount; + _tablesErroredCount = tablesErroredCount; + _passedTables = passedTables; + _warnedTables = warnedTables; + _erroredTables = erroredTables; + } + + @JsonProperty + public int getTablesPassedCount() { + return _tablesPassedCount; + } + + @JsonProperty + public int getTablesWarnedCount() { + return _tablesWarnedCount; + } + + @JsonProperty + public int getTablesErroredCount() { + return _tablesErroredCount; + } + + @JsonProperty + public Map<String, String> getPassedTables() { + return _passedTables; + } + + @JsonProperty + public Map<String, String> getWarnedTables() { + return _warnedTables; + } + + @JsonProperty + public Map<String, String> getErroredTables() { + return _erroredTables; + } + } + + /** + * Step 1: Aggregate ServerSegmentChangeInfo across all tables for each server + */ + private static Map<String, AggregatedServerSegmentChangeInfo> aggregateServerSegmentChangeInfo( + List<RebalanceSummaryResult> summaryResults) { + Map<String, AggregatedServerSegmentChangeInfo> serverAggregates = new HashMap<>(); + + for (RebalanceSummaryResult summary : summaryResults) { + if (summary.getServerInfo() != null && summary.getServerInfo().getServerSegmentChangeInfo() != null) { + for (Map.Entry<String, RebalanceSummaryResult.ServerSegmentChangeInfo> entry : summary.getServerInfo() + .getServerSegmentChangeInfo() + .entrySet()) { + String serverName = entry.getKey(); + RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo = entry.getValue(); + + serverAggregates.computeIfAbsent(serverName, k -> new AggregatedServerSegmentChangeInfo()) + .merge(changeInfo); + } + } + } + + return serverAggregates; + } + + /** + * Helper class to aggregate ServerSegmentChangeInfo across multiple tables + */ + private static class AggregatedServerSegmentChangeInfo extends RebalanceSummaryResult.ServerSegmentChangeInfo { + + AggregatedServerSegmentChangeInfo() { + super(RebalanceSummaryResult.ServerStatus.UNCHANGED, 0, 0, 0, 0, 0, null); + } + + void merge(RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo) { + _totalSegmentsAfterRebalance += changeInfo.getTotalSegmentsAfterRebalance(); + _totalSegmentsBeforeRebalance += changeInfo.getTotalSegmentsBeforeRebalance(); + _segmentsAdded += changeInfo.getSegmentsAdded(); + _segmentsDeleted += changeInfo.getSegmentsDeleted(); + _segmentsUnchanged += changeInfo.getSegmentsUnchanged(); + + // Use tag list from any of the change infos (should be consistent) + if (_tagList == null && changeInfo.getTagList() != null) { + _tagList = changeInfo.getTagList(); + } + if (_totalSegmentsAfterRebalance == 0) { + _serverStatus = RebalanceSummaryResult.ServerStatus.REMOVED; + } else if (_totalSegmentsBeforeRebalance == 0) { + _serverStatus = RebalanceSummaryResult.ServerStatus.ADDED; + } else { + _serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED; + } + } + } + + /** + * Aggregated ServerInfo that extends RebalanceSummaryResult.ServerInfo + */ + private static class AggregatedServerInfo extends RebalanceSummaryResult.ServerInfo { + AggregatedServerInfo(Map<String, AggregatedServerSegmentChangeInfo> serverAggregates) { + super(0, null, null, null, null, null, null); + + if (serverAggregates.isEmpty()) { + return; + } + + Set<String> serversAdded = new HashSet<>(); + Set<String> serversRemoved = new HashSet<>(); + Set<String> serversUnchanged = new HashSet<>(); + Set<String> serversGettingNewSegments = new HashSet<>(); + Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> finalServerSegmentChangeInfo = new HashMap<>(); + + int numServersGettingNewSegments = 0; + int totalServersBefore = 0; + int totalServersAfter = 0; + + for (Map.Entry<String, AggregatedServerSegmentChangeInfo> entry : serverAggregates.entrySet()) { + String serverName = entry.getKey(); + AggregatedServerSegmentChangeInfo aggregate = entry.getValue(); + + // Determine server status based on aggregated segment counts + if (aggregate.getServerStatus() == RebalanceSummaryResult.ServerStatus.REMOVED) { + serversRemoved.add(serverName); + } else if (aggregate.getServerStatus() == RebalanceSummaryResult.ServerStatus.ADDED) { + serversAdded.add(serverName); + } else if (aggregate.getServerStatus() == RebalanceSummaryResult.ServerStatus.UNCHANGED) { + serversUnchanged.add(serverName); + } + + // Track servers getting new segments + if (aggregate.getSegmentsAdded() > 0) { + serversGettingNewSegments.add(serverName); + numServersGettingNewSegments++; + } + + // Create final ServerSegmentChangeInfo with determined status + finalServerSegmentChangeInfo.put(serverName, aggregate); + + // Count servers for before/after totals + if (aggregate.getTotalSegmentsBeforeRebalance() > 0) { + totalServersBefore++; + } + if (aggregate.getTotalSegmentsAfterRebalance() > 0) { + totalServersAfter++; + } + } + + RebalanceSummaryResult.RebalanceChangeInfo + aggregatedNumServers = new RebalanceSummaryResult.RebalanceChangeInfo(totalServersBefore, totalServersAfter); + + // Set the computed values using reflection or by updating the parent constructor call + _numServersGettingNewSegments = numServersGettingNewSegments; + _numServers = aggregatedNumServers; + _serversAdded = serversAdded; + _serversRemoved = serversRemoved; + _serversUnchanged = serversUnchanged; + _serversGettingNewSegments = serversGettingNewSegments; + _serverSegmentChangeInfo = finalServerSegmentChangeInfo; + } + } + + /** + * Aggregated SegmentInfo that extends RebalanceSummaryResult.SegmentInfo + */ + private static class AggregatedSegmentInfo extends RebalanceSummaryResult.SegmentInfo { + AggregatedSegmentInfo(List<RebalanceSummaryResult> summaryResults, + Map<String, AggregatedServerSegmentChangeInfo> serverAggregates) { + super(0, 0, 0, 0, 0, null, null, null, null); + + if (summaryResults.isEmpty()) { + return; + } + + int totalSegmentsToBeMoved = 0; + int totalSegmentsToBeDeleted = 0; + long totalEstimatedDataToBeMovedInBytes = 0; + + int beforeRebalanceReplicationFactor = 0; + int afterRebalanceReplicationFactor = 0; + int beforeRebalanceSegmentsInSingleReplica = 0; + int afterRebalanceSegmentsInSingleReplica = 0; + int beforeRebalanceSegmentsAcrossAllReplicas = 0; + int afterRebalanceSegmentsAcrossAllReplicas = 0; + + // Track consuming segment summaries for realtime tables + List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> consumingSummaries = new ArrayList<>(); + int validSummariesCount = 0; + + // Aggregate data from individual table summaries + for (RebalanceSummaryResult summary : summaryResults) { + if (summary.getSegmentInfo() != null) { + RebalanceSummaryResult.SegmentInfo segmentInfo = summary.getSegmentInfo(); + validSummariesCount++; + + totalSegmentsToBeMoved += segmentInfo.getTotalSegmentsToBeMoved(); + totalSegmentsToBeDeleted += segmentInfo.getTotalSegmentsToBeDeleted(); + if (totalEstimatedDataToBeMovedInBytes >= 0) { + totalEstimatedDataToBeMovedInBytes = segmentInfo.getTotalEstimatedDataToBeMovedInBytes() < 0 ? -1 + : totalEstimatedDataToBeMovedInBytes + segmentInfo.getTotalEstimatedDataToBeMovedInBytes(); + } + + if (segmentInfo.getNumSegmentsInSingleReplica() != null) { + beforeRebalanceSegmentsInSingleReplica += + segmentInfo.getNumSegmentsInSingleReplica().getValueBeforeRebalance(); + afterRebalanceSegmentsInSingleReplica += + segmentInfo.getNumSegmentsInSingleReplica().getExpectedValueAfterRebalance(); + } + + if (segmentInfo.getNumSegmentsAcrossAllReplicas() != null) { + beforeRebalanceSegmentsAcrossAllReplicas += + segmentInfo.getNumSegmentsAcrossAllReplicas().getValueBeforeRebalance(); + afterRebalanceSegmentsAcrossAllReplicas += + segmentInfo.getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance(); + } + + if (segmentInfo.getConsumingSegmentToBeMovedSummary() != null) { + consumingSummaries.add(segmentInfo.getConsumingSegmentToBeMovedSummary()); + } + } + } + + if (validSummariesCount == 0) { + return; + } + + int maxSegmentsAddedToASingleServer = 0; + for (AggregatedServerSegmentChangeInfo aggregate : serverAggregates.values()) { + maxSegmentsAddedToASingleServer = Math.max(maxSegmentsAddedToASingleServer, aggregate.getSegmentsAdded()); + } + + // Calculate average segment size + long estimatedAverageSegmentSizeInBytes = + totalEstimatedDataToBeMovedInBytes >= 0 && totalSegmentsToBeMoved > 0 ? totalEstimatedDataToBeMovedInBytes + / totalSegmentsToBeMoved : 0; + + // Create aggregated RebalanceChangeInfo objects + RebalanceSummaryResult.RebalanceChangeInfo aggregatedNumSegmentsInSingleReplica = + new RebalanceSummaryResult.RebalanceChangeInfo( + beforeRebalanceSegmentsInSingleReplica, afterRebalanceSegmentsInSingleReplica); + RebalanceSummaryResult.RebalanceChangeInfo aggregatedNumSegmentsAcrossAllReplicas = + new RebalanceSummaryResult.RebalanceChangeInfo( + beforeRebalanceSegmentsAcrossAllReplicas, afterRebalanceSegmentsAcrossAllReplicas); + + // Aggregate consuming segment summaries + RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary aggregatedConsumingSummary = + aggregateConsumingSegmentSummary(consumingSummaries); + + // Set the computed values + _totalSegmentsToBeMoved = totalSegmentsToBeMoved; + _totalSegmentsToBeDeleted = totalSegmentsToBeDeleted; + _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer; + _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes; + _totalEstimatedDataToBeMovedInBytes = totalEstimatedDataToBeMovedInBytes; + _replicationFactor = null; // This is irrelevant for tenant rebalance + _numSegmentsInSingleReplica = aggregatedNumSegmentsInSingleReplica; + _numSegmentsAcrossAllReplicas = aggregatedNumSegmentsAcrossAllReplicas; + _consumingSegmentToBeMovedSummary = aggregatedConsumingSummary; + } + } + + /** + * Aggregates consuming segment summaries for realtime tables + */ + private static RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary aggregateConsumingSegmentSummary( + List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> summaries) { + if (summaries.isEmpty()) { + return null; + } + + int totalNumConsumingSegmentsToBeMoved = 0; + // Find the maximum size of offset and age maps across all tables + int maxOffsetMapSize = 0; + int maxAgeMapSize = 0; + for (RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary summary : summaries) { + if (summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() != null) { + maxOffsetMapSize = Math.max(maxOffsetMapSize, + summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size()); + } + if (summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() != null) { + maxAgeMapSize = Math.max(maxAgeMapSize, + summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size()); + } + } + + // Create maps to store all segments by offset and age + Map<String, Integer> allConsumingSegmentsWithOffsets = new HashMap<>(); + Map<String, Integer> allConsumingSegmentsWithAges = new HashMap<>(); + + // Aggregate ConsumingSegmentSummaryPerServer by server name across all tables + Map<String, AggregatedConsumingSegmentSummaryPerServer> serverAggregates = new HashMap<>(); + + for (RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary summary : summaries) { + totalNumConsumingSegmentsToBeMoved += summary.getNumConsumingSegmentsToBeMoved(); + + // Add all segments with offsets + if (summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() != null) { + allConsumingSegmentsWithOffsets.putAll(summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()); + } + + // Add all segments with ages + if (summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() != null) { + allConsumingSegmentsWithAges.putAll(summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes()); + } + + // Aggregate server consuming segment summaries by server name + if (summary.getServerConsumingSegmentSummary() != null) { + for (Map.Entry<String, + RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer> entry + : summary.getServerConsumingSegmentSummary() + .entrySet()) { + String serverName = entry.getKey(); + RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer serverSummary = + entry.getValue(); + + serverAggregates.computeIfAbsent(serverName, k -> new AggregatedConsumingSegmentSummaryPerServer()) + .merge(serverSummary); + } + } + } + + // Keep only top k segments with the highest offsets Review Comment: done ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java: ########## @@ -65,4 +215,438 @@ public void setJobId(String jobId) { public void setRebalanceTableResults(Map<String, RebalanceResult> rebalanceTableResults) { _rebalanceTableResults = rebalanceTableResults; } + + /** + * Aggregated pre-check result that provides table-level pre-check status counts and message mappings + */ + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class AggregatedPrecheckResult { + private final int _tablesPassedCount; + private final int _tablesWarnedCount; + private final int _tablesErroredCount; + private final Map<String, String> _passedTables; + private final Map<String, String> _warnedTables; + private final Map<String, String> _erroredTables; + + public AggregatedPrecheckResult(int tablesPassedCount, int tablesWarnedCount, int tablesErroredCount, + Map<String, String> passedTables, Map<String, String> warnedTables, + Map<String, String> erroredTables) { + _tablesPassedCount = tablesPassedCount; + _tablesWarnedCount = tablesWarnedCount; + _tablesErroredCount = tablesErroredCount; + _passedTables = passedTables; + _warnedTables = warnedTables; + _erroredTables = erroredTables; + } + + @JsonProperty + public int getTablesPassedCount() { + return _tablesPassedCount; + } + + @JsonProperty + public int getTablesWarnedCount() { + return _tablesWarnedCount; + } + + @JsonProperty + public int getTablesErroredCount() { + return _tablesErroredCount; + } + + @JsonProperty + public Map<String, String> getPassedTables() { + return _passedTables; + } + + @JsonProperty + public Map<String, String> getWarnedTables() { + return _warnedTables; + } + + @JsonProperty + public Map<String, String> getErroredTables() { + return _erroredTables; + } + } + + /** + * Step 1: Aggregate ServerSegmentChangeInfo across all tables for each server + */ + private static Map<String, AggregatedServerSegmentChangeInfo> aggregateServerSegmentChangeInfo( + List<RebalanceSummaryResult> summaryResults) { + Map<String, AggregatedServerSegmentChangeInfo> serverAggregates = new HashMap<>(); + + for (RebalanceSummaryResult summary : summaryResults) { + if (summary.getServerInfo() != null && summary.getServerInfo().getServerSegmentChangeInfo() != null) { + for (Map.Entry<String, RebalanceSummaryResult.ServerSegmentChangeInfo> entry : summary.getServerInfo() + .getServerSegmentChangeInfo() + .entrySet()) { + String serverName = entry.getKey(); + RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo = entry.getValue(); + + serverAggregates.computeIfAbsent(serverName, k -> new AggregatedServerSegmentChangeInfo()) + .merge(changeInfo); + } + } + } + + return serverAggregates; + } + + /** + * Helper class to aggregate ServerSegmentChangeInfo across multiple tables + */ + private static class AggregatedServerSegmentChangeInfo extends RebalanceSummaryResult.ServerSegmentChangeInfo { + + AggregatedServerSegmentChangeInfo() { + super(RebalanceSummaryResult.ServerStatus.UNCHANGED, 0, 0, 0, 0, 0, null); + } + + void merge(RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo) { + _totalSegmentsAfterRebalance += changeInfo.getTotalSegmentsAfterRebalance(); + _totalSegmentsBeforeRebalance += changeInfo.getTotalSegmentsBeforeRebalance(); + _segmentsAdded += changeInfo.getSegmentsAdded(); + _segmentsDeleted += changeInfo.getSegmentsDeleted(); + _segmentsUnchanged += changeInfo.getSegmentsUnchanged(); + + // Use tag list from any of the change infos (should be consistent) + if (_tagList == null && changeInfo.getTagList() != null) { + _tagList = changeInfo.getTagList(); + } + if (_totalSegmentsAfterRebalance == 0) { + _serverStatus = RebalanceSummaryResult.ServerStatus.REMOVED; + } else if (_totalSegmentsBeforeRebalance == 0) { + _serverStatus = RebalanceSummaryResult.ServerStatus.ADDED; + } else { + _serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED; + } + } + } + + /** + * Aggregated ServerInfo that extends RebalanceSummaryResult.ServerInfo + */ + private static class AggregatedServerInfo extends RebalanceSummaryResult.ServerInfo { + AggregatedServerInfo(Map<String, AggregatedServerSegmentChangeInfo> serverAggregates) { + super(0, null, null, null, null, null, null); + + if (serverAggregates.isEmpty()) { + return; + } + + Set<String> serversAdded = new HashSet<>(); + Set<String> serversRemoved = new HashSet<>(); + Set<String> serversUnchanged = new HashSet<>(); + Set<String> serversGettingNewSegments = new HashSet<>(); + Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> finalServerSegmentChangeInfo = new HashMap<>(); + + int numServersGettingNewSegments = 0; + int totalServersBefore = 0; + int totalServersAfter = 0; + + for (Map.Entry<String, AggregatedServerSegmentChangeInfo> entry : serverAggregates.entrySet()) { + String serverName = entry.getKey(); + AggregatedServerSegmentChangeInfo aggregate = entry.getValue(); + + // Determine server status based on aggregated segment counts + if (aggregate.getServerStatus() == RebalanceSummaryResult.ServerStatus.REMOVED) { + serversRemoved.add(serverName); + } else if (aggregate.getServerStatus() == RebalanceSummaryResult.ServerStatus.ADDED) { + serversAdded.add(serverName); + } else if (aggregate.getServerStatus() == RebalanceSummaryResult.ServerStatus.UNCHANGED) { + serversUnchanged.add(serverName); + } + + // Track servers getting new segments + if (aggregate.getSegmentsAdded() > 0) { + serversGettingNewSegments.add(serverName); + numServersGettingNewSegments++; + } + + // Create final ServerSegmentChangeInfo with determined status + finalServerSegmentChangeInfo.put(serverName, aggregate); + + // Count servers for before/after totals + if (aggregate.getTotalSegmentsBeforeRebalance() > 0) { + totalServersBefore++; + } + if (aggregate.getTotalSegmentsAfterRebalance() > 0) { + totalServersAfter++; + } + } + + RebalanceSummaryResult.RebalanceChangeInfo + aggregatedNumServers = new RebalanceSummaryResult.RebalanceChangeInfo(totalServersBefore, totalServersAfter); + + _numServersGettingNewSegments = numServersGettingNewSegments; + _numServers = aggregatedNumServers; + _serversAdded = serversAdded; + _serversRemoved = serversRemoved; + _serversUnchanged = serversUnchanged; + _serversGettingNewSegments = serversGettingNewSegments; + _serverSegmentChangeInfo = finalServerSegmentChangeInfo; + } + } + + /** + * Aggregated SegmentInfo that extends RebalanceSummaryResult.SegmentInfo + */ + private static class AggregatedSegmentInfo extends RebalanceSummaryResult.SegmentInfo { + AggregatedSegmentInfo(List<RebalanceSummaryResult> summaryResults, + Map<String, AggregatedServerSegmentChangeInfo> serverAggregates) { + super(0, 0, 0, 0, 0, null, null, null, null); + + if (summaryResults.isEmpty()) { + return; + } + + int totalSegmentsToBeMoved = 0; + int totalSegmentsToBeDeleted = 0; + long totalEstimatedDataToBeMovedInBytes = 0; + + int beforeRebalanceSegmentsInSingleReplica = 0; + int afterRebalanceSegmentsInSingleReplica = 0; + int beforeRebalanceSegmentsAcrossAllReplicas = 0; + int afterRebalanceSegmentsAcrossAllReplicas = 0; + + // Track consuming segment summaries for realtime tables + List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> consumingSummaries = new ArrayList<>(); + int validSummariesCount = 0; + + // Aggregate data from individual table summaries + for (RebalanceSummaryResult summary : summaryResults) { + if (summary.getSegmentInfo() != null) { + RebalanceSummaryResult.SegmentInfo segmentInfo = summary.getSegmentInfo(); + validSummariesCount++; + + totalSegmentsToBeMoved += segmentInfo.getTotalSegmentsToBeMoved(); + totalSegmentsToBeDeleted += segmentInfo.getTotalSegmentsToBeDeleted(); + if (totalEstimatedDataToBeMovedInBytes >= 0) { + totalEstimatedDataToBeMovedInBytes = segmentInfo.getTotalEstimatedDataToBeMovedInBytes() < 0 ? -1 + : totalEstimatedDataToBeMovedInBytes + segmentInfo.getTotalEstimatedDataToBeMovedInBytes(); + } + + if (segmentInfo.getNumSegmentsInSingleReplica() != null) { + beforeRebalanceSegmentsInSingleReplica += + segmentInfo.getNumSegmentsInSingleReplica().getValueBeforeRebalance(); + afterRebalanceSegmentsInSingleReplica += + segmentInfo.getNumSegmentsInSingleReplica().getExpectedValueAfterRebalance(); + } + + if (segmentInfo.getNumSegmentsAcrossAllReplicas() != null) { + beforeRebalanceSegmentsAcrossAllReplicas += + segmentInfo.getNumSegmentsAcrossAllReplicas().getValueBeforeRebalance(); + afterRebalanceSegmentsAcrossAllReplicas += + segmentInfo.getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance(); + } + + if (segmentInfo.getConsumingSegmentToBeMovedSummary() != null) { + consumingSummaries.add(segmentInfo.getConsumingSegmentToBeMovedSummary()); + } + } + } + + if (validSummariesCount == 0) { + return; + } + + int maxSegmentsAddedToASingleServer = 0; + for (AggregatedServerSegmentChangeInfo aggregate : serverAggregates.values()) { + maxSegmentsAddedToASingleServer = Math.max(maxSegmentsAddedToASingleServer, aggregate.getSegmentsAdded()); + } + + // Calculate average segment size + long estimatedAverageSegmentSizeInBytes = + totalEstimatedDataToBeMovedInBytes >= 0 && totalSegmentsToBeMoved > 0 ? totalEstimatedDataToBeMovedInBytes + / totalSegmentsToBeMoved : 0; + + // Create aggregated RebalanceChangeInfo objects + RebalanceSummaryResult.RebalanceChangeInfo aggregatedNumSegmentsInSingleReplica = + new RebalanceSummaryResult.RebalanceChangeInfo( + beforeRebalanceSegmentsInSingleReplica, afterRebalanceSegmentsInSingleReplica); + RebalanceSummaryResult.RebalanceChangeInfo aggregatedNumSegmentsAcrossAllReplicas = + new RebalanceSummaryResult.RebalanceChangeInfo( + beforeRebalanceSegmentsAcrossAllReplicas, afterRebalanceSegmentsAcrossAllReplicas); + + // Aggregate consuming segment summaries + RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary aggregatedConsumingSummary = + aggregateConsumingSegmentSummary(consumingSummaries); + + // Set the computed values + _totalSegmentsToBeMoved = totalSegmentsToBeMoved; + _totalSegmentsToBeDeleted = totalSegmentsToBeDeleted; + _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer; + _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes; + _totalEstimatedDataToBeMovedInBytes = totalEstimatedDataToBeMovedInBytes; + _replicationFactor = null; // This is irrelevant for tenant rebalance + _numSegmentsInSingleReplica = aggregatedNumSegmentsInSingleReplica; + _numSegmentsAcrossAllReplicas = aggregatedNumSegmentsAcrossAllReplicas; + _consumingSegmentToBeMovedSummary = aggregatedConsumingSummary; + } + } + + /** + * Aggregates consuming segment summaries for realtime tables + */ + private static RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary aggregateConsumingSegmentSummary( + List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> summaries) { + if (summaries.isEmpty()) { + return null; + } + + int totalNumConsumingSegmentsToBeMoved = 0; + + // Create maps to store all segments by offset and age + Map<String, Integer> consumingSegmentsWithMostOffsetsPerTable = new HashMap<>(); + Map<String, Integer> consumingSegmentsWithOldestAgePerTable = new HashMap<>(); + + // Aggregate ConsumingSegmentSummaryPerServer by server name across all tables + Map<String, AggregatedConsumingSegmentSummaryPerServer> serverAggregates = new HashMap<>(); + + for (RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary summary : summaries) { + totalNumConsumingSegmentsToBeMoved += summary.getNumConsumingSegmentsToBeMoved(); + + // Add one segment with offsets for each table + if (summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() != null + && !summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().isEmpty()) { + Map.Entry<String, Integer> consumingSegmentWithMostOffsetsToCatchUp = + summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().entrySet().iterator().next(); + consumingSegmentsWithMostOffsetsPerTable.put(consumingSegmentWithMostOffsetsToCatchUp.getKey(), + consumingSegmentWithMostOffsetsToCatchUp.getValue()); + } + + // Add all segments with ages + if (summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() != null + && !summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().isEmpty()) { + Map.Entry<String, Integer> consumingSegmentWithOldestAge = + summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().entrySet().iterator().next(); + consumingSegmentsWithOldestAgePerTable.put(consumingSegmentWithOldestAge.getKey(), + consumingSegmentWithOldestAge.getValue()); + } + + // Aggregate server consuming segment summaries by server name + if (summary.getServerConsumingSegmentSummary() != null) { + for (Map.Entry<String, + RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer> entry + : summary.getServerConsumingSegmentSummary() + .entrySet()) { + String serverName = entry.getKey(); + RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer serverSummary = + entry.getValue(); + + serverAggregates.computeIfAbsent(serverName, k -> new AggregatedConsumingSegmentSummaryPerServer()) + .merge(serverSummary); + } + } + } + + // Keep only top k segments with the highest offsets + Map<String, Integer> sortedConsumingSegmentsWithMostOffsetsPerTable = new LinkedHashMap<>(); + consumingSegmentsWithMostOffsetsPerTable.entrySet().stream() + .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()) + .forEach(entry -> sortedConsumingSegmentsWithMostOffsetsPerTable.put(entry.getKey(), entry.getValue())); + + // Keep only top k segments with the oldest ages Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org