This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch 
maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 9550545be3ea35b2874d79a62d396806c0f0f23f
Author: jlli_LinkedIn <j...@linkedin.com>
AuthorDate: Sun Nov 5 10:55:11 2023 -0800

    Enhance the minimizeDataMovement to keep the existing pool assignment
---
 .../assignment/InstanceAssignmentConfigUtils.java  |   2 +-
 .../common/utils/config/TableConfigSerDeTest.java  |   2 +-
 .../instance/FDAwareInstancePartitionSelector.java |   6 +-
 .../instance/InstanceAssignmentDriver.java         |  10 +-
 .../instance/InstancePartitionSelector.java        |   4 +-
 .../instance/InstancePartitionSelectorFactory.java |  18 +-
 .../InstanceReplicaGroupPartitionSelector.java     |  78 +++++++--
 .../instance/InstanceTagPoolSelector.java          |  63 ++++++-
 .../MirrorServerSetInstancePartitionSelector.java  |   4 +-
 ...anceAssignmentRestletResourceStatelessTest.java |   6 +-
 .../instance/InstanceAssignmentTest.java           | 193 ++++++++++++++++-----
 .../InstanceReplicaGroupPartitionSelectorTest.java |   2 +-
 .../TableRebalancerClusterStatelessTest.java       |   4 +-
 .../table/assignment/InstanceAssignmentConfig.java |  16 +-
 .../InstanceReplicaGroupPartitionConfig.java       |   2 +
 15 files changed, 311 insertions(+), 99 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index ebf38d308f..13cc270954 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -122,7 +122,7 @@ public class InstanceAssignmentConfigUtils {
           replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, 
minimizeDataMovement, null);
     }
 
-    return new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig);
+    return new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, minimizeDataMovement);
   }
 
   public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index ed9d605af0..74f857a102 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -212,7 +212,7 @@ public class TableConfigSerDeTest {
       InstanceAssignmentConfig instanceAssignmentConfig =
           new InstanceAssignmentConfig(new 
InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null),
               new InstanceConstraintConfig(Arrays.asList("constraint1", 
"constraint2")),
-              new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, 
false, null));
+              new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, 
false, null), null, false);
       TableConfig tableConfig = 
tableConfigBuilder.setInstanceAssignmentConfigMap(
           Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), 
instanceAssignmentConfig)).build();
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
index 294971615a..de96d4da4d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
@@ -50,8 +50,8 @@ public class FDAwareInstancePartitionSelector extends 
InstancePartitionSelector
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FDAwareInstancePartitionSelector.class);
 
   public FDAwareInstancePartitionSelector(InstanceReplicaGroupPartitionConfig 
replicaGroupPartitionConfig,
-      String tableNameWithType, @Nullable InstancePartitions 
existingInstancePartitions) {
-    super(replicaGroupPartitionConfig, tableNameWithType, 
existingInstancePartitions);
+      String tableNameWithType, @Nullable InstancePartitions 
existingInstancePartitions, boolean minimizeDataMovement) {
+    super(replicaGroupPartitionConfig, tableNameWithType, 
existingInstancePartitions, minimizeDataMovement);
   }
 
   /**
@@ -152,7 +152,7 @@ public class FDAwareInstancePartitionSelector extends 
InstancePartitionSelector
        * initialize the new replicaGroupBasedAssignmentState for assignment,
        * place existing instances in their corresponding positions
        */
-      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
         int numExistingReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
         int numExistingPartitions = 
_existingInstancePartitions.getNumPartitions();
         /*
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 6d869b86c1..09866c1ed7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -64,8 +64,8 @@ public class InstanceAssignmentDriver {
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
-      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions, @Nullable
-      InstancePartitions preConfiguredInstancePartitions) {
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
@@ -88,8 +88,10 @@ public class InstanceAssignmentDriver {
     String tableNameWithType = _tableConfig.getTableName();
     LOGGER.info("Starting {} instance assignment for table {}", 
instancePartitionsName, tableNameWithType);
 
+    boolean minimizeDataMovement = 
instanceAssignmentConfig.isMinimizeDataMovement();
     InstanceTagPoolSelector tagPoolSelector =
-        new 
InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), 
tableNameWithType);
+        new 
InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), 
tableNameWithType,
+            minimizeDataMovement, existingInstancePartitions);
     Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = 
tagPoolSelector.selectInstances(instanceConfigs);
 
     InstanceConstraintConfig constraintConfig = 
instanceAssignmentConfig.getConstraintConfig();
@@ -106,7 +108,7 @@ public class InstanceAssignmentDriver {
     InstancePartitionSelector instancePartitionSelector =
         
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
             instanceAssignmentConfig.getReplicaGroupPartitionConfig(), 
tableNameWithType, existingInstancePartitions,
-            preConfiguredInstancePartitions);
+            preConfiguredInstancePartitions, minimizeDataMovement);
     InstancePartitions instancePartitions = new 
InstancePartitions(instancePartitionsName);
     instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, 
instancePartitions);
     return instancePartitions;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
index 396b869924..5f92db2426 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
@@ -29,12 +29,14 @@ abstract class InstancePartitionSelector {
   protected final InstanceReplicaGroupPartitionConfig 
_replicaGroupPartitionConfig;
   protected final String _tableNameWithType;
   protected final InstancePartitions _existingInstancePartitions;
+  protected final boolean _minimizeDataMovement;
 
   public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig 
replicaGroupPartitionConfig,
-      String tableNameWithType, InstancePartitions existingInstancePartitions) 
{
+      String tableNameWithType, InstancePartitions existingInstancePartitions, 
boolean minimizeDataMovement) {
     _replicaGroupPartitionConfig = replicaGroupPartitionConfig;
     _tableNameWithType = tableNameWithType;
     _existingInstancePartitions = existingInstancePartitions;
+    _minimizeDataMovement = minimizeDataMovement;
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
index 256aa89b02..8a343b1598 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import java.util.Arrays;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -31,25 +32,18 @@ public class InstancePartitionSelectorFactory {
 
   public static InstancePartitionSelector 
getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
       InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, 
String tableNameWithType,
-      InstancePartitions existingInstancePartitions) {
-    return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, 
tableNameWithType,
-        existingInstancePartitions, null);
-  }
-
-  public static InstancePartitionSelector 
getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
-      InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, 
String tableNameWithType,
-      InstancePartitions existingInstancePartitions, InstancePartitions 
preConfiguredInstancePartitions
-  ) {
+      InstancePartitions existingInstancePartitions, @Nullable 
InstancePartitions preConfiguredInstancePartitions,
+      boolean minimizeDataMovement) {
     switch (partitionSelector) {
       case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
         return new 
FDAwareInstancePartitionSelector(instanceReplicaGroupPartitionConfig, 
tableNameWithType,
-            existingInstancePartitions);
+            existingInstancePartitions, minimizeDataMovement);
       case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
         return new 
InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, 
tableNameWithType,
-            existingInstancePartitions);
+            existingInstancePartitions, minimizeDataMovement);
       case MIRROR_SERVER_SET_PARTITION_SELECTOR:
         return new 
MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, 
tableNameWithType,
-            existingInstancePartitions, preConfiguredInstancePartitions);
+            existingInstancePartitions, preConfiguredInstancePartitions, 
minimizeDataMovement);
       default:
         throw new IllegalStateException("Unexpected PartitionSelector: " + 
partitionSelector + ", should be from"
             + 
Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index de1e681d17..79e95db7a6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -22,18 +22,21 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,8 +49,8 @@ public class InstanceReplicaGroupPartitionSelector extends 
InstancePartitionSele
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class);
 
   public 
InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig 
replicaGroupPartitionConfig,
-      String tableNameWithType, @Nullable InstancePartitions 
existingInstancePartitions) {
-    super(replicaGroupPartitionConfig, tableNameWithType, 
existingInstancePartitions);
+      String tableNameWithType, @Nullable InstancePartitions 
existingInstancePartitions, boolean minimizeDataMovement) {
+    super(replicaGroupPartitionConfig, tableNameWithType, 
existingInstancePartitions, minimizeDataMovement);
   }
 
   /**
@@ -73,16 +76,65 @@ public class InstanceReplicaGroupPartitionSelector extends 
InstancePartitionSele
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new 
ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : 
poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = 
poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = 
poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used 
for the table.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, 
existingNumReplicaGroups);
+        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; 
replicaGroupId++) {
+          boolean foundExistingReplicaGroup = false;
+          for (int partitionId = 0; partitionId < existingNumPartitions & 
!foundExistingReplicaGroup; partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            for (String existingInstance : existingInstances) {
+              Integer existingPool = instanceToPoolMap.get(existingInstance);
+              if (existingPool != null & pools.contains(existingPool)) {
+                poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> 
new ArrayList<>()).add(replicaGroupId);
+                replicaGroupIdToPoolMap.put(replicaGroupId, existingPool);
+                foundExistingReplicaGroup = true;
+                break;
+              }
+            }
+          }
+        }
+        // Use a min heap to track the least frequently picked pool among all 
the pools
+        PriorityQueue<Pairs.IntPair> minHeap = new 
PriorityQueue<>(pools.size(), Pairs.intPairComparator());
+        for (int pool : pools) {
+          int numExistingReplicaGroups =
+              poolToReplicaGroupIdsMap.get(pool) != null ? 
poolToReplicaGroupIdsMap.get(pool).size() : 0;
+          minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool));
+        }
+        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+          if (replicaGroupIdToPoolMap.containsKey(replicaId)) {
+            continue;
+          }
+          // Increment the frequency for a given pool and put it back to the 
min heap to rotate the pool selection.
+          Pairs.IntPair pair = minHeap.remove();
+          int pool = pair.getRight();
+          pair.setLeft(pair.getLeft() + 1);
+          minHeap.add(pair);
+          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new 
ArrayList<>()).add(replicaId);
+          replicaGroupIdToPoolMap.put(replicaId, pool);
+        }
+      } else {
+        // Current default way to assign pool to replica groups.
+        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+          // Pick one pool for each replica-group based on the table name hash
+          int pool = pools.get((tableNameHash + replicaId) % numPools);
+          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new 
ArrayList<>()).add(replicaId);
+          replicaGroupIdToPoolMap.put(replicaId, pool);
+        }
       }
       LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", 
numReplicaGroups, poolToReplicaGroupIdsMap,
           _tableNameWithType);
@@ -132,7 +184,7 @@ public class InstanceReplicaGroupPartitionSelector extends 
InstancePartitionSele
       LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
         // Minimize data movement.
         int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
         int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
@@ -257,7 +309,7 @@ public class InstanceReplicaGroupPartitionSelector extends 
InstancePartitionSele
       }
 
       List<String> instancesToSelect;
-      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
         // Minimize data movement.
         List<String> existingInstances = 
_existingInstancePartitions.getInstances(0, 0);
         LinkedHashSet<String> candidateInstances = new LinkedHashSet<>();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5aefd1ad69..755e7aa713 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -21,11 +21,15 @@ package 
org.apache.pinot.controller.helix.core.assignment.instance;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.slf4j.Logger;
@@ -41,9 +45,17 @@ public class InstanceTagPoolSelector {
   private final InstanceTagPoolConfig _tagPoolConfig;
   private final String _tableNameWithType;
 
-  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String 
tableNameWithType) {
+  private final boolean _minimizeDataMovement;
+
+  private final InstancePartitions _existingInstancePartitions;
+
+  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String 
tableNameWithType,
+      boolean minimizeDataMovement,
+      @Nullable InstancePartitions existingInstancePartitions) {
     _tagPoolConfig = tagPoolConfig;
     _tableNameWithType = tableNameWithType;
+    _minimizeDataMovement = minimizeDataMovement;
+    _existingInstancePartitions = existingInstancePartitions;
   }
 
   /**
@@ -70,12 +82,14 @@ public class InstanceTagPoolSelector {
     if (_tagPoolConfig.isPoolBased()) {
       // Pool based selection
 
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
       // Extract the pool information from the instance configs
       for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
         Map<String, String> poolMap = 
instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
         if (poolMap != null && poolMap.containsKey(tag)) {
           int pool = Integer.parseInt(poolMap.get(tag));
           poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new 
ArrayList<>()).add(instanceConfig);
+          instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
         }
       }
       Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(),
@@ -96,9 +110,8 @@ public class InstanceTagPoolSelector {
         int numPools = poolToInstanceConfigsMap.size();
         int numPoolsToSelect = _tagPoolConfig.getNumPools();
         if (numPoolsToSelect > 0) {
-          Preconditions
-              .checkState(numPoolsToSelect <= numPools, "Not enough instance 
pools (%s in the cluster, asked for %s)",
-                  numPools, numPoolsToSelect);
+          Preconditions.checkState(numPoolsToSelect <= numPools,
+              "Not enough instance pools (%s in the cluster, asked for %s)", 
numPools, numPoolsToSelect);
         } else {
           numPoolsToSelect = numPools;
         }
@@ -109,11 +122,45 @@ public class InstanceTagPoolSelector {
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the 
tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % 
numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & 
!foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null & pools.contains(existingPool)) {
+                  poolsToSelect.add(existingPool);
+                  existingPools.add(existingPool);
+                  foundExistingPoolForReplicaGroup = true;
+                  break;
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, 
_tableNameWithType);
+          // Pick a pool from remainingPools that isn't used before.
+          List<Integer> remainingPools = new ArrayList<>(pools);
+          remainingPools.retainAll(existingPools);
+          // Skip selecting the existing pool.
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            if (existingPools.contains(i)) {
+              continue;
+            }
+            poolsToSelect.add(remainingPools.remove(i % 
remainingPools.size()));
+          }
+        } else {
+          // Select pools based on the table name hash to evenly distribute 
the tables
+          List<Integer> poolsInCluster = new ArrayList<>(pools);
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % 
numPools));
+          }
         }
       }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
index 6b4086615a..f273866eeb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
@@ -76,8 +76,8 @@ public class MirrorServerSetInstancePartitionSelector extends 
InstancePartitionS
 
   public 
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig 
replicaGroupPartitionConfig,
       String tableNameWithType, @Nullable InstancePartitions 
existingInstancePartitions,
-      InstancePartitions preConfiguredInstancePartitions) {
-    super(replicaGroupPartitionConfig, tableNameWithType, 
existingInstancePartitions);
+      InstancePartitions preConfiguredInstancePartitions, boolean 
minimizeDataMovement) {
+    super(replicaGroupPartitionConfig, tableNameWithType, 
existingInstancePartitions, minimizeDataMovement);
     _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
     _numTargetInstancesPerReplicaGroup = 
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
     _numTargetReplicaGroups = 
_replicaGroupPartitionConfig.getNumReplicaGroups();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index dedc79384e..9feb8844c8 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -118,7 +118,7 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     // Add OFFLINE instance assignment config to the offline table config
     InstanceAssignmentConfig offlineInstanceAssignmentConfig = new 
InstanceAssignmentConfig(
         new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), 
false, 0, null), null,
-        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, 
null));
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, 
null), null, false);
     offlineTableConfig.setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), 
offlineInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(offlineTableConfig);
@@ -136,7 +136,7 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     // Add CONSUMING instance assignment config to the real-time table config
     InstanceAssignmentConfig consumingInstanceAssignmentConfig = new 
InstanceAssignmentConfig(
         new 
InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), 
false, 0, null), null,
-        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, 
null));
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, 
null), null, false);
     realtimeTableConfig.setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), 
consumingInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
@@ -164,7 +164,7 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
             null)));
     InstanceAssignmentConfig tierInstanceAssignmentConfig = new 
InstanceAssignmentConfig(
         new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), 
false, 0, null), null,
-        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, 
null));
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, 
null), null, false);
     Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new 
HashMap<>();
     instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), 
offlineInstanceAssignmentConfig);
     instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index b25a529e10..a6220c00a2 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -374,7 +374,7 @@ public class InstanceAssignmentTest {
       TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
           
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
               new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                  
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                  
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
           
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured"))
           .build();
       InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
@@ -480,7 +480,7 @@ public class InstanceAssignmentTest {
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
         
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
     InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
     InstancePartitions preConfigured = new InstancePartitions("preConfigured");
@@ -561,7 +561,7 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
         
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -664,7 +664,7 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
         
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -756,7 +756,7 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
         
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -851,7 +851,7 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
         
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -956,7 +956,7 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
         
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -1063,7 +1063,7 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
         
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -1156,7 +1156,7 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
         
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -1230,7 +1230,7 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(),
 false)))
         
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -1311,7 +1311,7 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 
0, 0, false, null);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig))).build();
+            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, false))).build();
     InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -1364,7 +1364,7 @@ public class InstanceAssignmentTest {
     // Select all 3 pools in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
     // All instances in pool 2 should be assigned to replica-group 0, and all 
instances in pool 0 should be assigned to
@@ -1386,7 +1386,7 @@ public class InstanceAssignmentTest {
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, 
Arrays.asList(0, 1));
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all 
instances in pool 1 should be assigned to
@@ -1408,7 +1408,7 @@ public class InstanceAssignmentTest {
     numReplicaGroups = numPools;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0, false, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // [pool0, pool1]
@@ -1438,7 +1438,7 @@ public class InstanceAssignmentTest {
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0, true, null);
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, true)));
     // Reset the instance configs to have only two pools.
     instanceConfigs.clear();
     numInstances = 10;
@@ -1487,7 +1487,7 @@ public class InstanceAssignmentTest {
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, 
Arrays.asList(0, 1));
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, true)));
 
     // Get the latest existingInstancePartitions from last computation.
     existingInstancePartitions = instancePartitions;
@@ -1514,7 +1514,7 @@ public class InstanceAssignmentTest {
     numReplicaGroups = 3;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0, true, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, true)));
 
     // Get the latest existingInstancePartitions from last computation.
     existingInstancePartitions = instancePartitions;
@@ -1593,7 +1593,7 @@ public class InstanceAssignmentTest {
     numReplicaGroups = 2;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0, true, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, true)));
 
     // Get the latest existingInstancePartitions from last computation.
     existingInstancePartitions = instancePartitions;
@@ -1693,6 +1693,109 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(0, 1),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX 
+ 9, SERVER_INSTANCE_ID_PREFIX + 11,
             SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // The below is the test suite for testing out minimizeDataMovement with 
pool configs
+    // Add the third pool with same number of instances but keep number of 
pools the same (i.e. 2)
+    numPools = 3;
+    numInstances = numPools * numInstancesPerPool;
+    for (int i = numInstances + 4; i < numInstances + 9; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = numPools - 1;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, 
Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2, but since 
minimizeDataMovement is enabled,
+    // same pools would be re-used.
+    // [pool0, pool1]
+    //  r0     r1
+    // Thus, the instance partition assignment remains the same as the 
previous one.
+    //     pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //     pool 1: [  i7, i9, i11, i13,  i6 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, 
SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX 
+ 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Set tag pool config to 3.
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, true)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Putting the existingPoolToInstancesMap shouldn't change the instance 
assignment,
+    // as there are only 2 replica groups needed.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // But since Pool 0 and Pool 1 is already being used for the table, the 
numReplica remains at 2,
+    // so the 3rd pool (Pool 2) won't be picked up.
+    // Thus, the instance partition assignment remains the same as the 
existing one.
+    // All instances in pool 0 should be assigned to replica-group 0, and all 
instances in pool 1 should be assigned to
+    // replica-group 1
+    // Now in poolToInstancesMap:
+    //     pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //     pool 1: [  i7, i9, i11, i13,  i6 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, 
SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX 
+ 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Set replica group from 2 to 3
+    numReplicaGroups = 3;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0, true, null);
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, null, true)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Now that 1 more replica group is needed, Pool 2 will be chosen for the 
3rd replica group
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // [pool0, pool1, pool2]
+    //  r0     r1     r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //   pool 0: [ i3, i4, i0, i1, i2 ]
+    //   pool 1: [ i8, i9, i5, i6, i7 ]
+    //   pool 2: [ i22,i23,i19,i20,i21]
+    // Thus, the new assignment will become:
+    //   pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //   pool 1: [  i7, i9, i11, i13,  i6 ]
+    //   pool 2: [ i22, i23, i19, i20,i21 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, 
SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX 
+ 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 22, 
SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 19,
+            SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 21));
   }
 
   @Test
@@ -1720,7 +1823,7 @@ public class InstanceAssignmentTest {
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, 
null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // No instance with correct tag
     try {
@@ -1750,7 +1853,7 @@ public class InstanceAssignmentTest {
     // Enable pool
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // No instance has correct pool configured
     try {
@@ -1784,7 +1887,7 @@ public class InstanceAssignmentTest {
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many pools
     try {
@@ -1796,7 +1899,7 @@ public class InstanceAssignmentTest {
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, 
Arrays.asList(0, 2));
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // Ask for pool that does not exist
     try {
@@ -1810,7 +1913,7 @@ public class InstanceAssignmentTest {
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false, null
     );
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many instances
     try {
@@ -1824,7 +1927,7 @@ public class InstanceAssignmentTest {
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false, null
     );
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // Number of replica-groups must be positive
     try {
@@ -1836,7 +1939,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many replica-groups
     try {
@@ -1849,7 +1952,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many instances
     try {
@@ -1861,7 +1964,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many instances per partition
     try {
@@ -1874,7 +1977,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
     // pool0: [i3, i4, i0, i1, i2]
@@ -1914,7 +2017,8 @@ public class InstanceAssignmentTest {
     try {
       tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
           
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-              new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build();
+              new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, "ILLEGAL_SELECTOR", false)))
+          .build();
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
           "No enum constant 
org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig.PartitionSelector"
@@ -1943,7 +2047,8 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
       instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -1976,7 +2081,8 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
       instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -2017,7 +2123,8 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
       instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -2055,7 +2162,8 @@ public class InstanceAssignmentTest {
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
+        .build();
     InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
 
     InstancePartitions instancePartitions =
@@ -2127,7 +2235,8 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 true)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     // existingInstancePartitions = instancePartitions
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
instancePartitions);
@@ -2208,7 +2317,7 @@ public class InstanceAssignmentTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
             Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup))
         .setSegmentPartitionConfig(segmentPartitionConfig).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -2282,7 +2391,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
             
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
-                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2338,7 +2447,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
             
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
-                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 true)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2405,7 +2514,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
             
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
-                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2471,7 +2580,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
             
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
-                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 true)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2542,7 +2651,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
             
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
-                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2593,7 +2702,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
             
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
-                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2657,7 +2766,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
             
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
-                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 true)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index 889206437f..2fdef27796 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -64,7 +64,7 @@ public class InstanceReplicaGroupPartitionSelectorTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, 
null);
 
     InstanceReplicaGroupPartitionSelector selector =
-        new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", 
existing);
+        new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", 
existing, true);
 
     String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"};
     String[] poolNumbers = {"0", "0", "1", "1"};
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 5d679c0380..1df7109ef2 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -195,7 +195,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 
0, false, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
     // No need to reassign instances because instances should be automatically 
assigned when updating the table config
@@ -481,7 +481,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 
0, false, null);
     
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME,
-        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
index 391ba4812d..ad4b22ecaf 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
@@ -41,13 +41,17 @@ public class InstanceAssignmentConfig extends 
BaseJsonConfig {
       "Configuration for the instance replica-group and partition of the 
instance assignment (mandatory)")
   private final InstanceReplicaGroupPartitionConfig 
_replicaGroupPartitionConfig;
 
+  @JsonPropertyDescription("Configuration to minimize data movement for pool 
and instance assignment")
+  private final boolean _minimizeDataMovement;
+
   @JsonCreator
   public InstanceAssignmentConfig(
       @JsonProperty(value = "tagPoolConfig", required = true) 
InstanceTagPoolConfig tagPoolConfig,
       @JsonProperty("constraintConfig") @Nullable InstanceConstraintConfig 
constraintConfig,
       @JsonProperty(value = "replicaGroupPartitionConfig", required = true)
           InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      @JsonProperty("partitionSelector") @Nullable String partitionSelector) {
+      @JsonProperty("partitionSelector") @Nullable String partitionSelector,
+      @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement) {
     Preconditions.checkArgument(tagPoolConfig != null, "'tagPoolConfig' must 
be configured");
     Preconditions
         .checkArgument(replicaGroupPartitionConfig != null, 
"'replicaGroupPartitionConfig' must be configured");
@@ -57,11 +61,7 @@ public class InstanceAssignmentConfig extends BaseJsonConfig 
{
     _partitionSelector =
         partitionSelector == null ? 
PartitionSelector.INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR
             : PartitionSelector.valueOf(partitionSelector);
-  }
-
-  public InstanceAssignmentConfig(InstanceTagPoolConfig tagPoolConfig, 
InstanceConstraintConfig constraintConfig,
-      InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig) {
-    this(tagPoolConfig, constraintConfig, replicaGroupPartitionConfig, null);
+    _minimizeDataMovement = minimizeDataMovement;
   }
 
   public PartitionSelector getPartitionSelector() {
@@ -81,6 +81,10 @@ public class InstanceAssignmentConfig extends BaseJsonConfig 
{
     return _replicaGroupPartitionConfig;
   }
 
+  public boolean isMinimizeDataMovement() {
+    return _minimizeDataMovement;
+  }
+
   public enum PartitionSelector {
     FD_AWARE_INSTANCE_PARTITION_SELECTOR, 
INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR,
     MIRROR_SERVER_SET_PARTITION_SELECTOR
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
index adc72e8f1c..1bc40cba21 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
@@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends 
BaseJsonConfig {
       "Name of the column used for partition, if not provided table level 
replica group will be used")
   private final String _partitionColumn;
 
+  // TODO: remove this config in the next official release
+  @Deprecated
   private final boolean _minimizeDataMovement;
 
   @JsonCreator


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to