jackjlli commented on code in PR #8483: URL: https://github.com/apache/pinot/pull/8483#discussion_r856589037
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java: ########## @@ -140,25 +133,118 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", numPartitions, numInstancesPerPartition, _tableNameWithType); - // Assign consecutive instances within a replica-group to each partition. - // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) - // [i0, i1, i2, i3, i4] - // p0 p0 p0 p1 p1 - // p1 p2 p2 p2 - for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int instanceIdInReplicaGroup = 0; - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition); - for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; - instanceIdInPartition++) { - instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); - instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; + if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + // Minimize data movement. + int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + + Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); + Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new HashMap<>(); + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); + if (pool == null) { + // Skip the replica group if it's no longer needed. 
+ continue; + } + Set<String> candidateInstances = + poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); + List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); + instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); + + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) + .addAll(existingInstances); + } + } + + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { Review Comment: I also thought about that, but the thing is that if the number of replica groups increases, the number of instances per replica group could be reduced. In that case, the ones that used to belong to the original RG but didn't get picked up in the same RG cannot be used for the new RG. I'll just leave it as it is right now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org