Jackie-Jiang commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r854668972


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java:
##########
@@ -191,6 +190,22 @@ public Map<InstancePartitionsType, InstancePartitions> assignInstances(
     return instancePartitionsMap;
   }
 
+  /**
+   * Assign instances given the type of instancePartitions.
+   * @param instancePartitionsMap the empty map to be filled.
+   * @param tableConfig table config
+   * @param instanceConfigs list of instance configs
+   * @param instancePartitionsType type of instancePartitions
+   */
+  private void assignInstancesForInstancePartitionsType(
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
+      List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
+    InstancePartitions existingInstancePartitions = InstancePartitionsUtils
+        .fetchOrComputeInstancePartitions(_resourceManager.getHelixZkManager(), tableConfig, instancePartitionsType);

Review Comment:
   We should not compute the instance partitions if they do not already exist.
   `assignInstances()` should be able to handle the case where there are no
   existing instance partitions.
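
   A minimal sketch of the flow suggested here, using stand-in types rather
   than Pinot's real classes: `InstancePartitions` is a simplified holder, and
   `fetchExisting` and this `assignInstances` are hypothetical helpers written
   only for illustration. The lookup returns `null` when nothing is stored,
   and the assignment treats `null` as "start from scratch" instead of
   computing a default first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NullableExistingPartitionsSketch {
  // Simplified stand-in for Pinot's InstancePartitions (hypothetical shape).
  static final class InstancePartitions {
    final List<String> instances;

    InstancePartitions(List<String> instances) {
      this.instances = instances;
    }
  }

  // Hypothetical lookup: returns null when nothing has been persisted,
  // and never computes a default.
  static InstancePartitions fetchExisting(Map<String, InstancePartitions> store, String name) {
    return store.get(name);
  }

  // Hypothetical assignment: 'existing' may be null.
  static InstancePartitions assignInstances(List<String> candidates, int numInstances,
      InstancePartitions existing) {
    List<String> selected = new ArrayList<>();
    if (existing != null) {
      // Keep previously assigned instances that are still valid candidates.
      for (String instance : existing.instances) {
        if (selected.size() < numInstances && candidates.contains(instance)) {
          selected.add(instance);
        }
      }
    }
    // Fill the remaining slots from the candidates, in order.
    for (String candidate : candidates) {
      if (selected.size() >= numInstances) {
        break;
      }
      if (!selected.contains(candidate)) {
        selected.add(candidate);
      }
    }
    return new InstancePartitions(selected);
  }

  public static void main(String[] args) {
    Map<String, InstancePartitions> store = new HashMap<>();
    List<String> candidates = Arrays.asList("i0", "i1", "i2");

    // Nothing stored yet: null is passed through and handled.
    InstancePartitions fresh = assignInstances(candidates, 2, fetchExisting(store, "myTable_OFFLINE"));
    System.out.println(fresh.instances); // prints [i0, i1]

    // Something stored: still-valid instances are kept first.
    store.put("myTable_OFFLINE", new InstancePartitions(Arrays.asList("i2", "i9")));
    InstancePartitions reused = assignInstances(candidates, 2, fetchExisting(store, "myTable_OFFLINE"));
    System.out.println(reused.instances); // prints [i2, i0]
  }
}
```

   The caller never has to distinguish "stored" from "computed" partitions;
   it only passes along whatever the lookup found.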



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -52,7 +52,7 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
-      List<InstanceConfig> instanceConfigs) {
+      List<InstanceConfig> instanceConfigs, InstancePartitions existingInstancePartitions) {

Review Comment:
   Annotate `existingInstancePartitions` as `@Nullable`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -417,10 +417,14 @@ private InstancePartitions getInstancePartitions(TableConfig tableConfig,
     String tableNameWithType = tableConfig.getTableName();
     if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
       if (reassignInstances) {
+        InstancePartitions existingInstancePartitions =
+            InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig,

Review Comment:
   Same here. We should not compute the instance partitions if no existing
   ones are found.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition;
-              instanceIdInPartition++) {
-            instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>();
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          Set<String> candidateInstances =
+              poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+          List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
+          instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+                .addAll(existingInstances);
+          }
+        }
+
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          // Filter out instances that belong to other replica groups which should not be the candidate.
+          Set<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+          for (int otherReplicaGroupId = 0;
+              otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups;
+              otherReplicaGroupId++) {
+            if (replicaGroupId != otherReplicaGroupId) {
+              candidateInstances.removeAll(replicaGroupIdToInstancesMap.get(otherReplicaGroupId));
+            }
+          }
+          Set<String> chosenCandidateInstances = new HashSet<>();
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            List<String> instancesToSelect =
+                getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances);
+            chosenCandidateInstances.addAll(instancesToSelect);
+            instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect);
+          }
+          // Remove instances that are already been chosen.
+          poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
+        }
+
+        // If the new number of replica groups is greater than the existing number of replica groups.
+        for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+          int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          Set<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+
+          Set<String> chosenCandidateInstances = new HashSet<>();
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            if (existingInstances == null) {
+              existingInstances = Collections.emptyList();
+            }
+            List<String> instancesToSelect =
+                getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances);

Review Comment:
   I think this can cause uneven distribution if `numInstancesPerPartition`
   changes, because we don't track the number of partitions assigned to each
   server.
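
   To make the concern concrete, here is a rough sketch under stated
   assumptions. `selectForPartition` is a hypothetical helper, not the PR's
   `getInstancesWithMinimumMovement`, and the `partitionCounts` map is an
   illustrative way to track load per server. It keeps existing instances that
   are still candidates, then fills any remaining slots with the candidate
   currently serving the fewest partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BalancedSelectionSketch {
  // Hypothetical helper: selects instances for one partition and updates
  // the per-instance partition counts it is given.
  static List<String> selectForPartition(int numInstancesPerPartition, List<String> candidates,
      List<String> existing, Map<String, Integer> partitionCounts) {
    List<String> selected = new ArrayList<>();
    // Keep existing instances that are still candidates to avoid data movement.
    for (String instance : existing) {
      if (selected.size() < numInstancesPerPartition && candidates.contains(instance)
          && !selected.contains(instance)) {
        selected.add(instance);
      }
    }
    // Fill remaining slots with the least-loaded candidate; ties go to the
    // candidate that appears first in the list.
    while (selected.size() < numInstancesPerPartition) {
      String best = null;
      for (String candidate : candidates) {
        if (selected.contains(candidate)) {
          continue;
        }
        if (best == null
            || partitionCounts.getOrDefault(candidate, 0) < partitionCounts.getOrDefault(best, 0)) {
          best = candidate;
        }
      }
      if (best == null) {
        break; // Not enough candidates to fill the partition.
      }
      selected.add(best);
    }
    // Record the load so later partitions can see it.
    for (String instance : selected) {
      partitionCounts.merge(instance, 1, Integer::sum);
    }
    return selected;
  }

  public static void main(String[] args) {
    List<String> candidates = Arrays.asList("i0", "i1", "i2", "i3");
    Map<String, Integer> counts = new HashMap<>();
    List<String> none = Collections.emptyList();
    System.out.println(selectForPartition(2, candidates, none, counts)); // prints [i0, i1]
    System.out.println(selectForPartition(2, candidates, none, counts)); // prints [i2, i3]
  }
}
```

   This greedy fill is only an illustration: when kept instances overlap
   across partitions, it does not guarantee a perfectly even spread, but the
   counter at least makes the imbalance visible to the selection logic.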



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition;
-              instanceIdInPartition++) {
-            instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>();
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.

Review Comment:
   Is it possible to hit this branch?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica-group to each partition.
-      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
-      // [i0, i1, i2, i3, i4]
-      //  p0  p0  p0  p1  p1
-      //  p1  p2  p2  p2
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-        int instanceIdInReplicaGroup = 0;
-        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-          List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition);
-          for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition;
-              instanceIdInPartition++) {
-            instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+        // Minimize data movement.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+
+        Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
+        Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>();
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          if (pool == null) {
+            // Skip the replica group if it's no longer needed.
+            continue;
+          }
+          Set<String> candidateInstances =
+              poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+          List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
+          instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+                .addAll(existingInstances);
+          }
+        }
+
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {

Review Comment:
   Can we merge these two for loops into one? I don't see why we need to
   iterate over all replica groups twice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

