This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch retain-instance-sequence in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 0214adf78c12aae29732aa0b94932854fca13b0e Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Wed Mar 30 10:54:14 2022 -0700 Add retainInstancesSequence feature to table rebalance to minimize data movement between instances --- .../common/assignment/InstancePartitions.java | 77 ++++- .../PinotInstanceAssignmentRestletResource.java | 15 +- .../helix/core/PinotHelixResourceManager.java | 2 +- .../instance/InstanceAssignmentDriver.java | 45 ++- .../instance/InstanceTagPoolSelector.java | 125 ++++++-- .../core/rebalance/RebalanceConfigConstants.java | 4 + .../helix/core/rebalance/TableRebalancer.java | 30 +- .../instance/InstanceAssignmentTest.java | 318 +++++++++++++++++++-- 8 files changed, 544 insertions(+), 72 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java index c511077..b26a957 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -58,25 +60,37 @@ import org.apache.pinot.spi.utils.JsonUtils; @JsonIgnoreProperties(ignoreUnknown = true) public class InstancePartitions { private static final char PARTITION_REPLICA_GROUP_SEPARATOR = '_'; + private static final String PARTITIONS_KEY = "partitions"; + private static final String INSTANCE_SEPARATOR = "/"; private final String _instancePartitionsName; - private final Map<String, List<String>> _partitionToInstancesMap; + private 
final Map<String, List<String>> _partitionWithReplicaGroupToInstancesMap; + private final Map<Integer, List<String>> _partitionToInstancesMap; private int _numPartitions; private int _numReplicaGroups; public InstancePartitions(String instancePartitionsName) { _instancePartitionsName = instancePartitionsName; + _partitionWithReplicaGroupToInstancesMap = new TreeMap<>(); _partitionToInstancesMap = new TreeMap<>(); } @JsonCreator private InstancePartitions( @JsonProperty(value = "instancePartitionsName", required = true) String instancePartitionsName, + @JsonProperty(value = "partitionWithReplicaGroupToInstancesMap", required = true) + Map<String, List<String>> partitionWithReplicaGroupToInstancesMap, @JsonProperty(value = "partitionToInstancesMap", required = true) - Map<String, List<String>> partitionToInstancesMap) { + Map<String, String> partitionsMap) { _instancePartitionsName = instancePartitionsName; - _partitionToInstancesMap = partitionToInstancesMap; - for (String key : partitionToInstancesMap.keySet()) { + _partitionWithReplicaGroupToInstancesMap = partitionWithReplicaGroupToInstancesMap; + _partitionToInstancesMap = new TreeMap<>(); + if (partitionsMap != null) { + for (Map.Entry<String, String> entry : partitionsMap.entrySet()) { + _partitionToInstancesMap.put(Integer.parseInt(entry.getKey()), extractInstances(entry.getValue())); + } + } + for (String key : partitionWithReplicaGroupToInstancesMap.keySet()) { int separatorIndex = key.indexOf(PARTITION_REPLICA_GROUP_SEPARATOR); int partitionId = Integer.parseInt(key.substring(0, separatorIndex)); int replicaGroupId = Integer.parseInt(key.substring(separatorIndex + 1)); @@ -91,8 +105,8 @@ public class InstancePartitions { } @JsonProperty - public Map<String, List<String>> getPartitionToInstancesMap() { - return _partitionToInstancesMap; + public Map<String, List<String>> getPartitionWithReplicaGroupToInstancesMap() { + return _partitionWithReplicaGroupToInstancesMap; } @JsonIgnore @@ -105,25 +119,68 @@ 
public class InstancePartitions { return _numReplicaGroups; } + @JsonIgnore + public Map<Integer, List<String>> getPartitionToInstancesMap() { + return _partitionToInstancesMap; + } + public List<String> getInstances(int partitionId, int replicaGroupId) { - return _partitionToInstancesMap + return _partitionWithReplicaGroupToInstancesMap .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId); } public void setInstances(int partitionId, int replicaGroupId, List<String> instances) { String key = Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId; - _partitionToInstancesMap.put(key, instances); + _partitionWithReplicaGroupToInstancesMap.put(key, instances); _numPartitions = Integer.max(_numPartitions, partitionId + 1); _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1); } + public void setPartitionToInstancesMap(Map<Integer, List<String>> partitionToInstancesMap) { + _partitionToInstancesMap.putAll(partitionToInstancesMap); + } + public static InstancePartitions fromZNRecord(ZNRecord znRecord) { - return new InstancePartitions(znRecord.getId(), znRecord.getListFields()); + return new InstancePartitions(znRecord.getId(), znRecord.getListFields(), znRecord.getMapField(PARTITIONS_KEY)); + } + + private static List<String> extractInstances(String instancesRawString) { + if (instancesRawString == null || instancesRawString.length() == 0) { + return Collections.emptyList(); + } + String[] instancesArray = instancesRawString.split(INSTANCE_SEPARATOR); + List<String> instances = new ArrayList<>(instancesArray.length); + Collections.addAll(instances, instancesArray); + return instances; + } + + private String convertInstancesToString(List<String> instances) { + if (instances == null || instances.isEmpty()) { + return ""; + } + StringBuilder stringBuilder = new StringBuilder(); + for (String instance : instances) { + if (stringBuilder.length() == 0) { + stringBuilder.append(instance); + } else 
{ + stringBuilder.append(INSTANCE_SEPARATOR).append(instance); + } + } + return stringBuilder.toString(); + } + + private Map<String, String> convertListToStringMap() { + Map<String, String> convertedMap = new TreeMap<>(); + for (Map.Entry<Integer, List<String>> entry : _partitionToInstancesMap.entrySet()) { + convertedMap.put(Integer.toString(entry.getKey()), convertInstancesToString(entry.getValue())); + } + return convertedMap; } public ZNRecord toZNRecord() { ZNRecord znRecord = new ZNRecord(_instancePartitionsName); - znRecord.setListFields(_partitionToInstancesMap); + znRecord.setListFields(_partitionWithReplicaGroupToInstancesMap); + znRecord.setMapField(PARTITIONS_KEY, convertListToStringMap()); return znRecord; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java index 8d9e6cc..58e677d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java @@ -137,7 +137,7 @@ public class PinotInstanceAssignmentRestletResource { if (InstanceAssignmentConfigUtils .allowInstanceAssignment(offlineTableConfig, InstancePartitionsType.OFFLINE)) { instancePartitionsMap.put(InstancePartitionsType.OFFLINE, new InstanceAssignmentDriver(offlineTableConfig) - .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs)); + .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap())); } } catch (IllegalStateException e) { throw new ControllerApplicationException(LOGGER, "Caught IllegalStateException", Response.Status.BAD_REQUEST, @@ -156,15 +156,15 @@ public class PinotInstanceAssignmentRestletResource { if (instancePartitionsType == 
InstancePartitionsType.CONSUMING || instancePartitionsType == null) { if (InstanceAssignmentConfigUtils .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.CONSUMING)) { - instancePartitionsMap.put(InstancePartitionsType.CONSUMING, - instanceAssignmentDriver.assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs)); + instancePartitionsMap.put(InstancePartitionsType.CONSUMING, instanceAssignmentDriver + .assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs, Collections.emptyMap())); } } if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) { if (InstanceAssignmentConfigUtils .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.COMPLETED)) { - instancePartitionsMap.put(InstancePartitionsType.COMPLETED, - instanceAssignmentDriver.assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs)); + instancePartitionsMap.put(InstancePartitionsType.COMPLETED, instanceAssignmentDriver + .assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs, Collections.emptyMap())); } } } catch (IllegalStateException e) { @@ -295,8 +295,9 @@ public class PinotInstanceAssignmentRestletResource { while (iterator.hasNext()) { InstancePartitions instancePartitions = iterator.next(); boolean oldInstanceFound = false; - Map<String, List<String>> partitionToInstancesMap = instancePartitions.getPartitionToInstancesMap(); - for (List<String> instances : partitionToInstancesMap.values()) { + Map<String, List<String>> partitionWithReplicaGroupToInstancesMap = + instancePartitions.getPartitionWithReplicaGroupToInstancesMap(); + for (List<String> instances : partitionWithReplicaGroupToInstancesMap.values()) { oldInstanceFound |= Collections.replaceAll(instances, oldInstanceId, newInstanceId); } if (oldInstanceFound) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ecbd3b7..754146a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1648,7 +1648,7 @@ public class PinotHelixResourceManager { List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs(); for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) { InstancePartitions instancePartitions = - instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs); + instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, Collections.emptyMap()); LOGGER.info("Persisting instance partitions: {}", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index 1440fae..5cda724 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; @@ -51,16 +52,33 @@ public class InstanceAssignmentDriver { _tableConfig = tableConfig; } + /** + * Assign instances to 
InstancePartitions object. + * @param instancePartitionsType type of instance partitions + * @param instanceConfigs list of instance configs + * @param partitionToInstancesMap existing instance with sequence that should be respected. An empty list + * means no preceding sequence to respect and the instances would be sorted. + */ public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, - List<InstanceConfig> instanceConfigs) { + List<InstanceConfig> instanceConfigs, Map<Integer, List<String>> partitionToInstancesMap) { + boolean shouldRetainInstanceSequence = !partitionToInstancesMap.isEmpty(); String tableNameWithType = _tableConfig.getTableName(); - LOGGER.info("Starting {} instance assignment for table: {}", instancePartitionsType, tableNameWithType); + LOGGER.info("Starting {} instance assignment for table: {}. Should retain instance sequence: {}", + instancePartitionsType, tableNameWithType, shouldRetainInstanceSequence); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); InstanceTagPoolSelector tagPoolSelector = new InstanceTagPoolSelector(assignmentConfig.getTagPoolConfig(), tableNameWithType); - Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs); + Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = + tagPoolSelector.selectInstances(instanceConfigs, partitionToInstancesMap); + + InstancePartitions instancePartitions = new InstancePartitions( + instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType))); + if (!partitionToInstancesMap.isEmpty()) { + instancePartitions + .setPartitionToInstancesMap(extractInstanceNamesFromPoolToInstanceConfigsMap(poolToInstanceConfigsMap)); + } InstanceConstraintConfig constraintConfig = assignmentConfig.getConstraintConfig(); List<InstanceConstraintApplier> constraintAppliers = new 
ArrayList<>(); @@ -75,9 +93,26 @@ public class InstanceAssignmentDriver { InstanceReplicaGroupPartitionSelector replicaPartitionSelector = new InstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType); - InstancePartitions instancePartitions = new InstancePartitions( - instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType))); replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions); return instancePartitions; } + + private Map<Integer, List<String>> extractInstanceNamesFromPoolToInstanceConfigsMap( + Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap) { + Map<Integer, List<String>> partitionToInstancesMap = new TreeMap<>(); + for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) { + Integer pool = entry.getKey(); + List<InstanceConfig> instanceConfigs = entry.getValue(); + partitionToInstancesMap.put(pool, extractInstanceNamesFromInstanceConfigs(instanceConfigs)); + } + return partitionToInstancesMap; + } + + private List<String> extractInstanceNamesFromInstanceConfigs(List<InstanceConfig> instanceConfigs) { + List<String> instanceNames = new ArrayList<>(instanceConfigs.size()); + for (InstanceConfig instanceConfig : instanceConfigs) { + instanceNames.add(instanceConfig.getInstanceName()); + } + return instanceNames; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java index 5aefd1a..b56c7ed 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java @@ -20,7 +20,10 @@ package 
org.apache.pinot.controller.helix.core.assignment.instance; import com.google.common.base.Preconditions; import java.util.ArrayList; -import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -48,52 +51,105 @@ public class InstanceTagPoolSelector { /** * Returns a map from pool to instance configs based on the tag and pool config for the given instance configs. + * @param instanceConfigs list of latest instance configs from ZK. + * @param partitionToInstancesMap existing instance with sequence that should be respected. An empty list + * means no preceding sequence to respect and the instances would be sorted. */ - public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs) { + public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs, + Map<Integer, List<String>> partitionToInstancesMap) { int tableNameHash = Math.abs(_tableNameWithType.hashCode()); LOGGER.info("Starting instance tag/pool selection for table: {} with hash: {}", _tableNameWithType, tableNameHash); - // Filter out the instances with the correct tag + // Filter out the instances with the correct tag. String tag = _tagPoolConfig.getTag(); - List<InstanceConfig> candidateInstanceConfigs = new ArrayList<>(); + Map<String, InstanceConfig> candidateInstanceConfigsMap = new LinkedHashMap<>(); for (InstanceConfig instanceConfig : instanceConfigs) { if (instanceConfig.getTags().contains(tag)) { - candidateInstanceConfigs.add(instanceConfig); + candidateInstanceConfigsMap.put(instanceConfig.getInstanceName(), instanceConfig); } } - candidateInstanceConfigs.sort(Comparator.comparing(InstanceConfig::getInstanceName)); - int numCandidateInstances = candidateInstanceConfigs.size(); + + // Find out newly added instances from the latest copies of instance configs. 
+ Deque<String> newlyAddedInstances = new LinkedList<>(candidateInstanceConfigsMap.keySet()); + for (List<String> existingInstancesWithSequence : partitionToInstancesMap.values()) { + newlyAddedInstances.removeAll(existingInstancesWithSequence); + } + + int numCandidateInstances = candidateInstanceConfigsMap.size(); Preconditions.checkState(numCandidateInstances > 0, "No enabled instance has the tag: %s", tag); LOGGER.info("{} enabled instances have the tag: {} for table: {}", numCandidateInstances, tag, _tableNameWithType); - Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new TreeMap<>(); + // Each pool number associates with a map that key is the instance name and value is the instance config. + Map<Integer, Map<String, InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>(); + // Each pool number associates with a list of newly added instance configs, + // so that new instances can be fetched from this list. + Map<Integer, Deque<InstanceConfig>> poolToNewInstanceConfigsMap = new HashMap<>(); + + // Extract the pool information from the instance configs. 
+ for (Map.Entry<String,InstanceConfig> entry : candidateInstanceConfigsMap.entrySet()) { + String instanceName = entry.getKey(); + InstanceConfig instanceConfig = entry.getValue(); + Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY); + if (poolMap != null && poolMap.containsKey(tag)) { + int pool = Integer.parseInt(poolMap.get(tag)); + poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new TreeMap<>()).put(instanceName, instanceConfig); + if (newlyAddedInstances.contains(instanceName)) { + poolToNewInstanceConfigsMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(instanceConfig); + } + } + } + + Map<Integer, List<InstanceConfig>> poolToLatestInstanceConfigsMap = new TreeMap<>(); if (_tagPoolConfig.isPoolBased()) { // Pool based selection - // Extract the pool information from the instance configs - for (InstanceConfig instanceConfig : candidateInstanceConfigs) { - Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY); - if (poolMap != null && poolMap.containsKey(tag)) { - int pool = Integer.parseInt(poolMap.get(tag)); - poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig); + for (Map.Entry<Integer, List<String>> entry : partitionToInstancesMap.entrySet()) { + Integer pool = entry.getKey(); + List<String> existingInstanceAssignmentInPool = entry.getValue(); + List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>(); + for (String existingInstance: existingInstanceAssignmentInPool) { + InstanceConfig instanceConfig = poolToInstanceConfigsMap.get(pool).get(existingInstance); + // Add instances to the candidate list and respect the sequence of the existing instances from the ZK. + // The missing/removed instances will be replaced by the newly instances. + // If the instance still exists from ZK, then add it to the candidate list. + // E.g. 
if the old instances are: [I1, I2, I3, I4] and the new instance are: [I1, I3, I4, I5, I6], + // the removed instance is I2 and the newly added instances are I5 and I6. + // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail. + // Thus, the new order would be [I1, I5, I3, I4, I6]. + if (instanceConfig != null) { + candidateInstanceConfigsWithSequence.add(instanceConfig); + } else { + // The current chosen instance no longer lives in the cluster any more, thus pick a new instance. + candidateInstanceConfigsWithSequence.add(poolToNewInstanceConfigsMap.get(pool).pollFirst()); + } } + poolToLatestInstanceConfigsMap.put(pool, candidateInstanceConfigsWithSequence); } + + // The preceding list of instances has been traversed. Add the remaining new instances. + for (Map.Entry<Integer, Deque<InstanceConfig>> entry : poolToNewInstanceConfigsMap.entrySet()) { + Integer pool = entry.getKey(); + Deque<InstanceConfig> remainingNewInstanceConfigs = entry.getValue(); + poolToLatestInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()) + .addAll(remainingNewInstanceConfigs); + } + Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(), "No enabled instance has the pool configured for the tag: %s", tag); Map<Integer, Integer> poolToNumInstancesMap = new TreeMap<>(); - for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) { + for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToLatestInstanceConfigsMap.entrySet()) { poolToNumInstancesMap.put(entry.getKey(), entry.getValue().size()); } LOGGER.info("Number instances for each pool: {} for table: {}", poolToNumInstancesMap, _tableNameWithType); // Calculate the pools to select based on the selection config - Set<Integer> pools = poolToInstanceConfigsMap.keySet(); + Set<Integer> pools = poolToLatestInstanceConfigsMap.keySet(); List<Integer> poolsToSelect = _tagPoolConfig.getPools(); if (poolsToSelect != null && !poolsToSelect.isEmpty()) 
{ Preconditions.checkState(pools.containsAll(poolsToSelect), "Cannot find all instance pools configured: %s", poolsToSelect); } else { - int numPools = poolToInstanceConfigsMap.size(); + int numPools = poolToLatestInstanceConfigsMap.size(); int numPoolsToSelect = _tagPoolConfig.getNumPools(); if (numPoolsToSelect > 0) { Preconditions @@ -106,7 +162,7 @@ public class InstanceTagPoolSelector { // Directly return the map if all the pools are selected if (numPools == numPoolsToSelect) { LOGGER.info("Selecting all {} pools: {} for table: {}", numPools, pools, _tableNameWithType); - return poolToInstanceConfigsMap; + return poolToLatestInstanceConfigsMap; } // Select pools based on the table name hash to evenly distribute the tables @@ -123,10 +179,37 @@ public class InstanceTagPoolSelector { } else { // Non-pool based selection - LOGGER.info("Selecting {} instances for table: {}", numCandidateInstances, _tableNameWithType); + LOGGER.info("Selecting {} instances for table: {}", candidateInstanceConfigsMap.size(), _tableNameWithType); // Put all instance configs as pool 0 - poolToInstanceConfigsMap.put(0, candidateInstanceConfigs); + + for (Map.Entry<Integer, List<String>> entry : partitionToInstancesMap.entrySet()) { + Integer pool = entry.getKey(); + List<String> existingInstanceAssignmentInPool = entry.getValue(); + List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>(); + for (String existingInstance: existingInstanceAssignmentInPool) { + InstanceConfig instanceConfig = candidateInstanceConfigsMap.get(existingInstance); + // Add instances to the candidate list and respect the sequence of the existing instances from the ZK. + // The missing/removed instances will be replaced by the newly instances. + // If the instance still exists from ZK, then add it to the candidate list. + // E.g. 
if the old instances are: [I1, I2, I3, I4] and the new instance are: [I1, I3, I4, I5, I6], + // the removed instance is I2 and the newly added instances are I5 and I6. + // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail. + // Thus, the new order would be [I1, I5, I3, I4, I6]. + if (instanceConfig != null) { + candidateInstanceConfigsWithSequence.add(instanceConfig); + } else { + // The current chosen instance no longer lives in the cluster any more, thus pick a new instance. + candidateInstanceConfigsWithSequence.add(candidateInstanceConfigsMap.get(newlyAddedInstances.pollFirst())); + } + } + poolToLatestInstanceConfigsMap.put(pool, candidateInstanceConfigsWithSequence); + } + // The preceding list of instances has been traversed. Add the remaining new instances. + for (String remainingNewInstance : newlyAddedInstances) { + poolToLatestInstanceConfigsMap.computeIfAbsent(0, k -> new ArrayList<>()) + .add(candidateInstanceConfigsMap.get(remainingNewInstance)); + } } - return poolToInstanceConfigsMap; + return poolToLatestInstanceConfigsMap; } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java index 70bebab..d08fbbf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java @@ -33,6 +33,10 @@ public class RebalanceConfigConstants { public static final String REASSIGN_INSTANCES = "reassignInstances"; public static final boolean DEFAULT_REASSIGN_INSTANCES = false; + // Whether to retain the sequence for the existing instances + public static final String RETAIN_INSTANCE_SEQUENCE = "retainInstancesSequence"; + public static final boolean 
DEFAULT_RETAIN_INSTANCE_SEQUENCE = false; + // Whether to reassign CONSUMING segments public static final String INCLUDE_CONSUMING = "includeConsuming"; public static final boolean DEFAULT_INCLUDE_CONSUMING = false; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index d6701c8..005a1ef 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.rebalance; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -145,6 +146,8 @@ public class TableRebalancer { tableConfig.getRoutingConfig().getInstanceSelectorType()); boolean bestEfforts = rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS, RebalanceConfigConstants.DEFAULT_BEST_EFFORTS); + boolean retainInstanceSequence = rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE, + RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE); LOGGER.info( "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, " + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}", @@ -194,7 +197,7 @@ public class TableRebalancer { // Calculate instance partitions map Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap; try { - instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun); + instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun, 
retainInstanceSequence); } catch (Exception e) { LOGGER.warn( "Caught exception while fetching/calculating instance partitions for table: {}, aborting the rebalance", @@ -323,7 +326,7 @@ public class TableRebalancer { currentIdealState = idealState; currentAssignment = currentIdealState.getRecord().getMapFields(); // Re-calculate the instance partitions in case the instance configs changed during the rebalance - instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false); + instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false, retainInstanceSequence); tierToInstancePartitionsMap = getTierToInstancePartitionsMap(tableNameWithType, sortedTiers); targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap, rebalanceConfig); @@ -381,21 +384,24 @@ public class TableRebalancer { } private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig, - boolean reassignInstances, boolean dryRun) { + boolean reassignInstances, boolean dryRun, boolean retainInstanceSequence) { Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>(); if (tableConfig.getTableType() == TableType.OFFLINE) { instancePartitionsMap.put(InstancePartitionsType.OFFLINE, - getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun)); + getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun, + retainInstanceSequence)); } else { instancePartitionsMap.put(InstancePartitionsType.CONSUMING, - getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun)); + getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun, + retainInstanceSequence)); String tableNameWithType = tableConfig.getTableName(); if 
(InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) { LOGGER.info( "COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}", tableNameWithType); instancePartitionsMap.put(InstancePartitionsType.COMPLETED, - getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun)); + getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun, + retainInstanceSequence)); } else { LOGGER.info( "COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions " @@ -413,14 +419,22 @@ public class TableRebalancer { } private InstancePartitions getInstancePartitions(TableConfig tableConfig, - InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun) { + InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun, + boolean retainInstanceSequence) { String tableNameWithType = tableConfig.getTableName(); if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) { if (reassignInstances) { + Map<Integer, List<String>> partitionsToInstancesMap = Collections.emptyMap(); + if (retainInstanceSequence) { + InstancePartitions existingInstancePartitions = InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixManager, tableConfig, instancePartitionsType); + partitionsToInstancesMap = existingInstancePartitions.getPartitionToInstancesMap(); + } LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType); InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig); InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, - _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true)); + 
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), + partitionsToInstancesMap); if (!dryRun) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index f1fc049..0bbee49 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -21,7 +21,9 @@ package org.apache.pinot.controller.helix.core.assignment.instance; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; @@ -71,7 +73,8 @@ public class InstanceAssignmentTest { } // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 2 instances - InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, + instanceConfigs, Collections.emptyMap()); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); assertEquals(instancePartitions.getNumPartitions(), 1); // Instances of index 4 to 7 are not assigned because of the hash-based rotation @@ -95,7 +98,8 @@ public class InstanceAssignmentTest { // Instances should be assigned to 3 
replica-groups with a round-robin fashion, each with 3 instances, then these 3 // instances should be assigned to 2 partitions, each with 2 instances - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = driver + .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); assertEquals(instancePartitions.getNumPartitions(), numPartitions); // Instance of index 7 is not assigned because of the hash-based rotation @@ -123,6 +127,91 @@ public class InstanceAssignmentTest { Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); assertEquals(instancePartitions.getInstances(1, 2), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6)); + + // ===== Test against the cases when partitionToInstancesMap isn't empty. ===== + // Put the existing partition to instances map as the parameter to the InstanceAssignmentDriver + // instead of passing an empty map. + // The returned instance partition should be the same as the last computed one. 
+ Map<Integer, List<String>> existingPartitionToInstancesMap = new HashMap<>(); + List<String> instances = Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9); + existingPartitionToInstancesMap.put(0, instances); + + // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3 + // instances should be assigned to 2 partitions, each with 2 instances + instancePartitions = driver + .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); + assertEquals(instancePartitions.getNumPartitions(), numPartitions); + + // Instance of index 7 is not assigned because of the hash-based rotation + // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8 + // [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7] + // r0, r1, r2, r0, r1, r2, r0, r1, r2 + // r0: [i8, i1, i4] + // p0, p0, p1 + // p1 + // r1: [i9, i2, i5] + // p0, p0, p1 + // p1 + // r2: [i0, i3, i6] + // p0, p0, p1 + // p1 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(1, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(1, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); + 
assertEquals(instancePartitions.getInstances(1, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6)); + + // Remove two instances (i2, i6) and add two new instances (i10, i11). + instanceConfigs.remove(6); + instanceConfigs.remove(2); + for (int i = numInstances; i < numInstances + 2; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + instanceConfigs.add(instanceConfig); + } + // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3 + // instances should be assigned to 2 partitions, each with 2 instances + instancePartitions = driver + .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); + assertEquals(instancePartitions.getNumPartitions(), numPartitions); + + // Instance of index 7 is not assigned because of the hash-based rotation + // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8 + // [i8, i9, i0, i1, i10, i3, i4, i5, i11, i7] + // r0, r1, r2, r0, r1, r2, r0, r1, r2 + // r0: [i8, i1, i4] + // p0, p0, p1 + // p1 + // r1: [i9, i10, i5] + // p0, p0, p1 + // p1 + // r2: [i0, i3, i11] + // p0, p0, p1 + // p1 + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(1, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(1, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); + 
assertEquals(instancePartitions.getInstances(1, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 11)); } @Test @@ -155,7 +244,8 @@ public class InstanceAssignmentTest { // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to // replica-group 1 - InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + Collections.emptyMap()); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays @@ -182,7 +272,8 @@ public class InstanceAssignmentTest { // Pool 0 and 2 will be selected in the pool selection // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to // replica-group 1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays @@ -200,7 +291,154 @@ public class InstanceAssignmentTest { // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to // replica-group 1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); + 
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 12, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 14)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4)); + + // Select pool 0 and 1 in pool selection + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1)); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to + // replica-group 1 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); + + // Assign instances from 2 pools to 3 replica-groups + numReplicaGroups = numPools; + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // [pool0, pool1] + // r0 r1 + // r2 + // Each replica-group should have 2 instances assigned + // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 + // pool 0: [i3, i4, i0, i1, i2] + // r0 r2 r0 r2 + // pool 1: [i8, i9, i5, i6, i7] + // r1 r1 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4)); + + // ===== Test against the cases when partitionToInstancesMap isn't empty. ===== + // Reset the number of replica groups to 2 and pools to 2. + numReplicaGroups = 2; + numPools = 2; + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0); + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + // Reset the instance configs to have only two pools. 
+ instanceConfigs.clear(); + numInstances = 10; + for (int i = 0; i < numInstances; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + int pool = i / numInstancesPerPool; + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + + // Put the existing partition to instances map as the parameter to the InstanceAssignmentDriver. + Map<Integer, List<String>> existingPartitionToInstancesMap = new HashMap<>(); + existingPartitionToInstancesMap.put(0, Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4)); + existingPartitionToInstancesMap.put(1, Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); + + // Use all pools, the instance partition should be the same as the one without using + // the existing partition to instances map. + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to + // replica-group 1. 
+ // [pool0, pool1] + // r0 r1 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); + + // Add the third pool with same number of instances + numPools = 3; + numInstances = numPools * numInstancesPerPool; + for (int i = numInstances - numInstancesPerPool; i < numInstances; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + int pool = numPools - 1; + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + + // Putting the existingPartitionToInstancesMap shouldn't change the instance assignment. + // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // Pool 0 and 2 will be selected in the pool selection + // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to + // replica-group 1. 
+ // [pool0, pool1, pool2] + // r0 r1 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4)); + assertEquals(instancePartitions.getInstances(0, 1), Arrays + .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 12, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 14)); + + // Select all 3 pools in pool selection + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + + // Putting the existingPartitionToInstancesMap shouldn't change the instance assignment. + // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 + // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to + // replica-group 1. 
+ // [pool0, pool1, pool2] + // r1 (none) r0 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays @@ -215,10 +453,12 @@ public class InstanceAssignmentTest { tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + // Putting the existingPartitionToInstancesMap shouldn't change the instance assignment. // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to // replica-group 1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays @@ -244,7 +484,8 @@ public class InstanceAssignmentTest { // r0 r2 r0 r2 // pool 1: [i8, i9, i5, i6, i7] // r1 r1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap); assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), @@ -253,6 +494,40 @@ public class InstanceAssignmentTest { Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); assertEquals(instancePartitions.getInstances(0, 
2), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4)); + + instanceConfigs.remove(12); + instanceConfigs.remove(9); + instanceConfigs.remove(3); + int poolCount = 0; + for (int i = numInstances; i < numInstances + 3; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + int pool = poolCount++; + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // [pool0, pool1] + // r0 r1 + // r2 + // Each replica-group should have 2 instances assigned + // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 + // pool 0: [i15, i4, i0, i1, i2] + // r0 r2 r0 r2 + // pool 1: [i8, i16, i5, i6, i7] + // r1 r1 + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 15)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 16, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4)); } @Test @@ -270,7 +545,7 @@ public class InstanceAssignmentTest { // No instance assignment config assertFalse(InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, InstancePartitionsType.OFFLINE)); try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { 
assertEquals(e.getMessage(), "Instance assignment is not allowed for the given table config"); @@ -284,7 +559,7 @@ public class InstanceAssignmentTest { // No instance with correct tag try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "No enabled instance has the tag: tenant_OFFLINE"); @@ -295,7 +570,8 @@ public class InstanceAssignmentTest { } // All instances should be assigned as replica-group 0 partition 0 - InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + InstancePartitions instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); assertEquals(instancePartitions.getNumReplicaGroups(), 1); assertEquals(instancePartitions.getNumPartitions(), 1); List<String> expectedInstances = new ArrayList<>(numInstances); @@ -311,7 +587,7 @@ public class InstanceAssignmentTest { // No instance has correct pool configured try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "No enabled instance has the pool configured for the tag: tenant_OFFLINE"); @@ -328,7 +604,8 @@ public class InstanceAssignmentTest { // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // All instances in pool 0 should be assigned as replica-group 0 partition 0 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); assertEquals(instancePartitions.getNumReplicaGroups(), 1); assertEquals(instancePartitions.getNumPartitions(), 1); 
expectedInstances.clear(); @@ -343,7 +620,7 @@ public class InstanceAssignmentTest { // Ask for too many pools try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Not enough instance pools (2 in the cluster, asked for 3)"); @@ -355,7 +632,7 @@ public class InstanceAssignmentTest { // Ask for pool that does not exist try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Cannot find all instance pools configured: [0, 2]"); @@ -368,7 +645,7 @@ public class InstanceAssignmentTest { // Ask for too many instances try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)"); @@ -381,7 +658,7 @@ public class InstanceAssignmentTest { // Number of replica-groups must be positive try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Number of replica-groups must be positive"); @@ -393,7 +670,7 @@ public class InstanceAssignmentTest { // Ask for too many replica-groups try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), @@ 
-406,7 +683,7 @@ public class InstanceAssignmentTest { // Ask for too many instances try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)"); @@ -418,7 +695,7 @@ public class InstanceAssignmentTest { // Ask for too many instances per partition try { - driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), @@ -434,7 +711,8 @@ public class InstanceAssignmentTest { // r0 r2 r0 r2 // pool1: [i8, i9, i5, i6, i7] // r1 r1 - instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + Collections.emptyMap()); assertEquals(instancePartitions.getNumReplicaGroups(), 3); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org