This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5f220b398c Add support for performing pre-checks for TableRebalance (#15029) 5f220b398c is described below commit 5f220b398cda1fde87a286492e05521d48923634 Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Fri Feb 14 21:47:03 2025 -0800 Add support for performing pre-checks for TableRebalance (#15029) * Add support for performing pre-checks for TableRebalance --- .../pinot/controller/BaseControllerStarter.java | 6 +- .../apache/pinot/controller/ControllerConf.java | 11 ++ .../api/resources/PinotSegmentRestletResource.java | 2 +- .../api/resources/PinotTableRestletResource.java | 14 +- .../helix/core/PinotHelixResourceManager.java | 31 ++++- .../core/rebalance/DefaultRebalancePreChecker.java | 142 +++++++++++++++++++++ .../helix/core/rebalance/RebalanceConfig.java | 23 +++- .../helix/core/rebalance/RebalancePreChecker.java | 31 +++++ .../core/rebalance/RebalancePreCheckerFactory.java | 42 ++++++ .../helix/core/rebalance/RebalanceResult.java | 11 +- .../helix/core/rebalance/TableRebalancer.java | 70 ++++++---- .../rebalance/tenant/DefaultTenantRebalancer.java | 4 +- .../rebalance/tenant/TenantRebalanceResult.java | 2 +- .../util/ServerSegmentMetadataReader.java | 24 +++- .../pinot/controller/util/TableMetadataReader.java | 35 ++++- .../TableRebalancerClusterStatelessTest.java | 15 ++- .../tests/OfflineClusterIntegrationTest.java | 119 +++++++++++++++++ 17 files changed, 529 insertions(+), 53 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index a0ce503d41..e95e0f5d87 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -252,11 +252,11 @@ public abstract class BaseControllerStarter implements ServiceStartable { // queries) FunctionRegistry.init(); _adminApp = createControllerAdminApp(); + // This executor service is used to do async tasks from multiget util or table rebalancing. + _executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d"); // Do not use this before the invocation of {@link PinotHelixResourceManager::start()}, which happens in {@link // ControllerStarter::start()} _helixResourceManager = createHelixResourceManager(); - // This executor service is used to do async tasks from multiget util or table rebalancing. - _executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d"); _tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(), "tenant-rebalance-thread-%d"); _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService); @@ -324,7 +324,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { * @return A new instance of PinotHelixResourceManager. */ protected PinotHelixResourceManager createHelixResourceManager() { - return new PinotHelixResourceManager(_config); + return new PinotHelixResourceManager(_config, _executorService); } public PinotHelixResourceManager getHelixResourceManager() { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 2957ca81fd..5ea1fd20cf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -293,6 +293,7 @@ public class ControllerConf extends PinotConfiguration { public static final String ACCESS_CONTROL_USERNAME = "access.control.init.username"; public static final String ACCESS_CONTROL_PASSWORD = "access.control.init.password"; public static final String LINEAGE_MANAGER_CLASS = "controller.lineage.manager.class"; + public static final String REBALANCE_PRE_CHECKER_CLASS = "controller.rebalance.pre.checker.class"; // Amount of the time the segment can take from the beginning of upload to the end of upload. Used when parallel push // protection is enabled. If the upload does not finish within the timeout, next upload can override the previous one. private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis"; @@ -316,6 +317,8 @@ public class ControllerConf extends PinotConfiguration { private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin"; private static final String DEFAULT_LINEAGE_MANAGER = "org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager"; + private static final String DEFAULT_REBALANCE_PRE_CHECKER = + "org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker"; private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 600_000L; // 10 minutes private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION = -1; private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64; @@ -952,6 +955,14 @@ public class ControllerConf extends PinotConfiguration { setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass); } + public String getRebalancePreCheckerClass() { + return getProperty(REBALANCE_PRE_CHECKER_CLASS, DEFAULT_REBALANCE_PRE_CHECKER); + } + + public void setRebalancePreCheckerClass(String rebalancePreCheckerClass) { + setProperty(REBALANCE_PRE_CHECKER_CLASS, rebalancePreCheckerClass); + } + public long getSegmentUploadTimeoutInMillis() { return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 37e365bc7f..600c75b718 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -948,7 +948,7 @@ public class PinotSegmentRestletResource { new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager); Map<String, JsonNode> needReloadMetadata = tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType, - _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000).getServerReloadJsonResponses(); boolean needReload = needReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue()); Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new HashMap<>(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 638849df46..74361a62a8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -604,6 +604,8 @@ public class PinotTableRestletResource { @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr, @ApiParam(value = "Whether to rebalance table in dry-run mode") @DefaultValue("false") @QueryParam("dryRun") boolean dryRun, + @ApiParam(value = "Whether to enable pre-checks for table, must be in dry-run mode to enable") + @DefaultValue("false") @QueryParam("preChecks") boolean preChecks, @ApiParam(value = "Whether to reassign instances before reassigning segments") @DefaultValue("false") @QueryParam("reassignInstances") boolean reassignInstances, @ApiParam(value = "Whether to reassign CONSUMING segments for real-time table") @DefaultValue("false") @@ -644,6 +646,7 @@ public class PinotTableRestletResource { String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr); RebalanceConfig rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(dryRun); + rebalanceConfig.setPreChecks(preChecks); rebalanceConfig.setReassignInstances(reassignInstances); rebalanceConfig.setIncludeConsuming(includeConsuming); rebalanceConfig.setBootstrap(bootstrap); @@ -663,8 +666,9 @@ public class PinotTableRestletResource { String rebalanceJobId = TableRebalancer.createUniqueRebalanceJobIdentifier(); try { - if (dryRun || downtime) { - // For dry-run or rebalance with downtime, directly return the rebalance result as it should return immediately + if (dryRun || preChecks || downtime) { + // For dry-run, preChecks or rebalance with downtime, directly return the rebalance result as it should return + // immediately return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false); } else { // Make a dry-run first to get the target assignment @@ -682,7 +686,8 @@ public class PinotTableRestletResource { } catch (Throwable t) { String errorMsg = String.format("Caught exception/error while rebalancing table: %s", tableNameWithType); LOGGER.error(errorMsg, t); - return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null); + return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, + null); } }); boolean isJobIdPersisted = waitForRebalanceToPersist( @@ -702,7 +707,8 @@ public class PinotTableRestletResource { return new RebalanceResult(dryRunResult.getJobId(), RebalanceResult.Status.IN_PROGRESS, "In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(), - dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment()); + dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment(), + dryRunResult.getPreChecksResult()); } else { // If dry-run failed or is no-op, return the dry-run result return dryRunResult; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 1f983ce671..748f9d3c28 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -48,6 +48,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -151,6 +152,8 @@ import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; +import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker; +import org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; @@ -237,10 +240,12 @@ public class PinotHelixResourceManager { private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private TableCache _tableCache; private final LineageManager _lineageManager; + private final RebalancePreChecker _rebalancePreChecker; public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, - boolean enableTieredSegmentAssignment, LineageManager lineageManager) { + boolean enableTieredSegmentAssignment, LineageManager lineageManager, RebalancePreChecker rebalancePreChecker, + @Nullable ExecutorService executorService) { _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL); _helixClusterName = helixClusterName; _dataDir = dataDir; @@ -263,13 +268,24 @@ public class PinotHelixResourceManager { _lineageUpdaterLocks[i] = new Object(); } _lineageManager = lineageManager; + _rebalancePreChecker = rebalancePreChecker; + _rebalancePreChecker.init(this, executorService); + } + + public PinotHelixResourceManager(ControllerConf controllerConf, @Nullable ExecutorService executorService) { + this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(), + controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(), + controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(), + LineageManagerFactory.create(controllerConf), + RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()), executorService); } public PinotHelixResourceManager(ControllerConf controllerConf) { this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(), controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(), controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(), - LineageManagerFactory.create(controllerConf)); + LineageManagerFactory.create(controllerConf), + RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()), null); } /** @@ -423,6 +439,15 @@ public class PinotHelixResourceManager { return _lineageManager; } + /** + * Get the rebalance pre-checker + * + * @return rebalance pre-checker + */ + public RebalancePreChecker getRebalancePreChecker() { + return _rebalancePreChecker; + } + /** * Instance related APIs */ @@ -3587,7 +3612,7 @@ public class PinotHelixResourceManager { tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig); } TableRebalancer tableRebalancer = - new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics); + new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics, _rebalancePreChecker); return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceJobId, tierToSegmentsMap); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java new file mode 100644 index 0000000000..945a98e48e --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nullable; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.TableMetadataReader; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DefaultRebalancePreChecker implements RebalancePreChecker { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRebalancePreChecker.class); + + public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus"; + public static final String IS_MINIMIZE_DATA_MOVEMENT = "isMinimizeDataMovement"; + + private PinotHelixResourceManager _pinotHelixResourceManager; + private ExecutorService _executorService; + + @Override + public void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService) { + _pinotHelixResourceManager = pinotHelixResourceManager; + _executorService = executorService; + } + + @Override + public Map<String, String> check(String rebalanceJobId, String tableNameWithType, + TableConfig tableConfig) { + LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId); + + Map<String, String> preCheckResult = new HashMap<>(); + // Check for reload status + Boolean needsReload = checkReloadNeededOnServers(rebalanceJobId, tableNameWithType); + preCheckResult.put(NEEDS_RELOAD_STATUS, needsReload == null ? "error" : String.valueOf(needsReload)); + // Check whether minimizeDataMovement is set in TableConfig + boolean isMinimizeDataMovement = checkIsMinimizeDataMovement(rebalanceJobId, tableNameWithType, tableConfig); + preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT, String.valueOf(isMinimizeDataMovement)); + + LOGGER.info("End pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId); + return preCheckResult; + } + + /** + * Checks if the current segments on any servers needs a reload (table config or schema change that hasn't been + * applied yet). This check does not guarantee that the segments in deep store are up to date. + * TODO: Add an API to check for whether segments in deep store are up to date with the table configs and schema + * and add a pre-check here to call that API. + */ + private Boolean checkReloadNeededOnServers(String rebalanceJobId, String tableNameWithType) { + LOGGER.info("Fetching whether reload is needed for table: {} with rebalanceJobId: {}", tableNameWithType, + rebalanceJobId); + Boolean needsReload = null; + if (_executorService == null) { + LOGGER.warn("Executor service is null, skipping needsReload check for table: {} rebalanceJobId: {}", + tableNameWithType, rebalanceJobId); + return needsReload; + } + try (PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager()) { + TableMetadataReader metadataReader = new TableMetadataReader(_executorService, connectionManager, + _pinotHelixResourceManager); + TableMetadataReader.TableReloadJsonResponse needsReloadMetadataPair = + metadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType, 30_000); + Map<String, JsonNode> needsReloadMetadata = needsReloadMetadataPair.getServerReloadJsonResponses(); + int failedResponses = needsReloadMetadataPair.getNumFailedResponses(); + LOGGER.info("Received {} needs reload responses and {} failed responses from servers for table: {} with " + + "rebalanceJobId: {}", needsReloadMetadata.size(), failedResponses, tableNameWithType, rebalanceJobId); + needsReload = needsReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue()); + if (needsReload) { + return needsReload; + } + if (failedResponses > 0) { + LOGGER.warn("Received {} failed responses from servers and needsReload is false from returned responses, " + + "check needsReload status manually", failedResponses); + needsReload = null; + } + } catch (InvalidConfigException | IOException e) { + LOGGER.warn("Caught exception while trying to fetch reload status from servers", e); + } + + return needsReload; + } + + /** + * Checks if minimize data movement is set for the given table in the TableConfig + */ + private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableNameWithType, + TableConfig tableConfig) { + LOGGER.info("Checking whether minimizeDataMovement is set for table: {} with rebalanceJobId: {}", tableNameWithType, + rebalanceJobId); + try { + if (tableConfig.getTableType() == TableType.OFFLINE) { + InstanceAssignmentConfig instanceAssignmentConfig = + InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.OFFLINE); + return instanceAssignmentConfig.isMinimizeDataMovement(); + } else { + InstanceAssignmentConfig instanceAssignmentConfigConsuming = + InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.CONSUMING); + // For REALTIME tables need to check for both CONSUMING and COMPLETED segments if relocation is enabled + if (!InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) { + return instanceAssignmentConfigConsuming.isMinimizeDataMovement(); + } + + InstanceAssignmentConfig instanceAssignmentConfigCompleted = + InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.COMPLETED); + return instanceAssignmentConfigConsuming.isMinimizeDataMovement() + && instanceAssignmentConfigCompleted.isMinimizeDataMovement(); + } + } catch (IllegalStateException e) { + LOGGER.warn("Error while trying to fetch instance assignment config, assuming minimizeDataMovement is false", e); + return false; + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java index b6b471cfbc..e5bd39f0ec 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java @@ -34,6 +34,12 @@ public class RebalanceConfig { @ApiModelProperty(example = "false") private boolean _dryRun = false; + // Whether to perform pre-checks for rebalance. This only returns the status of each pre-check and does not fail + // rebalance + @JsonProperty("preChecks") + @ApiModelProperty(example = "false") + private boolean _preChecks = false; + // Whether to reassign instances before reassigning segments @JsonProperty("reassignInstances") @ApiModelProperty(example = "false") @@ -118,6 +124,14 @@ public class RebalanceConfig { _dryRun = dryRun; } + public boolean isPreChecks() { + return _preChecks; + } + + public void setPreChecks(boolean preChecks) { + _preChecks = preChecks; + } + public boolean isReassignInstances() { return _reassignInstances; } @@ -232,10 +246,10 @@ public class RebalanceConfig { @Override public String toString() { - return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", _reassignInstances=" + _reassignInstances - + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" + _bootstrap + ", _downtime=" + _downtime - + ", _minAvailableReplicas=" + _minAvailableReplicas + ", _bestEfforts=" + _bestEfforts - + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs + return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" + _preChecks + ", _reassignInstances=" + + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" + _bootstrap + + ", _downtime=" + _downtime + ", _minAvailableReplicas=" + _minAvailableReplicas + ", _bestEfforts=" + + _bestEfforts + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs + ", _externalViewStabilizationTimeoutInMs=" + _externalViewStabilizationTimeoutInMs + ", _updateTargetTier=" + _updateTargetTier + ", _heartbeatIntervalInMs=" + _heartbeatIntervalInMs + ", _heartbeatTimeoutInMs=" + _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ", _retryInitialDelayInMs=" @@ -245,6 +259,7 @@ public class RebalanceConfig { public static RebalanceConfig copy(RebalanceConfig cfg) { RebalanceConfig rc = new RebalanceConfig(); rc._dryRun = cfg._dryRun; + rc._preChecks = cfg._preChecks; rc._reassignInstances = cfg._reassignInstances; rc._includeConsuming = cfg._includeConsuming; rc._bootstrap = cfg._bootstrap; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java new file mode 100644 index 0000000000..54f15e8b33 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nullable; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.config.table.TableConfig; + + +public interface RebalancePreChecker { + void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService); + Map<String, String> check(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java new file mode 100644 index 0000000000..286942dc78 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RebalancePreCheckerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(RebalancePreCheckerFactory.class); + + private RebalancePreCheckerFactory() { + } + + public static RebalancePreChecker create(String rebalancePreCheckerClassName) { + try { + LOGGER.info("Trying to create rebalance pre-checker object for class: {}", rebalancePreCheckerClassName); + return (RebalancePreChecker) Class.forName(rebalancePreCheckerClassName).newInstance(); + } catch (Exception e) { + String errMsg = String.format("Failed to create rebalance pre-checker for class: %s", + rebalancePreCheckerClassName); + LOGGER.error(errMsg, e); + throw new RuntimeException(e); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java index 2be7fc7753..2b81d8d78b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java @@ -40,6 +40,8 @@ public class RebalanceResult { @JsonInclude(JsonInclude.Include.NON_NULL) private final Map<String, Map<String, String>> _segmentAssignment; private final String _description; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Map<String, String> _preChecksResult; @JsonCreator public RebalanceResult(@JsonProperty(value = "jobId", required = true) String jobId, @@ -47,13 +49,15 @@ public class RebalanceResult { @JsonProperty(value = "description", required = true) String description, @JsonProperty("instanceAssignment") @Nullable Map<InstancePartitionsType, InstancePartitions> instanceAssignment, @JsonProperty("tierInstanceAssignment") @Nullable Map<String, InstancePartitions> tierInstanceAssignment, - @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, String>> segmentAssignment) { + @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, String>> segmentAssignment, + @JsonProperty("preChecksResult") @Nullable Map<String, String> preChecksResult) { _jobId = jobId; _status = status; _description = description; _instanceAssignment = instanceAssignment; _tierInstanceAssignment = tierInstanceAssignment; _segmentAssignment = segmentAssignment; + _preChecksResult = preChecksResult; } @JsonProperty @@ -86,6 +90,11 @@ public class RebalanceResult { return _segmentAssignment; } + @JsonProperty + public Map<String, String> getPreChecksResult() { + return _preChecksResult; + } + public enum Status { // FAILED if the job has ended with known exceptions; // ABORTED if the job is stopped by others but retry is still allowed; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index facd904f21..5dc7c4d8ea 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -119,9 +119,10 @@ public class TableRebalancer { private final HelixDataAccessor _helixDataAccessor; private final TableRebalanceObserver _tableRebalanceObserver; private final ControllerMetrics _controllerMetrics; + private final RebalancePreChecker _rebalancePreChecker; public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver, - @Nullable ControllerMetrics controllerMetrics) { + @Nullable ControllerMetrics controllerMetrics, @Nullable RebalancePreChecker rebalancePreChecker) { _helixManager = helixManager; if (tableRebalanceObserver != null) { _tableRebalanceObserver = tableRebalanceObserver; @@ -130,10 +131,11 @@ public class TableRebalancer { } _helixDataAccessor = helixManager.getHelixDataAccessor(); _controllerMetrics = controllerMetrics; + _rebalancePreChecker = rebalancePreChecker; } public TableRebalancer(HelixManager helixManager) { - this(helixManager, null, null); + this(helixManager, null, null, null); } public static String createUniqueRebalanceJobIdentifier() { @@ -173,6 +175,7 @@ public class TableRebalancer { rebalanceJobId = createUniqueRebalanceJobIdentifier(); } boolean dryRun = rebalanceConfig.isDryRun(); + boolean preChecks = rebalanceConfig.isPreChecks(); boolean reassignInstances = rebalanceConfig.isReassignInstances(); boolean includeConsuming = rebalanceConfig.isIncludeConsuming(); boolean bootstrap = rebalanceConfig.isBootstrap(); @@ -186,13 +189,29 @@ public class TableRebalancer { && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase( tableConfig.getRoutingConfig().getInstanceSelectorType()); LOGGER.info( - "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, " - + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, lowDiskMode: {}, " - + "bestEfforts: {}, externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}", - tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime, + "Start rebalancing table: {} with dryRun: {}, preChecks: {}, reassignInstances: {}, includeConsuming: {}, " + + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, " + + "lowDiskMode: {}, bestEfforts: {}, externalViewCheckIntervalInMs: {}, " + + "externalViewStabilizationTimeoutInMs: {}", + tableNameWithType, dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime, minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts, externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs); + // Perform pre-checks if enabled + Map<String, String> preChecksResult = null; + if (preChecks) { + if (!dryRun) { + // Dry-run must be enabled to run pre-checks + String errorMsg = String.format("Pre-checks can only be enabled in dry-run mode, not triggering rebalance for " + + "table: %s with rebalanceJobId: %s", tableNameWithType, rebalanceJobId); + LOGGER.error(errorMsg); + return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, null); + } + if (_rebalancePreChecker != null) { + preChecksResult = _rebalancePreChecker.check(rebalanceJobId, tableNameWithType, tableConfig); + } + } + // Fetch ideal state PropertyKey idealStatePropertyKey = _helixDataAccessor.keyBuilder().idealStates(tableNameWithType); IdealState currentIdealState; @@ -203,21 +222,21 @@ public class TableRebalancer { "For rebalanceId: %s, caught exception while fetching IdealState for table: %s, aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, - "Caught exception while fetching IdealState: " + e, null, null, null); + "Caught exception while fetching IdealState: " + e, null, null, null, preChecksResult); } if (currentIdealState == null) { onReturnFailure( String.format("For rebalanceId: %s, cannot find the IdealState for table: %s, aborting the rebalance", rebalanceJobId, tableNameWithType), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Cannot find the IdealState for table", - null, null, null); + null, null, null, preChecksResult); } if (!currentIdealState.isEnabled() && !downtime) { onReturnFailure(String.format( "For rebalanceId: %s, cannot rebalance disabled table: %s without downtime, aborting the rebalance", rebalanceJobId, tableNameWithType), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, - "Cannot rebalance disabled table without downtime", null, null, null); + "Cannot rebalance disabled table without downtime", null, null, null, preChecksResult); } LOGGER.info("For rebalanceId: {}, processing instance partitions for table: {}", rebalanceJobId, tableNameWithType); @@ -235,7 +254,7 @@ public class TableRebalancer { "For rebalanceId: %s, caught exception while fetching/calculating instance partitions for table: %s, " + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, - "Caught exception while fetching/calculating instance partitions: " + e, null, null, null); + "Caught exception while fetching/calculating instance partitions: " + e, null, null, null, preChecksResult); } // Calculate instance partitions for tiers if configured @@ -253,7 +272,8 @@ public class TableRebalancer { "For rebalanceId: %s, caught exception while fetching/calculating tier instance partitions for table: %s, " + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, - "Caught exception while fetching/calculating tier instance partitions: " + e, null, null, null); + "Caught exception while fetching/calculating tier instance partitions: " + e, null, null, null, + preChecksResult); } LOGGER.info("For rebalanceId: {}, calculating the target assignment for table: {}", rebalanceJobId, @@ -271,7 +291,7 @@ public class TableRebalancer { + "rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while calculating target assignment: " + e, instancePartitionsMap, - tierToInstancePartitionsMap, null); + tierToInstancePartitionsMap, null, preChecksResult); } boolean segmentAssignmentUnchanged = currentAssignment.equals(targetAssignment); @@ -286,19 +306,19 @@ public class TableRebalancer { String.format("For rebalanceId: %s, instance unchanged and table: %s is already balanced", rebalanceJobId, tableNameWithType)); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced", - instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); + instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult); } else { if (dryRun) { return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment); + tierToInstancePartitionsMap, targetAssignment, preChecksResult); } else { _tableRebalanceObserver.onSuccess( String.format("For rebalanceId: %s, instance reassigned but table: %s is already balanced", rebalanceJobId, tableNameWithType)); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment); + targetAssignment, preChecksResult); } } } @@ -307,7 +327,7 @@ public class TableRebalancer { LOGGER.info("For rebalanceId: {}, rebalancing table: {} in dry-run mode, returning the target assignment", rebalanceJobId, tableNameWithType); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment); + tierToInstancePartitionsMap, targetAssignment, preChecksResult); } if (downtime) { @@ -332,14 +352,14 @@ public class TableRebalancer { return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Success with downtime (replaced IdealState with the target segment assignment, ExternalView might not " + "reach the target segment assignment yet)", instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment); + targetAssignment, preChecksResult); } catch (Exception e) { onReturnFailure(String.format( "For rebalanceId: %s, caught exception while updating IdealState for table: %s, aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment); + targetAssignment, preChecksResult); } } @@ -371,7 +391,7 @@ public class TableRebalancer { minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Illegal min available replicas config", instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment); + targetAssignment, preChecksResult); } minAvailableReplicas = minReplicasToKeepUpForNoDowntime; } else { @@ -412,12 +432,12 @@ public class TableRebalancer { if (_tableRebalanceObserver.isStopped()) { return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(), "Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment); + tierToInstancePartitionsMap, targetAssignment, preChecksResult); } _tableRebalanceObserver.onError(errorMsg); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment); + tierToInstancePartitionsMap, targetAssignment, preChecksResult); } // Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge @@ -466,7 +486,7 @@ public class TableRebalancer { + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment); + tierToInstancePartitionsMap, targetAssignment, preChecksResult); } } else { LOGGER.info("For rebalanceId:{}, no state change found for segments to be moved, " @@ -490,7 +510,7 @@ public class TableRebalancer { return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Success with minAvailableReplicas: " + minAvailableReplicas + " (both IdealState and ExternalView should reach the target segment assignment)", - instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); + instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult); } // Record change of current ideal state and the new target @@ -499,7 +519,7 @@ public class TableRebalancer { if (_tableRebalanceObserver.isStopped()) { return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(), "Rebalance has stopped already before updating the IdealState", instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment); + tierToInstancePartitionsMap, targetAssignment, preChecksResult); } Map<String, Map<String, String>> nextAssignment = getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, @@ -530,7 +550,7 @@ public class TableRebalancer { + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment); + targetAssignment, preChecksResult); } segmentsToMonitor = new HashSet<>(segmentsToMove); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java index ec55c38b95..e115476529 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java @@ -60,7 +60,7 @@ public class DefaultTenantRebalancer implements TenantRebalancer { false)); } catch (TableNotFoundException exception) { rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), - null, null, null)); + null, null, null, null)); } }); if (config.isDryRun()) { @@ -71,7 +71,7 @@ public class DefaultTenantRebalancer implements TenantRebalancer { if (result.getStatus() == RebalanceResult.Status.DONE) { rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS, "In progress, check controller task status for the", result.getInstanceAssignment(), - result.getTierInstanceAssignment(), result.getSegmentAssignment())); + result.getTierInstanceAssignment(), result.getSegmentAssignment(), result.getPreChecksResult())); } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java index 57c17054d7..c96c25b250 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java @@ -36,7 +36,7 @@ public class TenantRebalanceResult { _rebalanceTableResults = new HashMap<>(); rebalanceTableResults.forEach((table, result) -> { _rebalanceTableResults.put(table, new RebalanceResult(result.getJobId(), result.getStatus(), - result.getDescription(), null, null, null)); + result.getDescription(), null, null, null, null)); }); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index 8dde7f08fe..0376b90dac 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -222,8 +222,8 @@ public class ServerSegmentMetadataReader { * This method will return metadata of all the servers along with need reload flag. * In future additional details like segments list can also be added */ - public List<String> getCheckReloadSegmentsFromServer(String tableNameWithType, Set<String> serverInstances, - BiMap<String, String> endpoints, int timeoutMs) { + public TableReloadResponse getCheckReloadSegmentsFromServer(String tableNameWithType, + Set<String> serverInstances, BiMap<String, String> endpoints, int timeoutMs) { LOGGER.debug("Checking if reload is needed on segments from servers for table {}.", tableNameWithType); List<String> serverURLs = new ArrayList<>(); for (String serverInstance : serverInstances) { @@ -250,7 +250,7 @@ public class ServerSegmentMetadataReader { } LOGGER.debug("Retrieved metadata of reload check from servers."); - return serversNeedReloadResponses; + return new TableReloadResponse(serviceResponse._failedResponseCount, serversNeedReloadResponses); } /** @@ -505,4 +505,22 @@ public class ServerSegmentMetadataReader { tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8); return String.format("%s/tables/%s/segments/isStale", endpoint, tableNameWithType); } + + public class TableReloadResponse { + private int _numFailedResponses; + private List<String> _serverReloadResponses; + + TableReloadResponse(int numFailedResponses, List<String> serverReloadResponses) { + _numFailedResponses = numFailedResponses; + _serverReloadResponses = serverReloadResponses; + } + + public int getNumFailedResponses() { + return _numFailedResponses; + } + + public List<String> getServerReloadResponses() { + return _serverReloadResponses; + } + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java index 48f53577a8..c6ba7aa59f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java @@ -59,19 +59,26 @@ public class TableMetadataReader { _pinotHelixResourceManager = helixResourceManager; } - public Map<String, JsonNode> getServerCheckSegmentsReloadMetadata(String tableNameWithType, int timeoutMs) + /** + * Check if segments need a reload on any servers + * @return pair of: a) number of failed responses, b) reload responses returned + */ + public TableReloadJsonResponse getServerCheckSegmentsReloadMetadata(String tableNameWithType, + int timeoutMs) throws InvalidConfigException, IOException { - List<String> segmentsMetadata = getReloadCheckResponses(tableNameWithType, timeoutMs); + ServerSegmentMetadataReader.TableReloadResponse segmentsMetadataPair = + getReloadCheckResponses(tableNameWithType, timeoutMs); + List<String> segmentsMetadata = segmentsMetadataPair.getServerReloadResponses(); Map<String, JsonNode> response = new HashMap<>(); for (String segmentMetadata : segmentsMetadata) { JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata); response.put(responseJson.get("instanceId").asText(), responseJson); } - return response; + return new TableReloadJsonResponse(segmentsMetadataPair.getNumFailedResponses(), response); } - public List<String> getReloadCheckResponses(String tableNameWithType, int timeoutMs) - throws InvalidConfigException { + public ServerSegmentMetadataReader.TableReloadResponse getReloadCheckResponses(String tableNameWithType, + int timeoutMs) throws InvalidConfigException { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); List<String> serverInstances = _pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, tableType); Set<String> serverInstanceSet = new HashSet<>(serverInstances); @@ -213,4 +220,22 @@ public class TableMetadataReader { return serverSegmentMetadataReader.getStaleSegmentsFromServer(tableNameWithType, serverInstanceSet, endpoints, timeoutMs); } + + public class TableReloadJsonResponse { + private int _numFailedResponses; + private Map<String, JsonNode> _serverReloadJsonResponses; + + TableReloadJsonResponse(int numFailedResponses, Map<String, JsonNode> serverReloadJsonResponses) { + _numFailedResponses = numFailedResponses; + _serverReloadJsonResponses = serverReloadJsonResponses; + } + + public int getNumFailedResponses() { + return _numFailedResponses; + } + + public Map<String, JsonNode> getServerReloadJsonResponses() { + return _serverReloadJsonResponses; + } + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index c5ac9e82bd..de83ec6eee 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; @@ -90,7 +91,9 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, true); } - TableRebalancer tableRebalancer = new TableRebalancer(_helixManager); + DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); + preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10)); + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); @@ -137,8 +140,18 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { // Rebalance in dry-run mode RebalanceConfig rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + Map<String, String> preCheckResult = rebalanceResult.getPreChecksResult(); + assertNotNull(preCheckResult); + assertEquals(preCheckResult.size(), 2); + assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS)); + assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT)); + // Sending request to servers should fail for all, so needsPreprocess should be set to "error" to indicate that a + // manual check is needed + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS), "error"); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT), "false"); // All servers should be assigned to the table instanceAssignment = rebalanceResult.getInstanceAssignment(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 054d8393cf..6750a7340d 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -34,10 +34,13 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; @@ -61,10 +64,16 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator; import org.apache.pinot.segment.spi.index.ForwardIndexConfig; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; +import org.apache.pinot.server.starter.helix.BaseServerStarter; import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; @@ -73,6 +82,10 @@ import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.TransformConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; @@ -161,6 +174,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // Once this value is set, assert that table size always gets back to this value after removing the added indices. private long _tableSize; + private PinotHelixResourceManager _resourceManager; + private TableRebalancer _tableRebalancer; + protected int getNumBrokers() { return NUM_BROKERS; } @@ -268,6 +284,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet waitForAllDocsLoaded(600_000L); _tableSize = getTableSize(getTableName()); + + _resourceManager = _controllerStarter.getHelixResourceManager(); + DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); + preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10)); + _tableRebalancer = new TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker); } private void reloadAllSegments(String testQuery, boolean forceDownload, long numTotalDocs) @@ -777,6 +798,104 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet }, 60_000L, "Failed to execute query"); } + @Test + public void testRebalancePreChecks() + throws Exception { + // setup the rebalance config + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + + TableConfig tableConfig = getOfflineTableConfig(); + + // Ensure pre-check status is null if not enabled + RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertNull(rebalanceResult.getPreChecksResult()); + + // Enable pre-checks, nothing is set + rebalanceConfig.setPreChecks(true); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, false, false); + + // Enable minimizeDataMovement + tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap()); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, true, false); + + // Undo minimizeDataMovement, update the table config to add a column to bloom filter + tableConfig.getIndexingConfig().getBloomFilterColumns().add("Quarter"); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, false, true); + + // Undo tableConfig change + tableConfig.getIndexingConfig().getBloomFilterColumns().remove("Quarter"); + updateTableConfig(tableConfig); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, false, false); + + // Add a schema change + Schema schema = createSchema(); + schema.addField(new MetricFieldSpec("NewAddedIntMetric", DataType.INT, 1)); + updateSchema(schema); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, false, true); + + // Keep schema change and update table config to add minimizeDataMovement + tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap()); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, true, true); + + // Add a new server (to force change in instance assignment) and enable reassignInstances + BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS); + rebalanceConfig.setReassignInstances(true); + tableConfig.setInstanceAssignmentConfigMap(null); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, false, true); + + // Disable dry-run + rebalanceConfig.setDryRun(false); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertNull(rebalanceResult.getPreChecksResult()); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + + // Stop the added server + serverStarter1.stop(); + TestUtils.waitForCondition(aVoid -> _resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful(), + 60_000L, "Failed to drop added server"); + } + + private void checkRebalancePreCheckStatus(RebalanceResult rebalanceResult, RebalanceResult.Status expectedStatus, + boolean expectedMinimizeDataMovement, boolean expectedNeedsReloadStatus) { + assertEquals(rebalanceResult.getStatus(), expectedStatus); + Map<String, String> preChecksResult = rebalanceResult.getPreChecksResult(); + assertNotNull(preChecksResult); + assertEquals(preChecksResult.size(), 2); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT)); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS)); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT), + String.valueOf(expectedMinimizeDataMovement)); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS), + String.valueOf(expectedNeedsReloadStatus)); + } + + private Map<String, InstanceAssignmentConfig> createInstanceAssignmentConfigMap() { + InstanceTagPoolConfig instanceTagPoolConfig = + new InstanceTagPoolConfig("tag", false, 1, null); + List<String> constraints = new ArrayList<>(); + constraints.add("constraints1"); + InstanceConstraintConfig instanceConstraintConfig = new InstanceConstraintConfig(constraints); + InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 1, 1, + 1, 1, 1, true, + null); + InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig(instanceTagPoolConfig, + instanceConstraintConfig, instanceReplicaGroupPartitionConfig, null, true); + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>(); + instanceAssignmentConfigMap.put("OFFLINE", instanceAssignmentConfig); + return instanceAssignmentConfigMap; + } + @Test(dataProvider = "useBothQueryEngines") public void testRegexpReplace(boolean useMultiStageQueryEngine) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org