Jackie-Jiang commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r856668832


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,119 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances 
per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new 
ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < 
numInstancesPerPartition;
-              instanceIdInPartition++) {
-            
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % 
numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, 
existingNumReplicaGroups);
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new 
TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new 
TreeMap<>();
+        // Step 1: find out the replica groups and their existing instances,
+        //   so that these instances can be filtered out and won't be chosen 
for the other replica group.
+        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; 
replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          Set<String> candidateInstances =
+              poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedHashSet<>());
+          List<InstanceConfig> instanceConfigsInPool = 
poolToInstanceConfigsMap.get(pool);
+          instanceConfigsInPool.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            
replicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new 
HashSet<>())
+                .addAll(existingInstances);
+          }
+        }
+
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }

Review Comment:
   ```suggestion
           for (int replicaGroupId = 0; replicaGroupId < 
numCommonReplicaGroups; replicaGroupId++) {
             Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,119 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances 
per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new 
ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < 
numInstancesPerPartition;
-              instanceIdInPartition++) {
-            
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % 
numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, 
existingNumReplicaGroups);
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new 
TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new 
TreeMap<>();
+        // Step 1: find out the replica groups and their existing instances,
+        //   so that these instances can be filtered out and won't be chosen 
for the other replica group.
+        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; 
replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }

Review Comment:
   This `pool == null` check is no longer needed now that the loop only iterates up to `numCommonReplicaGroups`.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,104 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
         numInstancesToSelect = numInstanceConfigs;
       }
 
-      List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-      for (int i = 0; i < numInstancesToSelect; i++) {
-        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+      List<String> instancesToSelect;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        List<String> existingInstances = 
_existingInstancePartitions.getInstances(0, 0);
+        LinkedHashSet<String> candidateInstances = new LinkedHashSet<>();
+        instanceConfigs.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+        instancesToSelect =
+            getInstancesWithMinimumMovement(numInstancesToSelect, 
candidateInstances, existingInstances);
+      } else {
+        // Select instances sequentially.
+        instancesToSelect = new ArrayList<>(numInstancesToSelect);
+        for (int i = 0; i < numInstancesToSelect; i++) {
+          instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+        }
       }
       instancesToSelect.sort(null);
       LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, 
_tableNameWithType);
       // Set the instances as partition 0 replica 0
       instancePartitions.setInstances(0, 0, instancesToSelect);
     }
   }
+
+  /**
+   * Select instances with minimum movement.
+   * This algorithm can solve the following scenarios:
+   *    * swap an instance
+   *    * add/remove replica groups
+   *    * increase/decrease number of instances per replica group
+   * TODO: handle the scenarios that selected pools are changed.
+   * TODO: improve the algorithm by doing the following steps:
+   *         1. assign the existing instances for all partitions;
+   *         2. assign the vacant positions based on the partitions already 
assigned to each instance.
+   * @param numInstancesToSelect number of instances to select
+   * @param candidateInstances candidate instances to be selected
+   * @param existingInstances list of existing instances
+   */
+  private static List<String> getInstancesWithMinimumMovement(int 
numInstancesToSelect,
+      LinkedHashSet<String> candidateInstances, List<String> 
existingInstances) {
+    // Initialize the list with empty positions to fill.
+    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      instancesToSelect.add(null);
+    }
+    Deque<String> newlyAddedInstances = new LinkedList<>();
+
+    // Find out the existing instances that are still alive.
+    Set<String> existingInstancesStillAlive = new HashSet<>();
+    for (String existingInstance : existingInstances) {
+      if (candidateInstances.contains(existingInstance)) {
+        existingInstancesStillAlive.add(existingInstance);
+      }
+    }
+
+    // Find out the newly added instances.
+    for (String candidateInstance : candidateInstances) {
+      if (!existingInstancesStillAlive.contains(candidateInstance)) {
+        newlyAddedInstances.add(candidateInstance);
+      }
+    }
+
+    int numExistingInstances = existingInstances.size();
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      String existingInstance = i < numExistingInstances ? 
existingInstances.get(i) : null;
+      String selectedInstance;
+      if (existingInstance != null && 
candidateInstances.contains(existingInstance)) {
+        selectedInstance = existingInstance;
+        existingInstancesStillAlive.remove(selectedInstance);
+      } else {
+        selectedInstance = newlyAddedInstances.poll();
+      }
+      instancesToSelect.set(i, selectedInstance);
+      // If it's an existing alive instance, or it's for a new replica group, 
add the new instance to the tail,
+      // so that it won't be firstly chosen for the next partition.
+      // For newly added instances to fill the existing replica group, the 
sequence cannot change;
+      // otherwise there is no guarantee that same vacant position will be 
filled with the same new instance.
+      // The 'selectedInstance' object can still be null if there is no new 
instances from the candidate list.
+      if (selectedInstance != null && (i < numExistingInstances || 
existingInstances.isEmpty())) {
+        candidateInstances.remove(selectedInstance);
+        candidateInstances.add(selectedInstance);
+      }
+    }
+
+    // If there are still some vacant positions in the instance list,
+    // try to fill with instances which are either left over or newly added.
+    for (int i = 0; i < instancesToSelect.size(); i++) {
+      if (instancesToSelect.get(i) == null) {
+        if (!existingInstancesStillAlive.isEmpty()) {
+          Iterator<String> iterator = existingInstancesStillAlive.iterator();
+          String existingInstanceLeftOver = iterator.next();
+          instancesToSelect.set(i, existingInstanceLeftOver);
+          iterator.remove();
+        } else if (!newlyAddedInstances.isEmpty()) {
+          // pick a new instance to fill its vacant position.
+          String newInstance = newlyAddedInstances.pollFirst();
+          instancesToSelect.set(i, newInstance);
+        }
+      }
+    }

Review Comment:
   This loop, which fills the remaining vacant positions, shouldn't be needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to