jackjlli commented on a change in pull request #8441: URL: https://github.com/apache/pinot/pull/8441#discussion_r840976672
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java ########## @@ -48,52 +52,117 @@ public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String table /** * Returns a map from pool to instance configs based on the tag and pool config for the given instance configs. + * @param instanceConfigs list of latest instance configs from ZK. + * @param existingPoolToInstancesMap existing instance with sequence that should be respected. An empty list + * means no preceding sequence to respect and the instances would be sorted. */ - public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs) { + public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs, + Map<Integer, List<String>> existingPoolToInstancesMap) { int tableNameHash = Math.abs(_tableNameWithType.hashCode()); LOGGER.info("Starting instance tag/pool selection for table: {} with hash: {}", _tableNameWithType, tableNameHash); - // Filter out the instances with the correct tag + // If existingPoolToInstancesMap is null, treat it as an empty map. + if (existingPoolToInstancesMap == null) { + existingPoolToInstancesMap = Collections.emptyMap(); + } + // Filter out the instances with the correct tag. + // Use LinkedHashMap here to retain the sorted list of instance names. 
String tag = _tagPoolConfig.getTag(); - List<InstanceConfig> candidateInstanceConfigs = new ArrayList<>(); + Map<String, InstanceConfig> candidateInstanceConfigsMap = new LinkedHashMap<>(); for (InstanceConfig instanceConfig : instanceConfigs) { if (instanceConfig.getTags().contains(tag)) { - candidateInstanceConfigs.add(instanceConfig); + candidateInstanceConfigsMap.put(instanceConfig.getInstanceName(), instanceConfig); } } - candidateInstanceConfigs.sort(Comparator.comparing(InstanceConfig::getInstanceName)); - int numCandidateInstances = candidateInstanceConfigs.size(); + + // Find out newly added instances from the latest copies of instance configs. + // A deque is used here in order to retain the sequence, + // given the fact that the list of instance configs is always sorted. + Deque<String> newlyAddedInstances = new LinkedList<>(candidateInstanceConfigsMap.keySet()); + for (List<String> existingInstancesWithSequence : existingPoolToInstancesMap.values()) { + newlyAddedInstances.removeAll(existingInstancesWithSequence); + } + + int numCandidateInstances = candidateInstanceConfigsMap.size(); Preconditions.checkState(numCandidateInstances > 0, "No enabled instance has the tag: %s", tag); LOGGER.info("{} enabled instances have the tag: {} for table: {}", numCandidateInstances, tag, _tableNameWithType); - Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new TreeMap<>(); + Map<Integer, List<InstanceConfig>> poolToLatestInstanceConfigsMap = new TreeMap<>(); if (_tagPoolConfig.isPoolBased()) { - // Pool based selection + // Pool based selection. All the instances should be associated with a specific pool number. + // Instance selection should be done within the same pool. 
+ // E.g.: Pool0 -> [ I1, I2, I3 ] + // Pool1 -> [ I4, I5, I6 ] - // Extract the pool information from the instance configs - for (InstanceConfig instanceConfig : candidateInstanceConfigs) { + // Each pool number associates with a map that key is the instance name and value is the instance config. + Map<Integer, Map<String, InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>(); + // Each pool number associates with a list of newly added instance configs, + // so that new instances can be fetched from this list. + Map<Integer, Deque<InstanceConfig>> poolToNewInstanceConfigsMap = new HashMap<>(); + + // Extract the pool information from the instance configs. + for (Map.Entry<String, InstanceConfig> entry : candidateInstanceConfigsMap.entrySet()) { + String instanceName = entry.getKey(); + InstanceConfig instanceConfig = entry.getValue(); Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY); if (poolMap != null && poolMap.containsKey(tag)) { int pool = Integer.parseInt(poolMap.get(tag)); - poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig); + poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new TreeMap<>()).put(instanceName, instanceConfig); + if (newlyAddedInstances.contains(instanceName)) { + poolToNewInstanceConfigsMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(instanceConfig); + } + } + } + + for (Map.Entry<Integer, List<String>> entry : existingPoolToInstancesMap.entrySet()) { + Integer pool = entry.getKey(); + List<String> existingInstanceAssignmentInPool = entry.getValue(); + List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>(); + for (String existingInstance: existingInstanceAssignmentInPool) { + InstanceConfig instanceConfig = poolToInstanceConfigsMap.get(pool).get(existingInstance); + // Add instances to the candidate list and respect the sequence of the existing instances from the ZK. 
+ // The missing/removed instances will be replaced by the newly instances. + // If the instance still exists from ZK, then add it to the candidate list. + // E.g. if the old instances are: [I1, I2, I3, I4] and the new instance are: [I1, I3, I4, I5, I6], + // the removed instance is I2 and the newly added instances are I5 and I6. + // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail. + // Thus, the new order would be [I1, I5, I3, I4, I6]. Review comment: Regarding your 2nd question, it's a mixed scenario of swapping a host and adding 1 more replica group. For swapping a host, the current algorithm works well, as I5 replaces I2 as expected. For adding 1 more replica group, yes, the current algorithm will inevitably shuffle instances across different replica groups. We can add a TODO here and solve it in another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org