somandal commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2019835365


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -306,18 +298,107 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class ConsumingSegmentToBeMovedSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    private final int _numServerGettingConsumingSegmentsAdded;
+    // the top N consuming segments with most offsets to be consumed by a server. this is essentially the difference
+    // between the latest offset of the stream and the segment's start offset of the stream
+    private final Map<String, Integer> _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+    // top N oldest consuming segments. the age of a segment is determined by its creation time
+    private final Map<String, Integer> _oldestConsumingSegmentsToBeMovedInMinutes;
+    private final Map<String, ConsumingSegmentSummaryPerServer> _serverConsumingSegmentSummary;
+
+    /**
+     * Constructor for ConsumingSegmentInfo
+     * @param numConsumingSegmentsToBeMoved total number of consuming segments to be moved as part of this rebalance
+     * @param numServerGettingConsumingSegmentsAdded maximum bytes of consuming segments to be moved to catch up
+     * @param consumingSegmentsToBeMovedWithMostOffsetsToCatchUp top consuming segments to be moved to catch up
+     * @param oldestConsumingSegmentsToBeMovedInMinutes oldest consuming segments to be moved to catch up
+     * @param serverConsumingSegmentSummary offsets of consuming segments to be moved to catch up per
+     *                                                   server
+     */
+    @JsonCreator
+    public ConsumingSegmentToBeMovedSummary(
+        @JsonProperty("numConsumingSegmentsToBeMoved") int numConsumingSegmentsToBeMoved,
+        @JsonProperty("numServerGettingConsumingSegmentsAdded") int numServerGettingConsumingSegmentsAdded,
+        @JsonProperty("consumingSegmentsToBeMovedWithMostOffsetsToCatchUp") @Nullable
+        Map<String, Integer> consumingSegmentsToBeMovedWithMostOffsetsToCatchUp,
+        @JsonProperty("oldestConsumingSegmentsToBeMovedInMinutes") @Nullable
+        Map<String, Integer> oldestConsumingSegmentsToBeMovedInMinutes,

Review Comment:
   do these need to be marked as `@Nullable`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -306,18 +298,107 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class ConsumingSegmentToBeMovedSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    private final int _numServerGettingConsumingSegmentsAdded;
+    // the top N consuming segments with most offsets to be consumed by a server. this is essentially the difference
+    // between the latest offset of the stream and the segment's start offset of the stream
+    private final Map<String, Integer> _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+    // top N oldest consuming segments. the age of a segment is determined by its creation time
+    private final Map<String, Integer> _oldestConsumingSegmentsToBeMovedInMinutes;
+    private final Map<String, ConsumingSegmentSummaryPerServer> _serverConsumingSegmentSummary;
+
+    /**
+     * Constructor for ConsumingSegmentInfo
+     * @param numConsumingSegmentsToBeMoved total number of consuming segments to be moved as part of this rebalance
+     * @param numServerGettingConsumingSegmentsAdded maximum bytes of consuming segments to be moved to catch up
+     * @param consumingSegmentsToBeMovedWithMostOffsetsToCatchUp top consuming segments to be moved to catch up

Review Comment:
   nit: can you mention the word "offset" in the comment?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -711,6 +750,15 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       }
     }
 
+    if (existingServersToConsumingSegmentMap != null && newServersToConsumingSegmentMap != null) {
+      for (Map.Entry<String, Set<String>> entry : newServersToConsumingSegmentMap.entrySet()) {
+        String server = entry.getKey();
+        entry.getValue().removeAll(existingServersToConsumingSegmentMap.getOrDefault(server, Collections.emptySet()));

Review Comment:
   > And key removal from the map should be after the traversal here.
   
   I didn't understand what you meant here. can you explain?
   
   I see it accessed here, and then again in:
   
   ```
   newServersToConsumingSegmentMap.entrySet().removeIf(entry -> entry.getValue().isEmpty());
   ```
   
   am I missing something? or are you saying that you're using it directly because of the key removal?
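A minimal sketch of the two-pass pattern under discussion. It assumes plain `HashMap`/`HashSet` inputs, and the class and method names are hypothetical, not from the PR. Shrinking each value set in place while traversing the entry set does not structurally modify the map. Removing keys inside that same for-each loop would typically fail fast with `ConcurrentModificationException`. For that reason, empty entries are dropped afterwards with `removeIf`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; names are illustrative and not part of the PR.
final class ConsumingSegmentDiffSketch {
  private ConsumingSegmentDiffSketch() {
  }

  /**
   * Keeps, for each server, only the consuming segments it does not already host,
   * then drops servers that end up with nothing new. Mutates newServerToSegments.
   */
  static void retainOnlyNewSegments(Map<String, Set<String>> newServerToSegments,
      Map<String, Set<String>> existingServerToSegments) {
    // Pass 1: shrink each value set in place; the map's keys are untouched,
    // so iterating over entrySet() here is safe.
    for (Map.Entry<String, Set<String>> entry : newServerToSegments.entrySet()) {
      entry.getValue().removeAll(existingServerToSegments.getOrDefault(entry.getKey(), Collections.emptySet()));
    }
    // Pass 2: remove keys only after the traversal, through the entry set's own removeIf.
    newServerToSegments.entrySet().removeIf(entry -> entry.getValue().isEmpty());
  }
}
```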



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
     //       rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
+        isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap);
     RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
         maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
-        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
 
     LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> topTenOffset;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer> consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      topTenOffset = new LinkedHashMap<>();
+      consumingSegmentsOffsetsToCatchUp.entrySet()
+          .stream()
+          .sorted(
+              Collections.reverseOrder(Map.Entry.comparingByValue()))
+          .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+          .forEach(entry -> topTenOffset.put(entry.getKey(), entry.getValue()));
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+            segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      topTenOffset = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+            segments.size(), null));
+      });
+    }
+
+    Map<String, Integer> oldestTenSegment;
+    oldestTenSegment = new LinkedHashMap<>();
+    consumingSegmentsAge.entrySet()
+        .stream()
+        .sorted(
+            Map.Entry.comparingByValue())
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> oldestTenSegment.put(entry.getKey(), entry.getValue()));
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), topTenOffset, oldestTenSegment, consumingSegmentSummaryPerServer);
+  }
+
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+      long creationTime = segmentZKMetadata.getCreationTime();
+      if (creationTime < 0) {
+        LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+        return;
+      }
+      consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+    }));
+    return consumingSegmentsAge;
+  }
+
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata.
+   * Returns a map from segment name to the number of offsets to catch up for that consuming
+   * segment. Return null if failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    if (consumingSegmentZKMetadata.isEmpty()) {
+      LOGGER.info("No consuming segments being moved for table: {}", tableNameWithType);
+      return new HashMap<>();
+    }
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }
+    Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+          consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+      for (Map.Entry<String, SegmentZKMetadata> entry : consumingSegmentZKMetadata.entrySet()) {
+        String segmentName = entry.getKey();
+        List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList =
+            consumingSegmentsInfoMap._segmentToConsumingInfoMap.getOrDefault(segmentName, null);
+        SegmentZKMetadata segmentZKMetadata = entry.getValue();
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        String startOffset = segmentZKMetadata.getStartOffset();
+        if (startOffset == null) {
+          LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        if (consumingSegmentInfoList != null && !consumingSegmentInfoList.isEmpty()) {
+          // this value should be the same regardless of which server the consuming segment info is from, use the
+          // first in the list here
+          int offsetsToCatchUp =
+              consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()

Review Comment:
   the catch only handles `InvalidConfigException`; it doesn't seem to catch a `NullPointerException` or other runtime exceptions? can you do a quick test for NPE?
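One self-contained way to do the quick NPE check is sketched below. `StandInException` is a hypothetical substitute for `InvalidConfigException`. The `Map<String, List<Long>>` argument is a simplified, hypothetical stand-in for the reader's per-segment offset info, whose real lookup is truncated in the diff. The sketch shows that a catch clause for one specific exception type lets a `NullPointerException` from a missing entry propagate. It also shows one defensive alternative that returns `null` instead.

```java
import java.util.List;
import java.util.Map;

// Illustrative only: StandInException substitutes for the narrow exception type caught in the PR,
// and the map shape is a simplified, hypothetical stand-in for the consuming segment info.
final class NarrowCatchSketch {
  private NarrowCatchSketch() {
  }

  static final class StandInException extends Exception {
    StandInException(String message) {
      super(message);
    }
  }

  /** Narrow catch: a missing segment entry escapes as a NullPointerException. */
  static Integer narrowCatch(Map<String, List<Long>> latestOffsets, String segment, long startOffset) {
    try {
      if (latestOffsets.isEmpty()) {
        throw new StandInException("no consuming segment info");
      }
      long latest = latestOffsets.get(segment).get(0); // NPE when the segment is absent
      return (int) (latest - startOffset);
    } catch (StandInException e) {
      return null;
    }
  }

  /** Defensive variant: missing data returns null, matching a "return null on failure" contract. */
  static Integer defensive(Map<String, List<Long>> latestOffsets, String segment, long startOffset) {
    List<Long> offsets = latestOffsets.get(segment);
    if (offsets == null || offsets.isEmpty() || offsets.get(0) == null) {
      return null;
    }
    return (int) (offsets.get(0) - startOffset);
  }
}
```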



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java:
##########
@@ -214,7 +214,7 @@ public String forTableRebalance(String tableName, String tableType, boolean dryR
     if (reassignInstances) {
       stringBuilder.append("&reassignInstances=").append(reassignInstances);
     }
-    if (includeConsuming) {
+    if (!includeConsuming) {

Review Comment:
   since we decided to open a separate PR for this, should you remove this change from this PR?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +863,150 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {

Review Comment:
   just curious, why not return null instead? why even create this object?
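The trade-off can be sketched with a hypothetical, cut-down summary type that keeps only the two counts (the names are illustrative, not from the PR). Returning `null` for an empty input lets callers treat "no consuming segments moved" as an absent section, at the cost of a null check. The all-zero object avoids that check. Whether a `null` field is then omitted from the JSON depends on the `@JsonInclude` setting of the enclosing class, which this hunk does not show. One consequence to weigh is that the caller already passes `null` for offline tables (`isOfflineTable ? null : ...`). So `null` would then mean both "offline table" and "realtime table with nothing to move".

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for ConsumingSegmentToBeMovedSummary with only the two counts.
final class ConsumingSummarySketch {
  final int numConsumingSegmentsToBeMoved;
  final int numServersGettingConsumingSegmentsAdded;

  ConsumingSummarySketch(int numConsumingSegmentsToBeMoved, int numServersGettingConsumingSegmentsAdded) {
    this.numConsumingSegmentsToBeMoved = numConsumingSegmentsToBeMoved;
    this.numServersGettingConsumingSegmentsAdded = numServersGettingConsumingSegmentsAdded;
  }

  /** Returns null when no consuming segments move, instead of an all-zero/empty summary. */
  static ConsumingSummarySketch of(Map<String, Set<String>> newServerToConsumingSegments) {
    if (newServerToConsumingSegments.isEmpty()) {
      return null;
    }
    // Counts (server, segment) pairs, the same total the PR computes with reduce(...).
    int total = newServerToConsumingSegments.values().stream().mapToInt(Set::size).sum();
    return new ConsumingSummarySketch(total, newServerToConsumingSegments.size());
  }
}
```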



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +863,150 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN = getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), null));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN = getTopNConsumingSegmentWithValue(consumingSegmentsAge);
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+        consumingSegmentSummaryPerServer);
+  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue) {
+    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+    consumingSegmentsWithValue.entrySet()
+        .stream()
+        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> topNConsumingSegments.put(entry.getKey(), entry.getValue()));
+    return topNConsumingSegments;
+  }
+
+  /**
+   * Fetches the age of each consuming segment.
+   * The age of a consuming segment is the time since the segment was created in ZK, it could be different to when
+   * the stream should start to be consumed for the segment.
+   * consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata. Returns a map from
+   * segment name to the age of that consuming segment. Return null if failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+      long creationTime = segmentZKMetadata.getCreationTime();
+      if (creationTime < 0) {
+        LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+        return;
+      }
+      consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+    }));
+    return consumingSegmentsAge;
+  }
+
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata.
+   * Returns a map from segment name to the number of offsets to catch up for that consuming
+   * segment. Return null if failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    if (consumingSegmentZKMetadata.isEmpty()) {

Review Comment:
   why not return null?
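Some context on the trade-off, as a hypothetical sketch (the class and enum names are illustrative, not from the PR). The method's Javadoc reserves `null` for "failed to obtain info for any consuming segment". Returning an empty map for the no-segments case therefore keeps the two situations distinguishable to a caller. In `getConsumingSegmentSummary` above, a `null` result makes the caller set the top-N offsets map and the per-server offset totals to `null`.

```java
import java.util.Map;

// Hypothetical caller-side sketch of how a null-vs-empty contract for
// getConsumingSegmentsOffsetsToCatchUp could be interpreted.
final class OffsetsResultSketch {
  private OffsetsResultSketch() {
  }

  enum Outcome { FAILED, NOTHING_TO_MOVE, HAS_OFFSETS }

  static Outcome classify(Map<String, Integer> offsetsToCatchUp) {
    if (offsetsToCatchUp == null) {
      // Per the Javadoc: info could not be obtained for some consuming segment.
      return Outcome.FAILED;
    }
    return offsetsToCatchUp.isEmpty() ? Outcome.NOTHING_TO_MOVE : Outcome.HAS_OFFSETS;
  }
}
```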



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
     //       rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
+        isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap);
     RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
         maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
-        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
 
     LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> topTenOffset;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer> consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      topTenOffset = new LinkedHashMap<>();
+      consumingSegmentsOffsetsToCatchUp.entrySet()
+          .stream()
+          .sorted(
+              Collections.reverseOrder(Map.Entry.comparingByValue()))
+          .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+          .forEach(entry -> topTenOffset.put(entry.getKey(), entry.getValue()));
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp =
+            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+            segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      topTenOffset = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+            segments.size(), null));
+      });
+    }
+
+    Map<String, Integer> oldestTenSegment;
+    oldestTenSegment = new LinkedHashMap<>();
+    consumingSegmentsAge.entrySet()
+        .stream()
+        .sorted(
+            Map.Entry.comparingByValue())
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> oldestTenSegment.put(entry.getKey(), entry.getValue()));
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), topTenOffset, oldestTenSegment, consumingSegmentSummaryPerServer);
+  }
+
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+      long creationTime = segmentZKMetadata.getCreationTime();
+      if (creationTime < 0) {
+        LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+        return;
+      }
+      consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+    }));
+    return consumingSegmentsAge;
+  }
+
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata.
+   * Returns a map from segment name to the number of offsets to catch up for that consuming
+   * segment. Return null if failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,

Review Comment:
   I think this is not addressed? are you planning to or not? (just to make sure nothing is missed: it'll be good to reply to each comment saying whether you've addressed it or decided to skip it, so it's easier for the reviewer)
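A minimal sketch of the age and top-N helpers in this hunk, with hypothetical names. Two details in the diff are worth checking. First, in `(int) (now - creationTime) / 60_000` the cast applies before the division. An age above `Integer.MAX_VALUE` milliseconds (just under 25 days) therefore overflows to a wrong, possibly negative, value. Second, sorting ages with `Map.Entry.comparingByValue()` in ascending order picks the youngest segments rather than the oldest. The later revision's `getTopNConsumingSegmentWithValue` sorts in descending order.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the age and top-N helpers; not the PR's code.
final class ConsumingSegmentAgeSketch {
  private ConsumingSegmentAgeSketch() {
  }

  /** Age in whole minutes; divides in long arithmetic before narrowing to int. */
  static int ageInMinutes(long nowMs, long creationTimeMs) {
    return (int) TimeUnit.MILLISECONDS.toMinutes(nowMs - creationTimeMs);
  }

  /** Top n entries by value, largest first (for ages, that means the oldest segments). */
  static Map<String, Integer> topNByValueDesc(Map<String, Integer> valuesBySegment, int n) {
    Map<String, Integer> topN = new LinkedHashMap<>();
    valuesBySegment.entrySet().stream()
        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
        .limit(n)
        .forEach(entry -> topN.put(entry.getKey(), entry.getValue()));
    return topN;
  }
}
```

Returning `int` mirrors the `Map<String, Integer>` fields in the summary. Keeping the age as a `long` would avoid the narrowing entirely.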



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -306,18 +298,107 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class ConsumingSegmentToBeMovedSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    private final int _numServerGettingConsumingSegmentsAdded;
+    // the top N consuming segments with most offsets to be consumed by a server. this is essentially the difference
+    // between the latest offset of the stream and the segment's start offset of the stream
+    private final Map<String, Integer> _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+    // top N oldest consuming segments. the age of a segment is determined by its creation time
+    private final Map<String, Integer> _oldestConsumingSegmentsToBeMovedInMinutes;
+    private final Map<String, ConsumingSegmentSummaryPerServer> _serverConsumingSegmentSummary;
+
+    /**
+     * Constructor for ConsumingSegmentInfo
+     * @param numConsumingSegmentsToBeMoved total number of consuming segments to be moved as part of this rebalance
+     * @param numServerGettingConsumingSegmentsAdded maximum bytes of consuming segments to be moved to catch up
+     * @param consumingSegmentsToBeMovedWithMostOffsetsToCatchUp top consuming segments to be moved to catch up
+     * @param oldestConsumingSegmentsToBeMovedInMinutes oldest consuming segments to be moved to catch up
+     * @param serverConsumingSegmentSummary offsets of consuming segments to be moved to catch up per
+     *                                                   server
+     */
+    @JsonCreator
+    public ConsumingSegmentToBeMovedSummary(
+        @JsonProperty("numConsumingSegmentsToBeMoved") int numConsumingSegmentsToBeMoved,
+        @JsonProperty("numServerGettingConsumingSegmentsAdded") int numServerGettingConsumingSegmentsAdded,
+        @JsonProperty("consumingSegmentsToBeMovedWithMostOffsetsToCatchUp") @Nullable
+        Map<String, Integer> consumingSegmentsToBeMovedWithMostOffsetsToCatchUp,
+        @JsonProperty("oldestConsumingSegmentsToBeMovedInMinutes") @Nullable
+        Map<String, Integer> oldestConsumingSegmentsToBeMovedInMinutes,
+        @JsonProperty("serverConsumingSegmentSummary") @Nullable
+        Map<String, ConsumingSegmentSummaryPerServer> serverConsumingSegmentSummary) {
+      _numConsumingSegmentsToBeMoved = numConsumingSegmentsToBeMoved;
+      _numServerGettingConsumingSegmentsAdded = numServerGettingConsumingSegmentsAdded;
+      _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp = consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+      _oldestConsumingSegmentsToBeMovedInMinutes = oldestConsumingSegmentsToBeMovedInMinutes;
+      _serverConsumingSegmentSummary = serverConsumingSegmentSummary;
+    }
+
+    @JsonProperty
+    public int getNumConsumingSegmentsToBeMoved() {
+      return _numConsumingSegmentsToBeMoved;
+    }
+
+    @JsonProperty
+    public int getNumServerGettingConsumingSegmentsAdded() {
+      return _numServerGettingConsumingSegmentsAdded;
+    }
+
+    @JsonProperty
+    public Map<String, Integer> getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() {
+      return _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+    }
+
+    @JsonProperty
+    public Map<String, Integer> getOldestConsumingSegmentsToBeMovedInMinutes() {
+      return _oldestConsumingSegmentsToBeMovedInMinutes;
+    }
+
+    @JsonProperty
+    public Map<String, ConsumingSegmentSummaryPerServer> getServerConsumingSegmentSummary() {
+      return _serverConsumingSegmentSummary;
+    }

Review Comment:
   should these have the annotation: `@JsonInclude(JsonInclude.Include.NON_NULL)`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -380,6 +464,11 @@ public RebalanceChangeInfo getNumSegmentsInSingleReplica() {
     public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() {
       return _numSegmentsAcrossAllReplicas;
     }
+
+    @JsonProperty
+    public ConsumingSegmentToBeMovedSummary getConsumingSegmentToBeMovedSummary() {

Review Comment:
   annotate this with `@JsonInclude(JsonInclude.Include.NON_NULL)`? or is the class-level one sufficient here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


For additional commands, e-mail: commits-h...@pinot.apache.org
