somandal commented on code in PR #16136:
URL: https://github.com/apache/pinot/pull/16136#discussion_r2155704695


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -65,4 +215,449 @@ public void setJobId(String jobId) {
   public void setRebalanceTableResults(Map<String, RebalanceResult> 
rebalanceTableResults) {
     _rebalanceTableResults = rebalanceTableResults;
   }
+
+  /**
+   * Aggregated pre-check result that provides table-level pre-check status 
counts and message mappings
+   */
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class AggregatedPrecheckResult {
+    private final int _tablesPassedCount;
+    private final int _tablesWarnedCount;
+    private final int _tablesErroredCount;
+    private final Map<String, String> _passedTables;
+    private final Map<String, String> _warnedTables;
+    private final Map<String, String> _erroredTables;
+
+    public AggregatedPrecheckResult(int tablesPassedCount, int 
tablesWarnedCount, int tablesErroredCount,
+        Map<String, String> passedTables, Map<String, String> warnedTables,
+        Map<String, String> erroredTables) {
+      _tablesPassedCount = tablesPassedCount;
+      _tablesWarnedCount = tablesWarnedCount;
+      _tablesErroredCount = tablesErroredCount;
+      _passedTables = passedTables;
+      _warnedTables = warnedTables;
+      _erroredTables = erroredTables;
+    }
+
+    @JsonProperty
+    public int getTablesPassedCount() {
+      return _tablesPassedCount;
+    }
+
+    @JsonProperty
+    public int getTablesWarnedCount() {
+      return _tablesWarnedCount;
+    }
+
+    @JsonProperty
+    public int getTablesErroredCount() {
+      return _tablesErroredCount;
+    }
+
+    @JsonProperty
+    public Map<String, String> getPassedTables() {
+      return _passedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getWarnedTables() {
+      return _warnedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getErroredTables() {
+      return _erroredTables;
+    }
+  }
+
+  /**
+   * Step 1: Aggregate ServerSegmentChangeInfo across all tables for each 
server
+   */
+  private static Map<String, AggregatedServerSegmentChangeInfo> 
aggregateServerSegmentChangeInfo(
+      List<RebalanceSummaryResult> summaryResults) {
+    Map<String, AggregatedServerSegmentChangeInfo> serverAggregates = new 
HashMap<>();
+
+    for (RebalanceSummaryResult summary : summaryResults) {
+      if (summary.getServerInfo() != null && 
summary.getServerInfo().getServerSegmentChangeInfo() != null) {
+        for (Map.Entry<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
entry : summary.getServerInfo()
+            .getServerSegmentChangeInfo()
+            .entrySet()) {
+          String serverName = entry.getKey();
+          RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo = 
entry.getValue();
+
+          serverAggregates.computeIfAbsent(serverName, k -> new 
AggregatedServerSegmentChangeInfo())
+              .merge(changeInfo);
+        }
+      }
+    }
+
+    return serverAggregates;
+  }
+
+  /**
+   * Helper class to aggregate ServerSegmentChangeInfo across multiple tables
+   */
+  private static class AggregatedServerSegmentChangeInfo extends 
RebalanceSummaryResult.ServerSegmentChangeInfo {
+
+    AggregatedServerSegmentChangeInfo() {
+      super(RebalanceSummaryResult.ServerStatus.UNCHANGED, 0, 0, 0, 0, 0, 
null);
+    }
+
+    void merge(RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo) {
+      _totalSegmentsAfterRebalance += 
changeInfo.getTotalSegmentsAfterRebalance();
+      _totalSegmentsBeforeRebalance += 
changeInfo.getTotalSegmentsBeforeRebalance();
+      _segmentsAdded += changeInfo.getSegmentsAdded();
+      _segmentsDeleted += changeInfo.getSegmentsDeleted();
+      _segmentsUnchanged += changeInfo.getSegmentsUnchanged();
+
+      // Use tag list from any of the change infos (should be consistent)
+      if (_tagList == null && changeInfo.getTagList() != null) {
+        _tagList = changeInfo.getTagList();
+      }
+      if (_totalSegmentsAfterRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.REMOVED;
+      } else if (_totalSegmentsBeforeRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
+      } else {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+      }
+    }
+  }
+
+  /**
+   * Aggregated ServerInfo that extends RebalanceSummaryResult.ServerInfo
+   */
+  private static class AggregatedServerInfo extends 
RebalanceSummaryResult.ServerInfo {
+    AggregatedServerInfo(Map<String, AggregatedServerSegmentChangeInfo> 
serverAggregates) {
+      super(0, null, null, null, null, null, null);
+
+      if (serverAggregates.isEmpty()) {
+        return;
+      }
+
+      Set<String> serversAdded = new HashSet<>();
+      Set<String> serversRemoved = new HashSet<>();
+      Set<String> serversUnchanged = new HashSet<>();
+      Set<String> serversGettingNewSegments = new HashSet<>();
+      Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
finalServerSegmentChangeInfo = new HashMap<>();
+
+      int numServersGettingNewSegments = 0;
+      int totalServersBefore = 0;
+      int totalServersAfter = 0;
+
+      for (Map.Entry<String, AggregatedServerSegmentChangeInfo> entry : 
serverAggregates.entrySet()) {
+        String serverName = entry.getKey();
+        AggregatedServerSegmentChangeInfo aggregate = entry.getValue();
+
+        // Determine server status based on aggregated segment counts
+        if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.REMOVED) {
+          serversRemoved.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.ADDED) {
+          serversAdded.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.UNCHANGED) {
+          serversUnchanged.add(serverName);
+        }
+
+        // Track servers getting new segments
+        if (aggregate.getSegmentsAdded() > 0) {
+          serversGettingNewSegments.add(serverName);
+          numServersGettingNewSegments++;
+        }
+
+        // Create final ServerSegmentChangeInfo with determined status
+        finalServerSegmentChangeInfo.put(serverName, aggregate);
+
+        // Count servers for before/after totals
+        if (aggregate.getTotalSegmentsBeforeRebalance() > 0) {
+          totalServersBefore++;
+        }
+        if (aggregate.getTotalSegmentsAfterRebalance() > 0) {
+          totalServersAfter++;
+        }
+      }
+
+      RebalanceSummaryResult.RebalanceChangeInfo
+          aggregatedNumServers = new 
RebalanceSummaryResult.RebalanceChangeInfo(totalServersBefore, 
totalServersAfter);
+
+      // Set the computed values using reflection or by updating the parent 
constructor call

Review Comment:
   What does this comment mean? You're just assigning the fields directly here (no reflection, no change to the parent constructor call), right?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -65,4 +215,449 @@ public void setJobId(String jobId) {
   public void setRebalanceTableResults(Map<String, RebalanceResult> 
rebalanceTableResults) {
     _rebalanceTableResults = rebalanceTableResults;
   }
+
+  /**
+   * Aggregated pre-check result that provides table-level pre-check status 
counts and message mappings
+   */
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class AggregatedPrecheckResult {
+    private final int _tablesPassedCount;
+    private final int _tablesWarnedCount;
+    private final int _tablesErroredCount;
+    private final Map<String, String> _passedTables;
+    private final Map<String, String> _warnedTables;
+    private final Map<String, String> _erroredTables;
+
+    public AggregatedPrecheckResult(int tablesPassedCount, int 
tablesWarnedCount, int tablesErroredCount,
+        Map<String, String> passedTables, Map<String, String> warnedTables,
+        Map<String, String> erroredTables) {
+      _tablesPassedCount = tablesPassedCount;
+      _tablesWarnedCount = tablesWarnedCount;
+      _tablesErroredCount = tablesErroredCount;
+      _passedTables = passedTables;
+      _warnedTables = warnedTables;
+      _erroredTables = erroredTables;
+    }
+
+    @JsonProperty
+    public int getTablesPassedCount() {
+      return _tablesPassedCount;
+    }
+
+    @JsonProperty
+    public int getTablesWarnedCount() {
+      return _tablesWarnedCount;
+    }
+
+    @JsonProperty
+    public int getTablesErroredCount() {
+      return _tablesErroredCount;
+    }
+
+    @JsonProperty
+    public Map<String, String> getPassedTables() {
+      return _passedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getWarnedTables() {
+      return _warnedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getErroredTables() {
+      return _erroredTables;
+    }
+  }
+
+  /**
+   * Step 1: Aggregate ServerSegmentChangeInfo across all tables for each 
server
+   */
+  private static Map<String, AggregatedServerSegmentChangeInfo> 
aggregateServerSegmentChangeInfo(
+      List<RebalanceSummaryResult> summaryResults) {
+    Map<String, AggregatedServerSegmentChangeInfo> serverAggregates = new 
HashMap<>();
+
+    for (RebalanceSummaryResult summary : summaryResults) {
+      if (summary.getServerInfo() != null && 
summary.getServerInfo().getServerSegmentChangeInfo() != null) {
+        for (Map.Entry<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
entry : summary.getServerInfo()
+            .getServerSegmentChangeInfo()
+            .entrySet()) {
+          String serverName = entry.getKey();
+          RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo = 
entry.getValue();
+
+          serverAggregates.computeIfAbsent(serverName, k -> new 
AggregatedServerSegmentChangeInfo())
+              .merge(changeInfo);
+        }
+      }
+    }
+
+    return serverAggregates;
+  }
+
+  /**
+   * Helper class to aggregate ServerSegmentChangeInfo across multiple tables
+   */
+  private static class AggregatedServerSegmentChangeInfo extends 
RebalanceSummaryResult.ServerSegmentChangeInfo {
+
+    AggregatedServerSegmentChangeInfo() {
+      super(RebalanceSummaryResult.ServerStatus.UNCHANGED, 0, 0, 0, 0, 0, 
null);
+    }
+
+    void merge(RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo) {
+      _totalSegmentsAfterRebalance += 
changeInfo.getTotalSegmentsAfterRebalance();
+      _totalSegmentsBeforeRebalance += 
changeInfo.getTotalSegmentsBeforeRebalance();
+      _segmentsAdded += changeInfo.getSegmentsAdded();
+      _segmentsDeleted += changeInfo.getSegmentsDeleted();
+      _segmentsUnchanged += changeInfo.getSegmentsUnchanged();
+
+      // Use tag list from any of the change infos (should be consistent)
+      if (_tagList == null && changeInfo.getTagList() != null) {
+        _tagList = changeInfo.getTagList();
+      }
+      if (_totalSegmentsAfterRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.REMOVED;
+      } else if (_totalSegmentsBeforeRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
+      } else {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+      }
+    }
+  }
+
+  /**
+   * Aggregated ServerInfo that extends RebalanceSummaryResult.ServerInfo
+   */
+  private static class AggregatedServerInfo extends 
RebalanceSummaryResult.ServerInfo {
+    AggregatedServerInfo(Map<String, AggregatedServerSegmentChangeInfo> 
serverAggregates) {
+      super(0, null, null, null, null, null, null);
+
+      if (serverAggregates.isEmpty()) {
+        return;
+      }
+
+      Set<String> serversAdded = new HashSet<>();
+      Set<String> serversRemoved = new HashSet<>();
+      Set<String> serversUnchanged = new HashSet<>();
+      Set<String> serversGettingNewSegments = new HashSet<>();
+      Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
finalServerSegmentChangeInfo = new HashMap<>();
+
+      int numServersGettingNewSegments = 0;
+      int totalServersBefore = 0;
+      int totalServersAfter = 0;
+
+      for (Map.Entry<String, AggregatedServerSegmentChangeInfo> entry : 
serverAggregates.entrySet()) {
+        String serverName = entry.getKey();
+        AggregatedServerSegmentChangeInfo aggregate = entry.getValue();
+
+        // Determine server status based on aggregated segment counts
+        if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.REMOVED) {
+          serversRemoved.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.ADDED) {
+          serversAdded.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.UNCHANGED) {
+          serversUnchanged.add(serverName);
+        }
+
+        // Track servers getting new segments
+        if (aggregate.getSegmentsAdded() > 0) {
+          serversGettingNewSegments.add(serverName);
+          numServersGettingNewSegments++;
+        }
+
+        // Create final ServerSegmentChangeInfo with determined status
+        finalServerSegmentChangeInfo.put(serverName, aggregate);
+
+        // Count servers for before/after totals
+        if (aggregate.getTotalSegmentsBeforeRebalance() > 0) {
+          totalServersBefore++;
+        }
+        if (aggregate.getTotalSegmentsAfterRebalance() > 0) {
+          totalServersAfter++;
+        }
+      }
+
+      RebalanceSummaryResult.RebalanceChangeInfo
+          aggregatedNumServers = new 
RebalanceSummaryResult.RebalanceChangeInfo(totalServersBefore, 
totalServersAfter);
+
+      // Set the computed values using reflection or by updating the parent 
constructor call
+      _numServersGettingNewSegments = numServersGettingNewSegments;
+      _numServers = aggregatedNumServers;
+      _serversAdded = serversAdded;
+      _serversRemoved = serversRemoved;
+      _serversUnchanged = serversUnchanged;
+      _serversGettingNewSegments = serversGettingNewSegments;
+      _serverSegmentChangeInfo = finalServerSegmentChangeInfo;
+    }
+  }
+
+  /**
+   * Aggregated SegmentInfo that extends RebalanceSummaryResult.SegmentInfo
+   */
+  private static class AggregatedSegmentInfo extends 
RebalanceSummaryResult.SegmentInfo {
+    AggregatedSegmentInfo(List<RebalanceSummaryResult> summaryResults,
+        Map<String, AggregatedServerSegmentChangeInfo> serverAggregates) {
+      super(0, 0, 0, 0, 0, null, null, null, null);
+
+      if (summaryResults.isEmpty()) {
+        return;
+      }
+
+      int totalSegmentsToBeMoved = 0;
+      int totalSegmentsToBeDeleted = 0;
+      long totalEstimatedDataToBeMovedInBytes = 0;
+
+      int beforeRebalanceReplicationFactor = 0;
+      int afterRebalanceReplicationFactor = 0;
+      int beforeRebalanceSegmentsInSingleReplica = 0;
+      int afterRebalanceSegmentsInSingleReplica = 0;
+      int beforeRebalanceSegmentsAcrossAllReplicas = 0;
+      int afterRebalanceSegmentsAcrossAllReplicas = 0;
+
+      // Track consuming segment summaries for realtime tables
+      List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> 
consumingSummaries = new ArrayList<>();
+      int validSummariesCount = 0;
+
+      // Aggregate data from individual table summaries
+      for (RebalanceSummaryResult summary : summaryResults) {
+        if (summary.getSegmentInfo() != null) {
+          RebalanceSummaryResult.SegmentInfo segmentInfo = 
summary.getSegmentInfo();
+          validSummariesCount++;
+
+          totalSegmentsToBeMoved += segmentInfo.getTotalSegmentsToBeMoved();
+          totalSegmentsToBeDeleted += 
segmentInfo.getTotalSegmentsToBeDeleted();
+          if (totalEstimatedDataToBeMovedInBytes >= 0) {
+            totalEstimatedDataToBeMovedInBytes = 
segmentInfo.getTotalEstimatedDataToBeMovedInBytes() < 0 ? -1
+                : totalEstimatedDataToBeMovedInBytes + 
segmentInfo.getTotalEstimatedDataToBeMovedInBytes();
+          }
+
+          if (segmentInfo.getNumSegmentsInSingleReplica() != null) {
+            beforeRebalanceSegmentsInSingleReplica +=
+                
segmentInfo.getNumSegmentsInSingleReplica().getValueBeforeRebalance();
+            afterRebalanceSegmentsInSingleReplica +=
+                
segmentInfo.getNumSegmentsInSingleReplica().getExpectedValueAfterRebalance();
+          }
+
+          if (segmentInfo.getNumSegmentsAcrossAllReplicas() != null) {
+            beforeRebalanceSegmentsAcrossAllReplicas +=
+                
segmentInfo.getNumSegmentsAcrossAllReplicas().getValueBeforeRebalance();
+            afterRebalanceSegmentsAcrossAllReplicas +=
+                
segmentInfo.getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance();
+          }
+
+          if (segmentInfo.getConsumingSegmentToBeMovedSummary() != null) {
+            
consumingSummaries.add(segmentInfo.getConsumingSegmentToBeMovedSummary());
+          }
+        }
+      }
+
+      if (validSummariesCount == 0) {
+        return;
+      }
+
+      int maxSegmentsAddedToASingleServer = 0;
+      for (AggregatedServerSegmentChangeInfo aggregate : 
serverAggregates.values()) {
+        maxSegmentsAddedToASingleServer = 
Math.max(maxSegmentsAddedToASingleServer, aggregate.getSegmentsAdded());
+      }
+
+      // Calculate average segment size
+      long estimatedAverageSegmentSizeInBytes =
+          totalEstimatedDataToBeMovedInBytes >= 0 && totalSegmentsToBeMoved > 
0 ? totalEstimatedDataToBeMovedInBytes
+              / totalSegmentsToBeMoved : 0;
+
+      // Create aggregated RebalanceChangeInfo objects
+      RebalanceSummaryResult.RebalanceChangeInfo 
aggregatedNumSegmentsInSingleReplica =
+          new RebalanceSummaryResult.RebalanceChangeInfo(
+              beforeRebalanceSegmentsInSingleReplica, 
afterRebalanceSegmentsInSingleReplica);
+      RebalanceSummaryResult.RebalanceChangeInfo 
aggregatedNumSegmentsAcrossAllReplicas =
+          new RebalanceSummaryResult.RebalanceChangeInfo(
+              beforeRebalanceSegmentsAcrossAllReplicas, 
afterRebalanceSegmentsAcrossAllReplicas);
+
+      // Aggregate consuming segment summaries
+      RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
aggregatedConsumingSummary =
+          aggregateConsumingSegmentSummary(consumingSummaries);
+
+      // Set the computed values
+      _totalSegmentsToBeMoved = totalSegmentsToBeMoved;
+      _totalSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+      _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer;
+      _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes;
+      _totalEstimatedDataToBeMovedInBytes = totalEstimatedDataToBeMovedInBytes;
+      _replicationFactor = null; // This is irrelevant for tenant rebalance
+      _numSegmentsInSingleReplica = aggregatedNumSegmentsInSingleReplica;
+      _numSegmentsAcrossAllReplicas = aggregatedNumSegmentsAcrossAllReplicas;
+      _consumingSegmentToBeMovedSummary = aggregatedConsumingSummary;
+    }
+  }
+
+  /**
+   * Aggregates consuming segment summaries for realtime tables
+   */
+  private static RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
aggregateConsumingSegmentSummary(
+      List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> summaries) 
{
+    if (summaries.isEmpty()) {
+      return null;
+    }
+
+    int totalNumConsumingSegmentsToBeMoved = 0;
+    // Find the maximum size of offset and age maps across all tables
+    int maxOffsetMapSize = 0;
+    int maxAgeMapSize = 0;
+    for (RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary summary : 
summaries) {
+      if (summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() != 
null) {
+        maxOffsetMapSize = Math.max(maxOffsetMapSize,
+            
summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size());
+      }
+      if (summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() != 
null) {
+        maxAgeMapSize = Math.max(maxAgeMapSize,
+            
summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size());
+      }
+    }
+
+    // Create maps to store all segments by offset and age
+    Map<String, Integer> allConsumingSegmentsWithOffsets = new HashMap<>();
+    Map<String, Integer> allConsumingSegmentsWithAges = new HashMap<>();
+
+    // Aggregate ConsumingSegmentSummaryPerServer by server name across all 
tables
+    Map<String, AggregatedConsumingSegmentSummaryPerServer> serverAggregates = 
new HashMap<>();
+
+    for (RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary summary : 
summaries) {
+      totalNumConsumingSegmentsToBeMoved += 
summary.getNumConsumingSegmentsToBeMoved();
+
+      // Add all segments with offsets
+      if (summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() != 
null) {
+        
allConsumingSegmentsWithOffsets.putAll(summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
+      }
+
+      // Add all segments with ages
+      if (summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() != 
null) {
+        
allConsumingSegmentsWithAges.putAll(summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes());
+      }
+
+      // Aggregate server consuming segment summaries by server name
+      if (summary.getServerConsumingSegmentSummary() != null) {
+        for (Map.Entry<String,
+            
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
 entry
+            : summary.getServerConsumingSegmentSummary()
+            .entrySet()) {
+          String serverName = entry.getKey();
+          
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer
 serverSummary =
+              entry.getValue();
+
+          serverAggregates.computeIfAbsent(serverName, k -> new 
AggregatedConsumingSegmentSummaryPerServer())
+              .merge(serverSummary);
+        }
+      }
+    }
+
+    // Keep only top k segments with the highest offsets

Review Comment:
   Yeah, like I mentioned earlier, should we keep a top K per server rather than 
overall? Here K can be smaller, e.g. top 3. Same for age.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -65,4 +215,449 @@ public void setJobId(String jobId) {
   public void setRebalanceTableResults(Map<String, RebalanceResult> 
rebalanceTableResults) {
     _rebalanceTableResults = rebalanceTableResults;
   }
+
+  /**
+   * Aggregated pre-check result that provides table-level pre-check status 
counts and message mappings
+   */
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class AggregatedPrecheckResult {
+    private final int _tablesPassedCount;
+    private final int _tablesWarnedCount;
+    private final int _tablesErroredCount;
+    private final Map<String, String> _passedTables;
+    private final Map<String, String> _warnedTables;
+    private final Map<String, String> _erroredTables;
+
+    public AggregatedPrecheckResult(int tablesPassedCount, int 
tablesWarnedCount, int tablesErroredCount,
+        Map<String, String> passedTables, Map<String, String> warnedTables,
+        Map<String, String> erroredTables) {
+      _tablesPassedCount = tablesPassedCount;
+      _tablesWarnedCount = tablesWarnedCount;
+      _tablesErroredCount = tablesErroredCount;
+      _passedTables = passedTables;
+      _warnedTables = warnedTables;
+      _erroredTables = erroredTables;
+    }
+
+    @JsonProperty
+    public int getTablesPassedCount() {
+      return _tablesPassedCount;
+    }
+
+    @JsonProperty
+    public int getTablesWarnedCount() {
+      return _tablesWarnedCount;
+    }
+
+    @JsonProperty
+    public int getTablesErroredCount() {
+      return _tablesErroredCount;
+    }
+
+    @JsonProperty
+    public Map<String, String> getPassedTables() {
+      return _passedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getWarnedTables() {
+      return _warnedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getErroredTables() {
+      return _erroredTables;
+    }
+  }
+
+  /**
+   * Step 1: Aggregate ServerSegmentChangeInfo across all tables for each 
server
+   */
+  private static Map<String, AggregatedServerSegmentChangeInfo> 
aggregateServerSegmentChangeInfo(
+      List<RebalanceSummaryResult> summaryResults) {
+    Map<String, AggregatedServerSegmentChangeInfo> serverAggregates = new 
HashMap<>();
+
+    for (RebalanceSummaryResult summary : summaryResults) {
+      if (summary.getServerInfo() != null && 
summary.getServerInfo().getServerSegmentChangeInfo() != null) {
+        for (Map.Entry<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
entry : summary.getServerInfo()
+            .getServerSegmentChangeInfo()
+            .entrySet()) {
+          String serverName = entry.getKey();
+          RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo = 
entry.getValue();
+
+          serverAggregates.computeIfAbsent(serverName, k -> new 
AggregatedServerSegmentChangeInfo())
+              .merge(changeInfo);
+        }
+      }
+    }
+
+    return serverAggregates;
+  }
+
+  /**
+   * Helper class to aggregate ServerSegmentChangeInfo across multiple tables
+   */
+  private static class AggregatedServerSegmentChangeInfo extends 
RebalanceSummaryResult.ServerSegmentChangeInfo {
+
+    AggregatedServerSegmentChangeInfo() {
+      super(RebalanceSummaryResult.ServerStatus.UNCHANGED, 0, 0, 0, 0, 0, 
null);
+    }
+
+    void merge(RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo) {
+      _totalSegmentsAfterRebalance += 
changeInfo.getTotalSegmentsAfterRebalance();
+      _totalSegmentsBeforeRebalance += 
changeInfo.getTotalSegmentsBeforeRebalance();
+      _segmentsAdded += changeInfo.getSegmentsAdded();
+      _segmentsDeleted += changeInfo.getSegmentsDeleted();
+      _segmentsUnchanged += changeInfo.getSegmentsUnchanged();
+
+      // Use tag list from any of the change infos (should be consistent)
+      if (_tagList == null && changeInfo.getTagList() != null) {
+        _tagList = changeInfo.getTagList();
+      }
+      if (_totalSegmentsAfterRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.REMOVED;
+      } else if (_totalSegmentsBeforeRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
+      } else {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+      }
+    }
+  }
+
+  /**
+   * Aggregated ServerInfo that extends RebalanceSummaryResult.ServerInfo
+   */
+  private static class AggregatedServerInfo extends 
RebalanceSummaryResult.ServerInfo {
+    AggregatedServerInfo(Map<String, AggregatedServerSegmentChangeInfo> 
serverAggregates) {
+      super(0, null, null, null, null, null, null);
+
+      if (serverAggregates.isEmpty()) {
+        return;
+      }
+
+      Set<String> serversAdded = new HashSet<>();
+      Set<String> serversRemoved = new HashSet<>();
+      Set<String> serversUnchanged = new HashSet<>();
+      Set<String> serversGettingNewSegments = new HashSet<>();
+      Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
finalServerSegmentChangeInfo = new HashMap<>();
+
+      int numServersGettingNewSegments = 0;
+      int totalServersBefore = 0;
+      int totalServersAfter = 0;
+
+      for (Map.Entry<String, AggregatedServerSegmentChangeInfo> entry : 
serverAggregates.entrySet()) {
+        String serverName = entry.getKey();
+        AggregatedServerSegmentChangeInfo aggregate = entry.getValue();
+
+        // Determine server status based on aggregated segment counts
+        if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.REMOVED) {
+          serversRemoved.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.ADDED) {
+          serversAdded.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.UNCHANGED) {
+          serversUnchanged.add(serverName);
+        }
+
+        // Track servers getting new segments
+        if (aggregate.getSegmentsAdded() > 0) {
+          serversGettingNewSegments.add(serverName);
+          numServersGettingNewSegments++;
+        }
+
+        // Create final ServerSegmentChangeInfo with determined status
+        finalServerSegmentChangeInfo.put(serverName, aggregate);
+
+        // Count servers for before/after totals
+        if (aggregate.getTotalSegmentsBeforeRebalance() > 0) {
+          totalServersBefore++;
+        }
+        if (aggregate.getTotalSegmentsAfterRebalance() > 0) {
+          totalServersAfter++;
+        }
+      }
+
+      RebalanceSummaryResult.RebalanceChangeInfo
+          aggregatedNumServers = new 
RebalanceSummaryResult.RebalanceChangeInfo(totalServersBefore, 
totalServersAfter);
+
+      // Set the computed values using reflection or by updating the parent 
constructor call
+      _numServersGettingNewSegments = numServersGettingNewSegments;
+      _numServers = aggregatedNumServers;
+      _serversAdded = serversAdded;
+      _serversRemoved = serversRemoved;
+      _serversUnchanged = serversUnchanged;
+      _serversGettingNewSegments = serversGettingNewSegments;
+      _serverSegmentChangeInfo = finalServerSegmentChangeInfo;
+    }
+  }
+
+  /**
+   * Aggregated SegmentInfo that extends RebalanceSummaryResult.SegmentInfo
+   */
+  private static class AggregatedSegmentInfo extends 
RebalanceSummaryResult.SegmentInfo {
+    AggregatedSegmentInfo(List<RebalanceSummaryResult> summaryResults,
+        Map<String, AggregatedServerSegmentChangeInfo> serverAggregates) {
+      super(0, 0, 0, 0, 0, null, null, null, null);
+
+      if (summaryResults.isEmpty()) {
+        return;
+      }
+
+      int totalSegmentsToBeMoved = 0;
+      int totalSegmentsToBeDeleted = 0;
+      long totalEstimatedDataToBeMovedInBytes = 0;
+
+      int beforeRebalanceReplicationFactor = 0;
+      int afterRebalanceReplicationFactor = 0;
+      int beforeRebalanceSegmentsInSingleReplica = 0;
+      int afterRebalanceSegmentsInSingleReplica = 0;
+      int beforeRebalanceSegmentsAcrossAllReplicas = 0;
+      int afterRebalanceSegmentsAcrossAllReplicas = 0;
+
+      // Track consuming segment summaries for realtime tables
+      List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> 
consumingSummaries = new ArrayList<>();
+      int validSummariesCount = 0;
+
+      // Aggregate data from individual table summaries
+      for (RebalanceSummaryResult summary : summaryResults) {
+        if (summary.getSegmentInfo() != null) {
+          RebalanceSummaryResult.SegmentInfo segmentInfo = 
summary.getSegmentInfo();
+          validSummariesCount++;
+
+          totalSegmentsToBeMoved += segmentInfo.getTotalSegmentsToBeMoved();
+          totalSegmentsToBeDeleted += 
segmentInfo.getTotalSegmentsToBeDeleted();
+          if (totalEstimatedDataToBeMovedInBytes >= 0) {
+            totalEstimatedDataToBeMovedInBytes = 
segmentInfo.getTotalEstimatedDataToBeMovedInBytes() < 0 ? -1
+                : totalEstimatedDataToBeMovedInBytes + 
segmentInfo.getTotalEstimatedDataToBeMovedInBytes();
+          }
+
+          if (segmentInfo.getNumSegmentsInSingleReplica() != null) {
+            beforeRebalanceSegmentsInSingleReplica +=
+                
segmentInfo.getNumSegmentsInSingleReplica().getValueBeforeRebalance();
+            afterRebalanceSegmentsInSingleReplica +=
+                
segmentInfo.getNumSegmentsInSingleReplica().getExpectedValueAfterRebalance();
+          }
+
+          if (segmentInfo.getNumSegmentsAcrossAllReplicas() != null) {
+            beforeRebalanceSegmentsAcrossAllReplicas +=
+                
segmentInfo.getNumSegmentsAcrossAllReplicas().getValueBeforeRebalance();
+            afterRebalanceSegmentsAcrossAllReplicas +=
+                
segmentInfo.getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance();
+          }
+
+          if (segmentInfo.getConsumingSegmentToBeMovedSummary() != null) {
+            
consumingSummaries.add(segmentInfo.getConsumingSegmentToBeMovedSummary());
+          }
+        }
+      }
+
+      if (validSummariesCount == 0) {
+        return;
+      }
+
+      int maxSegmentsAddedToASingleServer = 0;
+      for (AggregatedServerSegmentChangeInfo aggregate : 
serverAggregates.values()) {
+        maxSegmentsAddedToASingleServer = 
Math.max(maxSegmentsAddedToASingleServer, aggregate.getSegmentsAdded());
+      }
+
+      // Calculate average segment size

Review Comment:
   Is this meaningful for a tenant rebalance? Tables can have very different 
segment size characteristics. No harm in keeping it around though, I guess.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -50,10 +87,123 @@ public TenantRebalanceResult(String jobId, Map<String, 
RebalanceResult> rebalanc
     }
   }
 
+  private void computeStatusSummary(Map<String, RebalanceResult> 
rebalanceTableResults) {
+    _statusSummary = new HashMap<>();
+    for (RebalanceResult result : rebalanceTableResults.values()) {
+      RebalanceResult.Status status = result.getStatus();
+      _statusSummary.put(status, _statusSummary.getOrDefault(status, 0) + 1);
+    }
+  }
+
+  private void computeAggregatedPreChecks(Map<String, RebalanceResult> 
rebalanceTableResults) {
+    _aggregatedPreChecksResult = new HashMap<>();
+
+    // Organize pre-check results by pre-check name
+    Map<String, Map<String, String>> passedTablesByCheck = new HashMap<>();
+    Map<String, Map<String, String>> warnedTablesByCheck = new HashMap<>();
+    Map<String, Map<String, String>> erroredTablesByCheck = new HashMap<>();
+
+    // Process each table's pre-check results
+    for (Map.Entry<String, RebalanceResult> entry : 
rebalanceTableResults.entrySet()) {
+      String tableName = entry.getKey();
+      RebalanceResult result = entry.getValue();
+
+      if (result.getPreChecksResult() != null) {
+        for (Map.Entry<String, RebalancePreCheckerResult> checkEntry : 
result.getPreChecksResult().entrySet()) {
+          String checkName = checkEntry.getKey();
+          RebalancePreCheckerResult checkResult = checkEntry.getValue();
+
+          // Initialize maps for this check if not present
+          passedTablesByCheck.computeIfAbsent(checkName, k -> new HashMap<>());
+          warnedTablesByCheck.computeIfAbsent(checkName, k -> new HashMap<>());
+          erroredTablesByCheck.computeIfAbsent(checkName, k -> new 
HashMap<>());
+
+          // Categorize table based on this specific check's status
+          String message = checkResult.getMessage() != null ? 
checkResult.getMessage() : "";
+          switch (checkResult.getPreCheckStatus()) {
+            case PASS:
+              passedTablesByCheck.get(checkName).put(tableName, message);
+              break;
+            case WARN:
+              warnedTablesByCheck.get(checkName).put(tableName, message);
+              break;
+            case ERROR:
+              erroredTablesByCheck.get(checkName).put(tableName, message);
+              break;
+            default:
+              break; // Ignore unknown statuses

Review Comment:
   nit: Let's log a warning here for unknown statuses instead of silently ignoring them?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -65,4 +215,449 @@ public void setJobId(String jobId) {
   public void setRebalanceTableResults(Map<String, RebalanceResult> 
rebalanceTableResults) {
     _rebalanceTableResults = rebalanceTableResults;
   }
+
+  /**
+   * Aggregated pre-check result that provides table-level pre-check status 
counts and message mappings
+   */
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class AggregatedPrecheckResult {
+    private final int _tablesPassedCount;
+    private final int _tablesWarnedCount;
+    private final int _tablesErroredCount;
+    private final Map<String, String> _passedTables;
+    private final Map<String, String> _warnedTables;
+    private final Map<String, String> _erroredTables;
+
+    public AggregatedPrecheckResult(int tablesPassedCount, int 
tablesWarnedCount, int tablesErroredCount,
+        Map<String, String> passedTables, Map<String, String> warnedTables,
+        Map<String, String> erroredTables) {
+      _tablesPassedCount = tablesPassedCount;
+      _tablesWarnedCount = tablesWarnedCount;
+      _tablesErroredCount = tablesErroredCount;
+      _passedTables = passedTables;
+      _warnedTables = warnedTables;
+      _erroredTables = erroredTables;
+    }
+
+    @JsonProperty
+    public int getTablesPassedCount() {
+      return _tablesPassedCount;
+    }
+
+    @JsonProperty
+    public int getTablesWarnedCount() {
+      return _tablesWarnedCount;
+    }
+
+    @JsonProperty
+    public int getTablesErroredCount() {
+      return _tablesErroredCount;
+    }
+
+    @JsonProperty
+    public Map<String, String> getPassedTables() {
+      return _passedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getWarnedTables() {
+      return _warnedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getErroredTables() {
+      return _erroredTables;
+    }
+  }
+
+  /**
+   * Step 1: Aggregate ServerSegmentChangeInfo across all tables for each 
server
+   */
+  private static Map<String, AggregatedServerSegmentChangeInfo> 
aggregateServerSegmentChangeInfo(
+      List<RebalanceSummaryResult> summaryResults) {
+    Map<String, AggregatedServerSegmentChangeInfo> serverAggregates = new 
HashMap<>();
+
+    for (RebalanceSummaryResult summary : summaryResults) {
+      if (summary.getServerInfo() != null && 
summary.getServerInfo().getServerSegmentChangeInfo() != null) {
+        for (Map.Entry<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
entry : summary.getServerInfo()
+            .getServerSegmentChangeInfo()
+            .entrySet()) {
+          String serverName = entry.getKey();
+          RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo = 
entry.getValue();
+
+          serverAggregates.computeIfAbsent(serverName, k -> new 
AggregatedServerSegmentChangeInfo())
+              .merge(changeInfo);
+        }
+      }
+    }
+
+    return serverAggregates;
+  }
+
+  /**
+   * Helper class to aggregate ServerSegmentChangeInfo across multiple tables
+   */
+  private static class AggregatedServerSegmentChangeInfo extends 
RebalanceSummaryResult.ServerSegmentChangeInfo {
+
+    AggregatedServerSegmentChangeInfo() {
+      super(RebalanceSummaryResult.ServerStatus.UNCHANGED, 0, 0, 0, 0, 0, 
null);
+    }
+
+    void merge(RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo) {
+      _totalSegmentsAfterRebalance += 
changeInfo.getTotalSegmentsAfterRebalance();
+      _totalSegmentsBeforeRebalance += 
changeInfo.getTotalSegmentsBeforeRebalance();
+      _segmentsAdded += changeInfo.getSegmentsAdded();
+      _segmentsDeleted += changeInfo.getSegmentsDeleted();
+      _segmentsUnchanged += changeInfo.getSegmentsUnchanged();
+
+      // Use tag list from any of the change infos (should be consistent)
+      if (_tagList == null && changeInfo.getTagList() != null) {
+        _tagList = changeInfo.getTagList();
+      }
+      if (_totalSegmentsAfterRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.REMOVED;
+      } else if (_totalSegmentsBeforeRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
+      } else {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+      }
+    }
+  }
+
+  /**
+   * Aggregated ServerInfo that extends RebalanceSummaryResult.ServerInfo
+   */
+  private static class AggregatedServerInfo extends 
RebalanceSummaryResult.ServerInfo {
+    AggregatedServerInfo(Map<String, AggregatedServerSegmentChangeInfo> 
serverAggregates) {
+      super(0, null, null, null, null, null, null);
+
+      if (serverAggregates.isEmpty()) {
+        return;
+      }
+
+      Set<String> serversAdded = new HashSet<>();
+      Set<String> serversRemoved = new HashSet<>();
+      Set<String> serversUnchanged = new HashSet<>();
+      Set<String> serversGettingNewSegments = new HashSet<>();
+      Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
finalServerSegmentChangeInfo = new HashMap<>();
+
+      int numServersGettingNewSegments = 0;
+      int totalServersBefore = 0;
+      int totalServersAfter = 0;
+
+      for (Map.Entry<String, AggregatedServerSegmentChangeInfo> entry : 
serverAggregates.entrySet()) {
+        String serverName = entry.getKey();
+        AggregatedServerSegmentChangeInfo aggregate = entry.getValue();
+
+        // Determine server status based on aggregated segment counts
+        if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.REMOVED) {
+          serversRemoved.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.ADDED) {
+          serversAdded.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.UNCHANGED) {
+          serversUnchanged.add(serverName);
+        }
+
+        // Track servers getting new segments
+        if (aggregate.getSegmentsAdded() > 0) {
+          serversGettingNewSegments.add(serverName);
+          numServersGettingNewSegments++;
+        }
+
+        // Create final ServerSegmentChangeInfo with determined status
+        finalServerSegmentChangeInfo.put(serverName, aggregate);
+
+        // Count servers for before/after totals
+        if (aggregate.getTotalSegmentsBeforeRebalance() > 0) {
+          totalServersBefore++;
+        }
+        if (aggregate.getTotalSegmentsAfterRebalance() > 0) {
+          totalServersAfter++;
+        }
+      }
+
+      RebalanceSummaryResult.RebalanceChangeInfo
+          aggregatedNumServers = new 
RebalanceSummaryResult.RebalanceChangeInfo(totalServersBefore, 
totalServersAfter);
+
+      // Set the computed values using reflection or by updating the parent 
constructor call
+      _numServersGettingNewSegments = numServersGettingNewSegments;
+      _numServers = aggregatedNumServers;
+      _serversAdded = serversAdded;
+      _serversRemoved = serversRemoved;
+      _serversUnchanged = serversUnchanged;
+      _serversGettingNewSegments = serversGettingNewSegments;
+      _serverSegmentChangeInfo = finalServerSegmentChangeInfo;
+    }
+  }
+
+  /**
+   * Aggregated SegmentInfo that extends RebalanceSummaryResult.SegmentInfo
+   */
+  private static class AggregatedSegmentInfo extends 
RebalanceSummaryResult.SegmentInfo {
+    AggregatedSegmentInfo(List<RebalanceSummaryResult> summaryResults,
+        Map<String, AggregatedServerSegmentChangeInfo> serverAggregates) {
+      super(0, 0, 0, 0, 0, null, null, null, null);
+
+      if (summaryResults.isEmpty()) {
+        return;
+      }
+
+      int totalSegmentsToBeMoved = 0;
+      int totalSegmentsToBeDeleted = 0;
+      long totalEstimatedDataToBeMovedInBytes = 0;
+
+      int beforeRebalanceReplicationFactor = 0;
+      int afterRebalanceReplicationFactor = 0;
+      int beforeRebalanceSegmentsInSingleReplica = 0;
+      int afterRebalanceSegmentsInSingleReplica = 0;
+      int beforeRebalanceSegmentsAcrossAllReplicas = 0;
+      int afterRebalanceSegmentsAcrossAllReplicas = 0;
+
+      // Track consuming segment summaries for realtime tables
+      List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> 
consumingSummaries = new ArrayList<>();
+      int validSummariesCount = 0;
+
+      // Aggregate data from individual table summaries
+      for (RebalanceSummaryResult summary : summaryResults) {
+        if (summary.getSegmentInfo() != null) {
+          RebalanceSummaryResult.SegmentInfo segmentInfo = 
summary.getSegmentInfo();
+          validSummariesCount++;
+
+          totalSegmentsToBeMoved += segmentInfo.getTotalSegmentsToBeMoved();
+          totalSegmentsToBeDeleted += 
segmentInfo.getTotalSegmentsToBeDeleted();
+          if (totalEstimatedDataToBeMovedInBytes >= 0) {
+            totalEstimatedDataToBeMovedInBytes = 
segmentInfo.getTotalEstimatedDataToBeMovedInBytes() < 0 ? -1
+                : totalEstimatedDataToBeMovedInBytes + 
segmentInfo.getTotalEstimatedDataToBeMovedInBytes();
+          }
+
+          if (segmentInfo.getNumSegmentsInSingleReplica() != null) {
+            beforeRebalanceSegmentsInSingleReplica +=
+                
segmentInfo.getNumSegmentsInSingleReplica().getValueBeforeRebalance();
+            afterRebalanceSegmentsInSingleReplica +=
+                
segmentInfo.getNumSegmentsInSingleReplica().getExpectedValueAfterRebalance();
+          }
+
+          if (segmentInfo.getNumSegmentsAcrossAllReplicas() != null) {
+            beforeRebalanceSegmentsAcrossAllReplicas +=
+                
segmentInfo.getNumSegmentsAcrossAllReplicas().getValueBeforeRebalance();
+            afterRebalanceSegmentsAcrossAllReplicas +=
+                
segmentInfo.getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance();
+          }
+
+          if (segmentInfo.getConsumingSegmentToBeMovedSummary() != null) {
+            
consumingSummaries.add(segmentInfo.getConsumingSegmentToBeMovedSummary());
+          }
+        }
+      }
+
+      if (validSummariesCount == 0) {
+        return;
+      }
+
+      int maxSegmentsAddedToASingleServer = 0;
+      for (AggregatedServerSegmentChangeInfo aggregate : 
serverAggregates.values()) {
+        maxSegmentsAddedToASingleServer = 
Math.max(maxSegmentsAddedToASingleServer, aggregate.getSegmentsAdded());
+      }
+
+      // Calculate average segment size
+      long estimatedAverageSegmentSizeInBytes =
+          totalEstimatedDataToBeMovedInBytes >= 0 && totalSegmentsToBeMoved > 
0 ? totalEstimatedDataToBeMovedInBytes
+              / totalSegmentsToBeMoved : 0;
+
+      // Create aggregated RebalanceChangeInfo objects
+      RebalanceSummaryResult.RebalanceChangeInfo 
aggregatedNumSegmentsInSingleReplica =
+          new RebalanceSummaryResult.RebalanceChangeInfo(
+              beforeRebalanceSegmentsInSingleReplica, 
afterRebalanceSegmentsInSingleReplica);
+      RebalanceSummaryResult.RebalanceChangeInfo 
aggregatedNumSegmentsAcrossAllReplicas =
+          new RebalanceSummaryResult.RebalanceChangeInfo(
+              beforeRebalanceSegmentsAcrossAllReplicas, 
afterRebalanceSegmentsAcrossAllReplicas);
+
+      // Aggregate consuming segment summaries
+      RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
aggregatedConsumingSummary =
+          aggregateConsumingSegmentSummary(consumingSummaries);
+
+      // Set the computed values
+      _totalSegmentsToBeMoved = totalSegmentsToBeMoved;
+      _totalSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+      _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer;
+      _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes;
+      _totalEstimatedDataToBeMovedInBytes = totalEstimatedDataToBeMovedInBytes;
+      _replicationFactor = null; // This is irrelevant for tenant rebalance
+      _numSegmentsInSingleReplica = aggregatedNumSegmentsInSingleReplica;
+      _numSegmentsAcrossAllReplicas = aggregatedNumSegmentsAcrossAllReplicas;
+      _consumingSegmentToBeMovedSummary = aggregatedConsumingSummary;
+    }
+  }
+
+  /**
+   * Aggregates consuming segment summaries for realtime tables
+   */
+  private static RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
aggregateConsumingSegmentSummary(
+      List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> summaries) 
{
+    if (summaries.isEmpty()) {
+      return null;
+    }
+
+    int totalNumConsumingSegmentsToBeMoved = 0;
+    // Find the maximum size of offset and age maps across all tables
+    int maxOffsetMapSize = 0;
+    int maxAgeMapSize = 0;
+    for (RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary summary : 
summaries) {
+      if (summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() != 
null) {
+        maxOffsetMapSize = Math.max(maxOffsetMapSize,
+            
summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size());
+      }
+      if (summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() != 
null) {
+        maxAgeMapSize = Math.max(maxAgeMapSize,
+            
summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size());
+      }
+    }
+
+    // Create maps to store all segments by offset and age
+    Map<String, Integer> allConsumingSegmentsWithOffsets = new HashMap<>();
+    Map<String, Integer> allConsumingSegmentsWithAges = new HashMap<>();
+
+    // Aggregate ConsumingSegmentSummaryPerServer by server name across all 
tables
+    Map<String, AggregatedConsumingSegmentSummaryPerServer> serverAggregates = 
new HashMap<>();
+
+    for (RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary summary : 
summaries) {
+      totalNumConsumingSegmentsToBeMoved += 
summary.getNumConsumingSegmentsToBeMoved();
+
+      // Add all segments with offsets
+      if (summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() != 
null) {
+        
allConsumingSegmentsWithOffsets.putAll(summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
+      }
+
+      // Add all segments with ages
+      if (summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() != 
null) {
+        
allConsumingSegmentsWithAges.putAll(summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes());
+      }
+
+      // Aggregate server consuming segment summaries by server name
+      if (summary.getServerConsumingSegmentSummary() != null) {
+        for (Map.Entry<String,
+            
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
 entry
+            : summary.getServerConsumingSegmentSummary()
+            .entrySet()) {
+          String serverName = entry.getKey();
+          
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer
 serverSummary =
+              entry.getValue();
+
+          serverAggregates.computeIfAbsent(serverName, k -> new 
AggregatedConsumingSegmentSummaryPerServer())
+              .merge(serverSummary);
+        }
+      }
+    }
+
+    // Keep only top k segments with the highest offsets
+    Map<String, Integer> topKConsumingSegmentsWithOffsets = 
allConsumingSegmentsWithOffsets.entrySet().stream()
+        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
+        .limit(maxOffsetMapSize)
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, 
e2) -> e1, HashMap::new));
+
+    // Keep only top k segments with the oldest ages
+    Map<String, Integer> topKConsumingSegmentsWithAges = 
allConsumingSegmentsWithAges.entrySet().stream()
+        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
+        .limit(maxAgeMapSize)
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, 
e2) -> e1, HashMap::new));
+
+    // Replace the original maps with the top k versions
+    allConsumingSegmentsWithOffsets = topKConsumingSegmentsWithOffsets;
+    allConsumingSegmentsWithAges = topKConsumingSegmentsWithAges;
+
+    // Convert aggregated server data to final 
ConsumingSegmentSummaryPerServer map
+    Map<String, 
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        finalServerConsumingSummary = new HashMap<>(serverAggregates);
+
+    int totalNumServersGettingConsumingSegmentsAdded = 
finalServerConsumingSummary.size();
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(
+        totalNumConsumingSegmentsToBeMoved,
+        totalNumServersGettingConsumingSegmentsAdded,
+        allConsumingSegmentsWithOffsets.isEmpty() ? null : 
allConsumingSegmentsWithOffsets,
+        allConsumingSegmentsWithAges.isEmpty() ? null : 
allConsumingSegmentsWithAges,
+        finalServerConsumingSummary.isEmpty() ? null : 
finalServerConsumingSummary
+    );
+  }
+
+  /**
+   * Helper class to aggregate ConsumingSegmentSummaryPerServer across 
multiple tables
+   */
+  private static class AggregatedConsumingSegmentSummaryPerServer

Review Comment:
   Just wondering whether we still need this helper class, based on our offline conversation?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -50,10 +87,123 @@ public TenantRebalanceResult(String jobId, Map<String, 
RebalanceResult> rebalanc
     }
   }
 
+  private void computeStatusSummary(Map<String, RebalanceResult> 
rebalanceTableResults) {
+    _statusSummary = new HashMap<>();
+    for (RebalanceResult result : rebalanceTableResults.values()) {
+      RebalanceResult.Status status = result.getStatus();
+      _statusSummary.put(status, _statusSummary.getOrDefault(status, 0) + 1);
+    }
+  }
+
+  private void computeAggregatedPreChecks(Map<String, RebalanceResult> 
rebalanceTableResults) {
+    _aggregatedPreChecksResult = new HashMap<>();
+
+    // Organize pre-check results by pre-check name
+    Map<String, Map<String, String>> passedTablesByCheck = new HashMap<>();
+    Map<String, Map<String, String>> warnedTablesByCheck = new HashMap<>();
+    Map<String, Map<String, String>> erroredTablesByCheck = new HashMap<>();
+
+    // Process each table's pre-check results
+    for (Map.Entry<String, RebalanceResult> entry : 
rebalanceTableResults.entrySet()) {
+      String tableName = entry.getKey();
+      RebalanceResult result = entry.getValue();
+
+      if (result.getPreChecksResult() != null) {
+        for (Map.Entry<String, RebalancePreCheckerResult> checkEntry : 
result.getPreChecksResult().entrySet()) {
+          String checkName = checkEntry.getKey();
+          RebalancePreCheckerResult checkResult = checkEntry.getValue();
+
+          // Initialize maps for this check if not present
+          passedTablesByCheck.computeIfAbsent(checkName, k -> new HashMap<>());
+          warnedTablesByCheck.computeIfAbsent(checkName, k -> new HashMap<>());
+          erroredTablesByCheck.computeIfAbsent(checkName, k -> new 
HashMap<>());

Review Comment:
   nit: Just combine this map initialization with the switch cases below?
   
   ```
               case PASS:
                 passedTablesByCheck.get(checkName).put(tableName, message);
                 break;
               case WARN:
                 warnedTablesByCheck.get(checkName).put(tableName, message);
                 break;
               case ERROR:
                 erroredTablesByCheck.get(checkName).put(tableName, message);
                 break;
   ```
   
   Something like `passedTablesByCheck.computeIfAbsent(checkName, k -> new 
HashMap<>()).put(tableName, message);`? (Please fix up the syntax in case I messed it up.)



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -65,4 +215,449 @@ public void setJobId(String jobId) {
   public void setRebalanceTableResults(Map<String, RebalanceResult> 
rebalanceTableResults) {
     _rebalanceTableResults = rebalanceTableResults;
   }
+
+  /**
+   * Aggregated pre-check result that provides table-level pre-check status 
counts and message mappings
+   */
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class AggregatedPrecheckResult {
+    private final int _tablesPassedCount;
+    private final int _tablesWarnedCount;
+    private final int _tablesErroredCount;
+    private final Map<String, String> _passedTables;
+    private final Map<String, String> _warnedTables;
+    private final Map<String, String> _erroredTables;
+
+    public AggregatedPrecheckResult(int tablesPassedCount, int 
tablesWarnedCount, int tablesErroredCount,
+        Map<String, String> passedTables, Map<String, String> warnedTables,
+        Map<String, String> erroredTables) {
+      _tablesPassedCount = tablesPassedCount;
+      _tablesWarnedCount = tablesWarnedCount;
+      _tablesErroredCount = tablesErroredCount;
+      _passedTables = passedTables;
+      _warnedTables = warnedTables;
+      _erroredTables = erroredTables;
+    }
+
+    @JsonProperty
+    public int getTablesPassedCount() {
+      return _tablesPassedCount;
+    }
+
+    @JsonProperty
+    public int getTablesWarnedCount() {
+      return _tablesWarnedCount;
+    }
+
+    @JsonProperty
+    public int getTablesErroredCount() {
+      return _tablesErroredCount;
+    }
+
+    @JsonProperty
+    public Map<String, String> getPassedTables() {
+      return _passedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getWarnedTables() {
+      return _warnedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getErroredTables() {
+      return _erroredTables;
+    }
+  }
+
+  /**
+   * Step 1: Aggregate ServerSegmentChangeInfo across all tables for each 
server
+   */
+  private static Map<String, AggregatedServerSegmentChangeInfo> 
aggregateServerSegmentChangeInfo(
+      List<RebalanceSummaryResult> summaryResults) {
+    Map<String, AggregatedServerSegmentChangeInfo> serverAggregates = new 
HashMap<>();
+
+    for (RebalanceSummaryResult summary : summaryResults) {
+      if (summary.getServerInfo() != null && 
summary.getServerInfo().getServerSegmentChangeInfo() != null) {
+        for (Map.Entry<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
entry : summary.getServerInfo()
+            .getServerSegmentChangeInfo()
+            .entrySet()) {
+          String serverName = entry.getKey();
+          RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo = 
entry.getValue();
+
+          serverAggregates.computeIfAbsent(serverName, k -> new 
AggregatedServerSegmentChangeInfo())
+              .merge(changeInfo);
+        }
+      }
+    }
+
+    return serverAggregates;
+  }
+
+  /**
+   * Helper class to aggregate ServerSegmentChangeInfo across multiple tables
+   */
+  private static class AggregatedServerSegmentChangeInfo extends 
RebalanceSummaryResult.ServerSegmentChangeInfo {
+
+    AggregatedServerSegmentChangeInfo() {
+      super(RebalanceSummaryResult.ServerStatus.UNCHANGED, 0, 0, 0, 0, 0, 
null);
+    }
+
+    void merge(RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo) {
+      _totalSegmentsAfterRebalance += 
changeInfo.getTotalSegmentsAfterRebalance();
+      _totalSegmentsBeforeRebalance += 
changeInfo.getTotalSegmentsBeforeRebalance();
+      _segmentsAdded += changeInfo.getSegmentsAdded();
+      _segmentsDeleted += changeInfo.getSegmentsDeleted();
+      _segmentsUnchanged += changeInfo.getSegmentsUnchanged();
+
+      // Use tag list from any of the change infos (should be consistent)
+      if (_tagList == null && changeInfo.getTagList() != null) {
+        _tagList = changeInfo.getTagList();
+      }
+      if (_totalSegmentsAfterRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.REMOVED;
+      } else if (_totalSegmentsBeforeRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
+      } else {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+      }
+    }
+  }
+
+  /**
+   * Aggregated ServerInfo that extends RebalanceSummaryResult.ServerInfo
+   */
+  private static class AggregatedServerInfo extends 
RebalanceSummaryResult.ServerInfo {
+    AggregatedServerInfo(Map<String, AggregatedServerSegmentChangeInfo> 
serverAggregates) {
+      super(0, null, null, null, null, null, null);
+
+      if (serverAggregates.isEmpty()) {
+        return;
+      }
+
+      Set<String> serversAdded = new HashSet<>();
+      Set<String> serversRemoved = new HashSet<>();
+      Set<String> serversUnchanged = new HashSet<>();
+      Set<String> serversGettingNewSegments = new HashSet<>();
+      Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
finalServerSegmentChangeInfo = new HashMap<>();
+
+      int numServersGettingNewSegments = 0;
+      int totalServersBefore = 0;
+      int totalServersAfter = 0;
+
+      for (Map.Entry<String, AggregatedServerSegmentChangeInfo> entry : 
serverAggregates.entrySet()) {
+        String serverName = entry.getKey();
+        AggregatedServerSegmentChangeInfo aggregate = entry.getValue();
+
+        // Determine server status based on aggregated segment counts
+        if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.REMOVED) {
+          serversRemoved.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.ADDED) {
+          serversAdded.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.UNCHANGED) {
+          serversUnchanged.add(serverName);
+        }
+
+        // Track servers getting new segments
+        if (aggregate.getSegmentsAdded() > 0) {
+          serversGettingNewSegments.add(serverName);
+          numServersGettingNewSegments++;
+        }
+
+        // Create final ServerSegmentChangeInfo with determined status
+        finalServerSegmentChangeInfo.put(serverName, aggregate);
+
+        // Count servers for before/after totals
+        if (aggregate.getTotalSegmentsBeforeRebalance() > 0) {
+          totalServersBefore++;
+        }
+        if (aggregate.getTotalSegmentsAfterRebalance() > 0) {
+          totalServersAfter++;
+        }
+      }
+
+      RebalanceSummaryResult.RebalanceChangeInfo
+          aggregatedNumServers = new 
RebalanceSummaryResult.RebalanceChangeInfo(totalServersBefore, 
totalServersAfter);
+
+      // Set the computed values using reflection or by updating the parent 
constructor call
+      _numServersGettingNewSegments = numServersGettingNewSegments;
+      _numServers = aggregatedNumServers;
+      _serversAdded = serversAdded;
+      _serversRemoved = serversRemoved;
+      _serversUnchanged = serversUnchanged;
+      _serversGettingNewSegments = serversGettingNewSegments;
+      _serverSegmentChangeInfo = finalServerSegmentChangeInfo;
+    }
+  }
+
+  /**
+   * Aggregated SegmentInfo that extends RebalanceSummaryResult.SegmentInfo
+   */
+  private static class AggregatedSegmentInfo extends 
RebalanceSummaryResult.SegmentInfo {
+    AggregatedSegmentInfo(List<RebalanceSummaryResult> summaryResults,
+        Map<String, AggregatedServerSegmentChangeInfo> serverAggregates) {
+      super(0, 0, 0, 0, 0, null, null, null, null);
+
+      if (summaryResults.isEmpty()) {
+        return;
+      }
+
+      int totalSegmentsToBeMoved = 0;
+      int totalSegmentsToBeDeleted = 0;
+      long totalEstimatedDataToBeMovedInBytes = 0;
+
+      int beforeRebalanceReplicationFactor = 0;
+      int afterRebalanceReplicationFactor = 0;
+      int beforeRebalanceSegmentsInSingleReplica = 0;
+      int afterRebalanceSegmentsInSingleReplica = 0;
+      int beforeRebalanceSegmentsAcrossAllReplicas = 0;
+      int afterRebalanceSegmentsAcrossAllReplicas = 0;
+
+      // Track consuming segment summaries for realtime tables
+      List<RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary> 
consumingSummaries = new ArrayList<>();
+      int validSummariesCount = 0;
+
+      // Aggregate data from individual table summaries
+      for (RebalanceSummaryResult summary : summaryResults) {
+        if (summary.getSegmentInfo() != null) {
+          RebalanceSummaryResult.SegmentInfo segmentInfo = 
summary.getSegmentInfo();
+          validSummariesCount++;
+
+          totalSegmentsToBeMoved += segmentInfo.getTotalSegmentsToBeMoved();
+          totalSegmentsToBeDeleted += 
segmentInfo.getTotalSegmentsToBeDeleted();
+          if (totalEstimatedDataToBeMovedInBytes >= 0) {
+            totalEstimatedDataToBeMovedInBytes = 
segmentInfo.getTotalEstimatedDataToBeMovedInBytes() < 0 ? -1
+                : totalEstimatedDataToBeMovedInBytes + 
segmentInfo.getTotalEstimatedDataToBeMovedInBytes();
+          }
+
+          if (segmentInfo.getNumSegmentsInSingleReplica() != null) {
+            beforeRebalanceSegmentsInSingleReplica +=
+                
segmentInfo.getNumSegmentsInSingleReplica().getValueBeforeRebalance();
+            afterRebalanceSegmentsInSingleReplica +=
+                
segmentInfo.getNumSegmentsInSingleReplica().getExpectedValueAfterRebalance();
+          }
+
+          if (segmentInfo.getNumSegmentsAcrossAllReplicas() != null) {
+            beforeRebalanceSegmentsAcrossAllReplicas +=
+                
segmentInfo.getNumSegmentsAcrossAllReplicas().getValueBeforeRebalance();
+            afterRebalanceSegmentsAcrossAllReplicas +=
+                
segmentInfo.getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance();
+          }
+
+          if (segmentInfo.getConsumingSegmentToBeMovedSummary() != null) {
+            
consumingSummaries.add(segmentInfo.getConsumingSegmentToBeMovedSummary());
+          }
+        }
+      }
+
+      if (validSummariesCount == 0) {
+        return;
+      }
+
+      int maxSegmentsAddedToASingleServer = 0;
+      for (AggregatedServerSegmentChangeInfo aggregate : 
serverAggregates.values()) {
+        maxSegmentsAddedToASingleServer = 
Math.max(maxSegmentsAddedToASingleServer, aggregate.getSegmentsAdded());
+      }
+
+      // Calculate average segment size
+      long estimatedAverageSegmentSizeInBytes =
+          totalEstimatedDataToBeMovedInBytes >= 0 && totalSegmentsToBeMoved > 
0 ? totalEstimatedDataToBeMovedInBytes
+              / totalSegmentsToBeMoved : 0;
+
+      // Create aggregated RebalanceChangeInfo objects
+      RebalanceSummaryResult.RebalanceChangeInfo 
aggregatedNumSegmentsInSingleReplica =
+          new RebalanceSummaryResult.RebalanceChangeInfo(
+              beforeRebalanceSegmentsInSingleReplica, 
afterRebalanceSegmentsInSingleReplica);
+      RebalanceSummaryResult.RebalanceChangeInfo 
aggregatedNumSegmentsAcrossAllReplicas =
+          new RebalanceSummaryResult.RebalanceChangeInfo(
+              beforeRebalanceSegmentsAcrossAllReplicas, 
afterRebalanceSegmentsAcrossAllReplicas);
+
+      // Aggregate consuming segment summaries
+      RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
aggregatedConsumingSummary =

Review Comment:
   Should this just be set to null (or, at the very least, should that logic not 
aggregate the server-level data)? We discussed offline that it may not 
make much sense to consolidate this across servers.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -65,4 +215,449 @@ public void setJobId(String jobId) {
   public void setRebalanceTableResults(Map<String, RebalanceResult> 
rebalanceTableResults) {
     _rebalanceTableResults = rebalanceTableResults;
   }
+
+  /**
+   * Aggregated pre-check result that provides table-level pre-check status 
counts and message mappings
+   */
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class AggregatedPrecheckResult {
+    private final int _tablesPassedCount;
+    private final int _tablesWarnedCount;
+    private final int _tablesErroredCount;
+    private final Map<String, String> _passedTables;
+    private final Map<String, String> _warnedTables;
+    private final Map<String, String> _erroredTables;
+
+    public AggregatedPrecheckResult(int tablesPassedCount, int 
tablesWarnedCount, int tablesErroredCount,
+        Map<String, String> passedTables, Map<String, String> warnedTables,
+        Map<String, String> erroredTables) {
+      _tablesPassedCount = tablesPassedCount;
+      _tablesWarnedCount = tablesWarnedCount;
+      _tablesErroredCount = tablesErroredCount;
+      _passedTables = passedTables;
+      _warnedTables = warnedTables;
+      _erroredTables = erroredTables;
+    }
+
+    @JsonProperty
+    public int getTablesPassedCount() {
+      return _tablesPassedCount;
+    }
+
+    @JsonProperty
+    public int getTablesWarnedCount() {
+      return _tablesWarnedCount;
+    }
+
+    @JsonProperty
+    public int getTablesErroredCount() {
+      return _tablesErroredCount;
+    }
+
+    @JsonProperty
+    public Map<String, String> getPassedTables() {
+      return _passedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getWarnedTables() {
+      return _warnedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getErroredTables() {
+      return _erroredTables;
+    }
+  }
+
+  /**
+   * Step 1: Aggregate ServerSegmentChangeInfo across all tables for each 
server
+   */
+  private static Map<String, AggregatedServerSegmentChangeInfo> 
aggregateServerSegmentChangeInfo(
+      List<RebalanceSummaryResult> summaryResults) {
+    Map<String, AggregatedServerSegmentChangeInfo> serverAggregates = new 
HashMap<>();
+
+    for (RebalanceSummaryResult summary : summaryResults) {
+      if (summary.getServerInfo() != null && 
summary.getServerInfo().getServerSegmentChangeInfo() != null) {
+        for (Map.Entry<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
entry : summary.getServerInfo()
+            .getServerSegmentChangeInfo()
+            .entrySet()) {
+          String serverName = entry.getKey();
+          RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo = 
entry.getValue();
+
+          serverAggregates.computeIfAbsent(serverName, k -> new 
AggregatedServerSegmentChangeInfo())
+              .merge(changeInfo);
+        }
+      }
+    }
+
+    return serverAggregates;
+  }
+
+  /**
+   * Helper class to aggregate ServerSegmentChangeInfo across multiple tables
+   */
+  private static class AggregatedServerSegmentChangeInfo extends 
RebalanceSummaryResult.ServerSegmentChangeInfo {
+
+    AggregatedServerSegmentChangeInfo() {
+      super(RebalanceSummaryResult.ServerStatus.UNCHANGED, 0, 0, 0, 0, 0, 
null);
+    }
+
+    void merge(RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo) {
+      _totalSegmentsAfterRebalance += 
changeInfo.getTotalSegmentsAfterRebalance();
+      _totalSegmentsBeforeRebalance += 
changeInfo.getTotalSegmentsBeforeRebalance();
+      _segmentsAdded += changeInfo.getSegmentsAdded();
+      _segmentsDeleted += changeInfo.getSegmentsDeleted();
+      _segmentsUnchanged += changeInfo.getSegmentsUnchanged();
+
+      // Use tag list from any of the change infos (should be consistent)
+      if (_tagList == null && changeInfo.getTagList() != null) {
+        _tagList = changeInfo.getTagList();
+      }

Review Comment:
   `tagList` might not be the same across all servers (e.g., some servers can 
belong to multiple tenants). Should we instead keep all unique tags across all 
servers? Or perhaps set this to an empty list, since you're doing tag-level 
aggregation anyway?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to