somandal commented on code in PR #15110: URL: https://github.com/apache/pinot/pull/15110#discussion_r1985741776
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java: ########## @@ -83,6 +83,13 @@ public class RebalanceConfig { @ApiModelProperty(example = "false") private boolean _bestEfforts = false; + // Whether to enforce Minimal Data Movement Algorithm (only effective if instance assignment config is set, and if + // bootstrap is false). If set to false, the minimizeDataMovement flag in the table config will be used to determine + // whether to run the Minimal Data Movement Algorithm. + @JsonProperty("minimizeDataMovement") + @ApiModelProperty(example = "TRUE") + private String _minimizeDataMovement = "TRUE"; Review Comment: Can we keep this `Boolean` here? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java: ########## @@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) { public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) { + return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, (Boolean) null); + } + + public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, + List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable Boolean minimizeDataMovement) { String tableNameWithType = _tableConfig.getTableName(); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); return getInstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), - assignmentConfig, instanceConfigs, existingInstancePartitions, null); + assignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement); } public 
InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions) { + return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, + preConfiguredInstancePartitions, null); + } + + public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, + List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovement) { String tableNameWithType = _tableConfig.getTableName(); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); return getInstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), - assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions); + assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions, + minimizeDataMovement); } public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) { + return assignInstances(tierName, instanceConfigs, existingInstancePartitions, instanceAssignmentConfig, null); + } + + public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs, + @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig, + @Nullable Boolean minimizeDataMovement) { return getInstancePartitions( InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName), - instanceAssignmentConfig, instanceConfigs, 
existingInstancePartitions, null); + instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement); } private InstancePartitions getInstancePartitions(String instancePartitionsName, InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, - @Nullable InstancePartitions preConfiguredInstancePartitions) { + @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovementFlag) { Review Comment: nit: let's rename it to `minimizeDataMovementOverride`? ########## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java: ########## @@ -51,9 +51,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.fail; +import static org.testng.Assert.*; Review Comment: nit: I usually avoid "*" imports. can you just add the newly added functions here? ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java: ########## @@ -651,6 +654,7 @@ public RebalanceResult rebalance( rebalanceConfig.setPreChecks(preChecks); rebalanceConfig.setReassignInstances(reassignInstances); rebalanceConfig.setIncludeConsuming(includeConsuming); + rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement); Review Comment: Can the conversion of this String to `Boolean` be done here itself? i.e. 
the parsing for TRUE/FALSE and DEFAULT ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java: ########## @@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) { public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) { + return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, (Boolean) null); + } + + public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, + List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable Boolean minimizeDataMovement) { String tableNameWithType = _tableConfig.getTableName(); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); return getInstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), - assignmentConfig, instanceConfigs, existingInstancePartitions, null); + assignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement); } public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions) { + return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, + preConfiguredInstancePartitions, null); + } + + public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, + List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovement) { String 
tableNameWithType = _tableConfig.getTableName(); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); return getInstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), - assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions); + assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions, + minimizeDataMovement); } public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) { + return assignInstances(tierName, instanceConfigs, existingInstancePartitions, instanceAssignmentConfig, null); + } + + public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs, + @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig, + @Nullable Boolean minimizeDataMovement) { return getInstancePartitions( InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName), - instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null); + instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement); } private InstancePartitions getInstancePartitions(String instancePartitionsName, InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, - @Nullable InstancePartitions preConfiguredInstancePartitions) { + @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovementFlag) { String tableNameWithType = _tableConfig.getTableName(); - LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, 
tableNameWithType); - boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement(); + // minimizeDataMovement might be set back to false within InstanceTagPoolSelector and InstancePartitionSelector + // if existingInstancePartitions is null. + boolean minimizeDataMovement = + minimizeDataMovementFlag == null ? instanceAssignmentConfig.isMinimizeDataMovement() : minimizeDataMovementFlag; + LOGGER.info("Starting {} instance assignment for table {}, instanceAssignmentConfig.isMinimizeDataMovement()={}, " + + "minimizeDataMovement={}", instancePartitionsName, tableNameWithType, Review Comment: nit: in the comment change `minimizeDataMovement=` to `minimizeDataMovementOverride=` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -191,17 +191,21 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb boolean bestEfforts = rebalanceConfig.isBestEfforts(); long externalViewCheckIntervalInMs = rebalanceConfig.getExternalViewCheckIntervalInMs(); long externalViewStabilizationTimeoutInMs = rebalanceConfig.getExternalViewStabilizationTimeoutInMs(); + String minimizeDataMovementStr = rebalanceConfig.getMinimizeDataMovement(); + Boolean minimizeDataMovement = + minimizeDataMovementStr.toLowerCase().matches("^(true|false)$") ? Boolean.valueOf(minimizeDataMovementStr) Review Comment: let's move this to `PinotTableRestletResource`. Also throw an exception if the value is not one of "true" "false" or "default" from there itself. 
########## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java: ########## @@ -858,6 +867,227 @@ public void testRebalanceWithTiersAndInstanceAssignments() _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME); } + @Test + public void testRebalanceWithMinimizeDataMovementBalanced() + throws Exception { + int numServers = 6; + for (int i = 0; i < numServers; i++) { + addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" + SERVER_INSTANCE_ID_PREFIX + i, + true); + } + + // Create the table with default balanced segment assignment + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); + + addDummySchema(RAW_TABLE_NAME); + _helixResourceManager.addTable(tableConfig); + + // Add the segments + int numSegments = 10; + long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()); + + for (int i = 0; i < numSegments; i++) { + _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i, + nowInDays), null); + } + + Map<String, Map<String, String>> oldSegmentAssignment = + _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(); + + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager); + + // Try dry-run summary mode + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setMinimizeDataMovement("true"); Review Comment: Can you add a test for `setMinimizeDataMovement("false")` where the TableConfig has minimizeDataMovement set to true?
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java: ########## @@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) { public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) { + return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, (Boolean) null); Review Comment: nit: do you need to cast null to `Boolean` here? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java: ########## @@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) { public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) { + return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, (Boolean) null); + } + + public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, + List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable Boolean minimizeDataMovement) { String tableNameWithType = _tableConfig.getTableName(); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); return getInstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), - assignmentConfig, instanceConfigs, existingInstancePartitions, null); + assignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement); } public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions) { + return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, + preConfiguredInstancePartitions, null); + } + + public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, + List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovement) { String tableNameWithType = _tableConfig.getTableName(); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); return getInstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), - assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions); + assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions, + minimizeDataMovement); } public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) { + return assignInstances(tierName, instanceConfigs, existingInstancePartitions, instanceAssignmentConfig, null); + } + + public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs, + @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig, + @Nullable Boolean minimizeDataMovement) { return getInstancePartitions( InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName), - instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null); + instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement); } private 
InstancePartitions getInstancePartitions(String instancePartitionsName, InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, - @Nullable InstancePartitions preConfiguredInstancePartitions) { + @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovementFlag) { String tableNameWithType = _tableConfig.getTableName(); - LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType); - boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement(); + // minimizeDataMovement might be set back to false within InstanceTagPoolSelector and InstancePartitionSelector Review Comment: does this comment need to be updated to reflect the new change? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java: ########## @@ -83,6 +83,13 @@ public class RebalanceConfig { @ApiModelProperty(example = "false") private boolean _bestEfforts = false; + // Whether to enforce Minimal Data Movement Algorithm (only effective if instance assignment config is set, and if + // bootstrap is false). If set to false, the minimizeDataMovement flag in the table config will be used to determine Review Comment: nit: update comment, that if set to DEFAULT we fallback on table config, otherwise use this as the override value -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org