Jackie-Jiang commented on code in PR #8483: URL: https://github.com/apache/pinot/pull/8483#discussion_r854686985
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -140,25 +132,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", numPartitions, numInstancesPerPartition, _tableNameWithType); - // Assign consecutive instances within a replica-group to each partition. - // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) - // [i0, i1, i2, i3, i4] - // p0 p0 p0 p1 p1 - // p1 p2 p2 p2 - for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int instanceIdInReplicaGroup = 0; - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition); - for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; - instanceIdInPartition++) { - instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); - instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. + int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + + Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); + Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip the replica group if it's no longer needed. + continue; + } + Set<String> candidateInstances = + poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); + List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); + instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); + + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) + .addAll(existingInstances); + } + } + + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip the replica group if it's no longer needed. + continue; + } + // Filter out instances that belong to other replica groups which should not be the candidate. + Set<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); + for (int otherReplicaGroupId = 0; + otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups; + otherReplicaGroupId++) { + if (replicaGroupId != otherReplicaGroupId) { + candidateInstances.removeAll(replicaGroupIdToInstancesMap.get(otherReplicaGroupId)); + } + } + Set<String> chosenCandidateInstances = new HashSet<>(); + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + List<String> instancesToSelect = + getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances); + chosenCandidateInstances.addAll(instancesToSelect); + instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect); + } + // Remove instances that are already been chosen. + poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances); + } + + // If the new number of replica groups is greater than the existing number of replica groups. + for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) { + int pool = replicaGroupIdToPoolMap.get(replicaGroupId); + Set<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); + + Set<String> chosenCandidateInstances = new HashSet<>(); + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + if (existingInstances == null) { + existingInstances = Collections.emptyList(); + } + List<String> instancesToSelect = + getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances); Review Comment: We should compute the maximum partitions per server, then use that as a limit to assign the partitions -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org