This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cf8cf91cdd Revert "make dedup table use strict replica group 
assignment too (#15778)" (#15853)
cf8cf91cdd is described below

commit cf8cf91cdd3ca88e98d42b3414495b454762f9b7
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Tue May 20 11:52:14 2025 -0700

    Revert "make dedup table use strict replica group assignment too (#15778)" 
(#15853)
    
    This reverts commit 6372d78d98fd391f62670a7d324b99f83c946bd6.
    
    Tiered storage support is required before a dedup table can use strict 
assignment correctly.
---
 .../segment/SegmentAssignmentFactory.java          |  5 +-
 .../segment/StrictRealtimeSegmentAssignment.java   | 23 ++++----
 .../StrictRealtimeSegmentAssignmentTest.java       | 69 ++++++++++------------
 3 files changed, 42 insertions(+), 55 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
index 7e37026438..e32a7246b2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -21,7 +21,6 @@ package 
org.apache.pinot.controller.helix.core.assignment.segment;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -41,9 +40,7 @@ public class SegmentAssignmentFactory {
       segmentAssignment = new OfflineSegmentAssignment();
     } else {
       UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
-      DedupConfig dedupConfig = tableConfig.getDedupConfig();
-      if ((upsertConfig != null && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE) || (dedupConfig != null
-          && dedupConfig.isDedupEnabled())) {
+      if (upsertConfig != null && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE) {
         segmentAssignment = new StrictRealtimeSegmentAssignment();
       } else {
         segmentAssignment = new RealtimeSegmentAssignment();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
index 6c76332a24..2bc0ada5bd 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
@@ -37,7 +37,7 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
 
 
 /**
- * Segment assignment for LLC real-time table using upsert or dedup. The 
assignSegment() of RealtimeSegmentAssignment is
+ * Segment assignment for LLC real-time table using upsert. The 
assignSegment() of RealtimeSegmentAssignment is
  * overridden to add new segment for a table partition in a way that's 
consistent with the assignment in idealState to
  * make sure that at any time the segments from the same table partition is 
hosted by the same server.
  * <ul>
@@ -47,13 +47,12 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
  *     InstancePartition and the one in idealState are different, the one in 
idealState must be used so that segments
  *     from the same table partition are always hosted on the same server as 
set in current idealState. If the
  *     idealState is not honored, segments from the same table partition may 
be assigned to different servers,
- *     breaking the key assumption for queries to be correct for the table 
using upsert or dedup.
+ *     breaking the key assumption for queries to be correct for the table 
using upsert.
  *   </li>
  *   <li>
- *     There is no need to handle COMPLETED segments for tables using upsert 
or dedup, because their completed
- *     segments should not be relocated to servers tagged to host COMPLETED 
segments. Basically, tables using upsert
- *     or dedup can only use servers tagged for CONSUMING segments to host 
both consuming and completed segments from
- *     a table partition.
+ *     There is no need to handle COMPLETED segments for tables using upsert, 
because their completed segments should
+ *     not be relocated to servers tagged to host COMPLETED segments. 
Basically, upsert-enabled tables can only use
+ *     servers tagged for CONSUMING segments to host both consuming and 
completed segments from a table partition.
  *   </li>
  * </ul>
  */
@@ -64,9 +63,9 @@ public class StrictRealtimeSegmentAssignment extends 
RealtimeSegmentAssignment {
   // 1. This cache is used for table rebalance only, but not segment 
assignment. During rebalance, rebalanceTable() can
   //    be invoked multiple times when the ideal state changes during the 
rebalance process.
   // 2. The cache won't be refreshed when an existing segment is replaced with 
a segment from a different partition.
-  //    Replacing a segment with a segment from a different partition should 
not be allowed for upsert/dedup table
-  //    because it will cause the segment being served by the wrong servers. 
If this happens during the table
-  //    rebalance, another rebalance might be needed to fix the assignment.
+  //    Replacing a segment with a segment from a different partition should 
not be allowed for upsert table because it
+  //    will cause the segment being served by the wrong servers. If this 
happens during the table rebalance, another
+  //    rebalance might be needed to fix the assignment.
   private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();
 
   @Override
@@ -164,10 +163,8 @@ public class StrictRealtimeSegmentAssignment extends 
RealtimeSegmentAssignment {
     Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
         _tableNameWithType);
     Preconditions.checkArgument(config.isIncludeConsuming(),
-        "Consuming segment must be included when rebalancing upsert/dedup 
table: %s", _tableNameWithType);
-    // TODO: consider to support tiers for segments out of metadata TTL for 
upsert/dedup table, as those segments
-    //       won't be included in the upsert/dedup metadata as kept on 
CONSUMING tier.
-    Preconditions.checkState(sortedTiers == null, "Tiers must not be specified 
for upsert/dedup table: %s",
+        "Consuming segment must be included when rebalancing upsert table: 
%s", _tableNameWithType);
+    Preconditions.checkState(sortedTiers == null, "Tiers must not be specified 
for upsert table: %s",
         _tableNameWithType);
     _logger.info("Rebalancing table: {} with instance partitions: {}", 
_tableNameWithType, instancePartitions);
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
index 43cfb9998d..6775329d37 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
@@ -30,7 +30,6 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -40,7 +39,6 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
 import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -69,6 +67,7 @@ public class StrictRealtimeSegmentAssignmentTest {
       
InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME);
 
   private List<String> _segments;
+  private SegmentAssignment _segmentAssignment;
   private Map<InstancePartitionsType, InstancePartitions> 
_instancePartitionsMap;
   private InstancePartitions _newConsumingInstancePartitions;
 
@@ -79,6 +78,16 @@ public class StrictRealtimeSegmentAssignmentTest {
       _segments.add(new LLCSegmentName(RAW_TABLE_NAME, segmentId % 
NUM_PARTITIONS, segmentId / NUM_PARTITIONS,
           System.currentTimeMillis()).getSegmentName());
     }
+
+    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+            .setStreamConfigs(streamConfigs).setUpsertConfig(upsertConfig)
+            
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+            .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build();
+    _segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig, null);
+
     _instancePartitionsMap = new TreeMap<>();
     // CONSUMING instances:
     // {
@@ -117,40 +126,25 @@ public class StrictRealtimeSegmentAssignmentTest {
     }
   }
 
-  @DataProvider(name = "tableTypes")
-  public Object[] getTableTypes() {
-    return new Object[]{"upsert", "dedup"};
-  }
-
-  private static SegmentAssignment createSegmentAssignment(String tableType) {
-    TableConfigBuilder builder = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
-        .setNumReplicas(NUM_REPLICAS)
-        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
-        
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
-        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
-    TableConfig tableConfig;
-    if ("upsert".equalsIgnoreCase(tableType)) {
-      tableConfig = builder.setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).build();
-    } else {
-      tableConfig = builder.setDedupConfig(new DedupConfig()).build();
-    }
-    return SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig, null);
+  @Test
+  public void testFactory() {
+    assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
   }
 
-  @Test(dataProvider = "tableTypes")
-  public void testAssignSegment(String tableType) {
-    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
-    assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+  @Test
+  public void testAssignSegment() {
+    assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
     Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
         ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     // Add segments for partition 0/1/2, but add no segment for partition 3.
     List<String> instancesAssigned;
+    boolean consistent;
     for (int segmentId = 0; segmentId < 3; segmentId++) {
       String segmentName = _segments.get(segmentId);
       instancesAssigned =
-          segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       // Segment 0 (partition 0) should be assigned to instance 0, 3, 6
       // Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -177,7 +171,7 @@ public class StrictRealtimeSegmentAssignmentTest {
     int segmentId = 3;
     String segmentName = _segments.get(segmentId);
     instancesAssigned =
-        segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
+        _segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
     assertEquals(instancesAssigned,
         Arrays.asList("new_consumingInstance_0", "new_consumingInstance_3", 
"new_consumingInstance_6"));
     addToAssignment(currentAssignment, segmentId, instancesAssigned);
@@ -186,7 +180,7 @@ public class StrictRealtimeSegmentAssignmentTest {
     for (segmentId = 4; segmentId < 7; segmentId++) {
       segmentName = _segments.get(segmentId);
       instancesAssigned =
-          segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
 
       // Those segments are assigned according to the assignment from 
idealState, instead of using new_xxx instances
@@ -203,20 +197,20 @@ public class StrictRealtimeSegmentAssignmentTest {
     }
   }
 
-  @Test(dataProvider = "tableTypes")
-  public void testAssignSegmentWithOfflineSegment(String tableType) {
-    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
-    assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+  @Test
+  public void testAssignSegmentWithOfflineSegment() {
+    assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
     Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
         ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     // Add segments for partition 0/1/2, but add no segment for partition 3.
     List<String> instancesAssigned;
+    boolean consistent;
     for (int segmentId = 0; segmentId < 3; segmentId++) {
       String segmentName = _segments.get(segmentId);
       instancesAssigned =
-          segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       // Segment 0 (partition 0) should be assigned to instance 0, 3, 6
       // Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -244,7 +238,7 @@ public class StrictRealtimeSegmentAssignmentTest {
     for (int segmentId = 3; segmentId < 7; segmentId++) {
       String segmentName = _segments.get(segmentId);
       instancesAssigned =
-          segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
 
       // Those segments are assigned according to the assignment from 
idealState, instead of using new_xxx instances
@@ -261,10 +255,9 @@ public class StrictRealtimeSegmentAssignmentTest {
     }
   }
 
-  @Test(expectedExceptions = IllegalStateException.class, dataProvider = 
"tableTypes")
-  public void testAssignSegmentToCompletedServers(String tableType) {
-    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
-    segmentAssignment.assignSegment("seg01", new TreeMap<>(), new TreeMap<>());
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testAssignSegmentToCompletedServers() {
+    _segmentAssignment.assignSegment("seg01", new TreeMap<>(), new 
TreeMap<>());
   }
 
   private void addToAssignment(Map<String, Map<String, String>> 
currentAssignment, int segmentId,
@@ -283,7 +276,7 @@ public class StrictRealtimeSegmentAssignmentTest {
         SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
   }
 
-  private static HelixManager createHelixManager() {
+  private HelixManager createHelixManager() {
     HelixManager helixManager = mock(HelixManager.class);
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to