J-HowHuang commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2027951549


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +856,172 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(TableConfig tableConfig,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    String tableNameWithType = tableConfig.getTableName();
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableConfig, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer = new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN =
+          getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp, TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), -1));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN =
+        consumingSegmentsAge == null ? null
+            : getTopNConsumingSegmentWithValue(consumingSegmentsAge, TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+        consumingSegmentSummaryPerServer);
+  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue, @Nullable Integer topN) {

Review Comment:
   Suggested by @klsince 
   https://github.com/apache/pinot/pull/15368#discussion_r2027535003



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to