This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6372d78d98 make dedup table use strict replica group assignment too 
(#15778)
6372d78d98 is described below

commit 6372d78d98fd391f62670a7d324b99f83c946bd6
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Thu May 15 12:58:43 2025 -0700

    make dedup table use strict replica group assignment too (#15778)
    
    * make dedup table use strict replica group assignment too
    
    * refine logs and add TODO for tiers support for 
StrictRealtimeSegmentAssignment
---
 .../segment/SegmentAssignmentFactory.java          |  5 +-
 .../segment/StrictRealtimeSegmentAssignment.java   | 23 ++++----
 .../StrictRealtimeSegmentAssignmentTest.java       | 69 ++++++++++++----------
 3 files changed, 55 insertions(+), 42 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
index e32a7246b2..7e37026438 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -21,6 +21,7 @@ package 
org.apache.pinot.controller.helix.core.assignment.segment;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -40,7 +41,9 @@ public class SegmentAssignmentFactory {
       segmentAssignment = new OfflineSegmentAssignment();
     } else {
       UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
-      if (upsertConfig != null && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE) {
+      DedupConfig dedupConfig = tableConfig.getDedupConfig();
+      if ((upsertConfig != null && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE) || (dedupConfig != null
+          && dedupConfig.isDedupEnabled())) {
         segmentAssignment = new StrictRealtimeSegmentAssignment();
       } else {
         segmentAssignment = new RealtimeSegmentAssignment();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
index 2bc0ada5bd..6c76332a24 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
@@ -37,7 +37,7 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
 
 
 /**
- * Segment assignment for LLC real-time table using upsert. The 
assignSegment() of RealtimeSegmentAssignment is
+ * Segment assignment for LLC real-time table using upsert or dedup. The 
assignSegment() of RealtimeSegmentAssignment is
  * overridden to add new segment for a table partition in a way that's 
consistent with the assignment in idealState to
  * make sure that at any time the segments from the same table partition is 
hosted by the same server.
  * <ul>
@@ -47,12 +47,13 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
  *     InstancePartition and the one in idealState are different, the one in 
idealState must be used so that segments
  *     from the same table partition are always hosted on the same server as 
set in current idealState. If the
  *     idealState is not honored, segments from the same table partition may 
be assigned to different servers,
- *     breaking the key assumption for queries to be correct for the table 
using upsert.
+ *     breaking the key assumption for queries to be correct for the table 
using upsert or dedup.
  *   </li>
  *   <li>
- *     There is no need to handle COMPLETED segments for tables using upsert, 
because their completed segments should
- *     not be relocated to servers tagged to host COMPLETED segments. 
Basically, upsert-enabled tables can only use
- *     servers tagged for CONSUMING segments to host both consuming and 
completed segments from a table partition.
+ *     There is no need to handle COMPLETED segments for tables using upsert 
or dedup, because their completed
+ *     segments should not be relocated to servers tagged to host COMPLETED 
segments. Basically, tables using upsert
+ *     or dedup can only use servers tagged for CONSUMING segments to host 
both consuming and completed segments from
+ *     a table partition.
  *   </li>
  * </ul>
  */
@@ -63,9 +64,9 @@ public class StrictRealtimeSegmentAssignment extends 
RealtimeSegmentAssignment {
   // 1. This cache is used for table rebalance only, but not segment 
assignment. During rebalance, rebalanceTable() can
   //    be invoked multiple times when the ideal state changes during the 
rebalance process.
   // 2. The cache won't be refreshed when an existing segment is replaced with 
a segment from a different partition.
-  //    Replacing a segment with a segment from a different partition should 
not be allowed for upsert table because it
-  //    will cause the segment being served by the wrong servers. If this 
happens during the table rebalance, another
-  //    rebalance might be needed to fix the assignment.
+  //    Replacing a segment with a segment from a different partition should not be allowed for upsert/dedup tables
+  //    because it will cause the segment to be served by the wrong servers. If this happens during the table
+  //    rebalance, another rebalance might be needed to fix the assignment.
   private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();
 
   @Override
@@ -163,8 +164,10 @@ public class StrictRealtimeSegmentAssignment extends 
RealtimeSegmentAssignment {
     Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
         _tableNameWithType);
     Preconditions.checkArgument(config.isIncludeConsuming(),
-        "Consuming segment must be included when rebalancing upsert table: 
%s", _tableNameWithType);
-    Preconditions.checkState(sortedTiers == null, "Tiers must not be specified 
for upsert table: %s",
+        "Consuming segment must be included when rebalancing upsert/dedup 
table: %s", _tableNameWithType);
+    // TODO: consider supporting tiers for segments beyond the metadata TTL of upsert/dedup tables, as those segments
+    //       are not included in the upsert/dedup metadata kept on the CONSUMING tier.
+    Preconditions.checkState(sortedTiers == null, "Tiers must not be specified 
for upsert/dedup table: %s",
         _tableNameWithType);
     _logger.info("Rebalancing table: {} with instance partitions: {}", 
_tableNameWithType, instancePartitions);
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
index 6775329d37..43cfb9998d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
@@ -30,6 +30,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -39,6 +40,7 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
 import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -67,7 +69,6 @@ public class StrictRealtimeSegmentAssignmentTest {
       
InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME);
 
   private List<String> _segments;
-  private SegmentAssignment _segmentAssignment;
   private Map<InstancePartitionsType, InstancePartitions> 
_instancePartitionsMap;
   private InstancePartitions _newConsumingInstancePartitions;
 
@@ -78,16 +79,6 @@ public class StrictRealtimeSegmentAssignmentTest {
       _segments.add(new LLCSegmentName(RAW_TABLE_NAME, segmentId % 
NUM_PARTITIONS, segmentId / NUM_PARTITIONS,
           System.currentTimeMillis()).getSegmentName());
     }
-
-    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
-    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-            .setStreamConfigs(streamConfigs).setUpsertConfig(upsertConfig)
-            
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
-            .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build();
-    _segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig, null);
-
     _instancePartitionsMap = new TreeMap<>();
     // CONSUMING instances:
     // {
@@ -126,25 +117,40 @@ public class StrictRealtimeSegmentAssignmentTest {
     }
   }
 
-  @Test
-  public void testFactory() {
-    assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+  @DataProvider(name = "tableTypes")
+  public Object[] getTableTypes() {
+    return new Object[]{"upsert", "dedup"};
+  }
+
+  private static SegmentAssignment createSegmentAssignment(String tableType) {
+    TableConfigBuilder builder = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(NUM_REPLICAS)
+        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+        
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
+    TableConfig tableConfig;
+    if ("upsert".equalsIgnoreCase(tableType)) {
+      tableConfig = builder.setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).build();
+    } else {
+      tableConfig = builder.setDedupConfig(new DedupConfig()).build();
+    }
+    return SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig, null);
   }
 
-  @Test
-  public void testAssignSegment() {
-    assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+  @Test(dataProvider = "tableTypes")
+  public void testAssignSegment(String tableType) {
+    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+    assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment);
     Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
         ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     // Add segments for partition 0/1/2, but add no segment for partition 3.
     List<String> instancesAssigned;
-    boolean consistent;
     for (int segmentId = 0; segmentId < 3; segmentId++) {
       String segmentName = _segments.get(segmentId);
       instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       // Segment 0 (partition 0) should be assigned to instance 0, 3, 6
       // Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -171,7 +177,7 @@ public class StrictRealtimeSegmentAssignmentTest {
     int segmentId = 3;
     String segmentName = _segments.get(segmentId);
     instancesAssigned =
-        _segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
+        segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
     assertEquals(instancesAssigned,
         Arrays.asList("new_consumingInstance_0", "new_consumingInstance_3", 
"new_consumingInstance_6"));
     addToAssignment(currentAssignment, segmentId, instancesAssigned);
@@ -180,7 +186,7 @@ public class StrictRealtimeSegmentAssignmentTest {
     for (segmentId = 4; segmentId < 7; segmentId++) {
       segmentName = _segments.get(segmentId);
       instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
 
       // Those segments are assigned according to the assignment from 
idealState, instead of using new_xxx instances
@@ -197,20 +203,20 @@ public class StrictRealtimeSegmentAssignmentTest {
     }
   }
 
-  @Test
-  public void testAssignSegmentWithOfflineSegment() {
-    assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+  @Test(dataProvider = "tableTypes")
+  public void testAssignSegmentWithOfflineSegment(String tableType) {
+    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+    assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment);
     Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
         ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     // Add segments for partition 0/1/2, but add no segment for partition 3.
     List<String> instancesAssigned;
-    boolean consistent;
     for (int segmentId = 0; segmentId < 3; segmentId++) {
       String segmentName = _segments.get(segmentId);
       instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       // Segment 0 (partition 0) should be assigned to instance 0, 3, 6
       // Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -238,7 +244,7 @@ public class StrictRealtimeSegmentAssignmentTest {
     for (int segmentId = 3; segmentId < 7; segmentId++) {
       String segmentName = _segments.get(segmentId);
       instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
 
       // Those segments are assigned according to the assignment from 
idealState, instead of using new_xxx instances
@@ -255,9 +261,10 @@ public class StrictRealtimeSegmentAssignmentTest {
     }
   }
 
-  @Test(expectedExceptions = IllegalStateException.class)
-  public void testAssignSegmentToCompletedServers() {
-    _segmentAssignment.assignSegment("seg01", new TreeMap<>(), new 
TreeMap<>());
+  @Test(expectedExceptions = IllegalStateException.class, dataProvider = 
"tableTypes")
+  public void testAssignSegmentToCompletedServers(String tableType) {
+    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+    segmentAssignment.assignSegment("seg01", new TreeMap<>(), new TreeMap<>());
   }
 
   private void addToAssignment(Map<String, Map<String, String>> 
currentAssignment, int segmentId,
@@ -276,7 +283,7 @@ public class StrictRealtimeSegmentAssignmentTest {
         SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
   }
 
-  private HelixManager createHelixManager() {
+  private static HelixManager createHelixManager() {
     HelixManager helixManager = mock(HelixManager.class);
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to