Jackie-Jiang commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r855564628


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -37,13 +46,15 @@
 public class InstanceReplicaGroupPartitionSelector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class);
 
-  private final InstanceReplicaGroupPartitionConfig 
_replicaGroupPartitionConfig;
-  private final String _tableNameWithType;
+  protected final InstanceReplicaGroupPartitionConfig 
_replicaGroupPartitionConfig;

Review Comment:
   (minor) Keep them `private`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +133,118 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances 
per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new 
ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < 
numInstancesPerPartition;
-              instanceIdInPartition++) {
-            
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % 
numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new 
TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new 
HashMap<>();

Review Comment:
   (minor) Rename it to be more specific, and use a `TreeMap` because the map size 
will be small
   ```suggestion
           Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new 
TreeMap<>();
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +133,118 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances 
per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new 
ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < 
numInstancesPerPartition;
-              instanceIdInPartition++) {
-            
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % 
numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new 
TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new 
HashMap<>();
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          Set<String> candidateInstances =
+              poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedHashSet<>());
+          List<InstanceConfig> instanceConfigsInPool = 
poolToInstanceConfigsMap.get(pool);
+          instanceConfigsInPool.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> 
new HashSet<>())
+                .addAll(existingInstances);
+          }
+        }
+
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {

Review Comment:
   The following 2 loops can be simplified (and made more efficient) if we loop over 
the pools first, then loop over the replica-groups within each pool.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
         numInstancesToSelect = numInstanceConfigs;
       }
 
-      List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-      for (int i = 0; i < numInstancesToSelect; i++) {
-        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+      List<String> instancesToSelect;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        List<String> existingInstances = 
_existingInstancePartitions.getInstances(0, 0);
+        Set<String> candidateInstances = new LinkedHashSet<>();
+        instanceConfigs.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+        instancesToSelect =
+            getInstancesWithMinimumMovement(numInstancesToSelect, 
candidateInstances, existingInstances);
+      } else {
+        // Select instances sequentially.
+        instancesToSelect = new ArrayList<>(numInstancesToSelect);
+        for (int i = 0; i < numInstancesToSelect; i++) {
+          instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+        }
       }
       instancesToSelect.sort(null);
       LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, 
_tableNameWithType);
       // Set the instances as partition 0 replica 0
       instancePartitions.setInstances(0, 0, instancesToSelect);
     }
   }
+
+  /**
+   * Select instances with minimum movement.
+   * This algorithm can solve the following scenarios:
+   *    * swap an instance
+   *    * add/remove replica groups
+   *    * increase/decrease number of instances per replica group
+   * TODO: handle the scenarios that selected pools are changed.
+   * @param numInstancesToSelect number of instances to select
+   * @param candidateInstances candidate instances to be selected
+   * @param existingInstances list of existing instances
+   */
+  protected List<String> getInstancesWithMinimumMovement(int 
numInstancesToSelect, Set<String> candidateInstances,
+      List<String> existingInstances) {
+    // Initialize the list with empty positions to fill.
+    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      instancesToSelect.add(null);
+    }
+    Deque<String> newlyAddedInstances = new LinkedList<>();
+
+    // Find out the existing instances that are still alive.
+    Set<String> existingInstancesStillAlive = new HashSet<>();
+    for (String existingInstance : existingInstances) {
+      if (candidateInstances.contains(existingInstance)) {
+        existingInstancesStillAlive.add(existingInstance);
+      }
+    }
+
+    // Find out the newly added instances.
+    for (String candidateInstance : candidateInstances) {
+      if (!existingInstancesStillAlive.contains(candidateInstance)) {
+        newlyAddedInstances.add(candidateInstance);
+      }
+    }
+
+    for (int i = 0; i < existingInstances.size() && i < numInstancesToSelect; 
i++) {
+      String existingInstance = existingInstances.get(i);
+      if (candidateInstances.contains(existingInstance)) {
+        // If the instance still exists, add it to the instance list.
+        instancesToSelect.set(i, existingInstance);
+        existingInstancesStillAlive.remove(existingInstance);
+        // Add the existing instance to the tail so that it won't be firstly 
chosen for the next partition.
+        candidateInstances.remove(existingInstance);
+        candidateInstances.add(existingInstance);
+      } else if (!newlyAddedInstances.isEmpty()) {
+        // If the instance no longer exists, pick a new instance to fill its 
vacant position.
+        String newInstance = newlyAddedInstances.pollFirst();
+        instancesToSelect.set(i, newInstance);
+      }
+    }

Review Comment:
   I feel the following 3 loops can be simplified
   ```suggestion
       int numExistingInstances = existingInstances.size();
       for (int i = 0; i < numInstancesToSelect; i++) {
         String existingInstance = i < numExistingInstances ? 
existingInstances.get(i) : null;
         String selectedInstance;
         if (existingInstance != null && 
candidateInstances.contains(existingInstance)) {
           selectedInstance = existingInstance;
         } else {
           selectInstance = newlyAddedInstances.poll();
         }
         // Add the selected instance to the tail so that it won't be firstly 
chosen for the next partition.
         candidateInstances.remove(selectedInstance);
         candidateInstances.add(selectedInstance);
       }
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
         numInstancesToSelect = numInstanceConfigs;
       }
 
-      List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-      for (int i = 0; i < numInstancesToSelect; i++) {
-        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+      List<String> instancesToSelect;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        List<String> existingInstances = 
_existingInstancePartitions.getInstances(0, 0);
+        Set<String> candidateInstances = new LinkedHashSet<>();
+        instanceConfigs.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+        instancesToSelect =
+            getInstancesWithMinimumMovement(numInstancesToSelect, 
candidateInstances, existingInstances);
+      } else {
+        // Select instances sequentially.
+        instancesToSelect = new ArrayList<>(numInstancesToSelect);
+        for (int i = 0; i < numInstancesToSelect; i++) {
+          instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+        }
       }
       instancesToSelect.sort(null);
       LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, 
_tableNameWithType);
       // Set the instances as partition 0 replica 0
       instancePartitions.setInstances(0, 0, instancesToSelect);
     }
   }
+
+  /**
+   * Select instances with minimum movement.
+   * This algorithm can solve the following scenarios:
+   *    * swap an instance
+   *    * add/remove replica groups
+   *    * increase/decrease number of instances per replica group
+   * TODO: handle the scenarios that selected pools are changed.
+   * @param numInstancesToSelect number of instances to select
+   * @param candidateInstances candidate instances to be selected
+   * @param existingInstances list of existing instances
+   */
+  protected List<String> getInstancesWithMinimumMovement(int 
numInstancesToSelect, Set<String> candidateInstances,

Review Comment:
   (minor) Make it `private static`, and declare the parameter as `LinkedHashSet` instead of the general `Set`, 
because the algorithm relies on the set's insertion order to work
   ```suggestion
     private static List<String> getInstancesWithMinimumMovement(int 
numInstancesToSelect, LinkedHashSet<String> candidateInstances,
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +133,118 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances 
per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new 
ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < 
numInstancesPerPartition;
-              instanceIdInPartition++) {
-            
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % 
numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new 
TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new 
HashMap<>();
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          Set<String> candidateInstances =
+              poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedHashSet<>());
+          List<InstanceConfig> instanceConfigsInPool = 
poolToInstanceConfigsMap.get(pool);
+          instanceConfigsInPool.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> 
new HashSet<>())
+                .addAll(existingInstances);
+          }
+        }
+
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          // Filter out instances that belong to other replica groups which 
should not be the candidate.
+          Set<String> candidateInstances = new 
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+          for (int otherReplicaGroupId = 0;
+              otherReplicaGroupId < existingNumReplicaGroups && 
otherReplicaGroupId < numReplicaGroups;
+              otherReplicaGroupId++) {
+            if (replicaGroupId != otherReplicaGroupId) {
+              
candidateInstances.removeAll(replicaGroupIdToInstancesMap.get(otherReplicaGroupId));
+            }
+          }
+          Set<String> chosenCandidateInstances = new HashSet<>();
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            List<String> instancesToSelect =
+                getInstancesWithMinimumMovement(numInstancesPerPartition, 
candidateInstances, existingInstances);
+            chosenCandidateInstances.addAll(instancesToSelect);
+            instancePartitions.setInstances(partitionId, replicaGroupId, 
instancesToSelect);
+          }
+          // Remove instances that are already been chosen.
+          
poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
+        }
+
+        // If the new number of replica groups is greater than the existing 
number of replica groups.
+        for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < 
numReplicaGroups; replicaGroupId++) {
+          int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          Set<String> candidateInstances = new 
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+
+          Set<String> chosenCandidateInstances = new HashSet<>();
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            if (existingInstances == null) {

Review Comment:
   Isn't this always `null`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
         numInstancesToSelect = numInstanceConfigs;
       }
 
-      List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-      for (int i = 0; i < numInstancesToSelect; i++) {
-        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+      List<String> instancesToSelect;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        List<String> existingInstances = 
_existingInstancePartitions.getInstances(0, 0);
+        Set<String> candidateInstances = new LinkedHashSet<>();
+        instanceConfigs.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+        instancesToSelect =
+            getInstancesWithMinimumMovement(numInstancesToSelect, 
candidateInstances, existingInstances);
+      } else {
+        // Select instances sequentially.
+        instancesToSelect = new ArrayList<>(numInstancesToSelect);
+        for (int i = 0; i < numInstancesToSelect; i++) {
+          instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+        }
       }
       instancesToSelect.sort(null);
       LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, 
_tableNameWithType);
       // Set the instances as partition 0 replica 0
       instancePartitions.setInstances(0, 0, instancesToSelect);
     }
   }
+
+  /**
+   * Select instances with minimum movement.
+   * This algorithm can solve the following scenarios:

Review Comment:
   I still feel this algorithm cannot guarantee an even distribution of the 
partitions in certain scenarios, because it doesn't track the partitions already 
assigned to each instance. To guarantee an even distribution, we 
should assign in 2 steps: first, assign the existing instances for all 
partitions; then, fill the vacant positions based on the partitions already 
assigned to each instance.
   That said, the current algorithm should work well in most scenarios, so we can probably 
just add a TODO for the above concern.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
         numInstancesToSelect = numInstanceConfigs;
       }
 
-      List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-      for (int i = 0; i < numInstancesToSelect; i++) {
-        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+      List<String> instancesToSelect;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        List<String> existingInstances = 
_existingInstancePartitions.getInstances(0, 0);
+        Set<String> candidateInstances = new LinkedHashSet<>();
+        instanceConfigs.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+        instancesToSelect =
+            getInstancesWithMinimumMovement(numInstancesToSelect, 
candidateInstances, existingInstances);
+      } else {
+        // Select instances sequentially.
+        instancesToSelect = new ArrayList<>(numInstancesToSelect);
+        for (int i = 0; i < numInstancesToSelect; i++) {
+          instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+        }
       }
       instancesToSelect.sort(null);
       LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, 
_tableNameWithType);
       // Set the instances as partition 0 replica 0
       instancePartitions.setInstances(0, 0, instancesToSelect);
     }
   }
+
+  /**
+   * Select instances with minimum movement.
+   * This algorithm can solve the following scenarios:
+   *    * swap an instance
+   *    * add/remove replica groups
+   *    * increase/decrease number of instances per replica group
+   * TODO: handle the scenarios that selected pools are changed.
+   * @param numInstancesToSelect number of instances to select
+   * @param candidateInstances candidate instances to be selected
+   * @param existingInstances list of existing instances
+   */
+  protected List<String> getInstancesWithMinimumMovement(int 
numInstancesToSelect, Set<String> candidateInstances,
+      List<String> existingInstances) {
+    // Initialize the list with empty positions to fill.
+    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      instancesToSelect.add(null);
+    }
+    Deque<String> newlyAddedInstances = new LinkedList<>();
+
+    // Find out the existing instances that are still alive.
+    Set<String> existingInstancesStillAlive = new HashSet<>();
+    for (String existingInstance : existingInstances) {
+      if (candidateInstances.contains(existingInstance)) {
+        existingInstancesStillAlive.add(existingInstance);
+      }
+    }
+
+    // Find out the newly added instances.
+    for (String candidateInstance : candidateInstances) {
+      if (!existingInstancesStillAlive.contains(candidateInstance)) {
+        newlyAddedInstances.add(candidateInstance);
+      }
+    }
+
+    for (int i = 0; i < existingInstances.size() && i < numInstancesToSelect; 
i++) {
+      String existingInstance = existingInstances.get(i);
+      if (candidateInstances.contains(existingInstance)) {
+        // If the instance still exists, add it to the instance list.
+        instancesToSelect.set(i, existingInstance);
+        existingInstancesStillAlive.remove(existingInstance);
+        // Add the existing instance to the tail so that it won't be firstly 
chosen for the next partition.
+        candidateInstances.remove(existingInstance);
+        candidateInstances.add(existingInstance);
+      } else if (!newlyAddedInstances.isEmpty()) {
+        // If the instance no longer exists, pick a new instance to fill its 
vacant position.
+        String newInstance = newlyAddedInstances.pollFirst();
+        instancesToSelect.set(i, newInstance);
+      }
+    }
+
+    // If the new number of instances per replica group is greater than the 
previous one.

Review Comment:
   ```suggestion
       // If the new number of instances per partition is greater than the 
previous one.
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances 
per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new 
ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < 
numInstancesPerPartition;
-              instanceIdInPartition++) {
-            
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % 
numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new 
TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new 
HashMap<>();
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          Set<String> candidateInstances =
+              poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedHashSet<>());
+          List<InstanceConfig> instanceConfigsInPool = 
poolToInstanceConfigsMap.get(pool);
+          instanceConfigsInPool.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> 
new HashSet<>())
+                .addAll(existingInstances);
+          }
+        }
+
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          // Filter out instances that belong to other replica groups which 
should not be the candidate.
+          Set<String> candidateInstances = new 
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+          for (int otherReplicaGroupId = 0;
+              otherReplicaGroupId < existingNumReplicaGroups && 
otherReplicaGroupId < numReplicaGroups;
+              otherReplicaGroupId++) {
+            if (replicaGroupId != otherReplicaGroupId) {
+              
candidateInstances.removeAll(replicaGroupIdToInstancesMap.get(otherReplicaGroupId));
+            }
+          }
+          Set<String> chosenCandidateInstances = new HashSet<>();
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            List<String> instancesToSelect =
+                getInstancesWithMinimumMovement(numInstancesPerPartition, 
candidateInstances, existingInstances);
+            chosenCandidateInstances.addAll(instancesToSelect);
+            instancePartitions.setInstances(partitionId, replicaGroupId, 
instancesToSelect);
+          }
+          // Remove instances that are already been chosen.
+          
poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
+        }
+
+        // If the new number of replica groups is greater than the existing 
number of replica groups.
+        for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < 
numReplicaGroups; replicaGroupId++) {
+          int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          Set<String> candidateInstances = new 
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+
+          Set<String> chosenCandidateInstances = new HashSet<>();
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            if (existingInstances == null) {
+              existingInstances = Collections.emptyList();
+            }
+            List<String> instancesToSelect =
+                getInstancesWithMinimumMovement(numInstancesPerPartition, 
candidateInstances, existingInstances);

Review Comment:
   After reading the doc, the algorithm is much clearer. Shall we add some 
comments before each for loop describing the 3 steps of the algorithm, to explain what 
we are trying to achieve in each step?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances 
per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new 
ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < 
numInstancesPerPartition;
-              instanceIdInPartition++) {
-            
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % 
numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new 
TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new 
HashMap<>();
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.

Review Comment:
   I see. I suggest adding `int numCommonReplicaGroups = 
Math.min(numReplicaGroups, existingNumReplicaGroups);` and using it as the loop's 
upper bound, to make the intent clearer



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to