This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new 0f38fd7cd7  Modify strictReplicaGroup rebalance batching to categorize on current+target instances to partitionId to currentAssignment (#15838)
0f38fd7cd7 is described below

commit 0f38fd7cd73198024b79d334b35a9829d68bfd3f
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Tue May 20 12:27:45 2025 -0700

    Modify strictReplicaGroup rebalance batching to categorize on current+target instances to partitionId to currentAssignment (#15838)

    * Fix strictReplicaGroup rebalance batching to categorize on current+target instances to partitionId to currentAssignment

    * Address review comment: short-circuit return when rebalance batching is disabled
---
 .../helix/core/rebalance/TableRebalancer.java     | 156 ++++++++-----------
 .../helix/core/rebalance/TableRebalancerTest.java | 167 ++++++++++++++++-----
 .../tests/TableRebalanceIntegrationTest.java      |   1 +
 3 files changed, 199 insertions(+), 125 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index f4e964ceb5..a89f62b41e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -1581,84 +1581,57 @@ public class TableRebalancer {
       Logger tableRebalanceLogger) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
-    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
-        partitionIdToAssignedInstancesToCurrentAssignmentMap;
-    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
-      // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled
-      // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0
-      // and a dummy set for the assigned instances.
-      partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
-      partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>());
-      partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment);
-    } else {
-      partitionIdToAssignedInstancesToCurrentAssignmentMap =
-          getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap,
-              partitionIdFetcher);
-    }
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
     Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
     Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
-    for (Map<Set<String>, Map<String, Map<String, String>>> assignedInstancesToCurrentAssignment
-        : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) {
-      boolean anyServerExhaustedBatchSize = false;
-      if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
-        // The number of segments for a given partition, accumulates as we iterate over the assigned instances
-        Map<String, Integer> serverToNumSegmentsToBeAddedForPartitionMap = new HashMap<>();
-
-        // Check if the servers of the first assignment for each unique set of assigned instances has any space left
-        // to move this partition. If so, let's mark the partitions as to be moved, otherwise we mark the partition
-        // as a whole as not moveable.
-        for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) {
-          Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
-          // It is enough to check for whether any server for one segment is above the limit or not since all segments
-          // in curAssignment will have the same assigned instances list
-          Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue();
-          SingleSegmentAssignment firstAssignment =
-              getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()),
-                  minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
-          Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap,
-              firstAssignment._instanceStateMap);
-          for (String server : serversAdded) {
-            // Case I: We already exceeded the batchSizePerServer for this server, cannot add any more segments
-            if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
-              anyServerExhaustedBatchSize = true;
-              break;
-            }
-            // All segments assigned to the current instances will be moved, so track segments to be added for the given
-            // server based on this
-            serverToNumSegmentsToBeAddedForPartitionMap.put(server,
-                serverToNumSegmentsToBeAddedForPartitionMap.getOrDefault(server, 0) + curAssignment.size());
-          }
-          if (anyServerExhaustedBatchSize) {
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Directly update the nextAssignment with anyServerExhaustedBatchSize = false and return if batching is disabled
+      updateNextAssignmentForPartitionIdStrictReplicaGroup(currentAssignment, targetAssignment, nextAssignment,
+          false, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap,
+          availableInstancesMap, serverToNumSegmentsAddedSoFar);
+      return nextAssignment;
+    }
+
+    // Batching is enabled, calculate the Pair(current instances, target instances) -> partitionId -> currentAssignment
+    Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>>
+        currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap =
+        getCurrentAndTargetInstancesToPartitionIdToCurrentAssignmentMap(currentAssignment, targetAssignment,
+            segmentPartitionIdMap, partitionIdFetcher);
+
+    // Iterating over the unique pairs of current and target instances
+    for (Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignment
+        : currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap.values()) {
+      // Check if the servers of the first assignment for each unique partition has any space left to move the
+      // segments assigned to the partition and unique assigned instances as a whole. If so, let's mark the partitions
+      // as to be moved, otherwise we mark the partition as a whole as not moveable.
+      // Iterating over the partitionIds with the same unique pair of current and assigned instances
+      for (Map<String, Map<String, String>> curAssignment : partitionIdToCurrentAssignment.values()) {
+        Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
+        // It is enough to check for whether any server for one segment is above the limit or not since all segments
+        // in curAssignment will have the same current and target instances and same partitionId
+        Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue();
+        SingleSegmentAssignment firstAssignment =
+            getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()),
+                minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
+        Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap,
+            firstAssignment._instanceStateMap);
+        boolean anyServerExhaustedBatchSize = false;
+        for (String server : serversAdded) {
+          int segmentsAddedToServerSoFar = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0);
+          // Case I: We already exceeded the batchSizePerServer for this server, cannot add any more segments
+          // Case II: We have not yet exceeded the batchSizePerServer for this server, but we don't have sufficient
+          // space to host the segments for this assignment on the server, and we have allocated some partitions so
+          // far. If the batchSizePerServer is less than the number of segments in a given partitionId, we must host
+          // at least 1 partition and exceed the batchSizePerServer to ensure progress is made. Thus, performing this
+          // check only if segmentsAddedToServerSoFar > 0 is necessary.
+          if ((segmentsAddedToServerSoFar >= batchSizePerServer)
+              || (segmentsAddedToServerSoFar > 0
+              && (segmentsAddedToServerSoFar + curAssignment.size()) > batchSizePerServer)) {
+            anyServerExhaustedBatchSize = true;
            break;
          }
        }
-
-        // Case II: We have not yet exceeded the batchSizePerServer for any server, but we don't have sufficient
-        // space to host the segments for this assignment on some server, and we have allocated some partitions so
-        // far. If the batchSizePerServer is less than the number of segments in a given partitionId, we must host
-        // at least 1 partition and exceed the batchSizePerServer to ensure progress is made. Thus, performing this
-        // check only if segmentsAddedToServerSoFar > 0 is necessary.
-        if (!anyServerExhaustedBatchSize) {
-          for (Map.Entry<String, Integer> serverToNumSegmentsToAdd
-              : serverToNumSegmentsToBeAddedForPartitionMap.entrySet()) {
-            int segmentsAddedToServerSoFar =
-                serverToNumSegmentsAddedSoFar.getOrDefault(serverToNumSegmentsToAdd.getKey(), 0);
-            if (segmentsAddedToServerSoFar > 0
-                && (segmentsAddedToServerSoFar + serverToNumSegmentsToAdd.getValue()) > batchSizePerServer) {
-              anyServerExhaustedBatchSize = true;
-              break;
-            }
-          }
-        }
-      }
-      // TODO: Consider whether we should process the nextAssignment for each unique assigned instances rather than the
-      //       full partition to get a more granular number of segment moves in each step. For now since we expect
-      //       strict replica groups to mostly be used for tables like upserts which require a full partition to be
-      //       moved, we move a full partition at a time.
-      for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) {
         updateNextAssignmentForPartitionIdStrictReplicaGroup(curAssignment, targetAssignment, nextAssignment,
             anyServerExhaustedBatchSize, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap,
             availableInstancesMap, serverToNumSegmentsAddedSoFar);
@@ -1728,8 +1701,7 @@ public class TableRebalancer {
       Map<String, Integer> serverToNumSegmentsAddedSoFar, Logger tableRebalanceLogger) {
     int maxSegmentsAddedToAnyServer = serverToNumSegmentsAddedSoFar.isEmpty() ? 0
         : Collections.max(serverToNumSegmentsAddedSoFar.values());
-    if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER
-        && maxSegmentsAddedToAnyServer > batchSizePerServer) {
+    if (maxSegmentsAddedToAnyServer > batchSizePerServer) {
       tableRebalanceLogger.warn("Found at least one server with {} segments added which is larger than "
           + "batchSizePerServer: {}. This is expected for strictReplicaGroup based assignment that needs to move a "
           + "full partition to maintain consistency for queries.", maxSegmentsAddedToAnyServer, batchSizePerServer);
@@ -1737,37 +1709,41 @@ public class TableRebalancer {
   }

   /**
-   * Create a mapping of partitionId to the mapping of assigned instances to the current assignment of segments that
-   * belong to that partitionId and assigned instances. This is to be used for batching purposes for StrictReplicaGroup
-   * routing, for all segment assignment types: RealtimeSegmentAssignment, StrictRealtimeSegmentAssignment and
-   * OfflineSegmentAssignment
+   * Create a mapping of Pair(currentInstances, targetInstances) to partitionId to the current assignment of segments.
+   * This is to be used for batching purposes for StrictReplicaGroup routing, for all segment assignment types:
+   * RealtimeSegmentAssignment, StrictRealtimeSegmentAssignment and OfflineSegmentAssignment
    * @param currentAssignment the current assignment
+   * @param targetAssignment the target assignment
    * @param segmentPartitionIdMap cache to store the partition ids to avoid fetching ZK segment metadata
    * @param partitionIdFetcher function to fetch the partition id
-   * @return a mapping from partitionId to the assigned instances to the segment assignment map of all segments that
-   *         map to that partitionId and assigned instances
+   * @return a mapping from Pair(currentInstances, targetInstances) to the partitionId to the segment assignment map of
+   *         all segments that fall in that category
    */
-  private static Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
-  getPartitionIdToAssignedInstancesToCurrentAssignmentMap(Map<String, Map<String, String>> currentAssignment,
-      Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher) {
-    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
-        partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+  private static Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>>
+  getCurrentAndTargetInstancesToPartitionIdToCurrentAssignmentMap(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher) {
+    Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>>
+        currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap = new HashMap<>();
     for (Map.Entry<String, Map<String, String>> assignment : currentAssignment.entrySet()) {
       String segmentName = assignment.getKey();
-      Map<String, String> instanceStateMap = assignment.getValue();
-      Collection<String> segmentStates = instanceStateMap.values();
+      Map<String, String> currentInstanceStateMap = assignment.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      Collection<String> segmentStates = currentInstanceStateMap.values();
       boolean isConsuming = segmentStates.stream().noneMatch(state -> state.equals(SegmentStateModel.ONLINE))
           && segmentStates.stream().anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
       int partitionId =
           segmentPartitionIdMap.computeIfAbsent(segmentName, v -> partitionIdFetcher.fetch(segmentName, isConsuming));
-      Set<String> assignedInstances = instanceStateMap.keySet();
-      partitionIdToAssignedInstancesToCurrentAssignmentMap.computeIfAbsent(partitionId, k -> new HashMap<>())
-          .computeIfAbsent(assignedInstances, k -> new TreeMap<>()).put(segmentName, instanceStateMap);
+      Pair<Set<String>, Set<String>> currentAndTargetInstances =
+          Pair.of(currentInstanceStateMap.keySet(), targetInstanceStateMap.keySet());
+      currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap
+          .computeIfAbsent(currentAndTargetInstances, k -> new TreeMap<>())
+          .computeIfAbsent(partitionId, k -> new TreeMap<>()).put(segmentName, currentInstanceStateMap);
     }
-    return partitionIdToAssignedInstancesToCurrentAssignmentMap;
+    return currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap;
   }

   @VisibleForTesting
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index efc0d50556..0f184908a1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -1654,7 +1654,8 @@ public class TableRebalancerTest {
     // with batching with batchSizePerServer = 1:
    //
     // The first assignment will move "segment1" and "segment2" by one host at a time, and since the other segments
-    // if added will go beyond batchSizePerServer, they'll be picked up on the next two assignments (host4 and host2):
+    // if added will go beyond batchSizePerServer, they'll be picked up on the next two-three assignments
+    // (host4 and host2):
     // {
     //   "segment__1__0__98347869999L": {
     //     "host1": "ONLINE",
@@ -1677,7 +1678,7 @@ public class TableRebalancerTest {
     //     "host4": "ONLINE"
     //   }
     // }
-    // Second Assignment (host6, host1, host4, and host5 get 1 segment each):
+    // Second Assignment (host6, host1, host4, and host5 get 1 segment each) - non-strictReplicaGroup:
     // {
     //   "segment__1__0__98347869999L": {
     //     "host2": "ONLINE",
@@ -1700,8 +1701,55 @@ public class TableRebalancerTest {
     //     "host5": "ONLINE"
     //   }
     // }
+    // Second Assignment (host1, host6, and host5 get 1 segment each) - strictReplicaGroup:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
     //
-    // The third assignment should reach the target assignment
+    // The third assignment should reach the target assignment for non-strictReplicaGroup, and the fourth for
+    // strictReplicaGroup
+    // Third Assignment (host1 and host4 gets 1 segment each) - strictReplicaGroup:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
     for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
       Map<String, Map<String, String>> nextAssignment =
           TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
@@ -1718,18 +1766,42 @@ public class TableRebalancerTest {
       nextAssignment =
           TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
               1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
-      assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
-          new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
-      assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
-          new TreeSet<>(Arrays.asList("host1", "host4", "host5")));
-      assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
-          new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
-      assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
-          new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
-
-      nextAssignment =
-          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
-              1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+      if (!enableStrictReplicaGroup) {
+        assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+        assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host4", "host5")));
+        assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+        assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+        nextAssignment =
+            TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
+                1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+      } else {
+        assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+        assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host4", "host5")));
+        assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host6")));
+        assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+        nextAssignment =
+            TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
+                1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+        assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host1", "host4", "host5"))); + assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host2", "host4", "host6"))); + assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host1", "host4", "host5"))); + nextAssignment = + TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false, + 1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER); + } assertEquals(nextAssignment, targetAssignment); } @@ -1790,27 +1862,27 @@ public class TableRebalancerTest { // Next assignment with 2 minimum available replicas with strict replica-group should finish in 2 steps even with // batchSizePerServer = 2: // - // The first assignment will bring "segment1" and "segment3" to the target state. It cannot bring "segment2" and - // "segment4" to the target state because "host1" and "host4" might be unavailable for strict replica-group routing, + // The first assignment will bring "segment2" and "segment4" to the target state. It cannot bring "segment1" and + // "segment3" to the target state because "host1" and "host4" might be unavailable for strict replica-group routing, // which breaks the minimum available replicas requirement: // { // "segment__1__0__98347869999L": { // "host1": "ONLINE", - // "host3": "ONLINE", - // "host4": "ONLINE" + // "host2": "ONLINE", + // "host3": "ONLINE" // }, // "segment__2__0__98347869999L": { - // "host2": "ONLINE", + // "host1": "ONLINE", // "host3": "ONLINE", // "host4": "ONLINE" // }, // "segment__3__0__98347869999L": { // "host1": "ONLINE", - // "host3": "ONLINE", - // "host4": "ONLINE" + // "host2": "ONLINE", + // "host3": "ONLINE" // }, // "segment__4__0__98347869999L": { - // "host2": "ONLINE", + // "host1": "ONLINE", // "host3": "ONLINE", // "host4": "ONLINE" // } @@ -1820,13 +1892,13 @@ public class TableRebalancerTest { nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true, false, 2, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER); assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(), - new TreeSet<>(Arrays.asList("host1", "host3", "host4"))); + new TreeSet<>(Arrays.asList("host1", "host2", "host3"))); assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(), - new TreeSet<>(Arrays.asList("host2", "host3", "host4"))); - assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host4"))); + assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host1", "host2", "host3"))); assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(), - new TreeSet<>(Arrays.asList("host2", "host3", "host4"))); + new TreeSet<>(Arrays.asList("host1", "host3", "host4"))); nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true, false, 2, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER); assertEquals(nextAssignment, targetAssignment); @@ -1920,7 +1992,8 @@ public class TableRebalancerTest { // Next assignment with 2 minimum available replicas without strict replica-group should reach the target // assignment after two steps. With strict replica groups it should reach the target assignment immediately since - // the full partition must be selected for movement. 
Batch size = 1, unique partitionIds + // the full partition must be selected for movement for a given Pair(currentInstance, targetInstances). + // Batch size = 1, unique partitionIds for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) { Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>(); nextAssignment = @@ -1998,7 +2071,8 @@ public class TableRebalancerTest { // Next assignment with 2 minimum available replicas without strict replica-group should reach the target // assignment after three steps. With strict replica groups it should reach the target assignment in two steps since - // the full partition must be selected for movement. Batch size = 1, unique partitionIds + // the full partition must be selected for movement for a given Pair(currentInstance, targetInstances). + // Batch size = 1, unique partitionIds for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) { Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>(); nextAssignment = @@ -2165,7 +2239,8 @@ public class TableRebalancerTest { // Next assignment with 2 minimum available replicas without strict replica-group should reach the target // assignment after four steps. With strict replica groups it should reach the target assignment in two steps since - // the full partition must be selected for movement. Batch size = 1, unique partitionIds + // the full partition must be selected for movement for a given Pair(currentInstance, targetInstances). + // Batch size = 1, unique partitionIds for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) { Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>(); nextAssignment = @@ -2402,9 +2477,9 @@ public class TableRebalancerTest { assertEquals((int) numSegmentsToOffloadMap.get("host7"), 1); // Next assignment with 2 minimum available replicas without strict replica-group should reach the target - // assignment after three steps if strict replica group is disabled . With strict replica groups it should reach - // the target assignment in two steps since the full partition must be selected for movement. - // Batch size = 2, unique partitionIds + // assignment after three steps if strict replica group is enabled or disabled. Even for strictReplicaGroup, + // though the full Pair(currentAssignment, targetAssignment) + partitionId moves as a block, it can take longer + // to move since this can be a smaller granularity. 
Batch size = 2, unique partitionIds for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) { Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>(); nextAssignment = @@ -2457,6 +2532,28 @@ public class TableRebalancerTest { 2, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER); } else { assertNotEquals(nextAssignment, targetAssignment); + assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host1", "host2", "host3"))); + assertEquals(nextAssignment.get("segment__1__1__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host1", "host2", "host3"))); + assertEquals(nextAssignment.get("segment__1__2__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host1", "host3", "host5"))); + assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host2", "host3", "host4"))); + assertEquals(nextAssignment.get("segment__2__1__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host2", "host3", "host4"))); + assertEquals(nextAssignment.get("segment__2__2__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host2", "host4", "host6"))); + assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host1", "host2", "host3"))); + assertEquals(nextAssignment.get("segment__3__1__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host1", "host2", "host3"))); + assertEquals(nextAssignment.get("segment__3__2__98347869999L").keySet(), + new TreeSet<>(Arrays.asList("host1", "host3", "host5"))); + nextAssignment = + TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false, + 2, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER); + assertNotEquals(nextAssignment, targetAssignment); assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host5"))); assertEquals(nextAssignment.get("segment__1__1__98347869999L").keySet(), @@ -2474,7 +2571,7 @@ public class TableRebalancerTest { assertEquals(nextAssignment.get("segment__3__1__98347869999L").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host3"))); assertEquals(nextAssignment.get("segment__3__2__98347869999L").keySet(), - new TreeSet<>(Arrays.asList("host1", "host3", "host6"))); + new TreeSet<>(Arrays.asList("host1", "host3", "host5"))); nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false, 2, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER); @@ -2535,7 +2632,7 @@ public class TableRebalancerTest { assertEquals(nextAssignment.get("segment__3__1__98347869999L").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host3"))); assertEquals(nextAssignment.get("segment__3__2__98347869999L").keySet(), - new TreeSet<>(Arrays.asList("host1", "host3", "host6"))); + new TreeSet<>(Arrays.asList("host1", "host3", "host5"))); nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false, 2, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java index 69631e8fb1..424c0f24d8 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java +++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java @@ -73,6 +73,7 @@ public class TableRebalanceIntegrationTest extends HybridClusterIntegrationTest + "&bootstrap=" + rebalanceConfig.isBootstrap() + "&downtime=" + rebalanceConfig.isDowntime() + "&minAvailableReplicas=" + rebalanceConfig.getMinAvailableReplicas() + "&bestEfforts=" + rebalanceConfig.isBestEfforts() + + "&batchSizePerServer=" + rebalanceConfig.getBatchSizePerServer() + "&externalViewCheckIntervalInMs=" + rebalanceConfig.getExternalViewCheckIntervalInMs() + "&externalViewStabilizationTimeoutInMs=" + rebalanceConfig.getExternalViewStabilizationTimeoutInMs() + "&updateTargetTier=" + rebalanceConfig.isUpdateTargetTier() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
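
For readers who want the shape of the new batching key outside the diff, below is a minimal, self-contained sketch of the grouping the commit message describes: Pair(current instances, target instances) to partitionId to current assignment, so that segments sharing both keys are treated as one batch unit. This is an illustration only, not code from the patch; the class name, segment names, hosts, and the naive partition-id parsing are assumptions made for the sketch, while the real logic lives in getCurrentAndTargetInstancesToPartitionIdToCurrentAssignmentMap above.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.tuple.Pair;

public class StrictReplicaGroupBatchingSketch {
  public static void main(String[] args) {
    // Hypothetical current and target assignments: segment -> (instance -> state).
    Map<String, Map<String, String>> current = new TreeMap<>();
    current.put("segment__1__0", Map.of("host1", "ONLINE", "host2", "ONLINE"));
    current.put("segment__1__1", Map.of("host1", "ONLINE", "host2", "ONLINE"));
    current.put("segment__2__0", Map.of("host3", "ONLINE", "host4", "ONLINE"));
    Map<String, Map<String, String>> target = new TreeMap<>();
    target.put("segment__1__0", Map.of("host1", "ONLINE", "host5", "ONLINE"));
    target.put("segment__1__1", Map.of("host1", "ONLINE", "host5", "ONLINE"));
    target.put("segment__2__0", Map.of("host3", "ONLINE", "host4", "ONLINE"));

    // Pair(current instances, target instances) -> partitionId -> segment -> current instance states.
    Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>> grouped = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : current.entrySet()) {
      String segment = entry.getKey();
      // Illustrative partition-id extraction from an LLC-style name "segment__<partitionId>__<sequence>".
      int partitionId = Integer.parseInt(segment.split("__")[1]);
      Pair<Set<String>, Set<String>> key = Pair.of(entry.getValue().keySet(), target.get(segment).keySet());
      grouped.computeIfAbsent(key, k -> new TreeMap<>())
          .computeIfAbsent(partitionId, k -> new TreeMap<>())
          .put(segment, entry.getValue());
    }

    // Segments sharing both the instance pair and the partition id would move together as one batch unit.
    grouped.forEach((key, byPartition) -> System.out.println(key + " -> " + byPartition.keySet()));
  }
}

In this sketch the inner TreeMaps simply keep partition ids and segment names ordered for readable output; the important part is the composite key, which groups segments by where they are now and where they are headed before any batch-size accounting is applied.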