J-HowHuang commented on code in PR #15266: URL: https://github.com/apache/pinot/pull/15266#discussion_r2027413419
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -19,33 +19,25 @@ package org.apache.pinot.controller.helix.core.rebalance;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.util.Objects;
 /**
  * These are rebalance stats as to how the current state is, when compared to the target state.
  * Eg: If the current has 4 segments whose replicas (16) don't match the target state, _segmentsToRebalance
  * is 4 and _replicasToRebalance is 16.
  */

Review Comment:
Should this be moved down to `RebalanceStateStats`?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -118,6 +128,165 @@ public RebalanceStateStats getCurrentToTargetConvergence() {
     return _currentToTargetConvergence;
   }
+  public RebalanceProgressStats getRebalanceProgressStatsOverall() {
+    return _rebalanceProgressStatsOverall;
+  }
+
+  public RebalanceProgressStats getRebalanceProgressStatsCurrentStep() {
+    return _rebalanceProgressStatsCurrentStep;
+  }
+
+  /**
+   * Updates the overall and step progress stats based on the latest calculated step's progress stats. This should
+   * be called during the EV-IS convergence trigger to ensure the overall stats reflect the changes as they are made.
+   * @param currentStepStats latest step level stats calculated in this iteration
+   */
+  public void updateOverallAndStepStatsFromLatestStepStats(
+      TableRebalanceProgressStats.RebalanceProgressStats currentStepStats) {
+    // Fetch the step level and overall stats that were calculated in the last convergence check. These will be used
+    // to calculate the overall stats in the current convergence check
+    TableRebalanceProgressStats.RebalanceProgressStats previousStepStats = getRebalanceProgressStatsCurrentStep();
+    TableRebalanceProgressStats.RebalanceProgressStats previousOverallStats = getRebalanceProgressStatsOverall();
+
+    // Calculate the new segments added / deleted since the last step to track overall change in segments. These are
+    // only new segments tracked by table rebalance (segments that aren't tracked by rebalance convergence check are
+    // ignored because the convergence check does not wait for them to complete)
+    int numAdditionalSegmentsAdded =
+        currentStepStats._totalSegmentsToBeAdded - previousStepStats._totalSegmentsToBeAdded;
+    int numAdditionalSegmentsDeleted =
+        currentStepStats._totalSegmentsToBeDeleted - previousStepStats._totalSegmentsToBeDeleted;
+
+    // Calculate the carry-over segment adds / deletes from the previous rebalance step if any (can happen if
+    // bestEfforts=true or if there are deletions left over from the last step) and the difference in the number of
+    // segments processed since the previous stats update.
+    // Example of what carry-over will look like:
+    // previousStepStats: totalSegmentsToBeAdded = 10
+    // previousStepStats: totalRemainingSegmentsToBeAdded = 15 (this can only be larger than totalSegmentsToBeAdded
+    // if carry-over segments exist)
+    // Carry-over: 15 - 10 = 5 segments -> which were added in the last IS update by rebalance, but which didn't
+    // finish getting added before making the next assignment IS update
+    // Using the above, we calculate the 'upperBoundOnSegmentsAdded/Deleted' as the maximum segments to use when
+    // calculating the number of segments processed (number of segments that got added / deleted in EV based on expected
+    // IS) since the last step. We want to skip including the carry-over segments if any as they are tracked separately.
+    int upperBoundOnSegmentsAdded =
+        previousStepStats._totalRemainingSegmentsToBeAdded > previousStepStats._totalSegmentsToBeAdded
+            ? previousStepStats._totalSegmentsToBeAdded : previousStepStats._totalRemainingSegmentsToBeAdded;
+    int upperBoundOnSegmentsDeleted =
+        previousStepStats._totalRemainingSegmentsToBeDeleted > previousStepStats._totalSegmentsToBeDeleted
+            ? previousStepStats._totalSegmentsToBeDeleted : previousStepStats._totalRemainingSegmentsToBeDeleted;
+
+    // Number of adds / deletes processed in the last step. Take the absolute here since if new segments were added /
+    // deleted in the current step that are being tracking, we need to treat those as new segments that remain to
+    // be processed.
+    int numSegmentAddsProcessedInLastStep = Math.abs(upperBoundOnSegmentsAdded
+        - currentStepStats._totalRemainingSegmentsToBeAdded);
+    int numSegmentDeletesProcessedInLastStep = Math.abs(upperBoundOnSegmentsDeleted
+        - currentStepStats._totalRemainingSegmentsToBeDeleted);
+
+    // Number of untracked segments that were added
+    int numberNewUntrackedSegmentsAdded = currentStepStats._totalUniqueNewUntrackedSegmentsDuringRebalance
+        - previousStepStats._totalUniqueNewUntrackedSegmentsDuringRebalance;
+
+    TableRebalanceProgressStats.RebalanceProgressStats currentOverallStats =
+        new TableRebalanceProgressStats.RebalanceProgressStats();
+
+    // New number of total segment adds / deletes should include any new segments added from the previous stats
+    currentOverallStats._totalSegmentsToBeAdded = previousOverallStats._totalSegmentsToBeAdded
+        + numAdditionalSegmentsAdded;
+    currentOverallStats._totalSegmentsToBeDeleted = previousOverallStats._totalSegmentsToBeDeleted
+        + numAdditionalSegmentsDeleted;
+
+    // If carry-over segments are present, keep the upper bound of remaining segments to be added / deleted at the same
+    // level as the previous stats. This is so we track progress to handle the carry-over segments before we start
+    // tracking progress against the segments that need to be handled as part of the current rebalance step.
+    if (currentStepStats._totalCarryOverSegmentsToBeAdded > 0) {
+      currentOverallStats._totalRemainingSegmentsToBeAdded = previousOverallStats._totalRemainingSegmentsToBeAdded;
+    } else {
+      // If additional segments were added in the previous step, these need to be added to the remaining to be added
+      // number as they are new segments that should be tracked for completion
+      // Example:
+      // currentStepStats: totalSegmentsToBeAdded = 20
+      // previousStepStats: totalSegmentsToBeAdded = 10
+      // numAdditionalSegmentsAdded = 20 - 10 = 10
+      // In the above scenario, the new 'numAdditionalSegmentsAdded' also need to be tracked as
+      // 'totalRemainingSegmentsToBeAdded' for the current step, thus we need to add the number of segments processed
+      // If 'numAdditionalSegmentsAdded' = 0, that means no new segments were added in the previous step, and if there
+      // was a change in the segments processed in the last step, they completed processing
+      currentOverallStats._totalRemainingSegmentsToBeAdded = numAdditionalSegmentsAdded == 0
+          ? previousOverallStats._totalRemainingSegmentsToBeAdded - numSegmentAddsProcessedInLastStep
+          : previousOverallStats._totalRemainingSegmentsToBeAdded + numSegmentAddsProcessedInLastStep;

Review Comment:
What if the previous step has
```
{
  totalSegmentsToBeAdded: 10,
  totalRemainingSegmentsToBeAdded: 10
}
```
and the current step has
```
{
  totalSegmentsToBeAdded: 15,
  totalRemainingSegmentsToBeAdded: 5
}
```
Do the new overall stats end up with 5 more segments in `_totalRemainingSegmentsToBeAdded` than the previous overall stats, since `abs(upperBoundOnSegmentsAdded - currentStepStats._totalRemainingSegmentsToBeAdded) == 5`? I was a bit confused, because I thought we should have +5 new segments to add and -10 segments added in this step, so a net -5 remaining segments to be added, right?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {

Review Comment:
Shouldn't this check be moved one scope out, into the per-segment loop? Right now it skips a single replica instead of the whole segment.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,253 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based on the trigger type
+   * @param targetAssignment target assignment (either updated IS or the target end IS depending on the step)
+   * @param currentAssignment current assignment (either EV or the current IS depending on the step)
+   * @param rebalanceContext rebalance context
+   * @param trigger reason to trigger the stats update
+   * @param rebalanceProgressStats current value of the rebalance progress stats, used to calculate the next
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    // Segments to monitor is the list of segments that are being moved as part of the table rebalance that the
+    // table rebalance intends to track convergence for. This list usually includes segments from the last next
+    // assignment IS update and the current IS update. The EV-IS convergence check only tracks convergence for the
+    // segments on this list, and if any additional segments are found that haven't converged they are ignored.
+    // From the stats perspective, we also only care about tracking the convergence of actual segments that the
+    // rebalance cares about, that is why we skip any segments that are not on this list.
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        // Don't track segments that are not on the rebalance monitor list
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+          if (newSegmentsNotExistingBefore.contains(segmentName)) {
+            // Don't track newly added segments unless they're on the monitor list, remove them if they were added
+            // before
+            totalNewSegmentsNotMonitored++;
+            newSegmentsNotExistingBefore.remove(segmentName);
+          }
+          continue;
+        }
+        if (instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) {
+          // Skip tracking segments that are in OFFLINE state in the target assignment
+          targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+          continue;
+        }
+        totalSegmentsTarget++;
+        newServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      for (String instanceName : entrySet.getValue().keySet()) {
+        // Don't track segments that are not on the rebalance monitor list
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+          continue;
+        }
+        if (targetInstanceToOfflineSegmentsMap.containsKey(instanceName)
+            && targetInstanceToOfflineSegmentsMap.get(instanceName).contains(segmentName)) {
+          // Skip tracking segments that are in OFFLINE state in the target assignment
+          continue;
+        }
+        existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+      }
+    }
+
+    int segmentsNotMoved = 0;
+    int totalSegmentsToBeDeleted = 0;
+    int segmentsUnchangedYetNotConverged = 0;
+    for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentSet = entry.getValue();
+
+      Set<String> newSegmentSet = new HashSet<>(segmentSet);
+      Set<String> existingSegmentSet = new HashSet<>();
+      int segmentsUnchanged = 0;
+      if (existingServersToSegmentMap.containsKey(server)) {
+        Set<String> segmentSetForServer = existingServersToSegmentMap.get(server);
+        existingSegmentSet.addAll(segmentSetForServer);
+        Set<String> intersection = new HashSet<>(segmentSetForServer);
+        intersection.retainAll(newSegmentSet);
+        segmentsUnchanged = intersection.size();
+        segmentsNotMoved += segmentsUnchanged;
+
+        for (String segmentName : intersection) {
+          String currentInstanceState = currentAssignment.get(segmentName).get(server);
+          String targetInstanceState = targetAssignment.get(segmentName).get(server);
+          if (!currentInstanceState.equals(targetInstanceState)) {
+            segmentsUnchangedYetNotConverged++;
+          }
+        }
+      }
+      newSegmentSet.removeAll(existingSegmentSet);
+      totalSegmentsToBeDeleted += existingSegmentSet.size() - segmentsUnchanged;
+    }
+
+    for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) {
+      if (!newServersToSegmentMap.containsKey(entry.getKey())) {
+        totalSegmentsToBeDeleted += entry.getValue().size();
+      }
+    }
+
+    int newSegsAddedInThisAssignment = 0;
+    int newSegsDeletedInThisAssignment = 0;
+    for (String segment : newSegmentsNotExistingBefore) {
+      Set<String> currentSegmentAssign = currentAssignment.get(segment) != null
+          ? currentAssignment.get(segment).keySet() : new HashSet<>();
+      Set<String> targetSegmentAssign = targetAssignment.get(segment) != null
+          ? targetAssignment.get(segment).keySet() : new HashSet<>();
+
+      Set<String> segmentsAdded = new HashSet<>(targetSegmentAssign);
+      segmentsAdded.removeAll(currentSegmentAssign);
+      newSegsAddedInThisAssignment += segmentsAdded.size();
+
+      Set<String> segmentsDeleted = new HashSet<>(currentSegmentAssign);
+      segmentsDeleted.removeAll(targetSegmentAssign);
+      newSegsDeletedInThisAssignment += segmentsDeleted.size();
+    }
+
+    int newNumberSegmentsTotal = totalSegmentsTarget;
+    int totalSegmentsToBeAdded = newNumberSegmentsTotal - segmentsNotMoved;
+
+    TableRebalanceProgressStats.RebalanceProgressStats existingProgressStats;
+    long startTimeMs;
+    TableRebalanceProgressStats.RebalanceProgressStats progressStats =
+        new TableRebalanceProgressStats.RebalanceProgressStats();
+    switch (trigger) {
+      case START_TRIGGER:
+      case NEXT_ASSINGMENT_CALCULATION_TRIGGER:
+        // These are initialization steps for global / step progress stats
+        progressStats._totalSegmentsToBeAdded = totalSegmentsToBeAdded;
+        progressStats._totalSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+        progressStats._totalRemainingSegmentsToBeAdded = totalSegmentsToBeAdded;
+        progressStats._totalRemainingSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+        progressStats._totalCarryOverSegmentsToBeAdded = 0;
+        progressStats._totalCarryOverSegmentsToBeDeleted = 0;
+        progressStats._totalRemainingSegmentsToConverge = segmentsUnchangedYetNotConverged;
+        progressStats._totalUniqueNewUntrackedSegmentsDuringRebalance = totalNewSegmentsNotMonitored;
+        progressStats._percentageRemainingSegmentsToBeAdded = totalSegmentsToBeAdded == 0 ? 0.0 : 100.0;
+        progressStats._percentageRemainingSegmentsToBeDeleted = totalSegmentsToBeDeleted == 0 ? 0.0 : 100.0;
+        progressStats._estimatedTimeToCompleteAddsInSeconds = totalSegmentsToBeAdded == 0 ? 0.0 : -1.0;
+        progressStats._estimatedTimeToCompleteDeletesInSeconds = totalSegmentsToBeDeleted == 0 ? 0.0 : -1.0;
+        progressStats._averageSegmentSizeInBytes = rebalanceContext.getEstimatedAverageSegmentSizeInBytes();
+        progressStats._totalEstimatedDataToBeMovedInBytes =
+            TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes(0,
+                rebalanceContext.getEstimatedAverageSegmentSizeInBytes(), totalSegmentsToBeAdded);
+        progressStats._startTimeMs = trigger == Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER
+            ? System.currentTimeMillis() : rebalanceProgressStats.getStartTimeMs();
+        break;
+      case IDEAL_STATE_CHANGE_TRIGGER:
+        existingProgressStats = rebalanceProgressStats.getRebalanceProgressStatsOverall();
+        progressStats._totalSegmentsToBeAdded =
+            existingProgressStats._totalSegmentsToBeAdded + newSegsAddedInThisAssignment;
+        progressStats._totalSegmentsToBeDeleted =
+            existingProgressStats._totalSegmentsToBeDeleted + newSegsDeletedInThisAssignment;
+        progressStats._totalRemainingSegmentsToBeAdded = totalSegmentsToBeAdded;
+        progressStats._totalRemainingSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+        progressStats._totalCarryOverSegmentsToBeAdded = 0;
+        progressStats._totalCarryOverSegmentsToBeDeleted = 0;
+        progressStats._totalRemainingSegmentsToConverge = segmentsUnchangedYetNotConverged;
+        progressStats._totalUniqueNewUntrackedSegmentsDuringRebalance =
+            existingProgressStats._totalUniqueNewUntrackedSegmentsDuringRebalance + totalNewSegmentsNotMonitored;

Review Comment:
I think it's correct to accumulate this number over each IS update in the rebalancer, but couldn't we also calculate the cardinality difference between `rebalanceContext._uniqueSegments` and `rebalanceContext._segmentsToMonitor` here? I think that approach is less error-prone.

--
This is an automated message from the Apache Git Service.
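Regarding the earlier "one scope out" comment on `calculateUpdatedProgressStats`, here is a minimal sketch of what hoisting the monitor-list check into the per-segment loop could look like. It is a simplified, hypothetical model, not the PR's code: `MonitorCheckSketch` and `Counts` are invented names, only `totalSegmentsTarget`, `totalNewSegmentsNotMonitored` and `newSegmentsNotExistingBefore` are tracked, the literal `"OFFLINE"` stands in for `CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE`, and the per-server maps are omitted.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "one scope out" suggestion: the monitor-list check runs once per segment.
public class MonitorCheckSketch {

  // Simplified result holder (hypothetical, not a Pinot class)
  public static final class Counts {
    public int totalSegmentsTarget;
    public int totalNewSegmentsNotMonitored;
    public final Set<String> newSegmentsNotExistingBefore = new HashSet<>();
  }

  public static Counts count(Map<String, Map<String, String>> targetAssignment, Set<String> uniqueSegments,
      Set<String> segmentsToMonitor) {
    Counts counts = new Counts();
    for (Map.Entry<String, Map<String, String>> segmentEntry : targetAssignment.entrySet()) {
      String segmentName = segmentEntry.getKey();
      boolean isNew = !uniqueSegments.contains(segmentName);
      // Segment-level check: skip every replica of an unmonitored segment at once
      if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
        if (isNew) {
          counts.totalNewSegmentsNotMonitored++;
        }
        continue;
      }
      if (isNew) {
        counts.newSegmentsNotExistingBefore.add(segmentName);
      }
      for (String instanceState : segmentEntry.getValue().values()) {
        // Replicas that are OFFLINE in the target assignment are not counted
        if (!"OFFLINE".equals(instanceState)) {
          counts.totalSegmentsTarget++;
        }
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> target = new LinkedHashMap<>();
    target.put("seg_old", Map.of("server1", "ONLINE", "server2", "ONLINE"));
    target.put("seg_new_untracked", Map.of("server1", "ONLINE", "server3", "ONLINE"));
    target.put("seg_new_tracked", Map.of("server2", "ONLINE", "server3", "OFFLINE"));
    Counts counts = count(target, Set.of("seg_old"), Set.of("seg_old", "seg_new_tracked"));
    // prints "3 1"
    System.out.println(counts.totalSegmentsTarget + " " + counts.totalNewSegmentsNotMonitored);
  }
}
```

For segments that have at least one replica in the target assignment, this should produce the same counts as the quoted per-replica loop, which also counts an unmonitored new segment only once because it removes the segment from `newSegmentsNotExistingBefore` on the first replica. The difference is that the membership check is evaluated once per segment instead of once per replica, which makes the intent explicit.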