Jackie-Jiang commented on code in PR #8483: URL: https://github.com/apache/pinot/pull/8483#discussion_r855564628
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -37,13 +46,15 @@ public class InstanceReplicaGroupPartitionSelector { private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class); - private final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; - private final String _tableNameWithType; + protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; Review Comment: (minor) Keep them `private` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -140,25 +133,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", numPartitions, numInstancesPerPartition, _tableNameWithType); - // Assign consecutive instances within a replica-group to each partition. - // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) - // [i0, i1, i2, i3, i4] - // p0 p0 p0 p1 p1 - // p1 p2 p2 p2 - for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int instanceIdInReplicaGroup = 0; - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition); - for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; - instanceIdInPartition++) { - instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); - instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. 
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + + Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); + Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>(); Review Comment: (minor) rename to be more specific, and use `TreeMap` because the map size will be small ```suggestion Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new TreeMap<>(); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -140,25 +133,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", numPartitions, numInstancesPerPartition, _tableNameWithType); - // Assign consecutive instances within a replica-group to each partition. - // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) - // [i0, i1, i2, i3, i4] - // p0 p0 p0 p1 p1 - // p1 p2 p2 p2 - for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int instanceIdInReplicaGroup = 0; - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition); - for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; - instanceIdInPartition++) { - instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); - instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. 
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + + Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); + Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip the replica group if it's no longer needed. + continue; + } + Set<String> candidateInstances = + poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); + List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); + instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); + + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) + .addAll(existingInstances); + } + } + + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { Review Comment: The following 2 loops can be simplified (and made more efficient) if we loop over the pools first, then loop over the replica-groups within each pool. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon numInstancesToSelect = numInstanceConfigs; } - List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + List<String> instancesToSelect; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement()
&& _existingInstancePartitions != null) { + // Minimize data movement. + List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0); + Set<String> candidateInstances = new LinkedHashSet<>(); + instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName())); + instancesToSelect = + getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances); + } else { + // Select instances sequentially. + instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + } } instancesToSelect.sort(null); LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); // Set the instances as partition 0 replica 0 instancePartitions.setInstances(0, 0, instancesToSelect); } } + + /** + * Select instances with minimum movement. + * This algorithm can solve the following scenarios: + * * swap an instance + * * add/remove replica groups + * * increase/decrease number of instances per replica group + * TODO: handle the scenarios that selected pools are changed. + * @param numInstancesToSelect number of instances to select + * @param candidateInstances candidate instances to be selected + * @param existingInstances list of existing instances + */ + protected List<String> getInstancesWithMinimumMovement(int numInstancesToSelect, Set<String> candidateInstances, + List<String> existingInstances) { + // Initialize the list with empty positions to fill. + List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(null); + } + Deque<String> newlyAddedInstances = new LinkedList<>(); + + // Find out the existing instances that are still alive. 
+ Set<String> existingInstancesStillAlive = new HashSet<>(); + for (String existingInstance : existingInstances) { + if (candidateInstances.contains(existingInstance)) { + existingInstancesStillAlive.add(existingInstance); + } + } + + // Find out the newly added instances. + for (String candidateInstance : candidateInstances) { + if (!existingInstancesStillAlive.contains(candidateInstance)) { + newlyAddedInstances.add(candidateInstance); + } + } + + for (int i = 0; i < existingInstances.size() && i < numInstancesToSelect; i++) { + String existingInstance = existingInstances.get(i); + if (candidateInstances.contains(existingInstance)) { + // If the instance still exists, add it to the instance list. + instancesToSelect.set(i, existingInstance); + existingInstancesStillAlive.remove(existingInstance); + // Add the existing instance to the tail so that it won't be firstly chosen for the next partition. + candidateInstances.remove(existingInstance); + candidateInstances.add(existingInstance); + } else if (!newlyAddedInstances.isEmpty()) { + // If the instance no longer exists, pick a new instance to fill its vacant position. + String newInstance = newlyAddedInstances.pollFirst(); + instancesToSelect.set(i, newInstance); + } + } Review Comment: I feel the following 3 loops can be simplified ```suggestion int numExistingInstances = existingInstances.size(); for (int i = 0; i < numInstancesToSelect; i++) { String existingInstance = i < numExistingInstances ? existingInstances.get(i) : null; String selectedInstance; if (existingInstance != null && candidateInstances.contains(existingInstance)) { selectedInstance = existingInstance; } else { selectInstance = newlyAddedInstances.poll(); } // Add the selected instance to the tail so that it won't be firstly chosen for the next partition. 
candidateInstances.remove(selectedInstance); candidateInstances.add(selectedInstance); } ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon numInstancesToSelect = numInstanceConfigs; } - List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + List<String> instancesToSelect; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. + List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0); + Set<String> candidateInstances = new LinkedHashSet<>(); + instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName())); + instancesToSelect = + getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances); + } else { + // Select instances sequentially. + instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + } } instancesToSelect.sort(null); LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); // Set the instances as partition 0 replica 0 instancePartitions.setInstances(0, 0, instancesToSelect); } } + + /** + * Select instances with minimum movement. + * This algorithm can solve the following scenarios: + * * swap an instance + * * add/remove replica groups + * * increase/decrease number of instances per replica group + * TODO: handle the scenarios that selected pools are changed. 
+ * @param numInstancesToSelect number of instances to select + * @param candidateInstances candidate instances to be selected + * @param existingInstances list of existing instances + */ + protected List<String> getInstancesWithMinimumMovement(int numInstancesToSelect, Set<String> candidateInstances, Review Comment: (minor) `private static` and put `LinkedHashSet` instead of general `Set` because we rely on that for the algorithm to work ```suggestion private static List<String> getInstancesWithMinimumMovement(int numInstancesToSelect, LinkedHashSet<String> candidateInstances, ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -140,25 +133,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", numPartitions, numInstancesPerPartition, _tableNameWithType); - // Assign consecutive instances within a replica-group to each partition. - // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) - // [i0, i1, i2, i3, i4] - // p0 p0 p0 p1 p1 - // p1 p2 p2 p2 - for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int instanceIdInReplicaGroup = 0; - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition); - for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; - instanceIdInPartition++) { - instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); - instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. 
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + + Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); + Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip the replica group if it's no longer needed. + continue; + } + Set<String> candidateInstances = + poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); + List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); + instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); + + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) + .addAll(existingInstances); + } + } + + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip the replica group if it's no longer needed. + continue; + } + // Filter out instances that belong to other replica groups which should not be the candidate. 
+ Set<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); + for (int otherReplicaGroupId = 0; + otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups; + otherReplicaGroupId++) { + if (replicaGroupId != otherReplicaGroupId) { + candidateInstances.removeAll(replicaGroupIdToInstancesMap.get(otherReplicaGroupId)); + } + } + Set<String> chosenCandidateInstances = new HashSet<>(); + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + List<String> instancesToSelect = + getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances); + chosenCandidateInstances.addAll(instancesToSelect); + instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect); + } + // Remove instances that are already been chosen. + poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances); + } + + // If the new number of replica groups is greater than the existing number of replica groups. + for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) { + int pool = replicaGroupIdToPoolMap.get(replicaGroupId); + Set<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); + + Set<String> chosenCandidateInstances = new HashSet<>(); + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + if (existingInstances == null) { Review Comment: This is always `null`? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon numInstancesToSelect = numInstanceConfigs; } - List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + List<String> instancesToSelect; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. + List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0); + Set<String> candidateInstances = new LinkedHashSet<>(); + instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName())); + instancesToSelect = + getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances); + } else { + // Select instances sequentially. + instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + } } instancesToSelect.sort(null); LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); // Set the instances as partition 0 replica 0 instancePartitions.setInstances(0, 0, instancesToSelect); } } + + /** + * Select instances with minimum movement. + * This algorithm can solve the following scenarios: Review Comment: I still feel this algorithm cannot guarantee an even distribution of the partitions in certain scenarios, because it doesn't track the partitions already assigned to each instance. To guarantee an even distribution, we should assign in 2 steps: first assign the existing instances for all partitions; then assign the vacant positions based on the partitions already assigned to each instance.
But the current algorithm should work well in most scenarios, so probably add a TODO for the above concern ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon numInstancesToSelect = numInstanceConfigs; } - List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + List<String> instancesToSelect; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. + List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0); + Set<String> candidateInstances = new LinkedHashSet<>(); + instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName())); + instancesToSelect = + getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances); + } else { + // Select instances sequentially. + instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + } } instancesToSelect.sort(null); LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); // Set the instances as partition 0 replica 0 instancePartitions.setInstances(0, 0, instancesToSelect); } } + + /** + * Select instances with minimum movement. + * This algorithm can solve the following scenarios: + * * swap an instance + * * add/remove replica groups + * * increase/decrease number of instances per replica group + * TODO: handle the scenarios that selected pools are changed. 
+ * @param numInstancesToSelect number of instances to select + * @param candidateInstances candidate instances to be selected + * @param existingInstances list of existing instances + */ + protected List<String> getInstancesWithMinimumMovement(int numInstancesToSelect, Set<String> candidateInstances, + List<String> existingInstances) { + // Initialize the list with empty positions to fill. + List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(null); + } + Deque<String> newlyAddedInstances = new LinkedList<>(); + + // Find out the existing instances that are still alive. + Set<String> existingInstancesStillAlive = new HashSet<>(); + for (String existingInstance : existingInstances) { + if (candidateInstances.contains(existingInstance)) { + existingInstancesStillAlive.add(existingInstance); + } + } + + // Find out the newly added instances. + for (String candidateInstance : candidateInstances) { + if (!existingInstancesStillAlive.contains(candidateInstance)) { + newlyAddedInstances.add(candidateInstance); + } + } + + for (int i = 0; i < existingInstances.size() && i < numInstancesToSelect; i++) { + String existingInstance = existingInstances.get(i); + if (candidateInstances.contains(existingInstance)) { + // If the instance still exists, add it to the instance list. + instancesToSelect.set(i, existingInstance); + existingInstancesStillAlive.remove(existingInstance); + // Add the existing instance to the tail so that it won't be firstly chosen for the next partition. + candidateInstances.remove(existingInstance); + candidateInstances.add(existingInstance); + } else if (!newlyAddedInstances.isEmpty()) { + // If the instance no longer exists, pick a new instance to fill its vacant position. 
+ String newInstance = newlyAddedInstances.pollFirst(); + instancesToSelect.set(i, newInstance); + } + } + + // If the new number of instances per replica group is greater than the previous one. Review Comment: ```suggestion // If the new number of instances per partition is greater than the previous one. ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", numPartitions, numInstancesPerPartition, _tableNameWithType); - // Assign consecutive instances within a replica-group to each partition. - // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) - // [i0, i1, i2, i3, i4] - // p0 p0 p0 p1 p1 - // p1 p2 p2 p2 - for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int instanceIdInReplicaGroup = 0; - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition); - for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; - instanceIdInPartition++) { - instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); - instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. 
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + + Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); + Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip the replica group if it's no longer needed. + continue; + } + Set<String> candidateInstances = + poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); + List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); + instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); + + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) + .addAll(existingInstances); + } + } + + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip the replica group if it's no longer needed. + continue; + } + // Filter out instances that belong to other replica groups which should not be the candidate. 
+ Set<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); + for (int otherReplicaGroupId = 0; + otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups; + otherReplicaGroupId++) { + if (replicaGroupId != otherReplicaGroupId) { + candidateInstances.removeAll(replicaGroupIdToInstancesMap.get(otherReplicaGroupId)); + } + } + Set<String> chosenCandidateInstances = new HashSet<>(); + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + List<String> instancesToSelect = + getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances); + chosenCandidateInstances.addAll(instancesToSelect); + instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect); + } + // Remove instances that are already been chosen. + poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances); + } + + // If the new number of replica groups is greater than the existing number of replica groups. + for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) { + int pool = replicaGroupIdToPoolMap.get(replicaGroupId); + Set<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); + + Set<String> chosenCandidateInstances = new HashSet<>(); + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + if (existingInstances == null) { + existingInstances = Collections.emptyList(); + } + List<String> instancesToSelect = + getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances); Review Comment: After reading the doc, the algorithm is much clearer.
Shall we add some comments for the 3 steps of the algorithm before each for loop to explain what we are trying to achieve for each step? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", numPartitions, numInstancesPerPartition, _tableNameWithType); - // Assign consecutive instances within a replica-group to each partition. - // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) - // [i0, i1, i2, i3, i4] - // p0 p0 p0 p1 p1 - // p1 p2 p2 p2 - for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int instanceIdInReplicaGroup = 0; - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition); - for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; - instanceIdInPartition++) { - instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); - instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. 
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + + Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); + Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip the replica group if it's no longer needed. Review Comment: I see. Suggest adding `int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);` and using it as the loop's upper bound (end condition) to make this clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org