jackjlli commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r854730114


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition;
-              instanceIdInPartition++) {
-            instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>();
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          Set<String> candidateInstances =
+              poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+          List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
+          instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+                .addAll(existingInstances);
+          }
+        }
+
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {

Review Comment:
   The first loop collects the existing replica groups (RGs) and the instances
each of them currently uses. The second loop then filters out, for each RG, the
instances that are already used by other RGs. That filtering needs the complete
RG-to-instances map, which is only available once the first loop has finished,
so the two loops have to stay separate.
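
   To illustrate why a single pass is not enough, here is a minimal standalone
sketch of the two-pass idea. It is not the code in this PR: the class
`TwoPassSketch`, the method `availablePerReplicaGroup`, and the nested-list
input shape are hypothetical names and simplifications chosen for the example,
and pools are collapsed into a single candidate set.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not part of Apache Pinot.
public class TwoPassSketch {

  /**
   * existing.get(rg).get(partition) holds the instance names currently assigned
   * to that partition of replica group rg. Returns, per replica group, the pool
   * instances that are not already used by a different replica group.
   */
  public static Map<Integer, Set<String>> availablePerReplicaGroup(
      List<List<List<String>>> existing, Set<String> poolInstances) {
    // Pass 1: record every instance each existing replica group uses.
    Map<Integer, Set<String>> usedByReplicaGroup = new HashMap<>();
    for (int rg = 0; rg < existing.size(); rg++) {
      Set<String> used = usedByReplicaGroup.computeIfAbsent(rg, k -> new HashSet<>());
      for (List<String> partitionInstances : existing.get(rg)) {
        used.addAll(partitionInstances);
      }
    }

    // Pass 2: for each replica group, drop candidates owned by any other group.
    // This needs the full map from pass 1, including groups with a higher id.
    Map<Integer, Set<String>> available = new HashMap<>();
    for (int rg = 0; rg < existing.size(); rg++) {
      Set<String> candidates = new LinkedHashSet<>(poolInstances);
      for (Map.Entry<Integer, Set<String>> entry : usedByReplicaGroup.entrySet()) {
        if (entry.getKey() != rg) {
          candidates.removeAll(entry.getValue());
        }
      }
      available.put(rg, candidates);
    }
    return available;
  }

  public static void main(String[] args) {
    // RG 0 uses i0 and i1, RG 1 uses i2; i3 is unused.
    List<List<List<String>>> existing = List.of(
        List.of(List.of("i0"), List.of("i1")),
        List.of(List.of("i2")));
    Set<String> pool = new LinkedHashSet<>(List.of("i0", "i1", "i2", "i3"));
    System.out.println(availablePerReplicaGroup(existing, pool));
  }
}
```

   If the filtering ran inside the first loop, RG 0 would be processed before
RG 1's instances were recorded, so `i2` would wrongly remain a candidate for
RG 0.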



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

