somandal commented on code in PR #15266: URL: https://github.com/apache/pinot/pull/15266#discussion_r2025459940
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java: ########## @@ -284,4 +328,306 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0; return rebalanceStats; } + + /** + * Updates the overall progress stats based on the current step's progress stats. This should be called + * during the EV-IS convergence trigger to ensure the overall stats reflect the changes as they are made. + * @param rebalanceProgressStats the rebalance stats + * @param lastStepStats step level stats from the last iteration + * @param latestStepStats latest step level stats calculated in this iteration + * @return the newly calculated overall progress stats + */ + @VisibleForTesting + static TableRebalanceProgressStats.RebalanceProgressStats updateOverallProgressStatsFromStep( + TableRebalanceProgressStats rebalanceProgressStats, + TableRebalanceProgressStats.RebalanceProgressStats lastStepStats, + TableRebalanceProgressStats.RebalanceProgressStats latestStepStats) { + int numAdditionalSegmentsAdded = + latestStepStats._totalSegmentsToBeAdded - lastStepStats._totalSegmentsToBeAdded; + int numAdditionalSegmentsDeleted = + latestStepStats._totalSegmentsToBeDeleted - lastStepStats._totalSegmentsToBeDeleted; + int upperBoundOnSegmentsAdded = + lastStepStats._totalRemainingSegmentsToBeAdded > lastStepStats._totalSegmentsToBeAdded + ? lastStepStats._totalSegmentsToBeAdded : lastStepStats._totalRemainingSegmentsToBeAdded; + int numSegmentAddsProcessedInLastStep = Math.abs(upperBoundOnSegmentsAdded + - latestStepStats._totalRemainingSegmentsToBeAdded); + int upperBoundOnSegmentsDeleted = + lastStepStats._totalRemainingSegmentsToBeDeleted > lastStepStats._totalSegmentsToBeDeleted + ? lastStepStats._totalSegmentsToBeDeleted : lastStepStats._totalRemainingSegmentsToBeDeleted; + int numSegmentDeletesProcessedInLastStep = Math.abs(upperBoundOnSegmentsDeleted + - latestStepStats._totalRemainingSegmentsToBeDeleted); + int numberNewUntrackedSegmentsAdded = latestStepStats._totalUniqueNewUntrackedSegmentsDuringRebalance + - lastStepStats._totalUniqueNewUntrackedSegmentsDuringRebalance; + + TableRebalanceProgressStats.RebalanceProgressStats overallProgressStats = + rebalanceProgressStats.getRebalanceProgressStatsOverall(); + + TableRebalanceProgressStats.RebalanceProgressStats newOverallProgressStats = + new TableRebalanceProgressStats.RebalanceProgressStats(); + + newOverallProgressStats._totalSegmentsToBeAdded = overallProgressStats._totalSegmentsToBeAdded + + numAdditionalSegmentsAdded; + newOverallProgressStats._totalSegmentsToBeDeleted = overallProgressStats._totalSegmentsToBeDeleted + + numAdditionalSegmentsDeleted; + if (latestStepStats._totalCarryOverSegmentsToBeAdded > 0) { + newOverallProgressStats._totalRemainingSegmentsToBeAdded = overallProgressStats._totalRemainingSegmentsToBeAdded; + } else { + newOverallProgressStats._totalRemainingSegmentsToBeAdded = numAdditionalSegmentsAdded == 0 + ? overallProgressStats._totalRemainingSegmentsToBeAdded - numSegmentAddsProcessedInLastStep + : overallProgressStats._totalRemainingSegmentsToBeAdded + numSegmentAddsProcessedInLastStep; + } + newOverallProgressStats._totalCarryOverSegmentsToBeAdded = + latestStepStats._totalCarryOverSegmentsToBeAdded; + if (latestStepStats._totalCarryOverSegmentsToBeDeleted > 0) { + newOverallProgressStats._totalRemainingSegmentsToBeDeleted = + overallProgressStats._totalRemainingSegmentsToBeDeleted; + } else { + newOverallProgressStats._totalRemainingSegmentsToBeDeleted = numAdditionalSegmentsDeleted == 0 + ? overallProgressStats._totalRemainingSegmentsToBeDeleted - numSegmentDeletesProcessedInLastStep + : overallProgressStats._totalRemainingSegmentsToBeDeleted + numSegmentDeletesProcessedInLastStep; + } + newOverallProgressStats._totalCarryOverSegmentsToBeDeleted = + latestStepStats._totalCarryOverSegmentsToBeDeleted; + newOverallProgressStats._totalRemainingSegmentsToConverge = latestStepStats._totalRemainingSegmentsToConverge; + newOverallProgressStats._totalUniqueNewUntrackedSegmentsDuringRebalance = + overallProgressStats._totalUniqueNewUntrackedSegmentsDuringRebalance + numberNewUntrackedSegmentsAdded; + newOverallProgressStats._percentageTotalSegmentsAddsRemaining = + calculatePercentageChange(newOverallProgressStats._totalSegmentsToBeAdded, + newOverallProgressStats._totalRemainingSegmentsToBeAdded + + newOverallProgressStats._totalCarryOverSegmentsToBeAdded); + newOverallProgressStats._percentageTotalSegmentDeletesRemaining = + calculatePercentageChange(newOverallProgressStats._totalSegmentsToBeDeleted, + newOverallProgressStats._totalRemainingSegmentsToBeDeleted + + newOverallProgressStats._totalCarryOverSegmentsToBeDeleted); + // Calculate elapsed time based on start of rebalance (global) + newOverallProgressStats._estimatedTimeToCompleteAddsInSeconds = + calculateEstimatedTimeToCompleteChange(rebalanceProgressStats.getStartTimeMs(), + newOverallProgressStats._totalSegmentsToBeAdded, newOverallProgressStats._totalRemainingSegmentsToBeAdded); + newOverallProgressStats._estimatedTimeToCompleteDeletesInSeconds = + calculateEstimatedTimeToCompleteChange(rebalanceProgressStats.getStartTimeMs(), + newOverallProgressStats._totalSegmentsToBeDeleted, + newOverallProgressStats._totalRemainingSegmentsToBeDeleted); + newOverallProgressStats._averageSegmentSizeInBytes = overallProgressStats._averageSegmentSizeInBytes; + newOverallProgressStats._totalEstimatedDataToBeMovedInBytes = + overallProgressStats._totalEstimatedDataToBeMovedInBytes + + (numAdditionalSegmentsAdded * overallProgressStats._averageSegmentSizeInBytes); + newOverallProgressStats._startTimeMs = rebalanceProgressStats.getStartTimeMs(); + + return newOverallProgressStats; + } + + /** + * Calculates the progress stats for the given step or for the overall based on the trigger type + * @return the calculated step or progress stats + */ + @VisibleForTesting + static TableRebalanceProgressStats.RebalanceProgressStats calculateOverallProgressStats( Review Comment: I use `rebalanceProgressStats` for the following: - For the `IDEAL_STATE_CHANGE_TRIGGER` and `EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER`, I get the current overall or step-level stats to do the calculations based on the previous values. - I also fetch the rebalance `startTimeMs` for the estimated time to complete calculations -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org