This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f2ea415407 Replace Helix AutoRebalanceStrategy with deterministic 
algorithm (#16135)
2f2ea415407 is described below

commit 2f2ea4154071a0e42b26f62bf8db93c04833fe9f
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Jun 18 12:52:53 2025 -0600

    Replace Helix AutoRebalanceStrategy with deterministic algorithm (#16135)
---
 .../assignment/segment/SegmentAssignmentUtils.java |  83 +++--
 .../BalancedNumSegmentAssignmentStrategy.java      |  11 +-
 .../segment/SegmentAssignmentUtilsTest.java        |  26 +-
 .../TableRebalancerClusterStatelessTest.java       | 350 +++++++++------------
 4 files changed, 222 insertions(+), 248 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 4e5aec1b60e..027f8defba9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -24,14 +24,12 @@ import it.unimi.dsi.fastutil.ints.IntIntPair;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.helix.HelixManager;
-import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -140,25 +138,72 @@ public class SegmentAssignmentUtils {
     return instancesAssigned;
   }
 
-  /**
-   * Rebalances the table with Helix AutoRebalanceStrategy.
-   */
-  public static Map<String, Map<String, String>> 
rebalanceTableWithHelixAutoRebalanceStrategy(
-      Map<String, Map<String, String>> currentAssignment, List<String> 
instances, int replication) {
-    // Use Helix AutoRebalanceStrategy to rebalance the table
-    LinkedHashMap<String, Integer> states = new LinkedHashMap<>();
-    states.put(SegmentStateModel.ONLINE, replication);
-    AutoRebalanceStrategy autoRebalanceStrategy =
-        new AutoRebalanceStrategy(null, new 
ArrayList<>(currentAssignment.keySet()), states);
-    // Make a copy of the current assignment because this step might change 
the passed in assignment
-    Map<String, Map<String, String>> currentAssignmentCopy = new TreeMap<>();
+  /// Rebalances the table with non-replica-group based segment assignment 
strategy by uniformly spraying segment
+  /// replicas to the servers.
+  /// 1. Calculate the target number of segments on each server
+  /// 2. Loop over all the segments and keep the assignment if target number 
of segments for the server has not been
+  /// reached, and track the segments that are not yet fully assigned
+  /// 3. Assign the left-over segments to the servers with the least segments, 
or the smallest index if there is a tie
+  public static Map<String, Map<String, String>> 
rebalanceNonReplicaGroupBasedTable(
+      Map<String, Map<String, String>> currentAssignment, List<String> 
servers, int replication) {
+    Map<String, Integer> serverIds = getInstanceNameToIdMap(servers);
+
+    // Calculate target number of segments per server
+    // NOTE: in order to minimize the segment movements, use the ceiling of 
the quotient
+    int numServers = servers.size();
+    int numSegments = currentAssignment.size();
+    int targetNumSegmentsPerServer = (numSegments * replication + numServers - 
1) / numServers;
+
+    // Do not move segment if target number of segments is not reached, track 
the segments that need to be moved
+    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+    int[] numSegmentsAssignedPerServer = new int[numServers];
+    List<String> segmentsNotAssigned = new ArrayList<>();
     for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
-      String segmentName = entry.getKey();
-      Map<String, String> instanceStateMap = entry.getValue();
-      currentAssignmentCopy.put(segmentName, new TreeMap<>(instanceStateMap));
+      String segment = entry.getKey();
+      Set<String> currentServers = entry.getValue().keySet();
+      int remainingReplicas = replication;
+      for (String server : currentServers) {
+        Integer serverId = serverIds.get(server);
+        if (serverId != null && numSegmentsAssignedPerServer[serverId] < 
targetNumSegmentsPerServer) {
+          newAssignment.computeIfAbsent(segment, k -> new 
TreeMap<>()).put(server, SegmentStateModel.ONLINE);
+          numSegmentsAssignedPerServer[serverId]++;
+          remainingReplicas--;
+          if (remainingReplicas == 0) {
+            break;
+          }
+        }
+      }
+      for (int i = 0; i < remainingReplicas; i++) {
+        segmentsNotAssigned.add(segment);
+      }
+    }
+
+    // Assign each not assigned segment to the server with the least segments, 
or the smallest id if there is a tie
+    PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numServers, 
Pairs.intPairComparator());
+    for (int serverId = 0; serverId < numServers; serverId++) {
+      heap.add(new Pairs.IntPair(numSegmentsAssignedPerServer[serverId], 
serverId));
+    }
+    List<Pairs.IntPair> skippedPairs = new ArrayList<>();
+    for (String segment : segmentsNotAssigned) {
+      Map<String, String> instanceStateMap = 
newAssignment.computeIfAbsent(segment, k -> new TreeMap<>());
+      while (true) {
+        Pairs.IntPair intPair = heap.remove();
+        int serverId = intPair.getRight();
+        String server = servers.get(serverId);
+        // Skip the server if it already has the segment
+        if (instanceStateMap.put(server, SegmentStateModel.ONLINE) == null) {
+          intPair.setLeft(intPair.getLeft() + 1);
+          heap.add(intPair);
+          break;
+        } else {
+          skippedPairs.add(intPair);
+        }
+      }
+      heap.addAll(skippedPairs);
+      skippedPairs.clear();
     }
-    return autoRebalanceStrategy.computePartitionAssignment(instances, 
instances, currentAssignmentCopy, null)
-        .getMapFields();
+
+    return newAssignment;
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
index e9c540da78a..c1270705848 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
@@ -42,17 +42,15 @@ import org.slf4j.LoggerFactory;
 public class BalancedNumSegmentAssignmentStrategy implements 
SegmentAssignmentStrategy {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BalancedNumSegmentAssignmentStrategy.class);
 
-  private String _tableNameWithType;
   private int _replication;
 
   @Override
   public void init(HelixManager helixManager, TableConfig tableConfig) {
-    _tableNameWithType = tableConfig.getTableName();
     SegmentsValidationAndRetentionConfig validationAndRetentionConfig = 
tableConfig.getValidationConfig();
     Preconditions.checkState(validationAndRetentionConfig != null, "Validation 
Config is null");
     _replication = tableConfig.getReplication();
-    LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: " 
+ "{} with replication: {}",
-        _tableNameWithType, _replication);
+    LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: 
{} with replication: {}",
+        tableConfig.getTableName(), _replication);
   }
 
   @Override
@@ -66,12 +64,9 @@ public class BalancedNumSegmentAssignmentStrategy implements 
SegmentAssignmentSt
   public Map<String, Map<String, String>> reassignSegments(Map<String, 
Map<String, String>> currentAssignment,
       InstancePartitions instancePartitions, InstancePartitionsType 
instancePartitionsType) {
     validateSegmentAssignmentStrategy(instancePartitions);
-    Map<String, Map<String, String>> newAssignment;
     List<String> instances =
         
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
 _replication);
-    newAssignment =
-        
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 instances, _replication);
-    return newAssignment;
+    return 
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
instances, _replication);
   }
 
   private void validateSegmentAssignmentStrategy(InstancePartitions 
instancePartitions) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
index 0f43b7869df..306e391ce0c 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
@@ -76,8 +76,7 @@ public class SegmentAssignmentUtilsTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Current assignment should already be balanced
-    assertEquals(
-        
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 instances, NUM_REPLICAS),
+    
assertEquals(SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment,
 instances, NUM_REPLICAS),
         currentAssignment);
 
     // Replace instance_0 with instance_10
@@ -85,8 +84,7 @@ public class SegmentAssignmentUtilsTest {
     String newInstanceName = INSTANCE_NAME_PREFIX + 10;
     newInstances.set(0, newInstanceName);
     Map<String, Map<String, String>> newAssignment =
-        
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
-            NUM_REPLICAS);
+        
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
newInstances, NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(currentAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -116,8 +114,8 @@ public class SegmentAssignmentUtilsTest {
     // }
     int newNumInstances = numInstances - 5;
     newInstances = 
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
-    newAssignment = 
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
-        NUM_REPLICAS);
+    newAssignment =
+        
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
newInstances, NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -127,19 +125,19 @@ public class SegmentAssignmentUtilsTest {
     // The segments are not perfectly balanced, but should be deterministic
     numSegmentsAssignedPerInstance =
         
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, 
newInstances);
-    assertEquals(numSegmentsAssignedPerInstance[0], 56);
+    assertEquals(numSegmentsAssignedPerInstance[0], 60);
     assertEquals(numSegmentsAssignedPerInstance[1], 60);
     assertEquals(numSegmentsAssignedPerInstance[2], 60);
     assertEquals(numSegmentsAssignedPerInstance[3], 60);
-    assertEquals(numSegmentsAssignedPerInstance[4], 64);
+    assertEquals(numSegmentsAssignedPerInstance[4], 60);
     numSegmentsToMovePerInstance =
         
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
     assertEquals(numSegmentsToMovePerInstance.size(), numInstances);
-    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0), 
IntIntPair.of(26, 0));
+    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0), 
IntIntPair.of(30, 0));
     assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 1), 
IntIntPair.of(30, 0));
     assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 2), 
IntIntPair.of(30, 0));
     assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 3), 
IntIntPair.of(30, 0));
-    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4), 
IntIntPair.of(34, 0));
+    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4), 
IntIntPair.of(30, 0));
     for (int i = 5; i < 10; i++) {
       assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), 
IntIntPair.of(0, 30));
     }
@@ -150,8 +148,8 @@ public class SegmentAssignmentUtilsTest {
     // }
     newNumInstances = numInstances + 5;
     newInstances = 
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
-    newAssignment = 
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
-        NUM_REPLICAS);
+    newAssignment =
+        
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
newInstances, NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -182,8 +180,8 @@ public class SegmentAssignmentUtilsTest {
     // }
     String newInstanceNamePrefix = "i_";
     newInstances = 
SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances);
-    newAssignment = 
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
-        NUM_REPLICAS);
+    newAssignment =
+        
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, 
newInstances, NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 64b8a6e5f5d..f4964021ea9 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -99,7 +99,6 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
   }
 
-  ///
   /// Dropping instance from cluster requires waiting for live instance gone 
and removing instance related ZNodes, which
   /// are not the purpose of the test, so combine different rebalance 
scenarios into one test:
   /// 1. NO_OP rebalance
@@ -107,7 +106,6 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
   /// 3. Migrate to replica-group based segment assignment and rebalance
   /// 4. Migrate back to non-replica-group based segment assignment and 
rebalance
   /// 5. Remove (disable) servers and rebalance
-  ///
   @Test
   public void testRebalance()
       throws Exception {
@@ -119,16 +117,14 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       for (int i = 0; i < numServers; i++) {
         String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
         addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
-        DiskUsageInfo diskUsageInfo1 =
-            new DiskUsageInfo(instanceId, "", 1000L, 500L, 
System.currentTimeMillis());
-        diskUsageInfoMap.put(instanceId, diskUsageInfo1);
+        DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L, 
500L, System.currentTimeMillis());
+        diskUsageInfoMap.put(instanceId, diskUsageInfo);
       }
 
       ExecutorService executorService = Executors.newFixedThreadPool(10);
       DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
       preChecker.init(_helixResourceManager, executorService, 1);
-      TableRebalancer tableRebalancer =
-          new TableRebalancer(_helixManager, null, null, preChecker, 
_tableSizeReader);
+      TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, 
null, null, preChecker, _tableSizeReader);
       TableConfig tableConfig =
           new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
@@ -175,8 +171,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
       assertNotNull(rebalanceSummaryResult.getTagsInfo());
       assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-          TagNameUtils.getOfflineTagForTenant(null));
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), 
TagNameUtils.getOfflineTagForTenant(null));
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
 numServers);
@@ -211,8 +206,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       for (int i = 0; i < numServersToAdd; i++) {
         String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
         addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
-        DiskUsageInfo diskUsageInfo =
-            new DiskUsageInfo(instanceId, "", 1000L, 500L, 
System.currentTimeMillis());
+        DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L, 
500L, System.currentTimeMillis());
         diskUsageInfoMap.put(instanceId, diskUsageInfo);
       }
 
@@ -227,18 +221,17 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       assertNotNull(rebalanceSummaryResult);
       assertNotNull(rebalanceSummaryResult.getServerInfo());
       assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 14);
-      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 14);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 15);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 15);
       
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
       
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
       
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
       assertNotNull(rebalanceSummaryResult.getTagsInfo());
       assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-          TagNameUtils.getOfflineTagForTenant(null));
-      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 14);
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), 
TagNameUtils.getOfflineTagForTenant(null));
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 15);
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-          numSegments * NUM_REPLICAS - 14);
+          numSegments * NUM_REPLICAS - 15);
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
           numServers + numServersToAdd);
       assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -251,19 +244,19 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         // Original servers should be losing some segments
         String newServer = SERVER_INSTANCE_ID_PREFIX + i;
         RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
-        assertTrue(serverSegmentChange.getSegmentsDeleted() > 0);
-        assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
-        assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
-        assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0);
+        assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 
10);
+        assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
         assertEquals(serverSegmentChange.getSegmentsAdded(), 0);
+        assertEquals(serverSegmentChange.getSegmentsDeleted(), 5);
+        assertEquals(serverSegmentChange.getSegmentsUnchanged(), 5);
       }
       for (int i = 0; i < numServersToAdd; i++) {
         // New servers should only get new segments
         String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
         RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
-        assertTrue(serverSegmentChange.getSegmentsAdded() > 0);
         assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0);
-        assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 
serverSegmentChange.getSegmentsAdded());
+        assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
+        assertEquals(serverSegmentChange.getSegmentsAdded(), 5);
         assertEquals(serverSegmentChange.getSegmentsDeleted(), 0);
         assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0);
       }
@@ -301,16 +294,14 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
           "Instance assignment not allowed, no need for minimizeDataMovement");
       
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(),
           RebalancePreCheckerResult.PreCheckStatus.PASS);
-      assertTrue(
-          
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
-              .getMessage()
-              .startsWith("Within threshold"));
+      
assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
+          .getMessage()
+          .startsWith("Within threshold"));
       
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(),
           RebalancePreCheckerResult.PreCheckStatus.PASS);
-      assertTrue(
-          
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
-              .getMessage()
-              .startsWith("Within threshold"));
+      
assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
+          .getMessage()
+          .startsWith("Within threshold"));
       
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getPreCheckStatus(),
           RebalancePreCheckerResult.PreCheckStatus.PASS);
       
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(),
@@ -336,17 +327,17 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       Map<String, IntIntPair> instanceToNumSegmentsToMoveMap =
           
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(oldSegmentAssignment, 
newSegmentAssignment);
       assertEquals(instanceToNumSegmentsToMoveMap.size(), numServers + 
numServersToAdd);
-      for (int i = 0; i < numServersToAdd; i++) {
-        IntIntPair numSegmentsToMove = 
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers + 
i));
-        assertNotNull(numSegmentsToMove);
-        assertTrue(numSegmentsToMove.leftInt() > 0);
-        assertEquals(numSegmentsToMove.rightInt(), 0);
-      }
       for (int i = 0; i < numServers; i++) {
         IntIntPair numSegmentsToMove = 
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + i);
         assertNotNull(numSegmentsToMove);
         assertEquals(numSegmentsToMove.leftInt(), 0);
-        assertTrue(numSegmentsToMove.rightInt() > 0);
+        assertEquals(numSegmentsToMove.rightInt(), 5);
+      }
+      for (int i = 0; i < numServersToAdd; i++) {
+        IntIntPair numSegmentsToMove = 
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers + 
i));
+        assertNotNull(numSegmentsToMove);
+        assertEquals(numSegmentsToMove.leftInt(), 5);
+        assertEquals(numSegmentsToMove.rightInt(), 0);
       }
 
       // Dry-run mode should not change the IdealState
@@ -416,25 +407,25 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       assertEquals(
           
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
           RebalancePreCheckerResult.PreCheckStatus.WARN);
-      
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)
-          .getMessage(), "reassignInstances is disabled, replica groups may 
not be updated.\nOFFLINE segments "
-          + "- numReplicaGroups: " + NUM_REPLICAS + ", 
numInstancesPerReplicaGroup: 0 (using as many instances as "
-          + "possible)");
+      assertEquals(
+          
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
+          "reassignInstances is disabled, replica groups may not be 
updated.\nOFFLINE segments "
+              + "- numReplicaGroups: " + NUM_REPLICAS + ", 
numInstancesPerReplicaGroup: 0 (using as many instances as "
+              + "possible)");
       rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
       assertNotNull(rebalanceSummaryResult);
       assertNotNull(rebalanceSummaryResult.getServerInfo());
       assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 11);
-      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 11);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 20);
+      
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
 20);
       
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
       
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
       assertNotNull(rebalanceSummaryResult.getTagsInfo());
       assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-          TagNameUtils.getOfflineTagForTenant(null));
-      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 11);
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), 
TagNameUtils.getOfflineTagForTenant(null));
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 20);
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-          numSegments * NUM_REPLICAS - 11);
+          numSegments * NUM_REPLICAS - 20);
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
           numServers + numServersToAdd);
       assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -445,11 +436,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       for (int i = 0; i < numServers + numServersToAdd; i++) {
         String newServer = SERVER_INSTANCE_ID_PREFIX + i;
         RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
+        assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 5);
         assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
-        // Ensure not all segments moved
-        assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
-        // Ensure all segments has something assigned prior to rebalance
-        assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
       }
 
       // Dry-run mode should not change the IdealState
@@ -515,8 +503,9 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       assertEquals(
           
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
           RebalancePreCheckerResult.PreCheckStatus.PASS);
-      
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)
-          .getMessage(), "OFFLINE segments - Replica Groups are not enabled, 
replication: " + NUM_REPLICAS);
+      assertEquals(
+          
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
+          "OFFLINE segments - Replica Groups are not enabled, replication: " + 
NUM_REPLICAS);
       rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
       assertNotNull(rebalanceSummaryResult);
       assertNotNull(rebalanceSummaryResult.getServerInfo());
@@ -528,11 +517,9 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
       assertNotNull(rebalanceSummaryResult.getTagsInfo());
       assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-          TagNameUtils.getOfflineTagForTenant(null));
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), 
TagNameUtils.getOfflineTagForTenant(null));
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
-      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-          numSegments * NUM_REPLICAS);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
           numServers + numServersToAdd);
       assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -557,8 +544,9 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       assertEquals(
           
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
           RebalancePreCheckerResult.PreCheckStatus.PASS);
-      
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)
-          .getMessage(), "OFFLINE segments - Replica Groups are not enabled, 
replication: " + NUM_REPLICAS);
+      assertEquals(
+          
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
+          "OFFLINE segments - Replica Groups are not enabled, replication: " + 
NUM_REPLICAS);
       rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
       assertNotNull(rebalanceSummaryResult);
       assertNotNull(rebalanceSummaryResult.getServerInfo());
@@ -571,11 +559,9 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
       assertNotNull(rebalanceSummaryResult.getTagsInfo());
       assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-          TagNameUtils.getOfflineTagForTenant(null));
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), 
TagNameUtils.getOfflineTagForTenant(null));
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
-      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-          numSegments * NUM_REPLICAS);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
           numServers + numServersToAdd);
       assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -628,13 +614,11 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
       assertNotNull(rebalanceSummaryResult.getTagsInfo());
       assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-          TagNameUtils.getOfflineTagForTenant(null));
+      assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), 
TagNameUtils.getOfflineTagForTenant(null));
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 15);
       
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
           numSegments * NUM_REPLICAS - 15);
-      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-          numServers);
+      
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
 numServers);
       assertNotNull(rebalanceResult.getInstanceAssignment());
       assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -701,10 +685,11 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       preChecker.init(_helixResourceManager, executorService, 1);
       TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, 
null, null, preChecker, _tableSizeReader);
       // Set up the table with 1 replication factor and strict replica group 
enabled
-      TableConfig tableConfig =
-          new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1)
-              .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
-                  false)).build();
+      TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+          .setNumReplicas(1)
+          .setRoutingConfig(
+              new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+          .build();
 
       // Create the table
       addDummySchema(RAW_TABLE_NAME);
@@ -783,20 +768,17 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
 
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1, 
false, null);
-    InstanceAssignmentConfig instanceAssignmentConfig =
-        new InstanceAssignmentConfig(
-            new 
InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), false, 0, 
null), null,
-            replicaGroupPartitionConfig,
-            
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
 true);
-    TableConfig tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
-            .setNumReplicas(numReplicas)
-            .setRoutingConfig(
-                new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
-            .setStreamConfigs(
-                
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap())
-            
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), 
instanceAssignmentConfig))
-            .build();
+    InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), 
false, 0, null), null,
+        replicaGroupPartitionConfig,
+        
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
 true);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(numReplicas)
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap())
+        
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), 
instanceAssignmentConfig))
+        .build();
 
     // Create the table
     addDummySchema(RAW_TABLE_NAME);
@@ -983,10 +965,11 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     preChecker.init(_helixResourceManager, executorService, 1);
     TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker, _tableSizeReader);
     // Set up the table with 1 replication factor and strict replica group 
enabled
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1)
-            .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
-                false)).build();
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(1)
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
 
     // Create the table
     addDummySchema(RAW_TABLE_NAME);
@@ -1031,16 +1014,14 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     for (int i = 0; i < numServers; i++) {
       String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + 
i;
       addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
-      DiskUsageInfo diskUsageInfo1 =
-          new DiskUsageInfo(instanceId, "", 1000L, 200L, 
System.currentTimeMillis());
+      DiskUsageInfo diskUsageInfo1 = new DiskUsageInfo(instanceId, "", 1000L, 
200L, System.currentTimeMillis());
       diskUsageInfoMap.put(instanceId, diskUsageInfo1);
     }
 
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     preChecker.init(_helixResourceManager, executorService, 0.5);
-    TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixManager, null, null, preChecker, 
_tableSizeReader);
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker, _tableSizeReader);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
@@ -1062,8 +1043,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     for (int i = 0; i < numServersToAdd; i++) {
       String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + 
(numServers + i);
       addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
-      DiskUsageInfo diskUsageInfo =
-          new DiskUsageInfo(instanceId, "", 1000L, 200L, 
System.currentTimeMillis());
+      DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L, 
200L, System.currentTimeMillis());
       diskUsageInfoMap.put(instanceId, diskUsageInfo);
     }
 
@@ -1081,22 +1061,19 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE));
     
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(),
         RebalancePreCheckerResult.PreCheckStatus.PASS);
-    assertTrue(
-        
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
-            .getMessage()
-            .startsWith("Within threshold"));
+    
assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
+        .getMessage()
+        .startsWith("Within threshold"));
     
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE));
     
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(),
         RebalancePreCheckerResult.PreCheckStatus.PASS);
-    assertTrue(
-        
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
-            .getMessage()
-            .startsWith("Within threshold"));
+    
assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
+        .getMessage()
+        .startsWith("Within threshold"));
 
     for (int i = 0; i < numServers + numServersToAdd; i++) {
       String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + 
i;
-      DiskUsageInfo diskUsageInfo =
-          new DiskUsageInfo(instanceId, "", 1000L, 755L, 
System.currentTimeMillis());
+      DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L, 
755L, System.currentTimeMillis());
       diskUsageInfoMap.put(instanceId, diskUsageInfo);
     }
 
@@ -1139,13 +1116,11 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     preChecker.init(_helixResourceManager, executorService, 0.5);
-    TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixManager, null, null, preChecker, 
_tableSizeReader);
-    TableConfig tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
-            .setNumReplicas(2)
-            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
-            .build();
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker, _tableSizeReader);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(2)
+        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+        .build();
 
     // Create the table
     addDummySchema(RAW_TABLE_NAME);
@@ -1187,13 +1162,12 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     rebalanceConfig.setUpdateTargetTier(false);
     rebalanceConfig.setBootstrap(false);
     rebalanceConfig.setBestEfforts(false);
-    tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
-            .setTierConfigList(Collections.singletonList(
-                new TierConfig("dummyTier", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
-                    TierFactory.PINOT_SERVER_STORAGE_TYPE,
-                    
TagNameUtils.getRealtimeTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME), null, 
null)))
-            .build();
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setTierConfigList(Collections.singletonList(
+            new TierConfig("dummyTier", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
+                TierFactory.PINOT_SERVER_STORAGE_TYPE,
+                
TagNameUtils.getRealtimeTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME), null, 
null)))
+        .build();
 
     rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
     preCheckerResult = 
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS);
@@ -1218,8 +1192,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         "Number of replicas (3) is greater than 1, downtime is not 
recommended.");
 
     // no downtime warning with 1 replica
-    newTableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build();
+    newTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build();
 
     rebalanceConfig.setDowntime(true);
     rebalanceResult = tableRebalancer.rebalance(newTableConfig, 
rebalanceConfig, null);
@@ -1312,8 +1285,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertEquals(preCheckerResult.getMessage(),
         "Number of segments to add to a single server (" + 
expectedNumSegmentsToAdd + ") is high (>"
             + DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + "). It is 
recommended to set batchSizePerServer to "
-            + DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE
-            + " or lower to avoid excessive load on servers.");
+            + DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE + " or lower 
to avoid excessive load on servers.");
 
     
rebalanceConfig.setBatchSizePerServer(DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE);
     rebalanceResult = tableRebalancer.rebalance(newTableConfig, 
rebalanceConfig, null);
@@ -1330,13 +1302,11 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     executorService.shutdown();
   }
 
-  /**
-   * Tests rebalance with tier configs
-   * Add 10 segments, with segment metadata end time 3 days apart starting 
from now to 30 days ago
-   * 1. run rebalance - should see no change
-   * 2. add nodes for tiers and run rebalance - should see no change
-   * 3. add tier config and run rebalance - should see changed assignment
-   */
+  /// Tests rebalance with tier configs
+  /// Add 10 segments, with segment metadata end time 3 days apart starting 
from now to 30 days ago
+  /// 1. run rebalance - should see no change
+  /// 2. add nodes for tiers and run rebalance - should see no change
+  /// 3. add tier config and run rebalance - should see changed assignment
   @Test
   public void testRebalanceWithTiers()
       throws Exception {
@@ -1387,10 +1357,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
         TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME));
     
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-        numSegments * NUM_REPLICAS);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-        numServers);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
 numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -1428,10 +1396,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
         TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME));
     
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-        numSegments * NUM_REPLICAS);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-        numServers);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
 numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -1554,8 +1520,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     preChecker.init(_helixResourceManager, executorService, 1);
-    TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixManager, null, null, preChecker, 
_tableSizeReader);
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker, _tableSizeReader);
 
     // Try dry-run summary mode
     RebalanceConfig rebalanceConfig = new RebalanceConfig();
@@ -1575,10 +1540,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
         TagNameUtils.getOfflineTagForTenant("replicaAssignment" + 
NO_TIER_NAME));
     
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-        numSegments * NUM_REPLICAS);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-        numServers);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
 numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -1612,10 +1575,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
         TagNameUtils.getOfflineTagForTenant("replicaAssignment" + 
NO_TIER_NAME));
     
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
 0);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
-        numSegments * NUM_REPLICAS);
-    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
-        numServers);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
 numSegments * NUM_REPLICAS);
+    
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
 numServers);
     assertNotNull(rebalanceResult.getInstanceAssignment());
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
@@ -1662,14 +1623,11 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
         .getNumSegmentsToDownload(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
-            .getNumSegmentsUnchanged(),
-        0);
+        .getNumSegmentsUnchanged(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
-            .getNumServerParticipants(),
-        0);
+        .getNumServerParticipants(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
-            .getNumSegmentsToDownload(),
-        numSegments * NUM_REPLICAS);
+        .getNumSegmentsToDownload(), numSegments * NUM_REPLICAS);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
         .getNumSegmentsUnchanged(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
@@ -1709,7 +1667,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     rebalanceConfig.setPreChecks(true);
     rebalanceConfig.setReassignInstances(true);
     rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
     assertEquals(
         
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
         RebalancePreCheckerResult.PreCheckStatus.PASS);
@@ -1721,7 +1679,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertNotNull(rebalanceSummaryResult);
     assertNotNull(rebalanceSummaryResult.getServerInfo());
     assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 13);
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
     assertNotNull(rebalanceSummaryResult.getTagsInfo());
@@ -1734,16 +1692,13 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
         .getNumSegmentsToDownload(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
-            .getNumSegmentsUnchanged(),
-        0);
+        .getNumSegmentsUnchanged(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
-            .getNumServerParticipants(),
-        0);
+        .getNumServerParticipants(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
-            .getNumSegmentsToDownload(),
-        13);
+        .getNumSegmentsToDownload(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
-        .getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS - 13);
+        .getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
         .getNumServerParticipants(), 6);
     assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -1751,7 +1706,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertNotNull(rebalanceResult.getSegmentAssignment());
 
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
     
assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
 
     InstancePartitions instancePartitions = 
rebalanceResult.getTierInstanceAssignment().get(TIER_A_NAME);
@@ -1823,26 +1778,20 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
         .getNumSegmentsToDownload(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
-            .getNumSegmentsUnchanged(),
-        0);
+        .getNumSegmentsUnchanged(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + NO_TIER_NAME))
-            .getNumServerParticipants(),
-        0);
+        .getNumServerParticipants(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
-            .getNumSegmentsToDownload(),
-        0);
+        .getNumSegmentsToDownload(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
         .getNumSegmentsUnchanged(), 0);
     
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
 + TIER_A_NAME))
         .getNumServerParticipants(), 0);
-    assertEquals(
-        
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsToDownload(),
+    
assertEquals(tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsToDownload(),
         0);
-    assertEquals(
-        
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsUnchanged(),
+    
assertEquals(tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsUnchanged(),
         numSegments * NUM_REPLICAS);
-    assertEquals(
-        
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumServerParticipants(),
+    
assertEquals(tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumServerParticipants(),
         6);
 
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
@@ -1852,7 +1801,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
   @Test
   public void testRebalanceWithMinimizeDataMovementBalanced()
       throws Exception {
-    int numServers = 6;
+    int numServers = 3;
     for (int i = 0; i < numServers; i++) {
       
addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" + 
SERVER_INSTANCE_ID_PREFIX + i,
           true);
@@ -2111,13 +2060,11 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     }
 
     ConsumingSegmentInfoReader mockConsumingSegmentInfoReader = 
Mockito.mock(ConsumingSegmentInfoReader.class);
-    TableRebalancer tableRebalancerOriginal =
-        new TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
-    TableConfig tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
-            .setNumReplicas(numReplica)
-            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
-            .build();
+    TableRebalancer tableRebalancerOriginal = new 
TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(numReplica)
+        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+        .build();
 
     // Create the table
     addDummySchema(RAW_TABLE_NAME);
@@ -2149,11 +2096,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
 0);
     
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(),
 0);
     
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
 0);
-    assertEquals(consumingSegmentToBeMovedSummary
-        .getServerConsumingSegmentSummary()
-        .size(), 0);
-    assertTrue(consumingSegmentToBeMovedSummary
-        .getServerConsumingSegmentSummary()
+    
assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
 0);
+    
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
         .values()
         .stream()
         .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() 
== 0));
@@ -2168,11 +2112,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
 0);
     
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(),
 0);
     
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
 0);
-    assertEquals(consumingSegmentToBeMovedSummary
-        .getServerConsumingSegmentSummary()
-        .size(), 0);
-    assertTrue(consumingSegmentToBeMovedSummary
-        .getServerConsumingSegmentSummary()
+    
assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
 0);
+    
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
         .values()
         .stream()
         .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() 
== 0));
@@ -2203,11 +2144,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     }
     
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
         FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS);
-    assertEquals(consumingSegmentToBeMovedSummary
-        .getServerConsumingSegmentSummary()
-        .size(), numServers);
-    assertTrue(consumingSegmentToBeMovedSummary
-        .getServerConsumingSegmentSummary()
+    
assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
 numServers);
+    
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
         .values()
         .stream()
         .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
@@ -2231,13 +2169,11 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
     }
 
-    TableRebalancer tableRebalancerOriginal =
-        new TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
-    TableConfig tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
-            .setNumReplicas(numReplica)
-            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
-            .build();
+    TableRebalancer tableRebalancerOriginal = new 
TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(numReplica)
+        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+        .build();
 
     // Create the table
     addDummySchema(RAW_TABLE_NAME);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to