This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c7af34e7aa Reject rebalance requests for tables that already have an in progress rebalance job (#15990) c7af34e7aa is described below commit c7af34e7aae0ec0b0772ef9d0842d5970ad491f0 Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Tue Jun 10 17:53:46 2025 +0100 Reject rebalance requests for tables that already have an in progress rebalance job (#15990) --- .../exception/RebalanceInProgressException.java | 33 +++ .../pinot/controller/BaseControllerStarter.java | 47 +++- .../api/resources/PinotRealtimeTableResource.java | 2 +- .../api/resources/PinotTableRestletResource.java | 89 ++---- .../helix/core/PinotHelixResourceManager.java | 142 +--------- .../helix/core/rebalance/RebalanceChecker.java | 47 ++-- .../core/rebalance/TableRebalanceManager.java | 307 +++++++++++++++++++++ .../helix/core/rebalance/TableRebalancer.java | 7 +- .../rebalance/ZkBasedTableRebalanceObserver.java | 16 +- .../rebalance/tenant/DefaultTenantRebalancer.java | 19 +- .../helix/core/relocation/SegmentRelocator.java | 19 +- .../helix/core/util/ControllerZkHelixUtils.java | 111 ++++++++ .../pinot/controller/helix/ControllerTest.java | 25 ++ .../helix/core/rebalance/RebalanceCheckerTest.java | 31 ++- .../TableRebalancerClusterStatelessTest.java | 19 +- .../TestZkBasedTableRebalanceObserver.java | 19 +- .../rebalance/tenant/TenantRebalancerTest.java | 3 +- .../core/relocation/SegmentRelocatorTest.java | 11 +- .../tests/TableRebalanceIntegrationTest.java | 58 ++++ .../apache/pinot/tools/PinotTableRebalancer.java | 22 +- 20 files changed, 732 insertions(+), 295 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/RebalanceInProgressException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/RebalanceInProgressException.java new file mode 100644 index 0000000000..1b14e31394 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/RebalanceInProgressException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.exception; + +/** + * Exception thrown when a rebalance operation is attempted while another rebalance is already in progress for the same + * table. This helps to prevent concurrent table rebalances. + */ +public class RebalanceInProgressException extends Exception { + public RebalanceInProgressException(String message) { + super(message); + } + + public RebalanceInProgressException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 6b9b88f88e..545ddd95aa 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -101,6 +101,9 @@ import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentMa import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig; import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker; +import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker; +import org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; import org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer; import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer; import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator; @@ -203,11 +206,15 @@ public abstract class BaseControllerStarter implements ServiceStartable { protected TaskMetricsEmitter _taskMetricsEmitter; protected PoolingHttpClientConnectionManager _connectionManager; protected TenantRebalancer _tenantRebalancer; - protected ExecutorService _tenantRebalanceExecutorService; + // This executor should be used by all code paths for user initiated rebalances, so that the controller config + // CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS is honored. + protected ExecutorService _rebalancerExecutorService; protected TableSizeReader _tableSizeReader; protected StorageQuotaChecker _storageQuotaChecker; protected DiskUtilizationChecker _diskUtilizationChecker; protected ResourceUtilizationManager _resourceUtilizationManager; + protected RebalancePreChecker _rebalancePreChecker; + protected TableRebalanceManager _tableRebalanceManager; @Override public void init(PinotConfiguration pinotConfiguration) @@ -264,9 +271,6 @@ public abstract class BaseControllerStarter implements ServiceStartable { // Do not use this before the invocation of {@link PinotHelixResourceManager::start()}, which happens in {@link // ControllerStarter::start()} _helixResourceManager = createHelixResourceManager(); - _tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(), - "tenant-rebalance-thread-%d"); - _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService); } // Initialize the table config tuner registry. @@ -347,7 +351,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { * @return A new instance of PinotHelixResourceManager. */ protected PinotHelixResourceManager createHelixResourceManager() { - return new PinotHelixResourceManager(_config, _executorService); + return new PinotHelixResourceManager(_config); } public PinotHelixResourceManager getHelixResourceManager() { @@ -389,6 +393,14 @@ public abstract class BaseControllerStarter implements ServiceStartable { return _staleInstancesCleanupTask; } + public TableRebalanceManager getTableRebalanceManager() { + return _tableRebalanceManager; + } + + public TableSizeReader getTableSizeReader() { + return _tableSizeReader; + } + @Override public ServiceRole getServiceRole() { return ServiceRole.CONTROLLER; @@ -536,12 +548,20 @@ public abstract class BaseControllerStarter implements ServiceStartable { _tableSizeReader = new TableSizeReader(_executorService, _connectionManager, _controllerMetrics, _helixResourceManager, _leadControllerManager); - _helixResourceManager.registerTableSizeReader(_tableSizeReader); _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager, _helixResourceManager, _config); _diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, _config); _resourceUtilizationManager = new ResourceUtilizationManager(_config, _diskUtilizationChecker); + _rebalancePreChecker = RebalancePreCheckerFactory.create(_config.getRebalancePreCheckerClass()); + _rebalancePreChecker.init(_helixResourceManager, _executorService, _config.getDiskUtilizationThreshold()); + _rebalancerExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(), + "rebalance-thread-%d"); + _tableRebalanceManager = + new TableRebalanceManager(_helixResourceManager, _controllerMetrics, _rebalancePreChecker, _tableSizeReader, + _rebalancerExecutorService); + _tenantRebalancer = + new DefaultTenantRebalancer(_tableRebalanceManager, _helixResourceManager, _rebalancerExecutorService); // Setting up periodic tasks List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks(); @@ -579,6 +599,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { bind(_helixParticipantInstanceId).named(CONTROLLER_INSTANCE_ID); bind(_helixResourceManager).to(PinotHelixResourceManager.class); bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class); + bind(_tableRebalanceManager).to(TableRebalanceManager.class); bind(_segmentCompletionManager).to(SegmentCompletionManager.class); bind(_taskManager).to(PinotTaskManager.class); bind(_taskManagerStatusCache).to(TaskManagerStatusCache.class); @@ -857,15 +878,17 @@ public abstract class BaseControllerStarter implements ServiceStartable { new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _tableSizeReader); periodicTasks.add(_segmentStatusChecker); - _rebalanceChecker = new RebalanceChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _executorService); + _rebalanceChecker = + new RebalanceChecker(_tableRebalanceManager, _helixResourceManager, _leadControllerManager, _config, + _controllerMetrics); periodicTasks.add(_rebalanceChecker); _realtimeConsumerMonitor = new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics, _executorService); periodicTasks.add(_realtimeConsumerMonitor); - _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _executorService, _connectionManager); + _segmentRelocator = + new SegmentRelocator(_tableRebalanceManager, _helixResourceManager, _leadControllerManager, _config, + _controllerMetrics, _executorService, _connectionManager); periodicTasks.add(_segmentRelocator); _staleInstancesCleanupTask = new StaleInstancesCleanupTask(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); @@ -965,8 +988,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { LOGGER.info("Shutting down executor service"); _executorService.shutdownNow(); _executorService.awaitTermination(10L, TimeUnit.SECONDS); - _tenantRebalanceExecutorService.shutdownNow(); - _tenantRebalanceExecutorService.awaitTermination(10L, TimeUnit.SECONDS); + _rebalancerExecutorService.shutdownNow(); + _rebalancerExecutorService.awaitTermination(10L, TimeUnit.SECONDS); } catch (final Exception e) { LOGGER.error("Caught exception while shutting down", e); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index 595169ce44..ca6cd35bc0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -259,7 +259,7 @@ public class PinotRealtimeTableResource { controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, JsonUtils.objectToString(segmentsYetToBeCommitted)); _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, controllerJobZKMetadata, - ControllerJobType.FORCE_COMMIT, prev -> true); + ControllerJobType.FORCE_COMMIT); } Map<String, Object> result = new HashMap<>(controllerJobZKMetadata); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 30d8401165..bdb82ce766 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -45,8 +45,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -73,6 +73,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.exception.RebalanceInProgressException; import org.apache.pinot.common.exception.SchemaNotFoundException; import org.apache.pinot.common.exception.TableNotFoundException; import org.apache.pinot.common.metadata.ZKMetadataProvider; @@ -97,10 +98,8 @@ import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; -import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; -import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext; -import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.controller.recommender.RecommenderDriver; import org.apache.pinot.controller.tuner.TableConfigTunerUtils; @@ -171,6 +170,9 @@ public class PinotTableRestletResource { @Inject PinotHelixResourceManager _pinotHelixResourceManager; + @Inject + TableRebalanceManager _tableRebalanceManager; + @Inject PinotHelixTaskResourceManager _pinotHelixTaskResourceManager; @@ -183,9 +185,6 @@ public class PinotTableRestletResource { @Inject ControllerMetrics _controllerMetrics; - @Inject - ExecutorService _executorService; - @Inject AccessControlFactory _accessControlFactory; @@ -695,31 +694,28 @@ public class PinotTableRestletResource { try { if (dryRun || preChecks || downtime) { - // For dry-run, preChecks or rebalance with downtime, directly return the rebalance result as it should return - // immediately - return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false); + // For dry-run, preChecks or rebalance with downtime, it's fine to run the rebalance synchronously since it + // should be a really short operation. + return _tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false); } else { // Make a dry-run first to get the target assignment rebalanceConfig.setDryRun(true); RebalanceResult dryRunResult = - _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false); + _tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false); if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) { // If dry-run succeeded, run rebalance asynchronously rebalanceConfig.setDryRun(false); - Future<RebalanceResult> rebalanceResultFuture = _executorService.submit(() -> { - try { - return _pinotHelixResourceManager.rebalanceTable( - tableNameWithType, rebalanceConfig, rebalanceJobId, true); - } catch (Throwable t) { + CompletableFuture<RebalanceResult> rebalanceResultFuture = + _tableRebalanceManager.rebalanceTableAsync(tableNameWithType, rebalanceConfig, rebalanceJobId, true); + rebalanceResultFuture.whenComplete((rebalanceResult, throwable) -> { + if (throwable != null) { String errorMsg = String.format("Caught exception/error while rebalancing table: %s", tableNameWithType); - LOGGER.error(errorMsg, t); - return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, - null, null); + LOGGER.error(errorMsg, throwable); } }); - boolean isJobIdPersisted = waitForRebalanceToPersist( - dryRunResult.getJobId(), tableNameWithType, rebalanceResultFuture); + boolean isJobIdPersisted = + waitForRebalanceToPersist(dryRunResult.getJobId(), tableNameWithType, rebalanceResultFuture); if (rebalanceResultFuture.isDone()) { try { @@ -744,6 +740,8 @@ public class PinotTableRestletResource { } } catch (TableNotFoundException e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); + } catch (RebalanceInProgressException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT); } } @@ -783,29 +781,7 @@ public class PinotTableRestletResource { @Context HttpHeaders headers) { tableName = DatabaseUtils.translateTableName(tableName, headers); String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr); - List<String> cancelledJobIds = new ArrayList<>(); - boolean updated = - _pinotHelixResourceManager.updateJobsForTable(tableNameWithType, ControllerJobType.TABLE_REBALANCE, - jobMetadata -> { - String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID); - try { - String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); - TableRebalanceProgressStats jobStats = - JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class); - if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) { - return; - } - cancelledJobIds.add(jobId); - LOGGER.info("Cancel rebalance job: {} for table: {}", jobId, tableNameWithType); - jobStats.setStatus(RebalanceResult.Status.CANCELLED); - jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, - JsonUtils.objectToString(jobStats)); - } catch (Exception e) { - LOGGER.error("Failed to cancel rebalance job: {} for table: {}", jobId, tableNameWithType, e); - } - }); - LOGGER.info("Tried to cancel existing jobs at best effort and done: {}", updated); - return cancelledJobIds; + return _tableRebalanceManager.cancelRebalance(tableNameWithType); } @GET @@ -818,30 +794,7 @@ public class PinotTableRestletResource { public ServerRebalanceJobStatusResponse rebalanceStatus( @ApiParam(value = "Rebalance Job Id", required = true) @PathParam("jobId") String jobId) throws JsonProcessingException { - Map<String, String> controllerJobZKMetadata = getControllerJobMetadata(jobId); - - if (controllerJobZKMetadata == null) { - throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId, - Response.Status.NOT_FOUND); - } - ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new ServerRebalanceJobStatusResponse(); - TableRebalanceProgressStats tableRebalanceProgressStats = JsonUtils.stringToObject( - controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS), - TableRebalanceProgressStats.class); - serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats); - - long timeSinceStartInSecs = 0L; - if (RebalanceResult.Status.DONE != tableRebalanceProgressStats.getStatus()) { - timeSinceStartInSecs = (System.currentTimeMillis() - tableRebalanceProgressStats.getStartTimeMs()) / 1000; - } - serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs); - - String jobCtxInStr = controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); - if (StringUtils.isNotEmpty(jobCtxInStr)) { - TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, TableRebalanceContext.class); - serverRebalanceJobStatusResponse.setTableRebalanceContext(jobCtx); - } - return serverRebalanceJobStatusResponse; + return _tableRebalanceManager.getRebalanceStatus(jobId); } @GET diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 21894be117..b097ecd05b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -48,7 +48,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -154,15 +153,8 @@ import org.apache.pinot.controller.helix.core.lineage.LineageManager; import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; -import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; -import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker; -import org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory; -import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; -import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext; -import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; -import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver; +import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; -import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.instance.Instance; @@ -245,13 +237,10 @@ public class PinotHelixResourceManager { private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private TableCache _tableCache; private final LineageManager _lineageManager; - private final RebalancePreChecker _rebalancePreChecker; - private TableSizeReader _tableSizeReader; public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, - boolean enableTieredSegmentAssignment, LineageManager lineageManager, RebalancePreChecker rebalancePreChecker, - @Nullable ExecutorService executorService, double diskUtilizationThreshold) { + boolean enableTieredSegmentAssignment, LineageManager lineageManager) { _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL); _helixClusterName = helixClusterName; _dataDir = dataDir; @@ -274,26 +263,13 @@ public class PinotHelixResourceManager { _lineageUpdaterLocks[i] = new Object(); } _lineageManager = lineageManager; - _rebalancePreChecker = rebalancePreChecker; - _rebalancePreChecker.init(this, executorService, diskUtilizationThreshold); - } - - public PinotHelixResourceManager(ControllerConf controllerConf, @Nullable ExecutorService executorService) { - this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(), - controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(), - controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(), - LineageManagerFactory.create(controllerConf), - RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()), executorService, - controllerConf.getRebalanceDiskUtilizationThreshold()); } public PinotHelixResourceManager(ControllerConf controllerConf) { this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(), controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(), controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(), - LineageManagerFactory.create(controllerConf), - RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()), null, - controllerConf.getRebalanceDiskUtilizationThreshold()); + LineageManagerFactory.create(controllerConf)); } /** @@ -447,24 +423,6 @@ public class PinotHelixResourceManager { return _lineageManager; } - /** - * Get the rebalance pre-checker - * - * @return rebalance pre-checker - */ - public RebalancePreChecker getRebalancePreChecker() { - return _rebalancePreChecker; - } - - /** - * Get the table size reader. - * - * @return table size reader - */ - public TableSizeReader getTableSizeReader() { - return _tableSizeReader; - } - /** * Instance related APIs */ @@ -2059,10 +2017,6 @@ public class PinotHelixResourceManager { _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager; } - public void registerTableSizeReader(TableSizeReader tableSizeReader) { - _tableSizeReader = tableSizeReader; - } - private void assignInstances(TableConfig tableConfig, boolean override) { String tableNameWithType = tableConfig.getTableName(); String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); @@ -2440,25 +2394,7 @@ public class PinotHelixResourceManager { */ public Map<String, Map<String, String>> getAllJobs(Set<String> jobTypes, Predicate<Map<String, String>> jobMetadataChecker) { - Map<String, Map<String, String>> controllerJobs = new HashMap<>(); - for (String jobType : jobTypes) { - String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType); - ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT); - if (jobsZnRecord == null) { - continue; - } - Map<String, Map<String, String>> jobMetadataMap = jobsZnRecord.getMapFields(); - for (Map.Entry<String, Map<String, String>> jobMetadataEntry : jobMetadataMap.entrySet()) { - String jobId = jobMetadataEntry.getKey(); - Map<String, String> jobMetadata = jobMetadataEntry.getValue(); - Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType), - "Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType, jobResourcePath, jobId); - if (jobMetadataChecker.test(jobMetadata)) { - controllerJobs.put(jobId, jobMetadata); - } - } - } - return controllerJobs; + return ControllerZkHelixUtils.getAllControllerJobs(jobTypes, jobMetadataChecker, _propertyStore); } /** @@ -2538,37 +2474,12 @@ public class PinotHelixResourceManager { * @param jobId job's UUID * @param jobMetadata the job metadata * @param jobType the type of the job to figure out where job metadata is kept in ZK - * @param prevJobMetadataChecker to check the previous job metadata before adding new one * @return boolean representing success / failure of the ZK write step */ public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobType, Predicate<Map<String, String>> prevJobMetadataChecker) { - Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS) != null, - "Submission Time in JobMetadata record not set. Cannot expire these records"); - String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType); - Stat stat = new Stat(); - ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, stat, AccessOption.PERSISTENT); - if (jobsZnRecord != null) { - Map<String, Map<String, String>> jobMetadataMap = jobsZnRecord.getMapFields(); - Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId); - if (!prevJobMetadataChecker.test(prevJobMetadata)) { - return false; - } - jobMetadataMap.put(jobId, jobMetadata); - if (jobMetadataMap.size() > CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) { - jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) -> Long.compare( - Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)), - Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)))) - .collect(Collectors.toList()).subList(0, CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) - .stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - jobsZnRecord.setMapFields(jobMetadataMap); - return _propertyStore.set(jobResourcePath, jobsZnRecord, stat.getVersion(), AccessOption.PERSISTENT); - } else { - jobsZnRecord = new ZNRecord(jobResourcePath); - jobsZnRecord.setMapField(jobId, jobMetadata); - return _propertyStore.set(jobResourcePath, jobsZnRecord, AccessOption.PERSISTENT); - } + return ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, jobType, + prevJobMetadataChecker); } /** @@ -3874,47 +3785,10 @@ public class PinotHelixResourceManager { "Instance: " + instanceName + (enableInstance ? " enable" : " disable") + " failed, timeout"); } - /** - * Entry point for table Rebalacing. - * @param tableNameWithType - * @param rebalanceConfig - * @param trackRebalanceProgress whether to track rebalance progress stats - * @return RebalanceResult - * @throws TableNotFoundException - */ - public RebalanceResult rebalanceTable(String tableNameWithType, RebalanceConfig rebalanceConfig, - String rebalanceJobId, boolean trackRebalanceProgress) - throws TableNotFoundException { - TableConfig tableConfig = getTableConfig(tableNameWithType); - if (tableConfig == null) { - throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType); - } - Preconditions.checkState(rebalanceJobId != null, "RebalanceId not populated in the rebalanceConfig"); - ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null; - if (trackRebalanceProgress) { - zkBasedTableRebalanceObserver = new ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId, - TableRebalanceContext.forInitialAttempt(rebalanceJobId, rebalanceConfig), this); - } - return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, rebalanceConfig, - zkBasedTableRebalanceObserver); - } - - public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig tableConfig, String rebalanceJobId, - RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) { - Map<String, Set<String>> tierToSegmentsMap = null; - if (rebalanceConfig.isUpdateTargetTier()) { - tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig); - } - TableRebalancer tableRebalancer = - new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics, _rebalancePreChecker, - _tableSizeReader); - return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceJobId, tierToSegmentsMap); - } - /// Calculates the target tier for the segments within a table, updates the segment ZK metadata and persists the /// update to ZK. - @VisibleForTesting - Map<String, Set<String>> updateTargetTier(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) { + public Map<String, Set<String>> updateTargetTier(String rebalanceJobId, String tableNameWithType, + TableConfig tableConfig) { List<Tier> sortedTiers = CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList()) ? getSortedTiers(tableConfig) : List.of(); LOGGER.info("For rebalanceId: {}, updating target tiers for segments of table: {} with tierConfigs: {}", diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java index 817c4129b4..b16c6b8197 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java @@ -27,10 +27,10 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.exception.RebalanceInProgressException; import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; @@ -54,15 +54,15 @@ import org.slf4j.LoggerFactory; public class RebalanceChecker extends ControllerPeriodicTask<Void> { private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceChecker.class); private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; - private final ExecutorService _executorService; + private final TableRebalanceManager _tableRebalanceManager; - public RebalanceChecker(PinotHelixResourceManager pinotHelixResourceManager, - LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics, - ExecutorService executorService) { + public RebalanceChecker(TableRebalanceManager tableRebalanceManager, + PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, + ControllerConf config, ControllerMetrics controllerMetrics) { super(RebalanceChecker.class.getSimpleName(), config.getRebalanceCheckerFrequencyInSeconds(), config.getRebalanceCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics); - _executorService = executorService; + _tableRebalanceManager = tableRebalanceManager; } @Override @@ -152,14 +152,7 @@ public class RebalanceChecker extends ControllerPeriodicTask<Void> { // thread, in order to avoid unnecessary ZK reads and making too many ZK reads in a short time. TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); - _executorService.submit(() -> { - // Retry rebalance in another thread as rebalance can take time. - try { - retryRebalanceTableWithContext(tableNameWithType, tableConfig, jobCtx); - } catch (Throwable t) { - LOGGER.error("Failed to retry rebalance for table: {} asynchronously", tableNameWithType, t); - } - }); + retryRebalanceTableWithContext(tableNameWithType, tableConfig, jobCtx); } private void retryRebalanceTableWithContext(String tableNameWithType, TableConfig tableConfig, @@ -173,12 +166,23 @@ public class RebalanceChecker extends ControllerPeriodicTask<Void> { attemptJobId); _controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.TABLE_REBALANCE_RETRY, 1L); ZkBasedTableRebalanceObserver observer = - new ZkBasedTableRebalanceObserver(tableNameWithType, attemptJobId, retryCtx, _pinotHelixResourceManager); - RebalanceResult result = - _pinotHelixResourceManager.rebalanceTable(tableNameWithType, tableConfig, attemptJobId, rebalanceConfig, - observer); - LOGGER.info("New attempt: {} for table: {} is done with result status: {}", attemptJobId, tableNameWithType, - result.getStatus()); + new ZkBasedTableRebalanceObserver(tableNameWithType, attemptJobId, retryCtx, + _pinotHelixResourceManager.getPropertyStore()); + + try { + _tableRebalanceManager.rebalanceTableAsync(tableNameWithType, tableConfig, attemptJobId, rebalanceConfig, + observer) + .whenComplete((result, throwable) -> { + if (throwable != null) { + LOGGER.error("Failed to retry rebalance for table: {}", tableNameWithType, throwable); + } else { + LOGGER.info("New attempt: {} for table: {} is done with result status: {}", attemptJobId, + tableNameWithType, result.getStatus()); + } + }); + } catch (RebalanceInProgressException e) { + LOGGER.warn("Rebalance job for table: {} is already in progress. Skipping retry.", tableNameWithType, e); + } } @VisibleForTesting @@ -309,7 +313,8 @@ public class RebalanceChecker extends ControllerPeriodicTask<Void> { } // The job is considered failed, but it's possible it is still running, then we might end up with more than one // rebalance jobs running in parallel for a table. The rebalance algorithm is idempotent, so this should be fine - // for the correctness. + // for the correctness. Note that we do still abort this job before retrying, because we don't allow more than + // one actively running rebalance job (as per ZK) for a table. LOGGER.info("Found stuck rebalance job: {} for original job: {}", jobId, originalJobId); candidates.computeIfAbsent(originalJobId, (k) -> new HashSet<>()).add(Pair.of(jobCtx, jobStartTimeMs)); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java new file mode 100644 index 0000000000..87de12b3d7 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nullable; +import javax.ws.rs.NotFoundException; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.exception.RebalanceInProgressException; +import org.apache.pinot.common.exception.TableNotFoundException; +import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils; +import org.apache.pinot.controller.util.TableSizeReader; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Single entry point for all table rebalance related operations. This class should be used to initiate table rebalance + * operations, rather than directly instantiating objects of {@link TableRebalancer}. + */ +public class TableRebalanceManager { + private static final Logger LOGGER = LoggerFactory.getLogger(TableRebalanceManager.class); + + private final PinotHelixResourceManager _resourceManager; + private final ControllerMetrics _controllerMetrics; + private final RebalancePreChecker _rebalancePreChecker; + private final TableSizeReader _tableSizeReader; + private final ExecutorService _executorService; + + public TableRebalanceManager(PinotHelixResourceManager resourceManager, ControllerMetrics controllerMetrics, + RebalancePreChecker rebalancePreChecker, TableSizeReader tableSizeReader, ExecutorService executorService) { + _resourceManager = resourceManager; + _controllerMetrics = controllerMetrics; + _rebalancePreChecker = rebalancePreChecker; + _tableSizeReader = tableSizeReader; + _executorService = executorService; + } + + /** + * Rebalance the table with the given name and type synchronously. It's the responsibility of the caller to ensure + * that this rebalance is run on the rebalance thread pool in the controller that respects the configuration + * {@link org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}. + * + * @param tableNameWithType name of the table to rebalance + * @param rebalanceConfig configuration for the rebalance operation + * @param rebalanceJobId ID of the rebalance job, which is used to track the progress of the rebalance operation + * @param trackRebalanceProgress whether to track rebalance progress stats in ZK + * @return result of the rebalance operation + * @throws TableNotFoundException if the table does not exist + */ + public RebalanceResult rebalanceTable(String tableNameWithType, RebalanceConfig rebalanceConfig, + String rebalanceJobId, boolean trackRebalanceProgress) + throws TableNotFoundException, RebalanceInProgressException { + TableConfig tableConfig = _resourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType); + } + Preconditions.checkState(rebalanceJobId != null, "RebalanceId not populated in the rebalanceConfig"); + ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null; + if (trackRebalanceProgress) { + zkBasedTableRebalanceObserver = new ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId, + TableRebalanceContext.forInitialAttempt(rebalanceJobId, rebalanceConfig), + _resourceManager.getPropertyStore()); + } + return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, rebalanceConfig, + zkBasedTableRebalanceObserver); + } + + /** + * Rebalance the table with the given name and type asynchronously. The number of concurrent rebalances permitted + * on this controller is configured by + * {@link org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS} + * + * @param tableNameWithType name of the table to rebalance + * @param rebalanceConfig configuration for the rebalance operation + * @param rebalanceJobId ID of the rebalance job, which is used to track the progress of the rebalance operation + * @param trackRebalanceProgress whether to track rebalance progress stats in ZK + * @return a CompletableFuture that will complete with the result of the rebalance operation + * @throws TableNotFoundException if the table does not exist + */ + public CompletableFuture<RebalanceResult> rebalanceTableAsync(String tableNameWithType, + RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean trackRebalanceProgress) + throws TableNotFoundException, RebalanceInProgressException { + TableConfig tableConfig = _resourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType); + } + if (!rebalanceConfig.isDryRun()) { + checkRebalanceJobInProgress(tableNameWithType); + } + return CompletableFuture.supplyAsync( + () -> { + try { + return rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, trackRebalanceProgress); + } catch (TableNotFoundException e) { + // Should not happen since we already checked for table existence + throw new RuntimeException(e); + } catch (RebalanceInProgressException e) { + throw new RuntimeException(e); + } + }, + _executorService); + } + + /** + * Rebalance the table with the given name and type asynchronously. The number of concurrent rebalances permitted + * on this controller is configured by + * {@link org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS} + * + * @param tableNameWithType name of the table to rebalance + * @param tableConfig configuration for the table to rebalance + * @param rebalanceJobId ID of the rebalance job, which is used to track the progress of the rebalance operation + * @param rebalanceConfig configuration for the rebalance operation + * @param zkBasedTableRebalanceObserver observer to track rebalance progress in ZK + * @return a CompletableFuture that will complete with the result of the rebalance operation + */ + public CompletableFuture<RebalanceResult> rebalanceTableAsync(String tableNameWithType, TableConfig tableConfig, + String rebalanceJobId, RebalanceConfig rebalanceConfig, + @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) + throws RebalanceInProgressException { + if (!rebalanceConfig.isDryRun()) { + checkRebalanceJobInProgress(tableNameWithType); + } + + return CompletableFuture.supplyAsync( + () -> { + try { + return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, rebalanceConfig, + zkBasedTableRebalanceObserver); + } catch (RebalanceInProgressException e) { + throw new RuntimeException(e); + } + }, + _executorService); + } + + @VisibleForTesting + RebalanceResult rebalanceTable(String tableNameWithType, TableConfig tableConfig, String rebalanceJobId, + RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) + throws RebalanceInProgressException { + + if (!rebalanceConfig.isDryRun()) { + checkRebalanceJobInProgress(tableNameWithType); + } + + Map<String, Set<String>> tierToSegmentsMap; + if (rebalanceConfig.isUpdateTargetTier()) { + tierToSegmentsMap = _resourceManager.updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig); + } else { + tierToSegmentsMap = null; + } + TableRebalancer tableRebalancer = + new TableRebalancer(_resourceManager.getHelixZkManager(), zkBasedTableRebalanceObserver, _controllerMetrics, + _rebalancePreChecker, _tableSizeReader); + + return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceJobId, tierToSegmentsMap); + } + + /** + * Cancels ongoing rebalance jobs (if any) for the given table. + * + * @param tableNameWithType name of the table for which to cancel any ongoing rebalance job + * @return the list of job IDs that were cancelled + */ + public List<String> cancelRebalance(String tableNameWithType) { + List<String> cancelledJobIds = new ArrayList<>(); + boolean updated = _resourceManager.updateJobsForTable(tableNameWithType, ControllerJobType.TABLE_REBALANCE, + jobMetadata -> { + String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID); + try { + String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + TableRebalanceProgressStats jobStats = + JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class); + if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) { + return; + } + + LOGGER.info("Cancelling rebalance job: {} for table: {}", jobId, tableNameWithType); + jobStats.setStatus(RebalanceResult.Status.CANCELLED); + jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, + JsonUtils.objectToString(jobStats)); + cancelledJobIds.add(jobId); + } catch (Exception e) { + LOGGER.error("Failed to cancel rebalance job: {} for table: {}", jobId, tableNameWithType, e); + } + }); + LOGGER.info("Tried to cancel existing rebalance jobs for table: {} at best effort and done: {}", tableNameWithType, + updated); + return cancelledJobIds; + } + + /** + * Gets the status of the rebalance job with the given ID. + * + * @param jobId ID of the rebalance job to get the status for + * @return response containing the status of the rebalance job + * @throws JsonProcessingException if there is an error processing the rebalance progress stats from ZK + * @throws NotFoundException if the rebalance job with the given ID does not exist + */ + public ServerRebalanceJobStatusResponse getRebalanceStatus(String jobId) + throws JsonProcessingException { + Map<String, String> controllerJobZKMetadata = + _resourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TABLE_REBALANCE); + if (controllerJobZKMetadata == null) { + LOGGER.warn("Rebalance job with ID: {} not found", jobId); + throw new NotFoundException("Rebalance job with ID: " + jobId + " not found"); + } + ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new ServerRebalanceJobStatusResponse(); + TableRebalanceProgressStats tableRebalanceProgressStats = JsonUtils.stringToObject( + controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS), + TableRebalanceProgressStats.class); + serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats); + + long timeSinceStartInSecs = 0L; + if (RebalanceResult.Status.DONE != tableRebalanceProgressStats.getStatus()) { + timeSinceStartInSecs = (System.currentTimeMillis() - tableRebalanceProgressStats.getStartTimeMs()) / 1000; + } + serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs); + + String jobCtxInStr = controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); + if (StringUtils.isNotEmpty(jobCtxInStr)) { + TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, TableRebalanceContext.class); + serverRebalanceJobStatusResponse.setTableRebalanceContext(jobCtx); + } + return serverRebalanceJobStatusResponse; + } + + private void checkRebalanceJobInProgress(String tableNameWithType) + throws RebalanceInProgressException { + String rebalanceJobInProgress = rebalanceJobInProgress(tableNameWithType, _resourceManager.getPropertyStore()); + if (rebalanceJobInProgress != null) { + String errorMsg = "Rebalance job is already in progress for table: " + tableNameWithType + ", jobId: " + + rebalanceJobInProgress + ". Please wait for the job to complete or cancel it before starting a new one."; + throw new RebalanceInProgressException(errorMsg); + } + } + + /** + * Checks if there is an ongoing rebalance job for the given table. + * + * @param tableNameWithType name of the table to check for ongoing rebalance jobs + * @param propertyStore ZK property store to read the job metadata from + * @return jobId of the ongoing rebalance job if one exists, {@code null} otherwise + */ + @Nullable + public static String rebalanceJobInProgress(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore) { + // Get all jobMetadata for the given table with a single ZK read. + Map<String, Map<String, String>> allJobMetadataByJobId = + ControllerZkHelixUtils.getAllControllerJobs(Collections.singleton(ControllerJobType.TABLE_REBALANCE), + jobMetadata -> tableNameWithType.equals( + jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)), propertyStore); + + for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) { + String jobId = entry.getKey(); + Map<String, String> jobMetadata = entry.getValue(); + String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + + TableRebalanceProgressStats jobStats; + try { + jobStats = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class); + } catch (Exception e) { + // If the job stats cannot be parsed, let's assume that the job is not in progress. + continue; + } + + if (jobStats.getStatus() == RebalanceResult.Status.IN_PROGRESS) { + return jobId; + } + } + + return null; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 3bf80c9f60..cf3874ce2b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -156,11 +157,7 @@ public class TableRebalancer { @Nullable ControllerMetrics controllerMetrics, @Nullable RebalancePreChecker rebalancePreChecker, @Nullable TableSizeReader tableSizeReader) { _helixManager = helixManager; - if (tableRebalanceObserver != null) { - _tableRebalanceObserver = tableRebalanceObserver; - } else { - _tableRebalanceObserver = new NoOpTableRebalanceObserver(); - } + _tableRebalanceObserver = Objects.requireNonNullElseGet(tableRebalanceObserver, NoOpTableRebalanceObserver::new); _helixDataAccessor = helixManager.getHelixDataAccessor(); _controllerMetrics = controllerMetrics; _rebalancePreChecker = rebalancePreChecker; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index 042be1a93d..e1cfdfc833 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -25,10 +25,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; -import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; @@ -43,7 +45,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { private static final Logger LOGGER = LoggerFactory.getLogger(ZkBasedTableRebalanceObserver.class); private final String _tableNameWithType; private final String _rebalanceJobId; - private final PinotHelixResourceManager _pinotHelixResourceManager; + private final ZkHelixPropertyStore<ZNRecord> _propertyStore; private final TableRebalanceProgressStats _tableRebalanceProgressStats; private final TableRebalanceContext _tableRebalanceContext; // These previous stats are used for rollback scenarios where the IdealState update fails due to a version @@ -59,13 +61,13 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { private final ControllerMetrics _controllerMetrics; public ZkBasedTableRebalanceObserver(String tableNameWithType, String rebalanceJobId, - TableRebalanceContext tableRebalanceContext, PinotHelixResourceManager pinotHelixResourceManager) { + TableRebalanceContext tableRebalanceContext, ZkHelixPropertyStore<ZNRecord> propertyStore) { Preconditions.checkState(tableNameWithType != null, "Table name cannot be null"); Preconditions.checkState(rebalanceJobId != null, "rebalanceId cannot be null"); - Preconditions.checkState(pinotHelixResourceManager != null, "PinotHelixManager cannot be null"); + Preconditions.checkState(propertyStore != null, "ZkHelixPropertyStore cannot be null"); _tableNameWithType = tableNameWithType; _rebalanceJobId = rebalanceJobId; - _pinotHelixResourceManager = pinotHelixResourceManager; + _propertyStore = propertyStore; _tableRebalanceProgressStats = new TableRebalanceProgressStats(); _tableRebalanceContext = tableRebalanceContext; _previousStepStats = new TableRebalanceProgressStats.RebalanceProgressStats(); @@ -279,8 +281,8 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { private void trackStatsInZk() { Map<String, String> jobMetadata = createJobMetadata(_tableNameWithType, _rebalanceJobId, _tableRebalanceProgressStats, _tableRebalanceContext); - _pinotHelixResourceManager.addControllerJobToZK(_rebalanceJobId, jobMetadata, ControllerJobType.TABLE_REBALANCE, - prevJobMetadata -> { + ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, _rebalanceJobId, jobMetadata, + ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> { // In addition to updating job progress status, the observer also checks if the job status is IN_PROGRESS. // If not, then no need to update the job status, and we keep this status to end the job promptly. if (prevJobMetadata == null) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java index 661fff70d9..369024faa6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java @@ -29,20 +29,25 @@ import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pinot.common.exception.RebalanceInProgressException; import org.apache.pinot.common.exception.TableNotFoundException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; import org.apache.pinot.spi.config.table.TableConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DefaultTenantRebalancer implements TenantRebalancer { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class); - PinotHelixResourceManager _pinotHelixResourceManager; - ExecutorService _executorService; + private final TableRebalanceManager _tableRebalanceManager; + private final PinotHelixResourceManager _pinotHelixResourceManager; + private final ExecutorService _executorService; - public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) { + public DefaultTenantRebalancer(TableRebalanceManager tableRebalanceManager, + PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) { + _tableRebalanceManager = tableRebalanceManager; _pinotHelixResourceManager = pinotHelixResourceManager; _executorService = executorService; } @@ -56,13 +61,13 @@ public class DefaultTenantRebalancer implements TenantRebalancer { RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); rebalanceConfig.setDryRun(true); rebalanceResult.put(table, - _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(), - false)); - } catch (TableNotFoundException exception) { + _tableRebalanceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(), false)); + } catch (TableNotFoundException | RebalanceInProgressException exception) { rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), null, null, null, null, null)); } }); + if (config.isDryRun()) { return new TenantRebalanceResult(null, rebalanceResult, config.isVerboseResult()); } else { @@ -198,7 +203,7 @@ public class DefaultTenantRebalancer implements TenantRebalancer { TenantRebalanceObserver observer) { try { observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, rebalanceJobId); - RebalanceResult result = _pinotHelixResourceManager.rebalanceTable(tableName, config, rebalanceJobId, true); + RebalanceResult result = _tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true); if (result.getStatus().equals(RebalanceResult.Status.DONE)) { observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, tableName, null); } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java index 17dd44378b..b2144260c0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java @@ -45,6 +45,7 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.controller.util.TableTierReader; import org.apache.pinot.spi.config.table.TableConfig; @@ -65,6 +66,7 @@ import org.slf4j.LoggerFactory; public class SegmentRelocator extends ControllerPeriodicTask<Void> { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentRelocator.class); + private final TableRebalanceManager _tableRebalanceManager; private final ExecutorService _executorService; private final HttpClientConnectionManager _connectionManager; private final boolean _enableLocalTierMigration; @@ -87,12 +89,14 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { @Nullable private final Set<String> _tablesUndergoingRebalance; - public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, - LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics, - ExecutorService executorService, HttpClientConnectionManager connectionManager) { + public SegmentRelocator(TableRebalanceManager tableRebalanceManager, + PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, + ControllerConf config, ControllerMetrics controllerMetrics, ExecutorService executorService, + HttpClientConnectionManager connectionManager) { super(SegmentRelocator.class.getSimpleName(), config.getSegmentRelocatorFrequencyInSeconds(), config.getSegmentRelocatorInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics); + _tableRebalanceManager = tableRebalanceManager; _executorService = executorService; _connectionManager = connectionManager; _enableLocalTierMigration = config.enableSegmentRelocatorLocalTierMigration(); @@ -145,9 +149,9 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { } else { LOGGER.info("The previous rebalance has not yet completed, skip rebalancing table {}", tableNameWithType); } - return; + } else { + putTableToWait(tableNameWithType); } - putTableToWait(tableNameWithType); } @VisibleForTesting @@ -225,7 +229,10 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { // all segments are put on the right servers. If any segments are not on their target tier, the server local // tier migration is triggered for them, basically asking the hosting servers to reload them. The segment // target tier may get changed between the two sequential actions, but cluster states converge eventually. - RebalanceResult rebalance = _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, + + // We're not using the async rebalance API here because we want to run this on a separate thread pool from the + // rebalance thread pool that is used for user initiated rebalances. + RebalanceResult rebalance = _tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig, TableRebalancer.createUniqueRebalanceJobIdentifier(), false); switch (rebalance.getStatus()) { case NO_OP: diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java new file mode 100644 index 0000000000..f5f7ee12e8 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.util; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.helix.AccessOption; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.zookeeper.data.Stat; + + +public class ControllerZkHelixUtils { + + private ControllerZkHelixUtils() { + // Utility class + } + + /** + * Adds a new job metadata entry for a controller job like table rebalance or segment reload into ZK + * + * @param propertyStore the ZK property store to write to + * @param jobId job's UUID + * @param jobMetadata the job metadata + * @param jobType the type of the job to figure out where the job metadata is kept in ZK + * @param prevJobMetadataChecker to check the previous job metadata before adding new one + * @return boolean representing success / failure of the ZK write step + */ + public static boolean addControllerJobToZK(ZkHelixPropertyStore<ZNRecord> propertyStore, String jobId, + Map<String, String> jobMetadata, String jobType, Predicate<Map<String, String>> prevJobMetadataChecker) { + Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS) != null, + CommonConstants.ControllerJob.SUBMISSION_TIME_MS + + " in JobMetadata record not set. Cannot expire these records"); + String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType); + Stat stat = new Stat(); + ZNRecord jobsZnRecord = propertyStore.get(jobResourcePath, stat, AccessOption.PERSISTENT); + if (jobsZnRecord != null) { + Map<String, Map<String, String>> jobMetadataMap = jobsZnRecord.getMapFields(); + Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId); + if (!prevJobMetadataChecker.test(prevJobMetadata)) { + return false; + } + jobMetadataMap.put(jobId, jobMetadata); + if (jobMetadataMap.size() > CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) { + jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) -> Long.compare( + Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)), + Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)))) + .collect(Collectors.toList()).subList(0, CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) + .stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + jobsZnRecord.setMapFields(jobMetadataMap); + return propertyStore.set(jobResourcePath, jobsZnRecord, stat.getVersion(), AccessOption.PERSISTENT); + } else { + jobsZnRecord = new ZNRecord(jobResourcePath); + jobsZnRecord.setMapField(jobId, jobMetadata); + return propertyStore.set(jobResourcePath, jobsZnRecord, AccessOption.PERSISTENT); + } + } + + /** + * Get all controller jobs from ZK for a given set of job types. + * @param jobTypes the set of job types to filter + * @param jobMetadataChecker a predicate to filter the job metadata + * @param propertyStore the ZK property store to read from + * @return a map of jobId to job metadata for all the jobs that match the given job types and metadata checker + */ + public static Map<String, Map<String, String>> getAllControllerJobs(Set<String> jobTypes, + Predicate<Map<String, String>> jobMetadataChecker, ZkHelixPropertyStore<ZNRecord> propertyStore) { + Map<String, Map<String, String>> controllerJobs = new HashMap<>(); + for (String jobType : jobTypes) { + String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType); + ZNRecord jobsZnRecord = propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT); + if (jobsZnRecord == null) { + continue; + } + Map<String, Map<String, String>> jobMetadataMap = jobsZnRecord.getMapFields(); + for (Map.Entry<String, Map<String, String>> jobMetadataEntry : jobMetadataMap.entrySet()) { + String jobId = jobMetadataEntry.getKey(); + Map<String, String> jobMetadata = jobMetadataEntry.getValue(); + Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType), + "Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType, jobResourcePath, jobId); + if (jobMetadataChecker.test(jobMetadata)) { + controllerJobs.put(jobId, jobMetadata); + } + } + } + return controllerJobs; + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 55e0bb1772..176cec96ae 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hc.client5.http.entity.EntityBuilder; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; @@ -69,6 +70,8 @@ import org.apache.pinot.controller.api.access.AllowAllAccessFactory; import org.apache.pinot.controller.api.resources.PauseStatusDetails; import org.apache.pinot.controller.api.resources.TableViews; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; +import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.QuotaConfig; @@ -158,6 +161,8 @@ public class ControllerTest { protected HelixDataAccessor _helixDataAccessor; protected HelixAdmin _helixAdmin; protected ZkHelixPropertyStore<ZNRecord> _propertyStore; + protected TableRebalanceManager _tableRebalanceManager; + protected TableSizeReader _tableSizeReader; /** * Acquire the {@link ControllerTest} default instance that can be shared across different test cases. @@ -303,6 +308,8 @@ public class ControllerTest { _controllerDataDir = _controllerConfig.getDataDir(); _helixResourceManager = _controllerStarter.getHelixResourceManager(); _helixManager = _controllerStarter.getHelixControllerManager(); + _tableRebalanceManager = _controllerStarter.getTableRebalanceManager(); + _tableSizeReader = _controllerStarter.getTableSizeReader(); _helixDataAccessor = _helixManager.getHelixDataAccessor(); ConfigAccessor configAccessor = _helixManager.getConfigAccessor(); // HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode. @@ -1028,6 +1035,24 @@ public class ControllerTest { } } + /** + * Sends a POST request to the specified URL with the given payload and returns the status code along with the + * stringified response. + * @param urlString the URL to send the POST request to + * @param payload the payload to send in the POST request + * @return a Pair containing the status code and the stringified response + */ + public static Pair<Integer, String> postRequestWithStatusCode(String urlString, String payload) + throws IOException { + try { + SimpleHttpResponse resp = + getHttpClient().sendJsonPostRequest(new URL(urlString).toURI(), payload, Collections.emptyMap()); + return Pair.of(resp.getStatusCode(), constructResponse(resp)); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + public static String sendPostRequestRaw(String urlString, String payload, Map<String, String> headers) throws IOException { try { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java index 22663fa333..d88b1319c1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.AccessOption; @@ -49,10 +50,7 @@ import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -248,14 +246,21 @@ public class RebalanceCheckerTest { PinotHelixResourceManager helixManager = mock(PinotHelixResourceManager.class); when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig); when(helixManager.getAllJobs(any(), any())).thenReturn(allJobMetadata); - RebalanceChecker checker = new RebalanceChecker(helixManager, leadController, cfg, metrics, exec); + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(helixManager.getPropertyStore()).thenReturn(propertyStore); + TableRebalanceManager tableRebalanceManager = mock(TableRebalanceManager.class); + when(tableRebalanceManager.rebalanceTableAsync(anyString(), any(TableConfig.class), anyString(), + any(RebalanceConfig.class), any(ZkBasedTableRebalanceObserver.class))).thenReturn( + CompletableFuture.completedFuture(null)); + RebalanceChecker checker = new RebalanceChecker(tableRebalanceManager, helixManager, leadController, cfg, metrics); // Although job1_3 was submitted most recently but job1 had exceeded maxAttempts. Chose job3 to retry, which got // stuck at in progress status. checker.retryRebalanceTable(tableName, allJobMetadata); // The new retry job is for job3 and attemptId is increased to 2. ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor = ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class); - verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(), anyString(), any(), observerCaptor.capture()); + verify(tableRebalanceManager, times(1)).rebalanceTableAsync(eq(tableName), any(), anyString(), any(), + observerCaptor.capture()); ZkBasedTableRebalanceObserver observer = observerCaptor.getValue(); jobCtx = observer.getTableRebalanceContext(); assertEquals(jobCtx.getOriginalJobId(), "job3"); @@ -286,12 +291,19 @@ public class RebalanceCheckerTest { PinotHelixResourceManager helixManager = mock(PinotHelixResourceManager.class); TableConfig tableConfig = mock(TableConfig.class); when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig); - RebalanceChecker checker = new RebalanceChecker(helixManager, leadController, cfg, metrics, exec); + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(helixManager.getPropertyStore()).thenReturn(propertyStore); + TableRebalanceManager tableRebalanceManager = mock(TableRebalanceManager.class); + when(tableRebalanceManager.rebalanceTableAsync(anyString(), any(TableConfig.class), anyString(), + any(RebalanceConfig.class), any(ZkBasedTableRebalanceObserver.class))).thenReturn( + CompletableFuture.completedFuture(null)); + RebalanceChecker checker = new RebalanceChecker(tableRebalanceManager, helixManager, leadController, cfg, metrics); checker.retryRebalanceTable(tableName, allJobMetadata); // Retry for job1 is delayed with 5min backoff. ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor = ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class); - verify(helixManager, times(0)).rebalanceTable(eq(tableName), any(), anyString(), any(), observerCaptor.capture()); + verify(tableRebalanceManager, never()).rebalanceTable(eq(tableName), any(), anyString(), any(), + observerCaptor.capture()); // Set initial delay to 0 to disable retry backoff. jobCfg.setRetryInitialDelayInMs(0); @@ -300,7 +312,8 @@ public class RebalanceCheckerTest { checker.retryRebalanceTable(tableName, allJobMetadata); // Retry for job1 is delayed with 0 backoff. observerCaptor = ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class); - verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(), anyString(), any(), observerCaptor.capture()); + verify(tableRebalanceManager, times(1)).rebalanceTableAsync(eq(tableName), any(), anyString(), any(), + observerCaptor.capture()); } @Test diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 64cbff92c0..64b8a6e5f5 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -128,7 +128,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); preChecker.init(_helixResourceManager, executorService, 1); TableRebalancer tableRebalancer = - new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader()); + new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); @@ -699,8 +699,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { ExecutorService executorService = Executors.newFixedThreadPool(10); DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); preChecker.init(_helixResourceManager, executorService, 1); - TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, - _helixResourceManager.getTableSizeReader()); + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); // Set up the table with 1 replication factor and strict replica group enabled TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1) @@ -982,8 +981,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { ExecutorService executorService = Executors.newFixedThreadPool(10); DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); preChecker.init(_helixResourceManager, executorService, 1); - TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, - _helixResourceManager.getTableSizeReader()); + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); // Set up the table with 1 replication factor and strict replica group enabled TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1) @@ -1042,7 +1040,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); preChecker.init(_helixResourceManager, executorService, 0.5); TableRebalancer tableRebalancer = - new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader()); + new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); @@ -1142,7 +1140,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); preChecker.init(_helixResourceManager, executorService, 0.5); TableRebalancer tableRebalancer = - new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader()); + new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) .setNumReplicas(2) @@ -1557,7 +1555,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { ExecutorService executorService = Executors.newFixedThreadPool(10); preChecker.init(_helixResourceManager, executorService, 1); TableRebalancer tableRebalancer = - new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader()); + new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); // Try dry-run summary mode RebalanceConfig rebalanceConfig = new RebalanceConfig(); @@ -2114,7 +2112,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { ConsumingSegmentInfoReader mockConsumingSegmentInfoReader = Mockito.mock(ConsumingSegmentInfoReader.class); TableRebalancer tableRebalancerOriginal = - new TableRebalancer(_helixManager, null, null, null, _helixResourceManager.getTableSizeReader()); + new TableRebalancer(_helixManager, null, null, null, _tableSizeReader); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) .setNumReplicas(numReplica) @@ -2233,9 +2231,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); } - ConsumingSegmentInfoReader mockConsumingSegmentInfoReader = Mockito.mock(ConsumingSegmentInfoReader.class); TableRebalancer tableRebalancerOriginal = - new TableRebalancer(_helixManager, null, null, null, _helixResourceManager.getTableSizeReader()); + new TableRebalancer(_helixManager, null, null, null, _tableSizeReader); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) .setNumReplicas(numReplica) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java index b0d86702e9..f8db37a5aa 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java @@ -24,9 +24,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; -import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; import org.testng.annotations.Test; @@ -35,6 +36,8 @@ import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.Segmen import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE; import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -45,14 +48,15 @@ import static org.testng.Assert.assertTrue; public class TestZkBasedTableRebalanceObserver { @Test void testZkObserverProgressStats() { - PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); // Mocking this. We will verify using numZkUpdate stat - when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), any())).thenReturn(true); + when(propertyStore.set(anyString(), any(), anyInt())).thenReturn(true); + ControllerMetrics controllerMetrics = ControllerMetrics.get(); TableRebalanceContext retryCtx = new TableRebalanceContext(); retryCtx.setConfig(new RebalanceConfig()); ZkBasedTableRebalanceObserver observer = - new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, pinotHelixResourceManager); + new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, propertyStore); Map<String, Map<String, String>> source = new TreeMap<>(); Map<String, Map<String, String>> target = new TreeMap<>(); Map<String, Map<String, String>> targetIntermediate = new TreeMap<>(); @@ -206,14 +210,15 @@ public class TestZkBasedTableRebalanceObserver { // This is a test to verify if Zk stats are pushed out correctly @Test void testZkObserverTracking() { - PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); // Mocking this. We will verify using numZkUpdate stat - when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), any())).thenReturn(true); + when(propertyStore.set(anyString(), any(), anyInt())).thenReturn(true); + ControllerMetrics controllerMetrics = ControllerMetrics.get(); TableRebalanceContext retryCtx = new TableRebalanceContext(); retryCtx.setConfig(new RebalanceConfig()); ZkBasedTableRebalanceObserver observer = - new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, pinotHelixResourceManager); + new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, propertyStore); Map<String, Map<String, String>> source = new TreeMap<>(); Map<String, Map<String, String>> target = new TreeMap<>(); target.put("segment1", diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java index 76189e9268..abac12b974 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java @@ -74,7 +74,8 @@ public class TenantRebalancerTest extends ControllerTest { addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, true); } - TenantRebalancer tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _executorService); + TenantRebalancer tenantRebalancer = + new DefaultTenantRebalancer(_tableRebalanceManager, _helixResourceManager, _executorService); // tag all servers and brokers to test tenant addTenantTagToInstances(TENANT_NAME); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java index 3c08b79d92..e6bbcdd026 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java @@ -37,6 +37,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; import org.apache.pinot.controller.util.TableTierReader; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; @@ -119,8 +120,9 @@ public class SegmentRelocatorTest { ControllerConf conf = mock(ControllerConf.class); when(conf.isSegmentRelocatorRebalanceTablesSequentially()).thenReturn(true); SegmentRelocator relocator = - new SegmentRelocator(mock(PinotHelixResourceManager.class), mock(LeadControllerManager.class), conf, - mock(ControllerMetrics.class), mock(ExecutorService.class), mock(HttpClientConnectionManager.class)); + new SegmentRelocator(mock(TableRebalanceManager.class), mock(PinotHelixResourceManager.class), + mock(LeadControllerManager.class), conf, mock(ControllerMetrics.class), mock(ExecutorService.class), + mock(HttpClientConnectionManager.class)); int cnt = 10; Random random = new Random(); for (int i = 0; i < cnt; i++) { @@ -150,8 +152,9 @@ public class SegmentRelocatorTest { ControllerConf conf = mock(ControllerConf.class); when(conf.isSegmentRelocatorRebalanceTablesSequentially()).thenReturn(true); SegmentRelocator relocator = - new SegmentRelocator(mock(PinotHelixResourceManager.class), mock(LeadControllerManager.class), conf, - mock(ControllerMetrics.class), mock(ExecutorService.class), mock(HttpClientConnectionManager.class)); + new SegmentRelocator(mock(TableRebalanceManager.class), mock(PinotHelixResourceManager.class), + mock(LeadControllerManager.class), conf, mock(ControllerMetrics.class), mock(ExecutorService.class), + mock(HttpClientConnectionManager.class)); ExecutorService runner = Executors.newCachedThreadPool(); Random random = new Random(); int cnt = 10; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java index 2f51fd6250..11f16e1e53 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java @@ -26,7 +26,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.config.TagNameUtils; @@ -37,9 +40,13 @@ import org.apache.pinot.common.utils.regex.Pattern; import org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse; import org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerResult; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; +import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils; import org.apache.pinot.server.starter.helix.BaseServerStarter; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -54,9 +61,11 @@ import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.Enablement; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.Test; @@ -1258,6 +1267,55 @@ public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationT assertEquals(numServersUnchanged, summaryResult.getServerInfo().getServersUnchanged().size()); } + @Test + public void testDisallowMultipleConcurrentRebalancesOnSameTable() throws Exception { + // Manually write an IN_PROGRESS rebalance job to ZK instead of trying to collide multiple actual rebalance + // attempts which will be prone to race conditions and cause this test to be flaky. We only reject a rebalance job + // if there is an IN_PROGRESS rebalance job for the same table in ZK, so we could actually end up with more than + // one active rebalance job if both are started at the exact same time since the progress stats are written to ZK + // after some initial pre-checks are done. However, rebalances are idempotent, and we don't actually care too much + // about avoiding this edge case race condition as long as in most cases we are able to prevent users from + // triggering a rebalance for a table that already has an in-progress rebalance job. + String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier(); + String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(getTableName()); + TableRebalanceProgressStats progressStats = new TableRebalanceProgressStats(); + progressStats.setStatus(RebalanceResult.Status.IN_PROGRESS); + Map<String, String> jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); + jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE); + jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, + JsonUtils.objectToString(progressStats)); + ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, ControllerJobType.TABLE_REBALANCE, + prevJobMetadata -> true); + + // Add a new server (to force change in instance assignment) and enable reassignInstances to ensure that the + // rebalance is not a NO_OP + BaseServerStarter serverStarter = startOneServer(NUM_SERVERS); + createServerTenant(getServerTenant(), 1, 0); + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setReassignInstances(true); + + Pair<Integer, String> response = + postRequestWithStatusCode(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE), null); + assertEquals(response.getLeft(), Response.Status.CONFLICT.getStatusCode()); + assertTrue(response.getRight().contains("Rebalance job is already in progress for table")); + + // Update the job status to DONE to allow other tests to run + progressStats.setStatus(RebalanceResult.Status.DONE); + jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, + JsonUtils.objectToString(progressStats)); + ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, ControllerJobType.TABLE_REBALANCE, + prevJobMetadata -> true); + + // Stop the added server + serverStarter.stop(); + TestUtils.waitForCondition( + aVoid -> getHelixResourceManager().dropInstance(serverStarter.getInstanceId()).isSuccessful(), + 60_000L, "Failed to drop added server"); + } + private String getReloadJobIdFromResponse(String response) { Pattern pattern = new JavaUtilPattern("([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); Matcher matcher = pattern.matcher(response); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java index 39ccd02602..f935a22ba7 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java @@ -22,7 +22,10 @@ import com.google.common.base.Preconditions; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; +import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.utils.Enablement; @@ -55,7 +58,22 @@ public class PinotTableRebalancer extends PinotZKChanger { public RebalanceResult rebalance(String tableNameWithType) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType); - return new TableRebalancer(_helixManager).rebalance(tableConfig, _rebalanceConfig, - TableRebalancer.createUniqueRebalanceJobIdentifier()); + + String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier(); + + if (!_rebalanceConfig.isDryRun()) { + String rebalanceJobInProgress = TableRebalanceManager.rebalanceJobInProgress(tableNameWithType, _propertyStore); + if (rebalanceJobInProgress != null) { + String errorMsg = "Rebalance job is already in progress for table: " + tableNameWithType + ", jobId: " + + rebalanceJobInProgress + ". Please wait for the job to complete or cancel it before starting a new one."; + return new RebalanceResult(jobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, null, null); + } + } + + ZkBasedTableRebalanceObserver rebalanceObserver = new ZkBasedTableRebalanceObserver(tableNameWithType, jobId, + TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig), _propertyStore); + + return new TableRebalancer(_helixManager, rebalanceObserver, null, null, null) + .rebalance(tableConfig, _rebalanceConfig, jobId); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org