This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c3e118f1fa Fix up elapsed time calculation for rebalance progress stats when carry over are present (#15674) c3e118f1fa is described below commit c3e118f1fa737acde84e50b23d8c3c1d1979a5b6 Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Wed Apr 30 22:25:12 2025 -0700 Fix up elapsed time calculation for rebalance progress stats when carry over are present (#15674) * Fix up elapsed time calculation for rebalance progress stats when carry over * Fix extra commented line --- .../rebalance/TableRebalanceProgressStats.java | 4 +- .../rebalance/ZkBasedTableRebalanceObserver.java | 4 +- .../TestZkBasedTableRebalanceObserver.java | 125 +++++++++++++++++++++ 3 files changed, 130 insertions(+), 3 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java index 050052e83a..47d223804f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java @@ -242,7 +242,9 @@ public class TableRebalanceProgressStats { int remainingSegmentsToChange) { double elapsedTimeInSeconds = (double) (System.currentTimeMillis() - startTime) / 1000.0; int segmentsAlreadyChanged = totalSegmentsToChange - remainingSegmentsToChange; - return segmentsAlreadyChanged == 0 ? totalSegmentsToChange == 0 ? 0.0 : -1.0 + // If carry over + remaining segments to change are > total segments to change then number of segments already + // changed may be -ve, in which case we should just set the default value as we cannot measure elapsed time + return segmentsAlreadyChanged <= 0 ? totalSegmentsToChange == 0 ? 0.0 : -1.0 : (double) remainingSegmentsToChange / (double) segmentsAlreadyChanged * elapsedTimeInSeconds; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index 125b0568bc..5eaa80e91d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -581,10 +581,10 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { startTimeMs = existingProgressStats._startTimeMs; progressStats._estimatedTimeToCompleteAddsInSeconds = TableRebalanceProgressStats.calculateEstimatedTimeToCompleteChange(startTimeMs, - progressStats._totalSegmentsToBeAdded, progressStats._totalRemainingSegmentsToBeAdded); + progressStats._totalSegmentsToBeAdded, totalSegmentsToBeAdded); progressStats._estimatedTimeToCompleteDeletesInSeconds = TableRebalanceProgressStats.calculateEstimatedTimeToCompleteChange(startTimeMs, - progressStats._totalSegmentsToBeDeleted, progressStats._totalRemainingSegmentsToBeDeleted); + progressStats._totalSegmentsToBeDeleted, totalSegmentsToBeDeleted); progressStats._averageSegmentSizeInBytes = existingProgressStats._averageSegmentSizeInBytes; progressStats._totalEstimatedDataToBeMovedInBytes = TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes( diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java index 0d2f395d79..7ae66a7b30 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java @@ -1293,4 +1293,129 @@ public class TestZkBasedTableRebalanceObserver { assertEquals(current, target); } + + @Test + void testElapsedTimeWithCarryOverProgressStats() { + long estimatedAverageSegmentSize = 1024; + + Map<String, Map<String, String>> currentIS = new TreeMap<>(); + currentIS.put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1"), ONLINE)); + currentIS.put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2"), ONLINE)); + currentIS.put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4"), ONLINE)); + + // Assume that the current EV doesn't yet have the segments added as seen in currentIS + Map<String, Map<String, String>> currentEV = new TreeMap<>(); + + Map<String, Map<String, String>> targetIS = new TreeMap<>(); + targetIS.put("segment1", + SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4", "host5"), ONLINE)); + targetIS.put("segment2", + SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4", "host5"), ONLINE)); + targetIS.put("segment3", + SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4", "host5"), ONLINE)); + + TableRebalanceProgressStats tableRebalanceProgressStats = new TableRebalanceProgressStats(); + + // Initialize the start trigger with some change + Set<String> segmentSet = new HashSet<>(targetIS.keySet()); + TableRebalanceObserver.RebalanceContext rebalanceContext = new TableRebalanceObserver.RebalanceContext( + estimatedAverageSegmentSize, segmentSet, segmentSet); + TableRebalanceProgressStats.RebalanceProgressStats stats = + ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(targetIS, currentIS, rebalanceContext, + TableRebalanceObserver.Trigger.START_TRIGGER, tableRebalanceProgressStats); + assertEquals(stats._totalSegmentsToBeAdded, 11); + assertEquals(stats._totalSegmentsToBeDeleted, 2); + assertEquals(stats._totalRemainingSegmentsToBeAdded, 11); + assertEquals(stats._totalRemainingSegmentsToBeDeleted, 2); + assertEquals(stats._totalCarryOverSegmentsToBeAdded, 0); + assertEquals(stats._totalCarryOverSegmentsToBeDeleted, 0); + assertEquals(stats._totalRemainingSegmentsToConverge, 0); + assertEquals(stats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0); + assertEquals(stats._percentageRemainingSegmentsToBeAdded, 100.0); + assertEquals(stats._percentageRemainingSegmentsToBeDeleted, 100.0); + assertEquals(stats._estimatedTimeToCompleteAddsInSeconds, -1.0); + assertEquals(stats._estimatedTimeToCompleteDeletesInSeconds, -1.0); + assertEquals(stats._averageSegmentSizeInBytes, estimatedAverageSegmentSize); + assertEquals(stats._totalEstimatedDataToBeMovedInBytes, estimatedAverageSegmentSize * 11); + tableRebalanceProgressStats.setRebalanceProgressStatsOverall(stats); + + // Next call EV-IS convergence (assume here that the IS is not yet updated like happens in actual rebalance) + // Since currentEV does not match currentIS, IS-EV convergence will detect some segments are carry over segments + // and wait for these to converge before moving on to the first IS update + // Also validate that the overall progress stats shows elapsed time as -1.0 instead of some random -ve time + segmentSet = new HashSet<>(targetIS.keySet()); + rebalanceContext = new TableRebalanceObserver.RebalanceContext(estimatedAverageSegmentSize, segmentSet, segmentSet); + stats = ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(currentIS, currentEV, rebalanceContext, + TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER, + tableRebalanceProgressStats); + tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(stats); + stats = tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep(); + assertEquals(stats._totalSegmentsToBeAdded, 0); + assertEquals(stats._totalSegmentsToBeDeleted, 0); + assertEquals(stats._totalRemainingSegmentsToBeAdded, 0); + assertEquals(stats._totalRemainingSegmentsToBeDeleted, 0); + assertEquals(stats._totalCarryOverSegmentsToBeAdded, 3); + assertEquals(stats._totalCarryOverSegmentsToBeDeleted, 0); + assertEquals(stats._totalRemainingSegmentsToConverge, 0); + assertEquals(stats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0); + assertEquals(stats._percentageRemainingSegmentsToBeAdded, 0.0); + assertEquals(stats._percentageRemainingSegmentsToBeDeleted, 0.0); + assertEquals(stats._estimatedTimeToCompleteAddsInSeconds, 0.0); + assertEquals(stats._estimatedTimeToCompleteDeletesInSeconds, 0.0); + assertEquals(stats._averageSegmentSizeInBytes, 0); + assertEquals(stats._totalEstimatedDataToBeMovedInBytes, 0); + TableRebalanceProgressStats.RebalanceProgressStats overallStats = + tableRebalanceProgressStats.getRebalanceProgressStatsOverall(); + assertEquals(overallStats._totalSegmentsToBeAdded, 11); + assertEquals(overallStats._totalSegmentsToBeDeleted, 2); + assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 11); + assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 2); + assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 3); + assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0); + assertEquals(overallStats._totalRemainingSegmentsToConverge, 0); + assertEquals(overallStats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0); + assertEquals(overallStats._percentageRemainingSegmentsToBeAdded, 127.27272727272727); + assertEquals(overallStats._percentageRemainingSegmentsToBeDeleted, 100.0); + assertEquals(overallStats._estimatedTimeToCompleteAddsInSeconds, -1.0); + assertEquals(overallStats._estimatedTimeToCompleteDeletesInSeconds, -1.0); + assertEquals(overallStats._averageSegmentSizeInBytes, estimatedAverageSegmentSize); + assertEquals(overallStats._totalEstimatedDataToBeMovedInBytes, estimatedAverageSegmentSize * 11); + + // currentEV has converged to match currentIS, no more carry over segments should be seen + currentEV = new TreeMap<>(currentIS); + stats = ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(currentIS, currentEV, rebalanceContext, + TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER, + tableRebalanceProgressStats); + tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(stats); + stats = tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep(); + assertEquals(stats._totalSegmentsToBeAdded, 0); + assertEquals(stats._totalSegmentsToBeDeleted, 0); + assertEquals(stats._totalRemainingSegmentsToBeAdded, 0); + assertEquals(stats._totalRemainingSegmentsToBeDeleted, 0); + assertEquals(stats._totalCarryOverSegmentsToBeAdded, 0); + assertEquals(stats._totalCarryOverSegmentsToBeDeleted, 0); + assertEquals(stats._totalRemainingSegmentsToConverge, 0); + assertEquals(stats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0); + assertEquals(stats._percentageRemainingSegmentsToBeAdded, 0.0); + assertEquals(stats._percentageRemainingSegmentsToBeDeleted, 0.0); + assertEquals(stats._estimatedTimeToCompleteAddsInSeconds, 0.0); + assertEquals(stats._estimatedTimeToCompleteDeletesInSeconds, 0.0); + assertEquals(stats._averageSegmentSizeInBytes, 0); + assertEquals(stats._totalEstimatedDataToBeMovedInBytes, 0); + overallStats = tableRebalanceProgressStats.getRebalanceProgressStatsOverall(); + assertEquals(overallStats._totalSegmentsToBeAdded, 11); + assertEquals(overallStats._totalSegmentsToBeDeleted, 2); + assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 11); + assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 2); + assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0); + assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0); + assertEquals(overallStats._totalRemainingSegmentsToConverge, 0); + assertEquals(overallStats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0); + assertEquals(overallStats._percentageRemainingSegmentsToBeAdded, 100.0); + assertEquals(overallStats._percentageRemainingSegmentsToBeDeleted, 100.0); + assertEquals(overallStats._estimatedTimeToCompleteAddsInSeconds, -1.0); + assertEquals(overallStats._estimatedTimeToCompleteDeletesInSeconds, -1.0); + assertEquals(overallStats._averageSegmentSizeInBytes, estimatedAverageSegmentSize); + assertEquals(overallStats._totalEstimatedDataToBeMovedInBytes, estimatedAverageSegmentSize * 11); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org