Jackie-Jiang commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1395100930


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +122,45 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);

Review Comment:
   (minor) This set is very small
   ```suggestion
             Set<Integer> existingPools = new TreeSet<>();
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);

Review Comment:
   Why are we only checking the common replica groups? We should first try to match each existing replica group to the pool it already uses, then assign the remaining pools.
   There are several scenarios to cover:
   - Same number of pools and replica groups
   - pools < replica groups (or single pool)
   - pools > replica groups
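
To illustrate the "match existing first, then assign the rest" idea across the three scenarios above, here is a rough sketch. It is not the PR's code: `ReplicaGroupPoolSketch`, `assignPools`, and the `existingReplicaGroupIdToPool` input are hypothetical names, and the tie-breaking rule for new replica groups (least-loaded pool, starting from a table-name-hash offset) is an assumption rather than something decided in this review.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Pinot.
class ReplicaGroupPoolSketch {

  // Returns replicaGroupId -> pool for replica groups 0..numReplicaGroups-1.
  static Map<Integer, Integer> assignPools(List<Integer> pools,
      Map<Integer, Integer> existingReplicaGroupIdToPool, int numReplicaGroups, int tableNameHash) {
    if (pools.isEmpty()) {
      throw new IllegalArgumentException("At least one pool is required");
    }
    Map<Integer, Integer> replicaGroupIdToPool = new TreeMap<>();
    Map<Integer, Integer> poolToNumReplicaGroups = new TreeMap<>();
    for (Integer pool : pools) {
      poolToNumReplicaGroups.put(pool, 0);
    }

    // Pass 1: keep the existing pool for every replica group whose pool is still available.
    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
      Integer existingPool = existingReplicaGroupIdToPool.get(replicaGroupId);
      if (existingPool != null && poolToNumReplicaGroups.containsKey(existingPool)) {
        replicaGroupIdToPool.put(replicaGroupId, existingPool);
        poolToNumReplicaGroups.merge(existingPool, 1, Integer::sum);
      }
    }

    // Pass 2: give each unmatched replica group the least-loaded pool (assumed policy),
    // scanning from a hash-based offset so that ties are spread across tables.
    int numPools = pools.size();
    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
      if (replicaGroupIdToPool.containsKey(replicaGroupId)) {
        continue;
      }
      Integer bestPool = null;
      for (int i = 0; i < numPools; i++) {
        Integer candidate = pools.get(Math.floorMod(tableNameHash + replicaGroupId + i, numPools));
        if (bestPool == null || poolToNumReplicaGroups.get(candidate) < poolToNumReplicaGroups.get(bestPool)) {
          bestPool = candidate;
        }
      }
      replicaGroupIdToPool.put(replicaGroupId, bestPool);
      poolToNumReplicaGroups.merge(bestPool, 1, Integer::sum);
    }
    return replicaGroupIdToPool;
  }
}
```

With a single pool every replica group lands on that pool. With more pools than replica groups, existing assignments are kept and only new replica groups are placed on unused pools.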



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +122,45 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null & pools.contains(existingPool)) {

Review Comment:
   (minor) The second check should be redundant because both maps are extracted from the same mapping.
   ```suggestion
                   if (existingPool != null) {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +122,45 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null & pools.contains(existingPool)) {
+                  poolsToSelect.add(existingPool);

Review Comment:
   This can potentially add the same pool multiple times. We should probably first gather all existing pools, then decide which pools to select.
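
A minimal sketch of the two-step approach described here, assuming the existing instances are already available per replica group. `PoolSelectionSketch`, `selectPools`, and the `existingInstancesPerReplicaGroup` parameter are hypothetical names used only for illustration. Filling the remaining slots from a table-name-hash offset mirrors the removed code, but using it as the fallback is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Pinot.
class PoolSelectionSketch {

  static List<Integer> selectPools(List<Integer> poolsInCluster,
      List<List<String>> existingInstancesPerReplicaGroup, Map<String, Integer> instanceToPoolMap,
      int numPoolsToSelect, int tableNameHash) {
    // Step 1: gather the distinct pools already used by the table. The set removes duplicates.
    Set<Integer> existingPools = new LinkedHashSet<>();
    for (List<String> existingInstances : existingInstancesPerReplicaGroup) {
      for (String existingInstance : existingInstances) {
        Integer existingPool = instanceToPoolMap.get(existingInstance);
        if (existingPool != null) {
          existingPools.add(existingPool);
        }
      }
    }

    // Step 2: decide the pools to select. Existing pools come first, capped at numPoolsToSelect.
    Set<Integer> selected = new LinkedHashSet<>();
    for (Integer existingPool : existingPools) {
      if (selected.size() == numPoolsToSelect) {
        break;
      }
      selected.add(existingPool);
    }

    // Fill any remaining slots from the cluster pools, starting at a hash-based offset (assumed policy).
    int numPools = poolsInCluster.size();
    for (int i = 0; i < numPools && selected.size() < numPoolsToSelect; i++) {
      selected.add(poolsInCluster.get(Math.floorMod(tableNameHash + i, numPools)));
    }
    return new ArrayList<>(selected);
  }
}
```

Because the selection is built in a set, a pool shared by several replica groups or partitions is added only once, regardless of how many existing instances map to it.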



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -41,9 +45,17 @@ public class InstanceTagPoolSelector {
   private final InstanceTagPoolConfig _tagPoolConfig;
   private final String _tableNameWithType;
 
-  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType) {
+  private final boolean _minimizeDataMovement;
+
+  private final InstancePartitions _existingInstancePartitions;
+
+  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
+      boolean minimizeDataMovement,
+      @Nullable InstancePartitions existingInstancePartitions) {

Review Comment:
   (minor) Remove the unnecessary whitespace and reformat



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

