This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 25b4c3a212b234106a92e30489b03f195d3eb60c Author: jlli_LinkedIn <j...@linkedin.com> AuthorDate: Fri Dec 29 23:26:52 2023 -0800 Add logic to consider the case when instances are moved across pools --- .../InstanceReplicaGroupPartitionSelector.java | 75 +++++++++++++++++---- .../instance/InstanceTagPoolSelector.java | 38 ++++++----- .../InstanceReplicaGroupPartitionSelectorTest.java | 76 ++++++++++++++++++++-- .../java/org/apache/pinot/spi/utils/Pairs.java | 23 ++++++- 4 files changed, 171 insertions(+), 41 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java index 79e95db7a6..505006f1d3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java @@ -74,6 +74,9 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); + Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>(); + Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>(); + Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>(); Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>(); Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); Map<String, Integer> instanceToPoolMap = new HashMap<>(); @@ -89,26 +92,70 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele } if (_minimizeDataMovement && _existingInstancePartitions != null) { - // Keep the same pool for the replica group if it's already been used for the table. + // Collect the stats between the existing pools, existing replica groups, and existing instances. int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); - int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups); - for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) { - boolean foundExistingReplicaGroup = false; - for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingReplicaGroup; partitionId++) { + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); for (String existingInstance : existingInstances) { Integer existingPool = instanceToPoolMap.get(existingInstance); - if (existingPool != null & pools.contains(existingPool)) { - poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId); - replicaGroupIdToPoolMap.put(replicaGroupId, existingPool); - foundExistingReplicaGroup = true; - break; + if (existingPool != null) { + existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>()) + .add(existingInstance); + existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>()) + .add(replicaGroupId); + existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) + .add(existingInstance); + } + } + } + } + + // Use a max heap to track the number of servers used for the given pools, + // so that pool with max number of existing instances will be considered first. + PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false)); + for (int pool : pools) { + maxHeap.add( + new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(), + pool)); + } + + // Get the maximum number of replica groups per pool. + int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size(); + // Given a pool number, assign replica group which has the max number of existing instances. + // Repeat this process until the max number of replica groups per pool is reached. + while (!maxHeap.isEmpty()) { + Pairs.IntPair pair = maxHeap.remove(); + int poolNumber = pair.getRight(); + for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) { + Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber); + if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) { + continue; + } + int targetReplicaGroupId = -1; + int maxNumInstances = 0; + for (int existingReplicaGroupId : existingReplicaGroups) { + int numExistingInstances = + existingReplicaGroupIdToExistingInstancesMap.getOrDefault(existingReplicaGroupId, new HashSet<>()) + .size(); + if (numExistingInstances > maxNumInstances) { + maxNumInstances = numExistingInstances; + targetReplicaGroupId = existingReplicaGroupId; } } + // If target existing replica group cannot be found, it means it should be chosen from a new replica group. + if (targetReplicaGroupId > -1) { + poolToReplicaGroupIdsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(targetReplicaGroupId); + replicaGroupIdToPoolMap.put(targetReplicaGroupId, poolNumber); + // Clear the stats so that the same replica group won't be picked up again in later iteration. + existingReplicaGroupIdToExistingInstancesMap.get(targetReplicaGroupId).clear(); + } } } - // Use a min heap to track the least frequently picked pool among all the pools + + // If there is any new replica group added, choose pool which is least frequently picked up. + // Use a min heap to track the least frequently picked pool among all the pools. PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator()); for (int pool : pools) { int numExistingReplicaGroups = @@ -190,7 +237,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups); - Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new TreeMap<>(); + existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>(); // Step 1: find out the replica groups and their existing instances, // so that these instances can be filtered out and won't be chosen for the other replica group. for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) { @@ -202,7 +249,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); - replicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) + existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) .addAll(existingInstances); } } @@ -215,7 +262,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups; otherReplicaGroupId++) { if (replicaGroupId != otherReplicaGroupId) { - candidateInstances.removeAll(replicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId)); + candidateInstances.removeAll(existingReplicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId)); } } LinkedHashSet<String> chosenCandidateInstances = new LinkedHashSet<>(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java index 2062a75209..940968432b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java @@ -22,16 +22,18 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.TreeMap; -import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; +import org.apache.pinot.spi.utils.Pairs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,36 +125,38 @@ public class InstanceTagPoolSelector { poolsToSelect = new ArrayList<>(numPoolsToSelect); if (_minimizeDataMovement && _existingInstancePartitions != null) { - Set<Integer> existingPools = new TreeSet<>(); + Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>(); // Keep the same pool if it's already been used for the table. int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { - boolean foundExistingPoolForReplicaGroup = false; - for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup; - partitionId++) { + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); for (String existingInstance : existingInstances) { Integer existingPool = instanceToPoolMap.get(existingInstance); if (existingPool != null) { - if (existingPools.add(existingPool)) { - poolsToSelect.add(existingPool); + if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) { + existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>()); } - foundExistingPoolForReplicaGroup = true; - break; + existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>()) + .add(existingInstance); } } } } - LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType); - // Pick a pool from remainingPools that isn't used before. - List<Integer> remainingPools = new ArrayList<>(pools); - remainingPools.removeAll(existingPools); - // Select from the remaining pools. - int remainingNumPoolsToSelect = numPoolsToSelect - poolsToSelect.size(); - for (int i = 0; i < remainingNumPoolsToSelect; i++) { - poolsToSelect.add(remainingPools.remove(i % remainingPools.size())); + + // Use a max heap to track the number of servers used for all the pools. + PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false)); + for (int pool : pools) { + maxHeap.add(new Pairs.IntPair(existingPoolsToExistingInstancesMap.get(pool).size(), pool)); + } + + // Pick the pools from the max heap, so that data movement be minimized. + for (int i = 0; i < numPoolsToSelect; i++) { + Pairs.IntPair pair = maxHeap.remove(); + poolsToSelect.add(pair.getRight()); } + LOGGER.info("The selected pools: " + poolsToSelect); } else { // Select pools based on the table name hash to evenly distribute the tables List<Integer> poolsInCluster = new ArrayList<>(pools); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java index 2fdef27796..fdb6292f26 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java @@ -34,6 +34,8 @@ import org.testng.annotations.Test; public class InstanceReplicaGroupPartitionSelectorTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String INSTANCE_CONFIG_TEMPLATE = "{\n" + " \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + " \"simpleFields\": {\n" + " \"HELIX_ENABLED\": \"true\",\n" @@ -51,15 +53,15 @@ public class InstanceReplicaGroupPartitionSelectorTest { + " ]\n" + " }\n" + "}"; @Test - public void testSelectInstances() throws JsonProcessingException { - ObjectMapper objectMapper = new ObjectMapper(); + public void testPoolsWhenOneMorePoolAddedAndOneMoreReplicaGroupsNeeded() + throws JsonProcessingException { String existingPartitionsJson = " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + " ]\n" + " }\n" + " }\n"; - InstancePartitions existing = objectMapper.readValue(existingPartitionsJson, InstancePartitions.class); + InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class); InstanceReplicaGroupPartitionConfig config = new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null); @@ -68,8 +70,10 @@ public class InstanceReplicaGroupPartitionSelectorTest { String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"}; String[] poolNumbers = {"0", "0", "1", "1"}; - String[] poolNames = {"FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups", - "SecondHalfReplicationGroups"}; + String[] poolNames = { + "FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups", + "SecondHalfReplicationGroups" + }; Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>(); for (int i = 0; i < serverNames.length; i++) { @@ -81,13 +85,15 @@ public class InstanceReplicaGroupPartitionSelectorTest { StringSubstitutor substitutor = new StringSubstitutor(valuesMap); String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE); - ZNRecord znRecord = objectMapper.readValue(resolvedString, ZNRecord.class); + ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class); int poolNumber = Integer.parseInt(poolNumbers[i]); poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord)); } InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE"); selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions); + // Now that 1 more pool is added and 1 more RG is needed, a new set called "0_1" is generated, + // and the instances from Pool 1 are assigned to this new replica. String expectedInstancePartitions = " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" @@ -98,7 +104,63 @@ public class InstanceReplicaGroupPartitionSelectorTest { + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + " ]\n" + " }\n" + " }\n"; InstancePartitions expectedPartitions = - objectMapper.readValue(expectedInstancePartitions, InstancePartitions.class); + OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class); + assert assignedPartitions.equals(expectedPartitions); + } + + @Test + public void testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools() + throws JsonProcessingException { + // The "rg0-2" instance used to belong to Pool 1, but now it belongs to Pool 0. + String existingPartitionsJson = + " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ],\n" + " \"0_1\": [\n" + + " \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + " }\n" + " }\n"; + InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class); + InstanceReplicaGroupPartitionConfig config = + new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null); + + InstanceReplicaGroupPartitionSelector selector = + new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true); + + String[] serverNames = {"rg0-0", "rg0-1", "rg0-2", "rg1-0", "rg1-1", "rg1-2"}; + String[] poolNumbers = {"0", "0", "0", "1", "1", "1"}; + Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>(); + + for (int i = 0; i < serverNames.length; i++) { + Map<String, String> valuesMap = new HashMap<>(); + valuesMap.put("serverName", serverNames[i]); + valuesMap.put("pool", poolNumbers[i]); + + StringSubstitutor substitutor = new StringSubstitutor(valuesMap); + String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE); + + ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class); + int poolNumber = Integer.parseInt(poolNumbers[i]); + poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord)); + } + + InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE"); + selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions); + + // The "rg0-2" instance is replaced by "rg1-0" (which belongs to Pool 1), as "rg0-2" no longer belongs to Pool 1. + // And "rg1-0" remains the same position as it's always under Pool 1. + String expectedInstancePartitions = + " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ],\n" + " \"0_1\": [\n" + + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + " }\n" + " }\n"; + InstancePartitions expectedPartitions = + OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class); assert assignedPartitions.equals(expectedPartitions); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java index be18d35e50..45645387af 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java @@ -30,7 +30,11 @@ public class Pairs { } public static Comparator<IntPair> intPairComparator() { - return new AscendingIntPairComparator(); + return new AscendingIntPairComparator(true); + } + + public static Comparator<IntPair> intPairComparator(boolean ascending) { + return new AscendingIntPairComparator(ascending); } public static class IntPair { @@ -79,13 +83,26 @@ public class Pairs { } public static class AscendingIntPairComparator implements Comparator<IntPair> { + private boolean _ascending; + + public AscendingIntPairComparator(boolean ascending) { + _ascending = ascending; + } @Override public int compare(IntPair pair1, IntPair pair2) { if (pair1._left != pair2._left) { - return Integer.compare(pair1._left, pair2._left); + if (_ascending) { + return Integer.compare(pair1._left, pair2._left); + } else { + return Integer.compare(pair2._left, pair1._left); + } } else { - return Integer.compare(pair1._right, pair2._right); + if (_ascending) { + return Integer.compare(pair1._right, pair2._right); + } else { + return Integer.compare(pair2._right, pair1._right); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org