This is an automated email from the ASF dual-hosted git repository. somandal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 764ae956b3 Add progress stats rollback support for rebalance progress stats on IdealState update failure (#15510) 764ae956b3 is described below commit 764ae956b3bfffd33720312fd31d386f1ea6d60f Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Thu Apr 17 16:02:14 2025 -0700 Add progress stats rollback support for rebalance progress stats on IdealState update failure (#15510) --- .../core/rebalance/NoOpTableRebalanceObserver.java | 4 +++ .../core/rebalance/TableRebalanceObserver.java | 2 ++ .../rebalance/TableRebalanceProgressStats.java | 18 ++++++++++++++ .../helix/core/rebalance/TableRebalancer.java | 12 ++++++--- .../rebalance/ZkBasedTableRebalanceObserver.java | 29 ++++++++++++++++++++++ .../TestZkBasedTableRebalanceObserver.java | 3 +++ 6 files changed, 65 insertions(+), 3 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java index cdd632274d..8b11440e17 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java @@ -41,6 +41,10 @@ public class NoOpTableRebalanceObserver implements TableRebalanceObserver { public void onError(String errorMsg) { } + @Override + public void onRollback() { + } + @Override public boolean isStopped() { return false; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java index 8bc2423824..68abcbad72 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java @@ -49,6 +49,8 @@ public interface TableRebalanceObserver { void onError(String errorMsg); + void onRollback(); + boolean isStopped(); RebalanceResult.Status getStopStatus(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java index b8693dc5f4..050052e83a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java @@ -339,6 +339,24 @@ public class TableRebalanceProgressStats { _startTimeMs = 0; } + RebalanceProgressStats(RebalanceProgressStats other) { + _totalSegmentsToBeAdded = other._totalSegmentsToBeAdded; + _totalSegmentsToBeDeleted = other._totalSegmentsToBeDeleted; + _totalRemainingSegmentsToBeAdded = other._totalRemainingSegmentsToBeAdded; + _totalRemainingSegmentsToBeDeleted = other._totalRemainingSegmentsToBeDeleted; + _totalRemainingSegmentsToConverge = other._totalRemainingSegmentsToConverge; + _totalCarryOverSegmentsToBeAdded = other._totalCarryOverSegmentsToBeAdded; + _totalCarryOverSegmentsToBeDeleted = other._totalCarryOverSegmentsToBeDeleted; + _totalUniqueNewUntrackedSegmentsDuringRebalance = other._totalUniqueNewUntrackedSegmentsDuringRebalance; + _percentageRemainingSegmentsToBeAdded = other._percentageRemainingSegmentsToBeAdded; + _percentageRemainingSegmentsToBeDeleted = other._percentageRemainingSegmentsToBeDeleted; + _estimatedTimeToCompleteAddsInSeconds = other._estimatedTimeToCompleteAddsInSeconds; + _estimatedTimeToCompleteDeletesInSeconds = 
other._estimatedTimeToCompleteDeletesInSeconds; + _averageSegmentSizeInBytes = other._averageSegmentSizeInBytes; + _totalEstimatedDataToBeMovedInBytes = other._totalEstimatedDataToBeMovedInBytes; + _startTimeMs = other._startTimeMs; + } + @Override public boolean equals(Object o) { if (!(o instanceof RebalanceProgressStats)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 6fbc866c0f..a4a368607c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -557,7 +557,8 @@ public class TableRebalancer { "No state change found for segments to be moved, re-calculating the target assignment based on the " + "previous target assignment"); Map<String, Map<String, String>> oldTargetAssignment = targetAssignment; - targetAssignment = new HashMap<>(currentAssignment); + // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here + targetAssignment = new TreeMap<>(currentAssignment); for (String segment : segmentsToMove) { targetAssignment.put(segment, oldTargetAssignment.get(segment)); } @@ -583,8 +584,6 @@ public class TableRebalancer { allSegmentsFromIdealState, null); _tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, currentAssignment, targetAssignment, rebalanceContext); - // Update the segment list as the IDEAL_STATE_CHANGE_TRIGGER should've captured the newly added / deleted segments - allSegmentsFromIdealState = currentAssignment.keySet(); if (_tableRebalanceObserver.isStopped()) { return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(), "Rebalance has stopped already before updating the IdealState", instancePartitionsMap, @@ -618,9 
+617,16 @@ public class TableRebalancer { "Failed to update IdealState"); currentAssignment = nextAssignment; expectedVersion++; + // IdealState update is successful. Update the segment list as the IDEAL_STATE_CHANGE_TRIGGER should have + // captured the newly added / deleted segments + allSegmentsFromIdealState = currentAssignment.keySet(); tableRebalanceLogger.info("Successfully updated the IdealState"); } catch (ZkBadVersionException e) { tableRebalanceLogger.info("Version changed while updating IdealState"); + // Since IdealState wasn't updated, rollback the stats changes made and continue. There is no need to update + // segmentsToMonitor either since that hasn't changed without the IdealState update + _tableRebalanceObserver.onRollback(); + continue; } catch (Exception e) { onReturnFailure("Caught exception while updating IdealState, aborting the rebalance", e, tableRebalanceLogger); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index dca82fe97b..125b0568bc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -46,6 +46,10 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { private final PinotHelixResourceManager _pinotHelixResourceManager; private final TableRebalanceProgressStats _tableRebalanceProgressStats; private final TableRebalanceContext _tableRebalanceContext; + // These previous stats are used for rollback scenarios where the IdealState update fails due to a version + // change and the rebalance loop is retried. 
+ private TableRebalanceProgressStats.RebalanceProgressStats _previousStepStats; + private TableRebalanceProgressStats.RebalanceProgressStats _previousOverallStats; private long _lastUpdateTimeMs; // Keep track of number of updates. Useful during debugging. private int _numUpdatesToZk; @@ -64,6 +68,8 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { _pinotHelixResourceManager = pinotHelixResourceManager; _tableRebalanceProgressStats = new TableRebalanceProgressStats(); _tableRebalanceContext = tableRebalanceContext; + _previousStepStats = new TableRebalanceProgressStats.RebalanceProgressStats(); + _previousOverallStats = new TableRebalanceProgressStats.RebalanceProgressStats(); _numUpdatesToZk = 0; _controllerMetrics = ControllerMetrics.get(); } @@ -83,6 +89,10 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { break; // Write to Zk if there's change since previous stats computation case IDEAL_STATE_CHANGE_TRIGGER: + // Update the previous stats with the current values in case a rollback is needed due to IdealState version + // change + _previousOverallStats = new TableRebalanceProgressStats.RebalanceProgressStats( + _tableRebalanceProgressStats.getRebalanceProgressStatsOverall()); latest = getDifferenceBetweenTableRebalanceStates(targetState, currentState); latestProgress = calculateUpdatedProgressStats(targetState, currentState, rebalanceContext, Trigger.IDEAL_STATE_CHANGE_TRIGGER, _tableRebalanceProgressStats); @@ -100,6 +110,12 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { } break; case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER: + // Update the previous stats with the current values in case a rollback is needed due to IdealState version + // change + _previousStepStats = new TableRebalanceProgressStats.RebalanceProgressStats( + _tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep()); + _previousOverallStats = new 
TableRebalanceProgressStats.RebalanceProgressStats( + _tableRebalanceProgressStats.getRebalanceProgressStatsOverall()); latest = getDifferenceBetweenTableRebalanceStates(targetState, currentState); latestProgress = calculateUpdatedProgressStats(targetState, currentState, rebalanceContext, Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER, _tableRebalanceProgressStats); @@ -118,6 +134,10 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { } break; case NEXT_ASSINGMENT_CALCULATION_TRIGGER: + // Update the previous stats with the current values in case a rollback is needed due to IdealState version + // change + _previousStepStats = new TableRebalanceProgressStats.RebalanceProgressStats( + _tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep()); latestProgress = calculateUpdatedProgressStats(targetState, currentState, rebalanceContext, Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, _tableRebalanceProgressStats); if (!_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep().equals(latestProgress)) { @@ -191,6 +211,15 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { trackStatsInZk(); } + @Override + public void onRollback() { + _tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep( + new TableRebalanceProgressStats.RebalanceProgressStats(_previousStepStats)); + _tableRebalanceProgressStats.setRebalanceProgressStatsOverall( + new TableRebalanceProgressStats.RebalanceProgressStats(_previousOverallStats)); + trackStatsInZk(); + } + @Override public boolean isStopped() { return _isStopped; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java index 130682fa6c..0d2f395d79 100644 --- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java @@ -79,6 +79,9 @@ public class TestZkBasedTableRebalanceObserver { rebalanceContext); // Both of the changes above will update ZK for progress stats assertEquals(observer.getNumUpdatesToZk(), 4); + // Try a rollback and this should trigger a ZK update as well + observer.onRollback(); + assertEquals(observer.getNumUpdatesToZk(), 5); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org