jackjlli commented on code in PR #11953: URL: https://github.com/apache/pinot/pull/11953#discussion_r1419770234
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);

Review Comment:
   Here is the analysis for the replica group (RG) reduction and increment scenarios.

   RG reduction: the RGs with the highest IDs are always the ones removed. For example, if numRG is reduced from 2 to 1 (i.e. one of the two replica groups is dropped), RG1 is removed and RG0 is kept. In this case, the new number of instances per RG should always be calculated from the updated number of RGs in each pool (which comes from `poolToReplicaGroupIdsMap`); the detailed logic is at Line 164 of this class.

   RG increment: the newly added RGs always take the highest IDs. Each new RG is assigned to the pool with the fewest previous uses, which is either 1) a pool that has never been used (i.e. an unused pool), or 2) the least frequently chosen existing pool. The order is maintained by a min heap, so unused pools are always chosen before any existing pool.
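The two scenarios above can be sketched as follows. This is a minimal illustration, not the code in this PR: the class `PoolSelectionSketch`, the method `assignPools`, and its parameters are hypothetical names, and the tie-break on the lower pool ID is an assumption made only to keep the result deterministic. RGs that exist both before and after keep their pool, RGs with IDs at or above the new count are dropped, and each new RG takes the least-used pool from a min heap.

```java
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical sketch; names and tie-breaking are assumptions, not Pinot's implementation.
final class PoolSelectionSketch {

  // Returns replicaGroupId -> pool for RG ids 0..numReplicaGroups-1.
  static Map<Integer, Integer> assignPools(List<Integer> pools,
      Map<Integer, Integer> existingRgToPool, int numReplicaGroups) {
    if (pools.isEmpty()) {
      throw new IllegalArgumentException("At least one pool is required");
    }
    Map<Integer, Integer> rgToPool = new TreeMap<>();
    Map<Integer, Integer> usage = new TreeMap<>();
    for (int pool : pools) {
      usage.put(pool, 0);
    }

    // Reduction and common RGs: only ids below numReplicaGroups survive,
    // and each survivor keeps the pool it already used (if that pool still exists).
    for (int rg = 0; rg < numReplicaGroups; rg++) {
      Integer pool = existingRgToPool.get(rg);
      if (pool != null && usage.containsKey(pool)) {
        rgToPool.put(rg, pool);
        usage.merge(pool, 1, Integer::sum);
      }
    }

    // Min heap of {pool, usageCount}: fewest uses first, then lower pool id (assumed tie-break).
    PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) ->
        a[1] != b[1] ? Integer.compare(a[1], b[1]) : Integer.compare(a[0], b[0]));
    usage.forEach((pool, count) -> heap.add(new int[]{pool, count}));

    // Increment: every RG without a pool takes the least-used pool,
    // so unused pools (count 0) are picked before any existing pool.
    for (int rg = 0; rg < numReplicaGroups; rg++) {
      if (rgToPool.containsKey(rg)) {
        continue;
      }
      int[] least = heap.poll();
      rgToPool.put(rg, least[0]);
      least[1]++;
      heap.add(least);
    }
    return rgToPool;
  }
}
```

With pools 0, 1, 2 and an existing assignment of RG0 to pool 0 and RG1 to pool 1, growing to four RGs places RG2 in the unused pool 2 before any pool is reused for RG3; shrinking to one RG keeps only RG0 in pool 0.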