somandal commented on code in PR #15110:
URL: https://github.com/apache/pinot/pull/15110#discussion_r1985741776


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java:
##########
@@ -83,6 +83,13 @@ public class RebalanceConfig {
   @ApiModelProperty(example = "false")
   private boolean _bestEfforts = false;
 
+  // Whether to enforce Minimal Data Movement Algorithm (only effective if 
instance assignment config is set, and if
+  // bootstrap is false). If set to false, the minimizeDataMovement flag in 
the table config will be used to determine
+  // whether to run the Minimal Data Movement Algorithm.
+  @JsonProperty("minimizeDataMovement")
+  @ApiModelProperty(example = "TRUE")
+  private String _minimizeDataMovement = "TRUE";

Review Comment:
   Can we keep this `Boolean` here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions, (Boolean) null);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable Boolean minimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, null, 
minimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
       @Nullable InstancePartitions preConfiguredInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions,
+        preConfiguredInstancePartitions, null);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable 
Boolean minimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions,
+        minimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig) {
+    return assignInstances(tierName, instanceConfigs, 
existingInstancePartitions, instanceAssignmentConfig, null);
+  }
+
+  public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig,
+      @Nullable Boolean minimizeDataMovement) {
     return getInstancePartitions(
         
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
 tierName),
-        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null);
+        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null, minimizeDataMovement);
   }
 
   private InstancePartitions getInstancePartitions(String 
instancePartitionsName,
       InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> 
instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions,
-      @Nullable InstancePartitions preConfiguredInstancePartitions) {
+      @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable 
Boolean minimizeDataMovementFlag) {

Review Comment:
   nit: let's rename it to `minimizeDataMovementOverride`?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -51,9 +51,7 @@
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;

Review Comment:
   nit: I usually avoid "*" imports. Can you explicitly import just the newly 
used assertion functions here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -651,6 +654,7 @@ public RebalanceResult rebalance(
     rebalanceConfig.setPreChecks(preChecks);
     rebalanceConfig.setReassignInstances(reassignInstances);
     rebalanceConfig.setIncludeConsuming(includeConsuming);
+    rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement);

Review Comment:
   Can the conversion of this String to `Boolean` be done right here, i.e. the 
parsing for TRUE/FALSE and DEFAULT?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions, (Boolean) null);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable Boolean minimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, null, 
minimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
       @Nullable InstancePartitions preConfiguredInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions,
+        preConfiguredInstancePartitions, null);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable 
Boolean minimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions,
+        minimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig) {
+    return assignInstances(tierName, instanceConfigs, 
existingInstancePartitions, instanceAssignmentConfig, null);
+  }
+
+  public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig,
+      @Nullable Boolean minimizeDataMovement) {
     return getInstancePartitions(
         
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
 tierName),
-        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null);
+        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null, minimizeDataMovement);
   }
 
   private InstancePartitions getInstancePartitions(String 
instancePartitionsName,
       InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> 
instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions,
-      @Nullable InstancePartitions preConfiguredInstancePartitions) {
+      @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable 
Boolean minimizeDataMovementFlag) {
     String tableNameWithType = _tableConfig.getTableName();
-    LOGGER.info("Starting {} instance assignment for table {}", 
instancePartitionsName, tableNameWithType);
 
-    boolean minimizeDataMovement = 
instanceAssignmentConfig.isMinimizeDataMovement();
+    // minimizeDataMovement might be set back to false within 
InstanceTagPoolSelector and InstancePartitionSelector
+    // if existingInstancePartitions is null.
+    boolean minimizeDataMovement =
+        minimizeDataMovementFlag == null ? 
instanceAssignmentConfig.isMinimizeDataMovement() : minimizeDataMovementFlag;
+    LOGGER.info("Starting {} instance assignment for table {}, 
instanceAssignmentConfig.isMinimizeDataMovement()={}, "
+            + "minimizeDataMovement={}", instancePartitionsName, 
tableNameWithType,

Review Comment:
   nit: in the log message, change `minimizeDataMovement=` to 
`minimizeDataMovementOverride=`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -191,17 +191,21 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     boolean bestEfforts = rebalanceConfig.isBestEfforts();
     long externalViewCheckIntervalInMs = 
rebalanceConfig.getExternalViewCheckIntervalInMs();
     long externalViewStabilizationTimeoutInMs = 
rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
+    String minimizeDataMovementStr = rebalanceConfig.getMinimizeDataMovement();
+    Boolean minimizeDataMovement =
+        minimizeDataMovementStr.toLowerCase().matches("^(true|false)$") ? 
Boolean.valueOf(minimizeDataMovementStr)

Review Comment:
   Let's move this to `PinotTableRestletResource`. Also throw an exception from 
there if the value is not one of "true", "false", or "default".



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -858,6 +867,227 @@ public void testRebalanceWithTiersAndInstanceAssignments()
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
 
+  @Test
+  public void testRebalanceWithMinimizeDataMovementBalanced()
+      throws Exception {
+    int numServers = 6;
+    for (int i = 0; i < numServers; i++) {
+      
addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" + 
SERVER_INSTANCE_ID_PREFIX + i,
+          true);
+    }
+
+    // Create the table with default balanced segment assignment
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i,
+              nowInDays), null);
+    }
+
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+    // Try dry-run summary mode
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceConfig.setMinimizeDataMovement("true");

Review Comment:
   can you add a test for `setMinimizeDataMovement("false")` where TableConfig 
indicates it is true?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions, (Boolean) null);

Review Comment:
   nit: do you need to cast null to `Boolean` here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions, (Boolean) null);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable Boolean minimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, null, 
minimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
       @Nullable InstancePartitions preConfiguredInstancePartitions) {
+    return assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions,
+        preConfiguredInstancePartitions, null);
+  }
+
+  public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable 
Boolean minimizeDataMovement) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
     return getInstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
-        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions);
+        assignmentConfig, instanceConfigs, existingInstancePartitions, 
preConfiguredInstancePartitions,
+        minimizeDataMovement);
   }
 
   public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig) {
+    return assignInstances(tierName, instanceConfigs, 
existingInstancePartitions, instanceAssignmentConfig, null);
+  }
+
+  public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig,
+      @Nullable Boolean minimizeDataMovement) {
     return getInstancePartitions(
         
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
 tierName),
-        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null);
+        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, 
null, minimizeDataMovement);
   }
 
   private InstancePartitions getInstancePartitions(String 
instancePartitionsName,
       InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> 
instanceConfigs,
       @Nullable InstancePartitions existingInstancePartitions,
-      @Nullable InstancePartitions preConfiguredInstancePartitions) {
+      @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable 
Boolean minimizeDataMovementFlag) {
     String tableNameWithType = _tableConfig.getTableName();
-    LOGGER.info("Starting {} instance assignment for table {}", 
instancePartitionsName, tableNameWithType);
 
-    boolean minimizeDataMovement = 
instanceAssignmentConfig.isMinimizeDataMovement();
+    // minimizeDataMovement might be set back to false within 
InstanceTagPoolSelector and InstancePartitionSelector

Review Comment:
   does this comment need to be updated to reflect the new change?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java:
##########
@@ -83,6 +83,13 @@ public class RebalanceConfig {
   @ApiModelProperty(example = "false")
   private boolean _bestEfforts = false;
 
+  // Whether to enforce Minimal Data Movement Algorithm (only effective if 
instance assignment config is set, and if
+  // bootstrap is false). If set to false, the minimizeDataMovement flag in 
the table config will be used to determine

Review Comment:
   nit: update the comment to say that if set to DEFAULT we fall back on the 
table config; otherwise this value is used as the override.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to