jackjlli commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r856594719


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
         numInstancesToSelect = numInstanceConfigs;
       }
 
-      List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-      for (int i = 0; i < numInstancesToSelect; i++) {
-        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+      List<String> instancesToSelect;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        List<String> existingInstances = 
_existingInstancePartitions.getInstances(0, 0);
+        Set<String> candidateInstances = new LinkedHashSet<>();
+        instanceConfigs.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+        instancesToSelect =
+            getInstancesWithMinimumMovement(numInstancesToSelect, 
candidateInstances, existingInstances);
+      } else {
+        // Select instances sequentially.
+        instancesToSelect = new ArrayList<>(numInstancesToSelect);
+        for (int i = 0; i < numInstancesToSelect; i++) {
+          instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+        }
       }
       instancesToSelect.sort(null);
       LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, 
_tableNameWithType);
       // Set the instances as partition 0 replica 0
       instancePartitions.setInstances(0, 0, instancesToSelect);
     }
   }
+
+  /**
+   * Select instances with minimum movement.
+   * This algorithm can solve the following scenarios:
+   *    * swap an instance
+   *    * add/remove replica groups
+   *    * increase/decrease number of instances per replica group
+   * TODO: handle the scenario where the selected pools have changed.
+   * @param numInstancesToSelect number of instances to select
+   * @param candidateInstances candidate instances to be selected
+   * @param existingInstances list of existing instances
+   */
+  protected List<String> getInstancesWithMinimumMovement(int 
numInstancesToSelect, Set<String> candidateInstances,
+      List<String> existingInstances) {
+    // Initialize the list with empty positions to fill.
+    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      instancesToSelect.add(null);
+    }
+    Deque<String> newlyAddedInstances = new LinkedList<>();
+
+    // Find out the existing instances that are still alive.
+    Set<String> existingInstancesStillAlive = new HashSet<>();
+    for (String existingInstance : existingInstances) {
+      if (candidateInstances.contains(existingInstance)) {
+        existingInstancesStillAlive.add(existingInstance);
+      }
+    }
+
+    // Find out the newly added instances.
+    for (String candidateInstance : candidateInstances) {
+      if (!existingInstancesStillAlive.contains(candidateInstance)) {
+        newlyAddedInstances.add(candidateInstance);
+      }
+    }
+
+    for (int i = 0; i < existingInstances.size() && i < numInstancesToSelect; 
i++) {
+      String existingInstance = existingInstances.get(i);
+      if (candidateInstances.contains(existingInstance)) {
+        // If the instance still exists, add it to the instance list.
+        instancesToSelect.set(i, existingInstance);
+        existingInstancesStillAlive.remove(existingInstance);
+        // Re-add the existing instance at the tail so that it is not chosen 
first for the next partition.
+        candidateInstances.remove(existingInstance);
+        candidateInstances.add(existingInstance);
+      } else if (!newlyAddedInstances.isEmpty()) {
+        // If the instance no longer exists, pick a new instance to fill its 
vacant position.
+        String newInstance = newlyAddedInstances.pollFirst();
+        instancesToSelect.set(i, newInstance);
+      }
+    }

Review Comment:
   simplified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to