This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9353f158b6 Add new ImplicitRealtimeTablePartitionSelector strategy for instance assignment (#15930)
9353f158b6 is described below

commit 9353f158b6f59dbc8d19c626f173f8b99f34b1ce
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Fri Jun 6 16:41:54 2025 +0100

    Add new ImplicitRealtimeTablePartitionSelector strategy for instance assignment (#15930)
---
 .../ImplicitRealtimeTablePartitionSelector.java    |  79 ++++++++
 .../instance/InstanceAssignmentDriver.java         |  22 +-
 .../instance/InstancePartitionSelectorFactory.java |   7 +-
 .../InstanceReplicaGroupPartitionSelector.java     |   4 +-
 .../helix/core/rebalance/RebalanceResult.java      |   8 +
 .../core/rebalance/RebalanceSummaryResult.java     |  88 ++++++++
 .../instance/InstanceAssignmentTest.java           | 153 ++++++++++++++
 .../TableRebalancerClusterStatelessTest.java       | 221 ++++++++++++++++++++-
 .../tests/TableRebalanceIntegrationTest.java       |  80 +++++++-
 .../segment/local/utils/TableConfigUtils.java      |  30 +++
 .../segment/local/utils/TableConfigUtilsTest.java  |  71 +++++++
 .../table/assignment/InstanceAssignmentConfig.java |   2 +-
 12 files changed, 744 insertions(+), 21 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java
new file mode 100644
index 0000000000..1cb12a725a
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+/**
+ * Variation of {@link InstanceReplicaGroupPartitionSelector} that uses the number of partitions from the stream
+ * to determine the number of partitions in each replica group.
+ */
+public class ImplicitRealtimeTablePartitionSelector extends InstanceReplicaGroupPartitionSelector {
+  private final int _numPartitions;
+
+  public ImplicitRealtimeTablePartitionSelector(TableConfig tableConfig,
+      InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String tableNameWithType,
+      @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+    this(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement,
+        // Get the number of partitions from the first stream config
+        // TODO: Revisit this logic to better handle multiple streams in the future - either validate that they
+        //       all have the same number of partitions and use that or disallow the use of this selector in case the
+        //       partition counts differ.
+        StreamConsumerFactoryProvider.create(IngestionConfigUtils.getFirstStreamConfig(tableConfig))
+            .createStreamMetadataProvider(
+                ImplicitRealtimeTablePartitionSelector.class.getSimpleName() + "-" + tableNameWithType)
+    );
+  }
+
+  @VisibleForTesting
+  ImplicitRealtimeTablePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
+      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement,
+      StreamMetadataProvider streamMetadataProvider) {
+    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
+    _numPartitions = getStreamNumPartitions(streamMetadataProvider);
+  }
+
+  private int getStreamNumPartitions(StreamMetadataProvider streamMetadataProvider) {
+    try (streamMetadataProvider) {
+      return streamMetadataProvider.fetchPartitionCount(10_000L);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to retrieve partition info for table: " + _tableNameWithType, e);
+    }
+  }
+
+  @Override
+  protected int getNumPartitions() {
+    return _numPartitions;
+  }
+
+  @Override
+  protected int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
+    // This partition selector should only be used for CONSUMING instance partitions, and we enforce a single instance
+    // per partition in this case.
+    return 1;
+  }
+}
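
For readers scanning the patch, below is a minimal sketch (not part of the commit) of how a realtime table could opt into the new selector; it mirrors the test setup later in this diff. The class name, table name, tenant tag, and replica count are illustrative assumptions only.

import java.util.Map;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

// Illustrative example (not part of the patch): a CONSUMING instance assignment config that lets
// the stream's partition count drive the number of instance partitions.
public class ImplicitSelectorConfigExample {
  public static TableConfig buildExampleTableConfig() {
    // Replica-group based, 2 replica groups; numPartitions is left at 0 so the selector derives it
    // from the stream; numInstancesPerPartition is 1 (the selector enforces a single instance per
    // partition); minimizeDataMovement is enabled.
    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
        new InstanceReplicaGroupPartitionConfig(true, 0, 2, 0, 0, 1, true, null);
    InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig(
        new InstanceTagPoolConfig("myTenant_REALTIME", false, 0, null), null,
        replicaGroupPartitionConfig,
        InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true);
    // The selector is only valid for the CONSUMING instance partitions of a REALTIME table. A real
    // table config would also carry stream configs, since the selector reads the partition count
    // from the first stream config.
    return new TableConfigBuilder(TableType.REALTIME)
        .setTableName("myTable")
        .setNumReplicas(2)
        .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
        .build();
  }
}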
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index f76c9df6a4..04ce082fa6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.assignment.instance;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -113,9 +114,26 @@ public class InstanceAssignmentDriver {
     // if existingInstancePartitions is null.
     boolean minimizeDataMovementFromTableConfig = instanceAssignmentConfig.isMinimizeDataMovement();
     boolean minimizeDataMovement = minimizeDataMovementEnablement.isEnabled(minimizeDataMovementFromTableConfig);
+    InstancePartitionSelector instancePartitionSelector =
+        InstancePartitionSelectorFactory.getInstance(_tableConfig, instanceAssignmentConfig.getPartitionSelector(),
+            instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
+            preConfiguredInstancePartitions, minimizeDataMovement);
+
     LOGGER.info("Starting {} instance assignment for table: {} with minimizeDataMovement: {} (from table config: {}, "
             + "override: {})", instancePartitionsName, tableNameWithType, minimizeDataMovement,
         minimizeDataMovementFromTableConfig, minimizeDataMovementEnablement);
+
+    return getInstancePartitions(instancePartitionsName, instanceAssignmentConfig, instanceConfigs,
+        existingInstancePartitions, minimizeDataMovement, instancePartitionSelector);
+  }
+
+  @VisibleForTesting
+  InstancePartitions getInstancePartitions(String instancePartitionsName,
+      InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement,
+      InstancePartitionSelector instancePartitionSelector) {
+    String tableNameWithType = _tableConfig.getTableName();
+
     InstanceTagPoolSelector tagPoolSelector =
         new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType,
             minimizeDataMovement, existingInstancePartitions);
@@ -132,10 +150,6 @@ public class InstanceAssignmentDriver {
       poolToInstanceConfigsMap = constraintApplier.applyConstraint(poolToInstanceConfigsMap);
     }
 
-    InstancePartitionSelector instancePartitionSelector =
-        InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
-            instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
-            preConfiguredInstancePartitions, minimizeDataMovement);
     InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
     instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
     return instancePartitions;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
index 8a343b1598..d038245913 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 import java.util.Arrays;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 
@@ -30,7 +31,8 @@ public class InstancePartitionSelectorFactory {
   private InstancePartitionSelectorFactory() {
   }
 
-  public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
+  public static InstancePartitionSelector getInstance(TableConfig tableConfig,
+      InstanceAssignmentConfig.PartitionSelector partitionSelector,
       InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
       InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions,
       boolean minimizeDataMovement) {
@@ -44,6 +46,9 @@ public class InstancePartitionSelectorFactory {
       case MIRROR_SERVER_SET_PARTITION_SELECTOR:
         return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
             existingInstancePartitions, preConfiguredInstancePartitions, minimizeDataMovement);
+      case IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR:
+        return new ImplicitRealtimeTablePartitionSelector(tableConfig, instanceReplicaGroupPartitionConfig,
+            tableNameWithType, existingInstancePartitions, minimizeDataMovement);
       default:
         throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from"
             + Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index b8c19ede69..4309d67b67 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -248,7 +248,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
     return numInstancesPerReplicaGroup;
   }
 
-  private int getNumPartitions() {
+  protected int getNumPartitions() {
     // Assign instances within a replica-group to one partition if not configured
     int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
     if (numPartitions <= 0) {
@@ -257,7 +257,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
     return numPartitions;
   }
 
-  private int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
+  protected int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
     // Assign all instances within a replica-group to each partition if not configured
     int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition();
     if (numInstancesPerPartition > 0) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index f2737c265b..cfc38a8033 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -114,4 +114,12 @@ public class RebalanceResult {
     // UNKNOWN_ERROR if the job hits on an unexpected exception.
     NO_OP, DONE, FAILED, IN_PROGRESS, ABORTED, CANCELLED, UNKNOWN_ERROR
   }
+
+  @Override
+  public String toString() {
+    return "RebalanceResult{" + "_jobId='" + _jobId + '\'' + ", _status=" + 
_status + ", _description='" + _description
+        + '\'' + ", _instanceAssignment=" + _instanceAssignment + ", 
_tierInstanceAssignment="
+        + _tierInstanceAssignment + ", _segmentAssignment=" + 
_segmentAssignment + ", _preChecksResult="
+        + _preChecksResult + ", _rebalanceSummaryResult=" + 
_rebalanceSummaryResult + '}';
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
index a7c5da9abf..d99cb55c09 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -137,6 +137,19 @@ public class RebalanceSummaryResult {
     public List<String> getTagList() {
       return _tagList;
     }
+
+    @Override
+    public String toString() {
+      return "ServerSegmentChangeInfo{"
+          + "_serverStatus=" + _serverStatus
+          + ", _totalSegmentsAfterRebalance=" + _totalSegmentsAfterRebalance
+          + ", _totalSegmentsBeforeRebalance=" + _totalSegmentsBeforeRebalance
+          + ", _segmentsAdded=" + _segmentsAdded
+          + ", _segmentsDeleted=" + _segmentsDeleted
+          + ", _segmentsUnchanged=" + _segmentsUnchanged
+          + ", _tagList=" + _tagList
+          + '}';
+    }
   }
 
   public static class RebalanceChangeInfo {
@@ -164,6 +177,14 @@ public class RebalanceSummaryResult {
     public int getExpectedValueAfterRebalance() {
       return _expectedValueAfterRebalance;
     }
+
+    @Override
+    public String toString() {
+      return "RebalanceChangeInfo{"
+          + "_valueBeforeRebalance=" + _valueBeforeRebalance
+          + ", _expectedValueAfterRebalance=" + _expectedValueAfterRebalance
+          + '}';
+    }
   }
 
   public static class TagInfo {
@@ -221,6 +242,16 @@ public class RebalanceSummaryResult {
     public void increaseNumServerParticipants(int numServers) {
       _numServerParticipants += numServers;
     }
+
+    @Override
+    public String toString() {
+      return "TagInfo{"
+          + "_tagName='" + _tagName + '\''
+          + ", _numSegmentsUnchanged=" + _numSegmentsUnchanged
+          + ", _numSegmentsToDownload=" + _numSegmentsToDownload
+          + ", _numServerParticipants=" + _numServerParticipants
+          + '}';
+    }
   }
 
   @JsonInclude(JsonInclude.Include.NON_NULL)
@@ -295,6 +326,19 @@ public class RebalanceSummaryResult {
     public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
       return _serverSegmentChangeInfo;
     }
+
+    @Override
+    public String toString() {
+      return "ServerInfo{"
+          + "_numServersGettingNewSegments=" + _numServersGettingNewSegments
+          + ", _numServers=" + _numServers
+          + ", _serversAdded=" + _serversAdded
+          + ", _serversRemoved=" + _serversRemoved
+          + ", _serversUnchanged=" + _serversUnchanged
+          + ", _serversGettingNewSegments=" + _serversGettingNewSegments
+          + ", _serverSegmentChangeInfo=" + _serverSegmentChangeInfo
+          + '}';
+    }
   }
 
   public static class ConsumingSegmentToBeMovedSummary {
@@ -407,6 +451,26 @@ public class RebalanceSummaryResult {
       public int getTotalOffsetsToCatchUpAcrossAllConsumingSegments() {
         return _totalOffsetsToCatchUpAcrossAllConsumingSegments;
       }
+
+      @Override
+      public String toString() {
+        return "ConsumingSegmentSummaryPerServer{"
+            + "_numConsumingSegmentsToBeAdded=" + 
_numConsumingSegmentsToBeAdded
+            + ", _totalOffsetsToCatchUpAcrossAllConsumingSegments=" + 
_totalOffsetsToCatchUpAcrossAllConsumingSegments
+            + '}';
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "ConsumingSegmentToBeMovedSummary{"
+          + "_numConsumingSegmentsToBeMoved=" + _numConsumingSegmentsToBeMoved
+          + ", _numServersGettingConsumingSegmentsAdded=" + 
_numServersGettingConsumingSegmentsAdded
+          + ", _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp="
+          + _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp
+          + ", _consumingSegmentsToBeMovedWithOldestAgeInMinutes=" + 
_consumingSegmentsToBeMovedWithOldestAgeInMinutes
+          + ", _serverConsumingSegmentSummary=" + 
_serverConsumingSegmentSummary
+          + '}';
     }
   }
 
@@ -501,6 +565,21 @@ public class RebalanceSummaryResult {
     public ConsumingSegmentToBeMovedSummary getConsumingSegmentToBeMovedSummary() {
       return _consumingSegmentToBeMovedSummary;
     }
+
+    @Override
+    public String toString() {
+      return "SegmentInfo{"
+          + "_totalSegmentsToBeMoved=" + _totalSegmentsToBeMoved
+          + ", _totalSegmentsToBeDeleted=" + _totalSegmentsToBeDeleted
+          + ", _maxSegmentsAddedToASingleServer=" + 
_maxSegmentsAddedToASingleServer
+          + ", _estimatedAverageSegmentSizeInBytes=" + 
_estimatedAverageSegmentSizeInBytes
+          + ", _totalEstimatedDataToBeMovedInBytes=" + 
_totalEstimatedDataToBeMovedInBytes
+          + ", _replicationFactor=" + _replicationFactor
+          + ", _numSegmentsInSingleReplica=" + _numSegmentsInSingleReplica
+          + ", _numSegmentsAcrossAllReplicas=" + _numSegmentsAcrossAllReplicas
+          + ", _consumingSegmentToBeMovedSummary=" + 
_consumingSegmentToBeMovedSummary
+          + '}';
+    }
   }
 
   public enum ServerStatus {
@@ -509,4 +588,13 @@ public class RebalanceSummaryResult {
     // UNCHANGED if the server status is unchanged as part of rebalance;
     ADDED, REMOVED, UNCHANGED
   }
+
+  @Override
+  public String toString() {
+    return "RebalanceSummaryResult{"
+        + "_serverInfo=" + _serverInfo
+        + ", _segmentInfo=" + _segmentInfo
+        + ", _tagsInfo=" + _tagsInfo
+        + '}';
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 8f8819831e..74badfb7fc 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -47,11 +47,15 @@ import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
 import org.apache.pinot.spi.utils.Enablement;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
 
 
@@ -59,6 +63,7 @@ public class InstanceAssignmentTest {
   private static final String RAW_TABLE_NAME = "myTable";
   private static final String TENANT_NAME = "tenant";
   private static final String OFFLINE_TAG = TagNameUtils.getOfflineTagForTenant(TENANT_NAME);
+  private static final String REALTIME_TAG = TagNameUtils.getRealtimeTagForTenant(TENANT_NAME);
   private static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_";
   private static final String SERVER_INSTANCE_POOL_PREFIX = "_pool_";
   private static final String TABLE_NAME_ZERO_HASH_COMPLEMENT = "12";
@@ -499,6 +504,154 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(9, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 7));
   }
 
+  @Test
+  public void testMinimizeDataMovementImplicitRealtimeTablePartitionSelector() {
+    int numReplicas = 2;
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1, true, null);
+    InstanceAssignmentConfig instanceAssignmentConfig =
+        new InstanceAssignmentConfig(new InstanceTagPoolConfig(REALTIME_TAG, false, 0, null), null,
+            replicaGroupPartitionConfig,
+            InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true);
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setServerTenant(TENANT_NAME)
+            .setNumReplicas(numReplicas)
+            .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
+            .build();
+
+    int numInstances = 12;
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+    for (int i = 0; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(REALTIME_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Start without existing InstancePartitions:
+    // Instances should be assigned to 2 replica-groups in a round-robin fashion, each with 6 instances. Then these 6
+    // instances should be assigned to 6 partitions, each with 1 instances
+    int numPartitions = 6;
+    StreamMetadataProvider streamMetadataProvider = mock(StreamMetadataProvider.class);
+    when(streamMetadataProvider.fetchPartitionCount(anyLong())).thenReturn(numPartitions);
+    InstancePartitionSelector instancePartitionSelector =
+        new ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig, tableConfig.getTableName(), null, true,
+            streamMetadataProvider);
+    InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
+    InstancePartitions instancePartitions =
+        driver.getInstancePartitions(InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME),
+            instanceAssignmentConfig, instanceConfigs, null, true, instancePartitionSelector);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Math.abs("myTable_REALTIME".hashCode()) % 12 = 0
+
+    // [i0, i1, i10, i11, i2, i3, i4, i5, i6, i7, i8, i9]
+    //  r0  r1  r0   r1   r0  r1  r0  r1  r0  r1  r0  r1
+    // r0: [i0, i10, i2, i4, i6, i8]
+    //      p0  p1   p2  p3  p4  p5
+    //
+    // r1: [i1, i11, i3, i5, i7, i9]
+    //      p0  p1   p2  p3  p4  p5
+
+    assertEquals(instancePartitions.getInstances(0, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(0, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(1, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(1, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 11));
+    assertEquals(instancePartitions.getInstances(2, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(2, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(3, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(3, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(4, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(4, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 7));
+    assertEquals(instancePartitions.getInstances(5, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(5, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Increase the number of partitions from 6 to 9. Expect no data movement for existing partitions.
+    numPartitions = 9;
+    when(streamMetadataProvider.fetchPartitionCount(anyLong())).thenReturn(numPartitions);
+    instancePartitionSelector = new ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig,
+        tableConfig.getTableName(), instancePartitions, true, streamMetadataProvider);
+    instancePartitions = driver.getInstancePartitions(
+        InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME), instanceAssignmentConfig,
+        instanceConfigs, instancePartitions, true, instancePartitionSelector);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // [i0, i1, i10, i11, i2, i3, i4, i5, i6, i7, i8, i9]
+    //  r0  r1  r0   r1   r0  r1  r0  r1  r0  r1  r0  r1
+    // r0: [i0, i10, i2, i4, i6, i8]
+    //      p0  p1   p2  p3  p4  p5
+    //      p6  p7   p8
+    //
+    // r1: [i1, i11, i3, i5, i7, i9]
+    //      p0  p1   p2  p3  p4  p5
+    //      p6  p7   p8
+
+    assertEquals(instancePartitions.getInstances(0, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(0, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(1, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(1, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 11));
+    assertEquals(instancePartitions.getInstances(2, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(2, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(3, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(3, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(4, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(4, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 7));
+    assertEquals(instancePartitions.getInstances(5, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(5, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(6, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(6, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(7, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(7, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 11));
+    assertEquals(instancePartitions.getInstances(8, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(8, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+
+    // Add 6 new instances
+    for (int i = numInstances; i < numInstances + 6; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(REALTIME_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+
+    instancePartitionSelector = new ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig,
+        tableConfig.getTableName(), instancePartitions, true, streamMetadataProvider);
+    instancePartitions = driver.getInstancePartitions(
+        InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME), instanceAssignmentConfig,
+        instanceConfigs, instancePartitions, true, instancePartitionSelector);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // We're using the minimize data movement based algorithm, so only the new partitions should be moved to the new
+    // instances, and the existing partition assignments should remain unchanged.
+    //
+    // [i0, i1, i10, i11, i2, i3, i4, i5, i6, i7, i8, i9]
+    //  r0  r1  r0   r1   r0  r1  r0  r1  r0  r1  r0  r1
+    // r0: [i0, i10, i2, i4, i6, i8, i12, i14, i16]
+    //      p0  p1   p2  p3  p4  p5  p6   p7   p8
+    //
+    // r1: [i1, i11, i3, i5, i7, i9, i13, i15, i17]
+    //      p0  p1   p2  p3  p4  p5  p6   p7   p8
+
+    assertEquals(instancePartitions.getInstances(0, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(0, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(1, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(1, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 11));
+    assertEquals(instancePartitions.getInstances(2, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(instancePartitions.getInstances(2, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(3, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(3, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(4, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(4, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 7));
+    assertEquals(instancePartitions.getInstances(5, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(5, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(6, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 12));
+    assertEquals(instancePartitions.getInstances(6, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 13));
+    assertEquals(instancePartitions.getInstances(7, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 14));
+    assertEquals(instancePartitions.getInstances(7, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 15));
+    assertEquals(instancePartitions.getInstances(8, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 16));
+    assertEquals(instancePartitions.getInstances(8, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 17));
+  }
+
   public void testMirrorServerSetBasedRandom()
       throws FileNotFoundException {
     testMirrorServerSetBasedRandomInner(10000000);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index a25d178091..64cbff92c0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,8 +24,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -76,6 +79,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
   private static final int NUM_REPLICAS = 3;
   private static final String SEGMENT_NAME_PREFIX = "segment_";
+  private static final String PARTITION_COLUMN = "partitionColumn";
 
   private static final String TIERED_TABLE_NAME = "testTable";
   private static final String OFFLINE_TIERED_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TIERED_TABLE_NAME);
@@ -95,15 +99,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
     addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
   }
 
-  /**
-   * Dropping instance from cluster requires waiting for live instance gone and removing instance related ZNodes, which
-   * are not the purpose of the test, so combine different rebalance scenarios into one test:
-   * 1. NO_OP rebalance
-   * 2. Add servers and rebalance
-   * 3. Migrate to replica-group based segment assignment and rebalance
-   * 4. Migrate back to non-replica-group based segment assignment and rebalance
-   * 5. Remove (disable) servers and rebalance
-   */
+  ///
+  /// Dropping instance from cluster requires waiting for live instance gone and removing instance related ZNodes, which
+  /// are not the purpose of the test, so combine different rebalance scenarios into one test:
+  /// 1. NO_OP rebalance
+  /// 2. Add servers and rebalance
+  /// 3. Migrate to replica-group based segment assignment and rebalance
+  /// 4. Migrate back to non-replica-group based segment assignment and rebalance
+  /// 5. Remove (disable) servers and rebalance
+  ///
   @Test
   public void testRebalance()
       throws Exception {
@@ -766,6 +770,205 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
     }
   }
 
+  @Test
+  public void testRebalanceWithImplicitRealtimeTablePartitionSelectorAndMinimizeDataMovement()
+      throws Exception {
+    int numServers = 6;
+    int numPartitions = 18;
+    int numReplicas = 2;
+
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+    }
+
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1, false, null);
+    InstanceAssignmentConfig instanceAssignmentConfig =
+        new InstanceAssignmentConfig(
+            new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), false, 0, null), null,
+            replicaGroupPartitionConfig,
+            InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true);
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+            .setNumReplicas(numReplicas)
+            .setRoutingConfig(
+                new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+            .setStreamConfigs(
+                FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap())
+            .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
+            .build();
+
+    // Create the table
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegmentsPerPartition = 4;
+    for (int i = 0; i < numPartitions; i++) {
+      for (int j = 0; j < numSegmentsPerPartition; j++) {
+        _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+            SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME,
+                SEGMENT_NAME_PREFIX + (i * numSegmentsPerPartition + j), PARTITION_COLUMN, i), null);
+      }
+    }
+
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        _helixResourceManager.getTableIdealState(REALTIME_TABLE_NAME).getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> entry : oldSegmentAssignment.entrySet()) {
+      assertEquals(entry.getValue().size(), numReplicas);
+    }
+
+    // Verify that segments are distributed equally across servers
+    Map<String, Integer> numSegmentsPerServer = getNumSegmentsPerServer(oldSegmentAssignment);
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+      assertTrue(numSegmentsPerServer.containsKey(instanceId));
+      // Total number of segments is numReplicas * numPartitions * (numSegmentsPerPartition + 1) because of
+      // CONSUMING segments
+      assertEquals(numSegmentsPerServer.get(instanceId),
+          (numReplicas * numPartitions * (numSegmentsPerPartition + 1)) / numServers);
+    }
+
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, null, null);
+    // Rebalance should return NO_OP status since there has been no change
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
+    // All servers should be assigned to the table
+    Map<InstancePartitionsType, InstancePartitions> instanceAssignment = rebalanceResult.getInstanceAssignment();
+    assertEquals(instanceAssignment.size(), 1);
+    InstancePartitions instancePartitions = instanceAssignment.get(InstancePartitionsType.CONSUMING);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Verify that replica partitions are distributed equally across servers
+    Map<String, Integer> numReplicaPartitionsPerServer = getNumReplicaPartitionsPerServer(instancePartitions);
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+      assertTrue(numReplicaPartitionsPerServer.containsKey(instanceId));
+      // Total number of partitions is numReplicas * numPartitions
+      assertEquals(numReplicaPartitionsPerServer.get(instanceId), (numReplicas * numPartitions) / numServers);
+    }
+
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // Add two new servers
+    int numServersToAdd = 2;
+    Set<String> newServers = new HashSet<>();
+    for (int i = 0; i < numServersToAdd; i++) {
+      String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+      newServers.add(instanceId);
+    }
+
+    // Check number of segments moved when minimizeDataMovement is not enabled
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setIncludeConsuming(true);
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    // Most of the segments end up being moved when minimizeDataMovement is not enabled due to the round robin way in
+    // which partitions are assigned to instances (see InstanceReplicaGroupPartitionSelector)
+    assertEquals(rebalanceResult.getRebalanceSummaryResult().getSegmentInfo().getTotalSegmentsToBeMoved(), 130);
+
+    // Rebalance with reassignInstances and minimizeDataMovement enabled
+    rebalanceConfig.setMinimizeDataMovement(Enablement.ENABLE);
+    rebalanceConfig.setDryRun(false);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    instanceAssignment = rebalanceResult.getInstanceAssignment();
+    assertEquals(instanceAssignment.size(), 1);
+    instancePartitions = instanceAssignment.get(InstancePartitionsType.CONSUMING);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Get number of segments moved
+    int numSegmentsMoved = getNumSegmentsMoved(oldSegmentAssignment, rebalanceResult.getSegmentAssignment());
+    // This number is 130 when using the default partition selector in this same scenario since more segment partitions
+    // will be moved when the instance partitions don't match the segment partitions (we're setting numPartitions to
+    // the default value of 0 in the table's instance assignment config).
+    assertEquals(numSegmentsMoved, 30);
+
+    // "Repartition" and add two new partitions
+    int newNumPartitions = 20;
+    tableConfig.getIndexingConfig()
+        .setStreamConfigs(
+            FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(newNumPartitions).getStreamConfigsMap());
+    _helixResourceManager.updateTableConfig(tableConfig);
+
+    // Add segments for the new partitions
+    for (int i = numPartitions; i < newNumPartitions; i++) {
+      for (int j = 0; j < numSegmentsPerPartition; j++) {
+        _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+            SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME,
+                SEGMENT_NAME_PREFIX + (i * numSegmentsPerPartition + j), PARTITION_COLUMN, i), null);
+      }
+    }
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+    // Verify that the new partitions are assigned to the new servers. Due to the minimizeDataMovement algorithm, the
+    // previous rebalance resulted in the older servers having 5 partition replicas each with the newer ones having 3
+    // partition replicas each.
+    instancePartitions = rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.CONSUMING);
+    for (int i = numPartitions; i < newNumPartitions; i++) {
+      for (int j = 0; j < numReplicas; j++) {
+        for (String instanceId : instancePartitions.getInstances(i, j)) {
+          assertTrue(newServers.contains(instanceId),
+              "Expected new partition " + i + " to be assigned to a new 
server, but found it on " + instanceId);
+        }
+      }
+    }
+
+    _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
+
+    for (int i = 0; i < numServers + numServersToAdd; i++) {
+      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+    }
+  }
+
+  private Map<String, Integer> getNumSegmentsPerServer(Map<String, Map<String, String>> segmentAssignment) {
+    Map<String, Integer> numSegmentsPerServer = new HashMap<>();
+    for (Map<String, String> instanceStateMap : segmentAssignment.values()) {
+      for (String instanceId : instanceStateMap.keySet()) {
+        numSegmentsPerServer.merge(instanceId, 1, Integer::sum);
+      }
+    }
+    return numSegmentsPerServer;
+  }
+
+  private Map<String, Integer> getNumReplicaPartitionsPerServer(InstancePartitions instancePartitions) {
+    Map<String, Integer> numPartitionsPerServer = new HashMap<>();
+    for (int i = 0; i < instancePartitions.getNumReplicaGroups(); i++) {
+      for (int j = 0; j < instancePartitions.getNumPartitions(); j++) {
+        List<String> instances = instancePartitions.getInstances(j, i);
+        for (String instanceId : instances) {
+          numPartitionsPerServer.merge(instanceId, 1, Integer::sum);
+        }
+      }
+    }
+    return numPartitionsPerServer;
+  }
+
+  private int getNumSegmentsMoved(Map<String, Map<String, String>> oldSegmentAssignment,
+      Map<String, Map<String, String>> newSegmentAssignment) {
+    int numSegmentsMoved = 0;
+    for (Map.Entry<String, Map<String, String>> entry : newSegmentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> newInstanceStateMap = entry.getValue();
+      Map<String, String> oldInstanceStateMap = oldSegmentAssignment.get(segmentName);
+      assertEquals(oldInstanceStateMap.size(), newInstanceStateMap.size());
+      Set<String> commonInstances = new HashSet<>(newInstanceStateMap.keySet());
+      commonInstances.retainAll(oldInstanceStateMap.keySet());
+      numSegmentsMoved += newInstanceStateMap.size() - commonInstances.size();
+    }
+    return numSegmentsMoved;
+  }
+
   @Test
   public void testRebalanceBatchSizePerServerErrors()
       throws Exception {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 9c778908de..2f51fd6250 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -25,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.config.TagNameUtils;
@@ -46,6 +48,7 @@ import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -58,10 +61,7 @@ import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationTest {
@@ -88,6 +88,78 @@ public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationT
         + "?type=" + tableType.toString() + "&" + getQueryString(rebalanceConfig);
   }
 
+  @Test
+  public void testImplicitRealtimeTableInstanceAssignment() throws Exception {
+    // Instance assignment not configured for the table initially, so INSTANCE_PARTITIONS should not exist.
+    assertThrows("404", IOException.class,
+        () -> sendGetRequest(getControllerBaseApiUrl() + "/tables/" + getTableName() + "/instancePartitions"));
+
+    // Update table config with instance assignment config, use IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR to
+    // create partitions in the replica group based on the number of stream partitions.
+    TableConfig realtimeTableConfig = getTableConfigBuilder(TableType.REALTIME).build();
+    realtimeTableConfig.setInstanceAssignmentConfigMap(
+        Map.of(InstancePartitionsType.CONSUMING.name(), new InstanceAssignmentConfig(
+            new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(getServerTenant()), false, 0, null), null,
+            new InstanceReplicaGroupPartitionConfig(true, 0, 1, 0, 0, 0, true, null),
+            InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true))
+    );
+    updateTableConfig(realtimeTableConfig);
+
+    // Rebalance the table to reassign instances and create the INSTANCE_PARTITIONS.
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinAvailableReplicas(-1);
+    rebalanceConfig.setIncludeConsuming(true);
+    sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME));
+
+    // We're using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR based instance assignment for this table.
+    // This test verifies that INSTANCE_PARTITIONS is written to ZK after instance assignment in the rebalance and has
+    // the expected number of partitions.
+
+    TestUtils.waitForCondition(
+        aVoid -> {
+          try {
+            sendGetRequest(getControllerBaseApiUrl() + "/tables/" + getTableName() + "/instancePartitions");
+          } catch (Exception e) {
+            return false;
+          }
+          return true;
+        }, 10_000, "Expected INSTANCE_PARTITIONS to be created for table after 
instance assignment in rebalance"
+    );
+
+    JsonNode instancePartitions = JsonUtils.stringToJsonNode(
+        sendGetRequest(getControllerBaseApiUrl() + "/tables/" + getTableName() + "/instancePartitions"));
+
+    assertNotNull(instancePartitions);
+    assertEquals(instancePartitions.size(), 1);
+
+    JsonNode partitionToInstancesMap =
+        instancePartitions.get(InstancePartitionsType.CONSUMING.name()).get("partitionToInstancesMap");
+
+    assertEquals(partitionToInstancesMap.size(), getNumKafkaPartitions()); // single replica group
+    for (int i = 0; i < getNumKafkaPartitions(); i++) {
+      assertNotNull(partitionToInstancesMap.get(i + "_0")); // partition i, replica group 0
+    }
+
+    // Reset the table config and rebalance
+    updateTableConfig(getTableConfigBuilder(TableType.REALTIME).build());
+    sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME));
+
+    TestUtils.waitForCondition(
+        aVoid -> {
+          try {
+            sendGetRequest(getControllerBaseApiUrl() + "/tables/" + getTableName() + "/instancePartitions");
+          } catch (Exception e) {
+            return e.getCause() instanceof HttpErrorStatusException
+                && ((HttpErrorStatusException) e.getCause()).getStatusCode() == 404;
+          }
+          return false;
+        }, 10_000,
+        "Expected INSTANCE_PARTITIONS to be deleted for table after removing 
instance assignment configs and "
+            + "rebalancing"
+    );
+  }
+
   @Test
   public void testRealtimeRebalancePreCheckMinimizeDataMovement()
       throws Exception {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index decda17d89..c4c08710f9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -170,6 +170,7 @@ public final class TableConfigUtils {
       validateFieldConfigList(tableConfig, schema);
       validateInstancePartitionsTypeMapConfig(tableConfig);
       validatePartitionedReplicaGroupInstance(tableConfig);
+      validateInstanceAssignmentConfigs(tableConfig);
       if (!skipTypes.contains(ValidationType.UPSERT)) {
         validateUpsertAndDedupConfig(tableConfig, schema);
         validatePartialUpsertStrategies(tableConfig, schema);
@@ -913,6 +914,35 @@ public final class TableConfigUtils {
     }
   }
 
+  @VisibleForTesting
+  static void validateInstanceAssignmentConfigs(TableConfig tableConfig) {
+    if (tableConfig.getInstanceAssignmentConfigMap() == null) {
+      return;
+    }
+    for (Map.Entry<String, InstanceAssignmentConfig> instanceAssignmentConfigMapEntry
+        : tableConfig.getInstanceAssignmentConfigMap().entrySet()) {
+      String instancePartitionsType = instanceAssignmentConfigMapEntry.getKey();
+      InstanceAssignmentConfig instanceAssignmentConfig = instanceAssignmentConfigMapEntry.getValue();
+      if (instanceAssignmentConfig.getPartitionSelector()
+          == InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR) {
+        Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+            "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for REALTIME tables");
+        Preconditions.checkState(InstancePartitionsType.CONSUMING.name().equalsIgnoreCase(instancePartitionsType),
+            "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for CONSUMING instance partitions type");
+        Preconditions.checkState(instanceAssignmentConfig.getReplicaGroupPartitionConfig().isReplicaGroupBased(),
+            "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used with replica group based instance assignment");
+        Preconditions.checkState(instanceAssignmentConfig.getReplicaGroupPartitionConfig().getNumPartitions() == 0,
+            "numPartitions should not be explicitly set when using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR");
+        // Allow 0 because that's the default (unset) value.
+        Preconditions.checkState(
+            instanceAssignmentConfig.getReplicaGroupPartitionConfig().getNumInstancesPerPartition() == 0
+                || instanceAssignmentConfig.getReplicaGroupPartitionConfig().getNumInstancesPerPartition() == 1,
+            "numInstancesPerPartition must be 1 when using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR");
+      }
+      // TODO: Add more validations for other partition selectors here
+    }
+  }
+
   /**
    * Validates metrics aggregation when upsert config is enabled
    * - Metrics aggregation cannot be enabled when Upsert Config is enabled.
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 5d51854392..10eab60abf 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -76,7 +76,10 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 
 
 /**
@@ -2570,6 +2573,74 @@ public class TableConfigUtilsTest {
     }
   }
 
+  @Test
+  public void testValidateImplicitRealtimeTablePartitionSelectorConfigs() {
+    InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class);
+    when(instanceAssignmentConfig.getPartitionSelector()).thenReturn(
+        InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR);
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
+        .build();
+    IllegalStateException e = expectThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig));
+    assertTrue(
+        e.getMessage().contains("IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for REALTIME tables"));
+
+    InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig =
+        Mockito.mock(InstanceReplicaGroupPartitionConfig.class);
+    when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(true);
+    when(instanceAssignmentConfig.getReplicaGroupPartitionConfig()).thenReturn(instanceReplicaGroupPartitionConfig);
+    TableConfig tableConfig2 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.COMPLETED.name(), instanceAssignmentConfig))
+        .build();
+    e = expectThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig2));
+    assertTrue(e.getMessage()
+        .contains(
+            "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for CONSUMING instance partitions type"));
+
+    when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(false);
+    when(instanceAssignmentConfig.getReplicaGroupPartitionConfig()).thenReturn(instanceReplicaGroupPartitionConfig);
+    TableConfig tableConfig3 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
+        .build();
+    e = expectThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig3));
+    assertTrue(e.getMessage()
+        .contains(
+            "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used with replica group based instance "
+                + "assignment"));
+
+    when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(true);
+    when(instanceReplicaGroupPartitionConfig.getNumPartitions()).thenReturn(1);
+    TableConfig tableConfig4 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
+        .build();
+    e = expectThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig4));
+    assertTrue(e.getMessage()
+        .contains("numPartitions should not be explicitly set when using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR"));
+
+    when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(true);
+    when(instanceReplicaGroupPartitionConfig.getNumPartitions()).thenReturn(0);
+    when(instanceReplicaGroupPartitionConfig.getNumInstancesPerPartition()).thenReturn(2);
+    TableConfig tableConfig5 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
+        .build();
+    e = expectThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig5));
+    assertTrue(e.getMessage()
+        .contains("numInstancesPerPartition must be 1 when using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR"));
+
+    when(instanceReplicaGroupPartitionConfig.getNumPartitions()).thenReturn(0);
+    when(instanceReplicaGroupPartitionConfig.getNumInstancesPerPartition()).thenReturn(0);
+    TableConfig tableConfig6 = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
+        .build();
+    TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig6);
+  }
+
   private Map<String, String> getStreamConfigs() {
     Map<String, String> streamConfigs = new HashMap<>();
     streamConfigs.put("streamType", "kafka");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
index ad4b22ecaf..973cf368c7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
@@ -87,6 +87,6 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
 
   public enum PartitionSelector {
     FD_AWARE_INSTANCE_PARTITION_SELECTOR, INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR,
-    MIRROR_SERVER_SET_PARTITION_SELECTOR
+    MIRROR_SERVER_SET_PARTITION_SELECTOR, IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR;
   }
 }
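
For completeness, here is an illustrative sketch (not part of the commit) of the new validation path in TableConfigUtils rejecting a misconfigured table. It assumes the snippet lives in the org.apache.pinot.segment.local.utils package so it can reach the package-private validateInstanceAssignmentConfigs hook, and the class name, table name, and tenant tags are made up.

package org.apache.pinot.segment.local.utils;

import java.util.Map;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

// Illustrative only: shows the new validation rejecting the selector on an OFFLINE table.
public class ImplicitSelectorValidationExample {
  public static void main(String[] args) {
    InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig(
        new InstanceTagPoolConfig("myTenant_OFFLINE", false, 0, null), null,
        new InstanceReplicaGroupPartitionConfig(true, 0, 2, 0, 0, 1, true, null),
        InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true);
    // The selector is restricted to REALTIME tables, so validating an OFFLINE table config fails.
    TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE)
        .setTableName("myTable")
        .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
        .build();
    try {
      TableConfigUtils.validateInstanceAssignmentConfigs(offlineTableConfig);
    } catch (IllegalStateException e) {
      // Expected: "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for REALTIME tables"
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}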


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
