jackjlli commented on code in PR #8483: URL: https://github.com/apache/pinot/pull/8483#discussion_r856840900
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -180,14 +266,104 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon numInstancesToSelect = numInstanceConfigs; } - List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + List<String> instancesToSelect; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. + List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0); + LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(); + instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName())); + instancesToSelect = + getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances); + } else { + // Select instances sequentially. + instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); + } } instancesToSelect.sort(null); LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); // Set the instances as partition 0 replica 0 instancePartitions.setInstances(0, 0, instancesToSelect); } } + + /** + * Select instances with minimum movement. + * This algorithm can solve the following scenarios: + * * swap an instance + * * add/remove replica groups + * * increase/decrease number of instances per replica group + * TODO: handle the scenarios that selected pools are changed. + * TODO: improve the algorithm by doing the following steps: + * 1. assign the existing instances for all partitions; + * 2. assign the vacant positions based on the partitions already assigned to each instance. 
+ * @param numInstancesToSelect number of instances to select + * @param candidateInstances candidate instances to be selected + * @param existingInstances list of existing instances + */ + private static List<String> getInstancesWithMinimumMovement(int numInstancesToSelect, + LinkedHashSet<String> candidateInstances, List<String> existingInstances) { + // Initialize the list with empty positions to fill. + List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(null); + } + Deque<String> newlyAddedInstances = new LinkedList<>(); + + // Find out the existing instances that are still alive. + Set<String> existingInstancesStillAlive = new HashSet<>(); + for (String existingInstance : existingInstances) { + if (candidateInstances.contains(existingInstance)) { + existingInstancesStillAlive.add(existingInstance); + } + } + + // Find out the newly added instances. + for (String candidateInstance : candidateInstances) { + if (!existingInstancesStillAlive.contains(candidateInstance)) { + newlyAddedInstances.add(candidateInstance); + } + } + + int numExistingInstances = existingInstances.size(); + for (int i = 0; i < numInstancesToSelect; i++) { + String existingInstance = i < numExistingInstances ? existingInstances.get(i) : null; + String selectedInstance; + if (existingInstance != null && candidateInstances.contains(existingInstance)) { + selectedInstance = existingInstance; + existingInstancesStillAlive.remove(selectedInstance); + } else { + selectedInstance = newlyAddedInstances.poll(); + } + instancesToSelect.set(i, selectedInstance); + // If it's an existing alive instance, or it's for a new replica group, add the new instance to the tail, + // so that it won't be firstly chosen for the next partition. 
+ // For newly added instances to fill the existing replica group, the sequence cannot change; + // otherwise there is no guarantee that same vacant position will be filled with the same new instance. + // The 'selectedInstance' object can still be null if there is no new instances from the candidate list. + if (selectedInstance != null && (i < numExistingInstances || existingInstances.isEmpty())) { + candidateInstances.remove(selectedInstance); + candidateInstances.add(selectedInstance); + } + } + + // If there are still some vacant positions in the instance list, + // try to fill with instances which are either left over or newly added. + for (int i = 0; i < instancesToSelect.size(); i++) { + if (instancesToSelect.get(i) == null) { + if (!existingInstancesStillAlive.isEmpty()) { + Iterator<String> iterator = existingInstancesStillAlive.iterator(); + String existingInstanceLeftOver = iterator.next(); + instancesToSelect.set(i, existingInstanceLeftOver); + iterator.remove(); + } else if (!newlyAddedInstances.isEmpty()) { + // pick a new instance to fill its vacant position. + String newInstance = newlyAddedInstances.pollFirst(); + instancesToSelect.set(i, newInstance); + } + } + } Review Comment: This is still needed for the scenario where some of the instances have been removed and no new instances have been added back to the cluster; in that case, the instances from the tail fill the vacant positions. E.g. suppose the existing instance assignment is [i1, i2, i3, i4, i5]: if i3 is removed/decommissioned and no new instance is added back, the very last instance (i.e. i5) should fill i3's vacant position. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org