This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9353f158b6 Add new ImplicitRealtimeTablePartitionSelector strategy for instance assignment (#15930) 9353f158b6 is described below commit 9353f158b6f59dbc8d19c626f173f8b99f34b1ce Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Fri Jun 6 16:41:54 2025 +0100 Add new ImplicitRealtimeTablePartitionSelector strategy for instance assignment (#15930) --- .../ImplicitRealtimeTablePartitionSelector.java | 79 ++++++++ .../instance/InstanceAssignmentDriver.java | 22 +- .../instance/InstancePartitionSelectorFactory.java | 7 +- .../InstanceReplicaGroupPartitionSelector.java | 4 +- .../helix/core/rebalance/RebalanceResult.java | 8 + .../core/rebalance/RebalanceSummaryResult.java | 88 ++++++++ .../instance/InstanceAssignmentTest.java | 153 ++++++++++++++ .../TableRebalancerClusterStatelessTest.java | 221 ++++++++++++++++++++- .../tests/TableRebalanceIntegrationTest.java | 80 +++++++- .../segment/local/utils/TableConfigUtils.java | 30 +++ .../segment/local/utils/TableConfigUtilsTest.java | 71 +++++++ .../table/assignment/InstanceAssignmentConfig.java | 2 +- 12 files changed, 744 insertions(+), 21 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java new file mode 100644 index 0000000000..1cb12a725a --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.instance; + +import com.google.common.annotations.VisibleForTesting; +import javax.annotation.Nullable; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.utils.IngestionConfigUtils; + + +/** + * Variation of {@link InstanceReplicaGroupPartitionSelector} that uses the number of partitions from the stream + * to determine the number of partitions in each replica group. 
+ */ +public class ImplicitRealtimeTablePartitionSelector extends InstanceReplicaGroupPartitionSelector { + private final int _numPartitions; + + public ImplicitRealtimeTablePartitionSelector(TableConfig tableConfig, + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String tableNameWithType, + @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) { + this(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement, + // Get the number of partitions from the first stream config + // TODO: Revisit this logic to better handle multiple streams in the future - either validate that they + // all have the same number of partitions and use that or disallow the use of this selector in case the + // partition counts differ. + StreamConsumerFactoryProvider.create(IngestionConfigUtils.getFirstStreamConfig(tableConfig)) + .createStreamMetadataProvider( + ImplicitRealtimeTablePartitionSelector.class.getSimpleName() + "-" + tableNameWithType) + ); + } + + @VisibleForTesting + ImplicitRealtimeTablePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement, + StreamMetadataProvider streamMetadataProvider) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement); + _numPartitions = getStreamNumPartitions(streamMetadataProvider); + } + + private int getStreamNumPartitions(StreamMetadataProvider streamMetadataProvider) { + try (streamMetadataProvider) { + return streamMetadataProvider.fetchPartitionCount(10_000L); + } catch (Exception e) { + throw new RuntimeException("Failed to retrieve partition info for table: " + _tableNameWithType, e); + } + } + + @Override + protected int getNumPartitions() { + return _numPartitions; + } + + @Override + protected int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) { + // This partition selector should only be used for CONSUMING instance partitions, and we enforce a single instance + // per partition in this case. + return 1; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index f76c9df6a4..04ce082fa6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.helix.core.assignment.instance; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -113,9 +114,26 @@ public class InstanceAssignmentDriver { // if existingInstancePartitions is null. 
boolean minimizeDataMovementFromTableConfig = instanceAssignmentConfig.isMinimizeDataMovement(); boolean minimizeDataMovement = minimizeDataMovementEnablement.isEnabled(minimizeDataMovementFromTableConfig); + InstancePartitionSelector instancePartitionSelector = + InstancePartitionSelectorFactory.getInstance(_tableConfig, instanceAssignmentConfig.getPartitionSelector(), + instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions, + preConfiguredInstancePartitions, minimizeDataMovement); + LOGGER.info("Starting {} instance assignment for table: {} with minimizeDataMovement: {} (from table config: {}, " + "override: {})", instancePartitionsName, tableNameWithType, minimizeDataMovement, minimizeDataMovementFromTableConfig, minimizeDataMovementEnablement); + + return getInstancePartitions(instancePartitionsName, instanceAssignmentConfig, instanceConfigs, + existingInstancePartitions, minimizeDataMovement, instancePartitionSelector); + } + + @VisibleForTesting + InstancePartitions getInstancePartitions(String instancePartitionsName, + InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs, + @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement, + InstancePartitionSelector instancePartitionSelector) { + String tableNameWithType = _tableConfig.getTableName(); + InstanceTagPoolSelector tagPoolSelector = new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType, minimizeDataMovement, existingInstancePartitions); @@ -132,10 +150,6 @@ public class InstanceAssignmentDriver { poolToInstanceConfigsMap = constraintApplier.applyConstraint(poolToInstanceConfigsMap); } - InstancePartitionSelector instancePartitionSelector = - InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(), - instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions, - preConfiguredInstancePartitions, minimizeDataMovement); InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName); instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions); return instancePartitions; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java index 8a343b1598..d038245913 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance; import java.util.Arrays; import javax.annotation.Nullable; import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; @@ -30,7 +31,8 @@ public class InstancePartitionSelectorFactory { private InstancePartitionSelectorFactory() { } - public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector, + public static InstancePartitionSelector getInstance(TableConfig 
tableConfig, + InstanceAssignmentConfig.PartitionSelector partitionSelector, InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType, InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions, boolean minimizeDataMovement) { @@ -44,6 +46,9 @@ public class InstancePartitionSelectorFactory { case MIRROR_SERVER_SET_PARTITION_SELECTOR: return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, preConfiguredInstancePartitions, minimizeDataMovement); + case IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR: + return new ImplicitRealtimeTablePartitionSelector(tableConfig, instanceReplicaGroupPartitionConfig, + tableNameWithType, existingInstancePartitions, minimizeDataMovement); default: throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from" + Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values())); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java index b8c19ede69..4309d67b67 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java @@ -248,7 +248,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele return numInstancesPerReplicaGroup; } - private int getNumPartitions() { + protected int getNumPartitions() { // Assign instances within a replica-group to one partition if not configured int numPartitions = _replicaGroupPartitionConfig.getNumPartitions(); if (numPartitions <= 0) { @@ -257,7 +257,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele return numPartitions; } - private int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) { + protected int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) { // Assign all instances within a replica-group to each partition if not configured int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition(); if (numInstancesPerPartition > 0) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java index f2737c265b..cfc38a8033 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java @@ -114,4 +114,12 @@ public class RebalanceResult { // UNKNOWN_ERROR if the job hits on an unexpected exception. 
NO_OP, DONE, FAILED, IN_PROGRESS, ABORTED, CANCELLED, UNKNOWN_ERROR } + + @Override + public String toString() { + return "RebalanceResult{" + "_jobId='" + _jobId + '\'' + ", _status=" + _status + ", _description='" + _description + + '\'' + ", _instanceAssignment=" + _instanceAssignment + ", _tierInstanceAssignment=" + + _tierInstanceAssignment + ", _segmentAssignment=" + _segmentAssignment + ", _preChecksResult=" + + _preChecksResult + ", _rebalanceSummaryResult=" + _rebalanceSummaryResult + '}'; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java index a7c5da9abf..d99cb55c09 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java @@ -137,6 +137,19 @@ public class RebalanceSummaryResult { public List<String> getTagList() { return _tagList; } + + @Override + public String toString() { + return "ServerSegmentChangeInfo{" + + "_serverStatus=" + _serverStatus + + ", _totalSegmentsAfterRebalance=" + _totalSegmentsAfterRebalance + + ", _totalSegmentsBeforeRebalance=" + _totalSegmentsBeforeRebalance + + ", _segmentsAdded=" + _segmentsAdded + + ", _segmentsDeleted=" + _segmentsDeleted + + ", _segmentsUnchanged=" + _segmentsUnchanged + + ", _tagList=" + _tagList + + '}'; + } } public static class RebalanceChangeInfo { @@ -164,6 +177,14 @@ public class RebalanceSummaryResult { public int getExpectedValueAfterRebalance() { return _expectedValueAfterRebalance; } + + @Override + public String toString() { + return "RebalanceChangeInfo{" + + "_valueBeforeRebalance=" + _valueBeforeRebalance + + ", _expectedValueAfterRebalance=" + _expectedValueAfterRebalance + + '}'; + } } public static class TagInfo { @@ -221,6 +242,16 @@ public class RebalanceSummaryResult { public void increaseNumServerParticipants(int numServers) { _numServerParticipants += numServers; } + + @Override + public String toString() { + return "TagInfo{" + + "_tagName='" + _tagName + '\'' + + ", _numSegmentsUnchanged=" + _numSegmentsUnchanged + + ", _numSegmentsToDownload=" + _numSegmentsToDownload + + ", _numServerParticipants=" + _numServerParticipants + + '}'; + } } @JsonInclude(JsonInclude.Include.NON_NULL) @@ -295,6 +326,19 @@ public class RebalanceSummaryResult { public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() { return _serverSegmentChangeInfo; } + + @Override + public String toString() { + return "ServerInfo{" + + "_numServersGettingNewSegments=" + _numServersGettingNewSegments + + ", _numServers=" + _numServers + + ", _serversAdded=" + _serversAdded + + ", _serversRemoved=" + _serversRemoved + + ", _serversUnchanged=" + _serversUnchanged + + ", _serversGettingNewSegments=" + _serversGettingNewSegments + + ", _serverSegmentChangeInfo=" + _serverSegmentChangeInfo + + '}'; + } } public static class ConsumingSegmentToBeMovedSummary { @@ -407,6 +451,26 @@ public class RebalanceSummaryResult { public int getTotalOffsetsToCatchUpAcrossAllConsumingSegments() { return _totalOffsetsToCatchUpAcrossAllConsumingSegments; } + + @Override + public String toString() { + return "ConsumingSegmentSummaryPerServer{" + + "_numConsumingSegmentsToBeAdded=" + _numConsumingSegmentsToBeAdded + + ", _totalOffsetsToCatchUpAcrossAllConsumingSegments=" + 
_totalOffsetsToCatchUpAcrossAllConsumingSegments + + '}'; + } + } + + @Override + public String toString() { + return "ConsumingSegmentToBeMovedSummary{" + + "_numConsumingSegmentsToBeMoved=" + _numConsumingSegmentsToBeMoved + + ", _numServersGettingConsumingSegmentsAdded=" + _numServersGettingConsumingSegmentsAdded + + ", _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp=" + + _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp + + ", _consumingSegmentsToBeMovedWithOldestAgeInMinutes=" + _consumingSegmentsToBeMovedWithOldestAgeInMinutes + + ", _serverConsumingSegmentSummary=" + _serverConsumingSegmentSummary + + '}'; } } @@ -501,6 +565,21 @@ public class RebalanceSummaryResult { public ConsumingSegmentToBeMovedSummary getConsumingSegmentToBeMovedSummary() { return _consumingSegmentToBeMovedSummary; } + + @Override + public String toString() { + return "SegmentInfo{" + + "_totalSegmentsToBeMoved=" + _totalSegmentsToBeMoved + + ", _totalSegmentsToBeDeleted=" + _totalSegmentsToBeDeleted + + ", _maxSegmentsAddedToASingleServer=" + _maxSegmentsAddedToASingleServer + + ", _estimatedAverageSegmentSizeInBytes=" + _estimatedAverageSegmentSizeInBytes + + ", _totalEstimatedDataToBeMovedInBytes=" + _totalEstimatedDataToBeMovedInBytes + + ", _replicationFactor=" + _replicationFactor + + ", _numSegmentsInSingleReplica=" + _numSegmentsInSingleReplica + + ", _numSegmentsAcrossAllReplicas=" + _numSegmentsAcrossAllReplicas + + ", _consumingSegmentToBeMovedSummary=" + _consumingSegmentToBeMovedSummary + + '}'; + } } public enum ServerStatus { @@ -509,4 +588,13 @@ public class RebalanceSummaryResult { // UNCHANGED if the server status is unchanged as part of rebalance; ADDED, REMOVED, UNCHANGED } + + @Override + public String toString() { + return "RebalanceSummaryResult{" + + "_serverInfo=" + _serverInfo + + ", _segmentInfo=" + _segmentInfo + + ", _tagsInfo=" + _tagsInfo + + '}'; + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index 8f8819831e..74badfb7fc 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -47,11 +47,15 @@ import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; +import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy; import org.apache.pinot.spi.utils.Enablement; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.*; @@ -59,6 +63,7 @@ public class InstanceAssignmentTest { private static final String RAW_TABLE_NAME = "myTable"; private static final String TENANT_NAME = "tenant"; private static final String OFFLINE_TAG = TagNameUtils.getOfflineTagForTenant(TENANT_NAME); + private static final String REALTIME_TAG = 
TagNameUtils.getRealtimeTagForTenant(TENANT_NAME); private static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_"; private static final String SERVER_INSTANCE_POOL_PREFIX = "_pool_"; private static final String TABLE_NAME_ZERO_HASH_COMPLEMENT = "12"; @@ -499,6 +504,154 @@ public class InstanceAssignmentTest { assertEquals(instancePartitions.getInstances(9, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 7)); } + @Test + public void testMinimizeDataMovementImplicitRealtimeTablePartitionSelector() { + int numReplicas = 2; + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1, true, null); + InstanceAssignmentConfig instanceAssignmentConfig = + new InstanceAssignmentConfig(new InstanceTagPoolConfig(REALTIME_TAG, false, 0, null), null, + replicaGroupPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true); + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setServerTenant(TENANT_NAME) + .setNumReplicas(numReplicas) + .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig)) + .build(); + + int numInstances = 12; + List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances); + for (int i = 0; i < numInstances; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(REALTIME_TAG); + instanceConfigs.add(instanceConfig); + } + + // Start without existing InstancePartitions: + // Instances should be assigned to 2 replica-groups in a round-robin fashion, each with 6 instances. Then these 6 + // instances should be assigned to 6 partitions, each with 1 instances + int numPartitions = 6; + StreamMetadataProvider streamMetadataProvider = mock(StreamMetadataProvider.class); + when(streamMetadataProvider.fetchPartitionCount(anyLong())).thenReturn(numPartitions); + InstancePartitionSelector instancePartitionSelector = + new ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig, tableConfig.getTableName(), null, true, + streamMetadataProvider); + InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); + InstancePartitions instancePartitions = + driver.getInstancePartitions(InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME), + instanceAssignmentConfig, instanceConfigs, null, true, instancePartitionSelector); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); + assertEquals(instancePartitions.getNumPartitions(), numPartitions); + + // Math.abs("myTable_REALTIME".hashCode()) % 12 = 0 + + // [i0, i1, i10, i11, i2, i3, i4, i5, i6, i7, i8, i9] + // r0 r1 r0 r1 r0 r1 r0 r1 r0 r1 r0 r1 + // r0: [i0, i10, i2, i4, i6, i8] + // p0 p1 p2 p3 p4 p5 + // + // r1: [i1, i11, i3, i5, i7, i9] + // p0 p1 p2 p3 p4 p5 + + assertEquals(instancePartitions.getInstances(0, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 0)); + assertEquals(instancePartitions.getInstances(0, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 1)); + assertEquals(instancePartitions.getInstances(1, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(1, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 11)); + assertEquals(instancePartitions.getInstances(2, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 2)); + assertEquals(instancePartitions.getInstances(2, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 3)); + assertEquals(instancePartitions.getInstances(3, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 4)); + assertEquals(instancePartitions.getInstances(3, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 5)); + assertEquals(instancePartitions.getInstances(4, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 6)); + assertEquals(instancePartitions.getInstances(4, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 7)); + assertEquals(instancePartitions.getInstances(5, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(5, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 9)); + + // Increase the number of partitions from 6 to 9. Expect no data movement for existing partitions. + numPartitions = 9; + when(streamMetadataProvider.fetchPartitionCount(anyLong())).thenReturn(numPartitions); + instancePartitionSelector = new ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig, + tableConfig.getTableName(), instancePartitions, true, streamMetadataProvider); + instancePartitions = driver.getInstancePartitions( + InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME), instanceAssignmentConfig, + instanceConfigs, instancePartitions, true, instancePartitionSelector); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); + assertEquals(instancePartitions.getNumPartitions(), numPartitions); + + // [i0, i1, i10, i11, i2, i3, i4, i5, i6, i7, i8, i9] + // r0 r1 r0 r1 r0 r1 r0 r1 r0 r1 r0 r1 + // r0: [i0, i10, i2, i4, i6, i8] + // p0 p1 p2 p3 p4 p5 + // p6 p7 p8 + // + // r1: [i1, i11, i3, i5, i7, i9] + // p0 p1 p2 p3 p4 p5 + // p6 p7 p8 + + assertEquals(instancePartitions.getInstances(0, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 0)); + assertEquals(instancePartitions.getInstances(0, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 1)); + assertEquals(instancePartitions.getInstances(1, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(1, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 11)); + assertEquals(instancePartitions.getInstances(2, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 2)); + assertEquals(instancePartitions.getInstances(2, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 3)); + assertEquals(instancePartitions.getInstances(3, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 4)); + assertEquals(instancePartitions.getInstances(3, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 5)); + assertEquals(instancePartitions.getInstances(4, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 6)); + assertEquals(instancePartitions.getInstances(4, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 7)); + assertEquals(instancePartitions.getInstances(5, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(5, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(6, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 0)); + assertEquals(instancePartitions.getInstances(6, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 1)); + assertEquals(instancePartitions.getInstances(7, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(7, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 11)); + assertEquals(instancePartitions.getInstances(8, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 2)); + assertEquals(instancePartitions.getInstances(8, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 3)); + + // Add 6 new instances + for (int i = numInstances; i < numInstances + 6; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(REALTIME_TAG); + instanceConfigs.add(instanceConfig); + } + + instancePartitionSelector = new 
ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig, + tableConfig.getTableName(), instancePartitions, true, streamMetadataProvider); + instancePartitions = driver.getInstancePartitions( + InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME), instanceAssignmentConfig, + instanceConfigs, instancePartitions, true, instancePartitionSelector); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); + assertEquals(instancePartitions.getNumPartitions(), numPartitions); + + // We're using the minimize data movement based algorithm, so only the new partitions should be moved to the new + // instances, and the existing partition assignments should remain unchanged. + // + // [i0, i1, i10, i11, i2, i3, i4, i5, i6, i7, i8, i9] + // r0 r1 r0 r1 r0 r1 r0 r1 r0 r1 r0 r1 + // r0: [i0, i10, i2, i4, i6, i8, i12, i14, i16] + // p0 p1 p2 p3 p4 p5 p6 p7 p8 + // + // r1: [i1, i11, i3, i5, i7, i9, i13, i15, i17] + // p0 p1 p2 p3 p4 p5 p6 p7 p8 + + assertEquals(instancePartitions.getInstances(0, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 0)); + assertEquals(instancePartitions.getInstances(0, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 1)); + assertEquals(instancePartitions.getInstances(1, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(1, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 11)); + assertEquals(instancePartitions.getInstances(2, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 2)); + assertEquals(instancePartitions.getInstances(2, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 3)); + assertEquals(instancePartitions.getInstances(3, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 4)); + assertEquals(instancePartitions.getInstances(3, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 5)); + assertEquals(instancePartitions.getInstances(4, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 6)); + assertEquals(instancePartitions.getInstances(4, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 7)); + assertEquals(instancePartitions.getInstances(5, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(5, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 9)); + assertEquals(instancePartitions.getInstances(6, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 12)); + assertEquals(instancePartitions.getInstances(6, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 13)); + assertEquals(instancePartitions.getInstances(7, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 14)); + assertEquals(instancePartitions.getInstances(7, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 15)); + assertEquals(instancePartitions.getInstances(8, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 16)); + assertEquals(instancePartitions.getInstances(8, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 17)); + } + public void testMirrorServerSetBasedRandom() throws FileNotFoundException { testMirrorServerSetBasedRandomInner(10000000); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index a25d178091..64cbff92c0 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -24,8 +24,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import 
java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -76,6 +79,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); private static final int NUM_REPLICAS = 3; private static final String SEGMENT_NAME_PREFIX = "segment_"; + private static final String PARTITION_COLUMN = "partitionColumn"; private static final String TIERED_TABLE_NAME = "testTable"; private static final String OFFLINE_TIERED_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TIERED_TABLE_NAME); @@ -95,15 +99,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); } - /** - * Dropping instance from cluster requires waiting for live instance gone and removing instance related ZNodes, which - * are not the purpose of the test, so combine different rebalance scenarios into one test: - * 1. NO_OP rebalance - * 2. Add servers and rebalance - * 3. Migrate to replica-group based segment assignment and rebalance - * 4. Migrate back to non-replica-group based segment assignment and rebalance - * 5. Remove (disable) servers and rebalance - */ + /// + /// Dropping instance from cluster requires waiting for live instance gone and removing instance related ZNodes, which + /// are not the purpose of the test, so combine different rebalance scenarios into one test: + /// 1. NO_OP rebalance + /// 2. Add servers and rebalance + /// 3. Migrate to replica-group based segment assignment and rebalance + /// 4. Migrate back to non-replica-group based segment assignment and rebalance + /// 5. 
Remove (disable) servers and rebalance + /// @Test public void testRebalance() throws Exception { @@ -766,6 +770,205 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { } } + @Test + public void testRebalanceWithImplicitRealtimeTablePartitionSelectorAndMinimizeDataMovement() + throws Exception { + int numServers = 6; + int numPartitions = 18; + int numReplicas = 2; + + for (int i = 0; i < numServers; i++) { + String instanceId = SERVER_INSTANCE_ID_PREFIX + i; + addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); + } + + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1, false, null); + InstanceAssignmentConfig instanceAssignmentConfig = + new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), false, 0, null), null, + replicaGroupPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true); + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setNumReplicas(numReplicas) + .setRoutingConfig( + new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)) + .setStreamConfigs( + FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap()) + .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig)) + .build(); + + // Create the table + addDummySchema(RAW_TABLE_NAME); + _helixResourceManager.addTable(tableConfig); + + // Add the segments + int numSegmentsPerPartition = 4; + for (int i = 0; i < numPartitions; i++) { + for (int j = 0; j < numSegmentsPerPartition; j++) { + _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, + SEGMENT_NAME_PREFIX + (i * numSegmentsPerPartition + j), PARTITION_COLUMN, i), null); + } + } + + Map<String, Map<String, String>> oldSegmentAssignment = + _helixResourceManager.getTableIdealState(REALTIME_TABLE_NAME).getRecord().getMapFields(); + for (Map.Entry<String, Map<String, String>> entry : oldSegmentAssignment.entrySet()) { + assertEquals(entry.getValue().size(), numReplicas); + } + + // Verify that segments are distributed equally across servers + Map<String, Integer> numSegmentsPerServer = getNumSegmentsPerServer(oldSegmentAssignment); + for (int i = 0; i < numServers; i++) { + String instanceId = SERVER_INSTANCE_ID_PREFIX + i; + assertTrue(numSegmentsPerServer.containsKey(instanceId)); + // Total number of segments is numReplicas * numPartitions * (numSegmentsPerPartition + 1) because of + // CONSUMING segments + assertEquals(numSegmentsPerServer.get(instanceId), + (numReplicas * numPartitions * (numSegmentsPerPartition + 1)) / numServers); + } + + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, null, null); + // Rebalance should return NO_OP status since there has been no change + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + + // All servers should be assigned to the table + Map<InstancePartitionsType, InstancePartitions> instanceAssignment = rebalanceResult.getInstanceAssignment(); + assertEquals(instanceAssignment.size(), 1); + InstancePartitions instancePartitions = 
instanceAssignment.get(InstancePartitionsType.CONSUMING); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); + assertEquals(instancePartitions.getNumPartitions(), numPartitions); + + // Verify that replica partitions are distributed equally across servers + Map<String, Integer> numReplicaPartitionsPerServer = getNumReplicaPartitionsPerServer(instancePartitions); + for (int i = 0; i < numServers; i++) { + String instanceId = SERVER_INSTANCE_ID_PREFIX + i; + assertTrue(numReplicaPartitionsPerServer.containsKey(instanceId)); + // Total number of partitions is numReplicas * numPartitions + assertEquals(numReplicaPartitionsPerServer.get(instanceId), (numReplicas * numPartitions) / numServers); + } + + // Segment assignment should not change + assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); + + // Add two new servers + int numServersToAdd = 2; + Set<String> newServers = new HashSet<>(); + for (int i = 0; i < numServersToAdd; i++) { + String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i); + addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); + newServers.add(instanceId); + } + + // Check number of segments moved when minimizeDataMovement is not enabled + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setIncludeConsuming(true); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + // Most of the segments end up being moved when minimizeDataMovement is not enabled due to the round robin way in + // which partitions are assigned to instances (see InstanceReplicaGroupPartitionSelector) + assertEquals(rebalanceResult.getRebalanceSummaryResult().getSegmentInfo().getTotalSegmentsToBeMoved(), 130); + + // Rebalance with reassignInstances and minimizeDataMovement enabled + rebalanceConfig.setMinimizeDataMovement(Enablement.ENABLE); + rebalanceConfig.setDryRun(false); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + instanceAssignment = rebalanceResult.getInstanceAssignment(); + assertEquals(instanceAssignment.size(), 1); + instancePartitions = instanceAssignment.get(InstancePartitionsType.CONSUMING); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); + assertEquals(instancePartitions.getNumPartitions(), numPartitions); + + // Get number of segments moved + int numSegmentsMoved = getNumSegmentsMoved(oldSegmentAssignment, rebalanceResult.getSegmentAssignment()); + // This number is 130 when using the default partition selector in this same scenario since more segment partitions + // will be moved when the instance partitions don't match the segment partitions (we're setting numPartitions to + // the default value of 0 in the table's instance assignment config). 
+ assertEquals(numSegmentsMoved, 30); + + // "Repartition" and add two new partitions + int newNumPartitions = 20; + tableConfig.getIndexingConfig() + .setStreamConfigs( + FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(newNumPartitions).getStreamConfigsMap()); + _helixResourceManager.updateTableConfig(tableConfig); + + // Add segments for the new partitions + for (int i = numPartitions; i < newNumPartitions; i++) { + for (int j = 0; j < numSegmentsPerPartition; j++) { + _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, + SEGMENT_NAME_PREFIX + (i * numSegmentsPerPartition + j), PARTITION_COLUMN, i), null); + } + } + + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + + // Verify that the new partitions are assigned to the new servers. Due to the minimizeDataMovement algorithm, the + // previous rebalance resulted in the older servers having 5 partition replicas each with the newer ones having 3 + // partition replicas each. + instancePartitions = rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.CONSUMING); + for (int i = numPartitions; i < newNumPartitions; i++) { + for (int j = 0; j < numReplicas; j++) { + for (String instanceId : instancePartitions.getInstances(i, j)) { + assertTrue(newServers.contains(instanceId), + "Expected new partition " + i + " to be assigned to a new server, but found it on " + instanceId); + } + } + } + + _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME); + + for (int i = 0; i < numServers + numServersToAdd; i++) { + stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i); + } + } + + private Map<String, Integer> getNumSegmentsPerServer(Map<String, Map<String, String>> segmentAssignment) { + Map<String, Integer> numSegmentsPerServer = new HashMap<>(); + for (Map<String, String> instanceStateMap : segmentAssignment.values()) { + for (String instanceId : instanceStateMap.keySet()) { + numSegmentsPerServer.merge(instanceId, 1, Integer::sum); + } + } + return numSegmentsPerServer; + } + + private Map<String, Integer> getNumReplicaPartitionsPerServer(InstancePartitions instancePartitions) { + Map<String, Integer> numPartitionsPerServer = new HashMap<>(); + for (int i = 0; i < instancePartitions.getNumReplicaGroups(); i++) { + for (int j = 0; j < instancePartitions.getNumPartitions(); j++) { + List<String> instances = instancePartitions.getInstances(j, i); + for (String instanceId : instances) { + numPartitionsPerServer.merge(instanceId, 1, Integer::sum); + } + } + } + return numPartitionsPerServer; + } + + private int getNumSegmentsMoved(Map<String, Map<String, String>> oldSegmentAssignment, + Map<String, Map<String, String>> newSegmentAssignment) { + int numSegmentsMoved = 0; + for (Map.Entry<String, Map<String, String>> entry : newSegmentAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> newInstanceStateMap = entry.getValue(); + Map<String, String> oldInstanceStateMap = oldSegmentAssignment.get(segmentName); + assertEquals(oldInstanceStateMap.size(), newInstanceStateMap.size()); + Set<String> commonInstances = new HashSet<>(newInstanceStateMap.keySet()); + commonInstances.retainAll(oldInstanceStateMap.keySet()); + numSegmentsMoved += newInstanceStateMap.size() - commonInstances.size(); + } + return numSegmentsMoved; + } + @Test public void testRebalanceBatchSizePerServerErrors() throws Exception { diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java index 9c778908de..2f51fd6250 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.integration.tests; +import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; import java.net.URL; import java.util.ArrayList; @@ -25,6 +26,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.config.TagNameUtils; @@ -46,6 +48,7 @@ import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.apache.pinot.spi.data.FieldSpec; @@ -58,10 +61,7 @@ import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationTest { @@ -88,6 +88,78 @@ public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationT + "?type=" + tableType.toString() + "&" + getQueryString(rebalanceConfig); } + @Test + public void testImplicitRealtimeTableInstanceAssignment() throws Exception { + // Instance assignment not configured for the table initially, so INSTANCE_PARTITIONS should not exist. + assertThrows("404", IOException.class, + () -> sendGetRequest(getControllerBaseApiUrl() + "/tables/" + getTableName() + "/instancePartitions")); + + // Update table config with instance assignment config, use IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR to + // create partitions in the replica group based on the number of stream partitions. + TableConfig realtimeTableConfig = getTableConfigBuilder(TableType.REALTIME).build(); + realtimeTableConfig.setInstanceAssignmentConfigMap( + Map.of(InstancePartitionsType.CONSUMING.name(), new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(getServerTenant()), false, 0, null), null, + new InstanceReplicaGroupPartitionConfig(true, 0, 1, 0, 0, 0, true, null), + InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true)) + ); + updateTableConfig(realtimeTableConfig); + + // Rebalance the table to reassign instances and create the INSTANCE_PARTITIONS. 
+ RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinAvailableReplicas(-1); + rebalanceConfig.setIncludeConsuming(true); + sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + + // We're using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR based instance assignment for this table. + // This test verifies that INSTANCE_PARTITIONS is written to ZK after instance assignment in the rebalance and has + // the expected number of partitions. + + TestUtils.waitForCondition( + aVoid -> { + try { + sendGetRequest(getControllerBaseApiUrl() + "/tables/" + getTableName() + "/instancePartitions"); + } catch (Exception e) { + return false; + } + return true; + }, 10_000, "Expected INSTANCE_PARTITIONS to be created for table after instance assignment in rebalance" + ); + + JsonNode instancePartitions = JsonUtils.stringToJsonNode( + sendGetRequest(getControllerBaseApiUrl() + "/tables/" + getTableName() + "/instancePartitions")); + + assertNotNull(instancePartitions); + assertEquals(instancePartitions.size(), 1); + + JsonNode partitionToInstancesMap = + instancePartitions.get(InstancePartitionsType.CONSUMING.name()).get("partitionToInstancesMap"); + + assertEquals(partitionToInstancesMap.size(), getNumKafkaPartitions()); // single replica group + for (int i = 0; i < getNumKafkaPartitions(); i++) { + assertNotNull(partitionToInstancesMap.get(i + "_0")); // partition i, replica group 0 + } + + // Reset the table config and rebalance + updateTableConfig(getTableConfigBuilder(TableType.REALTIME).build()); + sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + + TestUtils.waitForCondition( + aVoid -> { + try { + sendGetRequest(getControllerBaseApiUrl() + "/tables/" + getTableName() + "/instancePartitions"); + } catch (Exception e) { + return e.getCause() instanceof HttpErrorStatusException + && ((HttpErrorStatusException) e.getCause()).getStatusCode() == 404; + } + return false; + }, 10_000, + "Expected INSTANCE_PARTITIONS to be deleted for table after removing instance assignment configs and " + + "rebalancing" + ); + } + @Test public void testRealtimeRebalancePreCheckMinimizeDataMovement() throws Exception { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index decda17d89..c4c08710f9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -170,6 +170,7 @@ public final class TableConfigUtils { validateFieldConfigList(tableConfig, schema); validateInstancePartitionsTypeMapConfig(tableConfig); validatePartitionedReplicaGroupInstance(tableConfig); + validateInstanceAssignmentConfigs(tableConfig); if (!skipTypes.contains(ValidationType.UPSERT)) { validateUpsertAndDedupConfig(tableConfig, schema); validatePartialUpsertStrategies(tableConfig, schema); @@ -913,6 +914,35 @@ public final class TableConfigUtils { } } + @VisibleForTesting + static void validateInstanceAssignmentConfigs(TableConfig tableConfig) { + if (tableConfig.getInstanceAssignmentConfigMap() == null) { + return; + } + for (Map.Entry<String, InstanceAssignmentConfig> instanceAssignmentConfigMapEntry + : tableConfig.getInstanceAssignmentConfigMap().entrySet()) { + String instancePartitionsType = instanceAssignmentConfigMapEntry.getKey(); + 
InstanceAssignmentConfig instanceAssignmentConfig = instanceAssignmentConfigMapEntry.getValue(); + if (instanceAssignmentConfig.getPartitionSelector() + == InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR) { + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for REALTIME tables"); + Preconditions.checkState(InstancePartitionsType.CONSUMING.name().equalsIgnoreCase(instancePartitionsType), + "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for CONSUMING instance partitions type"); + Preconditions.checkState(instanceAssignmentConfig.getReplicaGroupPartitionConfig().isReplicaGroupBased(), + "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used with replica group based instance assignment"); + Preconditions.checkState(instanceAssignmentConfig.getReplicaGroupPartitionConfig().getNumPartitions() == 0, + "numPartitions should not be explicitly set when using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR"); + // Allow 0 because that's the default (unset) value. + Preconditions.checkState( + instanceAssignmentConfig.getReplicaGroupPartitionConfig().getNumInstancesPerPartition() == 0 + || instanceAssignmentConfig.getReplicaGroupPartitionConfig().getNumInstancesPerPartition() == 1, + "numInstancesPerPartition must be 1 when using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR"); + } + // TODO: Add more validations for other partition selectors here + } + } + /** * Validates metrics aggregation when upsert config is enabled * - Metrics aggregation cannot be enabled when Upsert Config is enabled. diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 5d51854392..10eab60abf 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -76,7 +76,10 @@ import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; /** @@ -2570,6 +2573,74 @@ public class TableConfigUtilsTest { } } + @Test + public void testValidateImplicitRealtimeTablePartitionSelectorConfigs() { + InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class); + when(instanceAssignmentConfig.getPartitionSelector()).thenReturn( + InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR); + + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig)) + .build(); + IllegalStateException e = expectThrows(IllegalStateException.class, + () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig)); + assertTrue( + e.getMessage().contains("IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for REALTIME tables")); + + InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig = + Mockito.mock(InstanceReplicaGroupPartitionConfig.class); + when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(true); + 
when(instanceAssignmentConfig.getReplicaGroupPartitionConfig()).thenReturn(instanceReplicaGroupPartitionConfig); + TableConfig tableConfig2 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.COMPLETED.name(), instanceAssignmentConfig)) + .build(); + e = expectThrows(IllegalStateException.class, + () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig2)); + assertTrue(e.getMessage() + .contains( + "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for CONSUMING instance partitions type")); + + when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(false); + when(instanceAssignmentConfig.getReplicaGroupPartitionConfig()).thenReturn(instanceReplicaGroupPartitionConfig); + TableConfig tableConfig3 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig)) + .build(); + e = expectThrows(IllegalStateException.class, + () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig3)); + assertTrue(e.getMessage() + .contains( + "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used with replica group based instance " + + "assignment")); + + when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(true); + when(instanceReplicaGroupPartitionConfig.getNumPartitions()).thenReturn(1); + TableConfig tableConfig4 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig)) + .build(); + e = expectThrows(IllegalStateException.class, + () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig4)); + assertTrue(e.getMessage() + .contains("numPartitions should not be explicitly set when using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR")); + + when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(true); + when(instanceReplicaGroupPartitionConfig.getNumPartitions()).thenReturn(0); + when(instanceReplicaGroupPartitionConfig.getNumInstancesPerPartition()).thenReturn(2); + TableConfig tableConfig5 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig)) + .build(); + e = expectThrows(IllegalStateException.class, + () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig5)); + assertTrue(e.getMessage() + .contains("numInstancesPerPartition must be 1 when using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR")); + + when(instanceReplicaGroupPartitionConfig.getNumPartitions()).thenReturn(0); + when(instanceReplicaGroupPartitionConfig.getNumInstancesPerPartition()).thenReturn(0); + TableConfig tableConfig6 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig)) + .build(); + TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig6); + } + private Map<String, String> getStreamConfigs() { Map<String, String> streamConfigs = new HashMap<>(); streamConfigs.put("streamType", "kafka"); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java index ad4b22ecaf..973cf368c7 100644 --- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java @@ -87,6 +87,6 @@ public class InstanceAssignmentConfig extends BaseJsonConfig { public enum PartitionSelector { FD_AWARE_INSTANCE_PARTITION_SELECTOR, INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR, - MIRROR_SERVER_SET_PARTITION_SELECTOR + MIRROR_SERVER_SET_PARTITION_SELECTOR, IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
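For reference, below is a minimal sketch of how a realtime table could opt into the new selector. It mirrors the constructor calls used in InstanceAssignmentTest above; the wrapper class, the table name "myTable", the tenant name "tenant", and the replica count are illustrative placeholders rather than part of this commit.

import java.util.Map;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class ImplicitRealtimeTablePartitionSelectorExample {
  public static TableConfig buildExampleTableConfig() {
    int numReplicas = 2;
    // Replica-group based config. numPartitions is left at 0 because the partition count is derived from the
    // stream, and numInstancesPerPartition is 1, matching the checks added in TableConfigUtils in this commit.
    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1, true, null);
    InstanceAssignmentConfig instanceAssignmentConfig =
        new InstanceAssignmentConfig(
            new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant("tenant"), false, 0, null), null,
            replicaGroupPartitionConfig,
            InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true);
    // The new selector is only allowed for REALTIME tables and the CONSUMING instance partitions type.
    return new TableConfigBuilder(TableType.REALTIME)
        .setTableName("myTable")
        .setServerTenant("tenant")
        .setNumReplicas(numReplicas)
        .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
        .build();
  }
}

With a config like this, a rebalance with reassignInstances and includeConsuming enabled builds CONSUMING instance partitions with one partition per stream partition and one instance per partition in each replica group, as exercised in TableRebalanceIntegrationTest above.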