J-HowHuang commented on code in PR #16136:
URL: https://github.com/apache/pinot/pull/16136#discussion_r2159764191


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java:
##########
@@ -65,4 +215,449 @@ public void setJobId(String jobId) {
   public void setRebalanceTableResults(Map<String, RebalanceResult> 
rebalanceTableResults) {
     _rebalanceTableResults = rebalanceTableResults;
   }
+
+  /**
+   * Aggregated pre-check result that provides table-level pre-check status 
counts and message mappings
+   */
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class AggregatedPrecheckResult {
+    private final int _tablesPassedCount;
+    private final int _tablesWarnedCount;
+    private final int _tablesErroredCount;
+    private final Map<String, String> _passedTables;
+    private final Map<String, String> _warnedTables;
+    private final Map<String, String> _erroredTables;
+
+    public AggregatedPrecheckResult(int tablesPassedCount, int 
tablesWarnedCount, int tablesErroredCount,
+        Map<String, String> passedTables, Map<String, String> warnedTables,
+        Map<String, String> erroredTables) {
+      _tablesPassedCount = tablesPassedCount;
+      _tablesWarnedCount = tablesWarnedCount;
+      _tablesErroredCount = tablesErroredCount;
+      _passedTables = passedTables;
+      _warnedTables = warnedTables;
+      _erroredTables = erroredTables;
+    }
+
+    @JsonProperty
+    public int getTablesPassedCount() {
+      return _tablesPassedCount;
+    }
+
+    @JsonProperty
+    public int getTablesWarnedCount() {
+      return _tablesWarnedCount;
+    }
+
+    @JsonProperty
+    public int getTablesErroredCount() {
+      return _tablesErroredCount;
+    }
+
+    @JsonProperty
+    public Map<String, String> getPassedTables() {
+      return _passedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getWarnedTables() {
+      return _warnedTables;
+    }
+
+    @JsonProperty
+    public Map<String, String> getErroredTables() {
+      return _erroredTables;
+    }
+  }
+
+  /**
+   * Step 1: Aggregate ServerSegmentChangeInfo across all tables for each 
server
+   */
+  private static Map<String, AggregatedServerSegmentChangeInfo> 
aggregateServerSegmentChangeInfo(
+      List<RebalanceSummaryResult> summaryResults) {
+    Map<String, AggregatedServerSegmentChangeInfo> serverAggregates = new 
HashMap<>();
+
+    for (RebalanceSummaryResult summary : summaryResults) {
+      if (summary.getServerInfo() != null && 
summary.getServerInfo().getServerSegmentChangeInfo() != null) {
+        for (Map.Entry<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
entry : summary.getServerInfo()
+            .getServerSegmentChangeInfo()
+            .entrySet()) {
+          String serverName = entry.getKey();
+          RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo = 
entry.getValue();
+
+          serverAggregates.computeIfAbsent(serverName, k -> new 
AggregatedServerSegmentChangeInfo())
+              .merge(changeInfo);
+        }
+      }
+    }
+
+    return serverAggregates;
+  }
+
+  /**
+   * Helper class to aggregate ServerSegmentChangeInfo across multiple tables
+   */
+  private static class AggregatedServerSegmentChangeInfo extends 
RebalanceSummaryResult.ServerSegmentChangeInfo {
+
+    AggregatedServerSegmentChangeInfo() {
+      super(RebalanceSummaryResult.ServerStatus.UNCHANGED, 0, 0, 0, 0, 0, 
null);
+    }
+
+    void merge(RebalanceSummaryResult.ServerSegmentChangeInfo changeInfo) {
+      _totalSegmentsAfterRebalance += 
changeInfo.getTotalSegmentsAfterRebalance();
+      _totalSegmentsBeforeRebalance += 
changeInfo.getTotalSegmentsBeforeRebalance();
+      _segmentsAdded += changeInfo.getSegmentsAdded();
+      _segmentsDeleted += changeInfo.getSegmentsDeleted();
+      _segmentsUnchanged += changeInfo.getSegmentsUnchanged();
+
+      // Use tag list from any of the change infos (should be consistent)
+      if (_tagList == null && changeInfo.getTagList() != null) {
+        _tagList = changeInfo.getTagList();
+      }
+      if (_totalSegmentsAfterRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.REMOVED;
+      } else if (_totalSegmentsBeforeRebalance == 0) {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
+      } else {
+        _serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+      }
+    }
+  }
+
+  /**
+   * Aggregated ServerInfo that extends RebalanceSummaryResult.ServerInfo
+   */
+  private static class AggregatedServerInfo extends 
RebalanceSummaryResult.ServerInfo {
+    AggregatedServerInfo(Map<String, AggregatedServerSegmentChangeInfo> 
serverAggregates) {
+      super(0, null, null, null, null, null, null);
+
+      if (serverAggregates.isEmpty()) {
+        return;
+      }
+
+      Set<String> serversAdded = new HashSet<>();
+      Set<String> serversRemoved = new HashSet<>();
+      Set<String> serversUnchanged = new HashSet<>();
+      Set<String> serversGettingNewSegments = new HashSet<>();
+      Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
finalServerSegmentChangeInfo = new HashMap<>();
+
+      int numServersGettingNewSegments = 0;
+      int totalServersBefore = 0;
+      int totalServersAfter = 0;
+
+      for (Map.Entry<String, AggregatedServerSegmentChangeInfo> entry : 
serverAggregates.entrySet()) {
+        String serverName = entry.getKey();
+        AggregatedServerSegmentChangeInfo aggregate = entry.getValue();
+
+        // Determine server status based on aggregated segment counts
+        if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.REMOVED) {
+          serversRemoved.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.ADDED) {
+          serversAdded.add(serverName);
+        } else if (aggregate.getServerStatus() == 
RebalanceSummaryResult.ServerStatus.UNCHANGED) {
+          serversUnchanged.add(serverName);
+        }
+
+        // Track servers getting new segments
+        if (aggregate.getSegmentsAdded() > 0) {
+          serversGettingNewSegments.add(serverName);
+          numServersGettingNewSegments++;
+        }
+
+        // Create final ServerSegmentChangeInfo with determined status
+        finalServerSegmentChangeInfo.put(serverName, aggregate);
+
+        // Count servers for before/after totals
+        if (aggregate.getTotalSegmentsBeforeRebalance() > 0) {
+          totalServersBefore++;
+        }
+        if (aggregate.getTotalSegmentsAfterRebalance() > 0) {
+          totalServersAfter++;
+        }
+      }
+
+      RebalanceSummaryResult.RebalanceChangeInfo
+          aggregatedNumServers = new 
RebalanceSummaryResult.RebalanceChangeInfo(totalServersBefore, 
totalServersAfter);
+
+      // Set the computed values using reflection or by updating the parent 
constructor call

Review Comment:
   Thanks, I forgot to remove this. This was before I made the member variables 
in parent class `ServerInfo` `protected`. They were `private final` so some 
tricks had to be done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to