This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6372d78d98 make dedup table use strict replica group assignment too (#15778) 6372d78d98 is described below commit 6372d78d98fd391f62670a7d324b99f83c946bd6 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Thu May 15 12:58:43 2025 -0700 make dedup table use strict replica group assignment too (#15778) * make dedup table use strict replica group assignment too * refine logs and add TODO for tiers support for StrictRealtimeSegmentAssignment --- .../segment/SegmentAssignmentFactory.java | 5 +- .../segment/StrictRealtimeSegmentAssignment.java | 23 ++++---- .../StrictRealtimeSegmentAssignmentTest.java | 69 ++++++++++++---------- 3 files changed, 55 insertions(+), 42 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java index e32a7246b2..7e37026438 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.segment; import javax.annotation.Nullable; import org.apache.helix.HelixManager; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -40,7 +41,9 @@ public class SegmentAssignmentFactory { segmentAssignment = new OfflineSegmentAssignment(); } else { UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); - if (upsertConfig != null && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE) { + DedupConfig dedupConfig = tableConfig.getDedupConfig(); + if ((upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) || (dedupConfig != null + && dedupConfig.isDedupEnabled())) { segmentAssignment = new StrictRealtimeSegmentAssignment(); } else { segmentAssignment = new RealtimeSegmentAssignment(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java index 2bc0ada5bd..6c76332a24 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java @@ -37,7 +37,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM /** - * Segment assignment for LLC real-time table using upsert. The assignSegment() of RealtimeSegmentAssignment is + * Segment assignment for LLC real-time table using upsert or dedup. The assignSegment() of RealtimeSegmentAssignment is * overridden to add new segment for a table partition in a way that's consistent with the assignment in idealState to * make sure that at any time the segments from the same table partition is hosted by the same server. * <ul> @@ -47,12 +47,13 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM * InstancePartition and the one in idealState are different, the one in idealState must be used so that segments * from the same table partition are always hosted on the same server as set in current idealState. If the * idealState is not honored, segments from the same table partition may be assigned to different servers, - * breaking the key assumption for queries to be correct for the table using upsert. 
+ * breaking the key assumption for queries to be correct for the table using upsert or dedup. * </li> * <li> - * There is no need to handle COMPLETED segments for tables using upsert, because their completed segments should - * not be relocated to servers tagged to host COMPLETED segments. Basically, upsert-enabled tables can only use - * servers tagged for CONSUMING segments to host both consuming and completed segments from a table partition. + * There is no need to handle COMPLETED segments for tables using upsert or dedup, because their completed + * segments should not be relocated to servers tagged to host COMPLETED segments. Basically, tables using upsert + * or dedup can only use servers tagged for CONSUMING segments to host both consuming and completed segments from + * a table partition. * </li> * </ul> */ @@ -63,9 +64,9 @@ public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment { // 1. This cache is used for table rebalance only, but not segment assignment. During rebalance, rebalanceTable() can // be invoked multiple times when the ideal state changes during the rebalance process. // 2. The cache won't be refreshed when an existing segment is replaced with a segment from a different partition. - // Replacing a segment with a segment from a different partition should not be allowed for upsert table because it - // will cause the segment being served by the wrong servers. If this happens during the table rebalance, another - // rebalance might be needed to fix the assignment. + // Replacing a segment with a segment from a different partition should not be allowed for upsert/dedup table + // because it will cause the segment being served by the wrong servers. If this happens during the table + // rebalance, another rebalance might be needed to fix the assignment. 
private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new Object2IntOpenHashMap<>(); @Override @@ -163,8 +164,10 @@ public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment { Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s", _tableNameWithType); Preconditions.checkArgument(config.isIncludeConsuming(), - "Consuming segment must be included when rebalancing upsert table: %s", _tableNameWithType); - Preconditions.checkState(sortedTiers == null, "Tiers must not be specified for upsert table: %s", + "Consuming segment must be included when rebalancing upsert/dedup table: %s", _tableNameWithType); + // TODO: consider to support tiers for segments out of metadata TTL for upsert/dedup table, as those segments + // won't be included in the upsert/dedup metadata as kept on CONSUMING tier. + Preconditions.checkState(sortedTiers == null, "Tiers must not be specified for upsert/dedup table: %s", _tableNameWithType); _logger.info("Rebalancing table: {} with instance partitions: {}", _tableNameWithType, instancePartitions); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java index 6775329d37..43cfb9998d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java @@ -30,6 +30,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; +import 
org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -39,6 +40,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.anyInt; @@ -67,7 +69,6 @@ public class StrictRealtimeSegmentAssignmentTest { InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME); private List<String> _segments; - private SegmentAssignment _segmentAssignment; private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMap; private InstancePartitions _newConsumingInstancePartitions; @@ -78,16 +79,6 @@ public class StrictRealtimeSegmentAssignmentTest { _segments.add(new LLCSegmentName(RAW_TABLE_NAME, segmentId % NUM_PARTITIONS, segmentId / NUM_PARTITIONS, System.currentTimeMillis()).getSegmentName()); } - - Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); - UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); - TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) - .setStreamConfigs(streamConfigs).setUpsertConfig(upsertConfig) - .setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY) - .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build(); - _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig, null); - _instancePartitionsMap = new TreeMap<>(); // CONSUMING instances: // { @@ -126,25 
+117,40 @@ public class StrictRealtimeSegmentAssignmentTest { } } - @Test - public void testFactory() { - assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment); + @DataProvider(name = "tableTypes") + public Object[] getTableTypes() { + return new Object[]{"upsert", "dedup"}; + } + + private static SegmentAssignment createSegmentAssignment(String tableType) { + TableConfigBuilder builder = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setNumReplicas(NUM_REPLICAS) + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + .setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY) + .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)); + TableConfig tableConfig; + if ("upsert".equalsIgnoreCase(tableType)) { + tableConfig = builder.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build(); + } else { + tableConfig = builder.setDedupConfig(new DedupConfig()).build(); + } + return SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig, null); } - @Test - public void testAssignSegment() { - assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment); + @Test(dataProvider = "tableTypes") + public void testAssignSegment(String tableType) { + SegmentAssignment segmentAssignment = createSegmentAssignment(tableType); + assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment); Map<InstancePartitionsType, InstancePartitions> onlyConsumingInstancePartitionMap = ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS; Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); // Add segments for partition 0/1/2, but add no segment for partition 3. 
List<String> instancesAssigned; - boolean consistent; for (int segmentId = 0; segmentId < 3; segmentId++) { String segmentName = _segments.get(segmentId); instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyConsumingInstancePartitionMap); + segmentAssignment.assignSegment(segmentName, currentAssignment, onlyConsumingInstancePartitionMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); // Segment 0 (partition 0) should be assigned to instance 0, 3, 6 // Segment 1 (partition 1) should be assigned to instance 1, 4, 7 @@ -171,7 +177,7 @@ public class StrictRealtimeSegmentAssignmentTest { int segmentId = 3; String segmentName = _segments.get(segmentId); instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, newConsumingInstancePartitionMap); + segmentAssignment.assignSegment(segmentName, currentAssignment, newConsumingInstancePartitionMap); assertEquals(instancesAssigned, Arrays.asList("new_consumingInstance_0", "new_consumingInstance_3", "new_consumingInstance_6")); addToAssignment(currentAssignment, segmentId, instancesAssigned); @@ -180,7 +186,7 @@ public class StrictRealtimeSegmentAssignmentTest { for (segmentId = 4; segmentId < 7; segmentId++) { segmentName = _segments.get(segmentId); instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, newConsumingInstancePartitionMap); + segmentAssignment.assignSegment(segmentName, currentAssignment, newConsumingInstancePartitionMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); // Those segments are assigned according to the assignment from idealState, instead of using new_xxx instances @@ -197,20 +203,20 @@ public class StrictRealtimeSegmentAssignmentTest { } } - @Test - public void testAssignSegmentWithOfflineSegment() { - assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment); + @Test(dataProvider = "tableTypes") + public void testAssignSegmentWithOfflineSegment(String tableType) { + 
SegmentAssignment segmentAssignment = createSegmentAssignment(tableType); + assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment); Map<InstancePartitionsType, InstancePartitions> onlyConsumingInstancePartitionMap = ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING)); int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS; Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); // Add segments for partition 0/1/2, but add no segment for partition 3. List<String> instancesAssigned; - boolean consistent; for (int segmentId = 0; segmentId < 3; segmentId++) { String segmentName = _segments.get(segmentId); instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyConsumingInstancePartitionMap); + segmentAssignment.assignSegment(segmentName, currentAssignment, onlyConsumingInstancePartitionMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); // Segment 0 (partition 0) should be assigned to instance 0, 3, 6 // Segment 1 (partition 1) should be assigned to instance 1, 4, 7 @@ -238,7 +244,7 @@ public class StrictRealtimeSegmentAssignmentTest { for (int segmentId = 3; segmentId < 7; segmentId++) { String segmentName = _segments.get(segmentId); instancesAssigned = - _segmentAssignment.assignSegment(segmentName, currentAssignment, newConsumingInstancePartitionMap); + segmentAssignment.assignSegment(segmentName, currentAssignment, newConsumingInstancePartitionMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); // Those segments are assigned according to the assignment from idealState, instead of using new_xxx instances @@ -255,9 +261,10 @@ public class StrictRealtimeSegmentAssignmentTest { } } - @Test(expectedExceptions = IllegalStateException.class) - public void testAssignSegmentToCompletedServers() { - _segmentAssignment.assignSegment("seg01", new TreeMap<>(), new TreeMap<>()); + @Test(expectedExceptions = 
IllegalStateException.class, dataProvider = "tableTypes") + public void testAssignSegmentToCompletedServers(String tableType) { + SegmentAssignment segmentAssignment = createSegmentAssignment(tableType); + segmentAssignment.assignSegment("seg01", new TreeMap<>(), new TreeMap<>()); } private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId, @@ -276,7 +283,7 @@ public class StrictRealtimeSegmentAssignmentTest { SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); } - private HelixManager createHelixManager() { + private static HelixManager createHelixManager() { HelixManager helixManager = mock(HelixManager.class); ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org