This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 06d8540378 For table rebalance, check if instances are equal for NO_OP (#11073) 06d8540378 is described below commit 06d8540378d9df6220c0e0e47d42841b3376bd96 Author: summerhasama-stripe <134637018+summerhasama-str...@users.noreply.github.com> AuthorDate: Mon Jul 24 21:17:03 2023 -0400 For table rebalance, check if instances are equal for NO_OP (#11073) --- .../common/assignment/InstancePartitions.java | 19 +++ .../helix/core/rebalance/TableRebalancer.java | 174 +++++++++++++-------- .../TableRebalancerClusterStatelessTest.java | 16 +- 3 files changed, 134 insertions(+), 75 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java index a296527e84..a67bb93ce0 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.TreeMap; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.spi.utils.JsonUtils; @@ -146,4 +147,22 @@ public class InstancePartitions { public String toString() { return toJsonString(); } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof InstancePartitions)) { + return false; + } + InstancePartitions other = (InstancePartitions) obj; + return Objects.equals(_instancePartitionsName, other._instancePartitionsName) + && Objects.equals(_partitionToInstancesMap, other._partitionToInstancesMap); + } + + @Override + public int hashCode() { + return Objects.hash(_instancePartitionsName, _partitionToInstancesMap); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 22c5b87635..95df26ff96 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -226,8 +226,12 @@ public class TableRebalancer { // Calculate instance partitions map Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap; + boolean instancePartitionsUnchanged; try { - instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun); + Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> instancePartitionsMapAndUnchanged = + getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun); + instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft(); + instancePartitionsUnchanged = instancePartitionsMapAndUnchanged.getRight(); } catch (Exception e) { LOGGER.warn("For rebalanceId: {}, caught exception while fetching/calculating instance partitions for table: {}, " + "aborting the rebalance", rebalanceJobId, tableNameWithType, e); @@ -237,9 +241,13 @@ public class TableRebalancer { // Calculate instance partitions for tiers if configured List<Tier> sortedTiers = getSortedTiers(tableConfig); - Map<String, InstancePartitions> tierToInstancePartitionsMap = + + Pair<Map<String, InstancePartitions>, Boolean> tierToInstancePartitionsMapAndUnchanged = getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun); + Map<String, InstancePartitions> tierToInstancePartitionsMap = tierToInstancePartitionsMapAndUnchanged.getLeft(); + boolean tierInstancePartitionsUnchanged = tierToInstancePartitionsMapAndUnchanged.getRight(); + LOGGER.info("For rebalanceId: {}, calculating the target assignment for table: {}", rebalanceJobId, tableNameWithType); SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig); @@ -256,21 +264,26 @@ public class TableRebalancer { tierToInstancePartitionsMap, null); } - if (currentAssignment.equals(targetAssignment)) { + boolean segmentAssignmentUnchanged = currentAssignment.equals(targetAssignment); + LOGGER.info("For rebalanceId: {}, segmentAssignmentUnchanged: {}, " + + "tierInstancePartitionsUnchanged: {}, instancePartitionsUnchanged: {} for table: {}", + rebalanceJobId, segmentAssignmentUnchanged, tierInstancePartitionsUnchanged, + instancePartitionsUnchanged, tableNameWithType); + + if (segmentAssignmentUnchanged) { LOGGER.info("Table: {} is already balanced", tableNameWithType); - if (reassignInstances) { + if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) { + return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced", + instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); + } else { if (dryRun) { return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); - } else { - return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, - "Instance reassigned, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment); } - } else { - return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced", - instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); + return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, + "Instance reassigned, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, + targetAssignment); } } @@ -399,9 +412,11 @@ public class TableRebalancer { if (segmentsToMoveChanged) { try { // Re-calculate the instance partitions in case the instance configs changed during the rebalance - instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false); + instancePartitionsMap = + getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false).getLeft(); tierToInstancePartitionsMap = - getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun); + getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, + bootstrap, dryRun).getLeft(); targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap, rebalanceConfig); } catch (Exception e) { @@ -480,22 +495,32 @@ public class TableRebalancer { } } - private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig, - boolean reassignInstances, boolean bootstrap, boolean dryRun) { + /** + * Gets the instance partitions for instance partition types and also returns a boolean for whether they are unchanged + */ + private Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> getInstancePartitionsMap( + TableConfig tableConfig, boolean reassignInstances, boolean bootstrap, boolean dryRun) { + boolean instancePartitionsUnchanged = true; Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>(); if (tableConfig.getTableType() == TableType.OFFLINE) { - instancePartitionsMap.put(InstancePartitionsType.OFFLINE, - getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, bootstrap, dryRun)); + Pair<InstancePartitions, Boolean> partitionAndUnchangedForOffline = + getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, bootstrap, dryRun); + instancePartitionsMap.put(InstancePartitionsType.OFFLINE, partitionAndUnchangedForOffline.getLeft()); + instancePartitionsUnchanged = instancePartitionsUnchanged && partitionAndUnchangedForOffline.getRight(); } else { - instancePartitionsMap.put(InstancePartitionsType.CONSUMING, - getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, bootstrap, dryRun)); + Pair<InstancePartitions, Boolean> partitionAndUnchangedForConsuming = + getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, bootstrap, dryRun); + instancePartitionsMap.put(InstancePartitionsType.CONSUMING, partitionAndUnchangedForConsuming.getLeft()); + instancePartitionsUnchanged = instancePartitionsUnchanged && partitionAndUnchangedForConsuming.getRight(); String tableNameWithType = tableConfig.getTableName(); if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) { + Pair<InstancePartitions, Boolean> partitionAndUnchangedForCompleted = + getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun); LOGGER.info( "COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}", tableNameWithType); - instancePartitionsMap.put(InstancePartitionsType.COMPLETED, - getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun)); + instancePartitionsMap.put(InstancePartitionsType.COMPLETED, partitionAndUnchangedForCompleted.getLeft()); + instancePartitionsUnchanged = instancePartitionsUnchanged && partitionAndUnchangedForCompleted.getRight(); } else { LOGGER.info( "COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions " @@ -509,12 +534,21 @@ public class TableRebalancer { } } } - return instancePartitionsMap; + return Pair.of(instancePartitionsMap, instancePartitionsUnchanged); } - private InstancePartitions getInstancePartitions(TableConfig tableConfig, + /** + * Fetches/computes the instance partitions and also returns a boolean for whether they are unchanged + */ + private Pair<InstancePartitions, Boolean> getInstancePartitions(TableConfig tableConfig, InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean bootstrap, boolean dryRun) { String tableNameWithType = tableConfig.getTableName(); + + InstancePartitions existingInstancePartitions = + InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), + InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, + instancePartitionsType.toString())); + if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) { if (reassignInstances) { String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); @@ -525,35 +559,33 @@ public class TableRebalancer { InstancePartitions instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(), referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName)); - if (!dryRun) { + boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); + if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, referenceInstancePartitionsName); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions); } - return instancePartitions; + return Pair.of(instancePartitions, instancePartitionsUnchanged); } - // Set existing instance partition to null if bootstrap mode is enabled, so that the instance partition - // map can be fully recalculated. - InstancePartitions existingInstancePartitions = bootstrap ? null - : InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), - InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, - instancePartitionsType.toString())); LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType); + // Assign instances with existing instance partition to null if bootstrap mode is enabled, + // so that the instance partition map can be fully recalculated. InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig); InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), - existingInstancePartitions); - if (!dryRun) { + bootstrap ? null : existingInstancePartitions); + boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); + if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions); } - return instancePartitions; + return Pair.of(instancePartitions, instancePartitionsUnchanged); } else { LOGGER.info("Fetching/computing {} instance partitions for table: {}", instancePartitionsType, tableNameWithType); - return InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig, - instancePartitionsType); + return Pair.of(InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig, + instancePartitionsType), true); } } else { LOGGER.info("{} instance assignment is not allowed, using default instance partitions for table: {}", @@ -564,12 +596,15 @@ public class TableRebalancer { } InstancePartitions instancePartitions = InstancePartitionsUtils.computeDefaultInstancePartitions(_helixManager, tableConfig, instancePartitionsType); - if (!dryRun) { + + Boolean noExistingInstancePartitions = existingInstancePartitions == null; + + if (!dryRun && !noExistingInstancePartitions) { String instancePartitionsName = instancePartitions.getInstancePartitionsName(); LOGGER.info("Removing instance partitions: {} from ZK if it exists", instancePartitionsName); InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); } - return instancePartitions; + return Pair.of(instancePartitions, noExistingInstancePartitions); } } @@ -586,74 +621,79 @@ public class TableRebalancer { } } - @Nullable - private Map<String, InstancePartitions> getTierToInstancePartitionsMap(TableConfig tableConfig, + /** + * Fetches/computes the instance partitions for sorted tiers and also returns a boolean for whether the + * instance partitions are unchanged. + */ + private Pair<Map<String, InstancePartitions>, Boolean> getTierToInstancePartitionsMap(TableConfig tableConfig, @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean bootstrap, boolean dryRun) { if (sortedTiers == null) { - return null; + return Pair.of(null, true); } + boolean instancePartitionsUnchanged = true; Map<String, InstancePartitions> tierToInstancePartitionsMap = new HashMap<>(); for (Tier tier : sortedTiers) { LOGGER.info("Fetching/computing instance partitions for tier: {} of table: {}", tier.getName(), tableConfig.getTableName()); - tierToInstancePartitionsMap.put(tier.getName(), - getInstancePartitionsForTier(tableConfig, tier, reassignInstances, bootstrap, dryRun)); + Pair<InstancePartitions, Boolean> partitionsAndUnchanged = getInstancePartitionsForTier( + tableConfig, tier, reassignInstances, bootstrap, dryRun); + tierToInstancePartitionsMap.put(tier.getName(), partitionsAndUnchanged.getLeft()); + instancePartitionsUnchanged = instancePartitionsUnchanged && partitionsAndUnchanged.getRight(); } - return tierToInstancePartitionsMap; + return Pair.of(tierToInstancePartitionsMap, instancePartitionsUnchanged); } /** * Computes the instance partitions for the given tier. If table's instanceAssignmentConfigMap has an entry for the - * tier, it's used to calculate the instance partitions. Else default instance partitions are returned + * tier, it's used to calculate the instance partitions. Else default instance partitions are returned. Also returns + * a boolean for whether the instance partition is unchanged. */ - private InstancePartitions getInstancePartitionsForTier(TableConfig tableConfig, Tier tier, boolean reassignInstances, - boolean bootstrap, boolean dryRun) { + private Pair<InstancePartitions, Boolean> getInstancePartitionsForTier(TableConfig tableConfig, Tier tier, + boolean reassignInstances, boolean bootstrap, boolean dryRun) { PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage(); InstancePartitions defaultInstancePartitions = InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, tableConfig.getTableName(), tier.getName(), storage.getServerTag()); + String instancePartitionsName = + InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName()); + InstancePartitions existingInstancePartitions = InstancePartitionsUtils. + fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); if (tableConfig.getInstanceAssignmentConfigMap() == null || !tableConfig.getInstanceAssignmentConfigMap() .containsKey(tier.getName())) { LOGGER.info("Skipping fetching/computing instance partitions for tier {} for table: {}", tier.getName(), tableConfig.getTableName()); - if (!dryRun) { - String instancePartitionsName = - InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName()); + Boolean noExistingInstancePartitions = existingInstancePartitions == null; + + if (!dryRun && !noExistingInstancePartitions) { LOGGER.info("Removing instance partitions: {} from ZK if it exists", instancePartitionsName); InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); } - return defaultInstancePartitions; + return Pair.of(defaultInstancePartitions, noExistingInstancePartitions); } String tableNameWithType = tableConfig.getTableName(); - String instancePartitionsName = - InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName()); if (reassignInstances) { - // Set existing instance partition to null if bootstrap mode is enabled, so that the instance partition - // map can be fully recalculated. - InstancePartitions existingInstancePartitions = bootstrap ? null - : InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), - instancePartitionsName); + // Assign instances with existing instance partition to null if bootstrap mode is enabled, + // so that the instance partition map can be fully recalculated. InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig); InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(tier.getName(), _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), - existingInstancePartitions, tableConfig.getInstanceAssignmentConfigMap().get(tier.getName())); - if (!dryRun) { + bootstrap ? null : existingInstancePartitions, + tableConfig.getInstanceAssignmentConfigMap().get(tier.getName())); + boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); + if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions); } - return instancePartitions; + return Pair.of(instancePartitions, instancePartitionsUnchanged); } - InstancePartitions instancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), - InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tier.getName())); - if (instancePartitions != null) { - return instancePartitions; + if (existingInstancePartitions != null) { + return Pair.of(existingInstancePartitions, true); } - return defaultInstancePartitions; + return Pair.of(defaultInstancePartitions, true); } private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts, diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 91f27160c0..f2eed4ccab 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -246,19 +246,19 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { tableConfig.setInstanceAssignmentConfigMap(null); _helixResourceManager.updateTableConfig(tableConfig); - // Without instances reassignment, the rebalance should return status NO_OP as instance partitions are already - // generated + // Without instances reassignment, the rebalance should return status DONE, + // and the instance partitions should be removed rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration()); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, + InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME))); - // With instances reassignment, the instance partitions should be removed, and the default instance partitions - // should be used for segment assignment + // With instances reassignment, the default instance partitions + // should be used for segment assignment and should return NO_OP rebalanceConfig = new BaseConfiguration(); rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, - InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME))); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); // All servers should be assigned to the table instanceAssignment = rebalanceResult.getInstanceAssignment(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org