This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 604cd16558 Add minimizeDataMovement to Rebalance API (#15110)
604cd16558 is described below

commit 604cd165586716ae04db22856371e50147e30459
Author: Jhow <44998515+j-howhu...@users.noreply.github.com>
AuthorDate: Fri Mar 14 10:12:33 2025 -0700

    Add minimizeDataMovement to Rebalance API (#15110)
    
    * Add minimizeDataMovement param to RebalanceConfig
    
    * add minimizeDataMovement in RebalanceTableCommand
---
 .../api/resources/PinotTableRestletResource.java   |   4 +
 .../instance/InstanceAssignmentDriver.java         |  38 ++-
 .../helix/core/rebalance/RebalanceConfig.java      |  36 ++-
 .../helix/core/rebalance/TableRebalancer.java      |  69 ++++--
 .../instance/InstanceAssignmentTest.java           | 244 ++++++++++++++++--
 .../TableRebalancerClusterStatelessTest.java       | 275 ++++++++++++++++++++-
 .../apache/pinot/tools/PinotTableRebalancer.java   |   4 +-
 .../tools/admin/command/RebalanceTableCommand.java |   9 +-
 8 files changed, 620 insertions(+), 59 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 5de903eb9d..fed660e988 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -612,6 +612,9 @@ public class PinotTableRestletResource {
       @QueryParam("reassignInstances") boolean reassignInstances,
       @ApiParam(value = "Whether to reassign CONSUMING segments for real-time 
table") @DefaultValue("true")
       @QueryParam("includeConsuming") boolean includeConsuming,
+      @ApiParam(value = "Whether to enable minimize data movement on 
rebalance, DEFAULT will use "
+          + "the minimizeDataMovement in table config") @DefaultValue("ENABLE")
+      @QueryParam("minimizeDataMovement") 
RebalanceConfig.MinimizeDataMovementOptions minimizeDataMovement,
       @ApiParam(value = "Whether to rebalance table in bootstrap mode 
(regardless of minimum segment movement, "
           + "reassign all segments in a round-robin fashion as if adding new 
segments to an empty table)")
       @DefaultValue("false") @QueryParam("bootstrap") boolean bootstrap,
@@ -651,6 +654,7 @@ public class PinotTableRestletResource {
     rebalanceConfig.setPreChecks(preChecks);
     rebalanceConfig.setReassignInstances(reassignInstances);
     rebalanceConfig.setIncludeConsuming(includeConsuming);
+    rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement);
     rebalanceConfig.setBootstrap(bootstrap);
     rebalanceConfig.setDowntime(downtime);
     rebalanceConfig.setMinAvailableReplicas(minAvailableReplicas);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 09866c1ed7..5ee9258014 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -55,40 +55,66 @@ public class InstanceAssignmentDriver {
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions, (Boolean) null);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable Boolean minimizeDataMovement) { // null: use the instance assignment config's own flag
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, null, 
minimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
       @Nullable InstancePartitions preConfiguredInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions,
+        preConfiguredInstancePartitions, null); // null: no override, the assignment config decides
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable 
Boolean minimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions,
+        minimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig) {
+    return assignInstances(tierName, instanceConfigs, 
existingInstancePartitions, instanceAssignmentConfig, null);
+  }
+
+  public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig,
+      @Nullable Boolean minimizeDataMovement) { // null: use the instance assignment config's own flag
     return getInstancePartitions(
         
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
 tierName),
-        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null);
+        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null, minimizeDataMovement);
   }
 
   private InstancePartitions getInstancePartitions(String 
instancePartitionsName,
       InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> 
instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions,
-      @Nullable InstancePartitions preConfiguredInstancePartitions) {
+      @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable 
Boolean minimizeDataMovementOverride) {
     String tableNameWithType = _tableConfig.getTableName();
-    LOGGER.info("Starting {} instance assignment for table {}", 
instancePartitionsName, tableNameWithType);
 
-    boolean minimizeDataMovement = 
instanceAssignmentConfig.isMinimizeDataMovement();
+    // minimizeDataMovement might be set back to false within 
InstanceTagPoolSelector and InstancePartitionSelector
+    // if existingInstancePartitions is null.
+    boolean minimizeDataMovement =
+        minimizeDataMovementOverride == null ? 
instanceAssignmentConfig.isMinimizeDataMovement()
+            : minimizeDataMovementOverride;
+    LOGGER.info("Starting {} instance assignment for table {}, original 
minimizeDataMovement: {}, "
+            + "overriding with minimizeDataMovementOverride: {}", 
instancePartitionsName, tableNameWithType,
+        instanceAssignmentConfig.isMinimizeDataMovement(), 
minimizeDataMovementOverride);
     InstanceTagPoolSelector tagPoolSelector =
         new 
InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), 
tableNameWithType,
             minimizeDataMovement, existingInstancePartitions);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index f69f6be3d9..381a22fa3c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -83,6 +83,18 @@ public class RebalanceConfig {
   @ApiModelProperty(example = "false")
   private boolean _bestEfforts = false;
 
+  // Whether to run Minimal Data Movement Algorithm, overriding the 
minimizeDataMovement flag in table config. If set
+  // to DEFAULT, the minimizeDataMovement flag in table config will be used to 
determine whether to run the Minimal
+  // Data Movement Algorithm. ENABLE (the default) forces it on and DISABLE forces it off regardless of table config.
+  @ApiModel
+  public enum MinimizeDataMovementOptions {
+    ENABLE, DISABLE, DEFAULT
+  }
+
+  @JsonProperty("minimizeDataMovement")
+  @ApiModelProperty(dataType = "string", allowableValues = "ENABLE, DISABLE, 
DEFAULT", example = "ENABLE")
+  private MinimizeDataMovementOptions _minimizeDataMovement = 
MinimizeDataMovementOptions.ENABLE;
+
   // The check on external view can be very costly when the table has very 
large ideal and external states, i.e. when
   // having a huge number of segments. These two configs help reduce the cpu 
load on controllers, e.g. by doing the
   // check less frequently and bail out sooner to rebalance at best effort if 
configured so.
@@ -244,16 +256,24 @@ public class RebalanceConfig {
     _retryInitialDelayInMs = retryInitialDelayInMs;
   }
 
+  public MinimizeDataMovementOptions getMinimizeDataMovement() {
+    return _minimizeDataMovement;
+  }
+
+  public void setMinimizeDataMovement(MinimizeDataMovementOptions 
minimizeDataMovement) {
+    _minimizeDataMovement = minimizeDataMovement != null ? minimizeDataMovement : MinimizeDataMovementOptions.ENABLE;
+  }
+
   @Override
   public String toString() {
-    return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" + 
_preChecks
-        + ", _reassignInstances=" + _reassignInstances + ", 
_includeConsuming=" + _includeConsuming + ", _bootstrap="
-        + _bootstrap + ", _downtime=" + _downtime + ", _minAvailableReplicas=" 
+ _minAvailableReplicas
-        + ", _bestEfforts=" + _bestEfforts + ", 
_externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
-        + ", _externalViewStabilizationTimeoutInMs=" + 
_externalViewStabilizationTimeoutInMs + ", _updateTargetTier="
-        + _updateTargetTier + ", _heartbeatIntervalInMs=" + 
_heartbeatIntervalInMs + ", _heartbeatTimeoutInMs="
-        + _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ", 
_retryInitialDelayInMs="
-        + _retryInitialDelayInMs + '}';
+    return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" + 
_preChecks + ", _reassignInstances="
+        + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", 
_minimizeDataMovement="
+        + _minimizeDataMovement + ", _bootstrap=" + _bootstrap + ", 
_downtime=" + _downtime + ", _minAvailableReplicas="
+        + _minAvailableReplicas + ", _bestEfforts=" + _bestEfforts + ", 
_externalViewCheckIntervalInMs="
+        + _externalViewCheckIntervalInMs + ", 
_externalViewStabilizationTimeoutInMs="
+        + _externalViewStabilizationTimeoutInMs + ", _updateTargetTier=" + 
_updateTargetTier
+        + ", _heartbeatIntervalInMs=" + _heartbeatIntervalInMs + ", 
_heartbeatTimeoutInMs=" + _heartbeatTimeoutInMs
+        + ", _maxAttempts=" + _maxAttempts + ", _retryInitialDelayInMs=" + 
_retryInitialDelayInMs + '}';
   }
 
   public static RebalanceConfig copy(RebalanceConfig cfg) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 34cab7c29d..fd79e25b0a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -191,17 +191,32 @@ public class TableRebalancer {
     boolean bestEfforts = rebalanceConfig.isBestEfforts();
     long externalViewCheckIntervalInMs = 
rebalanceConfig.getExternalViewCheckIntervalInMs();
     long externalViewStabilizationTimeoutInMs = 
rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
+    Boolean minimizeDataMovement;
+    switch (rebalanceConfig.getMinimizeDataMovement()) {
+      case DEFAULT:
+        minimizeDataMovement = null;
+        break;
+      case ENABLE:
+        minimizeDataMovement = true;
+        break;
+      case DISABLE:
+        minimizeDataMovement = false;
+        break;
+      default:
+        throw new IllegalStateException(
+            "Invalid minimizeDataMovement option: " + 
rebalanceConfig.getMinimizeDataMovement());
+    }
     boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
         && 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     LOGGER.info(
-        "Start rebalancing table: {} with dryRun: {}, preChecks: {}, 
reassignInstances: {}, includeConsuming: {},"
-            + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: 
{}, enableStrictReplicaGroup: {},"
-            + "lowDiskMode: {}, bestEfforts: {}, 
externalViewCheckIntervalInMs: {}, "
-            + "externalViewStabilizationTimeoutInMs: {}",
+        "Start rebalancing table: {} with dryRun: {}, preChecks: {}, 
reassignInstances: {}, "
+            + "includeConsuming: {}, bootstrap: {}, downtime: {}, 
minReplicasToKeepUpForNoDowntime: {}, "
+            + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, 
externalViewCheckIntervalInMs: {}, "
+            + "externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: 
{}",
         tableNameWithType, dryRun, preChecks, reassignInstances, 
includeConsuming, bootstrap, downtime,
         minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, 
lowDiskMode, bestEfforts,
-        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
+        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, 
minimizeDataMovement);
 
     // Perform pre-checks if enabled
     Map<String, String> preChecksResult = null;
@@ -253,7 +268,7 @@ public class TableRebalancer {
     boolean instancePartitionsUnchanged;
     try {
       Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> 
instancePartitionsMapAndUnchanged =
-          getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, 
dryRun);
+          getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, 
dryRun, minimizeDataMovement);
       instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft();
       instancePartitionsUnchanged = 
instancePartitionsMapAndUnchanged.getRight();
     } catch (Exception e) {
@@ -272,7 +287,8 @@ public class TableRebalancer {
     try {
       sortedTiers = getSortedTiers(tableConfig, providedTierToSegmentsMap);
       Pair<Map<String, InstancePartitions>, Boolean> 
tierToInstancePartitionsMapAndUnchanged =
-          getTierToInstancePartitionsMap(tableConfig, sortedTiers, 
reassignInstances, bootstrap, dryRun);
+          getTierToInstancePartitionsMap(tableConfig, sortedTiers, 
reassignInstances, bootstrap, dryRun,
+              minimizeDataMovement);
       tierToInstancePartitionsMap = 
tierToInstancePartitionsMapAndUnchanged.getLeft();
       tierInstancePartitionsUnchanged = 
tierToInstancePartitionsMapAndUnchanged.getRight();
     } catch (Exception e) {
@@ -488,9 +504,11 @@ public class TableRebalancer {
           try {
             // Re-calculate the instance partitions in case the instance 
configs changed during the rebalance
             instancePartitionsMap =
-                getInstancePartitionsMap(tableConfig, reassignInstances, 
bootstrap, false).getLeft();
+                getInstancePartitionsMap(tableConfig, reassignInstances, 
bootstrap, false,
+                    minimizeDataMovement).getLeft();
             tierToInstancePartitionsMap =
-                getTierToInstancePartitionsMap(tableConfig, sortedTiers, 
reassignInstances, bootstrap, false).getLeft();
+                getTierToInstancePartitionsMap(tableConfig, sortedTiers, 
reassignInstances, bootstrap, false,
+                    minimizeDataMovement).getLeft();
             targetAssignment = 
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, 
sortedTiers,
                 tierToInstancePartitionsMap, rebalanceConfig);
           } catch (Exception e) {
@@ -727,22 +745,31 @@ public class TableRebalancer {
    */
   public Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> 
getInstancePartitionsMap(
       TableConfig tableConfig, boolean reassignInstances, boolean bootstrap, 
boolean dryRun) {
+    return getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, 
dryRun, null); // null = use table config
+  }
+
+  public Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> 
getInstancePartitionsMap(
+      TableConfig tableConfig, boolean reassignInstances, boolean bootstrap, 
boolean dryRun,
+      @Nullable Boolean minimizeDataMovement) {
     boolean instancePartitionsUnchanged;
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
new TreeMap<>();
     if (tableConfig.getTableType() == TableType.OFFLINE) {
       Pair<InstancePartitions, Boolean> partitionAndUnchangedForOffline =
-          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, 
reassignInstances, bootstrap, dryRun);
+          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, 
reassignInstances, bootstrap, dryRun,
+              minimizeDataMovement);
       instancePartitionsMap.put(InstancePartitionsType.OFFLINE, 
partitionAndUnchangedForOffline.getLeft());
       instancePartitionsUnchanged = partitionAndUnchangedForOffline.getRight();
     } else {
       Pair<InstancePartitions, Boolean> partitionAndUnchangedForConsuming =
-          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, 
reassignInstances, bootstrap, dryRun);
+          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, 
reassignInstances, bootstrap, dryRun,
+              minimizeDataMovement);
       instancePartitionsMap.put(InstancePartitionsType.CONSUMING, 
partitionAndUnchangedForConsuming.getLeft());
       instancePartitionsUnchanged = 
partitionAndUnchangedForConsuming.getRight();
       String tableNameWithType = tableConfig.getTableName();
       if 
(InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
         Pair<InstancePartitions, Boolean> partitionAndUnchangedForCompleted =
-            getInstancePartitions(tableConfig, 
InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun);
+            getInstancePartitions(tableConfig, 
InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun,
+                minimizeDataMovement);
         LOGGER.info(
             "COMPLETED segments should be relocated, fetching/computing 
COMPLETED instance partitions for table: {}",
             tableNameWithType);
@@ -768,7 +795,8 @@ public class TableRebalancer {
    * Fetches/computes the instance partitions and also returns a boolean for 
whether they are unchanged
    */
   private Pair<InstancePartitions, Boolean> getInstancePartitions(TableConfig 
tableConfig,
-      InstancePartitionsType instancePartitionsType, boolean 
reassignInstances, boolean bootstrap, boolean dryRun) {
+      InstancePartitionsType instancePartitionsType, boolean 
reassignInstances, boolean bootstrap, boolean dryRun,
+      @Nullable Boolean minimizeDataMovement) {
     String tableNameWithType = tableConfig.getTableName();
     String instancePartitionsName =
         InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, 
instancePartitionsType.toString());
@@ -790,7 +818,7 @@ public class TableRebalancer {
           // instance partition map can be fully recalculated.
           instancePartitions = 
instanceAssignmentDriver.assignInstances(instancePartitionsType,
               
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
 true),
-              bootstrap ? null : existingInstancePartitions);
+              bootstrap ? null : existingInstancePartitions, 
minimizeDataMovement);
           instancePartitionsUnchanged = 
instancePartitions.equals(existingInstancePartitions);
           if (!dryRun && !instancePartitionsUnchanged) {
             LOGGER.info("Persisting instance partitions: {} to ZK", 
instancePartitions);
@@ -805,7 +833,8 @@ public class TableRebalancer {
                     referenceInstancePartitionsName, instancePartitionsName);
             instancePartitions = 
instanceAssignmentDriver.assignInstances(instancePartitionsType,
                 
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
 true),
-                bootstrap ? null : existingInstancePartitions, 
preConfiguredInstancePartitions);
+                bootstrap ? null : existingInstancePartitions, 
preConfiguredInstancePartitions,
+                minimizeDataMovement);
             instancePartitionsUnchanged = 
instancePartitions.equals(existingInstancePartitions);
             if (!dryRun && !instancePartitionsUnchanged) {
               LOGGER.info("Persisting instance partitions: {} (based on {})", 
instancePartitions,
@@ -868,7 +897,8 @@ public class TableRebalancer {
    * instance partitions are unchanged.
    */
   private Pair<Map<String, InstancePartitions>, Boolean> 
getTierToInstancePartitionsMap(TableConfig tableConfig,
-      @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean 
bootstrap, boolean dryRun) {
+      @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean 
bootstrap, boolean dryRun,
+      @Nullable Boolean minimizeDataMovement) {
     if (sortedTiers == null) {
       return Pair.of(null, true);
     }
@@ -878,7 +908,8 @@ public class TableRebalancer {
       LOGGER.info("Fetching/computing instance partitions for tier: {} of 
table: {}", tier.getName(),
           tableConfig.getTableName());
       Pair<InstancePartitions, Boolean> partitionsAndUnchanged =
-          getInstancePartitionsForTier(tableConfig, tier, reassignInstances, 
bootstrap, dryRun);
+          getInstancePartitionsForTier(tableConfig, tier, reassignInstances, 
bootstrap, dryRun,
+              minimizeDataMovement);
       tierToInstancePartitionsMap.put(tier.getName(), 
partitionsAndUnchanged.getLeft());
       instancePartitionsUnchanged = instancePartitionsUnchanged && 
partitionsAndUnchanged.getRight();
     }
@@ -891,7 +922,7 @@ public class TableRebalancer {
    * a boolean for whether the instance partition is unchanged.
    */
   private Pair<InstancePartitions, Boolean> 
getInstancePartitionsForTier(TableConfig tableConfig, Tier tier,
-      boolean reassignInstances, boolean bootstrap, boolean dryRun) {
+      boolean reassignInstances, boolean bootstrap, boolean dryRun, @Nullable 
Boolean minimizeDataMovement) {
     String tableNameWithType = tableConfig.getTableName();
     String tierName = tier.getName();
     String instancePartitionsName =
@@ -924,7 +955,7 @@ public class TableRebalancer {
         // partition map can be fully recalculated.
         InstancePartitions instancePartitions = 
instanceAssignmentDriver.assignInstances(tierName,
             
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
 true),
-            bootstrap ? null : existingInstancePartitions, 
instanceAssignmentConfig);
+            bootstrap ? null : existingInstancePartitions, 
instanceAssignmentConfig, minimizeDataMovement);
         boolean instancePartitionsUnchanged = 
instancePartitions.equals(existingInstancePartitions);
         if (!dryRun && !instancePartitionsUnchanged) {
           LOGGER.info("Persisting instance partitions: {} to ZK", 
instancePartitions);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 39aef7f35a..055ce297a1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -53,6 +53,8 @@ import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 
@@ -500,11 +502,224 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(9, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 7));
   }
 
-  public void testMirrorServerSetBasedRandom() throws FileNotFoundException {
+  public void testMirrorServerSetBasedRandom()
+      throws FileNotFoundException { // NOTE(review): no method-level @Test; confirm the 10M-loop run is meant to be opt-in
     testMirrorServerSetBasedRandomInner(10000000);
   }
 
-  public void testMirrorServerSetBasedRandomInner(int loopCount) throws 
FileNotFoundException {
+  @Test
+  public void testForceMinimizeDataMovement() {
+    // This test case is using the same instance rebalance plot as 
testMinimizeDataMovement, and test whether
+    // forceMinimizeDataMovement flag in InstanceAssignmentDriver works as the 
minimizeDataMovement flag in
+    // TableConfig does.
+    int numReplicas = 3;
+    int numPartitions = 2;
+    int numInstancesPerPartition = 2;
+    String partitionColumn = "partition";
+
+    // Configs and driver that minimize data movement
+    InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, true);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(numReplicas)
+        .setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig))
+        .build();
+    
assertTrue(InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig,
 InstancePartitionsType.OFFLINE)
+        .isMinimizeDataMovement());
+
+    InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
+    // Configs and driver that DO NOT minimize data movement
+    InstanceAssignmentConfig instanceAssignmentConfigNotMinimized = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, false);
+
+    TableConfig tableConfigNotMinimized = new TableConfig(tableConfig);
+    tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfigNotMinimized));
+    
assertFalse(InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfigNotMinimized,
+        InstancePartitionsType.OFFLINE).isMinimizeDataMovement());
+    InstanceAssignmentDriver driverNotMinimized = new 
InstanceAssignmentDriver(tableConfigNotMinimized);
+
+    int numInstances = 10;
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+    for (int i = 0; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Instances should be assigned to 3 replica-groups with a round-robin 
fashion, each with 2 instances
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null, (Boolean) null);
+
+    InstancePartitions instancePartitionsForcedMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null, true);
+
+    InstancePartitions instancePartitionsNotMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null, false);
+
+    // Initial assignment should be the same for all scenarios
+    assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+    assertEquals(instancePartitionsForcedMinimize, 
instancePartitionsNotMinimize);
+
+    // Remove two instances (i2, i6) and add two new instances (i10, i11).
+    instanceConfigs.remove(6);
+    instanceConfigs.remove(2);
+    for (int i = numInstances; i < numInstances + 2; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Instances should be assigned to 3 replica-groups with a round-robin 
fashion, each with 3 instances, then these 3
+    // instances should be assigned to 2 partitions, each with 2 instances
+    // Leverage the latest instancePartitions from last computation as the 
parameter.
+    // Data movement is minimized so that: i2 -> i10, i6 -> i11
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, instancePartitions, (Boolean) null);
+
+    instancePartitionsForcedMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsForcedMinimize, true);
+
+    // Data movement here is not minimized
+    instancePartitionsNotMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsNotMinimize, false);
+
+    // Forced minimized data movement should be the same as minimized data 
movement
+    assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+    // Without minimizeDataMovement set to true, the data movement is not 
minimized and should be different
+    assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+
+    // Add 2 more instances to the ZK and increase the number of instances per 
partition from 2 to 3.
+    for (int i = numInstances + 2; i < numInstances + 4; i++) {
+      InstanceConfig instanceConfig = new 
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+    numInstancesPerPartition = 3;
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, true);
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
+
+    instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, false);
+    tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfigNotMinimized));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, instancePartitions, (Boolean) null);
+
+    instancePartitionsForcedMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsForcedMinimize, true);
+
+    instancePartitionsNotMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsNotMinimize, false);
+
+    assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+    assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+
+    // Reduce the number of instances per partition from 3 to 2.
+    numInstancesPerPartition = 2;
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, true);
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
+
+    instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, false);
+    tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfigNotMinimized));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, instancePartitions, (Boolean) null);
+
+    instancePartitionsForcedMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsForcedMinimize, true);
+
+    instancePartitionsNotMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsNotMinimize, false);
+
+    assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+    assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+
+    // Add one more replica group (from 3 to 4).
+    numReplicas = 4;
+    
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+    
tableConfigNotMinimized.getValidationConfig().setReplication(Integer.toString(numReplicas));
+
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, true);
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
+
+    instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, false);
+    tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfigNotMinimized));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, instancePartitions, (Boolean) null);
+
+    instancePartitionsForcedMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsForcedMinimize, true);
+
+    instancePartitionsNotMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsNotMinimize, false);
+
+    assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+    assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+
+    // Remove one replica group (from 4 to 3).
+    numReplicas = 3;
+    
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+    
tableConfigNotMinimized.getValidationConfig().setReplication(Integer.toString(numReplicas));
+
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, true);
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfig));
+
+    instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 
numPartitions, numInstancesPerPartition, false,
+            partitionColumn), null, false);
+    tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE", 
instanceAssignmentConfigNotMinimized));
+
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, instancePartitions, (Boolean) null);
+
+    instancePartitionsForcedMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsForcedMinimize, true);
+
+    instancePartitionsNotMinimize =
+        driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs,
+            instancePartitionsNotMinimize, false);
+
+    assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+    assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+  }
+
+  public void testMirrorServerSetBasedRandomInner(int loopCount)
+      throws FileNotFoundException {
     PrintStream o = new PrintStream("output.txt");
     System.setOut(o);
     for (int iter = 0; iter < loopCount; iter++) {
@@ -1212,7 +1427,6 @@ public class InstanceAssignmentTest {
             SERVER_INSTANCE_ID_PREFIX + 20,
             SERVER_INSTANCE_ID_PREFIX + 23));
 
-
     // upscale 3*3 to 3*5
     numPartitions = 0;
     numInstancesPerPartition = 0;
@@ -2209,9 +2423,9 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup,
             0, 0, false, null);
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
+            Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+                new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
         .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
@@ -2243,9 +2457,9 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
             numInstancesPerReplicaGroup, 0, 0, false, null);
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
+            Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+                new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
         .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
@@ -2285,9 +2499,9 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
             numInstancesPerReplicaGroup, 0, 0, false, null);
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
+            Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+                new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 false)))
         .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
@@ -2397,9 +2611,9 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
             numInstancesPerPartition, true, null);
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-            new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
-                
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 true)))
+            Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+                new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
+                    
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
 true)))
         .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     // existingInstancePartitions = instancePartitions
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 5b99e56cc3..cab18467f0 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -95,8 +95,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     preChecker.init(_helixResourceManager, executorService);
-    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker,
-        _helixResourceManager.getTableSizeReader());
+    TableRebalancer tableRebalancer =
+        new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
@@ -513,6 +513,13 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertNull(rebalanceResult.getPreChecksResult());
 
     _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+
+    for (int i = 0; i < numServers; i++) {
+      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+    }
+    for (int i = 0; i < numServersToAdd; i++) {
+      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i));
+    }
     executorService.shutdown();
   }
 
@@ -532,9 +539,10 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     }
     _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, 
NO_TIER_NAME, numServers, numServers, 0));
 
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-            .setServerTenant(NO_TIER_NAME).build();
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME)
+        .setNumReplicas(NUM_REPLICAS)
+        .setServerTenant(NO_TIER_NAME)
+        .build();
     // Create the table
     addDummySchema(TIERED_TABLE_NAME);
     _helixResourceManager.addTable(tableConfig);
@@ -675,9 +683,10 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     _helixResourceManager.createServerTenant(
         new Tenant(TenantRole.SERVER, "replicaAssignment" + NO_TIER_NAME, 
numServers, numServers, 0));
 
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-            .setServerTenant("replicaAssignment" + NO_TIER_NAME).build();
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME)
+        .setNumReplicas(NUM_REPLICAS)
+        .setServerTenant("replicaAssignment" + NO_TIER_NAME)
+        .build();
     // Create the table
     addDummySchema(TIERED_TABLE_NAME);
     _helixResourceManager.addTable(tableConfig);
@@ -858,6 +867,256 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
 
+  @Test
+  public void testRebalanceWithMinimizeDataMovementBalanced()
+      throws Exception {
+    int numServers = 6;
+    for (int i = 0; i < numServers; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" + SERVER_INSTANCE_ID_PREFIX + i,
+          true);
+    }
+
+    // Create the table with default balanced segment assignment
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i,
+              nowInDays), null);
+    }
+
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+    // Try dry-run summary mode
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+
+    RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    RebalanceSummaryResult.ServerInfo rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(), numServers);
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // Add one more server instance (index numServers); the cleanup loop at the end drops it as well
+    addFakeServerInstanceToAutoJoinHelixCluster(
+        "minimizeDataMovement_balance_" + SERVER_INSTANCE_ID_PREFIX + numServers, true);
+
+    // Dry-run: a table without instance assignment config should ignore the minimizeDataMovement flag
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+    // Should see the added server
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    assertEquals(rebalanceServerInfo.getNumServers().getValueBeforeRebalance(), numServers);
+    assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(), numServers + 1);
+
+    // Check if the instance assignment is the same as the one without minimizeDataMovement flag set
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT);
+    RebalanceResult rebalanceResultWithoutMinimized = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+
+    assertEquals(rebalanceResult.getInstanceAssignment(), rebalanceResultWithoutMinimized.getInstanceAssignment());
+
+    // Rebalance
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    // Should see the added server in the instance assignment
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getInstances(0, 0).size(),
+        numServers + 1);
+
+    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+    for (int i = 0; i <= numServers; i++) {
+      stopAndDropFakeInstance("minimizeDataMovement_balance_" + SERVER_INSTANCE_ID_PREFIX + i);
+    }
+  }
+
+  @Test
+  public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+      throws Exception {
+    int numServers = 6;
+    for (int i = 0; i < numServers; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_" + SERVER_INSTANCE_ID_PREFIX + i, true);
+    }
+
+    // One instance per replica group, no partition
+    InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 1, 0, 0, false, null), null, false);
+
+    // Create the table
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(NUM_REPLICAS)
+        .setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig))
+        .build();
+
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i,
+              nowInDays), null);
+    }
+
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+    // Try dry-run summary mode
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+
+    RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    RebalanceSummaryResult.ServerInfo rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(), NUM_REPLICAS);
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // Add one server instance (index numServers); the cleanup loop at the end drops it as well
+    addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_" + SERVER_INSTANCE_ID_PREFIX + numServers, true);
+
+    // Increase the number of replica groups by 1 (still one instance per replica group)
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS + 1, 1, 0, 0, false, null), null, false);
+
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig));
+
+    // Try dry-run summary mode
+
+    // without minimize data movement, it's supposed to add more than one server
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceServerInfo = rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+    // note: this assertion may fail if the instance assignment algorithm changes in the future.
+    // right now, rebalance without minimizing data movement adds more than one server and removes some servers in the
+    // testing setup like this.
+    assertTrue(rebalanceServerInfo.getServersAdded().size() > 1);
+    assertEquals(rebalanceServerInfo.getServersAdded().size() - rebalanceServerInfo.getServersRemoved().size(), 1);
+
+    // use default table config's minimizeDataMovement flag, should be equivalent to without minimize data movement
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceServerInfo = rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+    assertTrue(rebalanceServerInfo.getServersAdded().size() > 1);
+    assertEquals(rebalanceServerInfo.getServersAdded().size() - rebalanceServerInfo.getServersRemoved().size(), 1);
+
+    // with minimize data movement, we should add 1 server only
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceServerInfo = rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+    assertEquals(rebalanceServerInfo.getServersAdded().size(), 1);
+    assertEquals(rebalanceServerInfo.getServersRemoved().size(), 0);
+
+    // rebalance without dry-run
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getNumReplicaGroups(),
+        NUM_REPLICAS + 1);
+
+    // Add another server instance (index numServers + 1); the cleanup loop at the end drops it as well
+    addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_" + SERVER_INSTANCE_ID_PREFIX + (numServers + 1),
+        true);
+
+    // Decrease the number of replica groups by 1 (back to NUM_REPLICAS)
+    instanceAssignmentConfig = new InstanceAssignmentConfig(
+        new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null), null,
+        new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 1, 0, 0, false, null), null, false);
+
+    tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE", instanceAssignmentConfig));
+    _helixResourceManager.updateTableConfig(tableConfig);
+
+    // Try dry-run summary mode
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceServerInfo = rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+    // with minimize data movement, we should remove 1 server only
+    assertEquals(rebalanceServerInfo.getServersAdded().size(), 0);
+    assertEquals(rebalanceServerInfo.getServersRemoved().size(), 1);
+
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getNumReplicaGroups(),
+        NUM_REPLICAS);
+
+    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+    for (int i = 0; i < numServers + 2; i++) {
+      stopAndDropFakeInstance("minimizeDataMovement_" + SERVER_INSTANCE_ID_PREFIX + i);
+    }
+  }
+
   @AfterClass
   public void tearDown() {
     stopFakeInstances();
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index f1165bc9d4..7b08af32c0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -33,7 +33,8 @@ public class PinotTableRebalancer extends PinotZKChanger {
   private final RebalanceConfig _rebalanceConfig = new RebalanceConfig();
 
   public PinotTableRebalancer(String zkAddress, String clusterName, boolean 
dryRun, boolean preChecks,
-      boolean reassignInstances, boolean includeConsuming, boolean bootstrap, 
boolean downtime,
+      boolean reassignInstances, boolean includeConsuming,
+      RebalanceConfig.MinimizeDataMovementOptions minimizeDataMovement, 
boolean bootstrap, boolean downtime,
       int minReplicasToKeepUpForNoDowntime, boolean lowDiskMode, boolean 
bestEffort, long externalViewCheckIntervalInMs,
       long externalViewStabilizationTimeoutInMs) {
     super(zkAddress, clusterName);
@@ -41,6 +42,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
     _rebalanceConfig.setPreChecks(preChecks);
     _rebalanceConfig.setReassignInstances(reassignInstances);
     _rebalanceConfig.setIncludeConsuming(includeConsuming);
+    _rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement);
     _rebalanceConfig.setBootstrap(bootstrap);
     _rebalanceConfig.setDowntime(downtime);
     _rebalanceConfig.setMinAvailableReplicas(minReplicasToKeepUpForNoDowntime);
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
index 96ed26994a..e94e7b2773 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
@@ -64,6 +64,11 @@ public class RebalanceTableCommand extends 
AbstractBaseAdminCommand implements C
       description = "Whether to reassign CONSUMING segments for real-time 
table (true by default)")
   private boolean _includeConsuming = true;
 
+  @CommandLine.Option(names = {"-minimizeDataMovement"}, description = "Whether to ENABLE or DISABLE the minimize "
+      + "data movement algorithm, or use the table's DEFAULT config (ENABLE by default)")
+  private RebalanceConfig.MinimizeDataMovementOptions _minimizeDataMovement =
+      RebalanceConfig.MinimizeDataMovementOptions.ENABLE;
+
   @CommandLine.Option(names = {"-bootstrap"},
       description = "Whether to rebalance table in bootstrap mode (regardless 
of minimum segment movement, reassign"
           + " all segments in a round-robin fashion as if adding new segments 
to an empty table, false by default)")
@@ -110,8 +115,8 @@ public class RebalanceTableCommand extends 
AbstractBaseAdminCommand implements C
       throws Exception {
     PinotTableRebalancer tableRebalancer =
         new PinotTableRebalancer(_zkAddress, _clusterName, _dryRun, 
_preChecks, _reassignInstances, _includeConsuming,
-            _bootstrap, _downtime, _minAvailableReplicas, _lowDiskMode, 
_bestEfforts, _externalViewCheckIntervalInMs,
-            _externalViewStabilizationTimeoutInMs);
+            _minimizeDataMovement, _bootstrap, _downtime, 
_minAvailableReplicas, _lowDiskMode, _bestEfforts,
+            _externalViewCheckIntervalInMs, 
_externalViewStabilizationTimeoutInMs);
     RebalanceResult rebalanceResult = 
tableRebalancer.rebalance(_tableNameWithType);
     LOGGER
         .info("Got rebalance result: {} for table: {}", 
JsonUtils.objectToString(rebalanceResult), _tableNameWithType);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to