somandal commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1442339107


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java:
##########
@@ -79,13 +83,26 @@ public boolean equals(Object obj) {
   }
 
   public static class AscendingIntPairComparator implements Comparator<IntPair> {
+    private boolean _ascending;
+
+    public AscendingIntPairComparator(boolean ascending) {

Review Comment:
   Recommend renaming this class, since it is no longer strictly an "Ascending" comparator: the boolean you added allows both ascending and descending comparisons.
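   If it helps, here is a minimal sketch of what the renamed, direction-aware comparator could look like (the `IntPairComparator` name and the `getLeft()` accessor are my assumptions, not something taken from this PR):
   ```java
   // Hypothetical rename: compares IntPairs on the left value, in either direction.
   public static class IntPairComparator implements Comparator<IntPair> {
     private final boolean _ascending;

     public IntPairComparator(boolean ascending) {
       _ascending = ascending;
     }

     @Override
     public int compare(IntPair pair1, IntPair pair2) {
       int result = Integer.compare(pair1.getLeft(), pair2.getLeft());
       return _ascending ? result : -result;
     }
   }
   ```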



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -71,18 +74,114 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
       Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+      Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
+
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Collect the stats between the existing pools, existing replica groups, and existing instances.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            for (String existingInstance : existingInstances) {
+              Integer existingPool = instanceToPoolMap.get(existingInstance);
+              if (existingPool != null) {
+                existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(existingInstance);
+                existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(replicaGroupId);
+                existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+                    .add(existingInstance);
+              }
+            }
+          }
+        }
+
+        // Use a max heap to track the number of servers used for the given pools,
+        // so that pool with max number of existing instances will be considered first.
+        PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+        for (int pool : pools) {
+          maxHeap.add(
+              new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
+                  pool));
+        }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+        // Get the maximum number of replica groups per pool.
+        int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();

Review Comment:
   The comment here is confusing. Should this use the ceil() of the division? What if `numReplicaGroups` isn't a multiple of the number of pools, e.g. 3 replica groups across 2 pools? This will set the max to 1 instead of 2.
   
   Or is this intentionally the floor? In that case, can you update the comment and variable name to reflect that this is the minimum number of replica groups per pool?
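   For illustration (not code from this PR), the floor vs. a ceiling-style division:
   ```java
   // e.g. 3 replica groups across 2 pools
   int numReplicaGroups = 3;
   int numPools = 2;
   int floorPerPool = numReplicaGroups / numPools;                   // 1 (current behavior)
   int ceilPerPool = (numReplicaGroups + numPools - 1) / numPools;   // 2
   ```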



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java:
##########
@@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
       "Name of the column used for partition, if not provided table level replica group will be used")
   private final String _partitionColumn;
 
+  // TODO: remove this config in the next official release
+  @Deprecated

Review Comment:
   just a question: we'll have to update all table configs on our end to remove 
this once it is removed, right? Will we see failures for existing tables if 
this is deleted in the next release but we still have table configs setting 
this in the `InstanceReplicaGroupPartitionConfig`?
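   (To illustrate the failure mode I'm asking about: whether old table configs that still carry the removed field would break depends on how unknown JSON properties are handled during deserialization. A tiny standalone Jackson sketch, not Pinot's actual config wiring:)
   ```java
   import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
   import com.fasterxml.jackson.databind.ObjectMapper;

   // Hypothetical config class that no longer declares the removed field.
   @JsonIgnoreProperties(ignoreUnknown = true)
   class LegacyFreeConfig {
     public int numReplicaGroups;
   }

   public class UnknownFieldDemo {
     public static void main(String[] args) throws Exception {
       // JSON still carrying the removed field: with ignoreUnknown = true this parses fine;
       // without it, Jackson's default is to throw UnrecognizedPropertyException.
       String json = "{\"numReplicaGroups\": 2, \"removedLegacyFlag\": true}";
       LegacyFreeConfig config = new ObjectMapper().readValue(json, LegacyFreeConfig.class);
       System.out.println(config.numReplicaGroups);
     }
   }
   ```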



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -71,18 +74,114 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
       Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+      Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
+
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Collect the stats between the existing pools, existing replica groups, and existing instances.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            for (String existingInstance : existingInstances) {
+              Integer existingPool = instanceToPoolMap.get(existingInstance);
+              if (existingPool != null) {
+                existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(existingInstance);
+                existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(replicaGroupId);
+                existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+                    .add(existingInstance);
+              }
+            }
+          }
+        }
+
+        // Use a max heap to track the number of servers used for the given pools,
+        // so that pool with max number of existing instances will be considered first.
+        PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+        for (int pool : pools) {
+          maxHeap.add(
+              new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
+                  pool));
+        }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+        // Get the maximum number of replica groups per pool.
+        int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
+        // Given a pool number, assign replica group which has the max number of existing instances.
+        // Repeat this process until the max number of replica groups per pool is reached.
+        while (!maxHeap.isEmpty()) {
+          Pairs.IntPair pair = maxHeap.remove();
+          int poolNumber = pair.getRight();
+          for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {

Review Comment:
   Just wondering if there is a code simplification opportunity here. Instead of running this outer loop, can you just extract the relevant replica group ids from `existingReplicaGroupIdToExistingInstancesMap`, sort them by size ascending, and assign the top `maxNumberOfReplicaGroupPerPool` target groups if there are more than 0?
   
   Also, if you do want to keep this for loop, I guess you can move it to be after the following, right?
   ```
   Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
   if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
     continue;
   }
   ```
   I don't see how the above would change between iterations of this loop.
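   Roughly, one way to read that suggestion (an untested sketch; variable names are borrowed from the diff above, it assumes it runs inside `selectInstances` with those maps in scope, that `java.util.*` and `java.util.stream.Collectors` are imported, and that the loop body ultimately populates `poolToReplicaGroupIdsMap` / `replicaGroupIdToPoolMap`, which isn't shown in this snippet):
   ```java
   // For the pool popped off the heap, pick the replica groups with the most existing
   // instances, up to maxNumberOfReplicaGroupPerPool, instead of re-deriving them in a loop.
   Set<Integer> existingReplicaGroups =
       existingPoolToExistingReplicaGroupIdsMap.getOrDefault(poolNumber, Collections.emptySet());
   List<Integer> topReplicaGroups = existingReplicaGroups.stream()
       .sorted(Comparator.comparingInt(
           (Integer rg) -> existingReplicaGroupIdToExistingInstancesMap.get(rg).size()).reversed())
       .limit(maxNumberOfReplicaGroupPerPool)
       .collect(Collectors.toList());
   for (int replicaGroupId : topReplicaGroups) {
     poolToReplicaGroupIdsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(replicaGroupId);
     replicaGroupIdToPoolMap.put(replicaGroupId, poolNumber);
   }
   ```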



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +123,46 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null) {
+                  if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
+                    existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
+                  }

Review Comment:
   You don't need this check; you're already doing a `computeIfAbsent` on the next line.
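   i.e. assuming the next (unshown) line is the `computeIfAbsent` call, something like this alone should be enough:
   ```java
   // computeIfAbsent creates and inserts the set when the key is missing,
   // so the containsKey/put pair above is redundant.
   existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
       .add(existingInstance);
   ```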



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

