This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 9550545be3ea35b2874d79a62d396806c0f0f23f Author: jlli_LinkedIn <j...@linkedin.com> AuthorDate: Sun Nov 5 10:55:11 2023 -0800 Enhance the minimizeDataMovement to keep the existing pool assignment --- .../assignment/InstanceAssignmentConfigUtils.java | 2 +- .../common/utils/config/TableConfigSerDeTest.java | 2 +- .../instance/FDAwareInstancePartitionSelector.java | 6 +- .../instance/InstanceAssignmentDriver.java | 10 +- .../instance/InstancePartitionSelector.java | 4 +- .../instance/InstancePartitionSelectorFactory.java | 18 +- .../InstanceReplicaGroupPartitionSelector.java | 78 +++++++-- .../instance/InstanceTagPoolSelector.java | 63 ++++++- .../MirrorServerSetInstancePartitionSelector.java | 4 +- ...anceAssignmentRestletResourceStatelessTest.java | 6 +- .../instance/InstanceAssignmentTest.java | 193 ++++++++++++++++----- .../InstanceReplicaGroupPartitionSelectorTest.java | 2 +- .../TableRebalancerClusterStatelessTest.java | 4 +- .../table/assignment/InstanceAssignmentConfig.java | 16 +- .../InstanceReplicaGroupPartitionConfig.java | 2 + 15 files changed, 311 insertions(+), 99 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java index ebf38d308f..13cc270954 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java @@ -122,7 +122,7 @@ public class InstanceAssignmentConfigUtils { replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement, null); } - return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig); + return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, minimizeDataMovement); } public static boolean isMirrorServerSetAssignment(TableConfig 
tableConfig, diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java index ed9d605af0..74f857a102 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java @@ -212,7 +212,7 @@ public class TableConfigSerDeTest { InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null), new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")), - new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null)); + new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null), null, false); TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java index 294971615a..de96d4da4d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java @@ -50,8 +50,8 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector private static final Logger LOGGER = LoggerFactory.getLogger(FDAwareInstancePartitionSelector.class); public FDAwareInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, - String tableNameWithType, @Nullable InstancePartitions 
existingInstancePartitions) { - super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement); } /** @@ -152,7 +152,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector * initialize the new replicaGroupBasedAssignmentState for assignment, * place existing instances in their corresponding positions */ - if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + if (_minimizeDataMovement && _existingInstancePartitions != null) { int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); int numExistingPartitions = _existingInstancePartitions.getNumPartitions(); /* diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index 6d869b86c1..09866c1ed7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -64,8 +64,8 @@ public class InstanceAssignmentDriver { } public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, - List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable - InstancePartitions preConfiguredInstancePartitions) { + List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable InstancePartitions preConfiguredInstancePartitions) { String tableNameWithType = _tableConfig.getTableName(); 
InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); @@ -88,8 +88,10 @@ public class InstanceAssignmentDriver { String tableNameWithType = _tableConfig.getTableName(); LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType); + boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement(); InstanceTagPoolSelector tagPoolSelector = - new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType); + new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType, + minimizeDataMovement, existingInstancePartitions); Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs); InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig(); @@ -106,7 +108,7 @@ public class InstanceAssignmentDriver { InstancePartitionSelector instancePartitionSelector = InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(), instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions, - preConfiguredInstancePartitions); + preConfiguredInstancePartitions, minimizeDataMovement); InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName); instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions); return instancePartitions; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java index 396b869924..5f92db2426 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java @@ -29,12 +29,14 @@ abstract class InstancePartitionSelector { protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; protected final String _tableNameWithType; protected final InstancePartitions _existingInstancePartitions; + protected final boolean _minimizeDataMovement; public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, - String tableNameWithType, InstancePartitions existingInstancePartitions) { + String tableNameWithType, InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) { _replicaGroupPartitionConfig = replicaGroupPartitionConfig; _tableNameWithType = tableNameWithType; _existingInstancePartitions = existingInstancePartitions; + _minimizeDataMovement = minimizeDataMovement; } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java index 256aa89b02..8a343b1598 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance; import java.util.Arrays; +import javax.annotation.Nullable; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; @@ -31,25 +32,18 @@ public class InstancePartitionSelectorFactory { public static InstancePartitionSelector 
getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector, InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType, - InstancePartitions existingInstancePartitions) { - return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType, - existingInstancePartitions, null); - } - - public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector, - InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType, - InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions - ) { + InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions, + boolean minimizeDataMovement) { switch (partitionSelector) { case FD_AWARE_INSTANCE_PARTITION_SELECTOR: return new FDAwareInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType, - existingInstancePartitions); + existingInstancePartitions, minimizeDataMovement); case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR: return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType, - existingInstancePartitions); + existingInstancePartitions, minimizeDataMovement); case MIRROR_SERVER_SET_PARTITION_SELECTOR: return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType, - existingInstancePartitions, preConfiguredInstancePartitions); + existingInstancePartitions, preConfiguredInstancePartitions, minimizeDataMovement); default: throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from" + Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values())); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java index de1e681d17..79e95db7a6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java @@ -22,18 +22,21 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.TreeMap; import javax.annotation.Nullable; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.apache.pinot.spi.utils.Pairs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +49,8 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class); public InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, - String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) { - super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement); } /** @@ -73,16 +76,65 @@ public class InstanceReplicaGroupPartitionSelector extends 
InstancePartitionSele Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>(); Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); - for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { - // Pick one pool for each replica-group based on the table name hash - int pool = pools.get((tableNameHash + replicaId) % numPools); - poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); - replicaGroupIdToPoolMap.put(replicaId, pool); + Map<String, Integer> instanceToPoolMap = new HashMap<>(); + for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) { + Integer pool = entry.getKey(); + List<InstanceConfig> instanceConfigsInPool = entry.getValue(); + Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); + for (InstanceConfig instanceConfig : instanceConfigsInPool) { + String instanceName = instanceConfig.getInstanceName(); + candidateInstances.add(instanceName); + instanceToPoolMap.put(instanceName, pool); + } + } - Set<String> candidateInstances = - poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); - List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); - instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); + if (_minimizeDataMovement && _existingInstancePartitions != null) { + // Keep the same pool for the replica group if it's already been used for the table. 
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups); + for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) { + boolean foundExistingReplicaGroup = false; + for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingReplicaGroup; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + for (String existingInstance : existingInstances) { + Integer existingPool = instanceToPoolMap.get(existingInstance); + if (existingPool != null & pools.contains(existingPool)) { + poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId); + replicaGroupIdToPoolMap.put(replicaGroupId, existingPool); + foundExistingReplicaGroup = true; + break; + } + } + } + } + // Use a min heap to track the least frequently picked pool among all the pools + PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator()); + for (int pool : pools) { + int numExistingReplicaGroups = + poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0; + minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool)); + } + for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { + if (replicaGroupIdToPoolMap.containsKey(replicaId)) { + continue; + } + // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection. 
+ Pairs.IntPair pair = minHeap.remove(); + int pool = pair.getRight(); + pair.setLeft(pair.getLeft() + 1); + minHeap.add(pair); + poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); + replicaGroupIdToPoolMap.put(replicaId, pool); + } + } else { + // Current default way to assign pool to replica groups. + for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { + // Pick one pool for each replica-group based on the table name hash + int pool = pools.get((tableNameHash + replicaId) % numPools); + poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); + replicaGroupIdToPoolMap.put(replicaId, pool); + } } LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, _tableNameWithType); @@ -132,7 +184,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", numPartitions, numInstancesPerPartition, _tableNameWithType); - if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + if (_minimizeDataMovement && _existingInstancePartitions != null) { // Minimize data movement. int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); @@ -257,7 +309,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele } List<String> instancesToSelect; - if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + if (_minimizeDataMovement && _existingInstancePartitions != null) { // Minimize data movement. 
List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0); LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java index 5aefd1ad69..755e7aa713 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java @@ -21,11 +21,15 @@ package org.apache.pinot.controller.helix.core.assignment.instance; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import javax.annotation.Nullable; import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.slf4j.Logger; @@ -41,9 +45,17 @@ public class InstanceTagPoolSelector { private final InstanceTagPoolConfig _tagPoolConfig; private final String _tableNameWithType; - public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType) { + private final boolean _minimizeDataMovement; + + private final InstancePartitions _existingInstancePartitions; + + public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType, + boolean minimizeDataMovement, + @Nullable InstancePartitions existingInstancePartitions) { _tagPoolConfig = tagPoolConfig; _tableNameWithType = tableNameWithType; + _minimizeDataMovement = minimizeDataMovement; 
+ _existingInstancePartitions = existingInstancePartitions; } /** @@ -70,12 +82,14 @@ public class InstanceTagPoolSelector { if (_tagPoolConfig.isPoolBased()) { // Pool based selection + Map<String, Integer> instanceToPoolMap = new HashMap<>(); // Extract the pool information from the instance configs for (InstanceConfig instanceConfig : candidateInstanceConfigs) { Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY); if (poolMap != null && poolMap.containsKey(tag)) { int pool = Integer.parseInt(poolMap.get(tag)); poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig); + instanceToPoolMap.put(instanceConfig.getInstanceName(), pool); } } Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(), @@ -96,9 +110,8 @@ public class InstanceTagPoolSelector { int numPools = poolToInstanceConfigsMap.size(); int numPoolsToSelect = _tagPoolConfig.getNumPools(); if (numPoolsToSelect > 0) { - Preconditions - .checkState(numPoolsToSelect <= numPools, "Not enough instance pools (%s in the cluster, asked for %s)", - numPools, numPoolsToSelect); + Preconditions.checkState(numPoolsToSelect <= numPools, + "Not enough instance pools (%s in the cluster, asked for %s)", numPools, numPoolsToSelect); } else { numPoolsToSelect = numPools; } @@ -109,11 +122,45 @@ public class InstanceTagPoolSelector { return poolToInstanceConfigsMap; } - // Select pools based on the table name hash to evenly distribute the tables poolsToSelect = new ArrayList<>(numPoolsToSelect); - List<Integer> poolsInCluster = new ArrayList<>(pools); - for (int i = 0; i < numPoolsToSelect; i++) { - poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools)); + if (_minimizeDataMovement && _existingInstancePartitions != null) { + Set<Integer> existingPools = new HashSet<>(numPoolsToSelect); + // Keep the same pool if it's already been used for the table. 
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + boolean foundExistingPoolForReplicaGroup = false; + for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup; + partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + for (String existingInstance : existingInstances) { + Integer existingPool = instanceToPoolMap.get(existingInstance); + if (existingPool != null & pools.contains(existingPool)) { + poolsToSelect.add(existingPool); + existingPools.add(existingPool); + foundExistingPoolForReplicaGroup = true; + break; + } + } + } + } + LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType); + // Pick a pool from remainingPools that isn't used before. + List<Integer> remainingPools = new ArrayList<>(pools); + remainingPools.retainAll(existingPools); + // Skip selecting the existing pool. 
+ for (int i = 0; i < numPoolsToSelect; i++) { + if (existingPools.contains(i)) { + continue; + } + poolsToSelect.add(remainingPools.remove(i % remainingPools.size())); + } + } else { + // Select pools based on the table name hash to evenly distribute the tables + List<Integer> poolsInCluster = new ArrayList<>(pools); + for (int i = 0; i < numPoolsToSelect; i++) { + poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools)); + } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java index 6b4086615a..f273866eeb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java @@ -76,8 +76,8 @@ public class MirrorServerSetInstancePartitionSelector extends InstancePartitionS public MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, - InstancePartitions preConfiguredInstancePartitions) { - super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + InstancePartitions preConfiguredInstancePartitions, boolean minimizeDataMovement) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement); _preConfiguredInstancePartitions = preConfiguredInstancePartitions; _numTargetInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); _numTargetReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java index dedc79384e..9feb8844c8 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java @@ -118,7 +118,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control // Add OFFLINE instance assignment config to the offline table config InstanceAssignmentConfig offlineInstanceAssignmentConfig = new InstanceAssignmentConfig( new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null, - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null)); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false); offlineTableConfig.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig)); _helixResourceManager.setExistingTableConfig(offlineTableConfig); @@ -136,7 +136,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control // Add CONSUMING instance assignment config to the real-time table config InstanceAssignmentConfig consumingInstanceAssignmentConfig = new InstanceAssignmentConfig( new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), false, 0, null), null, - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null)); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false); realtimeTableConfig.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig)); 
_helixResourceManager.setExistingTableConfig(realtimeTableConfig); @@ -164,7 +164,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control null))); InstanceAssignmentConfig tierInstanceAssignmentConfig = new InstanceAssignmentConfig( new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null, - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null)); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false); Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>(); instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig); instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index b25a529e10..a6220c00a2 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -374,7 +374,7 @@ public class InstanceAssignmentTest { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")) .build(); 
InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); @@ -480,7 +480,7 @@ public class InstanceAssignmentTest { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); InstancePartitions preConfigured = new InstancePartitions("preConfigured"); @@ -561,7 +561,7 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -664,7 +664,7 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - 
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -756,7 +756,7 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -851,7 +851,7 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -956,7 +956,7 @@ public class InstanceAssignmentTest { tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -1063,7 +1063,7 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -1156,7 +1156,7 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, 
"preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -1230,7 +1230,7 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -1311,7 +1311,7 @@ public class InstanceAssignmentTest { new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))).build(); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))).build(); InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 @@ -1364,7 +1364,7 @@ public class InstanceAssignmentTest { // Select all 3 pools in pool selection tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))); // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 // All instances in pool 2 should be assigned to replica-group 0, and 
all instances in pool 0 should be assigned to @@ -1386,7 +1386,7 @@ public class InstanceAssignmentTest { // Select pool 0 and 1 in pool selection tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1)); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))); // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to @@ -1408,7 +1408,7 @@ public class InstanceAssignmentTest { numReplicaGroups = numPools; replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))); // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // [pool0, pool1] @@ -1438,7 +1438,7 @@ public class InstanceAssignmentTest { replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); // Reset the instance configs to have only two pools. 
instanceConfigs.clear(); numInstances = 10; @@ -1487,7 +1487,7 @@ public class InstanceAssignmentTest { // Select pool 0 and 1 in pool selection tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1)); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); // Get the latest existingInstancePartitions from last computation. existingInstancePartitions = instancePartitions; @@ -1514,7 +1514,7 @@ public class InstanceAssignmentTest { numReplicaGroups = 3; replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); // Get the latest existingInstancePartitions from last computation. existingInstancePartitions = instancePartitions; @@ -1593,7 +1593,7 @@ public class InstanceAssignmentTest { numReplicaGroups = 2; replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); // Get the latest existingInstancePartitions from last computation. 
existingInstancePartitions = instancePartitions; @@ -1693,6 +1693,109 @@ public class InstanceAssignmentTest { assertEquals(instancePartitions.getInstances(0, 1), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6)); + + // The below is the test suite for testing out minimizeDataMovement with pool configs + // Add the third pool with same number of instances but keep number of pools the same (i.e. 2) + numPools = 3; + numInstances = numPools * numInstancesPerPool; + for (int i = numInstances + 4; i < numInstances + 9; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + int pool = numPools - 1; + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + + // Get the latest existingInstancePartitions from last computation. + existingInstancePartitions = instancePartitions; + + // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2, but since minimizeDataMovement is enabled, + // same pools would be re-used. + // [pool0, pool1] + // r0 r1 + // Thus, the instance partition assignment remains the same as the previous one. 
+ // pool 0: [ i12, i4, i0, i1, i10 ] + // pool 1: [ i7, i9, i11, i13, i6 ] + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6)); + + // Set tag pool config to 3. + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); + + // Get the latest existingInstancePartitions from last computation. + existingInstancePartitions = instancePartitions; + + // Putting the existingPoolToInstancesMap shouldn't change the instance assignment, + // as there are only 2 replica groups needed. + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 + // But since Pool 0 and Pool 1 are already being used for the table and the number of replica groups remains 2, + // the 3rd pool (Pool 2) won't be picked up. + // Thus, the instance partition assignment remains the same as the existing one.
+ // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to + // replica-group 1 + // Now in poolToInstancesMap: + // pool 0: [ i12, i4, i0, i1, i10 ] + // pool 1: [ i7, i9, i11, i13, i6 ] + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6)); + + // Set replica group from 2 to 3 + numReplicaGroups = 3; + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); + + // Get the latest existingInstancePartitions from last computation. 
+ existingInstancePartitions = instancePartitions; + + // Now that 1 more replica group is needed, Pool 2 will be chosen for the 3rd replica group + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 + // [pool0, pool1, pool2] + // r0 r1 r2 + // Each replica-group should have 5 instances assigned + // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 + // Latest instances from ZK: + // pool 0: [ i3, i4, i0, i1, i2 ] + // pool 1: [ i8, i9, i5, i6, i7 ] + // pool 2: [ i22,i23,i19,i20,i21] + // Thus, the new assignment will become: + // pool 0: [ i12, i4, i0, i1, i10 ] + // pool 1: [ i7, i9, i11, i13, i6 ] + // pool 2: [ i22, i23, i19, i20,i21 ] + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 19, + SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 21)); } @Test @@ -1720,7 +1823,7 @@ public class InstanceAssignmentTest { InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new
InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // No instance with correct tag try { @@ -1750,7 +1853,7 @@ public class InstanceAssignmentTest { // Enable pool tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // No instance has correct pool configured try { @@ -1784,7 +1887,7 @@ public class InstanceAssignmentTest { tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many pools try { @@ -1796,7 +1899,7 @@ public class InstanceAssignmentTest { tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 2)); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for pool that does not exist try { @@ -1810,7 +1913,7 @@ public class InstanceAssignmentTest { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false, null ); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + 
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many instances try { @@ -1824,7 +1927,7 @@ public class InstanceAssignmentTest { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false, null ); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Number of replica-groups must be positive try { @@ -1836,7 +1939,7 @@ public class InstanceAssignmentTest { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many replica-groups try { @@ -1849,7 +1952,7 @@ public class InstanceAssignmentTest { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many instances try { @@ -1861,7 +1964,7 @@ public class InstanceAssignmentTest { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new 
InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many instances per partition try { @@ -1874,7 +1977,7 @@ public class InstanceAssignmentTest { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 // pool0: [i3, i4, i0, i1, i2] @@ -1914,7 +2017,8 @@ public class InstanceAssignmentTest { try { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build(); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR", false))) + .build(); } catch (IllegalArgumentException e) { assertEquals(e.getMessage(), "No enum constant org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig.PartitionSelector" @@ -1943,7 +2047,8 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + .build(); driver = new InstanceAssignmentDriver(tableConfig); try { instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); @@ -1976,7 +2081,8 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + .build(); driver = new InstanceAssignmentDriver(tableConfig); try { instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); @@ -2017,7 +2123,8 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + .build(); driver = new InstanceAssignmentDriver(tableConfig); try { instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); @@ -2055,7 +2162,8 @@ public class InstanceAssignmentTest { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + .build(); InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); InstancePartitions instancePartitions = @@ -2127,7 +2235,8 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) + .build(); driver = new InstanceAssignmentDriver(tableConfig); // existingInstancePartitions = instancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions); @@ -2208,7 +2317,7 @@ public class InstanceAssignmentTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup)) .setSegmentPartitionConfig(segmentPartitionConfig).build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2282,7 +2391,7 @@ public class InstanceAssignmentTest { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new 
InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2338,7 +2447,7 @@ public class InstanceAssignmentTest { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2405,7 +2514,7 @@ public class InstanceAssignmentTest { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2471,7 +2580,7 @@ public class InstanceAssignmentTest { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2542,7 +2651,7 @@ public class InstanceAssignmentTest { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2593,7 +2702,7 @@ public class InstanceAssignmentTest { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2657,7 +2766,7 @@ public class InstanceAssignmentTest { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) .build(); driver = new InstanceAssignmentDriver(tableConfig); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java index 889206437f..2fdef27796 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java @@ -64,7 +64,7 @@ public class InstanceReplicaGroupPartitionSelectorTest { new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null); InstanceReplicaGroupPartitionSelector selector = - new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing); + new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true); String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"}; String[] poolNumbers = {"0", "0", "1", "1"}; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 5d679c0380..1df7109ef2 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -195,7 +195,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
NUM_REPLICAS, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); _helixResourceManager.updateTableConfig(tableConfig); // No need to reassign instances because instances should be automatically assigned when updating the table config @@ -481,7 +481,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME, - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); _helixResourceManager.updateTableConfig(tableConfig); rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java index 391ba4812d..ad4b22ecaf 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java @@ -41,13 +41,17 @@ public class InstanceAssignmentConfig extends BaseJsonConfig { "Configuration for the instance replica-group and partition of the instance assignment (mandatory)") private final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; + @JsonPropertyDescription("Configuration to minimize data movement for pool and instance assignment") + private final boolean 
_minimizeDataMovement; + @JsonCreator public InstanceAssignmentConfig( @JsonProperty(value = "tagPoolConfig", required = true) InstanceTagPoolConfig tagPoolConfig, @JsonProperty("constraintConfig") @Nullable InstanceConstraintConfig constraintConfig, @JsonProperty(value = "replicaGroupPartitionConfig", required = true) InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, - @JsonProperty("partitionSelector") @Nullable String partitionSelector) { + @JsonProperty("partitionSelector") @Nullable String partitionSelector, + @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement) { Preconditions.checkArgument(tagPoolConfig != null, "'tagPoolConfig' must be configured"); Preconditions .checkArgument(replicaGroupPartitionConfig != null, "'replicaGroupPartitionConfig' must be configured"); @@ -57,11 +61,7 @@ public class InstanceAssignmentConfig extends BaseJsonConfig { _partitionSelector = partitionSelector == null ? PartitionSelector.INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR : PartitionSelector.valueOf(partitionSelector); - } - - public InstanceAssignmentConfig(InstanceTagPoolConfig tagPoolConfig, InstanceConstraintConfig constraintConfig, - InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig) { - this(tagPoolConfig, constraintConfig, replicaGroupPartitionConfig, null); + _minimizeDataMovement = minimizeDataMovement; } public PartitionSelector getPartitionSelector() { @@ -81,6 +81,10 @@ public class InstanceAssignmentConfig extends BaseJsonConfig { return _replicaGroupPartitionConfig; } + public boolean isMinimizeDataMovement() { + return _minimizeDataMovement; + } + public enum PartitionSelector { FD_AWARE_INSTANCE_PARTITION_SELECTOR, INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR, MIRROR_SERVER_SET_PARTITION_SELECTOR diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java index adc72e8f1c..1bc40cba21 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java @@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig { "Name of the column used for partition, if not provided table level replica group will be used") private final String _partitionColumn; + // TODO: remove this config in the next official release + @Deprecated private final boolean _minimizeDataMovement; @JsonCreator --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org