This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new acb8f19d7f Fix rebalancer EV converge check for low disk mode (#14178) acb8f19d7f is described below commit acb8f19d7fbb810a0e554fc5ac206e2aefd0961b Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Oct 8 14:56:40 2024 -0700 Fix rebalancer EV converge check for low disk mode (#14178) --- .../helix/core/rebalance/TableRebalancer.java | 71 ++++++---- .../helix/core/rebalance/TableRebalancerTest.java | 144 ++++++++++++--------- 2 files changed, 133 insertions(+), 82 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index ee6ad88dd5..293ada1da5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -389,16 +389,15 @@ public class TableRebalancer { // 3. Check if the target assignment is reached. Rebalance is done if it is reached. // 4. Calculate the next assignment based on the current assignment, target assignment and min available replicas. // 5. Update the IdealState to the next assignment. If the IdealState changes before the update, go back to step 1. + // + // NOTE: Monitor the segments to be moved from both the previous round and this round to ensure the moved segments + // in the previous round are also converged. + Set<String> segmentsToMonitor = new HashSet<>(segmentsToMove); while (true) { // Wait for ExternalView to converge before updating the next IdealState - // NOTE: Monitor the segments to be moved from both the previous round and this round to ensure the moved segments - // in the previous round are also converged. - Set<String> segmentsToMonitor = new HashSet<>(segmentsToMove); - segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment); - segmentsToMonitor.addAll(segmentsToMove); IdealState idealState; try { - idealState = waitForExternalViewToConverge(tableNameWithType, bestEfforts, segmentsToMonitor, + idealState = waitForExternalViewToConverge(tableNameWithType, lowDiskMode, bestEfforts, segmentsToMonitor, externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs); } catch (Exception e) { String errorMsg = String.format( @@ -528,6 +527,10 @@ public class TableRebalancer { "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); } + + segmentsToMonitor = new HashSet<>(segmentsToMove); + segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment); + segmentsToMonitor.addAll(segmentsToMove); } } @@ -762,7 +765,7 @@ public class TableRebalancer { } } - private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts, + private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean lowDiskMode, boolean bestEfforts, Set<String> segmentsToMonitor, long externalViewCheckIntervalInMs, long externalViewStabilizationTimeoutInMs) throws InterruptedException, TimeoutException { long endTimeMs = System.currentTimeMillis() + externalViewStabilizationTimeoutInMs; @@ -788,7 +791,7 @@ public class TableRebalancer { _tableRebalanceObserver.getStopStatus())); } if (isExternalViewConverged(tableNameWithType, externalView.getRecord().getMapFields(), - idealState.getRecord().getMapFields(), bestEfforts, segmentsToMonitor)) { + idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, segmentsToMonitor)) { LOGGER.info("ExternalView converged for table: {}", tableNameWithType); return idealState; } @@ -808,15 +811,21 @@ public class TableRebalancer { } /** - * NOTE: Only check the segments and instances in the IdealState. It is okay to have extra segments or instances in - * ExternalView as long as the instance states for all the segments in IdealState are reached. For ERROR state in - * ExternalView, if using best-efforts, log a warning and treat it as good state; if not, throw an exception to abort - * the rebalance because we are not able to get out of the ERROR state. + * NOTE: + * Only check the segments in the IdealState and being monitored. Extra segments in ExternalView are ignored because + * they are not managed by the rebalancer. + * For each segment checked: + * - In regular mode, it is okay to have extra instances in ExternalView as long as the instance states in IdealState + * are reached. + * - In low disk mode, instance states in ExternalView must match IdealState to ensure the segments are deleted from + * server before moving to the next assignment. + * For ERROR state in ExternalView, if using best-efforts, log a warning and treat it as good state; if not, throw an + * exception to abort the rebalance because we are not able to get out of the ERROR state. */ @VisibleForTesting static boolean isExternalViewConverged(String tableNameWithType, Map<String, Map<String, String>> externalViewSegmentStates, - Map<String, Map<String, String>> idealStateSegmentStates, boolean bestEfforts, + Map<String, Map<String, String>> idealStateSegmentStates, boolean lowDiskMode, boolean bestEfforts, @Nullable Set<String> segmentsToMonitor) { for (Map.Entry<String, Map<String, String>> entry : idealStateSegmentStates.entrySet()) { String segmentName = entry.getKey(); @@ -843,15 +852,22 @@ public class TableRebalancer { String externalViewInstanceState = externalViewInstanceStateMap.get(instanceName); if (!idealStateInstanceState.equals(externalViewInstanceState)) { if (SegmentStateModel.ERROR.equals(externalViewInstanceState)) { - if (bestEfforts) { - LOGGER.warn( - "Found ERROR instance: {} for segment: {}, table: {}, counting it as good state (best-efforts)", - instanceName, segmentName, tableNameWithType); - } else { - LOGGER.warn("Found ERROR instance: {} for segment: {}, table: {}", instanceName, segmentName, - tableNameWithType); - throw new IllegalStateException("Found segments in ERROR state"); - } + handleErrorInstance(tableNameWithType, segmentName, instanceName, bestEfforts); + } else { + return false; + } + } + } + + // For low disk mode, check if there are extra instances in ExternalView that are not in IdealState + if (lowDiskMode && externalViewInstanceStateMap != null) { + for (Map.Entry<String, String> instanceStateEntry : externalViewInstanceStateMap.entrySet()) { + String instanceName = instanceStateEntry.getKey(); + if (idealStateInstanceStateMap.containsKey(instanceName)) { + continue; + } + if (SegmentStateModel.ERROR.equals(instanceStateEntry.getValue())) { + handleErrorInstance(tableNameWithType, segmentName, instanceName, bestEfforts); } else { return false; } @@ -861,6 +877,17 @@ public class TableRebalancer { return true; } + private static void handleErrorInstance(String tableNameWithType, String segmentName, String instanceName, + boolean bestEfforts) { + if (bestEfforts) { + LOGGER.warn("Found ERROR instance: {} for segment: {}, table: {}, counting it as good state (best-efforts)", + instanceName, segmentName, tableNameWithType); + } else { + LOGGER.warn("Found ERROR instance: {} for segment: {}, table: {}", instanceName, segmentName, tableNameWithType); + throw new IllegalStateException("Found segments in ERROR state"); + } + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java index ecf1e0feda..a9aeac5a76 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java @@ -1261,96 +1261,120 @@ public class TableRebalancerTest { String offlineTableName = "testTable_OFFLINE"; Map<String, Map<String, String>> externalViewSegmentStates = new TreeMap<>(); Map<String, Map<String, String>> idealStateSegmentStates = new TreeMap<>(); + boolean[] falseAndTrue = new boolean[]{false, true}; // Empty segment states should match - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, null)); - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, null)); + for (boolean lowDiskMode : falseAndTrue) { + for (boolean bestEfforts : falseAndTrue) { + assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, + idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + } + } // Do not check segment that does not exist in IdealState Map<String, String> instanceStateMap = new TreeMap<>(); instanceStateMap.put("instance1", ONLINE); externalViewSegmentStates.put("segment1", instanceStateMap); - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, null)); - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, null)); + for (boolean lowDiskMode : falseAndTrue) { + for (boolean bestEfforts : falseAndTrue) { + assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, + idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + } + } // Do not check segment that is OFFLINE in IdealState instanceStateMap = new TreeMap<>(); instanceStateMap.put("instance1", OFFLINE); idealStateSegmentStates.put("segment2", instanceStateMap); - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, null)); - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, null)); + for (boolean lowDiskMode : falseAndTrue) { + for (boolean bestEfforts : falseAndTrue) { + assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, + idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + } + } // Should fail when a segment has CONSUMING instance in IdealState but does not exist in ExternalView instanceStateMap.put("instance2", CONSUMING); - assertFalse( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, null)); - assertFalse( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, null)); + for (boolean lowDiskMode : falseAndTrue) { + for (boolean bestEfforts : falseAndTrue) { + assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, + idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + } + } // Should fail when instance state does not exist instanceStateMap = new TreeMap<>(); externalViewSegmentStates.put("segment2", instanceStateMap); - assertFalse( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, null)); - assertFalse( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, null)); + for (boolean lowDiskMode : falseAndTrue) { + for (boolean bestEfforts : falseAndTrue) { + assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, + idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + } + } // Should fail when instance state does not match instanceStateMap.put("instance2", OFFLINE); - assertFalse( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, null)); - assertFalse( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, null)); + for (boolean lowDiskMode : falseAndTrue) { + for (boolean bestEfforts : falseAndTrue) { + assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, + idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + } + } // Should pass when instance state matches instanceStateMap.put("instance2", CONSUMING); - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, null)); - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, null)); + for (boolean lowDiskMode : falseAndTrue) { + for (boolean bestEfforts : falseAndTrue) { + assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, + idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + } + } - // Should pass when there are extra instances in ExternalView + // When there are extra instances in ExternalView, should pass in regular mode but fail in low disk mode instanceStateMap.put("instance3", CONSUMING); - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, null)); - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, null)); + for (boolean bestEfforts : falseAndTrue) { + assertTrue( + TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, + false, bestEfforts, null)); + assertFalse( + TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, + true, bestEfforts, null)); + } - // Should throw exception when instance state is ERROR in ExternalView and best-efforts is disabled + // When instance state is ERROR in ExternalView, should fail in regular mode but pass in best-efforts mode instanceStateMap.put("instance2", ERROR); - try { - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, null); - fail(); - } catch (Exception e) { - // Expected + instanceStateMap.remove("instance3"); + for (boolean lowDiskMode : falseAndTrue) { + try { + TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, + lowDiskMode, false, null); + fail(); + } catch (Exception e) { + // Expected + } + assertTrue( + TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, + lowDiskMode, true, null)); } - // Should pass when instance state is ERROR in ExternalView and best-efforts is enabled - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, null)); + // When the extra instance is in ERROR state, should throw exception in low disk mode when best-efforts is disabled + instanceStateMap.put("instance2", CONSUMING); + instanceStateMap.put("instance3", ERROR); + for (boolean lowDiskMode : falseAndTrue) { + for (boolean bestEfforts : falseAndTrue) { + if (lowDiskMode && !bestEfforts) { + try { + TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, + idealStateSegmentStates, true, false, null); + fail(); + } catch (Exception e) { + // Expected + } + } else { + assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, + idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + } + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org