J-HowHuang commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2019868894
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
     // rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
+        isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap);
     RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
         maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
-        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);

     LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }

+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> topTenOffset;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer> consumingSegmentSummaryPerServer =
+        new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      topTenOffset = new LinkedHashMap<>();
+      consumingSegmentsOffsetsToCatchUp.entrySet()
+          .stream()
+          .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+          .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+          .forEach(entry -> topTenOffset.put(entry.getKey(), entry.getValue()));
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp = segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+            segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      topTenOffset = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+            segments.size(), null));
+      });
+    }
+
+    Map<String, Integer> oldestTenSegment = new LinkedHashMap<>();
+    consumingSegmentsAge.entrySet()
+        .stream()
+        .sorted(Map.Entry.comparingByValue())
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> oldestTenSegment.put(entry.getKey(), entry.getValue()));
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), topTenOffset, oldestTenSegment, consumingSegmentSummaryPerServer);
+  }
+
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    consumingSegmentZKMetadata.forEach((s, segmentZKMetadata) -> {
+      long creationTime = segmentZKMetadata.getCreationTime();
+      if (creationTime < 0) {
+        LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+        return;
+      }
+      consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+    });
+    return consumingSegmentsAge;
+  }
+
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from the consuming segments to be moved to their ZK
+   * metadata. Returns a map from segment name to the number of offsets to catch up for that consuming segment.
+   * Returns null if it fails to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    if (consumingSegmentZKMetadata.isEmpty()) {
+      LOGGER.info("No consuming segments being moved for table: {}", tableNameWithType);
+      return new HashMap<>();
+    }
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }
+    Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+          consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+      for (Map.Entry<String, SegmentZKMetadata> entry : consumingSegmentZKMetadata.entrySet()) {
+        String segmentName = entry.getKey();
+        List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList =
+            consumingSegmentsInfoMap._segmentToConsumingInfoMap.getOrDefault(segmentName, null);
+        SegmentZKMetadata segmentZKMetadata = entry.getValue();
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        String startOffset = segmentZKMetadata.getStartOffset();
+        if (startOffset == null) {
+          LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        if (consumingSegmentInfoList != null && !consumingSegmentInfoList.isEmpty()) {
+          // This value should be the same regardless of which server the consuming segment info is from; use the
+          // first one in the list here.
+          int offsetsToCatchUp =
+              consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()

Review Comment:
   Wait, you're right. I'll check, because I thought the Kinesis error was caught here and that's how I found the issue.
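
[Editor's note] For readers following the Kinesis point: the snippet above derives the per-segment lag from the segment's start offset and _latestUpstreamOffsetMap, which assumes offsets are numeric. Below is a minimal, hypothetical sketch (the class and method names are mine, not the PR's) of a defensive version of that calculation: Kafka offsets parse as longs, while a Kinesis sequence number can be far larger than Long.MAX_VALUE, so the parse failure is reported as null ("unknown") rather than thrown.

// Hypothetical sketch, not the PR's code: compute a consuming segment's lag defensively
// when the stream's offsets may not be numeric longs.
public class OffsetLagSketch {

  // Returns the number of offsets to catch up, or null when the lag cannot be computed
  // (for example, a Kinesis sequence number that does not fit in a long).
  static Integer offsetsToCatchUp(String startOffset, String latestUpstreamOffset) {
    try {
      long lag = Long.parseLong(latestUpstreamOffset) - Long.parseLong(startOffset);
      // Clamp: a negative lag means the start offset is already at or past the latest upstream offset
      return (int) Math.max(0L, Math.min(lag, Integer.MAX_VALUE));
    } catch (NumberFormatException e) {
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(offsetsToCatchUp("100", "250")); // 150
    // A Kinesis-style sequence number overflows Long.parseLong, so the result is null
    System.out.println(offsetsToCatchUp("0", "49590338271490256608559692538361571095921575989136588898304"));
  }
}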
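
[Editor's note] A separate detail in the diff worth illustrating: in getConsumingSegmentsAge, a Java cast binds tighter than division, so "(int) (now - creationTime) / 60_000" truncates the long millisecond difference to int before dividing, which overflows once a segment is older than about 24.8 days (2^31 ms). A self-contained illustration with made-up values:

// Illustrative only: Java cast precedence in the age computation above.
public class AgePrecedenceSketch {
  public static void main(String[] args) {
    long now = 3_000_000_000L; // a difference of ~34.7 days in milliseconds
    long creationTime = 0L;
    // The cast applies to the subtraction first: the long is truncated to int, then divided.
    System.out.println((int) (now - creationTime) / 60_000);   // -21582 (overflowed)
    // Dividing first keeps the value in int range: the age in whole minutes.
    System.out.println((int) ((now - creationTime) / 60_000)); // 50000
  }
}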