This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 604cd16558 Add minimizeDataMovement to Rebalacne API (#15110) 604cd16558 is described below commit 604cd165586716ae04db22856371e50147e30459 Author: Jhow <44998515+j-howhu...@users.noreply.github.com> AuthorDate: Fri Mar 14 10:12:33 2025 -0700 Add minimizeDataMovement to Rebalacne API (#15110) * Add minimizeDataMovement param to RebalanceConfig * add minimizeDataMovement in RebalanceTableCommand --- .../api/resources/PinotTableRestletResource.java | 4 + .../instance/InstanceAssignmentDriver.java | 38 ++- .../helix/core/rebalance/RebalanceConfig.java | 36 ++- .../helix/core/rebalance/TableRebalancer.java | 69 ++++-- .../instance/InstanceAssignmentTest.java | 244 ++++++++++++++++-- .../TableRebalancerClusterStatelessTest.java | 275 ++++++++++++++++++++- .../apache/pinot/tools/PinotTableRebalancer.java | 4 +- .../tools/admin/command/RebalanceTableCommand.java | 9 +- 8 files changed, 620 insertions(+), 59 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 5de903eb9d..fed660e988 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -612,6 +612,9 @@ public class PinotTableRestletResource { @QueryParam("reassignInstances") boolean reassignInstances, @ApiParam(value = "Whether to reassign CONSUMING segments for real-time table") @DefaultValue("true") @QueryParam("includeConsuming") boolean includeConsuming, + @ApiParam(value = "Whether to enable minimize data movement on rebalance, DEFAULT will use " + + "the minimizeDataMovement in table config") @DefaultValue("ENABLE") + @QueryParam("minimizeDataMovement") RebalanceConfig.MinimizeDataMovementOptions minimizeDataMovement, @ApiParam(value = "Whether to rebalance table in bootstrap mode (regardless of minimum segment movement, " + "reassign all segments in a round-robin fashion as if adding new segments to an empty table)") @DefaultValue("false") @QueryParam("bootstrap") boolean bootstrap, @@ -651,6 +654,7 @@ public class PinotTableRestletResource { rebalanceConfig.setPreChecks(preChecks); rebalanceConfig.setReassignInstances(reassignInstances); rebalanceConfig.setIncludeConsuming(includeConsuming); + rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement); rebalanceConfig.setBootstrap(bootstrap); rebalanceConfig.setDowntime(downtime); rebalanceConfig.setMinAvailableReplicas(minAvailableReplicas); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index 09866c1ed7..5ee9258014 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -55,40 +55,66 @@ public class InstanceAssignmentDriver { public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) { + return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, (Boolean) null); + } + + public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, + List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable Boolean minimizeDataMovement) { String tableNameWithType = _tableConfig.getTableName(); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); return getInstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), - assignmentConfig, instanceConfigs, existingInstancePartitions, null); + assignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement); } public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions) { + return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, + preConfiguredInstancePartitions, null); + } + + public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, + List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovement) { String tableNameWithType = _tableConfig.getTableName(); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); return getInstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), - assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions); + assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions, + minimizeDataMovement); } public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) { + return assignInstances(tierName, instanceConfigs, existingInstancePartitions, instanceAssignmentConfig, null); + } + + public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs, + @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig, + @Nullable Boolean minimizeDataMovement) { return getInstancePartitions( InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName), - instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null); + instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement); } private InstancePartitions getInstancePartitions(String instancePartitionsName, InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, - @Nullable InstancePartitions preConfiguredInstancePartitions) { + @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovementOverride) { String tableNameWithType = _tableConfig.getTableName(); - LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType); - boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement(); + // minimizeDataMovement might be set back to false within InstanceTagPoolSelector and InstancePartitionSelector + // if existingInstancePartitions is null. + boolean minimizeDataMovement = + minimizeDataMovementOverride == null ? instanceAssignmentConfig.isMinimizeDataMovement() + : minimizeDataMovementOverride; + LOGGER.info("Starting {} instance assignment for table {}, original minimizeDataMovement: {}, " + + "overriding with minimizeDataMovementOverride: {}", instancePartitionsName, tableNameWithType, + instanceAssignmentConfig.isMinimizeDataMovement(), minimizeDataMovementOverride); InstanceTagPoolSelector tagPoolSelector = new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType, minimizeDataMovement, existingInstancePartitions); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java index f69f6be3d9..381a22fa3c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java @@ -83,6 +83,18 @@ public class RebalanceConfig { @ApiModelProperty(example = "false") private boolean _bestEfforts = false; + // Whether to run Minimal Data Movement Algorithm, overriding the minimizeDataMovement flag in table config. If set + // to default, the minimizeDataMovement flag in table config will be used to determine whether to run the Minimal + // Data Movement Algorithm. + @ApiModel + public enum MinimizeDataMovementOptions { + ENABLE, DISABLE, DEFAULT + } + + @JsonProperty("minimizeDataMovement") + @ApiModelProperty(dataType = "string", allowableValues = "ENABLE, DISABLE, DEFAULT", example = "ENABLE") + private MinimizeDataMovementOptions _minimizeDataMovement = MinimizeDataMovementOptions.ENABLE; + // The check on external view can be very costly when the table has very large ideal and external states, i.e. when // having a huge number of segments. These two configs help reduce the cpu load on controllers, e.g. by doing the // check less frequently and bail out sooner to rebalance at best effort if configured so. @@ -244,16 +256,24 @@ public class RebalanceConfig { _retryInitialDelayInMs = retryInitialDelayInMs; } + public MinimizeDataMovementOptions getMinimizeDataMovement() { + return _minimizeDataMovement; + } + + public void setMinimizeDataMovement(MinimizeDataMovementOptions minimizeDataMovement) { + _minimizeDataMovement = minimizeDataMovement; + } + @Override public String toString() { - return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" + _preChecks - + ", _reassignInstances=" + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" - + _bootstrap + ", _downtime=" + _downtime + ", _minAvailableReplicas=" + _minAvailableReplicas - + ", _bestEfforts=" + _bestEfforts + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs - + ", _externalViewStabilizationTimeoutInMs=" + _externalViewStabilizationTimeoutInMs + ", _updateTargetTier=" - + _updateTargetTier + ", _heartbeatIntervalInMs=" + _heartbeatIntervalInMs + ", _heartbeatTimeoutInMs=" - + _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ", _retryInitialDelayInMs=" - + _retryInitialDelayInMs + '}'; + return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" + _preChecks + ", _reassignInstances=" + + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", _minimizeDataMovement=" + + _minimizeDataMovement + ", _bootstrap=" + _bootstrap + ", _downtime=" + _downtime + ", _minAvailableReplicas=" + + _minAvailableReplicas + ", _bestEfforts=" + _bestEfforts + ", _externalViewCheckIntervalInMs=" + + _externalViewCheckIntervalInMs + ", _externalViewStabilizationTimeoutInMs=" + + _externalViewStabilizationTimeoutInMs + ", _updateTargetTier=" + _updateTargetTier + + ", _heartbeatIntervalInMs=" + _heartbeatIntervalInMs + ", _heartbeatTimeoutInMs=" + _heartbeatTimeoutInMs + + ", _maxAttempts=" + _maxAttempts + ", _retryInitialDelayInMs=" + _retryInitialDelayInMs + '}'; } public static RebalanceConfig copy(RebalanceConfig cfg) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 34cab7c29d..fd79e25b0a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -191,17 +191,32 @@ public class TableRebalancer { boolean bestEfforts = rebalanceConfig.isBestEfforts(); long externalViewCheckIntervalInMs = rebalanceConfig.getExternalViewCheckIntervalInMs(); long externalViewStabilizationTimeoutInMs = rebalanceConfig.getExternalViewStabilizationTimeoutInMs(); + Boolean minimizeDataMovement; + switch (rebalanceConfig.getMinimizeDataMovement()) { + case DEFAULT: + minimizeDataMovement = null; + break; + case ENABLE: + minimizeDataMovement = true; + break; + case DISABLE: + minimizeDataMovement = false; + break; + default: + throw new IllegalStateException( + "Invalid minimizeDataMovement option: " + rebalanceConfig.getMinimizeDataMovement()); + } boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase( tableConfig.getRoutingConfig().getInstanceSelectorType()); LOGGER.info( - "Start rebalancing table: {} with dryRun: {}, preChecks: {}, reassignInstances: {}, includeConsuming: {}," - + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}," - + "lowDiskMode: {}, bestEfforts: {}, externalViewCheckIntervalInMs: {}, " - + "externalViewStabilizationTimeoutInMs: {}", + "Start rebalancing table: {} with dryRun: {}, preChecks: {}, reassignInstances: {}, " + + "includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, " + + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, externalViewCheckIntervalInMs: {}, " + + "externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}", tableNameWithType, dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime, minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts, - externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs); + externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement); // Perform pre-checks if enabled Map<String, String> preChecksResult = null; @@ -253,7 +268,7 @@ public class TableRebalancer { boolean instancePartitionsUnchanged; try { Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> instancePartitionsMapAndUnchanged = - getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun); + getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun, minimizeDataMovement); instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft(); instancePartitionsUnchanged = instancePartitionsMapAndUnchanged.getRight(); } catch (Exception e) { @@ -272,7 +287,8 @@ public class TableRebalancer { try { sortedTiers = getSortedTiers(tableConfig, providedTierToSegmentsMap); Pair<Map<String, InstancePartitions>, Boolean> tierToInstancePartitionsMapAndUnchanged = - getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun); + getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun, + minimizeDataMovement); tierToInstancePartitionsMap = tierToInstancePartitionsMapAndUnchanged.getLeft(); tierInstancePartitionsUnchanged = tierToInstancePartitionsMapAndUnchanged.getRight(); } catch (Exception e) { @@ -488,9 +504,11 @@ public class TableRebalancer { try { // Re-calculate the instance partitions in case the instance configs changed during the rebalance instancePartitionsMap = - getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false).getLeft(); + getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false, + minimizeDataMovement).getLeft(); tierToInstancePartitionsMap = - getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false).getLeft(); + getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false, + minimizeDataMovement).getLeft(); targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap, rebalanceConfig); } catch (Exception e) { @@ -727,22 +745,31 @@ public class TableRebalancer { */ public Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> getInstancePartitionsMap( TableConfig tableConfig, boolean reassignInstances, boolean bootstrap, boolean dryRun) { + return getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun, false); + } + + public Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> getInstancePartitionsMap( + TableConfig tableConfig, boolean reassignInstances, boolean bootstrap, boolean dryRun, + @Nullable Boolean minimizeDataMovement) { boolean instancePartitionsUnchanged; Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>(); if (tableConfig.getTableType() == TableType.OFFLINE) { Pair<InstancePartitions, Boolean> partitionAndUnchangedForOffline = - getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, bootstrap, dryRun); + getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, bootstrap, dryRun, + minimizeDataMovement); instancePartitionsMap.put(InstancePartitionsType.OFFLINE, partitionAndUnchangedForOffline.getLeft()); instancePartitionsUnchanged = partitionAndUnchangedForOffline.getRight(); } else { Pair<InstancePartitions, Boolean> partitionAndUnchangedForConsuming = - getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, bootstrap, dryRun); + getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, bootstrap, dryRun, + minimizeDataMovement); instancePartitionsMap.put(InstancePartitionsType.CONSUMING, partitionAndUnchangedForConsuming.getLeft()); instancePartitionsUnchanged = partitionAndUnchangedForConsuming.getRight(); String tableNameWithType = tableConfig.getTableName(); if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) { Pair<InstancePartitions, Boolean> partitionAndUnchangedForCompleted = - getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun); + getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun, + minimizeDataMovement); LOGGER.info( "COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}", tableNameWithType); @@ -768,7 +795,8 @@ public class TableRebalancer { * Fetches/computes the instance partitions and also returns a boolean for whether they are unchanged */ private Pair<InstancePartitions, Boolean> getInstancePartitions(TableConfig tableConfig, - InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean bootstrap, boolean dryRun) { + InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean bootstrap, boolean dryRun, + @Nullable Boolean minimizeDataMovement) { String tableNameWithType = tableConfig.getTableName(); String instancePartitionsName = InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()); @@ -790,7 +818,7 @@ public class TableRebalancer { // instance partition map can be fully recalculated. instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), - bootstrap ? null : existingInstancePartitions); + bootstrap ? null : existingInstancePartitions, minimizeDataMovement); instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); @@ -805,7 +833,8 @@ public class TableRebalancer { referenceInstancePartitionsName, instancePartitionsName); instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), - bootstrap ? null : existingInstancePartitions, preConfiguredInstancePartitions); + bootstrap ? null : existingInstancePartitions, preConfiguredInstancePartitions, + minimizeDataMovement); instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions, @@ -868,7 +897,8 @@ public class TableRebalancer { * instance partitions are unchanged. */ private Pair<Map<String, InstancePartitions>, Boolean> getTierToInstancePartitionsMap(TableConfig tableConfig, - @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean bootstrap, boolean dryRun) { + @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean bootstrap, boolean dryRun, + @Nullable Boolean minimizeDataMovement) { if (sortedTiers == null) { return Pair.of(null, true); } @@ -878,7 +908,8 @@ public class TableRebalancer { LOGGER.info("Fetching/computing instance partitions for tier: {} of table: {}", tier.getName(), tableConfig.getTableName()); Pair<InstancePartitions, Boolean> partitionsAndUnchanged = - getInstancePartitionsForTier(tableConfig, tier, reassignInstances, bootstrap, dryRun); + getInstancePartitionsForTier(tableConfig, tier, reassignInstances, bootstrap, dryRun, + minimizeDataMovement); tierToInstancePartitionsMap.put(tier.getName(), partitionsAndUnchanged.getLeft()); instancePartitionsUnchanged = instancePartitionsUnchanged && partitionsAndUnchanged.getRight(); } @@ -891,7 +922,7 @@ public class TableRebalancer { * a boolean for whether the instance partition is unchanged. */ private Pair<InstancePartitions, Boolean> getInstancePartitionsForTier(TableConfig tableConfig, Tier tier, - boolean reassignInstances, boolean bootstrap, boolean dryRun) { + boolean reassignInstances, boolean bootstrap, boolean dryRun, @Nullable Boolean minimizeDataMovement) { String tableNameWithType = tableConfig.getTableName(); String tierName = tier.getName(); String instancePartitionsName = @@ -924,7 +955,7 @@ public class TableRebalancer { // partition map can be fully recalculated. InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(tierName, _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), - bootstrap ? null : existingInstancePartitions, instanceAssignmentConfig); + bootstrap ? null : existingInstancePartitions, instanceAssignmentConfig, minimizeDataMovement); boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index 39aef7f35a..055ce297a1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -53,6 +53,8 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -500,11 +502,224 @@ public class InstanceAssignmentTest { assertEquals(instancePartitions.getInstances(9, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 7)); } - public void testMirrorServerSetBasedRandom() throws FileNotFoundException { + public void testMirrorServerSetBasedRandom() + throws FileNotFoundException { testMirrorServerSetBasedRandomInner(10000000); } - public void testMirrorServerSetBasedRandomInner(int loopCount) throws FileNotFoundException { + @Test + public void testForceMinimizeDataMovement() { + // This test case is using the same instance rebalance plot as testMinimizeDataMovement, and test whether + // forceMinimizeDataMovement flag in InstanceAssignmentDriver works as the minimizeDataMovement flag in + // TableConfig does. + int numReplicas = 3; + int numPartitions = 2; + int numInstancesPerPartition = 2; + String partitionColumn = "partition"; + + // Configs and driver that minimize data movement + InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, true); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setNumReplicas(numReplicas) + .setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig)) + .build(); + assertTrue(InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.OFFLINE) + .isMinimizeDataMovement()); + + InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); + // Configs and driver that DO NOT minimize data movement + InstanceAssignmentConfig instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, false); + + TableConfig tableConfigNotMinimized = new TableConfig(tableConfig); + tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfigNotMinimized)); + assertFalse(InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfigNotMinimized, + InstancePartitionsType.OFFLINE).isMinimizeDataMovement()); + InstanceAssignmentDriver driverNotMinimized = new InstanceAssignmentDriver(tableConfigNotMinimized); + + int numInstances = 10; + List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances); + for (int i = 0; i < numInstances; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + instanceConfigs.add(instanceConfig); + } + + // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 2 instances + InstancePartitions instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null, (Boolean) null); + + InstancePartitions instancePartitionsForcedMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null, true); + + InstancePartitions instancePartitionsNotMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null, false); + + // Initial assignment should be the same for all scenarios + assertEquals(instancePartitionsForcedMinimize, instancePartitions); + assertEquals(instancePartitionsForcedMinimize, instancePartitionsNotMinimize); + + // Remove two instances (i2, i6) and add two new instances (i10, i11). + instanceConfigs.remove(6); + instanceConfigs.remove(2); + for (int i = numInstances; i < numInstances + 2; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + instanceConfigs.add(instanceConfig); + } + + // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3 + // instances should be assigned to 2 partitions, each with 2 instances + // Leverage the latest instancePartitions from last computation as the parameter. + // Data movement is minimized so that: i2 -> i10, i6 -> i11 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions, (Boolean) null); + + instancePartitionsForcedMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsForcedMinimize, true); + + // Data movement here is not minimized + instancePartitionsNotMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsNotMinimize, false); + + // Forced minimized data movement should be the same as minimized data movement + assertEquals(instancePartitionsForcedMinimize, instancePartitions); + // Without minimizeDataMovement set to true, the data movement is not minimized and should be different + assertNotEquals(instancePartitionsNotMinimize, instancePartitions); + + // Add 2 more instances to the ZK and increase the number of instances per partition from 2 to 3. + for (int i = numInstances + 2; i < numInstances + 4; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + instanceConfigs.add(instanceConfig); + } + numInstancesPerPartition = 3; + instanceAssignmentConfig = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, true); + tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig)); + + instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, false); + tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfigNotMinimized)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions, (Boolean) null); + + instancePartitionsForcedMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsForcedMinimize, true); + + instancePartitionsNotMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsNotMinimize, false); + + assertEquals(instancePartitionsForcedMinimize, instancePartitions); + assertNotEquals(instancePartitionsNotMinimize, instancePartitions); + + // Reduce the number of instances per partition from 3 to 2. + numInstancesPerPartition = 2; + instanceAssignmentConfig = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, true); + tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig)); + + instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, false); + tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfigNotMinimized)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions, (Boolean) null); + + instancePartitionsForcedMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsForcedMinimize, true); + + instancePartitionsNotMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsNotMinimize, false); + + assertEquals(instancePartitionsForcedMinimize, instancePartitions); + assertNotEquals(instancePartitionsNotMinimize, instancePartitions); + + // Add one more replica group (from 3 to 4). + numReplicas = 4; + tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas)); + tableConfigNotMinimized.getValidationConfig().setReplication(Integer.toString(numReplicas)); + + instanceAssignmentConfig = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, true); + tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig)); + + instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, false); + tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfigNotMinimized)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions, (Boolean) null); + + instancePartitionsForcedMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsForcedMinimize, true); + + instancePartitionsNotMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsNotMinimize, false); + + assertEquals(instancePartitionsForcedMinimize, instancePartitions); + assertNotEquals(instancePartitionsNotMinimize, instancePartitions); + + // Remove one replica group (from 4 to 3). + numReplicas = 3; + tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas)); + tableConfigNotMinimized.getValidationConfig().setReplication(Integer.toString(numReplicas)); + + instanceAssignmentConfig = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, true); + tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig)); + + instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, numPartitions, numInstancesPerPartition, false, + partitionColumn), null, false); + tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfigNotMinimized)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions, (Boolean) null); + + instancePartitionsForcedMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsForcedMinimize, true); + + instancePartitionsNotMinimize = + driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + instancePartitionsNotMinimize, false); + + assertEquals(instancePartitionsForcedMinimize, instancePartitions); + assertNotEquals(instancePartitionsNotMinimize, instancePartitions); + } + + public void testMirrorServerSetBasedRandomInner(int loopCount) + throws FileNotFoundException { PrintStream o = new PrintStream("output.txt"); System.setOut(o); for (int iter = 0; iter < loopCount; iter++) { @@ -1212,7 +1427,6 @@ public class InstanceAssignmentTest { SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 23)); - // upscale 3*3 to 3*5 numPartitions = 0; numInstancesPerPartition = 0; @@ -2209,9 +2423,9 @@ public class InstanceAssignmentTest { new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false, null); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( - Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); try { @@ -2243,9 +2457,9 @@ public class InstanceAssignmentTest { new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false, null); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( - Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); try { @@ -2285,9 +2499,9 @@ public class InstanceAssignmentTest { new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false, null); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( - Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); try { @@ -2397,9 +2611,9 @@ public class InstanceAssignmentTest { new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, numInstancesPerPartition, true, null); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( - Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) + Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) .build(); driver = new InstanceAssignmentDriver(tableConfig); // existingInstancePartitions = instancePartitions diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 5b99e56cc3..cab18467f0 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -95,8 +95,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { ExecutorService executorService = Executors.newFixedThreadPool(10); DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); preChecker.init(_helixResourceManager, executorService); - TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, - _helixResourceManager.getTableSizeReader()); + TableRebalancer tableRebalancer = + new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader()); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); @@ -513,6 +513,13 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertNull(rebalanceResult.getPreChecksResult()); _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); + + for (int i = 0; i < numServers; i++) { + stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i); + } + for (int i = 0; i < numServersToAdd; i++) { + stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i)); + } executorService.shutdown(); } @@ -532,9 +539,10 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { } _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, NO_TIER_NAME, numServers, numServers, 0)); - TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS) - .setServerTenant(NO_TIER_NAME).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME) + .setNumReplicas(NUM_REPLICAS) + .setServerTenant(NO_TIER_NAME) + .build(); // Create the table addDummySchema(TIERED_TABLE_NAME); _helixResourceManager.addTable(tableConfig); @@ -675,9 +683,10 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { _helixResourceManager.createServerTenant( new Tenant(TenantRole.SERVER, "replicaAssignment" + NO_TIER_NAME, numServers, numServers, 0)); - TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS) - .setServerTenant("replicaAssignment" + NO_TIER_NAME).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME) + .setNumReplicas(NUM_REPLICAS) + .setServerTenant("replicaAssignment" + NO_TIER_NAME) + .build(); // Create the table addDummySchema(TIERED_TABLE_NAME); _helixResourceManager.addTable(tableConfig); @@ -858,6 +867,256 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME); } + @Test + public void testRebalanceWithMinimizeDataMovementBalanced() + throws Exception { + int numServers = 6; + for (int i = 0; i < numServers; i++) { + addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" + SERVER_INSTANCE_ID_PREFIX + i, + true); + } + + // Create the table with default balanced segment assignment + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); + + addDummySchema(RAW_TABLE_NAME); + _helixResourceManager.addTable(tableConfig); + + // Add the segments + int numSegments = 10; + long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()); + + for (int i = 0; i < numSegments; i++) { + _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i, + nowInDays), null); + } + + Map<String, Map<String, String>> oldSegmentAssignment = + _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(); + + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager); + + // Try dry-run summary mode + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + + RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + RebalanceSummaryResult.ServerInfo rebalanceServerInfo = rebalanceSummaryResult.getServerInfo(); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(), numServers); + + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + // Segment assignment should not change + assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); + + // add one server instance + addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" + SERVER_INSTANCE_ID_PREFIX, true); + + // Table without instance assignment config should work fine (ignore) with the minimizeDataMovement flag set + // Try dry-run summary mode + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + rebalanceServerInfo = rebalanceSummaryResult.getServerInfo(); + // Should see the added server + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertEquals(rebalanceServerInfo.getNumServers().getValueBeforeRebalance(), numServers); + assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(), numServers + 1); + + // Check if the instance assignment is the same as the one without minimizeDataMovement flag set + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT); + RebalanceResult rebalanceResultWithoutMinimized = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + + assertEquals(rebalanceResult.getInstanceAssignment(), rebalanceResultWithoutMinimized.getInstanceAssignment()); + + // Rebalance + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + // Should see the added server in the instance assignment + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getInstances(0, 0).size(), + numServers + 1); + + _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); + for (int i = 0; i < numServers; i++) { + stopAndDropFakeInstance("minimizeDataMovement_balance_" + SERVER_INSTANCE_ID_PREFIX + i); + } + } + + @Test + public void testRebalanceWithMinimizeDataMovementInstanceAssignments() + throws Exception { + int numServers = 6; + for (int i = 0; i < numServers; i++) { + addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_" + SERVER_INSTANCE_ID_PREFIX + i, true); + } + + // One instance per replica group, no partition + InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 1, 0, 0, false, null), null, false); + + // Create the table + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setNumReplicas(NUM_REPLICAS) + .setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig)) + .build(); + + addDummySchema(RAW_TABLE_NAME); + _helixResourceManager.addTable(tableConfig); + + // Add the segments + int numSegments = 10; + long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()); + + for (int i = 0; i < numSegments; i++) { + _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i, + nowInDays), null); + } + + Map<String, Map<String, String>> oldSegmentAssignment = + _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(); + + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager); + + // Try dry-run summary mode + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + + RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + RebalanceSummaryResult.ServerInfo rebalanceServerInfo = rebalanceSummaryResult.getServerInfo(); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(), NUM_REPLICAS); + + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + // Segment assignment should not change + assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); + + // add one server instance + addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_" + SERVER_INSTANCE_ID_PREFIX + numServers, true); + + // increase replica group size by 1 + instanceAssignmentConfig = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS + 1, 1, 0, 0, false, null), null, false); + + tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig)); + + // Try dry-run summary mode + + // without minimize data movement, it's supposed to add more than one server + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceServerInfo = rebalanceResult.getRebalanceSummaryResult().getServerInfo(); + + // note: this assertion may fail due to instance assignment algorithm changed in the future. + // right now, rebalance without minimizing data movement adds more than one server and remove some servers in the + // testing setup like this. + assertTrue(rebalanceServerInfo.getServersAdded().size() > 1); + assertEquals(rebalanceServerInfo.getServersAdded().size() - rebalanceServerInfo.getServersRemoved().size(), 1); + + // use default table config's minimizeDataMovement flag, should be equivalent to without minimize data movement + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceServerInfo = rebalanceResult.getRebalanceSummaryResult().getServerInfo(); + + assertTrue(rebalanceServerInfo.getServersAdded().size() > 1); + assertEquals(rebalanceServerInfo.getServersAdded().size() - rebalanceServerInfo.getServersRemoved().size(), 1); + + // with minimize data movement, we should add 1 server only + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceServerInfo = rebalanceResult.getRebalanceSummaryResult().getServerInfo(); + + assertEquals(rebalanceServerInfo.getServersAdded().size(), 1); + assertEquals(rebalanceServerInfo.getServersRemoved().size(), 0); + + // rebalance without dry-run + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getNumReplicaGroups(), + NUM_REPLICAS + 1); + + // add one server instance + addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_" + SERVER_INSTANCE_ID_PREFIX + (numServers + 1), + true); + + // decrease replica group size by 1 + instanceAssignmentConfig = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 1, 0, 0, false, null), null, false); + + tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig)); + _helixResourceManager.updateTableConfig(tableConfig); + + // Try dry-run summary mode + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceServerInfo = rebalanceResult.getRebalanceSummaryResult().getServerInfo(); + + // with minimize data movement, we should remove 1 server only + assertEquals(rebalanceServerInfo.getServersAdded().size(), 0); + assertEquals(rebalanceServerInfo.getServersRemoved().size(), 1); + + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getNumReplicaGroups(), + NUM_REPLICAS); + + _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); + for (int i = 0; i < numServers; i++) { + stopAndDropFakeInstance("minimizeDataMovement_" + SERVER_INSTANCE_ID_PREFIX + i); + } + } + @AfterClass public void tearDown() { stopFakeInstances(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java index f1165bc9d4..7b08af32c0 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java @@ -33,7 +33,8 @@ public class PinotTableRebalancer extends PinotZKChanger { private final RebalanceConfig _rebalanceConfig = new RebalanceConfig(); public PinotTableRebalancer(String zkAddress, String clusterName, boolean dryRun, boolean preChecks, - boolean reassignInstances, boolean includeConsuming, boolean bootstrap, boolean downtime, + boolean reassignInstances, boolean includeConsuming, + RebalanceConfig.MinimizeDataMovementOptions minimizeDataMovement, boolean bootstrap, boolean downtime, int minReplicasToKeepUpForNoDowntime, boolean lowDiskMode, boolean bestEffort, long externalViewCheckIntervalInMs, long externalViewStabilizationTimeoutInMs) { super(zkAddress, clusterName); @@ -41,6 +42,7 @@ public class PinotTableRebalancer extends PinotZKChanger { _rebalanceConfig.setPreChecks(preChecks); _rebalanceConfig.setReassignInstances(reassignInstances); _rebalanceConfig.setIncludeConsuming(includeConsuming); + _rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement); _rebalanceConfig.setBootstrap(bootstrap); _rebalanceConfig.setDowntime(downtime); _rebalanceConfig.setMinAvailableReplicas(minReplicasToKeepUpForNoDowntime); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java index 96ed26994a..e94e7b2773 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java @@ -64,6 +64,11 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C description = "Whether to reassign CONSUMING segments for real-time table (true by default)") private boolean _includeConsuming = true; + @CommandLine.Option(names = {"-minimizeDataMovement"}, description = "Whether to enable, disable minimize data " + + "movement algorithm, or use table's default config") + private RebalanceConfig.MinimizeDataMovementOptions _minimizeDataMovement = + RebalanceConfig.MinimizeDataMovementOptions.ENABLE; + @CommandLine.Option(names = {"-bootstrap"}, description = "Whether to rebalance table in bootstrap mode (regardless of minimum segment movement, reassign" + " all segments in a round-robin fashion as if adding new segments to an empty table, false by default)") @@ -110,8 +115,8 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C throws Exception { PinotTableRebalancer tableRebalancer = new PinotTableRebalancer(_zkAddress, _clusterName, _dryRun, _preChecks, _reassignInstances, _includeConsuming, - _bootstrap, _downtime, _minAvailableReplicas, _lowDiskMode, _bestEfforts, _externalViewCheckIntervalInMs, - _externalViewStabilizationTimeoutInMs); + _minimizeDataMovement, _bootstrap, _downtime, _minAvailableReplicas, _lowDiskMode, _bestEfforts, + _externalViewCheckIntervalInMs, _externalViewStabilizationTimeoutInMs); RebalanceResult rebalanceResult = tableRebalancer.rebalance(_tableNameWithType); LOGGER .info("Got rebalance result: {} for table: {}", JsonUtils.objectToString(rebalanceResult), _tableNameWithType); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org