klsince commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2027503512


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +848,153 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN = getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), -1));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN =
+        consumingSegmentsAge == null ? null : getTopNConsumingSegmentWithValue(consumingSegmentsAge);
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+        consumingSegmentSummaryPerServer);
+  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue) {
+    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+    consumingSegmentsWithValue.entrySet()
+        .stream()
+        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> topNConsumingSegments.put(entry.getKey(), entry.getValue()));
+    return topNConsumingSegments;
+  }
+
+  /**
+   * Fetches the age of each consuming segment in minutes.
+   * The age of a consuming segment is the time since the segment was created in ZK, it could be different to when
+   * the stream should start to be consumed for the segment.
+   * consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata. Returns a map from
+   * segment name to the age of that consuming segment. Return null if failed to obtain info for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    try {
+      consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("SegmentZKMetadata is null for segment: {} in table: {}", s, tableNameWithType);
+          throw new RuntimeException("SegmentZKMetadata is null");
+        }
+        long creationTime = segmentZKMetadata.getCreationTime();

Review Comment:
   Related to a question asked above: should we use the segment's startTime here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +848,153 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer =
+        new HashMap<>();

Review Comment:
   format?
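   e.g., a possible reflow of that wrapped declaration (formatting suggestion only, assuming no behavior change is intended):
   ```
   Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
       consumingSegmentSummaryPerServer = new HashMap<>();
   ```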



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +848,153 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN = getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), -1));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN =
+        consumingSegmentsAge == null ? null : getTopNConsumingSegmentWithValue(consumingSegmentsAge);

Review Comment:
   nit: `getConsumingSegmentWithValue(@Nullable Integer topN) {...}` to make the method a bit more generic. Given a null topN param, we can return all values if wanted.
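   For illustration, a minimal sketch of that suggestion (the signature and null handling here are assumptions, not code from the PR):
   ```
   private static Map<String, Integer> getConsumingSegmentWithValue(
       Map<String, Integer> consumingSegmentsWithValue, @Nullable Integer topN) {
     Map<String, Integer> sortedConsumingSegments = new LinkedHashMap<>();
     consumingSegmentsWithValue.entrySet()
         .stream()
         .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
         // a null topN means "no limit", i.e. return all values in descending order
         .limit(topN == null ? Long.MAX_VALUE : topN)
         .forEach(entry -> sortedConsumingSegments.put(entry.getKey(), entry.getValue()));
     return sortedConsumingSegments;
   }
   ```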



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +848,153 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN = getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), -1));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN =
+        consumingSegmentsAge == null ? null : getTopNConsumingSegmentWithValue(consumingSegmentsAge);
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+        consumingSegmentSummaryPerServer);
+  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue) {
+    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+    consumingSegmentsWithValue.entrySet()
+        .stream()
+        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> topNConsumingSegments.put(entry.getKey(), entry.getValue()));
+    return topNConsumingSegments;
+  }
+
+  /**
+   * Fetches the age of each consuming segment in minutes.
+   * The age of a consuming segment is the time since the segment was created in ZK, it could be different to when
+   * the stream should start to be consumed for the segment.
+   * consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata. Returns a map from
+   * segment name to the age of that consuming segment. Return null if failed to obtain info for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    try {
+      consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("SegmentZKMetadata is null for segment: {} in table: {}", s, tableNameWithType);
+          throw new RuntimeException("SegmentZKMetadata is null");
+        }
+        long creationTime = segmentZKMetadata.getCreationTime();
+        if (creationTime < 0) {
+          LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+          throw new RuntimeException("Creation time is not found");
+        }
+        consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+      }));
+    } catch (Exception e) {
+      return null;

Review Comment:
   looks like `throw new RuntimeException("Creation time is not found");` is the only statement that will throw an exception to be caught here? If so, I'd suggest just returning `null` in that if block above.
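   For illustration, a sketch of that refactor for the method body (switching the lambda to a plain loop so the method can return directly; this is a suggestion, not code from the PR):
   ```
   for (Map.Entry<String, SegmentZKMetadata> entry : consumingSegmentZKMetadata.entrySet()) {
     SegmentZKMetadata segmentZKMetadata = entry.getValue();
     if (segmentZKMetadata == null) {
       LOGGER.warn("SegmentZKMetadata is null for segment: {} in table: {}", entry.getKey(), tableNameWithType);
       return null;
     }
     long creationTime = segmentZKMetadata.getCreationTime();
     if (creationTime < 0) {
       LOGGER.warn("Creation time is not found for segment: {} in table: {}", entry.getKey(), tableNameWithType);
       return null;
     }
     // parenthesized before the cast so the division happens on the long value
     consumingSegmentsAge.put(entry.getKey(), (int) ((now - creationTime) / 60_000));
   }
   return consumingSegmentsAge;
   ```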



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +848,153 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN = getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), -1));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN =
+        consumingSegmentsAge == null ? null : getTopNConsumingSegmentWithValue(consumingSegmentsAge);
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+        consumingSegmentSummaryPerServer);
+  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue) {
+    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+    consumingSegmentsWithValue.entrySet()
+        .stream()
+        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)

Review Comment:
   nit: I'd prefer to pass in a topN param, so the method can be more functional.
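   e.g., the limit line could become something like this (see the fuller sketch under the earlier comment on this method):
   ```
   .limit(topN == null ? consumingSegmentsWithValue.size() : topN)
   ```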



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +848,153 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN = getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), -1));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN =
+        consumingSegmentsAge == null ? null : getTopNConsumingSegmentWithValue(consumingSegmentsAge);
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+        consumingSegmentSummaryPerServer);
+  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue) {
+    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+    consumingSegmentsWithValue.entrySet()
+        .stream()
+        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> topNConsumingSegments.put(entry.getKey(), entry.getValue()));
+    return topNConsumingSegments;
+  }
+
+  /**
+   * Fetches the age of each consuming segment in minutes.
+   * The age of a consuming segment is the time since the segment was created in ZK, it could be different to when
+   * the stream should start to be consumed for the segment.
+   * consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata. Returns a map from
+   * segment name to the age of that consuming segment. Return null if failed to obtain info for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    try {
+      consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("SegmentZKMetadata is null for segment: {} in table: {}", s, tableNameWithType);
+          throw new RuntimeException("SegmentZKMetadata is null");
+        }
+        long creationTime = segmentZKMetadata.getCreationTime();
+        if (creationTime < 0) {
+          LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+          throw new RuntimeException("Creation time is not found");
+        }
+        consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+      }));
+    } catch (Exception e) {
+      return null;
+    }
+    return consumingSegmentsAge;
+  }
+
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    return _consumingSegmentInfoReader;
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata.
+   * Returns a map from segment name to the number of offsets to catch up for that consuming
+   * segment. Return null if failed to obtain info for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }
+    Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+          consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+      for (Map.Entry<String, SegmentZKMetadata> entry : consumingSegmentZKMetadata.entrySet()) {
+        String segmentName = entry.getKey();
+        List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList =
+            consumingSegmentsInfoMap._segmentToConsumingInfoMap.getOrDefault(segmentName, null);
+        SegmentZKMetadata segmentZKMetadata = entry.getValue();
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        String startOffset = segmentZKMetadata.getStartOffset();
+        if (startOffset == null) {
+          LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        if (consumingSegmentInfoList == null || consumingSegmentInfoList.isEmpty()) {
+          LOGGER.warn("No available consuming segment info from any server. Segment: {} in table: {}", segmentName,
+              tableNameWithType);
+          return null;
+        }
+        // this value should be the same regardless of which server the consuming segment info is from, use the
+        // first in the list here
+        int offsetsToCatchUp =
+            consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()

Review Comment:
   looks like we use consumingSegmentInfoReader to ask servers for the upstream's latest offsets.
   But under the hood, the server uses this util to get the offsets. So would it be more efficient to use it here to get the offsets from the upstream directly?
   ```
   _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
   ...
   _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(
           _clientId, _streamPartitionGroupId);
   ...
   _partitionMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, timeout);
   ...
   ```
   _streamConfig is from tableConfig, but may need to fake _clientId/groupId values
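   For illustration, the pieces above could be assembled into a controller-side helper roughly like this (method name, clientId value, and error handling are made up here, not from the PR; types are from org.apache.pinot.spi.stream):
   ```
   private StreamPartitionMsgOffset fetchLatestUpstreamOffset(StreamConfig streamConfig, int partitionGroupId,
       long timeoutMs) throws Exception {
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     // the clientId only identifies the metadata consumer, so a faked value should do
     String clientId = "rebalance-summary-" + partitionGroupId;
     try (StreamMetadataProvider metadataProvider =
         streamConsumerFactory.createPartitionMetadataProvider(clientId, partitionGroupId)) {
       return metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, timeoutMs);
     }
   }
   ```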



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -306,18 +297,120 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  public static class ConsumingSegmentToBeMovedSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    private final int _numServerGettingConsumingSegmentsAdded;
+    private final Map<String, Integer> _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+    private final Map<String, Integer> _oldestConsumingSegmentsToBeMovedInMinutes;
+    private final Map<String, ConsumingSegmentSummaryPerServer> _serverConsumingSegmentSummary;
+
+    /**
+     * Constructor for ConsumingSegmentToBeMovedSummary
+     * @param numConsumingSegmentsToBeMoved total number of consuming segments to be moved as part of this rebalance
+     * @param numServerGettingConsumingSegmentsAdded maximum bytes of consuming segments to be moved to catch up
+     * @param consumingSegmentsToBeMovedWithMostOffsetsToCatchUp top consuming segments to be moved to catch up.

Review Comment:
   hmm, re-reading the description of `consumingSegmentsToBeMovedWithOldestAgeInMinutes`: should it use the `segment start time from ZK metadata` instead of the `segment creation time from ZK metadata`?
   
   Because if it's based on creation time, then the oldest segment must be the one with the smallest starting offset, right? However, if it's based on start time, then its data time (i.e. event time) would decide which segment is the oldest one. IIUC, we wanted to show the oldest data time here, but correct me if I misunderstood it.
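   For illustration, a start-time-based variant of the age computation might look like this sketch (assumption: `getStartTimeMs()` reflects event time and may be unset for a consuming segment that hasn't indexed any rows yet):
   ```
   long startTimeMs = segmentZKMetadata.getStartTimeMs();
   if (startTimeMs > 0) {
     consumingSegmentsAge.put(s, (int) ((now - startTimeMs) / 60_000));
   }
   ```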



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +848,153 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN = getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), -1));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN =
+        consumingSegmentsAge == null ? null : getTopNConsumingSegmentWithValue(consumingSegmentsAge);
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+        consumingSegmentSummaryPerServer);
+  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue) {
+    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+    consumingSegmentsWithValue.entrySet()
+        .stream()
+        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> topNConsumingSegments.put(entry.getKey(), entry.getValue()));
+    return topNConsumingSegments;
+  }
+
+  /**
+   * Fetches the age of each consuming segment in minutes.
+   * The age of a consuming segment is the time since the segment was created in ZK, it could be different to when
+   * the stream should start to be consumed for the segment.
+   * consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata. Returns a map from
+   * segment name to the age of that consuming segment. Return null if failed to obtain info for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    try {
+      consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("SegmentZKMetadata is null for segment: {} in table: {}", s, tableNameWithType);
+          throw new RuntimeException("SegmentZKMetadata is null");
+        }
+        long creationTime = segmentZKMetadata.getCreationTime();
+        if (creationTime < 0) {
+          LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+          throw new RuntimeException("Creation time is not found");
+        }
+        consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+      }));
+    } catch (Exception e) {
+      return null;
+    }
+    return consumingSegmentsAge;
+  }
+
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    return _consumingSegmentInfoReader;
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata.
+   * Returns a map from segment name to the number of offsets to catch up for that consuming
+   * segment. Return null if failed to obtain info for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }
+    Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+          consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);

Review Comment:
   is 30s a timeout here? Should it be configurable? Or perhaps define a constant for this default value, like the TOP_N default value.
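   e.g., something like this (constant name is just a suggestion):
   ```
   // default timeout for fetching consuming segment info, analogous to TOP_N_IN_CONSUMING_SEGMENT_SUMMARY
   private static final int CONSUMING_SEGMENT_INFO_FETCH_TIMEOUT_MS = 30_000;
   ```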


