klsince commented on code in PR #15266: URL: https://github.com/apache/pinot/pull/15266#discussion_r2027561893
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -118,6 +128,116 @@ public RebalanceStateStats getCurrentToTargetConvergence() {
     return _currentToTargetConvergence;
   }
+  public RebalanceProgressStats getRebalanceProgressStatsOverall() {
+    return _rebalanceProgressStatsOverall;
+  }
+
+  public RebalanceProgressStats getRebalanceProgressStatsCurrentStep() {
+    return _rebalanceProgressStatsCurrentStep;
+  }
+
+  /**
+   * Updates the overall and step progress stats based on the latest calculated step's progress stats. This should
+   * be called during the EV-IS convergence trigger to ensure the overall stats reflect the changes as they are made.
+   * @param latestStepStats latest step level stats calculated in this iteration
+   */
+  public void updateOverallAndStepStatsFromLatestStepStats(
+      TableRebalanceProgressStats.RebalanceProgressStats latestStepStats) {
+    TableRebalanceProgressStats.RebalanceProgressStats lastStepStats = getRebalanceProgressStatsCurrentStep();
+    TableRebalanceProgressStats.RebalanceProgressStats lastOverallStats = getRebalanceProgressStatsOverall();
+    int numAdditionalSegmentsAdded =
+        latestStepStats._totalSegmentsToBeAdded - lastStepStats._totalSegmentsToBeAdded;
+    int numAdditionalSegmentsDeleted =
+        latestStepStats._totalSegmentsToBeDeleted - lastStepStats._totalSegmentsToBeDeleted;
+    int upperBoundOnSegmentsAdded =

Review Comment:
   I find this method a bit hard to follow, perhaps because the variable names are so similar (`last...` vs. `latest...`). Could you add some comments that split the method into a few major sections? That might make it easier to read.
   Btw, why do we need `upperBoundOnSegmentsAdded`?
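To illustrate the kind of sectioning and naming asked for here, a minimal sketch follows. It is not the PR's code: `StepStatsDeltaSketch`, `ProgressStats`, `previousStepStats`, and `computeStepDeltas` are hypothetical names, and only the two delta computations are taken from the quoted hunk. What the rest of the method does, including `upperBoundOnSegmentsAdded`, is left out because the hunk is cut off at that point.

```java
// Hypothetical, simplified stand-in for the PR's stats classes; not Pinot code.
final class StepStatsDeltaSketch {

  /** Models only the two counters that are visible in the quoted hunk. */
  static final class ProgressStats {
    final int _totalSegmentsToBeAdded;
    final int _totalSegmentsToBeDeleted;

    ProgressStats(int totalSegmentsToBeAdded, int totalSegmentsToBeDeleted) {
      _totalSegmentsToBeAdded = totalSegmentsToBeAdded;
      _totalSegmentsToBeDeleted = totalSegmentsToBeDeleted;
    }
  }

  /**
   * Section 1: how much did the step totals change since the previous update?
   * Naming the older snapshot "previous" (an assumed rename of "last") keeps it
   * visually distinct from "latest".
   *
   * @return a two-element array: {additional adds, additional deletes}
   */
  static int[] computeStepDeltas(ProgressStats previousStepStats, ProgressStats latestStepStats) {
    int numAdditionalSegmentsAdded =
        latestStepStats._totalSegmentsToBeAdded - previousStepStats._totalSegmentsToBeAdded;
    int numAdditionalSegmentsDeleted =
        latestStepStats._totalSegmentsToBeDeleted - previousStepStats._totalSegmentsToBeDeleted;
    return new int[]{numAdditionalSegmentsAdded, numAdditionalSegmentsDeleted};
  }

  // Later sections of the real method are intentionally not sketched here.

  public static void main(String[] args) {
    ProgressStats previous = new ProgressStats(10, 4);
    ProgressStats latest = new ProgressStats(13, 4);
    int[] deltas = computeStepDeltas(previous, latest);
    System.out.println("additional adds=" + deltas[0] + ", additional deletes=" + deltas[1]);
  }
}
```

Whether each section becomes a helper like this or just a commented block inside the existing method is a judgment call; the point is only that each section gets a one-line statement of intent and the two snapshots get names that are easy to tell apart.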
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -402,9 +402,17 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
       }
     }
+    List<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
+    Set<String> segmentsToMonitor = new HashSet<>(segmentsToMove);
+
+    long estimatedAverageSegmentSizeInBytes = summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes();
+    Set<String> allOriginalSegmentsIdealState = currentAssignment.keySet();

Review Comment:
   nit: `allOriginalSegmentsIdealState` -> `allSegmentsFromIdealState`, or is there a special reason to call out 'original'?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {

Review Comment:
   Hmm, sorry, I'm a bit confused about what this if-check is trying to do. It doesn't use `instanceName` or `instanceState`, so why is it inside the for-loop, and why is `segmentsToMonitor` checked here 🤔?
   Please shed some light, or perhaps add a few more comments to this big method to help readers understand it. Thanks!

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+          if (newSegmentsNotExistingBefore.contains(segmentName)) {
+            // Don't track newly added segments unless they're on the monitor list
+            totalNewSegmentsNotMonitored++;
+            newSegmentsNotExistingBefore.remove(segmentName);
+          }
+          continue;
+        }
+        if (instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) {
+          // Skip tracking segments that are in OFFLINE state in the target assignment
+          targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+          continue;
+        }
+        totalSegmentsTarget += 1;
+        newServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      for (String instanceName : entrySet.getValue().keySet()) {
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+          continue;
+        }
+        if (targetInstanceToOfflineSegmentsMap.containsKey(instanceName)
+            && targetInstanceToOfflineSegmentsMap.get(instanceName).contains(segmentName)) {
+          // Skip tracking segments that are in OFFLINE state in the target assignment
+          continue;
+        }
+        existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+      }
+    }
+
+    int segmentsNotMoved = 0;
+    int totalSegmentsToBeDeleted = 0;
+    int segmentsUnchangedYetNotConverged = 0;
+    for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentSet = entry.getValue();
+
+      Set<String> newSegmentSet = new HashSet<>(segmentSet);
+      Set<String> existingSegmentSet = new HashSet<>();
+      int segmentsUnchanged = 0;
+      if (existingServersToSegmentMap.containsKey(server)) {
+        Set<String> segmentSetForServer = existingServersToSegmentMap.get(server);
+        existingSegmentSet.addAll(segmentSetForServer);
+        Set<String> intersection = new HashSet<>(segmentSetForServer);
+        intersection.retainAll(newSegmentSet);
+        segmentsUnchanged = intersection.size();
+        segmentsNotMoved += segmentsUnchanged;
+
+        for (String segmentName : intersection) {
+          String currentInstanceState = currentAssignment.get(segmentName).get(server);
+          String targetInstanceState = targetAssignment.get(segmentName).get(server);
+          if (!currentInstanceState.equals(targetInstanceState)) {
+            segmentsUnchangedYetNotConverged++;
+          }
+        }
+      }
+      newSegmentSet.removeAll(existingSegmentSet);
+      totalSegmentsToBeDeleted += existingSegmentSet.size() - segmentsUnchanged;
+    }
+
+    for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) {
+      if (!newServersToSegmentMap.containsKey(entry.getKey())) {
+        totalSegmentsToBeDeleted += entry.getValue().size();
+      }
+    }
+
+    int newSegsAddedInThisAssignment = 0;
+    int newSegsDeletedInThisAssignment = 0;
+    for (String segment : newSegmentsNotExistingBefore) {
+      Set<String> currentSegmentAssign = currentAssignment.get(segment) != null
+          ? currentAssignment.get(segment).keySet() : new HashSet<>();
+      Set<String> targetSegmentAssign = targetAssignment.get(segment) != null
+          ? targetAssignment.get(segment).keySet() : new HashSet<>();
+
+      Set<String> segmentsAdded = new HashSet<>(targetSegmentAssign);
+      segmentsAdded.removeAll(currentSegmentAssign);
+      newSegsAddedInThisAssignment += segmentsAdded.size();
+
+      Set<String> segmentsDeleted = new HashSet<>(currentSegmentAssign);
+      segmentsDeleted.removeAll(targetSegmentAssign);
+      newSegsDeletedInThisAssignment += segmentsDeleted.size();
+    }
+
+    int newNumberSegmentsTotal = totalSegmentsTarget;
+    int totalSegmentsToBeAdded = newNumberSegmentsTotal - segmentsNotMoved;
+
+    TableRebalanceProgressStats.RebalanceProgressStats progressStats =
+        new TableRebalanceProgressStats.RebalanceProgressStats();
+    switch (trigger) {
+      case START_TRIGGER:
+      case NEXT_ASSINGMENT_CALCULATION_TRIGGER:
+        // These are initialization steps for global / step progress stats
+        progressStats._totalSegmentsToBeAdded = totalSegmentsToBeAdded;
+        progressStats._totalSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+        progressStats._totalRemainingSegmentsToBeAdded = totalSegmentsToBeAdded;
+        progressStats._totalRemainingSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+        progressStats._totalCarryOverSegmentsToBeAdded = 0;
+        progressStats._totalCarryOverSegmentsToBeDeleted = 0;
+        progressStats._totalRemainingSegmentsToConverge = segmentsUnchangedYetNotConverged;
+        progressStats._totalUniqueNewUntrackedSegmentsDuringRebalance = totalNewSegmentsNotMonitored;
+        progressStats._percentageRemainingSegmentsToBeAdded = totalSegmentsToBeAdded == 0 ? 0.0 : 100.0;
+        progressStats._percentageRemainingSegmentsToBeDeleted = totalSegmentsToBeDeleted == 0 ? 0.0 : 100.0;
+        progressStats._estimatedTimeToCompleteAddsInSeconds = totalSegmentsToBeAdded == 0 ? 0.0 : -1.0;
+        progressStats._estimatedTimeToCompleteDeletesInSeconds = totalSegmentsToBeDeleted == 0 ? 0.0 : -1.0;
+        progressStats._averageSegmentSizeInBytes = rebalanceContext.getEstimatedAverageSegmentSizeInBytes();
+        progressStats._totalEstimatedDataToBeMovedInBytes =
+            TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes(0,
+                rebalanceContext.getEstimatedAverageSegmentSizeInBytes(), totalSegmentsToBeAdded);
+        progressStats._startTimeMs = trigger == Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER
+            ? System.currentTimeMillis() : rebalanceProgressStats.getStartTimeMs();
+        break;
+      case IDEAL_STATE_CHANGE_TRIGGER:
+      case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:

Review Comment:
   Given the two checks on `trigger == Trigger.IDEAL_STATE_CHANGE_TRIGGER` and `trigger == Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER`, perhaps just split them into two separate cases.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+          if (newSegmentsNotExistingBefore.contains(segmentName)) {
+            // Don't track newly added segments unless they're on the monitor list
+            totalNewSegmentsNotMonitored++;
+            newSegmentsNotExistingBefore.remove(segmentName);
+          }
+          continue;
+        }
+        if (instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) {
+          // Skip tracking segments that are in OFFLINE state in the target assignment
+          targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+          continue;
+        }
+        totalSegmentsTarget += 1;

Review Comment:
   nit: use `++` here