This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch minimize-instance-movement
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 17139574d51fe9c6649067c71c0f7ab9e56f3e43
Author:     Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Apr 6 20:22:52 2022 -0700

    Minimize data movement between instances in pools
---
 .../common/assignment/InstancePartitions.java      |  66 +++-
 .../PinotInstanceAssignmentRestletResource.java    |  38 +-
 .../api/resources/PinotTableRestletResource.java   |  10 +-
 .../helix/core/PinotHelixResourceManager.java      |   2 +-
 .../instance/InstanceAssignmentDriver.java         |   7 +-
 .../InstanceReplicaGroupPartitionSelector.java     |  63 ++--
 ...stanceReplicaGroupPartitionSelectorFactory.java |  47 +++
 ...ementInstanceReplicaGroupPartitionSelector.java | 277 +++++++++++++++
 .../core/rebalance/RebalanceConfigConstants.java   |   4 +
 .../helix/core/rebalance/TableRebalancer.java      |  34 +-
 .../instance/InstanceAssignmentTest.java           | 385 +++++++++++++++++++--
 .../apache/pinot/tools/PinotTableRebalancer.java   |   3 +-
 .../tools/admin/command/RebalanceTableCommand.java |   6 +-
 13 files changed, 868 insertions(+), 74 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
index c511077187..20fa7478ac 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -58,24 +60,39 @@ import org.apache.pinot.spi.utils.JsonUtils;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class InstancePartitions {
   private static final char PARTITION_REPLICA_GROUP_SEPARATOR = '_';
+  private static final String POOLS_KEY = "pools";
+  private static final String REPLICA_GROUP_SEPARATOR = "/";

   private final String _instancePartitionsName;
+  // A map to store the partition and its associated list of instances.
+  // The partition key would be like "0_0", where the 1st number denotes the partition id,
+  // and the 2nd one denotes the replica group id.
   private final Map<String, List<String>> _partitionToInstancesMap;
+  // A map to store the selected pool numbers and their associated list of replica groups.
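+  // For example (illustrative values): with pools [0, 1] and three replica-groups, where replica-groups 0 and 2
+  // are served from pool 0 and replica-group 1 from pool 1, this map holds {0: [0, 2], 1: [1]}. It is persisted
+  // in the ZNRecord map field keyed by POOLS_KEY ("pools"), with the replica-group ids joined by "/".
+  // A minimal sketch of the round trip (hypothetical usage, not part of this patch):
+  //   InstancePartitions ip = new InstancePartitions("myTable_OFFLINE");
+  //   ip.setInstances(0, 0, Collections.singletonList("Server_0"));
+  //   ip.setPoolToReplicaGroupsMap(Collections.singletonMap(0, Arrays.asList(0, 2)));
+  //   ip.toZNRecord();  // list field "0_0" -> [Server_0], map field "pools" -> {"0": "0/2"}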
+ private final Map<Integer, List<Integer>> _poolToReplicaGroupsMap; private int _numPartitions; private int _numReplicaGroups; public InstancePartitions(String instancePartitionsName) { _instancePartitionsName = instancePartitionsName; _partitionToInstancesMap = new TreeMap<>(); + _poolToReplicaGroupsMap = new TreeMap<>(); } @JsonCreator private InstancePartitions( @JsonProperty(value = "instancePartitionsName", required = true) String instancePartitionsName, @JsonProperty(value = "partitionToInstancesMap", required = true) - Map<String, List<String>> partitionToInstancesMap) { + Map<String, List<String>> partitionToInstancesMap, + @JsonProperty(value = "poolToReplicaGroupsMap") Map<String, String> poolToReplicaGroupsMap) { _instancePartitionsName = instancePartitionsName; _partitionToInstancesMap = partitionToInstancesMap; + _poolToReplicaGroupsMap = new TreeMap<>(); + if (poolToReplicaGroupsMap != null) { + for (Map.Entry<String, String> entry : poolToReplicaGroupsMap.entrySet()) { + _poolToReplicaGroupsMap.put(Integer.parseInt(entry.getKey()), extractReplicaGroups(entry.getValue())); + } + } for (String key : partitionToInstancesMap.keySet()) { int separatorIndex = key.indexOf(PARTITION_REPLICA_GROUP_SEPARATOR); int partitionId = Integer.parseInt(key.substring(0, separatorIndex)); @@ -105,6 +122,11 @@ public class InstancePartitions { return _numReplicaGroups; } + @JsonIgnore + public Map<Integer, List<Integer>> getPoolToReplicaGroupsMap() { + return _poolToReplicaGroupsMap; + } + public List<String> getInstances(int partitionId, int replicaGroupId) { return _partitionToInstancesMap .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId); @@ -117,13 +139,53 @@ public class InstancePartitions { _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1); } + public void setPoolToReplicaGroupsMap(Map<Integer, List<Integer>> poolToReplicaGroupsMap) { + _poolToReplicaGroupsMap.putAll(poolToReplicaGroupsMap); + } + public static InstancePartitions fromZNRecord(ZNRecord znRecord) { - return new InstancePartitions(znRecord.getId(), znRecord.getListFields()); + return new InstancePartitions(znRecord.getId(), znRecord.getListFields(), znRecord.getMapField(POOLS_KEY)); + } + + private static List<Integer> extractReplicaGroups(String instancesRawString) { + if (instancesRawString == null || instancesRawString.length() == 0) { + return Collections.emptyList(); + } + String[] replicaGroupStringArray = instancesRawString.split(REPLICA_GROUP_SEPARATOR); + List<Integer> instances = new ArrayList<>(replicaGroupStringArray.length); + for (String replicaGroupString : replicaGroupStringArray) { + instances.add(Integer.parseInt(replicaGroupString)); + } + return instances; + } + + private String convertReplicaGroupsToString(List<Integer> replicaGroups) { + if (replicaGroups == null || replicaGroups.isEmpty()) { + return ""; + } + StringBuilder stringBuilder = new StringBuilder(); + for (Integer replicaGroup : replicaGroups) { + if (stringBuilder.length() == 0) { + stringBuilder.append(replicaGroup); + } else { + stringBuilder.append(REPLICA_GROUP_SEPARATOR).append(replicaGroup); + } + } + return stringBuilder.toString(); + } + + private Map<String, String> convertListToStringMap() { + Map<String, String> convertedMap = new TreeMap<>(); + for (Map.Entry<Integer, List<Integer>> entry : _poolToReplicaGroupsMap.entrySet()) { + convertedMap.put(Integer.toString(entry.getKey()), convertReplicaGroupsToString(entry.getValue())); + } + return convertedMap; } public 
ZNRecord toZNRecord() { ZNRecord znRecord = new ZNRecord(_instancePartitionsName); znRecord.setListFields(_partitionToInstancesMap); + znRecord.setMapField(POOLS_KEY, convertListToStringMap()); return znRecord; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java index 8d9e6cc7dc..40f7bfd0ce 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java @@ -124,7 +124,9 @@ public class PinotInstanceAssignmentRestletResource { @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable InstancePartitionsType instancePartitionsType, - @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") @QueryParam("dryRun") boolean dryRun) { + @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") @QueryParam("dryRun") boolean dryRun, + @ApiParam(value = "Whether to retain current instance sequence") @DefaultValue("false") + @QueryParam("retainInstanceSequence") boolean retainInstanceSequence) { Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>(); List<InstanceConfig> instanceConfigs = _resourceManager.getAllHelixInstanceConfigs(); @@ -136,8 +138,8 @@ public class PinotInstanceAssignmentRestletResource { try { if (InstanceAssignmentConfigUtils .allowInstanceAssignment(offlineTableConfig, InstancePartitionsType.OFFLINE)) { - instancePartitionsMap.put(InstancePartitionsType.OFFLINE, new InstanceAssignmentDriver(offlineTableConfig) - .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs)); + assignInstancesForInstancePartitionsType(instancePartitionsMap, offlineTableConfig, instanceConfigs, + InstancePartitionsType.OFFLINE, retainInstanceSequence); } } catch (IllegalStateException e) { throw new ControllerApplicationException(LOGGER, "Caught IllegalStateException", Response.Status.BAD_REQUEST, @@ -152,19 +154,18 @@ public class PinotInstanceAssignmentRestletResource { TableConfig realtimeTableConfig = _resourceManager.getRealtimeTableConfig(tableName); if (realtimeTableConfig != null) { try { - InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(realtimeTableConfig); if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) { if (InstanceAssignmentConfigUtils .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.CONSUMING)) { - instancePartitionsMap.put(InstancePartitionsType.CONSUMING, - instanceAssignmentDriver.assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs)); + assignInstancesForInstancePartitionsType(instancePartitionsMap, realtimeTableConfig, instanceConfigs, + InstancePartitionsType.CONSUMING, retainInstanceSequence); } } if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) { if (InstanceAssignmentConfigUtils .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.COMPLETED)) { - instancePartitionsMap.put(InstancePartitionsType.COMPLETED, - instanceAssignmentDriver.assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs)); + assignInstancesForInstancePartitionsType(instancePartitionsMap, 
realtimeTableConfig, instanceConfigs, + InstancePartitionsType.COMPLETED, retainInstanceSequence); } } } catch (IllegalStateException e) { @@ -191,6 +192,27 @@ public class PinotInstanceAssignmentRestletResource { return instancePartitionsMap; } + /** + * Assign instances given the type of instancePartitions. + * @param instancePartitionsMap the empty map to be filled. + * @param tableConfig table config + * @param instanceConfigs list of instance configs + * @param instancePartitionsType type of instancePartitions + * @param retainInstanceSequence whether to retain instance sequence + */ + private void assignInstancesForInstancePartitionsType( + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, TableConfig tableConfig, + List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType, + boolean retainInstanceSequence) { + InstancePartitions existingInstancePartitions = null; + if (retainInstanceSequence) { + existingInstancePartitions = InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_resourceManager.getHelixZkManager(), tableConfig, instancePartitionsType); + } + instancePartitionsMap.put(instancePartitionsType, new InstanceAssignmentDriver(tableConfig) + .assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions)); + } + private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) { try { LOGGER.info("Persisting instance partitions: {}", instancePartitions); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 46e7a2e7da..1db480acce 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -569,9 +569,12 @@ public class PinotTableRestletResource { boolean downtime, @ApiParam( value = "For no-downtime rebalance, minimum number of replicas to keep alive during rebalance, or maximum " + "number of replicas allowed to be unavailable if value is negative") @DefaultValue("1") - @QueryParam("minAvailableReplicas") int minAvailableReplicas, @ApiParam( - value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot " - + "be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts) { + @QueryParam("minAvailableReplicas") int minAvailableReplicas, + @ApiParam(value = "Whether to use best-efforts to rebalance (not fail the rebalance" + + " when the no-downtime contract cannot be achieved)") @DefaultValue("false") + @QueryParam("bestEfforts") boolean bestEfforts, + @ApiParam(value = "Whether to retain instance sequence during rebalancing in order to minimize data movement") + @DefaultValue("false") @QueryParam("retainInstancesSequence") boolean retainInstancesSequence) { String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr); @@ -583,6 +586,7 @@ public class PinotTableRestletResource { rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, downtime); rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, minAvailableReplicas); rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, bestEfforts); + rebalanceConfig.addProperty(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE, retainInstancesSequence); try 
{ if (dryRun || downtime) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 3b901171e8..8fdcf48c60 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1649,7 +1649,7 @@ public class PinotHelixResourceManager { List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs(); for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) { InstancePartitions instancePartitions = - instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs); + instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null); LOGGER.info("Persisting instance partitions: {}", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index 1440fae204..444f4934ea 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -52,7 +52,7 @@ public class InstanceAssignmentDriver { } public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, - List<InstanceConfig> instanceConfigs) { + List<InstanceConfig> instanceConfigs, InstancePartitions existingInstancePartitions) { String tableNameWithType = _tableConfig.getTableName(); LOGGER.info("Starting {} instance assignment for table: {}", instancePartitionsType, tableNameWithType); @@ -73,8 +73,9 @@ public class InstanceAssignmentDriver { poolToInstanceConfigsMap = constraintApplier.applyConstraint(poolToInstanceConfigsMap); } - InstanceReplicaGroupPartitionSelector replicaPartitionSelector = - new InstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType); + InstanceReplicaGroupPartitionSelector replicaPartitionSelector = InstanceReplicaGroupPartitionSelectorFactory + .generateInstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(), + tableNameWithType, existingInstancePartitions); InstancePartitions instancePartitions = new InstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType))); replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java index 3e83bf7720..13cbc9cd07 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java @@ -37,8 +37,8 @@ import 
org.slf4j.LoggerFactory; public class InstanceReplicaGroupPartitionSelector { private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class); - private final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; - private final String _tableNameWithType; + protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; + protected final String _tableNameWithType; public InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String tableNameWithType) { @@ -163,31 +163,44 @@ public class InstanceReplicaGroupPartitionSelector { } } else { // Non-replica-group based selection + selectForNonReplicaGroupBased(poolToInstanceConfigsMap, instancePartitions); + } + } - // Pick one pool based on the table name hash - int pool = pools.get(tableNameHash % numPools); - LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType); - List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool); - int numInstanceConfigs = instanceConfigs.size(); - - // Assign all instances if not configured - int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances(); - if (numInstancesToSelect > 0) { - Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs, - "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs, - numInstancesToSelect); - } else { - numInstancesToSelect = numInstanceConfigs; - } + protected void selectForNonReplicaGroupBased(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, + InstancePartitions instancePartitions) { + int numPools = poolToInstanceConfigsMap.size(); + Preconditions.checkState(numPools != 0, "No pool qualified for selection"); - List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); - } - instancesToSelect.sort(null); - LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); - // Set the instances as partition 0 replica 0 - instancePartitions.setInstances(0, 0, instancesToSelect); + int tableNameHash = Math.abs(_tableNameWithType.hashCode()); + List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet()); + pools.sort(null); + LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}", + _tableNameWithType, tableNameHash, pools); + + // Pick one pool based on the table name hash + int pool = pools.get(tableNameHash % numPools); + LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType); + List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool); + int numInstanceConfigs = instanceConfigs.size(); + + // Assign all instances if not configured + int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances(); + if (numInstancesToSelect > 0) { + Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs, + "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs, + numInstancesToSelect); + } else { + numInstancesToSelect = numInstanceConfigs; + } + + List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); } + instancesToSelect.sort(null); + LOGGER.info("Selecting instances: {} for table: {}", 
instancesToSelect, _tableNameWithType); + // Set the instances as partition 0 replica 0 + instancePartitions.setInstances(0, 0, instancesToSelect); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorFactory.java new file mode 100644 index 0000000000..36e16e8d27 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorFactory.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.instance; + +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; + +/** + * A factory class to generate {@link InstanceReplicaGroupPartitionSelector}. + */ +public class InstanceReplicaGroupPartitionSelectorFactory { + + private InstanceReplicaGroupPartitionSelectorFactory() { + } + + public static InstanceReplicaGroupPartitionSelector generateInstanceReplicaGroupPartitionSelector( + InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType, + InstancePartitions existingInstancePartitions) { + InstanceReplicaGroupPartitionSelector replicaPartitionSelector; + if (existingInstancePartitions == null) { + replicaPartitionSelector = + new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType); + } else { + // If existing instance partitions is not null, use the customized selector to minimize data movement. 
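+    // Callers obtain the existing instance partitions from ZK (via InstancePartitionsUtils
+    // .fetchOrComputeInstancePartitions) only when the retainInstanceSequence/retainInstancesSequence flag is set;
+    // otherwise they pass null here and the default selector above is used.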
+ replicaPartitionSelector = + new MinimizedDataMovementInstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, + tableNameWithType, existingInstancePartitions); + } + return replicaPartitionSelector; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MinimizedDataMovementInstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MinimizedDataMovementInstanceReplicaGroupPartitionSelector.java new file mode 100644 index 0000000000..0af1264d3a --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MinimizedDataMovementInstanceReplicaGroupPartitionSelector.java @@ -0,0 +1,277 @@ +package org.apache.pinot.controller.helix.core.assignment.instance; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An extended class of {@link InstanceReplicaGroupPartitionSelector} to minimize data movement between instances. + * Currently the following scenarios are supported: + * * swap instances within a pool + * * add / remove instances per replica group + * * increase / decrease number of replica groups + * + * TODO: Support the remaining scenarios: + * * add / remove pools + */ +public class MinimizedDataMovementInstanceReplicaGroupPartitionSelector extends InstanceReplicaGroupPartitionSelector { + private static final Logger LOGGER = + LoggerFactory.getLogger(MinimizedDataMovementInstanceReplicaGroupPartitionSelector.class); + + private final InstancePartitions _existingInstancePartitions; + + public MinimizedDataMovementInstanceReplicaGroupPartitionSelector( + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String tableNameWithType, + InstancePartitions existingInstancePartitions) { + super(replicaGroupPartitionConfig, tableNameWithType); + _existingInstancePartitions = existingInstancePartitions; + } + + @Override + public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, + InstancePartitions instancePartitions) { + int numPools = poolToInstanceConfigsMap.size(); + Preconditions.checkState(numPools != 0, "No pool qualified for selection"); + + int tableNameHash = Math.abs(_tableNameWithType.hashCode()); + List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet()); + pools.sort(null); + Preconditions.checkState(pools.containsAll(_existingInstancePartitions.getPoolToReplicaGroupsMap().keySet()), + String.format("The existing pool no longer exists in ZK any more. Existing pools: %s. Latest pools: %s", + _existingInstancePartitions.getPoolToReplicaGroupsMap().keySet(), pools)); + LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}", + _tableNameWithType, tableNameHash, pools); + + if (_replicaGroupPartitionConfig.isReplicaGroupBased()) { + // Replica-group based selection + + // Find out the mapping between pool and replica groups. 
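+      // The pool for each replica-group is picked the same hash-based way as in the base selector:
+      // pools.get((tableNameHash + replicaId) % numPools). For example (illustrative values): with pools [0, 1],
+      // tableNameHash % 2 == 0 and 3 replica-groups, replica-groups 0 and 2 map to pool 0 and replica-group 1
+      // maps to pool 1.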
+ int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); + Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); + Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); + Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>(); + for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { + // Pick one pool for each replica-group based on the table name hash + int pool = pools.get((tableNameHash + replicaId) % numPools); + poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); + replicaGroupIdToPoolMap.put(replicaId, pool); + } + LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, + _tableNameWithType); + + // Finalize the number of instances per replica group. + int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); + if (numInstancesPerReplicaGroup > 0) { + // Check if we have enough instances if number of instances per replica-group is configured + for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { + int pool = entry.getKey(); + int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); + int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size(); + Preconditions.checkState(numInstancesToSelect <= numInstancesInPool, + "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool, + numInstancesToSelect); + } + } else { + // Use as many instances as possible if number of instances per replica-group is not configured + numInstancesPerReplicaGroup = Integer.MAX_VALUE; + for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { + int pool = entry.getKey(); + int numReplicaGroupsInPool = entry.getValue().size(); + int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); + Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool, + "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool, + numReplicaGroupsInPool, numInstancesInPool); + numInstancesPerReplicaGroup = + Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool); + } + } + LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup, + _tableNameWithType); + + // Assign instances within a replica-group to one partition if not configured + int numPartitions = _replicaGroupPartitionConfig.getNumPartitions(); + if (numPartitions <= 0) { + numPartitions = 1; + } + // Assign all instances within a replica-group to each partition if not configured + int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition(); + if (numInstancesPerPartition > 0) { + Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup, + "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group:" + + " %s", numInstancesPerPartition, numInstancesPerReplicaGroup); + } else { + numInstancesPerPartition = numInstancesPerReplicaGroup; + } + LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", + numPartitions, numInstancesPerPartition, _tableNameWithType); + + // Step 1: Identify candidate instances from latest list of instance configs in ZK. 
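+      // Candidates are kept in a LinkedHashSet per pool so that the order of the instance configs is preserved
+      // while still allowing fast membership checks against the existing assignment in the later steps.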
+ Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); + for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) { + Integer pool = entry.getKey(); + List<InstanceConfig> instanceConfigs = entry.getValue(); + for (InstanceConfig instanceConfig : instanceConfigs) { + poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()) + .add(instanceConfig.getInstanceName()); + } + } + + Map<Integer, Map<String, String>> poolToGoneInstancesAndReplacedInstancesMap = new TreeMap<>(); + Map<String, List<String>> existingPartitionToLatestInstancesMap = new TreeMap<>(); + Map<String, List<String>> existingPartitionToInstancesMap = + _existingInstancePartitions.getPartitionToInstancesMap(); + Map<Integer, Set<String>> poolToExistingAliveInstancesMap = new TreeMap<>(); + + int maxNumberOfInstancesPerInstancePartitionAssignment = Integer.MIN_VALUE; + for (List<String> instances : existingPartitionToInstancesMap.values()) { + maxNumberOfInstancesPerInstancePartitionAssignment = + Math.max(maxNumberOfInstancesPerInstancePartitionAssignment, instances.size()); + } + + // Step 2: by reusing the existing mapping, find out the missing instances. + for (int replicaGroupId = 0; replicaGroupId < _existingInstancePartitions.getNumReplicaGroups(); + replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip validating replica group if it's no longer needed. + continue; + } + for (int partitionId = 0; partitionId < _existingInstancePartitions.getNumPartitions(); partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + List<String> latestInstancesInMap = existingPartitionToLatestInstancesMap + .computeIfAbsent(partitionId + "_" + replicaGroupId, k -> new ArrayList<>()); + + for (String existingInstance : existingInstances) { + // The instance still exists in the ZK. + if (poolToReplicaGroupIdsMap.containsKey(pool) && poolToCandidateInstancesMap.containsKey(pool) + && poolToCandidateInstancesMap.get(pool).contains(existingInstance)) { + poolToExistingAliveInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).add(existingInstance); + latestInstancesInMap.add(existingInstance); + } else { + // The instance no longer exists + poolToGoneInstancesAndReplacedInstancesMap.computeIfAbsent(pool, k -> new TreeMap<>()) + .put(existingInstance, null); + latestInstancesInMap.add(null); + } + } + } + } + + // Step 3: Find out all new instances in each pool. + Map<Integer, Deque<String>> poolToNewCandidateInstancesMap = new TreeMap<>(); + for (Map.Entry<Integer, Set<String>> entry : poolToCandidateInstancesMap.entrySet()) { + Integer pool = entry.getKey(); + Set<String> candidateInstancesInPool = entry.getValue(); + Set<String> existingStillAliveInstances = + poolToExistingAliveInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()); + for (String candidateInstance : candidateInstancesInPool) { + if (!existingStillAliveInstances.contains(candidateInstance)) { + poolToNewCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(candidateInstance); + } + } + } + + // Step 4: Find the 1:1 mapping between the gone instance and the new instance. 
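+      // For example (illustrative, mirroring the scenario in InstanceAssignmentTest): if i2 and i6 are removed and
+      // i10 and i11 are added, the mapping becomes {i2 -> i10, i6 -> i11}. If there are more gone instances than new
+      // ones, the leftovers keep a null replacement and their positions are back-filled from the remaining
+      // candidates in step 5.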
+ for (Map.Entry<Integer, Map<String, String>> entry : poolToGoneInstancesAndReplacedInstancesMap.entrySet()) { + Integer pool = entry.getKey(); + Map<String, String> goneInstanceToNewInstanceMap = entry.getValue(); + Deque<String> newInstancesInPool = + poolToNewCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedList<>()); + goneInstanceToNewInstanceMap.replaceAll((k, v) -> { + if (!newInstancesInPool.isEmpty()) { + return newInstancesInPool.pollFirst(); + } else { + return v; + } + }); + } + + // Step 5: Fill the vacant positions with the new instances. + Map<String, List<String>> newInstancePartitionsAssignmentMap = new TreeMap<>(); + int finalNumInstancesPerPartition = numInstancesPerPartition; + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + Map<String, String> goneInstanceToNewInstanceMap = + poolToGoneInstancesAndReplacedInstancesMap.computeIfAbsent(pool, k -> new TreeMap<>()); + Set<String> candidateInstancesMap = poolToCandidateInstancesMap.get(pool); + String partitionToReplicaGroupKey = partitionId + "_" + replicaGroupId; + List<String> existingInstances = + existingPartitionToInstancesMap.computeIfAbsent(partitionToReplicaGroupKey, k -> new ArrayList<>()); + + // Construct an empty list to store the latest instances. + List<String> latestInstanceAssignment = + newInstancePartitionsAssignmentMap.computeIfAbsent(partitionToReplicaGroupKey, k -> { + List<String> instances = new ArrayList<>(finalNumInstancesPerPartition); + for (int i = 0; i < finalNumInstancesPerPartition; i++) { + instances.add(null); + } + return instances; + }); + + // Traverse the existing list of instances, fill the vacant positions with new instances from the map. + for (int i = 0; i < existingInstances.size() && i < finalNumInstancesPerPartition; i++) { + String existingInstance = existingInstances.get(i); + String replacedInstance = goneInstanceToNewInstanceMap.get(existingInstance); + if (replacedInstance != null) { + latestInstanceAssignment.set(i, replacedInstance); + candidateInstancesMap.remove(replacedInstance); + } else { + // If the instance does exist in the gone map but there is no new instance to replace its position, + // skip adding anything into the assignment. + if (!goneInstanceToNewInstanceMap.containsKey(existingInstance)) { + latestInstanceAssignment.set(i, existingInstance); + candidateInstancesMap.remove(existingInstance); + } + } + } + // If the new number of instances per partition is larger than the previous one, extend the vacant positions. + if (finalNumInstancesPerPartition > existingInstances.size()) { + Iterator<String> candidateInstancesInPoolIterator = candidateInstancesMap.iterator(); + for (int i = existingInstances.size(); i < finalNumInstancesPerPartition; i++) { + if (candidateInstancesInPoolIterator.hasNext()) { + String candidateInstance = candidateInstancesInPoolIterator.next(); + latestInstanceAssignment.set(i, candidateInstance); + candidateInstancesInPoolIterator.remove(); + } + } + } + + // Fill up the vacant positions if any. 
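+          // A position can still be null here, e.g. when an existing instance is gone but step 4 had no new instance
+          // left to act as its 1:1 replacement; such slots are filled from the pool's remaining unused candidates.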
+ for (int i = 0; i < latestInstanceAssignment.size(); i++) { + Iterator<String> candidateInstancesInPoolIterator = candidateInstancesMap.iterator(); + if (latestInstanceAssignment.get(i) == null) { + if (candidateInstancesInPoolIterator.hasNext()) { + String candidateInstance = candidateInstancesInPoolIterator.next(); + latestInstanceAssignment.set(i, candidateInstance); + candidateInstancesInPoolIterator.remove(); + } + } + } + + instancePartitions.setInstances(partitionId, replicaGroupId, latestInstanceAssignment); + } + } + + // Persist poolToReplicaGroupsMap to ZK. + instancePartitions.setPoolToReplicaGroupsMap(poolToReplicaGroupIdsMap); + } else { + // Non-replica-group based selection + selectForNonReplicaGroupBased(poolToInstanceConfigsMap, instancePartitions); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java index 70bebabc17..d08fbbfd84 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java @@ -33,6 +33,10 @@ public class RebalanceConfigConstants { public static final String REASSIGN_INSTANCES = "reassignInstances"; public static final boolean DEFAULT_REASSIGN_INSTANCES = false; + // Whether to retain the sequence for the existing instances + public static final String RETAIN_INSTANCE_SEQUENCE = "retainInstancesSequence"; + public static final boolean DEFAULT_RETAIN_INSTANCE_SEQUENCE = false; + // Whether to reassign CONSUMING segments public static final String INCLUDE_CONSUMING = "includeConsuming"; public static final boolean DEFAULT_INCLUDE_CONSUMING = false; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index d6701c8363..8f86ac6fa9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -145,6 +145,8 @@ public class TableRebalancer { tableConfig.getRoutingConfig().getInstanceSelectorType()); boolean bestEfforts = rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS, RebalanceConfigConstants.DEFAULT_BEST_EFFORTS); + boolean retainInstanceSequence = rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE, + RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE); LOGGER.info( "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, " + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}", @@ -194,7 +196,7 @@ public class TableRebalancer { // Calculate instance partitions map Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap; try { - instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun); + instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun, retainInstanceSequence); } catch (Exception e) { LOGGER.warn( "Caught exception while fetching/calculating instance partitions for table: {}, aborting the rebalance", @@ -323,10 +325,12 @@ public class 
TableRebalancer { currentIdealState = idealState; currentAssignment = currentIdealState.getRecord().getMapFields(); // Re-calculate the instance partitions in case the instance configs changed during the rebalance - instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false); + instancePartitionsMap = + getInstancePartitionsMap(tableConfig, reassignInstances, false, retainInstanceSequence); tierToInstancePartitionsMap = getTierToInstancePartitionsMap(tableNameWithType, sortedTiers); - targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, - tierToInstancePartitionsMap, rebalanceConfig); + targetAssignment = segmentAssignment + .rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap, + rebalanceConfig); expectedVersion = currentIdealState.getRecord().getVersion(); } catch (Exception e) { LOGGER.warn( @@ -381,21 +385,24 @@ public class TableRebalancer { } private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig, - boolean reassignInstances, boolean dryRun) { + boolean reassignInstances, boolean dryRun, boolean retainInstanceSequence) { Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>(); if (tableConfig.getTableType() == TableType.OFFLINE) { instancePartitionsMap.put(InstancePartitionsType.OFFLINE, - getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun)); + getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun, + retainInstanceSequence)); } else { instancePartitionsMap.put(InstancePartitionsType.CONSUMING, - getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun)); + getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun, + retainInstanceSequence)); String tableNameWithType = tableConfig.getTableName(); if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) { LOGGER.info( "COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}", tableNameWithType); instancePartitionsMap.put(InstancePartitionsType.COMPLETED, - getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun)); + getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun, + retainInstanceSequence)); } else { LOGGER.info( "COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions " @@ -413,14 +420,21 @@ public class TableRebalancer { } private InstancePartitions getInstancePartitions(TableConfig tableConfig, - InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun) { + InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun, + boolean retainInstanceSequence) { String tableNameWithType = tableConfig.getTableName(); if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) { if (reassignInstances) { + InstancePartitions existingInstancePartitions = null; + if (retainInstanceSequence) { + existingInstancePartitions = InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixManager, tableConfig, instancePartitionsType); + } LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType); InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(tableConfig); InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, - _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true)); + _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), + existingInstancePartitions); if (!dryRun) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index f1fc049ff5..197b4919df 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -71,7 +71,8 @@ public class InstanceAssignmentTest { } // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 2 instances - InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + InstancePartitions instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); assertEquals(instancePartitions.getNumPartitions(), 1); // Instances of index 4 to 7 are not assigned because of the hash-based rotation @@ -95,7 +96,7 @@ public class InstanceAssignmentTest { // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3 // instances should be assigned to 2 partitions, each with 2 instances - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); assertEquals(instancePartitions.getNumPartitions(), numPartitions); // Instance of index 7 is not assigned because of the hash-based rotation @@ -123,6 +124,85 @@ public class InstanceAssignmentTest { Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); assertEquals(instancePartitions.getInstances(1, 2), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6)); + + // ===== Test against the cases when the existing instancePartitions isn't null. ===== + // Put the existing instancePartitions as the parameter to the InstanceAssignmentDriver. + // The returned instance partition should be the same as the last computed one. 
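+    // Since the instance configs are unchanged, every existing instance is still a candidate, so the
+    // minimized-data-movement selector keeps each instance in its original position and the result is identical.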
+ + // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3 + // instances should be assigned to 2 partitions, each with 2 instances + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); + assertEquals(instancePartitions.getNumPartitions(), numPartitions); + + // Instance of index 7 is not assigned because of the hash-based rotation + // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8 + // [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7] + // r0, r1, r2, r0, r1, r2, r0, r1, r2 + // r0: [i8, i1, i4] + // p0, p0, p1 + // p1 + // r1: [i9, i2, i5] + // p0, p0, p1 + // p1 + // r2: [i0, i3, i6] + // p0, p0, p1 + // p1 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(1, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(1, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); + assertEquals(instancePartitions.getInstances(1, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6)); + + // Remove two instances (i2, i6) and add two new instances (i10, i11). + instanceConfigs.remove(6); + instanceConfigs.remove(2); + for (int i = numInstances; i < numInstances + 2; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + instanceConfigs.add(instanceConfig); + } + + // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3 + // instances should be assigned to 2 partitions, each with 2 instances + // Leverage the latest instancePartitions from last computation as the parameter. 
+ // Data movement is minimized so that: i2 -> i10, i6 -> i11 + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); + assertEquals(instancePartitions.getNumPartitions(), numPartitions); + + // Instance of index 7 is not assigned because of the hash-based rotation + // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8 + // [i8, i9, i0, i1, i10, i3, i4, i5, i11, i7] + // r0, r1, r2, r0, r1, r2, r0, r1, r2 + // r0: [i8, i1, i4] + // p0, p0, p1 + // p1 + // r1: [i9, i10, i5] + // p0, p0, p1 + // p1 + // r2: [i0, i3, i11] + // p0, p0, p1 + // p1 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(1, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(1, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); + assertEquals(instancePartitions.getInstances(1, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 11)); } @Test @@ -155,7 +235,8 @@ public class InstanceAssignmentTest { // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to // replica-group 1 - InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + InstancePartitions instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays @@ -182,7 +263,7 @@ public class InstanceAssignmentTest { // Pool 0 and 2 will be selected in the pool selection // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to // replica-group 1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays @@ -200,7 +281,7 @@ public class InstanceAssignmentTest { // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to // replica-group 1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays @@ -218,7 +299,7 @@ public class InstanceAssignmentTest { // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // All instances in 
pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to // replica-group 1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays @@ -244,7 +325,7 @@ public class InstanceAssignmentTest { // r0 r2 r0 r2 // pool 1: [i8, i9, i5, i6, i7] // r1 r1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), @@ -253,6 +334,269 @@ public class InstanceAssignmentTest { Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); assertEquals(instancePartitions.getInstances(0, 2), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4)); + + // ===== Test against the cases when the existing instancePartitions isn't null. ===== + // Reset the number of replica groups to 2 and pools to 2. + numReplicaGroups = 2; + numPools = 2; + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0); + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + // Reset the instance configs to have only two pools. + instanceConfigs.clear(); + numInstances = 10; + for (int i = 0; i < numInstances; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + int pool = i / numInstancesPerPool; + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + + // Use all pools, the instancePartitions should be the same as the one without using + // the existing partition to instances map. + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to + // replica-group 1. + // [pool0, pool1] + // r0 r1 + InstancePartitions existingInstancePartitions = null; + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); + + // Get the latest existingPoolToInstancesMap from last computation and try again. 
+ // The actual assignment should be the same as last one. + existingInstancePartitions = instancePartitions; + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); + + // Select pool 0 and 1 in pool selection + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1)); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + + // Get the latest existingInstancePartitions from last computation. + existingInstancePartitions = instancePartitions; + + // Putting the existingPoolToInstancesMap shouldn't change the instance assignment. + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to + // replica-group 1 + // Now in poolToInstancesMap: + // pool 0: [ i3, i4, i0, i1, i2 ] + // pool 1: [ i8, i9, i5, i6, i7 ] + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); + + // Assign instances from 2 pools to 3 replica-groups + numReplicaGroups = 3; + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + + // Get the latest existingInstancePartitions from last computation. + existingInstancePartitions = instancePartitions; + + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // [pool0, pool1] + // r0 r1 + // r2 + // Each replica-group should have 2 instances assigned + // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 + // Latest instances from ZK: + // pool 0: [ i3, i4, i0, i1, i2 ] + // pool 1: [ i8, i9, i5, i6, i7 ] + // Due to the fact that in existing instancePartition instances are sorted, i0 and i1 will be retained for r0, + // i5 and i6 will be retained for r1. i3 and i4 are picked up from latest instances in the target pool. 
+ // Thus, the new assignment will be as follows: + // pool 0: [i0, i1, i2, i3, i4] + // r0 r0 r2 r2 + // pool 1: [i5, i6, i7, i8, i9] + // r1 r1 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4)); + + // Remove one instance from each of the pools and add one more back. + instanceConfigs.remove(5); + instanceConfigs.remove(3); + int poolCount = 0; + for (int i = numInstances; i < numInstances + 2; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + int pool = poolCount++; + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + + // Get the latest existingInstancePartitions from last computation. + existingInstancePartitions = instancePartitions; + + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // [pool0, pool1] + // r0 r1 + // r2 + // Each replica-group should have 2 instances assigned + // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 + // Latest instances from ZK: + // pool 0: [ i2, i4, i0, i1, i10 ] + // pool 1: [ i8, i9, i11, i6, i7 ] + // i3 gets swapped out, the next available instance i2 will take its place. + // Similarly, i5 is swapped out and i8 will take its place. + // Thus, the new assignment will be as follows: + // pool 0: [ i2, i4, i0, i1, i10 ] + // r2 r2 r0 r0 + // pool 1: [ i8, i9, i11, i6, i7 ] + // r1 r1 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 6)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 4)); + + // Reduce number of replica groups from 3 to 2. + numReplicaGroups = 2; + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + + // Get the latest existingInstancePartitions from last computation. 
+ existingInstancePartitions = instancePartitions; + + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // [pool0, pool1] + // r0 r1 + // Each replica-group should have 2 instances assigned + // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 + // Latest instances from ZK: + // pool 0: [ i2, i4, i0, i1, i10 ] + // pool 1: [ i8, i9, i11, i6, i7 ] + // In the existing instancePartitions, r0 already has [i0, i1], append the remaining + // available instances (i.e. [i2, i4, i10]) to the tail. + // r1 already has [i8, i6], append the remaining available instances (i.e. [i9, i11, i7]) to the tail. + // Thus, the new assignment will become: + // pool 0: [ i0, i1, i2, i4, i10 ] + // r0 r0 r0 r0 r0 + // pool 1: [ i8, i6, i9, i11, i7 ] + // r1 r1 r1 r1 r1 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 7)); + + // Add 1 more instance to each pool + poolCount = 0; + for (int i = numInstances + 2; i < numInstances + 4; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + int pool = poolCount++; + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + + // Get the latest existingInstancePartitions from last computation. + existingInstancePartitions = instancePartitions; + + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // [pool0, pool1] + // r0 r1 + // Each replica-group should have 2 instances assigned + // Math.abs("myTable_OFFLINE".hashCode()) % 6 = 2 + // Latest instances from ZK: + // pool 0: [ i10, i12, i2, i4, i0, i1 ] + // pool 1: [ i6, i7, i8, i9, i11, i13 ] + // There is one more empty position for each of the replica groups. + // Append the newly added instances (i.e. i12 and i13) to the tails. + // Thus, the new assignment will become: + // pool 0: [ i0, i1, i2, i4, i10, i12 ] + // pool 1: [ i8, i6, i9, i11, i7, i13 ] + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 12)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 13)); + + // Remove one instance from each of the pools, i.e. i2 and i8.
+ instanceConfigs.remove(6); + instanceConfigs.remove(2); + + // Get the latest existingInstancePartitions from last computation. + existingInstancePartitions = instancePartitions; + + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // [pool0, pool1] + // r0 r1 + // Each replica-group should have 2 instances assigned + // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 + // Latest instances from ZK: + // pool 0: [ i12, i4, i0, i1, i10 ] + // pool 1: [ i7, i9, i11, i13, i6 ] + // Since i2 and i8 got removed from the pools, + // the tail instances (ie. i12 and 13) will be used to fill their vacant position. + // Thus, the new assignment will become: + // pool 0: [ i0, i1, i12, i4, i10 ] + // pool 1: [ i13, i6, i9, i11, i7 ] + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 12, + SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 7)); } @Test @@ -270,7 +614,7 @@ public class InstanceAssignmentTest { // No instance assignment config assertFalse(InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, InstancePartitionsType.OFFLINE)); try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Instance assignment is not allowed for the given table config"); @@ -284,7 +628,7 @@ public class InstanceAssignmentTest { // No instance with correct tag try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "No enabled instance has the tag: tenant_OFFLINE"); @@ -295,7 +639,8 @@ public class InstanceAssignmentTest { } // All instances should be assigned as replica-group 0 partition 0 - InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + InstancePartitions instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), 1); assertEquals(instancePartitions.getNumPartitions(), 1); List<String> expectedInstances = new ArrayList<>(numInstances); @@ -311,7 +656,7 @@ public class InstanceAssignmentTest { // No instance has correct pool configured try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "No enabled instance has the pool configured for the tag: tenant_OFFLINE"); @@ -328,7 +673,7 @@ public class InstanceAssignmentTest { // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // All instances in pool 0 should be assigned as replica-group 0 partition 0 - instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), 1); assertEquals(instancePartitions.getNumPartitions(), 1); expectedInstances.clear(); @@ -343,7 +688,7 @@ public class InstanceAssignmentTest { // Ask for too many pools try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Not enough instance pools (2 in the cluster, asked for 3)"); @@ -355,7 +700,7 @@ public class InstanceAssignmentTest { // Ask for pool that does not exist try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Cannot find all instance pools configured: [0, 2]"); @@ -368,7 +713,7 @@ public class InstanceAssignmentTest { // Ask for too many instances try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)"); @@ -381,7 +726,7 @@ public class InstanceAssignmentTest { // Number of replica-groups must be positive try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Number of replica-groups must be positive"); @@ -393,7 +738,7 @@ public class InstanceAssignmentTest { // Ask for too many replica-groups try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), @@ -406,7 +751,7 @@ public class InstanceAssignmentTest { // Ask for too many instances try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)"); @@ -418,7 +763,7 @@ public class InstanceAssignmentTest { // Ask for too many instances per partition try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), @@ -434,7 +779,7 @@ public class InstanceAssignmentTest { // r0 r2 r0 r2 // pool1: [i8, i9, i5, i6, i7] // r1 r1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); assertEquals(instancePartitions.getNumReplicaGroups(), 3); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java index b2bfab19f1..6758d17ff5 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java @@ -36,7 +36,7 @@ public class PinotTableRebalancer extends PinotZKChanger { public PinotTableRebalancer(String zkAddress, String clusterName, boolean dryRun, boolean reassignInstances, boolean includeConsuming, boolean bootstrap, boolean downtime, int minReplicasToKeepUpForNoDowntime, - boolean bestEffort) { + boolean bestEffort, boolean retainInstancesSequence) { super(zkAddress, clusterName); _rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, dryRun); _rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, reassignInstances); @@ -46,6 +46,7 @@ public class PinotTableRebalancer extends PinotZKChanger { _rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, minReplicasToKeepUpForNoDowntime); _rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, bestEffort); + _rebalanceConfig.addProperty(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE, retainInstancesSequence); } public RebalanceResult rebalance(String tableNameWithType) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java index 6f15c8632b..d7148668d0 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java @@ -77,6 +77,10 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C + " cannot be achieved, false by default)") private boolean _bestEfforts = false; + @CommandLine.Option(names = {"-retainInstancesSequence"}, + description = "Whether to retain instance sequence during rebalancing in order to minimize data movement") + private boolean _retainInstancesSequence = false; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true, description = "Print this message") private boolean _help = false; @@ -94,7 +98,7 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C throws Exception { PinotTableRebalancer tableRebalancer = new PinotTableRebalancer(_zkAddress, _clusterName, _dryRun, _reassignInstances, _includeConsuming, _bootstrap, - _downtime, _minAvailableReplicas, _bestEfforts, _retainInstancesSequence); RebalanceResult rebalanceResult = tableRebalancer.rebalance(_tableNameWithType); LOGGER .info("Got rebalance result: {} for table: {}", JsonUtils.objectToString(rebalanceResult), _tableNameWithType);
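
Note: the new test steps above describe how the minimize-data-movement selector is expected to behave when an existing InstancePartitions is passed in: instances that are still alive keep their slot in their replica-group, and only vacant slots (removed instances, newly added capacity, or slots freed by shrinking a replica-group) are filled from the pool's not-yet-assigned instances. The following is a minimal, self-contained Java sketch of that retention idea for a single pool. It is not the actual MinimizeDataMovementInstanceReplicaGroupPartitionSelector (which also handles pool selection, partition ids and the table-name hash); the class and method names are illustrative only.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;

public class MinimizeMovementSketch {

  // Recomputes the replica-group assignment for one pool: instances that were already assigned
  // keep their slot if they are still alive; vacant slots are then filled, in order, from the
  // pool's instances that are not assigned anywhere else.
  static List<List<String>> reassignPool(List<String> aliveInstances,
      List<List<String>> existingReplicaGroups, int numReplicaGroups, int numInstancesPerReplicaGroup) {
    // Alive instances that have not been retained yet, kept in the latest pool order.
    LinkedHashSet<String> unassigned = new LinkedHashSet<>(aliveInstances);

    // Pass 1: retain still-alive instances in their existing slots (up to the new group size).
    List<List<String>> newAssignment = new ArrayList<>();
    for (int rg = 0; rg < numReplicaGroups; rg++) {
      List<String> slots = new ArrayList<>(Collections.nCopies(numInstancesPerReplicaGroup, (String) null));
      List<String> existing =
          rg < existingReplicaGroups.size() ? existingReplicaGroups.get(rg) : Collections.<String>emptyList();
      for (int i = 0; i < Math.min(existing.size(), numInstancesPerReplicaGroup); i++) {
        if (unassigned.remove(existing.get(i))) {
          slots.set(i, existing.get(i));
        }
      }
      newAssignment.add(slots);
    }

    // Pass 2: fill the vacant slots from the remaining unassigned instances, in order.
    Iterator<String> fill = unassigned.iterator();
    for (List<String> slots : newAssignment) {
      for (int i = 0; i < slots.size(); i++) {
        if (slots.get(i) == null && fill.hasNext()) {
          slots.set(i, fill.next());
        }
      }
    }
    return newAssignment;
  }

  public static void main(String[] args) {
    // Mirrors one step of the test: pool 0 previously hosted r0 = [i0, i1] and r2 = [i3, i4];
    // i3 was removed and i10 was added, so the latest pool order is [i2, i4, i0, i1, i10].
    List<String> alive = List.of("i2", "i4", "i0", "i1", "i10");
    List<List<String>> existing = List.of(List.of("i0", "i1"), List.of("i3", "i4"));
    // Prints [[i0, i1], [i2, i4]]: r0 is untouched, and the vacancy left by i3 in r2 is filled
    // by i2 while i4 keeps its slot, which is the expectation asserted in the test above.
    System.out.println(reassignPool(alive, existing, 2, 2));
  }
}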
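The test comments repeatedly quote Math.abs("myTable_OFFLINE".hashCode()): per those comments, the hash modulo the number of pools decides which pool hosts the first replica-group, and the hash modulo the pool size decides where the in-pool instance rotation starts. A quick way to check the values the comments quote (0 and 3 for 2 pools and 5 instances per pool); the expected values below come from the test comments, not from an independent computation:

public class TableNameHashCheck {
  public static void main(String[] args) {
    int hash = Math.abs("myTable_OFFLINE".hashCode());
    System.out.println(hash % 2);  // expected 0, so pool 0 hosts replica-group 0
    System.out.println(hash % 5);  // expected 3, so the in-pool list starts from the instance at index 3
  }
}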
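The pinot-tools changes expose the same behavior to operators: PinotTableRebalancer gains a retainInstancesSequence constructor argument, which RebalanceTableCommand sets from the new -retainInstancesSequence option. A hypothetical programmatic invocation of the updated constructor is sketched below; the ZK address, cluster name and table name are placeholders, and dry-run is enabled so nothing is actually moved.

import org.apache.pinot.tools.PinotTableRebalancer;

public class RetainSequenceRebalanceExample {
  public static void main(String[] args) {
    PinotTableRebalancer rebalancer =
        new PinotTableRebalancer("localhost:2181", "PinotCluster", true /* dryRun */,
            true /* reassignInstances */, false /* includeConsuming */, false /* bootstrap */,
            false /* downtime */, 1 /* minReplicasToKeepUpForNoDowntime */, false /* bestEffort */,
            true /* retainInstancesSequence, the flag added by this commit */);
    // Prints the RebalanceResult computed for the table without applying it (dry run).
    System.out.println(rebalancer.rebalance("myTable_OFFLINE"));
  }
}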