This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch disgard-existing-instance-partitions-for-bootstrap-mode in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 878d0060a53687d5070a9c2b63bd2946682c6d32 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Mon Jan 23 15:36:21 2023 -0800 Discard existing instancePartitions if bootstrap rebalance mode is enabled --- .../helix/core/rebalance/TableRebalancer.java | 35 +++++++++++----------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 4a1bc13b52..194c175deb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -152,10 +152,9 @@ public class TableRebalancer { LOGGER.info( "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, " + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}, " - + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}", - tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime, - minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, bestEfforts, externalViewCheckIntervalInMs, - externalViewStabilizationTimeoutInMs); + + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}", tableNameWithType, dryRun, + reassignInstances, includeConsuming, bootstrap, downtime, minReplicasToKeepUpForNoDowntime, + enableStrictReplicaGroup, bestEfforts, externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs); // Validate table config try { @@ -200,7 +199,7 @@ public class TableRebalancer { // Calculate instance partitions map Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap; try { - instancePartitionsMap = getInstancePartitionsMap(tableConfig, 
reassignInstances, dryRun); + instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun); } catch (Exception e) { LOGGER.warn( "Caught exception while fetching/calculating instance partitions for table: {}, aborting the rebalance", @@ -353,7 +352,7 @@ public class TableRebalancer { if (segmentsToMoveChanged) { try { // Re-calculate the instance partitions in case the instance configs changed during the rebalance - instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false); + instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false); tierToInstancePartitionsMap = getTierToInstancePartitionsMap(tableNameWithType, sortedTiers); targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap, rebalanceConfig); @@ -419,21 +418,21 @@ public class TableRebalancer { } private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig, - boolean reassignInstances, boolean dryRun) { + boolean reassignInstances, boolean bootstrap, boolean dryRun) { Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>(); if (tableConfig.getTableType() == TableType.OFFLINE) { instancePartitionsMap.put(InstancePartitionsType.OFFLINE, - getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun)); + getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, bootstrap, dryRun)); } else { instancePartitionsMap.put(InstancePartitionsType.CONSUMING, - getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun)); + getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, bootstrap, dryRun)); String tableNameWithType = tableConfig.getTableName(); if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) { 
LOGGER.info( "COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}", tableNameWithType); instancePartitionsMap.put(InstancePartitionsType.COMPLETED, - getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun)); + getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun)); } else { LOGGER.info( "COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions " @@ -451,18 +450,18 @@ public class TableRebalancer { } private InstancePartitions getInstancePartitions(TableConfig tableConfig, - InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun) { + InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean bootstrap, boolean dryRun) { String tableNameWithType = tableConfig.getTableName(); if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) { if (reassignInstances) { String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - boolean hasPreConfiguredInstancePartitions = TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, - instancePartitionsType); + boolean hasPreConfiguredInstancePartitions = + TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType); if (hasPreConfiguredInstancePartitions) { String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType); - InstancePartitions instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename( - _helixManager.getHelixPropertyStore(), referenceInstancePartitionsName, - instancePartitionsType.getInstancePartitionsName(rawTableName)); + InstancePartitions instancePartitions = + InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(), + referenceInstancePartitionsName, 
instancePartitionsType.getInstancePartitionsName(rawTableName)); if (!dryRun) { LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, referenceInstancePartitionsName); @@ -471,8 +470,8 @@ public class TableRebalancer { } return instancePartitions; } - InstancePartitions existingInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), + InstancePartitions existingInstancePartitions = bootstrap ? null + : InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString())); LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org