klsince commented on code in PR #15266: URL: https://github.com/apache/pinot/pull/15266#discussion_r2025372338
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java: ########## @@ -118,6 +127,33 @@ public RebalanceStateStats getCurrentToTargetConvergence() { return _currentToTargetConvergence; } + public RebalanceProgressStats getRebalanceProgressStatsOverall() { + return _rebalanceProgressStatsOverall; + } + + public RebalanceProgressStats getRebalanceProgressStatsCurrentStep() { + return _rebalanceProgressStatsCurrentStep; + } + + public static boolean progressStatsDiffer(RebalanceProgressStats base, RebalanceProgressStats compare) { Review Comment: add and use the equals() method for the class? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java: ########## @@ -49,4 +52,29 @@ void onTrigger(Trigger trigger, Map<String, Map<String, String>> currentState, boolean isStopped(); RebalanceResult.Status getStopStatus(); + + class RebalanceContext { + private final long _estimatedAverageSegmentSizeInBytes; + private final Set<String> _uniqueSegmentList; Review Comment: nit: remove List suffix? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java: ########## @@ -284,4 +328,306 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0; return rebalanceStats; } + + /** + * Updates the overall progress stats based on the current step's progress stats. This should be called + * during the EV-IS convergence trigger to ensure the overall stats reflect the changes as they are made. 
+ * @param rebalanceProgressStats the rebalance stats + * @param lastStepStats step level stats from the last iteration + * @param latestStepStats latest step level stats calculated in this iteration + * @return the newly calculated overall progress stats + */ + @VisibleForTesting + static TableRebalanceProgressStats.RebalanceProgressStats updateOverallProgressStatsFromStep( + TableRebalanceProgressStats rebalanceProgressStats, + TableRebalanceProgressStats.RebalanceProgressStats lastStepStats, + TableRebalanceProgressStats.RebalanceProgressStats latestStepStats) { + int numAdditionalSegmentsAdded = + latestStepStats._totalSegmentsToBeAdded - lastStepStats._totalSegmentsToBeAdded; + int numAdditionalSegmentsDeleted = + latestStepStats._totalSegmentsToBeDeleted - lastStepStats._totalSegmentsToBeDeleted; + int upperBoundOnSegmentsAdded = + lastStepStats._totalRemainingSegmentsToBeAdded > lastStepStats._totalSegmentsToBeAdded + ? lastStepStats._totalSegmentsToBeAdded : lastStepStats._totalRemainingSegmentsToBeAdded; + int numSegmentAddsProcessedInLastStep = Math.abs(upperBoundOnSegmentsAdded + - latestStepStats._totalRemainingSegmentsToBeAdded); + int upperBoundOnSegmentsDeleted = + lastStepStats._totalRemainingSegmentsToBeDeleted > lastStepStats._totalSegmentsToBeDeleted + ? 
lastStepStats._totalSegmentsToBeDeleted : lastStepStats._totalRemainingSegmentsToBeDeleted; + int numSegmentDeletesProcessedInLastStep = Math.abs(upperBoundOnSegmentsDeleted + - latestStepStats._totalRemainingSegmentsToBeDeleted); + int numberNewUntrackedSegmentsAdded = latestStepStats._totalUniqueNewUntrackedSegmentsDuringRebalance + - lastStepStats._totalUniqueNewUntrackedSegmentsDuringRebalance; + + TableRebalanceProgressStats.RebalanceProgressStats overallProgressStats = + rebalanceProgressStats.getRebalanceProgressStatsOverall(); + + TableRebalanceProgressStats.RebalanceProgressStats newOverallProgressStats = + new TableRebalanceProgressStats.RebalanceProgressStats(); + + newOverallProgressStats._totalSegmentsToBeAdded = overallProgressStats._totalSegmentsToBeAdded + + numAdditionalSegmentsAdded; + newOverallProgressStats._totalSegmentsToBeDeleted = overallProgressStats._totalSegmentsToBeDeleted + + numAdditionalSegmentsDeleted; + if (latestStepStats._totalCarryOverSegmentsToBeAdded > 0) { + newOverallProgressStats._totalRemainingSegmentsToBeAdded = overallProgressStats._totalRemainingSegmentsToBeAdded; + } else { + newOverallProgressStats._totalRemainingSegmentsToBeAdded = numAdditionalSegmentsAdded == 0 + ? overallProgressStats._totalRemainingSegmentsToBeAdded - numSegmentAddsProcessedInLastStep + : overallProgressStats._totalRemainingSegmentsToBeAdded + numSegmentAddsProcessedInLastStep; + } + newOverallProgressStats._totalCarryOverSegmentsToBeAdded = + latestStepStats._totalCarryOverSegmentsToBeAdded; + if (latestStepStats._totalCarryOverSegmentsToBeDeleted > 0) { + newOverallProgressStats._totalRemainingSegmentsToBeDeleted = + overallProgressStats._totalRemainingSegmentsToBeDeleted; + } else { + newOverallProgressStats._totalRemainingSegmentsToBeDeleted = numAdditionalSegmentsDeleted == 0 + ? 
overallProgressStats._totalRemainingSegmentsToBeDeleted - numSegmentDeletesProcessedInLastStep + : overallProgressStats._totalRemainingSegmentsToBeDeleted + numSegmentDeletesProcessedInLastStep; + } + newOverallProgressStats._totalCarryOverSegmentsToBeDeleted = + latestStepStats._totalCarryOverSegmentsToBeDeleted; + newOverallProgressStats._totalRemainingSegmentsToConverge = latestStepStats._totalRemainingSegmentsToConverge; + newOverallProgressStats._totalUniqueNewUntrackedSegmentsDuringRebalance = + overallProgressStats._totalUniqueNewUntrackedSegmentsDuringRebalance + numberNewUntrackedSegmentsAdded; + newOverallProgressStats._percentageTotalSegmentsAddsRemaining = + calculatePercentageChange(newOverallProgressStats._totalSegmentsToBeAdded, + newOverallProgressStats._totalRemainingSegmentsToBeAdded + + newOverallProgressStats._totalCarryOverSegmentsToBeAdded); + newOverallProgressStats._percentageTotalSegmentDeletesRemaining = + calculatePercentageChange(newOverallProgressStats._totalSegmentsToBeDeleted, + newOverallProgressStats._totalRemainingSegmentsToBeDeleted + + newOverallProgressStats._totalCarryOverSegmentsToBeDeleted); + // Calculate elapsed time based on start of rebalance (global) + newOverallProgressStats._estimatedTimeToCompleteAddsInSeconds = + calculateEstimatedTimeToCompleteChange(rebalanceProgressStats.getStartTimeMs(), + newOverallProgressStats._totalSegmentsToBeAdded, newOverallProgressStats._totalRemainingSegmentsToBeAdded); + newOverallProgressStats._estimatedTimeToCompleteDeletesInSeconds = + calculateEstimatedTimeToCompleteChange(rebalanceProgressStats.getStartTimeMs(), + newOverallProgressStats._totalSegmentsToBeDeleted, + newOverallProgressStats._totalRemainingSegmentsToBeDeleted); + newOverallProgressStats._averageSegmentSizeInBytes = overallProgressStats._averageSegmentSizeInBytes; + newOverallProgressStats._totalEstimatedDataToBeMovedInBytes = + overallProgressStats._totalEstimatedDataToBeMovedInBytes + + (numAdditionalSegmentsAdded * 
overallProgressStats._averageSegmentSizeInBytes); + newOverallProgressStats._startTimeMs = rebalanceProgressStats.getStartTimeMs(); + + return newOverallProgressStats; + } + + /** + * Calculates the progress stats for the given step or for the overall based on the trigger type + * @return the calculated step or progress stats + */ + @VisibleForTesting + static TableRebalanceProgressStats.RebalanceProgressStats calculateOverallProgressStats( Review Comment: I couldn't figure out why we need to pass a `rebalanceProgressStats` in to this calculate method. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1145,8 +1174,8 @@ static boolean isExternalViewConverged(String tableNameWithType, } } - // For low disk mode, check if there are extra instances in ExternalView that are not in IdealState - if (lowDiskMode && externalViewInstanceStateMap != null) { + // Check if there are extra instances in ExternalView that are not in IdealState + if (externalViewInstanceStateMap != null) { Review Comment: Maybe open a separate small PR for this fix so that it gets a closer review (and perhaps also include the simple typo fixes, like segmentKey -> instanceName). ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java: ########## @@ -127,4 +163,77 @@ public static boolean statsDiffer(RebalanceStateStats base, RebalanceStateStats } return false; } + + // TODO: Clean this up once new stats are verified + public static class RebalanceStateStats { + public int _segmentsMissing; + public int _segmentsToRebalance; + public double _percentSegmentsToRebalance; + public int _replicasToRebalance; + + RebalanceStateStats() { + _segmentsMissing = 0; + _segmentsToRebalance = 0; + _replicasToRebalance = 0; + _percentSegmentsToRebalance = 0.0; + } + } + + // These rebalance stats specifically track the total segments added / deleted across all replicas + public static 
class RebalanceProgressStats { + // Total segments - across all replicas + @JsonProperty("totalSegmentsToBeAdded") + public int _totalSegmentsToBeAdded; + @JsonProperty("totalSegmentsToBeDeleted") + public int _totalSegmentsToBeDeleted; + // Total segments processed so far - across all replicas + @JsonProperty("totalRemainingSegmentsToBeAdded") + public int _totalRemainingSegmentsToBeAdded; + @JsonProperty("totalRemainingSegmentsToBeDeleted") + public int _totalRemainingSegmentsToBeDeleted; + @JsonProperty("totalRemainingSegmentsToConverge") + public int _totalRemainingSegmentsToConverge; + // Carry over stats - for when previous step's convergence doesn't complete and next step starts (bestEffort=true) + @JsonProperty("totalCarryOverSegmentsToBeAdded") + public int _totalCarryOverSegmentsToBeAdded; + @JsonProperty("totalCarryOverSegmentsToBeDeleted") + public int _totalCarryOverSegmentsToBeDeleted; + // Total new segments stats (not tracked by rebalance) + @JsonProperty("totalUniqueNewUntrackedSegmentsDuringRebalance") + public int _totalUniqueNewUntrackedSegmentsDuringRebalance; + // Derived stats + @JsonProperty("percentageTotalSegmentsAddsRemaining") + public double _percentageTotalSegmentsAddsRemaining; Review Comment: nit: _percentageTotalSegment`s`DeletesRemaining (add the `s` so it is consistent with `_percentageTotalSegmentsAddsRemaining`). Maybe also refine the two var names a bit, e.g. `percentRemainingSegmentsToBeAdded`, to look closer to `totalRemainingSegmentsToBeAdded`. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java: ########## @@ -284,4 +328,306 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0; return rebalanceStats; } + + /** + * Updates the overall progress stats based on the current step's progress stats. 
This should be called + * during the EV-IS convergence trigger to ensure the overall stats reflect the changes as they are made. + * @param rebalanceProgressStats the rebalance stats + * @param lastStepStats step level stats from the last iteration + * @param latestStepStats latest step level stats calculated in this iteration + * @return the newly calculated overall progress stats + */ + @VisibleForTesting + static TableRebalanceProgressStats.RebalanceProgressStats updateOverallProgressStatsFromStep( Review Comment: Make this a method of TableRebalanceProgressStats so that we can do `overall.update(last, latest)`, if I understand correctly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org