J-HowHuang commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2027597651


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +848,153 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, 
new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> 
a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        
newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new 
HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> 
consumingSegmentZKmetadata.put(segment,
+        
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), 
tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, 
consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = 
getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, 
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN = 
getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            
segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new 
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new 
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), -1));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN =
+        consumingSegmentsAge == null ? null : 
getTopNConsumingSegmentWithValue(consumingSegmentsAge);
+
+    return new 
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), 
consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+        consumingSegmentSummaryPerServer);
+  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue) {
+    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+    consumingSegmentsWithValue.entrySet()
+        .stream()
+        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> topNConsumingSegments.put(entry.getKey(), 
entry.getValue()));
+    return topNConsumingSegments;
+  }
+
+  /**
+   * Fetches the age of each consuming segment in minutes.
+   * The age of a consuming segment is the time since the segment was created 
in ZK, it could be different to when
+   * the stream should start to be consumed for the segment.
+   * consumingSegmentZKMetadata is a map from consuming segments to be moved 
to their ZK metadata. Returns a map from
+   * segment name to the age of that consuming segment. Return null if failed 
to obtain info for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsAge(String 
tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    try {
+      consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("SegmentZKMetadata is null for segment: {} in table: 
{}", s, tableNameWithType);
+          throw new RuntimeException("SegmentZKMetadata is null");
+        }
+        long creationTime = segmentZKMetadata.getCreationTime();
+        if (creationTime < 0) {
+          LOGGER.warn("Creation time is not found for segment: {} in table: 
{}", s, tableNameWithType);
+          throw new RuntimeException("Creation time is not found");
+        }
+        consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+      }));
+    } catch (Exception e) {
+      return null;

Review Comment:
   the one who throws exception is the anonymous function passed into the 
`consumingSegmentZKMetadata.forEach` here 
https://github.com/apache/pinot/blob/9c731b19d67f3ddf09958a14083ce61a4b1397e8/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java#L922
   
   throwing exceptions so that the call stack stops and return null 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to