jasperjiaguo commented on code in PR #11578: URL: https://github.com/apache/pinot/pull/11578#discussion_r1372502885
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java: ########## @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.instance; + +import com.google.common.base.Preconditions; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Detailed design see https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8 + * During each creation/update/scale, the algorithm will refer to the corresponding tenant level instance partitions and + * generate an instance partition by taking numInstancePerReplicaGroup mirror server sets from the tenant level + * instance partitions. + * + * If an existingInstancePartition is provided, the algorithm will generate a best effort assignment that resembles + * the existingInstancePartition. + * + * Assumptions for this algorithm: + * 1. The number of replica groups in the tenant level instance partitions is the same as the number of replica groups + * in the table config. + * 2. The number of partitions at replica group level is 1 + * 3. This algorithm only works for replica group based table assignment + */ +public class MirrorServerSetInstancePartitionSelector extends InstancePartitionSelector { + private static final Logger LOGGER = LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class); + private final InstancePartitions _preConfiguredInstancePartitions; + + // dimensions of target instance partition + private final int _numTargetInstancesPerReplicaGroup; + private final int _numTargetReplicaGroups; + private final int _numTargetTotalInstances; + + // dimensions of pre-configured instance partition + private int _numPreConfiguredReplicaGroups; + private int _numPreConfiguredInstancesPerReplicaGroup; + + // dimensions of existing instance partition + private int _numExistingReplicaGroups; + private int _numExistingInstancesPerReplicaGroup; + + // look up tables for pre-configured instance partition + private final List<List<String>> _preConfiguredMirroredServerLists = new ArrayList<>(); + private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap = new HashMap<>(); + + private final List<List<String>> _existingMirroredServerLists = new ArrayList<>(); + + public MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, + InstancePartitions preConfiguredInstancePartitions) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + _preConfiguredInstancePartitions = preConfiguredInstancePartitions; + _numTargetInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); + _numTargetReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); + _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup * _numTargetReplicaGroups; + } + + /** + * validate if the poolToInstanceConfigsMap is a valid input for pre-configuration based replica-group selection + */ + private void validatePoolDiversePreconditions(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap) { + + LOGGER.info("Validating pre-configured instance partitions for pre-configuration based replica-group selection"); + + // numTargetInstancesPerReplica should be positive + Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0, + "Number of instances per replica must be positive"); + LOGGER.info("Number of instances per replica: {}", _numTargetInstancesPerReplicaGroup); + // _numTargetReplicaGroups should be positive + Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of replica-groups must be positive"); + LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups); + // validate target partition count is 1 + Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions() <= 1, + "This algorithm does not support table level partitioning for target assignment"); + LOGGER.info("Number of partitions: {}", _replicaGroupPartitionConfig.getNumPartitions()); + + // Validate the existing instance partitions is null or has only one partition + Preconditions.checkState( + (_existingInstancePartitions == null || _existingInstancePartitions.getNumPartitions() == 1), + "This algorithm does not support table level partitioning for existing assignment"); + LOGGER.info("Number of partitions in existing instance partitions: {}", _existingInstancePartitions == null ? 0 + : _existingInstancePartitions.getNumPartitions()); + + _numExistingReplicaGroups = + _existingInstancePartitions == null ? 0 : _existingInstancePartitions.getNumReplicaGroups(); + _numExistingInstancesPerReplicaGroup = + _existingInstancePartitions == null ? 0 : _existingInstancePartitions.getInstances(0, 0).size(); + + // Validate the pre-configured instance partitions is not null and has only one partition + Preconditions.checkState(_preConfiguredInstancePartitions != null, + "Pre-configured instance partitions must be provided for pre-configuration based selection"); + Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() == 1, + "This algorithm does not support table level partitioning for pre-configured assignment"); + LOGGER.info("Number of partitions in pre-configured instance partitions: {}", _preConfiguredInstancePartitions + .getNumPartitions()); + + // Validate the number of replica-groups in the pre-configured instance partitions is equal to the target + // number of replica-groups + _numPreConfiguredReplicaGroups = _preConfiguredInstancePartitions.getNumReplicaGroups(); + Preconditions.checkState(_numPreConfiguredReplicaGroups == _numTargetReplicaGroups, + "The number of replica-groups %s in the pre-configured instance partitions " + + "is not equal to the target number of replica-groups %s", _numPreConfiguredReplicaGroups, + _numTargetReplicaGroups); + LOGGER.info("Number of replica-groups in pre-configured instance partitions: {}", _numPreConfiguredReplicaGroups); + + // Validate the number of instances per replica-group in the pre-configured instance partitions is greater than or + // equal to the target number of instances per replica-group + _numPreConfiguredInstancesPerReplicaGroup = _preConfiguredInstancePartitions.getInstances(0, 0).size(); + Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >= _numTargetInstancesPerReplicaGroup, + "The number of instances per replica-group in the pre-configured " + + "instance partitions is less than the target number of instances per replica-group"); + LOGGER.info("Number of instances per replica-group in pre-configured instance partitions: {}", + _numPreConfiguredInstancesPerReplicaGroup); + + // Validate the pool to instance configs map is not null or empty + Preconditions.checkNotNull(poolToInstanceConfigsMap, "poolToInstanceConfigsMap is null"); + int numPools = poolToInstanceConfigsMap.size(); + Preconditions.checkState(numPools > 0, "No pool qualified for selection"); + Preconditions.checkState(poolToInstanceConfigsMap.values().stream().map(List::size).reduce(Integer::sum) + .orElse(0) >= _numTargetTotalInstances, + "The total number of instances in all pools is less than the target number of target instances"); + + HashSet<String> availableInstanceSet = new HashSet<>(); + poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i -> availableInstanceSet.add(i.getInstanceName()))); + LOGGER.info("Number of pools: {}", numPools); + LOGGER.info("Number of instances in all pools: {}", availableInstanceSet.size()); + LOGGER.info("availableInstanceSet: {}", availableInstanceSet); + + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + List<String> instances = _preConfiguredInstancePartitions.getInstances(0, i); + for (String instance : instances) { + Preconditions.checkState(availableInstanceSet.contains(instance), + "Instance %s in pre-configured instance partitions is not in " + + "the pool to instance configs map", + instance); + } + } + + LOGGER.info("Validation passed. The instances provided can satisfy the pool diverse requirement."); + LOGGER.info("Trying to assign total {} instances to {} replica groups, " + "with {} instance per replica group", + _numTargetTotalInstances, _numTargetReplicaGroups, _numTargetInstancesPerReplicaGroup); + } + + void createListFromPreConfiguredInstanceAssignmentMap() { + List<List<String>> preConfiguredReplicaGroups = new ArrayList<>(_numPreConfiguredReplicaGroups); + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0, i)); + } + + for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) { + List<String> mirroredServerList = new ArrayList<>(); + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + mirroredServerList.add(preConfiguredReplicaGroups.get(i).get(j)); + } + _preConfiguredMirroredServerLists.add(mirroredServerList); + } + } + + void createLookupTablesFromPreConfiguredInstanceAssignmentMap() { + List<List<String>> preConfiguredReplicaGroups = new ArrayList<>(_numPreConfiguredReplicaGroups); + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0, i)); + } + + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) { + String instance = preConfiguredReplicaGroups.get(i).get(j); + _preConfiguredInstanceNameToOffsetMap.put(instance, j); + } + } + } + + @Override + public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, + InstancePartitions instancePartitions) { + if (_replicaGroupPartitionConfig.isReplicaGroupBased()) { + validatePoolDiversePreconditions(poolToInstanceConfigsMap); + if (_existingInstancePartitions == null) { + // If no existing instance partitions, create new instance partitions based on the pre-configured instance + // partitions. This is done by just selecting _targetNumInstancesPerReplicaGroup set of mirrored servers + // from the pre-configured instance partitions. + LOGGER.info("No existing instance partitions found. Will build new on top of" + + " the pre-configured instance partitions"); + // create a list of lists of mirrored servers from the pre-configured instance partitions + createListFromPreConfiguredInstanceAssignmentMap(); + // shuffle the list of lists of mirrored servers based on the table name hash + int tableNameHash = Math.abs(_tableNameWithType.hashCode()); + // initialize a list of indices from 0 to _numPreConfiguredInstancesPerReplicaGroup + List<Integer> shuffledIndex = new ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup); + for (int i = 0; i < _numPreConfiguredInstancesPerReplicaGroup; i++) { + shuffledIndex.add(i); + } + // shuffle the list of indices based on the table name hash + Collections.shuffle(shuffledIndex, new Random(tableNameHash)); + // select the first _numTargetInstancesPerReplicaGroup indices + shuffledIndex = shuffledIndex.subList(0, _numTargetInstancesPerReplicaGroup); + // sort the list of indices so that they follow the original order of the pre-configured instance partitions + shuffledIndex.sort(Comparator.naturalOrder()); + + // create the instance partitions based on the shuffled list of mirrored servers + List<List<String>> resultReplicaGroups = new ArrayList<>(_numTargetReplicaGroups); + for (int i = 0; i < _numTargetReplicaGroups; i++) { + resultReplicaGroups.add(new ArrayList<>(_numTargetInstancesPerReplicaGroup)); + } + + // populate the instance partitions with the selected mirrored servers + for (int j = 0; j < _numTargetInstancesPerReplicaGroup; j++) { + for (int i = 0; i < _numTargetReplicaGroups; i++) { + resultReplicaGroups.get(i).add(_preConfiguredMirroredServerLists.get(shuffledIndex.get(j)).get(i)); + } + } + for (int i = 0; i < _numTargetReplicaGroups; i++) { + instancePartitions.setInstances(0, i, resultReplicaGroups.get(i)); + } + } else { + // If existing instance partitions exist, adjust the existing instance partitions based on the pre-configured + // instance partitions. This code path takes care of instance replacement, uplift, and downlift. + // This is done by search in the pre-configured instance partitions for the mirrored + // servers sets that are similar to the existing sets in instance partitions. + LOGGER.info("Existing instance partitions found. Will adjust the existing instance partitions" + + " based on the pre-configured instance partitions"); + createListFromPreConfiguredInstanceAssignmentMap(); + createLookupTablesFromPreConfiguredInstanceAssignmentMap(); + createListAndLookupTablesFromExistingInstancePartitions(); + Set<Integer> usedPreconfiguredInstanceOffsets = new HashSet<>(); + Map<Integer, Map.Entry<Integer, Long>> existingOffsetToResultTuple = new HashMap<>(); + + // For each instance offset, find the mirrored server that is most similar to the existing mirrored server + // set. If this mirrored server is not used, add it to the result list. + for (int j = 0; j < _numExistingInstancesPerReplicaGroup; j++) { + List<String> existingMirroredServers = _existingMirroredServerLists.get(j); + int finalJ = j; + existingMirroredServers.stream() + .map(_preConfiguredInstanceNameToOffsetMap::get) + .filter(Objects::nonNull) + .filter(offset -> !usedPreconfiguredInstanceOffsets.contains(offset)) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())) + .entrySet().stream().max(Map.Entry.comparingByValue()).ifPresent(e -> { + existingOffsetToResultTuple.put(finalJ, e); + usedPreconfiguredInstanceOffsets.add(e.getKey()); + }); + } + + if (_numExistingInstancesPerReplicaGroup > _numTargetInstancesPerReplicaGroup) { + // If this is a downlift case + List<Map.Entry<Integer, Long>> collect = existingOffsetToResultTuple.values() + .stream() + .sorted((a, b) -> b.getValue().compareTo(a.getValue())) + .limit(_numTargetInstancesPerReplicaGroup) + .collect(Collectors.toList()); + int size = collect.size(); + existingOffsetToResultTuple.clear(); + usedPreconfiguredInstanceOffsets.clear(); + for (int j = 0; j < size; j++) { + existingOffsetToResultTuple.put(j, collect.get(j)); + usedPreconfiguredInstanceOffsets.add(collect.get(j).getKey()); + } + } + + if (existingOffsetToResultTuple.size() < _numTargetInstancesPerReplicaGroup) { + // If the number of instances selected from the result list is less than the target number + // of instances per replica group, add the remaining instances from the pre-configured instance partitions. + ArrayList<Integer> shuffledOffsets = new ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup); + for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) { + shuffledOffsets.add(j); + } + // Commenting this out as + // (1) Shuffling is already done in the initial step. + // (2) We want to keep the order of the pre-configured instance partitions, so that the segment assignment + // strategy for single tenant cluster can be minimized-impact. + // But keeping the code here in case we want to have a specific reordering strategy in the future. + // Collections.shuffle(shuffledOffsets, new Random(Math.abs(_tableNameWithType.hashCode()))); + for (int k = 0, j = 0; j < _numTargetInstancesPerReplicaGroup; j++) { + if (existingOffsetToResultTuple.containsKey(j)) { + continue; + } + while (usedPreconfiguredInstanceOffsets.contains(shuffledOffsets.get(k))) { + k++; + } + Integer offset = shuffledOffsets.get(k); + existingOffsetToResultTuple.put(j, new AbstractMap.SimpleEntry<>(offset, 0L)); + usedPreconfiguredInstanceOffsets.add(offset); + } + } + + List<List<String>> resultReplicaGroups = new ArrayList<>(_numTargetReplicaGroups); + for (int i = 0; i < _numTargetReplicaGroups; i++) { + resultReplicaGroups.add(new ArrayList<>(_numTargetInstancesPerReplicaGroup)); + } + for (int j = 0; j < _numTargetInstancesPerReplicaGroup; j++) { + List<String> mirrorServers = + _preConfiguredMirroredServerLists.get(existingOffsetToResultTuple.get(j).getKey()); + for (int i = 0; i < _numTargetReplicaGroups; i++) { + resultReplicaGroups.get(i).add(mirrorServers.get(i)); + } + } + for (int i = 0; i < _numTargetReplicaGroups; i++) { + instancePartitions.setInstances(0, i, resultReplicaGroups.get(i)); + } + } + } else { + throw new IllegalStateException("Does not support Non-replica-group based selection"); + } + } + + private void createListAndLookupTablesFromExistingInstancePartitions() { Review Comment: renamed ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java: ########## @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.instance; + +import com.google.common.base.Preconditions; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Detailed design see https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8 + * During each creation/update/scale, the algorithm will refer to the corresponding tenant level instance partitions and + * generate an instance partition by taking numInstancePerReplicaGroup mirror server sets from the tenant level + * instance partitions. + * + * If an existingInstancePartition is provided, the algorithm will generate a best effort assignment that resembles + * the existingInstancePartition. + * + * Assumptions for this algorithm: + * 1. The number of replica groups in the tenant level instance partitions is the same as the number of replica groups + * in the table config. + * 2. The number of partitions at replica group level is 1 + * 3. This algorithm only works for replica group based table assignment + */ +public class MirrorServerSetInstancePartitionSelector extends InstancePartitionSelector { + private static final Logger LOGGER = LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class); + private final InstancePartitions _preConfiguredInstancePartitions; + + // dimensions of target instance partition + private final int _numTargetInstancesPerReplicaGroup; + private final int _numTargetReplicaGroups; + private final int _numTargetTotalInstances; + + // dimensions of pre-configured instance partition + private int _numPreConfiguredReplicaGroups; + private int _numPreConfiguredInstancesPerReplicaGroup; + + // dimensions of existing instance partition + private int _numExistingReplicaGroups; + private int _numExistingInstancesPerReplicaGroup; + + // look up tables for pre-configured instance partition + private final List<List<String>> _preConfiguredMirroredServerLists = new ArrayList<>(); + private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap = new HashMap<>(); + + private final List<List<String>> _existingMirroredServerLists = new ArrayList<>(); + + public MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, + InstancePartitions preConfiguredInstancePartitions) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + _preConfiguredInstancePartitions = preConfiguredInstancePartitions; + _numTargetInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); + _numTargetReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); + _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup * _numTargetReplicaGroups; + } + + /** + * validate if the poolToInstanceConfigsMap is a valid input for pre-configuration based replica-group selection + */ + private void validatePoolDiversePreconditions(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap) { + + LOGGER.info("Validating pre-configured instance partitions for pre-configuration based replica-group selection"); + + // numTargetInstancesPerReplica should be positive + Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0, + "Number of instances per replica must be positive"); + LOGGER.info("Number of instances per replica: {}", _numTargetInstancesPerReplicaGroup); + // _numTargetReplicaGroups should be positive + Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of replica-groups must be positive"); + LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups); + // validate target partition count is 1 + Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions() <= 1, + "This algorithm does not support table level partitioning for target assignment"); + LOGGER.info("Number of partitions: {}", _replicaGroupPartitionConfig.getNumPartitions()); + + // Validate the existing instance partitions is null or has only one partition + Preconditions.checkState( + (_existingInstancePartitions == null || _existingInstancePartitions.getNumPartitions() == 1), + "This algorithm does not support table level partitioning for existing assignment"); + LOGGER.info("Number of partitions in existing instance partitions: {}", _existingInstancePartitions == null ? 0 + : _existingInstancePartitions.getNumPartitions()); + + _numExistingReplicaGroups = + _existingInstancePartitions == null ? 0 : _existingInstancePartitions.getNumReplicaGroups(); + _numExistingInstancesPerReplicaGroup = + _existingInstancePartitions == null ? 0 : _existingInstancePartitions.getInstances(0, 0).size(); + + // Validate the pre-configured instance partitions is not null and has only one partition + Preconditions.checkState(_preConfiguredInstancePartitions != null, + "Pre-configured instance partitions must be provided for pre-configuration based selection"); + Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() == 1, + "This algorithm does not support table level partitioning for pre-configured assignment"); + LOGGER.info("Number of partitions in pre-configured instance partitions: {}", _preConfiguredInstancePartitions + .getNumPartitions()); + + // Validate the number of replica-groups in the pre-configured instance partitions is equal to the target + // number of replica-groups + _numPreConfiguredReplicaGroups = _preConfiguredInstancePartitions.getNumReplicaGroups(); + Preconditions.checkState(_numPreConfiguredReplicaGroups == _numTargetReplicaGroups, + "The number of replica-groups %s in the pre-configured instance partitions " + + "is not equal to the target number of replica-groups %s", _numPreConfiguredReplicaGroups, + _numTargetReplicaGroups); + LOGGER.info("Number of replica-groups in pre-configured instance partitions: {}", _numPreConfiguredReplicaGroups); + + // Validate the number of instances per replica-group in the pre-configured instance partitions is greater than or + // equal to the target number of instances per replica-group + _numPreConfiguredInstancesPerReplicaGroup = _preConfiguredInstancePartitions.getInstances(0, 0).size(); + Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >= _numTargetInstancesPerReplicaGroup, + "The number of instances per replica-group in the pre-configured " + + "instance partitions is less than the target number of instances per replica-group"); + LOGGER.info("Number of instances per replica-group in pre-configured instance partitions: {}", + _numPreConfiguredInstancesPerReplicaGroup); + + // Validate the pool to instance configs map is not null or empty + Preconditions.checkNotNull(poolToInstanceConfigsMap, "poolToInstanceConfigsMap is null"); + int numPools = poolToInstanceConfigsMap.size(); + Preconditions.checkState(numPools > 0, "No pool qualified for selection"); + Preconditions.checkState(poolToInstanceConfigsMap.values().stream().map(List::size).reduce(Integer::sum) + .orElse(0) >= _numTargetTotalInstances, + "The total number of instances in all pools is less than the target number of target instances"); + + HashSet<String> availableInstanceSet = new HashSet<>(); + poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i -> availableInstanceSet.add(i.getInstanceName()))); + LOGGER.info("Number of pools: {}", numPools); + LOGGER.info("Number of instances in all pools: {}", availableInstanceSet.size()); + LOGGER.info("availableInstanceSet: {}", availableInstanceSet); + + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + List<String> instances = _preConfiguredInstancePartitions.getInstances(0, i); + for (String instance : instances) { + Preconditions.checkState(availableInstanceSet.contains(instance), + "Instance %s in pre-configured instance partitions is not in " + + "the pool to instance configs map", + instance); + } + } + + LOGGER.info("Validation passed. The instances provided can satisfy the pool diverse requirement."); + LOGGER.info("Trying to assign total {} instances to {} replica groups, " + "with {} instance per replica group", + _numTargetTotalInstances, _numTargetReplicaGroups, _numTargetInstancesPerReplicaGroup); + } + + void createListFromPreConfiguredInstanceAssignmentMap() { + List<List<String>> preConfiguredReplicaGroups = new ArrayList<>(_numPreConfiguredReplicaGroups); + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0, i)); + } + + for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) { + List<String> mirroredServerList = new ArrayList<>(); + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + mirroredServerList.add(preConfiguredReplicaGroups.get(i).get(j)); + } + _preConfiguredMirroredServerLists.add(mirroredServerList); + } + } + + void createLookupTablesFromPreConfiguredInstanceAssignmentMap() { + List<List<String>> preConfiguredReplicaGroups = new ArrayList<>(_numPreConfiguredReplicaGroups); + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0, i)); + } + + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) { + String instance = preConfiguredReplicaGroups.get(i).get(j); + _preConfiguredInstanceNameToOffsetMap.put(instance, j); + } + } + } + + @Override + public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, + InstancePartitions instancePartitions) { + if (_replicaGroupPartitionConfig.isReplicaGroupBased()) { + validatePoolDiversePreconditions(poolToInstanceConfigsMap); + if (_existingInstancePartitions == null) { + // If no existing instance partitions, create new instance partitions based on the pre-configured instance + // partitions. This is done by just selecting _targetNumInstancesPerReplicaGroup set of mirrored servers + // from the pre-configured instance partitions. + LOGGER.info("No existing instance partitions found. Will build new on top of" + + " the pre-configured instance partitions"); + // create a list of lists of mirrored servers from the pre-configured instance partitions + createListFromPreConfiguredInstanceAssignmentMap(); Review Comment: renamed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org