somandal commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2013085638
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +793,79 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
     //  rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentSummary consumingSegmentSummary =
+        isOfflineTable ? null : new RebalanceSummaryResult.ConsumingSegmentSummary(
+            consumingSegmentsToBeMoved, maxBytesToCatchUpForConsumingSegments, bytesToCatchUpForServers);
     RebalanceSummaryResult.SegmentInfo segmentInfo =
         new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved, maxSegmentsAddedToServer,
             averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
-            replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+            replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentSummary);

     LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }

+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of bytes to catch up for each
+   * consuming segment. Returns a map from segment name to the number of bytes to catch up for that consuming
+   * segment. Returns null if it failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsBytesToCatchUp(String tableNameWithType) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }
+    Map<String, Integer> segmentToBytesToCatchUp = new HashMap<>();
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+          consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+      for (Map.Entry<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> entry
+          : consumingSegmentsInfoMap._segmentToConsumingInfoMap.entrySet()) {
+        String segmentName = entry.getKey();
+        List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList = entry.getValue();
+        SegmentZKMetadata segmentZKMetadata =
+            ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType,
+                segmentName);
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        if (consumingSegmentInfoList != null) {
+          String startOffset = segmentZKMetadata.getStartOffset();
+          if (startOffset == null) {
+            LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+            return null;
+          }
+          if (!consumingSegmentInfoList.isEmpty()) {
+            // This value should be the same regardless of which server the consuming segment info is from; use the
+            // first in the list here
+            int bytesToCatchUp = consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()

Review Comment:
In Kafka at least, each offset corresponds to a full message, whose size in bytes varies with the payload. I don't know of a good way to get the byte count, but perhaps just calculating the offset difference is good enough for this case? That's also why I was recommending capturing the age as well (see the scenarios I listed in that comment).

Another important thing to consider is that different consumers may consume from Kafka at different rates and therefore be at different offsets. You should clearly call out which difference you are calculating: against the latest offset (the highest offset of data present in Kafka) or against the current offset (what the consumer has consumed so far, which may be below the latest offset in Kafka). E.g. with segment startOffset: 100 and latest offset in Kafka: 500, the consumer on each server (assuming 3 replicas) might be at a different current offset: consumer 1: 300, consumer 2: 450, consumer 3: 350.

I think the code you call into calculates lag (latest offset - current offset) for the given consumer, right? When we calculate the difference here, what is the intention? Are you comparing against the latest Kafka offset (500 in the example above), or against the consumer's current offset?

I don't know much about other pub-sub systems like Kinesis; it would be good to understand whether we can use the same data for such tables as well. cc @npawar or @xiangfu0 might know better.
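To make the distinction concrete, here is a minimal sketch (illustrative only, not Pinot code; all names are hypothetical) contrasting the two candidate calculations using the example numbers above:

```java
// Sketch of the two different "catch-up" numbers discussed above, under the
// assumption that offsets are simple longs as in Kafka. Hypothetical names only.
public class OffsetCatchUpExample {
  public static void main(String[] args) {
    long segmentStartOffset = 100;    // startOffset from SegmentZKMetadata
    long latestUpstreamOffset = 500;  // highest offset present in Kafka
    long[] currentConsumerOffsets = {300, 450, 350};  // per-replica consumer positions

    // Offsets a brand-new server would replay to rebuild the consuming segment from
    // its start offset; the same value regardless of which replica reported it:
    long offsetsFromSegmentStart = latestUpstreamOffset - segmentStartOffset;
    System.out.println("offsets from segment start = " + offsetsFromSegmentStart);  // 400

    // Per-consumer lag (latest - current); this differs from server to server:
    for (long currentOffset : currentConsumerOffsets) {
      System.out.println("consumer lag = " + (latestUpstreamOffset - currentOffset));  // 200, 50, 150
    }
  }
}
```

Whichever of the two the summary ends up reporting, naming it explicitly in the response would avoid ambiguity for operators reading the rebalance summary.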