This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch minimize-instance-movement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 17139574d51fe9c6649067c71c0f7ab9e56f3e43
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Apr 6 20:22:52 2022 -0700

    Minimize data movement between instances in pools
---
 .../common/assignment/InstancePartitions.java      |  66 +++-
 .../PinotInstanceAssignmentRestletResource.java    |  38 +-
 .../api/resources/PinotTableRestletResource.java   |  10 +-
 .../helix/core/PinotHelixResourceManager.java      |   2 +-
 .../instance/InstanceAssignmentDriver.java         |   7 +-
 .../InstanceReplicaGroupPartitionSelector.java     |  63 ++--
 ...stanceReplicaGroupPartitionSelectorFactory.java |  47 +++
 ...ementInstanceReplicaGroupPartitionSelector.java | 277 +++++++++++++++
 .../core/rebalance/RebalanceConfigConstants.java   |   4 +
 .../helix/core/rebalance/TableRebalancer.java      |  34 +-
 .../instance/InstanceAssignmentTest.java           | 385 +++++++++++++++++++--
 .../apache/pinot/tools/PinotTableRebalancer.java   |   3 +-
 .../tools/admin/command/RebalanceTableCommand.java |   6 +-
 13 files changed, 868 insertions(+), 74 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
index c511077187..20fa7478ac 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -58,24 +60,39 @@ import org.apache.pinot.spi.utils.JsonUtils;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class InstancePartitions {
   private static final char PARTITION_REPLICA_GROUP_SEPARATOR = '_';
+  private static final String POOLS_KEY = "pools";
+  private static final String REPLICA_GROUP_SEPARATOR = "/";
 
   private final String _instancePartitionsName;
+  // A map from partition key to its associated list of instances.
+  // The key is of the form "0_0", where the first number denotes the partition id
+  // and the second denotes the replica group id.
   private final Map<String, List<String>> _partitionToInstancesMap;
+  // A map from each selected pool number to its associated list of replica group ids.
+  private final Map<Integer, List<Integer>> _poolToReplicaGroupsMap;
   private int _numPartitions;
   private int _numReplicaGroups;
 
   public InstancePartitions(String instancePartitionsName) {
     _instancePartitionsName = instancePartitionsName;
     _partitionToInstancesMap = new TreeMap<>();
+    _poolToReplicaGroupsMap = new TreeMap<>();
   }
 
   @JsonCreator
   private InstancePartitions(
       @JsonProperty(value = "instancePartitionsName", required = true) String 
instancePartitionsName,
       @JsonProperty(value = "partitionToInstancesMap", required = true)
-          Map<String, List<String>> partitionToInstancesMap) {
+          Map<String, List<String>> partitionToInstancesMap,
+      @JsonProperty(value = "poolToReplicaGroupsMap") Map<String, String> 
poolToReplicaGroupsMap) {
     _instancePartitionsName = instancePartitionsName;
     _partitionToInstancesMap = partitionToInstancesMap;
+    _poolToReplicaGroupsMap = new TreeMap<>();
+    if (poolToReplicaGroupsMap != null) {
+      for (Map.Entry<String, String> entry : 
poolToReplicaGroupsMap.entrySet()) {
+        _poolToReplicaGroupsMap.put(Integer.parseInt(entry.getKey()), 
extractReplicaGroups(entry.getValue()));
+      }
+    }
     for (String key : partitionToInstancesMap.keySet()) {
       int separatorIndex = key.indexOf(PARTITION_REPLICA_GROUP_SEPARATOR);
       int partitionId = Integer.parseInt(key.substring(0, separatorIndex));
@@ -105,6 +122,11 @@ public class InstancePartitions {
     return _numReplicaGroups;
   }
 
+  @JsonIgnore
+  public Map<Integer, List<Integer>> getPoolToReplicaGroupsMap() {
+    return _poolToReplicaGroupsMap;
+  }
+
   public List<String> getInstances(int partitionId, int replicaGroupId) {
     return _partitionToInstancesMap
         .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR 
+ replicaGroupId);
@@ -117,13 +139,53 @@ public class InstancePartitions {
     _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1);
   }
 
+  public void setPoolToReplicaGroupsMap(Map<Integer, List<Integer>> 
poolToReplicaGroupsMap) {
+    _poolToReplicaGroupsMap.putAll(poolToReplicaGroupsMap);
+  }
+
   public static InstancePartitions fromZNRecord(ZNRecord znRecord) {
-    return new InstancePartitions(znRecord.getId(), znRecord.getListFields());
+    return new InstancePartitions(znRecord.getId(), znRecord.getListFields(), 
znRecord.getMapField(POOLS_KEY));
+  }
+
+  private static List<Integer> extractReplicaGroups(String replicaGroupsRawString) {
+    if (replicaGroupsRawString == null || replicaGroupsRawString.isEmpty()) {
+      return Collections.emptyList();
+    }
+    String[] replicaGroupStringArray = replicaGroupsRawString.split(REPLICA_GROUP_SEPARATOR);
+    List<Integer> replicaGroups = new ArrayList<>(replicaGroupStringArray.length);
+    for (String replicaGroupString : replicaGroupStringArray) {
+      replicaGroups.add(Integer.parseInt(replicaGroupString));
+    }
+    return replicaGroups;
+  }
+
+  private String convertReplicaGroupsToString(List<Integer> replicaGroups) {
+    if (replicaGroups == null || replicaGroups.isEmpty()) {
+      return "";
+    }
+    StringBuilder stringBuilder = new StringBuilder();
+    for (Integer replicaGroup : replicaGroups) {
+      if (stringBuilder.length() == 0) {
+        stringBuilder.append(replicaGroup);
+      } else {
+        stringBuilder.append(REPLICA_GROUP_SEPARATOR).append(replicaGroup);
+      }
+    }
+    return stringBuilder.toString();
+  }
+
+  private Map<String, String> convertListToStringMap() {
+    Map<String, String> convertedMap = new TreeMap<>();
+    for (Map.Entry<Integer, List<Integer>> entry : 
_poolToReplicaGroupsMap.entrySet()) {
+      convertedMap.put(Integer.toString(entry.getKey()), 
convertReplicaGroupsToString(entry.getValue()));
+    }
+    return convertedMap;
   }
 
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = new ZNRecord(_instancePartitionsName);
     znRecord.setListFields(_partitionToInstancesMap);
+    znRecord.setMapField(POOLS_KEY, convertListToStringMap());
     return znRecord;
   }
 
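The following is a minimal, standalone sketch of how the new pools field above round-trips through a ZNRecord: the pool-to-replica-groups map is written under the "pools" map field with replica group ids joined by "/", and parsed back into the typed map on read. The class name, table name, and instance names are made up for illustration, and Java 10+ var is used so the sketch does not depend on which Helix package declares ZNRecord.

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.pinot.common.assignment.InstancePartitions;

    public class InstancePartitionsPoolsExample {
      public static void main(String[] args) {
        InstancePartitions instancePartitions = new InstancePartitions("myTable_OFFLINE");
        // Partition 0, replica-group 0 -> two instances (stored under the key "0_0").
        instancePartitions.setInstances(0, 0, Arrays.asList("Server_host1", "Server_host2"));
        // Pool 0 hosts replica-group 0.
        instancePartitions.setPoolToReplicaGroupsMap(Collections.singletonMap(0, Collections.singletonList(0)));

        // The pools map is stored as the "pools" map field of the ZNRecord: {0=0}
        var znRecord = instancePartitions.toZNRecord();
        System.out.println(znRecord.getMapField("pools"));

        // Reading the ZNRecord back restores the typed map: {0=[0]}
        System.out.println(InstancePartitions.fromZNRecord(znRecord).getPoolToReplicaGroupsMap());
      }
    }
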
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 8d9e6cc7dc..40f7bfd0ce 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -124,7 +124,9 @@ public class PinotInstanceAssignmentRestletResource {
       @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
       @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") 
@Nullable
           InstancePartitionsType instancePartitionsType,
-      @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") 
@QueryParam("dryRun") boolean dryRun) {
+      @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") 
@QueryParam("dryRun") boolean dryRun,
+      @ApiParam(value = "Whether to retain current instance sequence") 
@DefaultValue("false")
+      @QueryParam("retainInstanceSequence") boolean retainInstanceSequence) {
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
new TreeMap<>();
     List<InstanceConfig> instanceConfigs = 
_resourceManager.getAllHelixInstanceConfigs();
 
@@ -136,8 +138,8 @@ public class PinotInstanceAssignmentRestletResource {
         try {
           if (InstanceAssignmentConfigUtils
               .allowInstanceAssignment(offlineTableConfig, 
InstancePartitionsType.OFFLINE)) {
-            instancePartitionsMap.put(InstancePartitionsType.OFFLINE, new 
InstanceAssignmentDriver(offlineTableConfig)
-                .assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs));
+            assignInstancesForInstancePartitionsType(instancePartitionsMap, 
offlineTableConfig, instanceConfigs,
+                InstancePartitionsType.OFFLINE, retainInstanceSequence);
           }
         } catch (IllegalStateException e) {
           throw new ControllerApplicationException(LOGGER, "Caught 
IllegalStateException", Response.Status.BAD_REQUEST,
@@ -152,19 +154,18 @@ public class PinotInstanceAssignmentRestletResource {
       TableConfig realtimeTableConfig = 
_resourceManager.getRealtimeTableConfig(tableName);
       if (realtimeTableConfig != null) {
         try {
-          InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(realtimeTableConfig);
           if (instancePartitionsType == InstancePartitionsType.CONSUMING || 
instancePartitionsType == null) {
             if (InstanceAssignmentConfigUtils
                 .allowInstanceAssignment(realtimeTableConfig, 
InstancePartitionsType.CONSUMING)) {
-              instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
-                  
instanceAssignmentDriver.assignInstances(InstancePartitionsType.CONSUMING, 
instanceConfigs));
+              assignInstancesForInstancePartitionsType(instancePartitionsMap, 
realtimeTableConfig, instanceConfigs,
+                  InstancePartitionsType.CONSUMING, retainInstanceSequence);
             }
           }
           if (instancePartitionsType == InstancePartitionsType.COMPLETED || 
instancePartitionsType == null) {
             if (InstanceAssignmentConfigUtils
                 .allowInstanceAssignment(realtimeTableConfig, 
InstancePartitionsType.COMPLETED)) {
-              instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
-                  
instanceAssignmentDriver.assignInstances(InstancePartitionsType.COMPLETED, 
instanceConfigs));
+              assignInstancesForInstancePartitionsType(instancePartitionsMap, 
realtimeTableConfig, instanceConfigs,
+                  InstancePartitionsType.COMPLETED, retainInstanceSequence);
             }
           }
         } catch (IllegalStateException e) {
@@ -191,6 +192,27 @@ public class PinotInstanceAssignmentRestletResource {
     return instancePartitionsMap;
   }
 
+  /**
+   * Assigns instances for the given instance partitions type.
+   * @param instancePartitionsMap the map to be filled with the computed instance partitions
+   * @param tableConfig table config
+   * @param instanceConfigs list of instance configs
+   * @param instancePartitionsType type of instancePartitions
+   * @param retainInstanceSequence whether to retain instance sequence
+   */
+  private void assignInstancesForInstancePartitionsType(
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
TableConfig tableConfig,
+      List<InstanceConfig> instanceConfigs, InstancePartitionsType 
instancePartitionsType,
+      boolean retainInstanceSequence) {
+    InstancePartitions existingInstancePartitions = null;
+    if (retainInstanceSequence) {
+      existingInstancePartitions = InstancePartitionsUtils
+          
.fetchOrComputeInstancePartitions(_resourceManager.getHelixZkManager(), 
tableConfig, instancePartitionsType);
+    }
+    instancePartitionsMap.put(instancePartitionsType, new 
InstanceAssignmentDriver(tableConfig)
+        .assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions));
+  }
+
   private void persistInstancePartitionsHelper(InstancePartitions 
instancePartitions) {
     try {
       LOGGER.info("Persisting instance partitions: {}", instancePartitions);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 46e7a2e7da..1db480acce 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -569,9 +569,12 @@ public class PinotTableRestletResource {
           boolean downtime, @ApiParam(
       value = "For no-downtime rebalance, minimum number of replicas to keep 
alive during rebalance, or maximum "
           + "number of replicas allowed to be unavailable if value is 
negative") @DefaultValue("1")
-  @QueryParam("minAvailableReplicas") int minAvailableReplicas, @ApiParam(
-      value = "Whether to use best-efforts to rebalance (not fail the 
rebalance when the no-downtime contract cannot "
-          + "be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") 
boolean bestEfforts) {
+  @QueryParam("minAvailableReplicas") int minAvailableReplicas,
+      @ApiParam(value = "Whether to use best-efforts to rebalance (not fail 
the rebalance"
+          + " when the no-downtime contract cannot be achieved)") 
@DefaultValue("false")
+      @QueryParam("bestEfforts") boolean bestEfforts,
+      @ApiParam(value = "Whether to retain instance sequence during 
rebalancing in order to minimize data movement")
+      @DefaultValue("false") @QueryParam("retainInstancesSequence") boolean 
retainInstancesSequence) {
 
     String tableNameWithType = constructTableNameWithType(tableName, 
tableTypeStr);
 
@@ -583,6 +586,7 @@ public class PinotTableRestletResource {
     rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, downtime);
     
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
 minAvailableReplicas);
     rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, 
bestEfforts);
+    
rebalanceConfig.addProperty(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE, 
retainInstancesSequence);
 
     try {
       if (dryRun || downtime) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3b901171e8..8fdcf48c60 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1649,7 +1649,7 @@ public class PinotHelixResourceManager {
       List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
       for (InstancePartitionsType instancePartitionsType : 
instancePartitionsTypesToAssign) {
         InstancePartitions instancePartitions =
-            instanceAssignmentDriver.assignInstances(instancePartitionsType, 
instanceConfigs);
+            instanceAssignmentDriver.assignInstances(instancePartitionsType, 
instanceConfigs, null);
         LOGGER.info("Persisting instance partitions: {}", instancePartitions);
         InstancePartitionsUtils.persistInstancePartitions(_propertyStore, 
instancePartitions);
       }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 1440fae204..444f4934ea 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -52,7 +52,7 @@ public class InstanceAssignmentDriver {
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
-      List<InstanceConfig> instanceConfigs) {
+      List<InstanceConfig> instanceConfigs, InstancePartitions 
existingInstancePartitions) {
     String tableNameWithType = _tableConfig.getTableName();
     LOGGER.info("Starting {} instance assignment for table: {}", 
instancePartitionsType, tableNameWithType);
 
@@ -73,8 +73,9 @@ public class InstanceAssignmentDriver {
       poolToInstanceConfigsMap = 
constraintApplier.applyConstraint(poolToInstanceConfigsMap);
     }
 
-    InstanceReplicaGroupPartitionSelector replicaPartitionSelector =
-        new 
InstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(),
 tableNameWithType);
+    InstanceReplicaGroupPartitionSelector replicaPartitionSelector = 
InstanceReplicaGroupPartitionSelectorFactory
+        
.generateInstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(),
+            tableNameWithType, existingInstancePartitions);
     InstancePartitions instancePartitions = new InstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
     replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, 
instancePartitions);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 3e83bf7720..13cbc9cd07 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory;
 public class InstanceReplicaGroupPartitionSelector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class);
 
-  private final InstanceReplicaGroupPartitionConfig 
_replicaGroupPartitionConfig;
-  private final String _tableNameWithType;
+  protected final InstanceReplicaGroupPartitionConfig 
_replicaGroupPartitionConfig;
+  protected final String _tableNameWithType;
 
   public 
InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig 
replicaGroupPartitionConfig,
       String tableNameWithType) {
@@ -163,31 +163,44 @@ public class InstanceReplicaGroupPartitionSelector {
       }
     } else {
       // Non-replica-group based selection
+      selectForNonReplicaGroupBased(poolToInstanceConfigsMap, 
instancePartitions);
+    }
+  }
 
-      // Pick one pool based on the table name hash
-      int pool = pools.get(tableNameHash % numPools);
-      LOGGER.info("Selecting pool: {} for table: {}", pool, 
_tableNameWithType);
-      List<InstanceConfig> instanceConfigs = 
poolToInstanceConfigsMap.get(pool);
-      int numInstanceConfigs = instanceConfigs.size();
-
-      // Assign all instances if not configured
-      int numInstancesToSelect = 
_replicaGroupPartitionConfig.getNumInstances();
-      if (numInstancesToSelect > 0) {
-        Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs,
-            "Not enough qualified instances from pool: %s (%s in the pool, 
asked for %s)", pool, numInstanceConfigs,
-            numInstancesToSelect);
-      } else {
-        numInstancesToSelect = numInstanceConfigs;
-      }
+  protected void selectForNonReplicaGroupBased(Map<Integer, 
List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions) {
+    int numPools = poolToInstanceConfigsMap.size();
+    Preconditions.checkState(numPools != 0, "No pool qualified for selection");
 
-      List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-      for (int i = 0; i < numInstancesToSelect; i++) {
-        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
-      }
-      instancesToSelect.sort(null);
-      LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, 
_tableNameWithType);
-      // Set the instances as partition 0 replica 0
-      instancePartitions.setInstances(0, 0, instancesToSelect);
+    int tableNameHash = Math.abs(_tableNameWithType.hashCode());
+    List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet());
+    pools.sort(null);
+    LOGGER.info("Starting instance replica-group/partition selection for 
table: {} with hash: {} from pools: {}",
+        _tableNameWithType, tableNameHash, pools);
+
+    // Pick one pool based on the table name hash
+    int pool = pools.get(tableNameHash % numPools);
+    LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType);
+    List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool);
+    int numInstanceConfigs = instanceConfigs.size();
+
+    // Assign all instances if not configured
+    int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances();
+    if (numInstancesToSelect > 0) {
+      Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs,
+          "Not enough qualified instances from pool: %s (%s in the pool, asked 
for %s)", pool, numInstanceConfigs,
+          numInstancesToSelect);
+    } else {
+      numInstancesToSelect = numInstanceConfigs;
+    }
+
+    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
     }
+    instancesToSelect.sort(null);
+    LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, 
_tableNameWithType);
+    // Set the instances as partition 0 replica 0
+    instancePartitions.setInstances(0, 0, instancesToSelect);
   }
 }
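
Both the replica-group path and the extracted non-replica-group path above key their pool choice off a hash of the table name; the test comments further down ("Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0", etc.) rely on the same arithmetic. A tiny standalone sketch of that computation, with an illustrative pool list:

    import java.util.Arrays;
    import java.util.List;

    public class PoolSelectionHashSketch {
      public static void main(String[] args) {
        String tableNameWithType = "myTable_OFFLINE";
        int tableNameHash = Math.abs(tableNameWithType.hashCode());
        List<Integer> pools = Arrays.asList(0, 1);
        // Pick one pool based on the table name hash, as in selectForNonReplicaGroupBased.
        int pool = pools.get(tableNameHash % pools.size());
        System.out.println(pool); // 0 for "myTable_OFFLINE" with 2 pools, per the test expectations below
      }
    }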
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorFactory.java
new file mode 100644
index 0000000000..36e16e8d27
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import org.apache.pinot.common.assignment.InstancePartitions;
+import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+
+/**
+ * A factory class to generate {@link InstanceReplicaGroupPartitionSelector}.
+ */
+public class InstanceReplicaGroupPartitionSelectorFactory {
+
+  private InstanceReplicaGroupPartitionSelectorFactory() {
+  }
+
+  public static InstanceReplicaGroupPartitionSelector 
generateInstanceReplicaGroupPartitionSelector(
+      InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, 
String tableNameWithType,
+      InstancePartitions existingInstancePartitions) {
+    InstanceReplicaGroupPartitionSelector replicaPartitionSelector;
+    if (existingInstancePartitions == null) {
+      replicaPartitionSelector =
+          new 
InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, 
tableNameWithType);
+    } else {
+      // If the existing instance partitions are provided, use the customized selector to minimize data movement.
+      replicaPartitionSelector =
+          new 
MinimizedDataMovementInstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig,
+              tableNameWithType, existingInstancePartitions);
+    }
+    return replicaPartitionSelector;
+  }
+}
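
A usage sketch for the factory above: a null existing assignment yields the default selector, while a non-null one yields the minimized-data-movement selector. The table name and config values are illustrative; the six-argument InstanceReplicaGroupPartitionConfig constructor follows the form used in the test changes further down.

    import org.apache.pinot.common.assignment.InstancePartitions;
    import org.apache.pinot.controller.helix.core.assignment.instance.InstanceReplicaGroupPartitionSelector;
    import org.apache.pinot.controller.helix.core.assignment.instance.InstanceReplicaGroupPartitionSelectorFactory;
    import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;

    public class SelectorFactoryExample {
      public static void main(String[] args) {
        // Replica-group based config with 2 replica-groups; the remaining counts are left unset (0).
        InstanceReplicaGroupPartitionConfig config = new InstanceReplicaGroupPartitionConfig(true, 0, 2, 0, 0, 0);

        // No existing assignment -> the default selector.
        InstanceReplicaGroupPartitionSelector fresh = InstanceReplicaGroupPartitionSelectorFactory
            .generateInstanceReplicaGroupPartitionSelector(config, "myTable_OFFLINE", null);

        // An existing assignment -> the selector that minimizes data movement.
        InstanceReplicaGroupPartitionSelector minimized = InstanceReplicaGroupPartitionSelectorFactory
            .generateInstanceReplicaGroupPartitionSelector(config, "myTable_OFFLINE",
                new InstancePartitions("myTable_OFFLINE"));

        System.out.println(fresh.getClass().getSimpleName());     // InstanceReplicaGroupPartitionSelector
        System.out.println(minimized.getClass().getSimpleName()); // MinimizedDataMovementInstanceReplicaGroupPartitionSelector
      }
    }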
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MinimizedDataMovementInstanceReplicaGroupPartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MinimizedDataMovementInstanceReplicaGroupPartitionSelector.java
new file mode 100644
index 0000000000..0af1264d3a
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MinimizedDataMovementInstanceReplicaGroupPartitionSelector.java
@@ -0,0 +1,277 @@
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An extension of {@link InstanceReplicaGroupPartitionSelector} that minimizes data movement between instances.
+ * Currently the following scenarios are supported:
+ *    * swap instances within a pool
+ *    * add / remove instances per replica group
+ *    * increase / decrease number of replica groups
+ *
+ * TODO: Support the remaining scenarios:
+ *    * add / remove pools
+ */
+public class MinimizedDataMovementInstanceReplicaGroupPartitionSelector 
extends InstanceReplicaGroupPartitionSelector {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(MinimizedDataMovementInstanceReplicaGroupPartitionSelector.class);
+
+  private final InstancePartitions _existingInstancePartitions;
+
+  public MinimizedDataMovementInstanceReplicaGroupPartitionSelector(
+      InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String 
tableNameWithType,
+      InstancePartitions existingInstancePartitions) {
+    super(replicaGroupPartitionConfig, tableNameWithType);
+    _existingInstancePartitions = existingInstancePartitions;
+  }
+
+  @Override
+  public void selectInstances(Map<Integer, List<InstanceConfig>> 
poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions) {
+    int numPools = poolToInstanceConfigsMap.size();
+    Preconditions.checkState(numPools != 0, "No pool qualified for selection");
+
+    int tableNameHash = Math.abs(_tableNameWithType.hashCode());
+    List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet());
+    pools.sort(null);
+    Preconditions.checkState(pools.containsAll(_existingInstancePartitions.getPoolToReplicaGroupsMap().keySet()),
+        String.format("Some existing pools no longer exist in ZK. Existing pools: %s. Latest pools: %s",
+            _existingInstancePartitions.getPoolToReplicaGroupsMap().keySet(), pools));
+    LOGGER.info("Starting instance replica-group/partition selection for 
table: {} with hash: {} from pools: {}",
+        _tableNameWithType, tableNameHash, pools);
+
+    if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
+      // Replica-group based selection
+
+      // Find out the mapping between pool and replica groups.
+      int numReplicaGroups = 
_replicaGroupPartitionConfig.getNumReplicaGroups();
+      Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups 
must be positive");
+      Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
+      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+        // Pick one pool for each replica-group based on the table name hash
+        int pool = pools.get((tableNameHash + replicaId) % numPools);
+        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new 
ArrayList<>()).add(replicaId);
+        replicaGroupIdToPoolMap.put(replicaId, pool);
+      }
+      LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", 
numReplicaGroups, poolToReplicaGroupIdsMap,
+          _tableNameWithType);
+
+      // Finalize the number of instances per replica group.
+      int numInstancesPerReplicaGroup = 
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+      if (numInstancesPerReplicaGroup > 0) {
+        // Check if we have enough instances if number of instances per 
replica-group is configured
+        for (Map.Entry<Integer, List<Integer>> entry : 
poolToReplicaGroupIdsMap.entrySet()) {
+          int pool = entry.getKey();
+          int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+          int numInstancesToSelect = numInstancesPerReplicaGroup * 
entry.getValue().size();
+          Preconditions.checkState(numInstancesToSelect <= numInstancesInPool,
+              "Not enough qualified instances from pool: %s (%s in the pool, 
asked for %s)", pool, numInstancesInPool,
+              numInstancesToSelect);
+        }
+      } else {
+        // Use as many instances as possible if number of instances per 
replica-group is not configured
+        numInstancesPerReplicaGroup = Integer.MAX_VALUE;
+        for (Map.Entry<Integer, List<Integer>> entry : 
poolToReplicaGroupIdsMap.entrySet()) {
+          int pool = entry.getKey();
+          int numReplicaGroupsInPool = entry.getValue().size();
+          int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+          Preconditions.checkState(numReplicaGroupsInPool <= 
numInstancesInPool,
+              "Not enough qualified instances from pool: %s, cannot select %s 
replica-groups from %s instances", pool,
+              numReplicaGroupsInPool, numInstancesInPool);
+          numInstancesPerReplicaGroup =
+              Math.min(numInstancesPerReplicaGroup, numInstancesInPool / 
numReplicaGroupsInPool);
+        }
+      }
+      LOGGER.info("Selecting {} instances per replica-group for table: {}", 
numInstancesPerReplicaGroup,
+          _tableNameWithType);
+
+      // Assign instances within a replica-group to one partition if not 
configured
+      int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
+      if (numPartitions <= 0) {
+        numPartitions = 1;
+      }
+      // Assign all instances within a replica-group to each partition if not 
configured
+      int numInstancesPerPartition = 
_replicaGroupPartitionConfig.getNumInstancesPerPartition();
+      if (numInstancesPerPartition > 0) {
+        Preconditions.checkState(numInstancesPerPartition <= 
numInstancesPerReplicaGroup,
+            "Number of instances per partition: %s must be smaller or equal to 
number of instances per replica-group:"
+                + " %s", numInstancesPerPartition, 
numInstancesPerReplicaGroup);
+      } else {
+        numInstancesPerPartition = numInstancesPerReplicaGroup;
+      }
+      LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
+          numPartitions, numInstancesPerPartition, _tableNameWithType);
+
+      // Step 1: Identify candidate instances from the latest list of instance configs in ZK.
+      Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : 
poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigs = entry.getValue();
+        for (InstanceConfig instanceConfig : instanceConfigs) {
+          poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedHashSet<>())
+              .add(instanceConfig.getInstanceName());
+        }
+      }
+
+      Map<Integer, Map<String, String>> 
poolToGoneInstancesAndReplacedInstancesMap = new TreeMap<>();
+      Map<String, List<String>> existingPartitionToLatestInstancesMap = new 
TreeMap<>();
+      Map<String, List<String>> existingPartitionToInstancesMap =
+          _existingInstancePartitions.getPartitionToInstancesMap();
+      Map<Integer, Set<String>> poolToExistingAliveInstancesMap = new 
TreeMap<>();
+
+      int maxNumberOfInstancesPerInstancePartitionAssignment = 
Integer.MIN_VALUE;
+      for (List<String> instances : existingPartitionToInstancesMap.values()) {
+        maxNumberOfInstancesPerInstancePartitionAssignment =
+            Math.max(maxNumberOfInstancesPerInstancePartitionAssignment, 
instances.size());
+      }
+
+      // Step 2: Reuse the existing mapping to find out which existing instances are still alive and which are gone.
+      for (int replicaGroupId = 0; replicaGroupId < 
_existingInstancePartitions.getNumReplicaGroups();
+          replicaGroupId++) {
+        Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+        if (pool == null) {
+          // Skip validating replica group if it's no longer needed.
+          continue;
+        }
+        for (int partitionId = 0; partitionId < 
_existingInstancePartitions.getNumPartitions(); partitionId++) {
+          List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+          List<String> latestInstancesInMap = 
existingPartitionToLatestInstancesMap
+              .computeIfAbsent(partitionId + "_" + replicaGroupId, k -> new 
ArrayList<>());
+
+          for (String existingInstance : existingInstances) {
+            // The instance still exists in the ZK.
+            if (poolToReplicaGroupIdsMap.containsKey(pool) && 
poolToCandidateInstancesMap.containsKey(pool)
+                && 
poolToCandidateInstancesMap.get(pool).contains(existingInstance)) {
+              poolToExistingAliveInstancesMap.computeIfAbsent(pool, k -> new 
HashSet<>()).add(existingInstance);
+              latestInstancesInMap.add(existingInstance);
+            } else {
+              // The instance no longer exists in ZK; record it so that a new instance can take its position.
+              poolToGoneInstancesAndReplacedInstancesMap.computeIfAbsent(pool, 
k -> new TreeMap<>())
+                  .put(existingInstance, null);
+              latestInstancesInMap.add(null);
+            }
+          }
+        }
+      }
+
+      // Step 3: Find out all new instances in each pool.
+      Map<Integer, Deque<String>> poolToNewCandidateInstancesMap = new 
TreeMap<>();
+      for (Map.Entry<Integer, Set<String>> entry : 
poolToCandidateInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        Set<String> candidateInstancesInPool = entry.getValue();
+        Set<String> existingStillAliveInstances =
+            poolToExistingAliveInstancesMap.computeIfAbsent(pool, k -> new 
HashSet<>());
+        for (String candidateInstance : candidateInstancesInPool) {
+          if (!existingStillAliveInstances.contains(candidateInstance)) {
+            poolToNewCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedList<>()).add(candidateInstance);
+          }
+        }
+      }
+
+      // Step 4: Find the 1:1 mapping between gone instances and new instances.
+      for (Map.Entry<Integer, Map<String, String>> entry : 
poolToGoneInstancesAndReplacedInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        Map<String, String> goneInstanceToNewInstanceMap = entry.getValue();
+        Deque<String> newInstancesInPool =
+            poolToNewCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedList<>());
+        goneInstanceToNewInstanceMap.replaceAll((k, v) -> {
+          if (!newInstancesInPool.isEmpty()) {
+            return newInstancesInPool.pollFirst();
+          } else {
+            return v;
+          }
+        });
+      }
+
+      // Step 5: Fill the vacant positions with the new instances.
+      Map<String, List<String>> newInstancePartitionsAssignmentMap = new 
TreeMap<>();
+      int finalNumInstancesPerPartition = numInstancesPerPartition;
+      for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+        for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          Map<String, String> goneInstanceToNewInstanceMap =
+              poolToGoneInstancesAndReplacedInstancesMap.computeIfAbsent(pool, 
k -> new TreeMap<>());
+          Set<String> candidateInstancesMap = 
poolToCandidateInstancesMap.get(pool);
+          String partitionToReplicaGroupKey = partitionId + "_" + 
replicaGroupId;
+          List<String> existingInstances =
+              
existingPartitionToInstancesMap.computeIfAbsent(partitionToReplicaGroupKey, k 
-> new ArrayList<>());
+
+          // Construct an empty list to store the latest instances.
+          List<String> latestInstanceAssignment =
+              
newInstancePartitionsAssignmentMap.computeIfAbsent(partitionToReplicaGroupKey, 
k -> {
+                List<String> instances = new 
ArrayList<>(finalNumInstancesPerPartition);
+                for (int i = 0; i < finalNumInstancesPerPartition; i++) {
+                  instances.add(null);
+                }
+                return instances;
+              });
+
+          // Traverse the existing list of instances, fill the vacant 
positions with new instances from the map.
+          for (int i = 0; i < existingInstances.size() && i < 
finalNumInstancesPerPartition; i++) {
+            String existingInstance = existingInstances.get(i);
+            String replacedInstance = 
goneInstanceToNewInstanceMap.get(existingInstance);
+            if (replacedInstance != null) {
+              latestInstanceAssignment.set(i, replacedInstance);
+              candidateInstancesMap.remove(replacedInstance);
+            } else {
+              // If the instance is in the gone map but has no new instance to take its position, leave the slot
+              // vacant for now; otherwise keep the existing instance in its current position.
+              if (!goneInstanceToNewInstanceMap.containsKey(existingInstance)) 
{
+                latestInstanceAssignment.set(i, existingInstance);
+                candidateInstancesMap.remove(existingInstance);
+              }
+            }
+          }
+          // If the new number of instances per partition is larger than the 
previous one, extend the vacant positions.
+          if (finalNumInstancesPerPartition > existingInstances.size()) {
+            Iterator<String> candidateInstancesInPoolIterator = 
candidateInstancesMap.iterator();
+            for (int i = existingInstances.size(); i < 
finalNumInstancesPerPartition; i++) {
+              if (candidateInstancesInPoolIterator.hasNext()) {
+                String candidateInstance = 
candidateInstancesInPoolIterator.next();
+                latestInstanceAssignment.set(i, candidateInstance);
+                candidateInstancesInPoolIterator.remove();
+              }
+            }
+          }
+
+          // Fill up the vacant positions if any.
+          for (int i = 0; i < latestInstanceAssignment.size(); i++) {
+            Iterator<String> candidateInstancesInPoolIterator = 
candidateInstancesMap.iterator();
+            if (latestInstanceAssignment.get(i) == null) {
+              if (candidateInstancesInPoolIterator.hasNext()) {
+                String candidateInstance = 
candidateInstancesInPoolIterator.next();
+                latestInstanceAssignment.set(i, candidateInstance);
+                candidateInstancesInPoolIterator.remove();
+              }
+            }
+          }
+
+          instancePartitions.setInstances(partitionId, replicaGroupId, 
latestInstanceAssignment);
+        }
+      }
+
+      // Record poolToReplicaGroupsMap so that it gets persisted to ZK together with the instance partitions.
+      instancePartitions.setPoolToReplicaGroupsMap(poolToReplicaGroupIdsMap);
+    } else {
+      // Non-replica-group based selection
+      selectForNonReplicaGroupBased(poolToInstanceConfigsMap, 
instancePartitions);
+    }
+  }
+}
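
The heart of Step 4 above is a 1:1 pairing of instances that disappeared from a pool with instances that newly joined it, so each new instance simply takes over the position (and therefore the segments) of the one it replaces. A self-contained sketch of that pairing, using illustrative instance names that mirror the i2 -> i10, i6 -> i11 expectation in the test changes below (the real code keeps one such map per pool):

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.Map;
    import java.util.TreeMap;

    public class GoneInstanceReplacementSketch {
      public static void main(String[] args) {
        // Instances present in the previous assignment but gone from the pool, with no replacement chosen yet.
        Map<String, String> goneInstanceToNewInstanceMap = new TreeMap<>();
        goneInstanceToNewInstanceMap.put("Server_2", null);
        goneInstanceToNewInstanceMap.put("Server_6", null);

        // Instances that newly joined the same pool, in discovery order.
        Deque<String> newInstancesInPool = new ArrayDeque<>(Arrays.asList("Server_10", "Server_11"));

        // Pair each gone instance with the next unused new instance; keep null if no new instance is left.
        goneInstanceToNewInstanceMap.replaceAll((gone, current) ->
            newInstancesInPool.isEmpty() ? current : newInstancesInPool.pollFirst());

        System.out.println(goneInstanceToNewInstanceMap); // {Server_2=Server_10, Server_6=Server_11}
      }
    }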
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
index 70bebabc17..d08fbbfd84 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
@@ -33,6 +33,10 @@ public class RebalanceConfigConstants {
   public static final String REASSIGN_INSTANCES = "reassignInstances";
   public static final boolean DEFAULT_REASSIGN_INSTANCES = false;
 
+  // Whether to retain the sequence for the existing instances
+  public static final String RETAIN_INSTANCE_SEQUENCE = 
"retainInstancesSequence";
+  public static final boolean DEFAULT_RETAIN_INSTANCE_SEQUENCE = false;
+
   // Whether to reassign CONSUMING segments
   public static final String INCLUDE_CONSUMING = "includeConsuming";
   public static final boolean DEFAULT_INCLUDE_CONSUMING = false;
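
The new flag flows like the other rebalance knobs: the REST layer adds it to the rebalance Configuration (see the PinotTableRestletResource change above) and TableRebalancer (next file) reads it back with getBoolean, defaulting to false. A minimal sketch of building such a config programmatically, assuming the Commons Configuration 1.x BaseConfiguration used by the rebalance code path (adjust the import if your build is on configuration2):

    import org.apache.commons.configuration.BaseConfiguration;
    import org.apache.commons.configuration.Configuration;
    import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;

    public class RebalanceConfigSketch {
      public static void main(String[] args) {
        Configuration rebalanceConfig = new BaseConfiguration();
        // Reassign instances and keep the existing instance sequence to minimize data movement.
        rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, true);
        rebalanceConfig.addProperty(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE, true);

        // TableRebalancer reads the flag back the same way, falling back to the documented default.
        boolean retain = rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE,
            RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE);
        System.out.println(retain); // true
      }
    }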
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index d6701c8363..8f86ac6fa9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -145,6 +145,8 @@ public class TableRebalancer {
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     boolean bestEfforts = 
rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS,
         RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
+    boolean retainInstanceSequence = 
rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE,
+        RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE);
     LOGGER.info(
         "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, 
includeConsuming: {}, bootstrap: {}, "
             + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, 
enableStrictReplicaGroup: {}, bestEfforts: {}",
@@ -194,7 +196,7 @@ public class TableRebalancer {
     // Calculate instance partitions map
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
     try {
-      instancePartitionsMap = getInstancePartitionsMap(tableConfig, 
reassignInstances, dryRun);
+      instancePartitionsMap = getInstancePartitionsMap(tableConfig, 
reassignInstances, dryRun, retainInstanceSequence);
     } catch (Exception e) {
       LOGGER.warn(
           "Caught exception while fetching/calculating instance partitions for 
table: {}, aborting the rebalance",
@@ -323,10 +325,12 @@ public class TableRebalancer {
           currentIdealState = idealState;
           currentAssignment = currentIdealState.getRecord().getMapFields();
           // Re-calculate the instance partitions in case the instance configs 
changed during the rebalance
-          instancePartitionsMap = getInstancePartitionsMap(tableConfig, 
reassignInstances, false);
+          instancePartitionsMap =
+              getInstancePartitionsMap(tableConfig, reassignInstances, false, 
retainInstanceSequence);
           tierToInstancePartitionsMap = 
getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
-          targetAssignment = 
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, 
sortedTiers,
-              tierToInstancePartitionsMap, rebalanceConfig);
+          targetAssignment = segmentAssignment
+              .rebalanceTable(currentAssignment, instancePartitionsMap, 
sortedTiers, tierToInstancePartitionsMap,
+                  rebalanceConfig);
           expectedVersion = currentIdealState.getRecord().getVersion();
         } catch (Exception e) {
           LOGGER.warn(
@@ -381,21 +385,24 @@ public class TableRebalancer {
   }
 
   private Map<InstancePartitionsType, InstancePartitions> 
getInstancePartitionsMap(TableConfig tableConfig,
-      boolean reassignInstances, boolean dryRun) {
+      boolean reassignInstances, boolean dryRun, boolean 
retainInstanceSequence) {
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
new TreeMap<>();
     if (tableConfig.getTableType() == TableType.OFFLINE) {
       instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
-          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, 
reassignInstances, dryRun));
+          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, 
reassignInstances, dryRun,
+              retainInstanceSequence));
     } else {
       instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
-          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, 
reassignInstances, dryRun));
+          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, 
reassignInstances, dryRun,
+              retainInstanceSequence));
       String tableNameWithType = tableConfig.getTableName();
       if 
(InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
         LOGGER.info(
             "COMPLETED segments should be relocated, fetching/computing 
COMPLETED instance partitions for table: {}",
             tableNameWithType);
         instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
-            getInstancePartitions(tableConfig, 
InstancePartitionsType.COMPLETED, reassignInstances, dryRun));
+            getInstancePartitions(tableConfig, 
InstancePartitionsType.COMPLETED, reassignInstances, dryRun,
+                retainInstanceSequence));
       } else {
         LOGGER.info(
             "COMPLETED segments should not be relocated, skipping 
fetching/computing COMPLETED instance partitions "
@@ -413,14 +420,21 @@ public class TableRebalancer {
   }
 
   private InstancePartitions getInstancePartitions(TableConfig tableConfig,
-      InstancePartitionsType instancePartitionsType, boolean 
reassignInstances, boolean dryRun) {
+      InstancePartitionsType instancePartitionsType, boolean 
reassignInstances, boolean dryRun,
+      boolean retainInstanceSequence) {
     String tableNameWithType = tableConfig.getTableName();
     if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, 
instancePartitionsType)) {
       if (reassignInstances) {
+        InstancePartitions existingInstancePartitions = null;
+        if (retainInstanceSequence) {
+          existingInstancePartitions = InstancePartitionsUtils
+              .fetchOrComputeInstancePartitions(_helixManager, tableConfig, 
instancePartitionsType);
+        }
         LOGGER.info("Reassigning {} instances for table: {}", 
instancePartitionsType, tableNameWithType);
         InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(tableConfig);
         InstancePartitions instancePartitions = 
instanceAssignmentDriver.assignInstances(instancePartitionsType,
-            
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
 true));
+            
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
 true),
+            existingInstancePartitions);
         if (!dryRun) {
           LOGGER.info("Persisting instance partitions: {} to ZK", 
instancePartitions);
           
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
 instancePartitions);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index f1fc049ff5..197b4919df 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -71,7 +71,8 @@ public class InstanceAssignmentTest {
     }
 
     // Instances should be assigned to 3 replica-groups with a round-robin 
fashion, each with 2 instances
-    InstancePartitions instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     // Instances of index 4 to 7 are not assigned because of the hash-based 
rotation
@@ -95,7 +96,7 @@ public class InstanceAssignmentTest {
 
     // Instances should be assigned to 3 replica-groups with a round-robin 
fashion, each with 3 instances, then these 3
     // instances should be assigned to 2 partitions, each with 2 instances
-    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
     // Instance of index 7 is not assigned because of the hash-based rotation
@@ -123,6 +124,85 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3));
     assertEquals(instancePartitions.getInstances(1, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 6));
+
+    // ===== Test against the cases when the existing instancePartitions isn't 
null. =====
+    // Put the existing instancePartitions as the parameter to the 
InstanceAssignmentDriver.
+    // The returned instance partition should be the same as the last computed 
one.
+
+    // Instances should be assigned to 3 replica-groups with a round-robin 
fashion, each with 3 instances, then these 3
+    // instances should be assigned to 2 partitions, each with 2 instances
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
instancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Instance of index 7 is not assigned because of the hash-based rotation
+    // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+    // [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7]
+    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    // r0: [i8, i1, i4]
+    //      p0, p0, p1
+    //      p1
+    // r1: [i9, i2, i5]
+    //      p0, p0, p1
+    //      p1
+    // r2: [i0, i3, i6]
+    //      p0, p0, p1
+    //      p1
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 8));
+    assertEquals(instancePartitions.getInstances(1, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX 
+ 8));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 9));
+    assertEquals(instancePartitions.getInstances(1, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3));
+    assertEquals(instancePartitions.getInstances(1, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 6));
+
+    // Remove two instances (i2, i6) and add two new instances (i10, i11).
+    instanceConfigs.remove(6);
+    instanceConfigs.remove(2);
+    for (int i = numInstances; i < numInstances + 2; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Instances should be assigned to 3 replica-groups with a round-robin 
fashion, each with 3 instances, then these 3
+    // instances should be assigned to 2 partitions, each with 2 instances
+    // Leverage the latest instancePartitions from last computation as the 
parameter.
+    // Data movement is minimized so that: i2 -> i10, i6 -> i11
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
instancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Instance of index 7 is not assigned because of the hash-based rotation
+    // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+    // [i8, i9, i0, i1, i10, i3, i4, i5, i11, i7]
+    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    // r0: [i8, i1, i4]
+    //      p0, p0, p1
+    //      p1
+    // r1: [i9, i10, i5]
+    //      p0, p0, p1
+    //      p1
+    // r2: [i0, i3, i11]
+    //      p0, p0, p1
+    //      p1
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 8));
+    assertEquals(instancePartitions.getInstances(1, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX 
+ 8));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 10, 
SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(1, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3));
+    assertEquals(instancePartitions.getInstances(1, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 11));
   }
 
   @Test
@@ -155,7 +235,8 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all 
instances in pool 1 should be assigned to
     // replica-group 1
-    InstancePartitions instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -182,7 +263,7 @@ public class InstanceAssignmentTest {
     // Pool 0 and 2 will be selected in the pool selection
     // All instances in pool 0 should be assigned to replica-group 0, and all 
instances in pool 2 should be assigned to
     // replica-group 1
-    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -200,7 +281,7 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
     // All instances in pool 2 should be assigned to replica-group 0, and all 
instances in pool 0 should be assigned to
     // replica-group 1
-    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -218,7 +299,7 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all 
instances in pool 1 should be assigned to
     // replica-group 1
-    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -244,7 +325,7 @@ public class InstanceAssignmentTest {
     //          r0  r2  r0  r2
     // pool 1: [i8, i9, i5, i6, i7]
     //          r1  r1
-    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),
@@ -253,6 +334,269 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 9));
     assertEquals(instancePartitions.getInstances(0, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX 
+ 4));
+
+    // ===== Test the cases where the existing instancePartitions is not null. =====
+    // Reset the number of replica groups to 2 and pools to 2.
+    numReplicaGroups = 2;
+    numPools = 2;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0);
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+    // Reset the instance configs to have only two pools.
+    instanceConfigs.clear();
+    numInstances = 10;
+    for (int i = 0; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = i / numInstancesPerPool;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, 
Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Use all pools; the instancePartitions should be the same as the one computed
+    // without using the existing partition-to-instances map.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // All instances in pool 0 should be assigned to replica-group 0, and all 
instances in pool 1 should be assigned to
+    // replica-group 1.
+    // [pool0, pool1]
+    //  r0     r1
+    InstancePartitions existingInstancePartitions = null;
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, 
SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, 
SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Get the latest existingInstancePartitions from the last computation and try again.
+    // The actual assignment should be the same as the last one.
+    existingInstancePartitions = instancePartitions;
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, 
SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, 
SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Select pool 0 and 1 in pool selection
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, 
Arrays.asList(0, 1));
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Passing the existingInstancePartitions shouldn't change the instance assignment.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // All instances in pool 0 should be assigned to replica-group 0, and all 
instances in pool 1 should be assigned to
+    // replica-group 1
+    // Now in poolToInstancesMap:
+    // pool 0: [ i3, i4, i0, i1, i2 ]
+    // pool 1: [ i8, i9, i5, i6, i7 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, 
SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, 
SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Assign instances from 2 pools to 3 replica-groups
+    numReplicaGroups = 3;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0);
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    //  r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //   pool 0: [ i3, i4, i0, i1, i2 ]
+    //   pool 1: [ i8, i9, i5, i6, i7 ]
+    // Because the instances in the existing instancePartitions are sorted, i0 and i1 will be retained for r0,
+    // and i5 and i6 will be retained for r1. i3 and i4 are picked up from the latest instances in the target pool.
+    // Thus, the new assignment will be as follows:
+    //   pool 0: [i0, i1, i2, i3, i4]
+    //            r0  r0      r2  r2
+    //   pool 1: [i5, i6, i7, i8, i9]
+    //            r1  r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 1));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX 
+ 6));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX 
+ 4));
+
+    // Remove one instance from each of the pools and add one new instance to each pool.
+    instanceConfigs.remove(5);
+    instanceConfigs.remove(3);
+    int poolCount = 0;
+    for (int i = numInstances; i < numInstances + 2; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = poolCount++;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, 
Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    //  r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //     pool 0: [ i2,  i4,  i0, i1, i10 ]
+    //     pool 1: [ i8,  i9, i11, i6,  i7 ]
+    // i3 gets swapped out, and the next available instance i2 will take its place.
+    // Similarly, i5 is swapped out and i8 will take its place.
+    // Thus, the new assignment will be as follows:
+    //     pool 0: [ i2,  i4,  i0, i1, i10 ]
+    //               r2   r2   r0  r0
+    //     pool 1: [ i8,  i9, i11, i6,  i7 ]
+    //               r1            r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 1));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX 
+ 6));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX 
+ 4));
+
+    // Reduce number of replica groups from 3 to 2.
+    numReplicaGroups = 2;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0);
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    // Each replica-group should have 5 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //     pool 0: [ i2,  i4,  i0, i1, i10 ]
+    //     pool 1: [ i8,  i9, i11, i6,  i7 ]
+    // In the existing instancePartitions, r0 already has [i0, i1]; append the rest of the
+    // available instances (i.e. [i2, i4, i10]) to the tail.
+    // r1 already has [i8, i6]; append the rest of the available instances (i.e. [i9, i11, i7]) to the tail.
+    // Thus, the new assignment will become:
+    //     pool 0: [ i0, i1, i2,  i4, i10 ]
+    //               r0   r0   r0  r0  r0
+    //     pool 1: [ i8, i6, i9, i11,  i7 ]
+    //               r1   r1  r1  r1   r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, 
SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 6, 
SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 7));
+
+    // Add 1 more instance to each pool
+    poolCount = 0;
+    for (int i = numInstances + 2; i < numInstances + 4; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = poolCount++;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, 
Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    // Each replica-group should have 6 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 6 = 2
+    // Latest instances from ZK:
+    //     pool 0: [ i10, i12, i2, i4,  i0,  i1 ]
+    //     pool 1: [  i6,  i7, i8, i9, i11, i13 ]
+    // There is one more empty position for each of the replica groups.
+    // Append the newly added instances (i.e. i12 and i13) to the tails.
+    // Thus, the new assignment will become:
+    //     pool 0: [ i0, i1, i2,  i4, i10, i12 ]
+    //     pool 1: [ i8, i6, i9, i11,  i7, i13 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, 
SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 10, 
SERVER_INSTANCE_ID_PREFIX + 12));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 6, 
SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 7, 
SERVER_INSTANCE_ID_PREFIX + 13));
+
+    // Remove one instance from each of the pools, i.e. i2 and i8.
+    instanceConfigs.remove(6);
+    instanceConfigs.remove(2);
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    // Each replica-group should have 5 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //     pool 0: [ i12, i4,  i0, i1, i10 ]
+    //     pool 1: [ i7,  i9, i11, i13, i6 ]
+    // Since i2 and i8 were removed from the pools,
+    // the tail instances (i.e. i12 and i13) will be used to fill their vacant positions.
+    // Thus, the new assignment will become:
+    //     pool 0: [ i0, i1, i12,  i4, i10 ]
+    //     pool 1: [ i13, i6, i9, i11,  i7 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, 
SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6, 
SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 7));
   }
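
To summarize the behavior exercised above, here is a simplified, hypothetical sketch of the retention idea (RetentionSketch and retainThenFill are illustrative names; this is not the actual MinimizedDataMovementInstanceReplicaGroupPartitionSelector): keep instances from the existing assignment that are still alive, then fill the remaining slots from the other alive candidates.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, for illustration only.
public class RetentionSketch {
  // Keep still-alive instances from the existing assignment, then top up from the
  // remaining alive candidates until the target size is reached.
  static List<String> retainThenFill(List<String> existingAssignment, List<String> aliveCandidates,
      int targetSize) {
    List<String> result = new ArrayList<>(targetSize);
    Set<String> remaining = new LinkedHashSet<>(aliveCandidates);
    for (String instance : existingAssignment) {
      if (result.size() < targetSize && remaining.remove(instance)) {
        result.add(instance);
      }
    }
    Iterator<String> iterator = remaining.iterator();
    while (result.size() < targetSize && iterator.hasNext()) {
      result.add(iterator.next());
    }
    return result;
  }
}

For example, retainThenFill(Arrays.asList("i0", "i1", "i2"), Arrays.asList("i0", "i1", "i10", "i4"), 3) keeps i0 and i1 and fills the vacancy left by i2 with i10, which mirrors the minimal-movement assertions above. The selector under test additionally fills a vacancy at the departed instance's position (e.g. i12 replacing i2 in place) rather than simply appending, per the expectations in the comments.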
 
   @Test
@@ -270,7 +614,7 @@ public class InstanceAssignmentTest {
     // No instance assignment config
     
assertFalse(InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, 
InstancePartitionsType.OFFLINE));
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Instance assignment is not allowed for the 
given table config");
@@ -284,7 +628,7 @@ public class InstanceAssignmentTest {
 
     // No instance with correct tag
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "No enabled instance has the tag: 
tenant_OFFLINE");
@@ -295,7 +639,8 @@ public class InstanceAssignmentTest {
     }
 
     // All instances should be assigned as replica-group 0 partition 0
-    InstancePartitions instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     List<String> expectedInstances = new ArrayList<>(numInstances);
@@ -311,7 +656,7 @@ public class InstanceAssignmentTest {
 
     // No instance has correct pool configured
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "No enabled instance has the pool 
configured for the tag: tenant_OFFLINE");
@@ -328,7 +673,7 @@ public class InstanceAssignmentTest {
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned as replica-group 0 partition 0
-    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     expectedInstances.clear();
@@ -343,7 +688,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many pools
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough instance pools (2 in the 
cluster, asked for 3)");
@@ -355,7 +700,7 @@ public class InstanceAssignmentTest {
 
     // Ask for pool that does not exist
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Cannot find all instance pools configured: 
[0, 2]");
@@ -368,7 +713,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 
0 (5 in the pool, asked for 6)");
@@ -381,7 +726,7 @@ public class InstanceAssignmentTest {
 
     // Number of replica-groups must be positive
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Number of replica-groups must be 
positive");
@@ -393,7 +738,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many replica-groups
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
@@ -406,7 +751,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 
0 (5 in the pool, asked for 6)");
@@ -418,7 +763,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances per partition
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, 
null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
@@ -434,7 +779,7 @@ public class InstanceAssignmentTest {
     //         r0  r2  r0  r2
     // pool1: [i8, i9, i5, i6, i7]
     //         r1  r1
-    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), 3);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index b2bfab19f1..6758d17ff5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -36,7 +36,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
 
   public PinotTableRebalancer(String zkAddress, String clusterName, boolean 
dryRun, boolean reassignInstances,
       boolean includeConsuming, boolean bootstrap, boolean downtime, int 
minReplicasToKeepUpForNoDowntime,
-      boolean bestEffort) {
+      boolean bestEffort, boolean retainInstancesSequence) {
     super(zkAddress, clusterName);
     _rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, dryRun);
     _rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, 
reassignInstances);
@@ -46,6 +46,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
     
_rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
         minReplicasToKeepUpForNoDowntime);
     _rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, 
bestEffort);
+    
_rebalanceConfig.addProperty(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE, 
retainInstancesSequence);
   }
 
   public RebalanceResult rebalance(String tableNameWithType) {
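
For context, a hedged usage sketch of the updated PinotTableRebalancer constructor (RebalanceUsageSketch and all argument values are placeholders; only the trailing boolean is the parameter this patch adds):

import org.apache.pinot.tools.PinotTableRebalancer;

public class RebalanceUsageSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder ZK address, cluster name, and table name; only the trailing
    // retainInstancesSequence flag is new in this patch.
    PinotTableRebalancer rebalancer = new PinotTableRebalancer("localhost:2181", "PinotCluster",
        /* dryRun */ true, /* reassignInstances */ true, /* includeConsuming */ false,
        /* bootstrap */ false, /* downtime */ false, /* minReplicasToKeepUpForNoDowntime */ 1,
        /* bestEffort */ false, /* retainInstancesSequence */ true);
    System.out.println(rebalancer.rebalance("myTable_OFFLINE"));
  }
}

The -retainInstancesSequence option added to RebalanceTableCommand below wires the same flag through to RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE.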
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
index 6f15c8632b..d7148668d0 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
@@ -77,6 +77,10 @@ public class RebalanceTableCommand extends 
AbstractBaseAdminCommand implements C
           + " cannot be achieved, false by default)")
   private boolean _bestEfforts = false;
 
+  @CommandLine.Option(names = {"-retainInstancesSequence"},
+      description = "Whether to retain instance sequence during rebalancing in 
order to minimize data movement")
+  private boolean _retainInstancesSequence = false;
+
   @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true, 
description = "Print this message")
   private boolean _help = false;
 
@@ -94,7 +98,7 @@ public class RebalanceTableCommand extends 
AbstractBaseAdminCommand implements C
       throws Exception {
     PinotTableRebalancer tableRebalancer =
         new PinotTableRebalancer(_zkAddress, _clusterName, _dryRun, 
_reassignInstances, _includeConsuming, _bootstrap,
-            _downtime, _minAvailableReplicas, _bestEfforts);
+            _downtime, _minAvailableReplicas, _bestEfforts, 
_retainInstancesSequence);
     RebalanceResult rebalanceResult = 
tableRebalancer.rebalance(_tableNameWithType);
     LOGGER
         .info("Got rebalance result: {} for table: {}", 
JsonUtils.objectToString(rebalanceResult), _tableNameWithType);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
