somandal commented on code in PR #15368: URL: https://github.com/apache/pinot/pull/15368#discussion_r2017439035
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,92 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
+  public static class ConsumingSegmentSummaryPerServer {
+    private final int _numConsumingSegmentToBeAdded;
+    private final Integer _totalOffsetsNeedToCatchUp;
+
+    @JsonCreator
+    public ConsumingSegmentSummaryPerServer(

Review Comment:
   nit: can you move this class to the bottom of `RebalanceSummaryResult`? I've found it easier to read the code if the main class's constructor and fields are at the top

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,92 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
+    @JsonCreator
+    public ConsumingSegmentSummaryPerServer(
+        @JsonProperty("numConsumingSegmentToBeAdded") int numConsumingSegmentToBeAdded,
+        @JsonProperty("totalOffsetsNeedToCatchUp") @Nullable Integer totalOffsetsNeedToCatchUp) {

Review Comment:
   nit: this name is confusing. can we rename it to `totalNumOffsetsToReConsumeToCatchUp`, or something along those lines?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {

Review Comment:
   nit: let's rename to: `getTopConsumingSegmentsByOldestCreationTime`

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    ...
+    } catch (InvalidConfigException e) {
+      LOGGER.warn("Caught exception while trying to fetch consuming segment info for table: {}", tableNameWithType, e);
+      return null;
+    }
+    LOGGER.info("Successfully fetched consuming segments info for table: {}", tableNameWithType);

Review Comment:
   nit: since we've identified a logging improvement to always include the rebalance job ID, can you ensure you add that to all logs added in this PR?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());

Review Comment:
   can we have an early return from here if the `uniqueConsumingSegments` list is empty? that way you can assume that the remainder of this function is acting on the scenario where `CONSUMING` segments are moving.
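   A sketch of what the suggested early return could look like. The five-argument constructor arity follows the PR's diff, but returning an empty summary (rather than `null`) for the no-op case is an assumption:

   ```java
   private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(
       String tableNameWithType, Map<String, Set<String>> newServersToConsumingSegmentMap) {
     Set<String> uniqueConsumingSegments =
         newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
     if (uniqueConsumingSegments.isEmpty()) {
       // No CONSUMING segments are moving: skip the ZK metadata reads and server calls entirely
       return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, Collections.emptyMap(),
           Collections.emptyMap(), Collections.emptyMap());
     }
     // ... the rest of the method can now assume CONSUMING segments are being moved
   }
   ```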
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      topTenOffset = new LinkedHashMap<>();
+      consumingSegmentsOffsetsToCatchUp.entrySet()
+          .stream()
+          .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+          .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+          .forEach(entry -> topTenOffset.put(entry.getKey(), entry.getValue()));

Review Comment:
   any chance this can be made into a util? looks like the logic is the same for both this and the top segments by age calculation?
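   A possible shape for such a util — generic over key/value, with the two existing call sites passing their own comparators. One caveat worth double-checking against the PR's intent: since `consumingSegmentsAge` stores ages (larger value = older segment), an ascending sort surfaces the *newest* segments, so the "oldest" call site may actually need the reversed comparator:

   ```java
   /**
    * Returns the first {@code n} entries of {@code map} when sorted by the given entry comparator,
    * preserving the sorted order in the returned map.
    */
   private static <K, V> Map<K, V> getTopNEntries(Map<K, V> map, int n, Comparator<Map.Entry<K, V>> comparator) {
     Map<K, V> topN = new LinkedHashMap<>();
     map.entrySet().stream().sorted(comparator).limit(n).forEach(e -> topN.put(e.getKey(), e.getValue()));
     return topN;
   }

   // Call sites inside getConsumingSegmentSummary(...):
   Map<String, Integer> topOffsets = getTopNEntries(consumingSegmentsOffsetsToCatchUp,
       TOP_N_IN_CONSUMING_SEGMENT_SUMMARY, Collections.reverseOrder(Map.Entry.comparingByValue()));
   Map<String, Integer> oldestConsumingSegments = getTopNEntries(consumingSegmentsAge,
       TOP_N_IN_CONSUMING_SEGMENT_SUMMARY, Map.Entry.comparingByValue());  // as in the PR; see sort-direction caveat above
   ```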
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -252,6 +338,7 @@ public static class SegmentInfo {
     private final RebalanceChangeInfo _numSegmentsInSingleReplica;
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private final RebalanceChangeInfo _numSegmentsAcrossAllReplicas;
+    private final ConsumingSegmentToBeMovedSummary _consumingSegmentToBeMovedSummary;

Review Comment:
   can we also add the annotation for `@JsonInclude(JsonInclude.Include.NON_NULL)` here? we can probably revisit all these in a later PR and remove it where it isn't necessary
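   A minimal sketch of the suggested annotation. Jackson merges annotations declared on the field and on the getter of the same logical property, so either spot works; annotating both (as a later comment in this thread suggests for the getters) keeps it unambiguous:

   ```java
   import com.fasterxml.jackson.annotation.JsonInclude;
   import com.fasterxml.jackson.annotation.JsonProperty;

   // Inside RebalanceSummaryResult.SegmentInfo:
   @JsonInclude(JsonInclude.Include.NON_NULL)  // omit from the JSON payload when null, e.g. for OFFLINE tables
   private final ConsumingSegmentToBeMovedSummary _consumingSegmentToBeMovedSummary;

   @JsonProperty
   @JsonInclude(JsonInclude.Include.NON_NULL)
   public ConsumingSegmentToBeMovedSummary getConsumingSegmentToBeMovedSummary() {
     return _consumingSegmentToBeMovedSummary;
   }
   ```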
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -625,22 +651,35 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
     for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
       existingReplicationFactor = entrySet.getValue().size();
       for (String segmentKey : entrySet.getValue().keySet()) {
         existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+        if (existingServersToConsumingSegmentMap != null && entrySet.getValue()
+            .get(segmentKey)
+            .equals(SegmentStateModel.CONSUMING)) {
+          existingServersToConsumingSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+        }

Review Comment:
   shouldn't you only add the segment if all of the servers' states are CONSUMING? or maybe I misunderstood this code? Good to have a test case for this scenario too (yet to look at tests)
   
   e.g.
   ```
   segment_1: server_1: CONSUMING, server_2: CONSUMING, server_3: CONSUMING
   segment_2: server_1: ONLINE, server_2: CONSUMING, server_3: CONSUMING
   ```
   
   In the above, only segment_1 should be added, right?
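   If the intended semantics are "count the segment only when every replica is CONSUMING", the loop could look roughly like this. This is a sketch, not the final code — it also folds in the local-variable renames suggested later in this thread, and whether per-replica or all-replica semantics is actually wanted is exactly the open question here:

   ```java
   for (Map.Entry<String, Map<String, String>> assignmentEntry : currentAssignment.entrySet()) {
     String segmentName = assignmentEntry.getKey();
     Map<String, String> instanceStateMap = assignmentEntry.getValue();
     existingReplicationFactor = instanceStateMap.size();
     // Count the segment as CONSUMING only when every replica is in CONSUMING state
     boolean allReplicasConsuming =
         instanceStateMap.values().stream().allMatch(SegmentStateModel.CONSUMING::equals);
     for (String instanceName : instanceStateMap.keySet()) {
       existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
       if (allReplicasConsuming && existingServersToConsumingSegmentMap != null) {
         existingServersToConsumingSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
       }
     }
   }
   ```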
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -625,22 +651,35 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
     for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
       newReplicationFactor = entrySet.getValue().size();
       for (String segmentKey : entrySet.getValue().keySet()) {
         newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+        if (newServersToConsumingSegmentMap != null && entrySet.getValue()
+            .get(segmentKey)
+            .equals(SegmentStateModel.CONSUMING)) {
+          newServersToConsumingSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+        }

Review Comment:
   shouldn't you only add the segment if all of the servers' states are CONSUMING? or maybe I misunderstood this code? Good to have a test case for this scenario too (yet to look at tests)

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,92 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
+  public static class ConsumingSegmentToBeMovedSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    private final int _numServerGettingConsumingSegmentsAdded;
+    private final Map<String, Integer> _topConsumingSegmentsOffsetsToCatchUp;
+    private final Map<String, Integer> _oldestConsumingSegmentsToBeMovedInMinutes;

Review Comment:
   can you add a comment explaining what is meant by oldest here, i.e. that it's related to the segment creation time? to ensure that people don't assume this has anything to do with when the event was added to Kafka
   
   nit: perhaps rename it to: `oldestCreationTimeInMinutesConsumingSegmentsToBeMoved` (I know it's long, but I don't want people to think we're looking at oldest to mean the Kafka timestamp)

##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java:
##########
@@ -214,7 +214,7 @@ public String forTableRebalance(String tableName, String tableType, boolean dryR
     if (reassignInstances) {
       stringBuilder.append("&reassignInstances=").append(reassignInstances);
     }
-    if (includeConsuming) {
+    if (!includeConsuming) {

Review Comment:
   why was this change made?
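   Picking up the "oldest" comment above on `_oldestConsumingSegmentsToBeMovedInMinutes`, a minimal Javadoc sketch that keeps the PR's current field name (the suggested longer rename is an alternative):

   ```java
   /**
    * Age in minutes of each CONSUMING segment to be moved, keyed by segment name. "Oldest" refers to
    * the segment's creation time from ZK metadata (i.e. how long ago the consuming segment was
    * created); it is unrelated to the timestamps of the events in the upstream stream (e.g. Kafka).
    */
   private final Map<String, Integer> _oldestConsumingSegmentsToBeMovedInMinutes;
   ```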
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,92 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
+  public static class ConsumingSegmentToBeMovedSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    private final int _numServerGettingConsumingSegmentsAdded;
+    private final Map<String, Integer> _topConsumingSegmentsOffsetsToCatchUp;

Review Comment:
   nit: let's rename this to `topConsumingSegmentsWithMaxOffsetsToCatchUp` or something similar?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -711,6 +750,15 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+    if (existingServersToConsumingSegmentMap != null && newServersToConsumingSegmentMap != null) {
+      for (Map.Entry<String, Set<String>> entry : newServersToConsumingSegmentMap.entrySet()) {
+        String server = entry.getKey();
+        entry.getValue().removeAll(existingServersToConsumingSegmentMap.getOrDefault(server, Collections.emptySet()));
+      }
+      // turn the map into server -> consuming segments added

Review Comment:
   nit: should this comment be moved further up? I think the code below just removes empty sets?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -314,6 +404,12 @@ public RebalanceChangeInfo getNumSegmentsInSingleReplica() {
     public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() {
       return _numSegmentsAcrossAllReplicas;
     }
+
+    @JsonProperty
+    @JsonInclude(JsonInclude.Include.NON_NULL)

Review Comment:
   nit: can you add this annotation to all getters in this `RebalanceSummaryResult` class where the field has been annotated with this, and just do a quick verification?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -625,22 +651,35 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
     for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
       existingReplicationFactor = entrySet.getValue().size();
       for (String segmentKey : entrySet.getValue().keySet()) {
         existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());

Review Comment:
   nit: let's add a variable for `entrySet.getKey()` and use that instead of calling `entrySet.getKey()` over and over again, e.g. `String segmentName = entrySet.getKey();`
   
   can you also rename `segmentKey` -> `instanceName` (since most likely your PR will be merged before mine that fixes this)

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -625,22 +651,35 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
     for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
       newReplicationFactor = entrySet.getValue().size();

Review Comment:
   nit: let's add a variable for `entrySet.getKey()` and use that instead of calling `entrySet.getKey()` over and over again, e.g. `String segmentName = entrySet.getKey();`
   
   can you also rename `segmentKey` -> `instanceName` (since most likely your PR will be merged before mine that fixes this)

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -711,6 +750,15 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+      for (Map.Entry<String, Set<String>> entry : newServersToConsumingSegmentMap.entrySet()) {
+        String server = entry.getKey();
+        entry.getValue().removeAll(existingServersToConsumingSegmentMap.getOrDefault(server, Collections.emptySet()));
+      }

Review Comment:
   nit: recommend having a variable for `entry.getValue()` since it is accessed more than once
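   A sketch combining this local-variable suggestion with the earlier comment about the misplaced `// turn the map into ...` comment. The final `removeIf` line is an assumption about what the elided code below the loop does, based on that earlier comment:

   ```java
   // Keep only the consuming segments *added* to each server by this rebalance
   for (Map.Entry<String, Set<String>> entry : newServersToConsumingSegmentMap.entrySet()) {
     String server = entry.getKey();
     Set<String> consumingSegments = entry.getValue();
     consumingSegments.removeAll(existingServersToConsumingSegmentMap.getOrDefault(server, Collections.emptySet()));
   }
   // Drop servers that are not getting any new consuming segments
   newServersToConsumingSegmentMap.values().removeIf(Set::isEmpty);
   ```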
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata.
+   * Returns a map from segment name to the number of offsets to catch up for that consuming segment. Returns null
+   * if it fails to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {

Review Comment:
   nit: let's rename: `getTopConsumingSegmentsByNumOffsetsToCatchUp`
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> topTenOffset;

Review Comment:
   nit: let's rename to `topOffsets`?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+      long creationTime = segmentZKMetadata.getCreationTime();
+      if (creationTime < 0) {
+        LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+        return;
+      }
+      consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+    }));
+    return consumingSegmentsAge;
+  }

Review Comment:
   nit: let's add a comment to explain what this function does as well?
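   A possible Javadoc for the method above, plus one hedged side note: in the PR, the cast in `(int) (now - creationTime) / 60_000` binds to the subtraction before the division, which can overflow `int` for segments older than roughly 24 days; dividing first, as below, avoids that (worth double-checking against the PR's intent):

   ```java
   /**
    * Returns a map from each CONSUMING segment to be moved to its age in minutes, where age is measured
    * from the segment's creation time in ZK metadata (not from any upstream event timestamp). Segments
    * with no creation time in ZK metadata are skipped with a warning.
    */
   private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
       Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
     Map<String, Integer> consumingSegmentsAge = new HashMap<>();
     long now = System.currentTimeMillis();
     consumingSegmentZKMetadata.forEach((segmentName, segmentZKMetadata) -> {
       long creationTime = segmentZKMetadata.getCreationTime();
       if (creationTime < 0) {
         LOGGER.warn("Creation time is not found for segment: {} in table: {}", segmentName, tableNameWithType);
         return;
       }
       // Divide before casting so long-lived segments don't overflow the int cast
       consumingSegmentsAge.put(segmentName, (int) ((now - creationTime) / 60_000));
     });
     return consumingSegmentsAge;
   }
   ```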
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+    Map<String, Integer> oldestTenSegment;
+    oldestTenSegment = new LinkedHashMap<>();
+    consumingSegmentsAge.entrySet()
+        .stream()
+        .sorted(Map.Entry.comparingByValue())
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> oldestTenSegment.put(entry.getKey(), entry.getValue()));

Review Comment:
   nit: let's rename: `oldestConsumingSegments`

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+        if (consumingSegmentInfoList != null && !consumingSegmentInfoList.isEmpty()) {
+          // this value should be the same regardless of which server the consuming segment info is from, use the
+          // first in the list here
+          int offsetsToCatchUp =
+              consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()
+                  .stream().mapToInt(offset -> Integer.parseInt(offset) - Integer.parseInt(startOffset)).sum();
+          segmentToOffsetsToCatchUp.put(segmentName, offsetsToCatchUp);

Review Comment:
   should there be a null check for `_partitionOffsetInfo` and `_latestUpstreamOffsetMap`? (I know I ran into `_latestUpstreamOffsetMap` being null for Kinesis)
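   A possible shape for the guard, reusing the method's existing convention of returning null on missing data; note also that `Integer.parseInt` assumes numeric offsets, which may not hold for every stream type:

   ```java
   ConsumingSegmentInfoReader.ConsumingSegmentInfo consumingSegmentInfo = consumingSegmentInfoList.get(0);
   // _partitionOffsetInfo / _latestUpstreamOffsetMap can be null for some stream types (e.g. Kinesis)
   if (consumingSegmentInfo._partitionOffsetInfo == null
       || consumingSegmentInfo._partitionOffsetInfo._latestUpstreamOffsetMap == null) {
     LOGGER.warn("Latest upstream offset is not available for segment: {} in table: {}", segmentName,
         tableNameWithType);
     return null;
   }
   int offsetsToCatchUp = consumingSegmentInfo._partitionOffsetInfo._latestUpstreamOffsetMap.values().stream()
       .mapToInt(offset -> Integer.parseInt(offset) - Integer.parseInt(startOffset)).sum();
   segmentToOffsetsToCatchUp.put(segmentName, offsetsToCatchUp);
   ```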
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
+        isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap);

Review Comment:
   would it make sense to skip calling `getConsumingSegmentSummary` altogether if `newServersToConsumingSegmentMap` is empty?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -243,6 +244,8 @@ private enum LineageUpdateType {
   private final LineageManager _lineageManager;
   private final RebalancePreChecker _rebalancePreChecker;
   private TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+  private HttpClientConnectionManager _connectionManager;

Review Comment:
   cc @xiangfu0 @Jackie-Jiang @klsince
   
   We wanted to know what's the recommendation on passing some of these objects. I see that both `_executorService` and `_connectionManager` are available in the Restlet classes. Should we always be passing these down from there for handling APIs? And for non-Restlet code paths, do we just create these and pass them in? Or is doing it the way it's done in this PR fine?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org