J-HowHuang commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2021700858


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult 
calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to 
rebalance. Need some good heuristics on how
     //       rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
consumingSegmentToBeMovedSummary =
+        isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, 
newServersToConsumingSegmentMap);
     RebalanceSummaryResult.SegmentInfo segmentInfo = new 
RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
         maxSegmentsAddedToServer, averageSegmentSizeInBytes, 
totalEstimatedDataToBeMovedInBytes,
-        replicationFactor, numSegmentsInSingleReplica, 
numSegmentsAcrossAllReplicas);
+        replicationFactor, numSegmentsInSingleReplica, 
numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
 
     LOGGER.info("Calculated rebalance summary for table: {} with 
rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> 
a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        
newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new 
HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> 
consumingSegmentZKmetadata.put(segment,
+        
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), 
tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, 
consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = 
getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> topTenOffset;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer> 
consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      topTenOffset = new LinkedHashMap<>();
+      consumingSegmentsOffsetsToCatchUp.entrySet()
+          .stream()
+          .sorted(
+              Collections.reverseOrder(Map.Entry.comparingByValue()))
+          .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+          .forEach(entry -> topTenOffset.put(entry.getKey(), 
entry.getValue()));
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            
segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server, new 
RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+            segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      topTenOffset = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server, new 
RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+            segments.size(), null));
+      });
+    }
+
+    Map<String, Integer> oldestTenSegment;
+    oldestTenSegment = new LinkedHashMap<>();
+    consumingSegmentsAge.entrySet()
+        .stream()
+        .sorted(
+            Map.Entry.comparingByValue())
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> oldestTenSegment.put(entry.getKey(), 
entry.getValue()));
+
+    return new 
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), topTenOffset, 
oldestTenSegment, consumingSegmentSummaryPerServer);
+  }
+
+  private Map<String, Integer> getConsumingSegmentsAge(String 
tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+      long creationTime = segmentZKMetadata.getCreationTime();
+      if (creationTime < 0) {
+        LOGGER.warn("Creation time is not found for segment: {} in table: {}", 
s, tableNameWithType);
+        return;
+      }
+      consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+    }));
+    return consumingSegmentsAge;
+  }
+
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || 
_pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, 
_connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the 
number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from consuming 
segments to be moved to their ZK metadata.
+   * Returns a map from segment name to the number of offsets to catch up for 
that consuming
+   * segment. Return null if failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String 
tableNameWithType,

Review Comment:
   I prefer to keep this as it is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to