jackjlli commented on code in PR #8483: URL: https://github.com/apache/pinot/pull/8483#discussion_r856594719
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
         numInstancesToSelect = numInstanceConfigs;
       }
 
-      List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-      for (int i = 0; i < numInstancesToSelect; i++) {
-        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+      List<String> instancesToSelect;
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+        // Minimize data movement.
+        List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
+        Set<String> candidateInstances = new LinkedHashSet<>();
+        instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName()));
+        instancesToSelect =
+            getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances);
+      } else {
+        // Select instances sequentially.
+        instancesToSelect = new ArrayList<>(numInstancesToSelect);
+        for (int i = 0; i < numInstancesToSelect; i++) {
+          instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+        }
       }
       instancesToSelect.sort(null);
       LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType);
       // Set the instances as partition 0 replica 0
       instancePartitions.setInstances(0, 0, instancesToSelect);
     }
   }
+
+  /**
+   * Select instances with minimum movement.
+   * This algorithm can solve the following scenarios:
+   *    * swap an instance
+   *    * add/remove replica groups
+   *    * increase/decrease number of instances per replica group
+   * TODO: handle the scenarios that selected pools are changed.
+   * @param numInstancesToSelect number of instances to select
+   * @param candidateInstances candidate instances to be selected
+   * @param existingInstances list of existing instances
+   */
+  protected List<String> getInstancesWithMinimumMovement(int numInstancesToSelect, Set<String> candidateInstances,
+      List<String> existingInstances) {
+    // Initialize the list with empty positions to fill.
+    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      instancesToSelect.add(null);
+    }
+    Deque<String> newlyAddedInstances = new LinkedList<>();
+
+    // Find out the existing instances that are still alive.
+    Set<String> existingInstancesStillAlive = new HashSet<>();
+    for (String existingInstance : existingInstances) {
+      if (candidateInstances.contains(existingInstance)) {
+        existingInstancesStillAlive.add(existingInstance);
+      }
+    }
+
+    // Find out the newly added instances.
+    for (String candidateInstance : candidateInstances) {
+      if (!existingInstancesStillAlive.contains(candidateInstance)) {
+        newlyAddedInstances.add(candidateInstance);
+      }
+    }
+
+    for (int i = 0; i < existingInstances.size() && i < numInstancesToSelect; i++) {
+      String existingInstance = existingInstances.get(i);
+      if (candidateInstances.contains(existingInstance)) {
+        // If the instance still exists, add it to the instance list.
+        instancesToSelect.set(i, existingInstance);
+        existingInstancesStillAlive.remove(existingInstance);
+        // Add the existing instance to the tail so that it won't be firstly chosen for the next partition.
+        candidateInstances.remove(existingInstance);
+        candidateInstances.add(existingInstance);
+      } else if (!newlyAddedInstances.isEmpty()) {
+        // If the instance no longer exists, pick a new instance to fill its vacant position.
+        String newInstance = newlyAddedInstances.pollFirst();
+        instancesToSelect.set(i, newInstance);
+      }
+    }

Review Comment:
   simplified.
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org