jackjlli commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r854735037


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances 
per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new 
ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < 
numInstancesPerPartition;
-              instanceIdInPartition++) {
-            
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % 
numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && 
_existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new 
TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new 
HashMap<>();
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          Set<String> candidateInstances =
+              poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedHashSet<>());
+          List<InstanceConfig> instanceConfigsInPool = 
poolToInstanceConfigsMap.get(pool);
+          instanceConfigsInPool.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> 
new HashSet<>())
+                .addAll(existingInstances);
+          }
+        }
+
+        for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          // Filter out instances that belong to other replica groups which 
should not be the candidate.
+          Set<String> candidateInstances = new 
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+          for (int otherReplicaGroupId = 0;
+              otherReplicaGroupId < existingNumReplicaGroups && 
otherReplicaGroupId < numReplicaGroups;
+              otherReplicaGroupId++) {
+            if (replicaGroupId != otherReplicaGroupId) {
+              
candidateInstances.removeAll(replicaGroupIdToInstancesMap.get(otherReplicaGroupId));
+            }
+          }
+          Set<String> chosenCandidateInstances = new HashSet<>();
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            List<String> instancesToSelect =
+                getInstancesWithMinimumMovement(numInstancesPerPartition, 
candidateInstances, existingInstances);
+            chosenCandidateInstances.addAll(instancesToSelect);
+            instancePartitions.setInstances(partitionId, replicaGroupId, 
instancesToSelect);
+          }
+          // Remove instances that are already been chosen.
+          
poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
+        }
+
+        // If the new number of replica groups is greater than the existing 
number of replica groups.
+        for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < 
numReplicaGroups; replicaGroupId++) {
+          int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          Set<String> candidateInstances = new 
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+
+          Set<String> chosenCandidateInstances = new HashSet<>();
+          for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {
+            List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            if (existingInstances == null) {
+              existingInstances = Collections.emptyList();
+            }
+            List<String> instancesToSelect =
+                getInstancesWithMinimumMovement(numInstancesPerPartition, 
candidateInstances, existingInstances);

Review Comment:
   Good question! `candidateInstances` here is a `LinkedHashSet`. Its 
purpose is to preserve insertion order while still providing fast lookup.
   Existing instances that are still alive are removed from the candidate 
list and then re-appended to its tail, so that newly added instances have a 
chance to be picked up. Each of these existing instances is validated by 
checking whether it is still present in the candidate set.
   Newly added instances keep their original order, so that the same 
instance can be picked up for all partitions of a replica group.
   If it is a brand-new replica group (RG), all the new instances in this 
candidate list are picked up in round-robin order.
   I've written an analysis of all the possible cases in the doc linked 
below, and the unit tests in this PR cover all of these scenarios. Please take a look:
   
https://docs.google.com/document/d/1_Fn-yNjt9Ih0SQiIqIhCEvE9BvYugvAhB4tgWH-VoWE/edit?usp=sharing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to