somandal commented on code in PR #11953: URL: https://github.com/apache/pinot/pull/11953#discussion_r1442339107
########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java: ########## @@ -79,13 +83,26 @@ public boolean equals(Object obj) { } public static class AscendingIntPairComparator implements Comparator<IntPair> { + private boolean _ascending; + + public AscendingIntPairComparator(boolean ascending) { Review Comment: recommend renaming this class since it is no longer a strictly "Ascending" comparator. The boolean you added allows both ascending and descending comparisons. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -71,18 +74,114 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); + Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>(); + Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>(); + Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>(); Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>(); Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); - for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { - // Pick one pool for each replica-group based on the table name hash - int pool = pools.get((tableNameHash + replicaId) % numPools); - poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); - replicaGroupIdToPoolMap.put(replicaId, pool); + Map<String, Integer> instanceToPoolMap = new HashMap<>(); + for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) { + Integer pool = entry.getKey(); + List<InstanceConfig> instanceConfigsInPool = entry.getValue(); + Set<String> 
candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); + for (InstanceConfig instanceConfig : instanceConfigsInPool) { + String instanceName = instanceConfig.getInstanceName(); + candidateInstances.add(instanceName); + instanceToPoolMap.put(instanceName, pool); + } + } + + if (_minimizeDataMovement && _existingInstancePartitions != null) { + // Collect the stats between the existing pools, existing replica groups, and existing instances. + int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + for (String existingInstance : existingInstances) { + Integer existingPool = instanceToPoolMap.get(existingInstance); + if (existingPool != null) { + existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>()) + .add(existingInstance); + existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>()) + .add(replicaGroupId); + existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) + .add(existingInstance); + } + } + } + } + + // Use a max heap to track the number of servers used for the given pools, + // so that pool with max number of existing instances will be considered first. 
+ PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false)); + for (int pool : pools) { + maxHeap.add( + new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(), + pool)); + } - Set<String> candidateInstances = - poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); - List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); - instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); + // Get the maximum number of replica groups per pool. + int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size(); Review Comment: The comment here is confusing. Should this use the ceiling of the division? What if `numReplicaGroups` isn't a multiple of the number of pools, e.g. 3 replica groups across 2 pools? Integer division will set the max to 1 instead of 2. Or is the floor intentional? If so, can you update the comment and variable name to reflect that this is the minimum number of replica groups per pool? ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java: ########## @@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig { "Name of the column used for partition, if not provided table level replica group will be used") private final String _partitionColumn; + // TODO: remove this config in the next official release + @Deprecated Review Comment: Just a question: we'll have to update all table configs on our end to remove this once it is removed, right? Will we see failures for existing tables if this is deleted in the next release but we still have table configs setting it in the `InstanceReplicaGroupPartitionConfig`?
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -71,18 +74,114 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); + Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>(); + Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>(); + Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>(); Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>(); Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); - for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { - // Pick one pool for each replica-group based on the table name hash - int pool = pools.get((tableNameHash + replicaId) % numPools); - poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); - replicaGroupIdToPoolMap.put(replicaId, pool); + Map<String, Integer> instanceToPoolMap = new HashMap<>(); + for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) { + Integer pool = entry.getKey(); + List<InstanceConfig> instanceConfigsInPool = entry.getValue(); + Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); + for (InstanceConfig instanceConfig : instanceConfigsInPool) { + String instanceName = instanceConfig.getInstanceName(); + candidateInstances.add(instanceName); + instanceToPoolMap.put(instanceName, pool); + } + } + + if (_minimizeDataMovement && _existingInstancePartitions != null) { + // Collect the stats between the existing pools, existing replica groups, and existing instances. 
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + for (String existingInstance : existingInstances) { + Integer existingPool = instanceToPoolMap.get(existingInstance); + if (existingPool != null) { + existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>()) + .add(existingInstance); + existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>()) + .add(replicaGroupId); + existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) + .add(existingInstance); + } + } + } + } + + // Use a max heap to track the number of servers used for the given pools, + // so that pool with max number of existing instances will be considered first. + PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false)); + for (int pool : pools) { + maxHeap.add( + new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(), + pool)); + } - Set<String> candidateInstances = - poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); - List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); - instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); + // Get the maximum number of replica groups per pool. + int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size(); + // Given a pool number, assign replica group which has the max number of existing instances. + // Repeat this process until the max number of replica groups per pool is reached. 
+ while (!maxHeap.isEmpty()) { + Pairs.IntPair pair = maxHeap.remove(); + int poolNumber = pair.getRight(); + for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) { Review Comment: Just wondering if there is a code simplification opportunity here. Instead of running this outer loop, can you just extract the relevant replica group IDs from `existingReplicaGroupIdToExistingInstancesMap`, sort them by size, and assign the `maxNumberOfReplicaGroupPerPool` largest ones as the target groups (if that number is greater than 0)? Also, if you do want to keep this for loop, I think you can move it to after the following check, right? ``` Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber); if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) { continue; } ``` I don't see how the result of that check would change between iterations of the loop. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java: ########## @@ -109,11 +123,46 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i return poolToInstanceConfigsMap; } - // Select pools based on the table name hash to evenly distribute the tables poolsToSelect = new ArrayList<>(numPoolsToSelect); - List<Integer> poolsInCluster = new ArrayList<>(pools); - for (int i = 0; i < numPoolsToSelect; i++) { - poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools)); + if (_minimizeDataMovement && _existingInstancePartitions != null) { + Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>(); + // Keep the same pool if it's already been used for the table.
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + for (String existingInstance : existingInstances) { + Integer existingPool = instanceToPoolMap.get(existingInstance); + if (existingPool != null) { + if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) { + existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>()); + } Review Comment: You don't need this `containsKey`/`put` check, since you're already doing a `computeIfAbsent` on the next line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org