This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new acb8f19d7f Fix rebalancer EV converge check for low disk mode (#14178)
acb8f19d7f is described below

commit acb8f19d7fbb810a0e554fc5ac206e2aefd0961b
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Oct 8 14:56:40 2024 -0700

    Fix rebalancer EV converge check for low disk mode (#14178)
---
 .../helix/core/rebalance/TableRebalancer.java      |  71 ++++++----
 .../helix/core/rebalance/TableRebalancerTest.java  | 144 ++++++++++++---------
 2 files changed, 133 insertions(+), 82 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index ee6ad88dd5..293ada1da5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -389,16 +389,15 @@ public class TableRebalancer {
     // 3. Check if the target assignment is reached. Rebalance is done if it is reached.
     // 4. Calculate the next assignment based on the current assignment, target assignment and min available replicas.
     // 5. Update the IdealState to the next assignment. If the IdealState changes before the update, go back to step 1.
+    //
+    // NOTE: Monitor the segments to be moved from both the previous round and this round to ensure the moved segments
+    //       in the previous round are also converged.
+    Set<String> segmentsToMonitor = new HashSet<>(segmentsToMove);
     while (true) {
       // Wait for ExternalView to converge before updating the next IdealState
-      // NOTE: Monitor the segments to be moved from both the previous round and this round to ensure the moved segments
-      //       in the previous round are also converged.
-      Set<String> segmentsToMonitor = new HashSet<>(segmentsToMove);
-      segmentsToMove = 
SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
-      segmentsToMonitor.addAll(segmentsToMove);
       IdealState idealState;
       try {
-        idealState = waitForExternalViewToConverge(tableNameWithType, 
bestEfforts, segmentsToMonitor,
+        idealState = waitForExternalViewToConverge(tableNameWithType, 
lowDiskMode, bestEfforts, segmentsToMonitor,
             externalViewCheckIntervalInMs, 
externalViewStabilizationTimeoutInMs);
       } catch (Exception e) {
         String errorMsg = String.format(
@@ -528,6 +527,10 @@ public class TableRebalancer {
             "Caught exception while updating IdealState: " + e, 
instancePartitionsMap, tierToInstancePartitionsMap,
             targetAssignment);
       }
+
+      segmentsToMonitor = new HashSet<>(segmentsToMove);
+      segmentsToMove = 
SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
+      segmentsToMonitor.addAll(segmentsToMove);
     }
   }
 
@@ -762,7 +765,7 @@ public class TableRebalancer {
     }
   }
 
-  private IdealState waitForExternalViewToConverge(String tableNameWithType, 
boolean bestEfforts,
+  private IdealState waitForExternalViewToConverge(String tableNameWithType, 
boolean lowDiskMode, boolean bestEfforts,
       Set<String> segmentsToMonitor, long externalViewCheckIntervalInMs, long 
externalViewStabilizationTimeoutInMs)
       throws InterruptedException, TimeoutException {
     long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
@@ -788,7 +791,7 @@ public class TableRebalancer {
                   _tableRebalanceObserver.getStopStatus()));
         }
         if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), bestEfforts, 
segmentsToMonitor)) {
+            idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor)) {
           LOGGER.info("ExternalView converged for table: {}", 
tableNameWithType);
           return idealState;
         }
@@ -808,15 +811,21 @@ public class TableRebalancer {
   }
 
   /**
-   * NOTE: Only check the segments and instances in the IdealState. It is okay to have extra segments or instances in
-   * ExternalView as long as the instance states for all the segments in IdealState are reached. For ERROR state in
-   * ExternalView, if using best-efforts, log a warning and treat it as good state; if not, throw an exception to abort
-   * the rebalance because we are not able to get out of the ERROR state.
+   * NOTE:
+   * Only check the segments in the IdealState and being monitored. Extra segments in ExternalView are ignored because
+   * they are not managed by the rebalancer.
+   * For each segment checked:
+   * - In regular mode, it is okay to have extra instances in ExternalView as long as the instance states in IdealState
+   *   are reached.
+   * - In low disk mode, instance states in ExternalView must match IdealState to ensure the segments are deleted from
+   *   server before moving to the next assignment.
+   * For ERROR state in ExternalView, if using best-efforts, log a warning and treat it as good state; if not, throw an
+   * exception to abort the rebalance because we are not able to get out of the ERROR state.
    */
   @VisibleForTesting
   static boolean isExternalViewConverged(String tableNameWithType,
       Map<String, Map<String, String>> externalViewSegmentStates,
-      Map<String, Map<String, String>> idealStateSegmentStates, boolean 
bestEfforts,
+      Map<String, Map<String, String>> idealStateSegmentStates, boolean 
lowDiskMode, boolean bestEfforts,
       @Nullable Set<String> segmentsToMonitor) {
     for (Map.Entry<String, Map<String, String>> entry : 
idealStateSegmentStates.entrySet()) {
       String segmentName = entry.getKey();
@@ -843,15 +852,22 @@ public class TableRebalancer {
         String externalViewInstanceState = 
externalViewInstanceStateMap.get(instanceName);
         if (!idealStateInstanceState.equals(externalViewInstanceState)) {
           if (SegmentStateModel.ERROR.equals(externalViewInstanceState)) {
-            if (bestEfforts) {
-              LOGGER.warn(
-                  "Found ERROR instance: {} for segment: {}, table: {}, 
counting it as good state (best-efforts)",
-                  instanceName, segmentName, tableNameWithType);
-            } else {
-              LOGGER.warn("Found ERROR instance: {} for segment: {}, table: 
{}", instanceName, segmentName,
-                  tableNameWithType);
-              throw new IllegalStateException("Found segments in ERROR state");
-            }
+            handleErrorInstance(tableNameWithType, segmentName, instanceName, 
bestEfforts);
+          } else {
+            return false;
+          }
+        }
+      }
+
+      // For low disk mode, check if there are extra instances in ExternalView 
that are not in IdealState
+      if (lowDiskMode && externalViewInstanceStateMap != null) {
+        for (Map.Entry<String, String> instanceStateEntry : 
externalViewInstanceStateMap.entrySet()) {
+          String instanceName = instanceStateEntry.getKey();
+          if (idealStateInstanceStateMap.containsKey(instanceName)) {
+            continue;
+          }
+          if (SegmentStateModel.ERROR.equals(instanceStateEntry.getValue())) {
+            handleErrorInstance(tableNameWithType, segmentName, instanceName, 
bestEfforts);
           } else {
             return false;
           }
@@ -861,6 +877,17 @@ public class TableRebalancer {
     return true;
   }
 
+  private static void handleErrorInstance(String tableNameWithType, String 
segmentName, String instanceName,
+      boolean bestEfforts) {
+    if (bestEfforts) {
+      LOGGER.warn("Found ERROR instance: {} for segment: {}, table: {}, 
counting it as good state (best-efforts)",
+          instanceName, segmentName, tableNameWithType);
+    } else {
+      LOGGER.warn("Found ERROR instance: {} for segment: {}, table: {}", 
instanceName, segmentName, tableNameWithType);
+      throw new IllegalStateException("Found segments in ERROR state");
+    }
+  }
+
   /**
    * Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
    * the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index ecf1e0feda..a9aeac5a76 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -1261,96 +1261,120 @@ public class TableRebalancerTest {
     String offlineTableName = "testTable_OFFLINE";
     Map<String, Map<String, String>> externalViewSegmentStates = new 
TreeMap<>();
     Map<String, Map<String, String>> idealStateSegmentStates = new TreeMap<>();
+    boolean[] falseAndTrue = new boolean[]{false, true};
 
     // Empty segment states should match
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            false, null));
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            true, null));
+    for (boolean lowDiskMode : falseAndTrue) {
+      for (boolean bestEfforts : falseAndTrue) {
+        assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
+      }
+    }
 
     // Do not check segment that does not exist in IdealState
     Map<String, String> instanceStateMap = new TreeMap<>();
     instanceStateMap.put("instance1", ONLINE);
     externalViewSegmentStates.put("segment1", instanceStateMap);
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            false, null));
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            true, null));
+    for (boolean lowDiskMode : falseAndTrue) {
+      for (boolean bestEfforts : falseAndTrue) {
+        assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
+      }
+    }
 
     // Do not check segment that is OFFLINE in IdealState
     instanceStateMap = new TreeMap<>();
     instanceStateMap.put("instance1", OFFLINE);
     idealStateSegmentStates.put("segment2", instanceStateMap);
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            false, null));
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            true, null));
+    for (boolean lowDiskMode : falseAndTrue) {
+      for (boolean bestEfforts : falseAndTrue) {
+        assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
+      }
+    }
 
     // Should fail when a segment has CONSUMING instance in IdealState but 
does not exist in ExternalView
     instanceStateMap.put("instance2", CONSUMING);
-    assertFalse(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            false, null));
-    assertFalse(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            true, null));
+    for (boolean lowDiskMode : falseAndTrue) {
+      for (boolean bestEfforts : falseAndTrue) {
+        assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
+      }
+    }
 
     // Should fail when instance state does not exist
     instanceStateMap = new TreeMap<>();
     externalViewSegmentStates.put("segment2", instanceStateMap);
-    assertFalse(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            false, null));
-    assertFalse(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            true, null));
+    for (boolean lowDiskMode : falseAndTrue) {
+      for (boolean bestEfforts : falseAndTrue) {
+        assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
+      }
+    }
 
     // Should fail when instance state does not match
     instanceStateMap.put("instance2", OFFLINE);
-    assertFalse(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            false, null));
-    assertFalse(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            true, null));
+    for (boolean lowDiskMode : falseAndTrue) {
+      for (boolean bestEfforts : falseAndTrue) {
+        assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
+      }
+    }
 
     // Should pass when instance state matches
     instanceStateMap.put("instance2", CONSUMING);
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            false, null));
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            true, null));
+    for (boolean lowDiskMode : falseAndTrue) {
+      for (boolean bestEfforts : falseAndTrue) {
+        assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
+      }
+    }
 
-    // Should pass when there are extra instances in ExternalView
+    // When there are extra instances in ExternalView, should pass in regular 
mode but fail in low disk mode
     instanceStateMap.put("instance3", CONSUMING);
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            false, null));
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            true, null));
+    for (boolean bestEfforts : falseAndTrue) {
+      assertTrue(
+          TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
+              false, bestEfforts, null));
+      assertFalse(
+          TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
+              true, bestEfforts, null));
+    }
 
-    // Should throw exception when instance state is ERROR in ExternalView and 
best-efforts is disabled
+    // When instance state is ERROR in ExternalView, should fail in regular 
mode but pass in best-efforts mode
     instanceStateMap.put("instance2", ERROR);
-    try {
-      TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-          false, null);
-      fail();
-    } catch (Exception e) {
-      // Expected
+    instanceStateMap.remove("instance3");
+    for (boolean lowDiskMode : falseAndTrue) {
+      try {
+        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
+            lowDiskMode, false, null);
+        fail();
+      } catch (Exception e) {
+        // Expected
+      }
+      assertTrue(
+          TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
+              lowDiskMode, true, null));
     }
 
-    // Should pass when instance state is ERROR in ExternalView and 
best-efforts is enabled
-    assertTrue(
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            true, null));
+    // When the extra instance is in ERROR state, should throw exception in 
low disk mode when best-efforts is disabled
+    instanceStateMap.put("instance2", CONSUMING);
+    instanceStateMap.put("instance3", ERROR);
+    for (boolean lowDiskMode : falseAndTrue) {
+      for (boolean bestEfforts : falseAndTrue) {
+        if (lowDiskMode && !bestEfforts) {
+          try {
+            TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+                idealStateSegmentStates, true, false, null);
+            fail();
+          } catch (Exception e) {
+            // Expected
+          }
+        } else {
+          assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+              idealStateSegmentStates, lowDiskMode, bestEfforts, null));
+        }
+      }
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to