Jackie-Jiang commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1410042951


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +121,46 @@ public Map<Integer, List<InstanceConfig>> 
selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the 
tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % 
numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new TreeSet<>();
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & 
!foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null) {
+                  if (existingPools.add(existingPool)) {
+                    poolsToSelect.add(existingPool);
+                  }
+                  foundExistingPoolForReplicaGroup = true;
+                  break;
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, 
_tableNameWithType);
+          // Pick a pool from remainingPools that isn't used before.
+          List<Integer> remainingPools = new ArrayList<>(pools);
+          remainingPools.retainAll(existingPools);

Review Comment:
   I don't follow this part. Should it be `removeAll()`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, 
List<InstanceConfig>> poolToInstanceCon
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new 
ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : 
poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = 
poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new 
LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = 
poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> 
candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used 
for the table.
+        int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, 
existingNumReplicaGroups);

Review Comment:
   Say we used to have 2 RGs, and now we reduce it to 1 RG. We should still 
check all existing RGs and pick the pool with the most common instances so that 
we can keep data movement to a minimum.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +121,46 @@ public Map<Integer, List<InstanceConfig>> 
selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the 
tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % 
numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new TreeSet<>();
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = 
_existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < 
existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & 
!foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = 
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null) {
+                  if (existingPools.add(existingPool)) {
+                    poolsToSelect.add(existingPool);
+                  }
+                  foundExistingPoolForReplicaGroup = true;
+                  break;
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, 
_tableNameWithType);
+          // Pick a pool from remainingPools that isn't used before.
+          List<Integer> remainingPools = new ArrayList<>(pools);
+          remainingPools.retainAll(existingPools);
+          // Skip selecting the existing pool.
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            if (existingPools.contains(i)) {

Review Comment:
   This doesn't seem correct. Why are we looking up the loop index `i` in the set of existing pool IDs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to