This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0c880dceb7 Minor cleanups in rebalance related code (#15806) 0c880dceb7 is described below commit 0c880dceb7fcf4cc5268b7ced19d54ef9f03c36f Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Thu May 15 16:44:10 2025 +0100 Minor cleanups in rebalance related code (#15806) --- .../core/rebalance/RebalanceJobConstants.java | 4 +- .../core/rebalance/TableRebalanceObserver.java | 4 +- .../helix/core/rebalance/TableRebalancer.java | 58 +++++------ .../rebalance/ZkBasedTableRebalanceObserver.java | 10 +- .../helix/core/relocation/SegmentRelocator.java | 13 ++- .../helix/core/rebalance/TableRebalancerTest.java | 114 ++++++++++++--------- .../TestZkBasedTableRebalanceObserver.java | 18 ++-- 7 files changed, 116 insertions(+), 105 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java index 68314bbba2..41e7ccecf4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java @@ -22,8 +22,8 @@ public class RebalanceJobConstants { private RebalanceJobConstants() { } - // Progress status of the rebalance operartion + // Progress status of the rebalance operation public static final String JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS = "REBALANCE_PROGRESS_STATS"; - // Configs to retry the rebalance operartion + // Configs to retry the rebalance operation public static final String JOB_METADATA_KEY_REBALANCE_CONTEXT = "REBALANCE_CONTEXT"; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java index 68abcbad72..0579b2022b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java @@ -25,7 +25,7 @@ import javax.annotation.Nullable; /** * The <code>TableRebalanceObserver</code> interface provides callbacks to take actions - * during critical triggers. The 3 main triggers during a rebalance operation are show below. + * during critical triggers. The 4 main triggers during a rebalance operation are shown below. * For example, we can track stats + status of rebalance during these triggers. */ public interface TableRebalanceObserver { @@ -37,7 +37,7 @@ public interface TableRebalanceObserver { // Ideal state changes due to external events and new target for rebalance is computed IDEAL_STATE_CHANGE_TRIGGER, // Next assignment calculation change trigger which calculates next assignment to act on - NEXT_ASSINGMENT_CALCULATION_TRIGGER, + NEXT_ASSIGNMENT_CALCULATION_TRIGGER, } void onTrigger(Trigger trigger, Map<String, Map<String, String>> currentState, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 1ca1bc266c..eb3f12d13d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -606,7 +606,7 @@ public class TableRebalancer { SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, nextAssignment)); // Record change of current ideal state and the next assignment - _tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, + _tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, currentAssignment, nextAssignment, rebalanceContext); if (_tableRebalanceObserver.isStopped()) { return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(), @@ -906,7 +906,7 @@ public class TableRebalancer { Map<String, Integer> consumingSegmentsOffsetsToCatchUp = getConsumingSegmentsOffsetsToCatchUp(tableConfig, consumingSegmentZKmetadata, tableRebalanceLogger); Map<String, Integer> consumingSegmentsAge = - getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata, tableRebalanceLogger); + getConsumingSegmentsAge(consumingSegmentZKmetadata, tableRebalanceLogger); Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN; Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer> @@ -958,8 +958,8 @@ public class TableRebalancer { * segment name to the age of that consuming segment. Return null if failed to obtain info for any consuming segment. */ @Nullable - private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType, - Map<String, SegmentZKMetadata> consumingSegmentZKMetadata, Logger tableRebalanceLogger) { + private Map<String, Integer> getConsumingSegmentsAge(Map<String, SegmentZKMetadata> consumingSegmentZKMetadata, + Logger tableRebalanceLogger) { Map<String, Integer> consumingSegmentsAge = new HashMap<>(); long now = System.currentTimeMillis(); try { @@ -1344,18 +1344,17 @@ public class TableRebalancer { String.format("Rebalance has already stopped with status: %s", _tableRebalanceObserver.getStopStatus())); } - if (isExternalViewConverged(tableNameWithType, externalView.getRecord().getMapFields(), - idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, segmentsToMonitor, - tableRebalanceLogger)) { + if (isExternalViewConverged(externalView.getRecord().getMapFields(), idealState.getRecord().getMapFields(), + lowDiskMode, bestEfforts, segmentsToMonitor, tableRebalanceLogger)) { tableRebalanceLogger.info("ExternalView converged in {}ms, with {} extensions", System.currentTimeMillis() - startTimeMs, extensionCount); return idealState; } if (previousRemainingSegments < 0) { // initialize previousRemainingSegments - previousRemainingSegments = getNumRemainingSegmentReplicasToProcess(tableNameWithType, - externalView.getRecord().getMapFields(), idealState.getRecord().getMapFields(), lowDiskMode, - bestEfforts, segmentsToMonitor, tableRebalanceLogger, false); + previousRemainingSegments = getNumRemainingSegmentReplicasToProcess(externalView.getRecord().getMapFields(), + idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, segmentsToMonitor, + tableRebalanceLogger, false); tableRebalanceLogger.info("Remaining {} segment replicas to be processed.", previousRemainingSegments); } } @@ -1371,9 +1370,9 @@ public class TableRebalancer { externalViewStabilizationTimeoutInMs)); } - int currentRemainingSegments = getNumRemainingSegmentReplicasToProcess(tableNameWithType, - externalView.getRecord().getMapFields(), idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, - segmentsToMonitor, tableRebalanceLogger, false); + int currentRemainingSegments = getNumRemainingSegmentReplicasToProcess(externalView.getRecord().getMapFields(), + idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, segmentsToMonitor, tableRebalanceLogger, + false); // It is possible that remainingSegments increases so that currentRemainingSegments > previousRemainingSegments, // likely due to CONSUMING segments committing, where the state of the segment change to ONLINE. Therefore, if @@ -1403,41 +1402,37 @@ public class TableRebalancer { } @VisibleForTesting - static boolean isExternalViewConverged(String tableNameWithType, - Map<String, Map<String, String>> externalViewSegmentStates, + static boolean isExternalViewConverged(Map<String, Map<String, String>> externalViewSegmentStates, Map<String, Map<String, String>> idealStateSegmentStates, boolean lowDiskMode, boolean bestEfforts, @Nullable Set<String> segmentsToMonitor) { - return - getNumRemainingSegmentReplicasToProcess(tableNameWithType, externalViewSegmentStates, idealStateSegmentStates, - lowDiskMode, bestEfforts, segmentsToMonitor, LOGGER, true) == 0; + return getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, segmentsToMonitor, LOGGER, true) == 0; } /** * Check if the external view has converged to the ideal state. See `getNumRemainingSegmentReplicasToProcess` for * details on how the convergence is determined. */ - private static boolean isExternalViewConverged(String tableNameWithType, - Map<String, Map<String, String>> externalViewSegmentStates, + private static boolean isExternalViewConverged(Map<String, Map<String, String>> externalViewSegmentStates, Map<String, Map<String, String>> idealStateSegmentStates, boolean lowDiskMode, boolean bestEfforts, @Nullable Set<String> segmentsToMonitor, Logger tableRebalanceLogger) { - return - getNumRemainingSegmentReplicasToProcess(tableNameWithType, externalViewSegmentStates, idealStateSegmentStates, - lowDiskMode, bestEfforts, segmentsToMonitor, tableRebalanceLogger, true) == 0; + return getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, segmentsToMonitor, tableRebalanceLogger, true) == 0; } @VisibleForTesting - static int getNumRemainingSegmentReplicasToProcess(String tableNameWithType, - Map<String, Map<String, String>> externalViewSegmentStates, + static int getNumRemainingSegmentReplicasToProcess(Map<String, Map<String, String>> externalViewSegmentStates, Map<String, Map<String, String>> idealStateSegmentStates, boolean lowDiskMode, boolean bestEfforts, @Nullable Set<String> segmentsToMonitor) { - return getNumRemainingSegmentReplicasToProcess(tableNameWithType, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, segmentsToMonitor, LOGGER, false); + return getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, segmentsToMonitor, LOGGER, false); } /** * If `earlyReturn=false`, it returns the number of segment replicas that are not in the expected state. * If `earlyReturn=true` it returns 1 if the number of said segment replicas are more than 0, returns 0 otherwise, * which is used to check whether the ExternalView has converged to the IdealState. + * <p> * The method checks the following: * Only the segments in the IdealState and being monitored. Extra segments in ExternalView are ignored * because they are not managed by the rebalancer. @@ -1449,15 +1444,16 @@ public class TableRebalancer { * <li> The instance appears in IS instance map is not in the EV instance map, unless the IS instance state is OFFLINE * <li> The instance has different states between IS and EV instance map, unless the IS instance state is OFFLINE * </ul> + * <p> * If `lowDiskMode=true`, go through the instance map from ExternalView and compare it with the one in IdealState, - * and also increment the number of remaining segment replicas to process if: - * - The instance appears in EV instance map does not appear in the IS instance map + * and also increment the number of remaining segment replicas to process if the instance appears in EV instance map + * does not appear in the IS instance map. + * <p> * Once there's an ERROR state for any instance in ExternalView, throw an exception to abort the rebalance because * we are not able to get out of the ERROR state, unless `bestEfforts=true`, in which case, log a warning and keep * going as if that instance has converged. */ - private static int getNumRemainingSegmentReplicasToProcess(String tableNameWithType, - Map<String, Map<String, String>> externalViewSegmentStates, + private static int getNumRemainingSegmentReplicasToProcess(Map<String, Map<String, String>> externalViewSegmentStates, Map<String, Map<String, String>> idealStateSegmentStates, boolean lowDiskMode, boolean bestEfforts, @Nullable Set<String> segmentsToMonitor, Logger tableRebalanceLogger, boolean earlyReturn) { int remainingSegmentReplicasToProcess = 0; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index 37da8bd9ec..042be1a93d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -46,7 +46,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { private final PinotHelixResourceManager _pinotHelixResourceManager; private final TableRebalanceProgressStats _tableRebalanceProgressStats; private final TableRebalanceContext _tableRebalanceContext; - // These previous stats are used for rollback scenarios where the IdealState update fails dure to a version + // These previous stats are used for rollback scenarios where the IdealState update fails due to a version // change and the rebalance loop is retried. private TableRebalanceProgressStats.RebalanceProgressStats _previousStepStats; private TableRebalanceProgressStats.RebalanceProgressStats _previousOverallStats; @@ -136,13 +136,13 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { updatedStatsInZk = true; } break; - case NEXT_ASSINGMENT_CALCULATION_TRIGGER: + case NEXT_ASSIGNMENT_CALCULATION_TRIGGER: // Update the previous stats with the current values in case a rollback is needed due to IdealState version // change _previousStepStats = new TableRebalanceProgressStats.RebalanceProgressStats( _tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep()); latestProgress = calculateUpdatedProgressStats(targetState, currentState, rebalanceContext, - Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, _tableRebalanceProgressStats); + Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, _tableRebalanceProgressStats); if (!_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep().equals(latestProgress)) { _tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep(latestProgress); trackStatsInZk(); @@ -524,7 +524,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { new TableRebalanceProgressStats.RebalanceProgressStats(); switch (trigger) { case START_TRIGGER: - case NEXT_ASSINGMENT_CALCULATION_TRIGGER: + case NEXT_ASSIGNMENT_CALCULATION_TRIGGER: // These are initialization steps for global / step progress stats progressStats._totalSegmentsToBeAdded = totalSegmentsToBeAdded; progressStats._totalSegmentsToBeDeleted = totalSegmentsToBeDeleted; @@ -542,7 +542,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { progressStats._totalEstimatedDataToBeMovedInBytes = TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes(0, rebalanceContext.getEstimatedAverageSegmentSizeInBytes(), totalSegmentsToBeAdded); - progressStats._startTimeMs = trigger == Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER + progressStats._startTimeMs = trigger == Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER ? System.currentTimeMillis() : rebalanceProgressStats.getStartTimeMs(); break; case IDEAL_STATE_CHANGE_TRIGGER: diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java index 1981ecaca9..17dd44378b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java @@ -55,9 +55,11 @@ import org.slf4j.LoggerFactory; /** - * Periodic task to run rebalancer in background to - * 1. relocate COMPLETED segments to tag overrides - * 2. relocate ONLINE segments to tiers if tier configs are set + * Periodic task to run rebalancer in background to: + * <ol> + * <li> Relocate COMPLETED segments to tag overrides + * <li> Relocate ONLINE segments to tiers if tier configs are set + * </ol> * Allow at most one replica unavailable during rebalance. Not applicable for HLC tables. */ public class SegmentRelocator extends ControllerPeriodicTask<Void> { @@ -82,7 +84,8 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { private final Set<String> _waitingTables; private final BlockingQueue<String> _waitingQueue; - @Nullable private final Set<String> _tablesUndergoingRebalance; + @Nullable + private final Set<String> _tablesUndergoingRebalance; public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics, @@ -294,7 +297,7 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { } } } - if (serverToSegmentsToMigrate.size() > 0) { + if (!serverToSegmentsToMigrate.isEmpty()) { LOGGER.info("Notify servers: {} to move segments to new tiers locally", serverToSegmentsToMigrate.keySet()); reloadSegmentsForLocalTierMigration(tableNameWithType, serverToSegmentsToMigrate, messagingService); } else { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java index 1424c774b7..2af9d10fb9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java @@ -1292,8 +1292,9 @@ public class TableRebalancerTest { // Empty segment states should match for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertTrue( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); } } @@ -1303,8 +1304,9 @@ public class TableRebalancerTest { externalViewSegmentStates.put("segment1", instanceStateMap); for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertTrue( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); } } @@ -1314,8 +1316,9 @@ public class TableRebalancerTest { idealStateSegmentStates.put("segment2", instanceStateMap); for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertTrue( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); } } @@ -1323,22 +1326,24 @@ public class TableRebalancerTest { instanceStateMap.put("instance2", CONSUMING); for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertFalse( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); assertEquals( - TableRebalancer.getNumRemainingSegmentReplicasToProcess(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null), 1); + TableRebalancer.getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, + lowDiskMode, bestEfforts, null), 1); } } instanceStateMap.put("instance3", CONSUMING); for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertFalse( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); assertEquals( - TableRebalancer.getNumRemainingSegmentReplicasToProcess(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null), 2); + TableRebalancer.getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, + lowDiskMode, bestEfforts, null), 2); } } @@ -1347,11 +1352,12 @@ public class TableRebalancerTest { externalViewSegmentStates.put("segment2", instanceStateMap); for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertFalse( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); assertEquals( - TableRebalancer.getNumRemainingSegmentReplicasToProcess(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null), 2); + TableRebalancer.getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, + lowDiskMode, bestEfforts, null), 2); } } @@ -1359,11 +1365,12 @@ public class TableRebalancerTest { instanceStateMap.put("instance2", OFFLINE); for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertFalse( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); assertEquals( - TableRebalancer.getNumRemainingSegmentReplicasToProcess(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null), 2); + TableRebalancer.getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, + lowDiskMode, bestEfforts, null), 2); } } @@ -1372,11 +1379,12 @@ public class TableRebalancerTest { instanceStateMap.put("instance3", OFFLINE); for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertFalse( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); assertEquals( - TableRebalancer.getNumRemainingSegmentReplicasToProcess(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null), 2); + TableRebalancer.getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, + lowDiskMode, bestEfforts, null), 2); } } @@ -1385,11 +1393,12 @@ public class TableRebalancerTest { instanceStateMap.put("instance3", OFFLINE); for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertFalse( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); assertEquals( - TableRebalancer.getNumRemainingSegmentReplicasToProcess(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null), 1); + TableRebalancer.getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, + lowDiskMode, bestEfforts, null), 1); } } @@ -1398,11 +1407,12 @@ public class TableRebalancerTest { instanceStateMap.put("instance3", CONSUMING); for (boolean lowDiskMode : falseAndTrue) { for (boolean bestEfforts : falseAndTrue) { - assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertTrue( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); assertEquals( - TableRebalancer.getNumRemainingSegmentReplicasToProcess(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null), 0); + TableRebalancer.getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, + lowDiskMode, bestEfforts, null), 0); } } @@ -1411,16 +1421,17 @@ public class TableRebalancerTest { instanceStateMap.put("instance5", CONSUMING); instanceStateMap.put("instance6", CONSUMING); for (boolean bestEfforts : falseAndTrue) { - assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - false, bestEfforts, null)); - assertEquals(TableRebalancer.getNumRemainingSegmentReplicasToProcess(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, false, bestEfforts, null), 0); + assertTrue(TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, false, + bestEfforts, null)); + assertEquals( + TableRebalancer.getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, + false, bestEfforts, null), 0); assertFalse( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - true, bestEfforts, null)); - assertEquals(TableRebalancer.getNumRemainingSegmentReplicasToProcess(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, true, bestEfforts, null), 3); + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, true, bestEfforts, + null)); + assertEquals( + TableRebalancer.getNumRemainingSegmentReplicasToProcess(externalViewSegmentStates, idealStateSegmentStates, + true, bestEfforts, null), 3); } // When instance state is ERROR in ExternalView, should fail in regular mode but pass in best-efforts mode @@ -1430,15 +1441,15 @@ public class TableRebalancerTest { instanceStateMap.remove("instance6"); for (boolean lowDiskMode : falseAndTrue) { try { - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - lowDiskMode, false, null); + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, false, + null); fail(); } catch (Exception e) { // Expected } assertTrue( - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, - lowDiskMode, true, null)); + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, true, + null)); } // When the extra instance is in ERROR state, should throw exception in low disk mode when best-efforts is disabled @@ -1451,15 +1462,16 @@ public class TableRebalancerTest { for (boolean bestEfforts : falseAndTrue) { if (lowDiskMode && !bestEfforts) { try { - TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, true, false, null); + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, true, false, + null); fail(); } catch (Exception e) { // Expected } } else { - assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, - idealStateSegmentStates, lowDiskMode, bestEfforts, null)); + assertTrue( + TableRebalancer.isExternalViewConverged(externalViewSegmentStates, idealStateSegmentStates, lowDiskMode, + bestEfforts, null)); } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java index 043269f037..b0d86702e9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java @@ -113,7 +113,7 @@ public class TestZkBasedTableRebalanceObserver { assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0); checkProgressPercentMetrics(controllerMetrics, observer); // This simulates the first step of rebalance, where the IS is set to the intermediate assignment - observer.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, source, targetIntermediate, + observer.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, source, targetIntermediate, rebalanceContext); overallStats = observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall(); currentStepStats = observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep(); @@ -167,7 +167,7 @@ public class TestZkBasedTableRebalanceObserver { assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0); checkProgressPercentMetrics(controllerMetrics, observer); // Next assignment calculated based on the IS, IS should be same as the previous targetAssignment - observer.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, targetIntermediate, target, + observer.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, targetIntermediate, target, rebalanceContext); overallStats = observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall(); currentStepStats = observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep(); @@ -314,7 +314,7 @@ public class TestZkBasedTableRebalanceObserver { // Triggers to initialize the overall or step level progress stats - they should provide similar results List<TableRebalanceObserver.Trigger> triggers = Arrays.asList(TableRebalanceObserver.Trigger.START_TRIGGER, - TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER); + TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER); for (TableRebalanceObserver.Trigger trigger : triggers) { Map<String, Map<String, String>> current = new TreeMap<>(); current.put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1"), ONLINE)); @@ -586,7 +586,7 @@ public class TestZkBasedTableRebalanceObserver { nextAssignment.put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3"), ONLINE)); stats = ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(nextAssignment, current, rebalanceContextIS, - TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); + TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); assertEquals(stats._totalSegmentsToBeAdded, 2); assertEquals(stats._totalSegmentsToBeDeleted, 0); assertEquals(stats._totalRemainingSegmentsToBeAdded, 2); @@ -753,7 +753,7 @@ public class TestZkBasedTableRebalanceObserver { Arrays.asList("host2", "host3", "host4", "host5"), ONLINE)); stats = ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(nextAssignment, current, rebalanceContextIS, - TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); + TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); assertEquals(stats._totalSegmentsToBeAdded, 4); assertEquals(stats._totalSegmentsToBeDeleted, 0); assertEquals(stats._totalRemainingSegmentsToBeAdded, 4); @@ -885,7 +885,7 @@ public class TestZkBasedTableRebalanceObserver { Arrays.asList("host2", "host3", "host4", "host5"), ONLINE)); stats = ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(nextAssignment, current, rebalanceContextIS, - TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); + TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); assertEquals(stats._totalSegmentsToBeAdded, 2); assertEquals(stats._totalSegmentsToBeDeleted, 2); assertEquals(stats._totalRemainingSegmentsToBeAdded, 2); @@ -1098,7 +1098,7 @@ public class TestZkBasedTableRebalanceObserver { nextAssignment.put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3"), ONLINE)); stats = ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(nextAssignment, current, rebalanceContextIS, - TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); + TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); assertEquals(stats._totalSegmentsToBeAdded, 2); assertEquals(stats._totalSegmentsToBeDeleted, 0); assertEquals(stats._totalRemainingSegmentsToBeAdded, 2); @@ -1192,7 +1192,7 @@ public class TestZkBasedTableRebalanceObserver { Arrays.asList("host2", "host3", "host4", "host5"), ONLINE)); stats = ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(nextAssignment, oldNextAssignment, - rebalanceContextIS, TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, + rebalanceContextIS, TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); assertEquals(stats._totalSegmentsToBeAdded, 4); assertEquals(stats._totalSegmentsToBeDeleted, 0); @@ -1366,7 +1366,7 @@ public class TestZkBasedTableRebalanceObserver { Arrays.asList("host2", "host3", "host4", "host5"), ONLINE)); stats = ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(nextAssignment, current, rebalanceContextIS, - TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); + TableRebalanceObserver.Trigger.NEXT_ASSIGNMENT_CALCULATION_TRIGGER, tableRebalanceProgressStats); assertEquals(stats._totalSegmentsToBeAdded, 2); assertEquals(stats._totalSegmentsToBeDeleted, 2); assertEquals(stats._totalRemainingSegmentsToBeAdded, 2); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org