This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 764ae956b3 Add progress stats rollback support for rebalance progress stats on IdealState update failure (#15510)
764ae956b3 is described below

commit 764ae956b3bfffd33720312fd31d386f1ea6d60f
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Thu Apr 17 16:02:14 2025 -0700

    Add progress stats rollback support for rebalance progress stats on IdealState update failure (#15510)
---
 .../core/rebalance/NoOpTableRebalanceObserver.java |  4 +++
 .../core/rebalance/TableRebalanceObserver.java     |  2 ++
 .../rebalance/TableRebalanceProgressStats.java     | 18 ++++++++++++++
 .../helix/core/rebalance/TableRebalancer.java      | 12 ++++++---
 .../rebalance/ZkBasedTableRebalanceObserver.java   | 29 ++++++++++++++++++++++
 .../TestZkBasedTableRebalanceObserver.java         |  3 +++
 6 files changed, 65 insertions(+), 3 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
index cdd632274d..8b11440e17 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
@@ -41,6 +41,10 @@ public class NoOpTableRebalanceObserver implements 
TableRebalanceObserver {
   public void onError(String errorMsg) {
   }
 
+  @Override
+  public void onRollback() {
+  }
+
   @Override
   public boolean isStopped() {
     return false;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
index 8bc2423824..68abcbad72 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
@@ -49,6 +49,8 @@ public interface TableRebalanceObserver {
 
   void onError(String errorMsg);
 
+  void onRollback();
+
   boolean isStopped();
 
   RebalanceResult.Status getStopStatus();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
index b8693dc5f4..050052e83a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
@@ -339,6 +339,24 @@ public class TableRebalanceProgressStats {
       _startTimeMs = 0;
     }
 
+    RebalanceProgressStats(RebalanceProgressStats other) {
+      _totalSegmentsToBeAdded = other._totalSegmentsToBeAdded;
+      _totalSegmentsToBeDeleted = other._totalSegmentsToBeDeleted;
+      _totalRemainingSegmentsToBeAdded = 
other._totalRemainingSegmentsToBeAdded;
+      _totalRemainingSegmentsToBeDeleted = 
other._totalRemainingSegmentsToBeDeleted;
+      _totalRemainingSegmentsToConverge = 
other._totalRemainingSegmentsToConverge;
+      _totalCarryOverSegmentsToBeAdded = 
other._totalCarryOverSegmentsToBeAdded;
+      _totalCarryOverSegmentsToBeDeleted = 
other._totalCarryOverSegmentsToBeDeleted;
+      _totalUniqueNewUntrackedSegmentsDuringRebalance = 
other._totalUniqueNewUntrackedSegmentsDuringRebalance;
+      _percentageRemainingSegmentsToBeAdded = 
other._percentageRemainingSegmentsToBeAdded;
+      _percentageRemainingSegmentsToBeDeleted = 
other._percentageRemainingSegmentsToBeDeleted;
+      _estimatedTimeToCompleteAddsInSeconds = 
other._estimatedTimeToCompleteAddsInSeconds;
+      _estimatedTimeToCompleteDeletesInSeconds = 
other._estimatedTimeToCompleteDeletesInSeconds;
+      _averageSegmentSizeInBytes = other._averageSegmentSizeInBytes;
+      _totalEstimatedDataToBeMovedInBytes = 
other._totalEstimatedDataToBeMovedInBytes;
+      _startTimeMs = other._startTimeMs;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof RebalanceProgressStats)) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 6fbc866c0f..a4a368607c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -557,7 +557,8 @@ public class TableRebalancer {
               "No state change found for segments to be moved, re-calculating 
the target assignment based on the "
                   + "previous target assignment");
           Map<String, Map<String, String>> oldTargetAssignment = 
targetAssignment;
-          targetAssignment = new HashMap<>(currentAssignment);
+          // Other instance assignment code returns a TreeMap to keep it sorted; do the same here
+          targetAssignment = new TreeMap<>(currentAssignment);
           for (String segment : segmentsToMove) {
             targetAssignment.put(segment, oldTargetAssignment.get(segment));
           }
@@ -583,8 +584,6 @@ public class TableRebalancer {
           allSegmentsFromIdealState, null);
       
_tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER,
 currentAssignment,
           targetAssignment, rebalanceContext);
-      // Update the segment list as the IDEAL_STATE_CHANGE_TRIGGER should've 
captured the newly added / deleted segments
-      allSegmentsFromIdealState = currentAssignment.keySet();
       if (_tableRebalanceObserver.isStopped()) {
         return new RebalanceResult(rebalanceJobId, 
_tableRebalanceObserver.getStopStatus(),
             "Rebalance has stopped already before updating the IdealState", 
instancePartitionsMap,
@@ -618,9 +617,16 @@ public class TableRebalancer {
             "Failed to update IdealState");
         currentAssignment = nextAssignment;
         expectedVersion++;
+        // IdealState update is successful. Update the segment list as the 
IDEAL_STATE_CHANGE_TRIGGER should have
+        // captured the newly added / deleted segments
+        allSegmentsFromIdealState = currentAssignment.keySet();
         tableRebalanceLogger.info("Successfully updated the IdealState");
       } catch (ZkBadVersionException e) {
         tableRebalanceLogger.info("Version changed while updating IdealState");
+        // Since IdealState wasn't updated, roll back the stats changes made and continue. There is no need to update
+        // segmentsToMonitor either, since it doesn't change without an IdealState update
+        _tableRebalanceObserver.onRollback();
+        continue;
       } catch (Exception e) {
         onReturnFailure("Caught exception while updating IdealState, aborting 
the rebalance", e, tableRebalanceLogger);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index dca82fe97b..125b0568bc 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -46,6 +46,10 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
   private final PinotHelixResourceManager _pinotHelixResourceManager;
   private final TableRebalanceProgressStats _tableRebalanceProgressStats;
   private final TableRebalanceContext _tableRebalanceContext;
+  // These previous stats are used for rollback scenarios where the IdealState update fails due to a version
+  // change and the rebalance loop is retried.
+  private TableRebalanceProgressStats.RebalanceProgressStats 
_previousStepStats;
+  private TableRebalanceProgressStats.RebalanceProgressStats 
_previousOverallStats;
   private long _lastUpdateTimeMs;
   // Keep track of number of updates. Useful during debugging.
   private int _numUpdatesToZk;
@@ -64,6 +68,8 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _tableRebalanceProgressStats = new TableRebalanceProgressStats();
     _tableRebalanceContext = tableRebalanceContext;
+    _previousStepStats = new 
TableRebalanceProgressStats.RebalanceProgressStats();
+    _previousOverallStats = new 
TableRebalanceProgressStats.RebalanceProgressStats();
     _numUpdatesToZk = 0;
     _controllerMetrics = ControllerMetrics.get();
   }
@@ -83,6 +89,10 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
         break;
       // Write to Zk if there's change since previous stats computation
       case IDEAL_STATE_CHANGE_TRIGGER:
+        // Update the previous stats with the current values in case a 
rollback is needed due to IdealState version
+        // change
+        _previousOverallStats = new 
TableRebalanceProgressStats.RebalanceProgressStats(
+            _tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
         latest = getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
         latestProgress = calculateUpdatedProgressStats(targetState, 
currentState, rebalanceContext,
             Trigger.IDEAL_STATE_CHANGE_TRIGGER, _tableRebalanceProgressStats);
@@ -100,6 +110,12 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
         }
         break;
       case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
+        // Update the previous stats with the current values in case a 
rollback is needed due to IdealState version
+        // change
+        _previousStepStats = new 
TableRebalanceProgressStats.RebalanceProgressStats(
+            
_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep());
+        _previousOverallStats = new 
TableRebalanceProgressStats.RebalanceProgressStats(
+            _tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
         latest = getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
         latestProgress = calculateUpdatedProgressStats(targetState, 
currentState, rebalanceContext,
             Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER, 
_tableRebalanceProgressStats);
@@ -118,6 +134,10 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
         }
         break;
       case NEXT_ASSINGMENT_CALCULATION_TRIGGER:
+        // Update the previous stats with the current values in case a 
rollback is needed due to IdealState version
+        // change
+        _previousStepStats = new 
TableRebalanceProgressStats.RebalanceProgressStats(
+            
_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep());
         latestProgress = calculateUpdatedProgressStats(targetState, 
currentState, rebalanceContext,
             Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, 
_tableRebalanceProgressStats);
         if 
(!_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep().equals(latestProgress))
 {
@@ -191,6 +211,15 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     trackStatsInZk();
   }
 
+  @Override
+  public void onRollback() {
+    _tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep(
+        new 
TableRebalanceProgressStats.RebalanceProgressStats(_previousStepStats));
+    _tableRebalanceProgressStats.setRebalanceProgressStatsOverall(
+        new 
TableRebalanceProgressStats.RebalanceProgressStats(_previousOverallStats));
+    trackStatsInZk();
+  }
+
   @Override
   public boolean isStopped() {
     return _isStopped;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index 130682fa6c..0d2f395d79 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -79,6 +79,9 @@ public class TestZkBasedTableRebalanceObserver {
         rebalanceContext);
     // Both of the changes above will update ZK for progress stats
     assertEquals(observer.getNumUpdatesToZk(), 4);
+    // Try a rollback and this should trigger a ZK update as well
+    observer.onRollback();
+    assertEquals(observer.getNumUpdatesToZk(), 5);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to