somandal commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2013098032


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +793,79 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to 
rebalance. Need some good heuristics on how
     //       rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentSummary consumingSegmentSummary =
+        isOfflineTable ? null : new RebalanceSummaryResult.ConsumingSegmentSummary(
+            consumingSegmentsToBeMoved, maxBytesToCatchUpForConsumingSegments, bytesToCatchUpForServers);
     RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
         maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
-        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentSummary);
 
     LOGGER.info("Calculated rebalance summary for table: {} with 
rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }
 
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of bytes to catch up for each
+   * consuming segment. Returns a map from segment name to the number of bytes to catch up for that consuming
+   * segment. Returns null if it fails to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsBytesToCatchUp(String tableNameWithType) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }
+    Map<String, Integer> segmentToBytesToCatchUp = new HashMap<>();
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+          consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+      for (Map.Entry<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> entry
+          : consumingSegmentsInfoMap._segmentToConsumingInfoMap.entrySet()) {
+        String segmentName = entry.getKey();
+        List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList = entry.getValue();
+        SegmentZKMetadata segmentZKMetadata =
+            ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType,
+                segmentName);
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        if (consumingSegmentInfoList != null) {
+          String startOffset = segmentZKMetadata.getStartOffset();
+          if (startOffset == null) {
+            LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+            return null;
+          }
+          if (!consumingSegmentInfoList.isEmpty()) {
+            // this value should be the same regardless of which server the consuming segment info is from, use the
+            // first in the list here
+            int bytesToCatchUp = consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()

Review Comment:
   yeah I know we need the segment ZK metadata; I was asking whether the
   intended semantics here are `latest offset - start offset` or
   `current offset - start offset`, and whether you'd thought about that,
   that's all. I would add a comment in the summary result calling out
   that this is calculated against the latest offset, so there is no
   confusion.
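   
   For concreteness, here is a tiny standalone sketch of the two candidate
   semantics (hypothetical, self-contained names; it assumes offsets parse
   as longs, which holds for Kafka but not for every stream):
   
   ```java
   import java.util.Map;
   
   public class CatchUpSemanticsSketch {
     // Work for a fresh replica to reach the stream tip:
     // latest upstream offset - segment start offset.
     static long lagAgainstLatest(Map<String, String> latestOffsets, long startOffset) {
       long latest = latestOffsets.values().stream()
           .mapToLong(Long::parseLong).max().orElse(startOffset);
       return Math.max(0, latest - startOffset);
     }
   
     // Work already done by an existing replica:
     // current consumer offset - segment start offset.
     static long lagAgainstCurrent(Map<String, String> currentOffsets, long startOffset) {
       long current = currentOffsets.values().stream()
           .mapToLong(Long::parseLong).max().orElse(startOffset);
       return Math.max(0, current - startOffset);
     }
   
     public static void main(String[] args) {
       long start = 1000;
       System.out.println(lagAgainstLatest(Map.of("0", "1500"), start));   // 500
       System.out.println(lagAgainstCurrent(Map.of("0", "1200"), start));  // 200
     }
   }
   ```
   
   The two can differ a lot on a lagging replica, which is why a comment
   pinning down "against latest offset" removes the ambiguity.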
   
   Is there another way to get this kind of information for Kinesis that
   would also work for Kafka? A different API, or something else? It'd be
   good to find out so we don't end up re-implementing this from scratch
   if we need to add Kinesis support.
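   
   One wrinkle worth noting for that investigation: Kinesis sequence
   numbers are opaque large-number strings per shard, so `latest - start`
   subtraction doesn't yield a meaningful record count the way monotonic
   Kafka long offsets do. A connector-neutral shape might push the
   estimate behind an interface; a purely illustrative sketch (these
   names are not existing Pinot APIs):
   
   ```java
   import java.util.Optional;
   
   // Hypothetical connector-neutral hook, sketched for discussion only.
   interface StreamLagEstimator {
     // Records between a start position and the latest upstream position,
     // or empty when the stream cannot express lag numerically.
     Optional<Long> estimateRecordsBehind(String startPosition, String latestPosition);
   }
   
   // Kafka offsets are monotonically increasing longs, so subtraction works.
   class KafkaLagEstimator implements StreamLagEstimator {
     public Optional<Long> estimateRecordsBehind(String start, String latest) {
       return Optional.of(Math.max(0, Long.parseLong(latest) - Long.parseLong(start)));
     }
   }
   
   // Kinesis sequence numbers are not subtractable; a real implementation
   // would fall back to shard metrics (e.g. MillisBehindLatest) or report
   // unknown rather than fabricate a byte count.
   class KinesisLagEstimator implements StreamLagEstimator {
     public Optional<Long> estimateRecordsBehind(String start, String latest) {
       return Optional.empty();
     }
   }
   ```
   
   Not proposing this exact shape; just flagging that whatever API we pick
   should tolerate streams where offsets aren't subtractable.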


