This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5d54386ba4 Fix bug in segment rebalance with replica group segment assignment (#8598) 5d54386ba4 is described below commit 5d54386ba46a566cd7085f5dcda33c940fd5fc84 Author: Sajjad Moradi <moradi.saj...@gmail.com> AuthorDate: Wed Apr 27 18:06:30 2022 -0700 Fix bug in segment rebalance with replica group segment assignment (#8598) Segment rebalance with replica group segment assignment is currently done in a partition-by-partition way, which leads to an imbalanced assignment. This PR fixes the issue by grouping together all segment partitions that map to the same instance partition and then reassigning each group of segments separately. --- .../segment/OfflineSegmentAssignment.java | 10 ++++--- .../segment/RealtimeSegmentAssignment.java | 11 +++++--- .../assignment/segment/SegmentAssignmentUtils.java | 15 +++++------ .../OfflineReplicaGroupSegmentAssignmentTest.java | 31 ++++++++++++++++++---- .../RealtimeReplicaGroupSegmentAssignmentTest.java | 2 +- 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java index 0b3c9fdc6b..2ebf5e4b78 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java @@ -281,22 +281,24 @@ public class OfflineSegmentAssignment implements SegmentAssignment { for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) { segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata); } - Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>(); + int numPartitions = instancePartitions.getNumPartitions(); + Map<Integer, 
List<String>> instancePartitionIdToSegmentsMap = new HashMap<>(); for (String segmentName : currentAssignment.keySet()) { int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName)); - partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName); + int instancePartitionId = partitionId % numPartitions; + instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName); } // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added // servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table // name hash as the random seed for the shuffle so that the result is deterministic. Random random = new Random(_offlineTableName.hashCode()); - for (List<String> segments : partitionIdToSegmentsMap.values()) { + for (List<String> segments : instancePartitionIdToSegmentsMap.values()) { Collections.shuffle(segments, random); } return SegmentAssignmentUtils - .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionIdToSegmentsMap); + .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, instancePartitionIdToSegmentsMap); } private int getPartitionId(SegmentZKMetadata segmentZKMetadata) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index ac31ebca21..c97b53e75c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -328,23 +328,26 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { } else { // Replica-group based assignment - 
Map<Integer, List<String>> partitionGroupIdToSegmentsMap = new HashMap<>(); + int numPartitions = instancePartitions.getNumPartitions(); + Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>(); for (String segmentName : currentAssignment.keySet()) { int partitionGroupId = SegmentUtils .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn); - partitionGroupIdToSegmentsMap.computeIfAbsent(partitionGroupId, k -> new ArrayList<>()).add(segmentName); + int instancePartitionId = partitionGroupId % numPartitions; + instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()) + .add(segmentName); } // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added // servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table // name hash as the random seed for the shuffle so that the result is deterministic. Random random = new Random(_realtimeTableName.hashCode()); - for (List<String> segments : partitionGroupIdToSegmentsMap.values()) { + for (List<String> segments : instancePartitionIdToSegmentsMap.values()) { Collections.shuffle(segments, random); } newAssignment = SegmentAssignmentUtils - .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionGroupIdToSegmentsMap); + .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, instancePartitionIdToSegmentsMap); } } return newAssignment; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index e27983e8b3..8db3416b93 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -157,20 +157,19 @@ public class SegmentAssignmentUtils { } /** - * Rebalances the table for the replica-group based segment assignment strategy. - * <p>The number of partitions for the segments can be different from the number of partitions in the instance - * partitions. Uniformly spray the segment partitions over the instance partitions. + * Rebalances the table for the replica-group based segment assignment strategy by uniformly spraying group of + * segments belonging to each instancePartitionId to the instances of that instance partition. */ public static Map<String, Map<String, String>> rebalanceReplicaGroupBasedTable( Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions, - Map<Integer, List<String>> partitionIdToSegmentsMap) { + Map<Integer, List<String>> instancePartitionIdToSegmentsMap) { Map<String, Map<String, String>> newAssignment = new TreeMap<>(); - int numPartitions = instancePartitions.getNumPartitions(); - for (Map.Entry<Integer, List<String>> entry : partitionIdToSegmentsMap.entrySet()) { + for (Map.Entry<Integer, List<String>> entry : instancePartitionIdToSegmentsMap.entrySet()) { // Uniformly spray the segment partitions over the instance partitions - int partitionId = entry.getKey() % numPartitions; + int instancePartitionId = entry.getKey(); + List<String> segments = entry.getValue(); SegmentAssignmentUtils - .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, partitionId, entry.getValue(), + .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, instancePartitionId, segments, newAssignment); } return newAssignment; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java index aa4c8c7f66..62bcf70b11 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.helix.core.assignment.segment; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -58,7 +59,7 @@ import static org.testng.Assert.assertTrue; public class OfflineReplicaGroupSegmentAssignmentTest { private static final int NUM_REPLICAS = 3; private static final String SEGMENT_NAME_PREFIX = "segment_"; - private static final int NUM_SEGMENTS = 90; + private static final int NUM_SEGMENTS = 12; private static final List<String> SEGMENTS = SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, NUM_SEGMENTS); private static final String INSTANCE_NAME_PREFIX = "instance_"; @@ -248,13 +249,11 @@ public class OfflineReplicaGroupSegmentAssignmentTest { .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } - // There should be 90 segments assigned assertEquals(currentAssignment.size(), NUM_SEGMENTS); // Each segment should have 3 replicas for (Map<String, String> instanceStateMap : currentAssignment.values()) { assertEquals(instanceStateMap.size(), NUM_REPLICAS); } - // Each instance should have 15 segments assigned int[] numSegmentsAssignedPerInstance = SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES); int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES]; @@ -278,13 +277,11 @@ public class OfflineReplicaGroupSegmentAssignmentTest { .put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } - // There should be 90 segments assigned assertEquals(currentAssignment.size(), NUM_SEGMENTS); // Each segment should have 3 replicas for (Map<String, String> instanceStateMap : currentAssignment.values()) { assertEquals(instanceStateMap.size(), NUM_REPLICAS); } - // Each instance should have 15 segments assigned int[] numSegmentsAssignedPerInstance = SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES); int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES]; @@ -356,4 +353,28 @@ public class OfflineReplicaGroupSegmentAssignmentTest { } } } + + @Test + public void testRebalanceTableWithPartitionColumnAndInstancePartitionsMapWithOnePartition() { + // make an unbalanced assignment by assigning all segments to the first three instances + String instance0 = INSTANCE_NAME_PREFIX + "0"; + String instance1 = INSTANCE_NAME_PREFIX + "1"; + String instance2 = INSTANCE_NAME_PREFIX + "2"; + Map<String, Map<String, String>> unbalancedAssignment = new TreeMap<>(); + SEGMENTS.forEach(segName -> + unbalancedAssignment.put(segName, ImmutableMap.of( + instance0, SegmentStateModel.ONLINE, + instance1, SegmentStateModel.ONLINE, + instance2, SegmentStateModel.ONLINE + )) + ); + Map<String, Map<String, String>> balancedAssignment = _segmentAssignmentWithPartition + .rebalanceTable(unbalancedAssignment, _instancePartitionsMapWithoutPartition, null, null, + new BaseConfiguration()); + int[] actualNumSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(balancedAssignment, INSTANCES); + int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES]; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES); + assertEquals(actualNumSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + } } diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java index a82d10db04..09cd07bc9e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java @@ -43,7 +43,7 @@ import static org.testng.Assert.assertTrue; public class RealtimeReplicaGroupSegmentAssignmentTest { private static final int NUM_REPLICAS = 3; private static final int NUM_PARTITIONS = 4; - private static final int NUM_SEGMENTS = 100; + private static final int NUM_SEGMENTS = 24; private static final String CONSUMING_INSTANCE_NAME_PREFIX = "consumingInstance_"; private static final int NUM_CONSUMING_INSTANCES = 9; private static final List<String> CONSUMING_INSTANCES = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org