This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c7af34e7aa Reject rebalance requests for tables that already have an in progress rebalance job (#15990)
c7af34e7aa is described below

commit c7af34e7aae0ec0b0772ef9d0842d5970ad491f0
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Tue Jun 10 17:53:46 2025 +0100

    Reject rebalance requests for tables that already have an in progress rebalance job (#15990)
---
 .../exception/RebalanceInProgressException.java    |  33 +++
 .../pinot/controller/BaseControllerStarter.java    |  47 +++-
 .../api/resources/PinotRealtimeTableResource.java  |   2 +-
 .../api/resources/PinotTableRestletResource.java   |  89 ++----
 .../helix/core/PinotHelixResourceManager.java      | 142 +---------
 .../helix/core/rebalance/RebalanceChecker.java     |  47 ++--
 .../core/rebalance/TableRebalanceManager.java      | 307 +++++++++++++++++++++
 .../helix/core/rebalance/TableRebalancer.java      |   7 +-
 .../rebalance/ZkBasedTableRebalanceObserver.java   |  16 +-
 .../rebalance/tenant/DefaultTenantRebalancer.java  |  19 +-
 .../helix/core/relocation/SegmentRelocator.java    |  19 +-
 .../helix/core/util/ControllerZkHelixUtils.java    | 111 ++++++++
 .../pinot/controller/helix/ControllerTest.java     |  25 ++
 .../helix/core/rebalance/RebalanceCheckerTest.java |  31 ++-
 .../TableRebalancerClusterStatelessTest.java       |  19 +-
 .../TestZkBasedTableRebalanceObserver.java         |  19 +-
 .../rebalance/tenant/TenantRebalancerTest.java     |   3 +-
 .../core/relocation/SegmentRelocatorTest.java      |  11 +-
 .../tests/TableRebalanceIntegrationTest.java       |  58 ++++
 .../apache/pinot/tools/PinotTableRebalancer.java   |  22 +-
 20 files changed, 732 insertions(+), 295 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/exception/RebalanceInProgressException.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/exception/RebalanceInProgressException.java
new file mode 100644
index 0000000000..1b14e31394
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/exception/RebalanceInProgressException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.exception;
+
+/**
+ * Exception thrown when a rebalance operation is attempted while another rebalance is already in progress for the same
+ * table. This helps to prevent concurrent table rebalances.
+ */
+public class RebalanceInProgressException extends Exception {
+  public RebalanceInProgressException(String message) {
+    super(message);
+  }
+
+  public RebalanceInProgressException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 6b9b88f88e..545ddd95aa 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -101,6 +101,9 @@ import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentMa
 import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
 import 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
+import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker;
+import 
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
 import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
@@ -203,11 +206,15 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected TaskMetricsEmitter _taskMetricsEmitter;
   protected PoolingHttpClientConnectionManager _connectionManager;
   protected TenantRebalancer _tenantRebalancer;
-  protected ExecutorService _tenantRebalanceExecutorService;
+  // This executor should be used by all code paths for user initiated rebalances, so that the controller config
+  // CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS is honored.
+  protected ExecutorService _rebalancerExecutorService;
   protected TableSizeReader _tableSizeReader;
   protected StorageQuotaChecker _storageQuotaChecker;
   protected DiskUtilizationChecker _diskUtilizationChecker;
   protected ResourceUtilizationManager _resourceUtilizationManager;
+  protected RebalancePreChecker _rebalancePreChecker;
+  protected TableRebalanceManager _tableRebalanceManager;
 
   @Override
   public void init(PinotConfiguration pinotConfiguration)
@@ -264,9 +271,6 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       // Do not use this before the invocation of {@link 
PinotHelixResourceManager::start()}, which happens in {@link
       // ControllerStarter::start()}
       _helixResourceManager = createHelixResourceManager();
-      _tenantRebalanceExecutorService = 
createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
-          "tenant-rebalance-thread-%d");
-      _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, 
_tenantRebalanceExecutorService);
     }
 
     // Initialize the table config tuner registry.
@@ -347,7 +351,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
    * @return A new instance of PinotHelixResourceManager.
    */
   protected PinotHelixResourceManager createHelixResourceManager() {
-    return new PinotHelixResourceManager(_config, _executorService);
+    return new PinotHelixResourceManager(_config);
   }
 
   public PinotHelixResourceManager getHelixResourceManager() {
@@ -389,6 +393,14 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     return _staleInstancesCleanupTask;
   }
 
+  public TableRebalanceManager getTableRebalanceManager() {
+    return _tableRebalanceManager;
+  }
+
+  public TableSizeReader getTableSizeReader() {
+    return _tableSizeReader;
+  }
+
   @Override
   public ServiceRole getServiceRole() {
     return ServiceRole.CONTROLLER;
@@ -536,12 +548,20 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _tableSizeReader =
         new TableSizeReader(_executorService, _connectionManager, 
_controllerMetrics, _helixResourceManager,
             _leadControllerManager);
-    _helixResourceManager.registerTableSizeReader(_tableSizeReader);
     _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, 
_controllerMetrics, _leadControllerManager,
         _helixResourceManager, _config);
 
     _diskUtilizationChecker = new 
DiskUtilizationChecker(_helixResourceManager, _config);
     _resourceUtilizationManager = new ResourceUtilizationManager(_config, 
_diskUtilizationChecker);
+    _rebalancePreChecker = 
RebalancePreCheckerFactory.create(_config.getRebalancePreCheckerClass());
+    _rebalancePreChecker.init(_helixResourceManager, _executorService, 
_config.getDiskUtilizationThreshold());
+    _rebalancerExecutorService = 
createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
+        "rebalance-thread-%d");
+    _tableRebalanceManager =
+        new TableRebalanceManager(_helixResourceManager, _controllerMetrics, 
_rebalancePreChecker, _tableSizeReader,
+            _rebalancerExecutorService);
+    _tenantRebalancer =
+        new DefaultTenantRebalancer(_tableRebalanceManager, 
_helixResourceManager, _rebalancerExecutorService);
 
     // Setting up periodic tasks
     List<PeriodicTask> controllerPeriodicTasks = 
setupControllerPeriodicTasks();
@@ -579,6 +599,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         bind(_helixParticipantInstanceId).named(CONTROLLER_INSTANCE_ID);
         bind(_helixResourceManager).to(PinotHelixResourceManager.class);
         
bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class);
+        bind(_tableRebalanceManager).to(TableRebalanceManager.class);
         bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
         bind(_taskManager).to(PinotTaskManager.class);
         bind(_taskManagerStatusCache).to(TaskManagerStatusCache.class);
@@ -857,15 +878,17 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
             _tableSizeReader);
     periodicTasks.add(_segmentStatusChecker);
-    _rebalanceChecker = new RebalanceChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-        _executorService);
+    _rebalanceChecker =
+        new RebalanceChecker(_tableRebalanceManager, _helixResourceManager, 
_leadControllerManager, _config,
+            _controllerMetrics);
     periodicTasks.add(_rebalanceChecker);
     _realtimeConsumerMonitor =
         new RealtimeConsumerMonitor(_config, _helixResourceManager, 
_leadControllerManager, _controllerMetrics,
             _executorService);
     periodicTasks.add(_realtimeConsumerMonitor);
-    _segmentRelocator = new SegmentRelocator(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-        _executorService, _connectionManager);
+    _segmentRelocator =
+        new SegmentRelocator(_tableRebalanceManager, _helixResourceManager, 
_leadControllerManager, _config,
+            _controllerMetrics, _executorService, _connectionManager);
     periodicTasks.add(_segmentRelocator);
     _staleInstancesCleanupTask =
         new StaleInstancesCleanupTask(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics);
@@ -965,8 +988,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       LOGGER.info("Shutting down executor service");
       _executorService.shutdownNow();
       _executorService.awaitTermination(10L, TimeUnit.SECONDS);
-      _tenantRebalanceExecutorService.shutdownNow();
-      _tenantRebalanceExecutorService.awaitTermination(10L, TimeUnit.SECONDS);
+      _rebalancerExecutorService.shutdownNow();
+      _rebalancerExecutorService.awaitTermination(10L, TimeUnit.SECONDS);
     } catch (final Exception e) {
       LOGGER.error("Caught exception while shutting down", e);
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 595169ce44..ca6cd35bc0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -259,7 +259,7 @@ public class PinotRealtimeTableResource {
       
controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
           JsonUtils.objectToString(segmentsYetToBeCommitted));
       _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, 
controllerJobZKMetadata,
-          ControllerJobType.FORCE_COMMIT, prev -> true);
+          ControllerJobType.FORCE_COMMIT);
     }
 
     Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 30d8401165..bdb82ce766 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -45,8 +45,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -73,6 +73,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.exception.RebalanceInProgressException;
 import org.apache.pinot.common.exception.SchemaNotFoundException;
 import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
@@ -97,10 +98,8 @@ import 
org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
-import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
-import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.controller.recommender.RecommenderDriver;
 import org.apache.pinot.controller.tuner.TableConfigTunerUtils;
@@ -171,6 +170,9 @@ public class PinotTableRestletResource {
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
 
+  @Inject
+  TableRebalanceManager _tableRebalanceManager;
+
   @Inject
   PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
 
@@ -183,9 +185,6 @@ public class PinotTableRestletResource {
   @Inject
   ControllerMetrics _controllerMetrics;
 
-  @Inject
-  ExecutorService _executorService;
-
   @Inject
   AccessControlFactory _accessControlFactory;
 
@@ -695,31 +694,28 @@ public class PinotTableRestletResource {
 
     try {
       if (dryRun || preChecks || downtime) {
-        // For dry-run, preChecks or rebalance with downtime, directly return 
the rebalance result as it should return
-        // immediately
-        return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false);
+        // For dry-run, preChecks or rebalance with downtime, it's fine to run 
the rebalance synchronously since it
+        // should be a really short operation.
+        return _tableRebalanceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false);
       } else {
         // Make a dry-run first to get the target assignment
         rebalanceConfig.setDryRun(true);
         RebalanceResult dryRunResult =
-            _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false);
+            _tableRebalanceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false);
 
         if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
           // If dry-run succeeded, run rebalance asynchronously
           rebalanceConfig.setDryRun(false);
-          Future<RebalanceResult> rebalanceResultFuture = 
_executorService.submit(() -> {
-            try {
-              return _pinotHelixResourceManager.rebalanceTable(
-                  tableNameWithType, rebalanceConfig, rebalanceJobId, true);
-            } catch (Throwable t) {
+          CompletableFuture<RebalanceResult> rebalanceResultFuture =
+              _tableRebalanceManager.rebalanceTableAsync(tableNameWithType, 
rebalanceConfig, rebalanceJobId, true);
+          rebalanceResultFuture.whenComplete((rebalanceResult, throwable) -> {
+            if (throwable != null) {
               String errorMsg = String.format("Caught exception/error while 
rebalancing table: %s", tableNameWithType);
-              LOGGER.error(errorMsg, t);
-              return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null,
-                  null, null);
+              LOGGER.error(errorMsg, throwable);
             }
           });
-          boolean isJobIdPersisted = waitForRebalanceToPersist(
-              dryRunResult.getJobId(), tableNameWithType, 
rebalanceResultFuture);
+          boolean isJobIdPersisted =
+              waitForRebalanceToPersist(dryRunResult.getJobId(), 
tableNameWithType, rebalanceResultFuture);
 
           if (rebalanceResultFuture.isDone()) {
             try {
@@ -744,6 +740,8 @@ public class PinotTableRestletResource {
       }
     } catch (TableNotFoundException e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.NOT_FOUND);
+    } catch (RebalanceInProgressException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.CONFLICT);
     }
   }
 
@@ -783,29 +781,7 @@ public class PinotTableRestletResource {
       @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     String tableNameWithType = constructTableNameWithType(tableName, 
tableTypeStr);
-    List<String> cancelledJobIds = new ArrayList<>();
-    boolean updated =
-        _pinotHelixResourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobType.TABLE_REBALANCE,
-            jobMetadata -> {
-              String jobId = 
jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
-              try {
-                String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
-                TableRebalanceProgressStats jobStats =
-                    JsonUtils.stringToObject(jobStatsInStr, 
TableRebalanceProgressStats.class);
-                if (jobStats.getStatus() != 
RebalanceResult.Status.IN_PROGRESS) {
-                  return;
-                }
-                cancelledJobIds.add(jobId);
-                LOGGER.info("Cancel rebalance job: {} for table: {}", jobId, 
tableNameWithType);
-                jobStats.setStatus(RebalanceResult.Status.CANCELLED);
-                
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
-                    JsonUtils.objectToString(jobStats));
-              } catch (Exception e) {
-                LOGGER.error("Failed to cancel rebalance job: {} for table: 
{}", jobId, tableNameWithType, e);
-              }
-            });
-    LOGGER.info("Tried to cancel existing jobs at best effort and done: {}", 
updated);
-    return cancelledJobIds;
+    return _tableRebalanceManager.cancelRebalance(tableNameWithType);
   }
 
   @GET
@@ -818,30 +794,7 @@ public class PinotTableRestletResource {
   public ServerRebalanceJobStatusResponse rebalanceStatus(
       @ApiParam(value = "Rebalance Job Id", required = true) 
@PathParam("jobId") String jobId)
       throws JsonProcessingException {
-    Map<String, String> controllerJobZKMetadata = 
getControllerJobMetadata(jobId);
-
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
-          Response.Status.NOT_FOUND);
-    }
-    ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new 
ServerRebalanceJobStatusResponse();
-    TableRebalanceProgressStats tableRebalanceProgressStats = 
JsonUtils.stringToObject(
-        
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
-        TableRebalanceProgressStats.class);
-    
serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats);
-
-    long timeSinceStartInSecs = 0L;
-    if (RebalanceResult.Status.DONE != 
tableRebalanceProgressStats.getStatus()) {
-      timeSinceStartInSecs = (System.currentTimeMillis() - 
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
-    }
-    
serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
-
-    String jobCtxInStr = 
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
-    if (StringUtils.isNotEmpty(jobCtxInStr)) {
-      TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, 
TableRebalanceContext.class);
-      serverRebalanceJobStatusResponse.setTableRebalanceContext(jobCtx);
-    }
-    return serverRebalanceJobStatusResponse;
+    return _tableRebalanceManager.getRebalanceStatus(jobId);
   }
 
   @GET
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 21894be117..b097ecd05b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -48,7 +48,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -154,15 +153,8 @@ import 
org.apache.pinot.controller.helix.core.lineage.LineageManager;
 import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
-import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker;
-import 
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
-import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
-import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
-import 
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
+import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
-import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.instance.Instance;
@@ -245,13 +237,10 @@ public class PinotHelixResourceManager {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private TableCache _tableCache;
   private final LineageManager _lineageManager;
-  private final RebalancePreChecker _rebalancePreChecker;
-  private TableSizeReader _tableSizeReader;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, 
@Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, int 
deletedSegmentsRetentionInDays,
-      boolean enableTieredSegmentAssignment, LineageManager lineageManager, 
RebalancePreChecker rebalancePreChecker,
-      @Nullable ExecutorService executorService, double 
diskUtilizationThreshold) {
+      boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _dataDir = dataDir;
@@ -274,26 +263,13 @@ public class PinotHelixResourceManager {
       _lineageUpdaterLocks[i] = new Object();
     }
     _lineageManager = lineageManager;
-    _rebalancePreChecker = rebalancePreChecker;
-    _rebalancePreChecker.init(this, executorService, diskUtilizationThreshold);
-  }
-
-  public PinotHelixResourceManager(ControllerConf controllerConf, @Nullable 
ExecutorService executorService) {
-    this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), 
controllerConf.getDataDir(),
-        controllerConf.tenantIsolationEnabled(), 
controllerConf.getEnableBatchMessageMode(),
-        controllerConf.getDeletedSegmentsRetentionInDays(), 
controllerConf.tieredSegmentAssignmentEnabled(),
-        LineageManagerFactory.create(controllerConf),
-        
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()),
 executorService,
-        controllerConf.getRebalanceDiskUtilizationThreshold());
   }
 
   public PinotHelixResourceManager(ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), 
controllerConf.getDataDir(),
         controllerConf.tenantIsolationEnabled(), 
controllerConf.getEnableBatchMessageMode(),
         controllerConf.getDeletedSegmentsRetentionInDays(), 
controllerConf.tieredSegmentAssignmentEnabled(),
-        LineageManagerFactory.create(controllerConf),
-        
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()),
 null,
-        controllerConf.getRebalanceDiskUtilizationThreshold());
+        LineageManagerFactory.create(controllerConf));
   }
 
   /**
@@ -447,24 +423,6 @@ public class PinotHelixResourceManager {
     return _lineageManager;
   }
 
-  /**
-   * Get the rebalance pre-checker
-   *
-   * @return rebalance pre-checker
-   */
-  public RebalancePreChecker getRebalancePreChecker() {
-    return _rebalancePreChecker;
-  }
-
-  /**
-   * Get the table size reader.
-   *
-   * @return table size reader
-   */
-  public TableSizeReader getTableSizeReader() {
-    return _tableSizeReader;
-  }
-
 /**
  * Instance related APIs
  */
@@ -2059,10 +2017,6 @@ public class PinotHelixResourceManager {
     _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
   }
 
-  public void registerTableSizeReader(TableSizeReader tableSizeReader) {
-    _tableSizeReader = tableSizeReader;
-  }
-
   private void assignInstances(TableConfig tableConfig, boolean override) {
     String tableNameWithType = tableConfig.getTableName();
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -2440,25 +2394,7 @@ public class PinotHelixResourceManager {
    */
   public Map<String, Map<String, String>> getAllJobs(Set<String> jobTypes,
       Predicate<Map<String, String>> jobMetadataChecker) {
-    Map<String, Map<String, String>> controllerJobs = new HashMap<>();
-    for (String jobType : jobTypes) {
-      String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
-      ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, 
AccessOption.PERSISTENT);
-      if (jobsZnRecord == null) {
-        continue;
-      }
-      Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
-      for (Map.Entry<String, Map<String, String>> jobMetadataEntry : 
jobMetadataMap.entrySet()) {
-        String jobId = jobMetadataEntry.getKey();
-        Map<String, String> jobMetadata = jobMetadataEntry.getValue();
-        
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType),
-            "Got unexpected jobType: %s at jobResourcePath: %s with jobId: 
%s", jobType, jobResourcePath, jobId);
-        if (jobMetadataChecker.test(jobMetadata)) {
-          controllerJobs.put(jobId, jobMetadata);
-        }
-      }
-    }
-    return controllerJobs;
+    return ControllerZkHelixUtils.getAllControllerJobs(jobTypes, 
jobMetadataChecker, _propertyStore);
   }
 
   /**
@@ -2538,37 +2474,12 @@ public class PinotHelixResourceManager {
    * @param jobId job's UUID
    * @param jobMetadata the job metadata
    * @param jobType the type of the job to figure out where job metadata is 
kept in ZK
-   * @param prevJobMetadataChecker to check the previous job metadata before 
adding new one
    * @return boolean representing success / failure of the ZK write step
    */
   public boolean addControllerJobToZK(String jobId, Map<String, String> 
jobMetadata, String jobType,
       Predicate<Map<String, String>> prevJobMetadataChecker) {
-    
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)
 != null,
-        "Submission Time in JobMetadata record not set. Cannot expire these 
records");
-    String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
-    Stat stat = new Stat();
-    ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, stat, 
AccessOption.PERSISTENT);
-    if (jobsZnRecord != null) {
-      Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
-      Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId);
-      if (!prevJobMetadataChecker.test(prevJobMetadata)) {
-        return false;
-      }
-      jobMetadataMap.put(jobId, jobMetadata);
-      if (jobMetadataMap.size() > 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
-        jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) -> 
Long.compare(
-                
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
-                
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
-            .collect(Collectors.toList()).subList(0, 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
-            .stream().collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-      }
-      jobsZnRecord.setMapFields(jobMetadataMap);
-      return _propertyStore.set(jobResourcePath, jobsZnRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
-    } else {
-      jobsZnRecord = new ZNRecord(jobResourcePath);
-      jobsZnRecord.setMapField(jobId, jobMetadata);
-      return _propertyStore.set(jobResourcePath, jobsZnRecord, 
AccessOption.PERSISTENT);
-    }
+    return ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, 
jobMetadata, jobType,
+        prevJobMetadataChecker);
   }
 
   /**
@@ -3874,47 +3785,10 @@ public class PinotHelixResourceManager {
         "Instance: " + instanceName + (enableInstance ? " enable" : " 
disable") + " failed, timeout");
   }
 
-  /**
-   * Entry point for table Rebalacing.
-   * @param tableNameWithType
-   * @param rebalanceConfig
-   * @param trackRebalanceProgress whether to track rebalance progress stats
-   * @return RebalanceResult
-   * @throws TableNotFoundException
-   */
-  public RebalanceResult rebalanceTable(String tableNameWithType, 
RebalanceConfig rebalanceConfig,
-      String rebalanceJobId, boolean trackRebalanceProgress)
-      throws TableNotFoundException {
-    TableConfig tableConfig = getTableConfig(tableNameWithType);
-    if (tableConfig == null) {
-      throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
-    }
-    Preconditions.checkState(rebalanceJobId != null, "RebalanceId not 
populated in the rebalanceConfig");
-    ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
-    if (trackRebalanceProgress) {
-      zkBasedTableRebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
-          TableRebalanceContext.forInitialAttempt(rebalanceJobId, 
rebalanceConfig), this);
-    }
-    return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
-        zkBasedTableRebalanceObserver);
-  }
-
-  public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig 
tableConfig, String rebalanceJobId,
-      RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver 
zkBasedTableRebalanceObserver) {
-    Map<String, Set<String>> tierToSegmentsMap = null;
-    if (rebalanceConfig.isUpdateTargetTier()) {
-      tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, 
tableConfig);
-    }
-    TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, 
_controllerMetrics, _rebalancePreChecker,
-            _tableSizeReader);
-    return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, tierToSegmentsMap);
-  }
-
   /// Calculates the target tier for the segments within a table, updates the 
segment ZK metadata and persists the
   /// update to ZK.
-  @VisibleForTesting
-  Map<String, Set<String>> updateTargetTier(String rebalanceJobId, String 
tableNameWithType, TableConfig tableConfig) {
+  public Map<String, Set<String>> updateTargetTier(String rebalanceJobId, 
String tableNameWithType,
+      TableConfig tableConfig) {
     List<Tier> sortedTiers =
         CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList()) ? 
getSortedTiers(tableConfig) : List.of();
     LOGGER.info("For rebalanceId: {}, updating target tiers for segments of 
table: {} with tierConfigs: {}",
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
index 817c4129b4..b16c6b8197 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -27,10 +27,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.exception.RebalanceInProgressException;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -54,15 +54,15 @@ import org.slf4j.LoggerFactory;
 public class RebalanceChecker extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RebalanceChecker.class);
   private static final double RETRY_DELAY_SCALE_FACTOR = 2.0;
-  private final ExecutorService _executorService;
+  private final TableRebalanceManager _tableRebalanceManager;
 
-  public RebalanceChecker(PinotHelixResourceManager pinotHelixResourceManager,
-      LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics,
-      ExecutorService executorService) {
+  public RebalanceChecker(TableRebalanceManager tableRebalanceManager,
+      PinotHelixResourceManager pinotHelixResourceManager, 
LeadControllerManager leadControllerManager,
+      ControllerConf config, ControllerMetrics controllerMetrics) {
     super(RebalanceChecker.class.getSimpleName(), 
config.getRebalanceCheckerFrequencyInSeconds(),
         config.getRebalanceCheckerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
         controllerMetrics);
-    _executorService = executorService;
+    _tableRebalanceManager = tableRebalanceManager;
   }
 
   @Override
@@ -152,14 +152,7 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
     // thread, in order to avoid unnecessary ZK reads and making too many ZK 
reads in a short time.
     TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", tableNameWithType);
-    _executorService.submit(() -> {
-      // Retry rebalance in another thread as rebalance can take time.
-      try {
-        retryRebalanceTableWithContext(tableNameWithType, tableConfig, jobCtx);
-      } catch (Throwable t) {
-        LOGGER.error("Failed to retry rebalance for table: {} asynchronously", 
tableNameWithType, t);
-      }
-    });
+    retryRebalanceTableWithContext(tableNameWithType, tableConfig, jobCtx);
   }
 
   private void retryRebalanceTableWithContext(String tableNameWithType, 
TableConfig tableConfig,
@@ -173,12 +166,23 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
         attemptJobId);
     _controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.TABLE_REBALANCE_RETRY, 1L);
     ZkBasedTableRebalanceObserver observer =
-        new ZkBasedTableRebalanceObserver(tableNameWithType, attemptJobId, 
retryCtx, _pinotHelixResourceManager);
-    RebalanceResult result =
-        _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
tableConfig, attemptJobId, rebalanceConfig,
-            observer);
-    LOGGER.info("New attempt: {} for table: {} is done with result status: 
{}", attemptJobId, tableNameWithType,
-        result.getStatus());
+        new ZkBasedTableRebalanceObserver(tableNameWithType, attemptJobId, 
retryCtx,
+            _pinotHelixResourceManager.getPropertyStore());
+
+    try {
+      _tableRebalanceManager.rebalanceTableAsync(tableNameWithType, 
tableConfig, attemptJobId, rebalanceConfig,
+              observer)
+          .whenComplete((result, throwable) -> {
+            if (throwable != null) {
+              LOGGER.error("Failed to retry rebalance for table: {}", 
tableNameWithType, throwable);
+            } else {
+              LOGGER.info("New attempt: {} for table: {} is done with result 
status: {}", attemptJobId,
+                  tableNameWithType, result.getStatus());
+            }
+          });
+    } catch (RebalanceInProgressException e) {
+      LOGGER.warn("Rebalance job for table: {} is already in progress. 
Skipping retry.", tableNameWithType, e);
+    }
   }
 
   @VisibleForTesting
@@ -309,7 +313,8 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
       }
       // The job is considered failed, but it's possible it is still running, 
then we might end up with more than one
       // rebalance jobs running in parallel for a table. The rebalance 
algorithm is idempotent, so this should be fine
-      // for the correctness.
+      // for correctness. Note that we still abort this job before retrying, because we don't allow more than
+      // one actively running rebalance job (as per ZK) for a table.
       LOGGER.info("Found stuck rebalance job: {} for original job: {}", jobId, 
originalJobId);
       candidates.computeIfAbsent(originalJobId, (k) -> new 
HashSet<>()).add(Pair.of(jobCtx, jobStartTimeMs));
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
new file mode 100644
index 0000000000..87de12b3d7
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import javax.ws.rs.NotFoundException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.exception.RebalanceInProgressException;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import 
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single entry point for all table rebalance related operations. This class 
should be used to initiate table rebalance
+ * operations, rather than directly instantiating objects of {@link 
TableRebalancer}.
+ */
+public class TableRebalanceManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableRebalanceManager.class);
+
+  private final PinotHelixResourceManager _resourceManager;
+  private final ControllerMetrics _controllerMetrics;
+  private final RebalancePreChecker _rebalancePreChecker;
+  private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+
+  public TableRebalanceManager(PinotHelixResourceManager resourceManager, 
ControllerMetrics controllerMetrics,
+      RebalancePreChecker rebalancePreChecker, TableSizeReader 
tableSizeReader, ExecutorService executorService) {
+    _resourceManager = resourceManager;
+    _controllerMetrics = controllerMetrics;
+    _rebalancePreChecker = rebalancePreChecker;
+    _tableSizeReader = tableSizeReader;
+    _executorService = executorService;
+  }
+
+  /**
+   * Rebalance the table with the given name and type synchronously. It's the 
responsibility of the caller to ensure
+   * that this rebalance is run on the rebalance thread pool in the controller 
that respects the configuration
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}.
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @return result of the rebalance operation
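+   * @throws TableNotFoundException if the table does not exist
+   * @throws RebalanceInProgressException if a rebalance job is already in progress for the table (not checked for
+   *         dry runs)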
+   */
+  public RebalanceResult rebalanceTable(String tableNameWithType, 
RebalanceConfig rebalanceConfig,
+      String rebalanceJobId, boolean trackRebalanceProgress)
+      throws TableNotFoundException, RebalanceInProgressException {
+    TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
+    }
+    Preconditions.checkState(rebalanceJobId != null, "RebalanceId not 
populated in the rebalanceConfig");
+    ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
+    if (trackRebalanceProgress) {
+      zkBasedTableRebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
+          TableRebalanceContext.forInitialAttempt(rebalanceJobId, 
rebalanceConfig),
+          _resourceManager.getPropertyStore());
+    }
+    return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
+        zkBasedTableRebalanceObserver);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The 
number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @return a CompletableFuture that will complete with the result of the 
rebalance operation
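+   * @throws TableNotFoundException if the table does not exist
+   * @throws RebalanceInProgressException if a rebalance job is already in progress for the table (not checked for
+   *         dry runs)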
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType,
+      RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean 
trackRebalanceProgress)
+      throws TableNotFoundException, RebalanceInProgressException {
+    TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
+    }
+    if (!rebalanceConfig.isDryRun()) {
+      checkRebalanceJobInProgress(tableNameWithType);
+    }
+    return CompletableFuture.supplyAsync(
+        () -> {
+          try {
+            return rebalanceTable(tableNameWithType, rebalanceConfig, 
rebalanceJobId, trackRebalanceProgress);
+          } catch (TableNotFoundException e) {
+            // Should not happen since we already checked for table existence
+            throw new RuntimeException(e);
+          } catch (RebalanceInProgressException e) {
+            throw new RuntimeException(e);
+          }
+        },
+        _executorService);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The 
number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param tableConfig configuration for the table to rebalance
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param zkBasedTableRebalanceObserver observer to track rebalance progress 
in ZK
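+   * @return a CompletableFuture that will complete with the result of the rebalance operation
+   * @throws RebalanceInProgressException if a rebalance job is already in progress for the table (not checked for
+   *         dry runs)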
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType, TableConfig tableConfig,
+      String rebalanceJobId, RebalanceConfig rebalanceConfig,
+      @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver)
+      throws RebalanceInProgressException {
+    if (!rebalanceConfig.isDryRun()) {
+      checkRebalanceJobInProgress(tableNameWithType);
+    }
+
+    return CompletableFuture.supplyAsync(
+        () -> {
+          try {
+            return rebalanceTable(tableNameWithType, tableConfig, 
rebalanceJobId, rebalanceConfig,
+                zkBasedTableRebalanceObserver);
+          } catch (RebalanceInProgressException e) {
+            throw new RuntimeException(e);
+          }
+        },
+        _executorService);
+  }
+
+  @VisibleForTesting
+  RebalanceResult rebalanceTable(String tableNameWithType, TableConfig 
tableConfig, String rebalanceJobId,
+      RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver 
zkBasedTableRebalanceObserver)
+      throws RebalanceInProgressException {
+
+    if (!rebalanceConfig.isDryRun()) {
+      checkRebalanceJobInProgress(tableNameWithType);
+    }
+
+    Map<String, Set<String>> tierToSegmentsMap;
+    if (rebalanceConfig.isUpdateTargetTier()) {
+      tierToSegmentsMap = _resourceManager.updateTargetTier(rebalanceJobId, 
tableNameWithType, tableConfig);
+    } else {
+      tierToSegmentsMap = null;
+    }
+    TableRebalancer tableRebalancer =
+        new TableRebalancer(_resourceManager.getHelixZkManager(), 
zkBasedTableRebalanceObserver, _controllerMetrics,
+            _rebalancePreChecker, _tableSizeReader);
+
+    return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, tierToSegmentsMap);
+  }
+
+  /**
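+   * Cancels ongoing rebalance jobs (if any) for the given table. Only jobs whose progress stats report
+   * {@code IN_PROGRESS} are marked as {@code CANCELLED}. Jobs whose stats cannot be processed are logged and
+   * skipped, so cancellation is best effort.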
+   *
+   * @param tableNameWithType name of the table for which to cancel any 
ongoing rebalance job
+   * @return the list of job IDs that were cancelled
+   */
+  public List<String> cancelRebalance(String tableNameWithType) {
+    List<String> cancelledJobIds = new ArrayList<>();
+    boolean updated = _resourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobType.TABLE_REBALANCE,
+        jobMetadata -> {
+          String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+          try {
+            String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+            TableRebalanceProgressStats jobStats =
+                JsonUtils.stringToObject(jobStatsInStr, 
TableRebalanceProgressStats.class);
+            if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
+              return;
+            }
+
+            LOGGER.info("Cancelling rebalance job: {} for table: {}", jobId, 
tableNameWithType);
+            jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+            
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+                JsonUtils.objectToString(jobStats));
+            cancelledJobIds.add(jobId);
+          } catch (Exception e) {
+            LOGGER.error("Failed to cancel rebalance job: {} for table: {}", 
jobId, tableNameWithType, e);
+          }
+        });
+    LOGGER.info("Tried to cancel existing rebalance jobs for table: {} at best 
effort and done: {}", tableNameWithType,
+        updated);
+    return cancelledJobIds;
+  }
+
+  /**
+   * Gets the status of the rebalance job with the given ID.
+   *
+   * @param jobId ID of the rebalance job to get the status for
+   * @return response containing the status of the rebalance job
+   * @throws JsonProcessingException if there is an error processing the 
rebalance progress stats from ZK
+   * @throws NotFoundException if the rebalance job with the given ID does not 
exist
+   */
+  public ServerRebalanceJobStatusResponse getRebalanceStatus(String jobId)
+      throws JsonProcessingException {
+    Map<String, String> controllerJobZKMetadata =
+        _resourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobType.TABLE_REBALANCE);
+    if (controllerJobZKMetadata == null) {
+      LOGGER.warn("Rebalance job with ID: {} not found", jobId);
+      throw new NotFoundException("Rebalance job with ID: " + jobId + " not 
found");
+    }
+    ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new 
ServerRebalanceJobStatusResponse();
+    TableRebalanceProgressStats tableRebalanceProgressStats = 
JsonUtils.stringToObject(
+        
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
+        TableRebalanceProgressStats.class);
+    
serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats);
+
+    long timeSinceStartInSecs = 0L;
+    if (RebalanceResult.Status.DONE != 
tableRebalanceProgressStats.getStatus()) {
+      timeSinceStartInSecs = (System.currentTimeMillis() - 
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+    }
+    
serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
+
+    String jobCtxInStr = 
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+    if (StringUtils.isNotEmpty(jobCtxInStr)) {
+      TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, 
TableRebalanceContext.class);
+      serverRebalanceJobStatusResponse.setTableRebalanceContext(jobCtx);
+    }
+    return serverRebalanceJobStatusResponse;
+  }
+
+  private void checkRebalanceJobInProgress(String tableNameWithType)
+      throws RebalanceInProgressException {
+    String rebalanceJobInProgress = rebalanceJobInProgress(tableNameWithType, 
_resourceManager.getPropertyStore());
+    if (rebalanceJobInProgress != null) {
+      String errorMsg = "Rebalance job is already in progress for table: " + 
tableNameWithType + ", jobId: "
+          + rebalanceJobInProgress + ". Please wait for the job to complete or 
cancel it before starting a new one.";
+      throw new RebalanceInProgressException(errorMsg);
+    }
+  }
+
+  /**
+   * Checks if there is an ongoing rebalance job for the given table.
+   *
+   * @param tableNameWithType name of the table to check for ongoing rebalance 
jobs
+   * @param propertyStore ZK property store to read the job metadata from
+   * @return jobId of the ongoing rebalance job if one exists, {@code null} 
otherwise
+   */
+  @Nullable
+  public static String rebalanceJobInProgress(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    // Get all jobMetadata for the given table with a single ZK read.
+    Map<String, Map<String, String>> allJobMetadataByJobId =
+        
ControllerZkHelixUtils.getAllControllerJobs(Collections.singleton(ControllerJobType.TABLE_REBALANCE),
+            jobMetadata -> tableNameWithType.equals(
+                
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)), 
propertyStore);
+
+    for (Map.Entry<String, Map<String, String>> entry : 
allJobMetadataByJobId.entrySet()) {
+      String jobId = entry.getKey();
+      Map<String, String> jobMetadata = entry.getValue();
+      String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+
+      TableRebalanceProgressStats jobStats;
+      try {
+        jobStats = JsonUtils.stringToObject(jobStatsInStr, 
TableRebalanceProgressStats.class);
+      } catch (Exception e) {
+        // If the job stats cannot be parsed, let's assume that the job is not 
in progress.
+        continue;
+      }
+
+      if (jobStats.getStatus() == RebalanceResult.Status.IN_PROGRESS) {
+        return jobId;
+      }
+    }
+
+    return null;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 3bf80c9f60..cf3874ce2b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -156,11 +157,7 @@ public class TableRebalancer {
       @Nullable ControllerMetrics controllerMetrics, @Nullable 
RebalancePreChecker rebalancePreChecker,
       @Nullable TableSizeReader tableSizeReader) {
     _helixManager = helixManager;
-    if (tableRebalanceObserver != null) {
-      _tableRebalanceObserver = tableRebalanceObserver;
-    } else {
-      _tableRebalanceObserver = new NoOpTableRebalanceObserver();
-    }
+    _tableRebalanceObserver = 
Objects.requireNonNullElseGet(tableRebalanceObserver, 
NoOpTableRebalanceObserver::new);
     _helixDataAccessor = helixManager.getHelixDataAccessor();
     _controllerMetrics = controllerMetrics;
     _rebalancePreChecker = rebalancePreChecker;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 042be1a93d..e1cfdfc833 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -25,10 +25,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ZkBasedTableRebalanceObserver.class);
   private final String _tableNameWithType;
   private final String _rebalanceJobId;
-  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final TableRebalanceProgressStats _tableRebalanceProgressStats;
   private final TableRebalanceContext _tableRebalanceContext;
   // These previous stats are used for rollback scenarios where the IdealState 
update fails due to a version
@@ -59,13 +61,13 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
   private final ControllerMetrics _controllerMetrics;
 
   public ZkBasedTableRebalanceObserver(String tableNameWithType, String 
rebalanceJobId,
-      TableRebalanceContext tableRebalanceContext, PinotHelixResourceManager 
pinotHelixResourceManager) {
+      TableRebalanceContext tableRebalanceContext, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
     Preconditions.checkState(tableNameWithType != null, "Table name cannot be 
null");
     Preconditions.checkState(rebalanceJobId != null, "rebalanceId cannot be 
null");
-    Preconditions.checkState(pinotHelixResourceManager != null, 
"PinotHelixManager cannot be null");
+    Preconditions.checkState(propertyStore != null, "ZkHelixPropertyStore 
cannot be null");
     _tableNameWithType = tableNameWithType;
     _rebalanceJobId = rebalanceJobId;
-    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _propertyStore = propertyStore;
     _tableRebalanceProgressStats = new TableRebalanceProgressStats();
     _tableRebalanceContext = tableRebalanceContext;
     _previousStepStats = new 
TableRebalanceProgressStats.RebalanceProgressStats();
@@ -279,8 +281,8 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
   private void trackStatsInZk() {
     Map<String, String> jobMetadata =
         createJobMetadata(_tableNameWithType, _rebalanceJobId, 
_tableRebalanceProgressStats, _tableRebalanceContext);
-    _pinotHelixResourceManager.addControllerJobToZK(_rebalanceJobId, 
jobMetadata, ControllerJobType.TABLE_REBALANCE,
-        prevJobMetadata -> {
+    ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, 
_rebalanceJobId, jobMetadata,
+        ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> {
           // In addition to updating job progress status, the observer also 
checks if the job status is IN_PROGRESS.
           // If not, then no need to update the job status, and we keep this 
status to end the job promptly.
           if (prevJobMetadata == null) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index 661fff70d9..369024faa6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -29,20 +29,25 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.common.exception.RebalanceInProgressException;
 import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DefaultTenantRebalancer implements TenantRebalancer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultTenantRebalancer.class);
-  PinotHelixResourceManager _pinotHelixResourceManager;
-  ExecutorService _executorService;
+  private final TableRebalanceManager _tableRebalanceManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final ExecutorService _executorService;
 
-  public DefaultTenantRebalancer(PinotHelixResourceManager 
pinotHelixResourceManager, ExecutorService executorService) {
+  public DefaultTenantRebalancer(TableRebalanceManager tableRebalanceManager,
+      PinotHelixResourceManager pinotHelixResourceManager, ExecutorService 
executorService) {
+    _tableRebalanceManager = tableRebalanceManager;
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _executorService = executorService;
   }
@@ -56,13 +61,13 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
         RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
         rebalanceConfig.setDryRun(true);
         rebalanceResult.put(table,
-            _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, 
createUniqueRebalanceJobIdentifier(),
-                false));
-      } catch (TableNotFoundException exception) {
+            _tableRebalanceManager.rebalanceTable(table, rebalanceConfig, 
createUniqueRebalanceJobIdentifier(), false));
+      } catch (TableNotFoundException | RebalanceInProgressException 
exception) {
         rebalanceResult.put(table, new RebalanceResult(null, 
RebalanceResult.Status.FAILED, exception.getMessage(),
             null, null, null, null, null));
       }
     });
+
     if (config.isDryRun()) {
       return new TenantRebalanceResult(null, rebalanceResult, 
config.isVerboseResult());
     } else {
@@ -198,7 +203,7 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
       TenantRebalanceObserver observer) {
     try {
       
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, 
tableName, rebalanceJobId);
-      RebalanceResult result = 
_pinotHelixResourceManager.rebalanceTable(tableName, config, rebalanceJobId, 
true);
+      RebalanceResult result = 
_tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true);
       if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
         
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, 
tableName, null);
       } else {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index 17dd44378b..b2144260c0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -45,6 +45,7 @@ import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -65,6 +66,7 @@ import org.slf4j.LoggerFactory;
 public class SegmentRelocator extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentRelocator.class);
 
+  private final TableRebalanceManager _tableRebalanceManager;
   private final ExecutorService _executorService;
   private final HttpClientConnectionManager _connectionManager;
   private final boolean _enableLocalTierMigration;
@@ -87,12 +89,14 @@ public class SegmentRelocator extends 
ControllerPeriodicTask<Void> {
   @Nullable
   private final Set<String> _tablesUndergoingRebalance;
 
-  public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
-      LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics,
-      ExecutorService executorService, HttpClientConnectionManager 
connectionManager) {
+  public SegmentRelocator(TableRebalanceManager tableRebalanceManager,
+      PinotHelixResourceManager pinotHelixResourceManager, 
LeadControllerManager leadControllerManager,
+      ControllerConf config, ControllerMetrics controllerMetrics, 
ExecutorService executorService,
+      HttpClientConnectionManager connectionManager) {
     super(SegmentRelocator.class.getSimpleName(), 
config.getSegmentRelocatorFrequencyInSeconds(),
         config.getSegmentRelocatorInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
         controllerMetrics);
+    _tableRebalanceManager = tableRebalanceManager;
     _executorService = executorService;
     _connectionManager = connectionManager;
     _enableLocalTierMigration = 
config.enableSegmentRelocatorLocalTierMigration();
@@ -145,9 +149,9 @@ public class SegmentRelocator extends 
ControllerPeriodicTask<Void> {
       } else {
         LOGGER.info("The previous rebalance has not yet completed, skip 
rebalancing table {}", tableNameWithType);
       }
-      return;
+    } else {
+      putTableToWait(tableNameWithType);
     }
-    putTableToWait(tableNameWithType);
   }
 
   @VisibleForTesting
@@ -225,7 +229,10 @@ public class SegmentRelocator extends 
ControllerPeriodicTask<Void> {
       // all segments are put on the right servers. If any segments are not on 
their target tier, the server local
       // tier migration is triggered for them, basically asking the hosting 
servers to reload them. The segment
       // target tier may get changed between the two sequential actions, but 
cluster states converge eventually.
-      RebalanceResult rebalance = 
_pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig,
+
+      // We're not using the async rebalance API here because we want to run 
this on a separate thread pool from the
+      // rebalance thread pool that is used for user initiated rebalances.
+      RebalanceResult rebalance = 
_tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig,
           TableRebalancer.createUniqueRebalanceJobIdentifier(), false);
       switch (rebalance.getStatus()) {
         case NO_OP:
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
new file mode 100644
index 0000000000..f5f7ee12e8
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.util;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.zookeeper.data.Stat;
+
+
+public class ControllerZkHelixUtils {
+
+  private ControllerZkHelixUtils() {
+    // Utility class
+  }
+
+  /**
+   * Adds a new job metadata entry for a controller job like table rebalance 
or segment reload into ZK
+   *
+   * @param propertyStore the ZK property store to write to
+   * @param jobId job's UUID
+   * @param jobMetadata the job metadata
+   * @param jobType the type of the job to figure out where the job metadata 
is kept in ZK
+   * @param prevJobMetadataChecker to check the previous job metadata before 
adding new one
+   * @return true if the metadata was written to ZK; false if prevJobMetadataChecker rejected it or the write failed
+   */
+  public static boolean addControllerJobToZK(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String jobId,
+      Map<String, String> jobMetadata, String jobType, Predicate<Map<String, 
String>> prevJobMetadataChecker) {
+    
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)
 != null,
+        CommonConstants.ControllerJob.SUBMISSION_TIME_MS
+            + " in JobMetadata record not set. Cannot expire these records");
+    String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+    Stat stat = new Stat();
+    ZNRecord jobsZnRecord = propertyStore.get(jobResourcePath, stat, 
AccessOption.PERSISTENT);
+    if (jobsZnRecord != null) {
+      Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
+      Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId);
+      if (!prevJobMetadataChecker.test(prevJobMetadata)) {
+        return false;
+      }
+      jobMetadataMap.put(jobId, jobMetadata);
+      if (jobMetadataMap.size() > 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
+        jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) -> 
Long.compare(
+                
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
+                
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
+            .collect(Collectors.toList()).subList(0, 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
+            .stream().collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+      }
+      jobsZnRecord.setMapFields(jobMetadataMap);
+      return propertyStore.set(jobResourcePath, jobsZnRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
+    } else {
+      jobsZnRecord = new ZNRecord(jobResourcePath);
+      jobsZnRecord.setMapField(jobId, jobMetadata);
+      return propertyStore.set(jobResourcePath, jobsZnRecord, 
AccessOption.PERSISTENT);
+    }
+  }
+
+  /**
+   * Get all controller jobs from ZK for a given set of job types.
+   * @param jobTypes the set of job types to filter
+   * @param jobMetadataChecker predicate applied to each job's metadata; only jobs it accepts are returned
+   * @param propertyStore the ZK property store to read from
+   * @return a map of jobId to job metadata for all the jobs that match the 
given job types and metadata checker
+   */
+  public static Map<String, Map<String, String>> 
getAllControllerJobs(Set<String> jobTypes,
+      Predicate<Map<String, String>> jobMetadataChecker, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    Map<String, Map<String, String>> controllerJobs = new HashMap<>();
+    for (String jobType : jobTypes) {
+      String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+      ZNRecord jobsZnRecord = propertyStore.get(jobResourcePath, null, 
AccessOption.PERSISTENT);
+      if (jobsZnRecord == null) {
+        continue;
+      }
+      Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
+      for (Map.Entry<String, Map<String, String>> jobMetadataEntry : 
jobMetadataMap.entrySet()) {
+        String jobId = jobMetadataEntry.getKey();
+        Map<String, String> jobMetadata = jobMetadataEntry.getValue();
+        
Preconditions.checkState(jobType.equals(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE)),
+            "Got unexpected jobType: %s at jobResourcePath: %s with jobId: 
%s", jobType, jobResourcePath, jobId);
+        if (jobMetadataChecker.test(jobMetadata)) {
+          controllerJobs.put(jobId, jobMetadata);
+        }
+      }
+    }
+    return controllerJobs;
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 55e0bb1772..176cec96ae 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hc.client5.http.entity.EntityBuilder;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
@@ -69,6 +70,8 @@ import 
org.apache.pinot.controller.api.access.AllowAllAccessFactory;
 import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.api.resources.TableViews;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
@@ -158,6 +161,8 @@ public class ControllerTest {
   protected HelixDataAccessor _helixDataAccessor;
   protected HelixAdmin _helixAdmin;
   protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  protected TableRebalanceManager _tableRebalanceManager;
+  protected TableSizeReader _tableSizeReader;
 
   /**
    * Acquire the {@link ControllerTest} default instance that can be shared 
across different test cases.
@@ -303,6 +308,8 @@ public class ControllerTest {
       _controllerDataDir = _controllerConfig.getDataDir();
       _helixResourceManager = _controllerStarter.getHelixResourceManager();
       _helixManager = _controllerStarter.getHelixControllerManager();
+      _tableRebalanceManager = _controllerStarter.getTableRebalanceManager();
+      _tableSizeReader = _controllerStarter.getTableSizeReader();
       _helixDataAccessor = _helixManager.getHelixDataAccessor();
       ConfigAccessor configAccessor = _helixManager.getConfigAccessor();
       // HelixResourceManager is null in Helix only mode, while HelixManager 
is null in Pinot only mode.
@@ -1028,6 +1035,24 @@ public class ControllerTest {
     }
   }
 
+  /**
+   * Sends a POST request to the specified URL with the given payload and 
returns the status code along with the
+   * stringified response.
+   * @param urlString the URL to send the POST request to
+   * @param payload the payload to send in the POST request
+   * @return a Pair containing the status code and the stringified response
+   */
+  public static Pair<Integer, String> postRequestWithStatusCode(String 
urlString, String payload)
+      throws IOException {
+    try {
+      SimpleHttpResponse resp =
+          getHttpClient().sendJsonPostRequest(new URL(urlString).toURI(), 
payload, Collections.emptyMap());
+      return Pair.of(resp.getStatusCode(), constructResponse(resp));
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   public static String sendPostRequestRaw(String urlString, String payload, 
Map<String, String> headers)
       throws IOException {
     try {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
index 22663fa333..d88b1319c1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
@@ -49,10 +50,7 @@ import org.testng.annotations.Test;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -248,14 +246,21 @@ public class RebalanceCheckerTest {
     PinotHelixResourceManager helixManager = 
mock(PinotHelixResourceManager.class);
     when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig);
     when(helixManager.getAllJobs(any(), any())).thenReturn(allJobMetadata);
-    RebalanceChecker checker = new RebalanceChecker(helixManager, 
leadController, cfg, metrics, exec);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(helixManager.getPropertyStore()).thenReturn(propertyStore);
+    TableRebalanceManager tableRebalanceManager = 
mock(TableRebalanceManager.class);
+    when(tableRebalanceManager.rebalanceTableAsync(anyString(), 
any(TableConfig.class), anyString(),
+        any(RebalanceConfig.class), 
any(ZkBasedTableRebalanceObserver.class))).thenReturn(
+        CompletableFuture.completedFuture(null));
+    RebalanceChecker checker = new RebalanceChecker(tableRebalanceManager, 
helixManager, leadController, cfg, metrics);
     // Although job1_3 was submitted most recently but job1 had exceeded 
maxAttempts. Chose job3 to retry, which got
     // stuck at in progress status.
     checker.retryRebalanceTable(tableName, allJobMetadata);
     // The new retry job is for job3 and attemptId is increased to 2.
     ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor =
         ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
-    verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(), 
anyString(), any(), observerCaptor.capture());
+    verify(tableRebalanceManager, times(1)).rebalanceTableAsync(eq(tableName), 
any(), anyString(), any(),
+        observerCaptor.capture());
     ZkBasedTableRebalanceObserver observer = observerCaptor.getValue();
     jobCtx = observer.getTableRebalanceContext();
     assertEquals(jobCtx.getOriginalJobId(), "job3");
@@ -286,12 +291,19 @@ public class RebalanceCheckerTest {
     PinotHelixResourceManager helixManager = 
mock(PinotHelixResourceManager.class);
     TableConfig tableConfig = mock(TableConfig.class);
     when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig);
-    RebalanceChecker checker = new RebalanceChecker(helixManager, 
leadController, cfg, metrics, exec);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(helixManager.getPropertyStore()).thenReturn(propertyStore);
+    TableRebalanceManager tableRebalanceManager = 
mock(TableRebalanceManager.class);
+    when(tableRebalanceManager.rebalanceTableAsync(anyString(), 
any(TableConfig.class), anyString(),
+        any(RebalanceConfig.class), 
any(ZkBasedTableRebalanceObserver.class))).thenReturn(
+        CompletableFuture.completedFuture(null));
+    RebalanceChecker checker = new RebalanceChecker(tableRebalanceManager, 
helixManager, leadController, cfg, metrics);
     checker.retryRebalanceTable(tableName, allJobMetadata);
     // Retry for job1 is delayed with 5min backoff.
     ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor =
         ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
-    verify(helixManager, times(0)).rebalanceTable(eq(tableName), any(), 
anyString(), any(), observerCaptor.capture());
+    verify(tableRebalanceManager, never()).rebalanceTableAsync(eq(tableName), 
any(), anyString(), any(),
+        observerCaptor.capture());
 
     // Set initial delay to 0 to disable retry backoff.
     jobCfg.setRetryInitialDelayInMs(0);
@@ -300,7 +312,8 @@ public class RebalanceCheckerTest {
     checker.retryRebalanceTable(tableName, allJobMetadata);
     // Retry for job1 is delayed with 0 backoff.
     observerCaptor = 
ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
-    verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(), 
anyString(), any(), observerCaptor.capture());
+    verify(tableRebalanceManager, times(1)).rebalanceTableAsync(eq(tableName), 
any(), anyString(), any(),
+        observerCaptor.capture());
   }
 
   @Test
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 64cbff92c0..64b8a6e5f5 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -128,7 +128,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
       preChecker.init(_helixResourceManager, executorService, 1);
       TableRebalancer tableRebalancer =
-          new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
+          new TableRebalancer(_helixManager, null, null, preChecker, 
_tableSizeReader);
       TableConfig tableConfig =
           new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
@@ -699,8 +699,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       ExecutorService executorService = Executors.newFixedThreadPool(10);
       DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
       preChecker.init(_helixResourceManager, executorService, 1);
-      TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, 
null, null, preChecker,
-          _helixResourceManager.getTableSizeReader());
+      TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, 
null, null, preChecker, _tableSizeReader);
       // Set up the table with 1 replication factor and strict replica group 
enabled
       TableConfig tableConfig =
           new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1)
@@ -982,8 +981,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     preChecker.init(_helixResourceManager, executorService, 1);
-    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker,
-        _helixResourceManager.getTableSizeReader());
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker, _tableSizeReader);
     // Set up the table with 1 replication factor and strict replica group 
enabled
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1)
@@ -1042,7 +1040,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     preChecker.init(_helixResourceManager, executorService, 0.5);
     TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
+        new TableRebalancer(_helixManager, null, null, preChecker, 
_tableSizeReader);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
@@ -1142,7 +1140,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     preChecker.init(_helixResourceManager, executorService, 0.5);
     TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
+        new TableRebalancer(_helixManager, null, null, preChecker, 
_tableSizeReader);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
             .setNumReplicas(2)
@@ -1557,7 +1555,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     preChecker.init(_helixResourceManager, executorService, 1);
     TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
+        new TableRebalancer(_helixManager, null, null, preChecker, 
_tableSizeReader);
 
     // Try dry-run summary mode
     RebalanceConfig rebalanceConfig = new RebalanceConfig();
@@ -2114,7 +2112,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
 
     ConsumingSegmentInfoReader mockConsumingSegmentInfoReader = 
Mockito.mock(ConsumingSegmentInfoReader.class);
     TableRebalancer tableRebalancerOriginal =
-        new TableRebalancer(_helixManager, null, null, null, 
_helixResourceManager.getTableSizeReader());
+        new TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
             .setNumReplicas(numReplica)
@@ -2233,9 +2231,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
     }
 
-    ConsumingSegmentInfoReader mockConsumingSegmentInfoReader = 
Mockito.mock(ConsumingSegmentInfoReader.class);
     TableRebalancer tableRebalancerOriginal =
-        new TableRebalancer(_helixManager, null, null, null, 
_helixResourceManager.getTableSizeReader());
+        new TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
             .setNumReplicas(numReplica)
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index b0d86702e9..f8db37a5aa 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -24,9 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import org.testng.annotations.Test;
 
@@ -35,6 +36,8 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.Segmen
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -45,14 +48,15 @@ import static org.testng.Assert.assertTrue;
 public class TestZkBasedTableRebalanceObserver {
   @Test
   void testZkObserverProgressStats() {
-    PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     // Mocking this. We will verify using numZkUpdate stat
-    when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), 
any())).thenReturn(true);
+    when(propertyStore.set(anyString(), any(), anyInt())).thenReturn(true);
+
     ControllerMetrics controllerMetrics = ControllerMetrics.get();
     TableRebalanceContext retryCtx = new TableRebalanceContext();
     retryCtx.setConfig(new RebalanceConfig());
     ZkBasedTableRebalanceObserver observer =
-        new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, 
pinotHelixResourceManager);
+        new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, 
propertyStore);
     Map<String, Map<String, String>> source = new TreeMap<>();
     Map<String, Map<String, String>> target = new TreeMap<>();
     Map<String, Map<String, String>> targetIntermediate = new TreeMap<>();
@@ -206,14 +210,15 @@ public class TestZkBasedTableRebalanceObserver {
   // This is a test to verify if Zk stats are pushed out correctly
   @Test
   void testZkObserverTracking() {
-    PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     // Mocking this. We will verify using numZkUpdate stat
-    when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), 
any())).thenReturn(true);
+    when(propertyStore.set(anyString(), any(), anyInt())).thenReturn(true);
+
     ControllerMetrics controllerMetrics = ControllerMetrics.get();
     TableRebalanceContext retryCtx = new TableRebalanceContext();
     retryCtx.setConfig(new RebalanceConfig());
     ZkBasedTableRebalanceObserver observer =
-        new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, 
pinotHelixResourceManager);
+        new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, 
propertyStore);
     Map<String, Map<String, String>> source = new TreeMap<>();
     Map<String, Map<String, String>> target = new TreeMap<>();
     target.put("segment1",
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index 76189e9268..abac12b974 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -74,7 +74,8 @@ public class TenantRebalancerTest extends ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, true);
     }
 
-    TenantRebalancer tenantRebalancer = new 
DefaultTenantRebalancer(_helixResourceManager, _executorService);
+    TenantRebalancer tenantRebalancer =
+        new DefaultTenantRebalancer(_tableRebalanceManager, 
_helixResourceManager, _executorService);
 
     // tag all servers and brokers to test tenant
     addTenantTagToInstances(TENANT_NAME);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
index 3c08b79d92..e6bbcdd026 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
@@ -119,8 +120,9 @@ public class SegmentRelocatorTest {
     ControllerConf conf = mock(ControllerConf.class);
     
when(conf.isSegmentRelocatorRebalanceTablesSequentially()).thenReturn(true);
     SegmentRelocator relocator =
-        new SegmentRelocator(mock(PinotHelixResourceManager.class), 
mock(LeadControllerManager.class), conf,
-            mock(ControllerMetrics.class), mock(ExecutorService.class), 
mock(HttpClientConnectionManager.class));
+        new SegmentRelocator(mock(TableRebalanceManager.class), 
mock(PinotHelixResourceManager.class),
+            mock(LeadControllerManager.class), conf, 
mock(ControllerMetrics.class), mock(ExecutorService.class),
+            mock(HttpClientConnectionManager.class));
     int cnt = 10;
     Random random = new Random();
     for (int i = 0; i < cnt; i++) {
@@ -150,8 +152,9 @@ public class SegmentRelocatorTest {
     ControllerConf conf = mock(ControllerConf.class);
     
when(conf.isSegmentRelocatorRebalanceTablesSequentially()).thenReturn(true);
     SegmentRelocator relocator =
-        new SegmentRelocator(mock(PinotHelixResourceManager.class), 
mock(LeadControllerManager.class), conf,
-            mock(ControllerMetrics.class), mock(ExecutorService.class), 
mock(HttpClientConnectionManager.class));
+        new SegmentRelocator(mock(TableRebalanceManager.class), 
mock(PinotHelixResourceManager.class),
+            mock(LeadControllerManager.class), conf, 
mock(ControllerMetrics.class), mock(ExecutorService.class),
+            mock(HttpClientConnectionManager.class));
     ExecutorService runner = Executors.newCachedThreadPool();
     Random random = new Random();
     int cnt = 10;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 2f51fd6250..11f16e1e53 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -26,7 +26,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.config.TagNameUtils;
@@ -37,9 +40,13 @@ import org.apache.pinot.common.utils.regex.Pattern;
 import 
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
 import 
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import 
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerResult;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
+import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
 import org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -54,9 +61,11 @@ import 
org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.Enablement;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -1258,6 +1267,55 @@ public class TableRebalanceIntegrationTest extends 
BaseHybridClusterIntegrationT
     assertEquals(numServersUnchanged, 
summaryResult.getServerInfo().getServersUnchanged().size());
   }
 
+  @Test
+  public void testDisallowMultipleConcurrentRebalancesOnSameTable() throws 
Exception {
+    // Manually write an IN_PROGRESS rebalance job to ZK instead of trying to collide multiple actual rebalance
+    // attempts which will be prone to race conditions and cause this test to be flaky. We only reject a rebalance job
+    // if there is an IN_PROGRESS rebalance job for the same table in ZK, so we could actually end up with more than
+    // one active rebalance job if both are started at the exact same time since the progress stats are written to ZK
+    // after some initial pre-checks are done. However, rebalances are idempotent, and we don't actually care too much
+    // about avoiding this edge case race condition as long as in most cases we are able to prevent users from
+    // triggering a rebalance for a table that already has an in-progress rebalance job.
+    String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
+    String tableNameWithType = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(getTableName());
+    TableRebalanceProgressStats progressStats = new 
TableRebalanceProgressStats();
+    progressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE);
+    
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+        JsonUtils.objectToString(progressStats));
+    ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, 
jobMetadata, ControllerJobType.TABLE_REBALANCE,
+        prevJobMetadata -> true);
+
+    // Add a new server (to force change in instance assignment) and enable reassignInstances to ensure that the
+    // rebalance is not a NO_OP
+    BaseServerStarter serverStarter = startOneServer(NUM_SERVERS);
+    createServerTenant(getServerTenant(), 1, 0);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setReassignInstances(true);
+
+    Pair<Integer, String> response =
+        postRequestWithStatusCode(getRebalanceUrl(rebalanceConfig, 
TableType.OFFLINE), null);
+    assertEquals(response.getLeft(), Response.Status.CONFLICT.getStatusCode());
+    assertTrue(response.getRight().contains("Rebalance job is already in 
progress for table"));
+
+    // Update the job status to DONE to allow other tests to run
+    progressStats.setStatus(RebalanceResult.Status.DONE);
+    
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+        JsonUtils.objectToString(progressStats));
+    ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, 
jobMetadata, ControllerJobType.TABLE_REBALANCE,
+        prevJobMetadata -> true);
+
+    // Stop the added server
+    serverStarter.stop();
+    TestUtils.waitForCondition(
+        aVoid -> 
getHelixResourceManager().dropInstance(serverStarter.getInstanceId()).isSuccessful(),
+        60_000L, "Failed to drop added server");
+  }
+
   private String getReloadJobIdFromResponse(String response) {
     Pattern pattern = new 
JavaUtilPattern("([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
     Matcher matcher = pattern.matcher(response);
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index 39ccd02602..f935a22ba7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -22,7 +22,10 @@ import com.google.common.base.Preconditions;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import 
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.utils.Enablement;
 
@@ -55,7 +58,22 @@ public class PinotTableRebalancer extends PinotZKChanger {
   public RebalanceResult rebalance(String tableNameWithType) {
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: " + tableNameWithType);
-    return new TableRebalancer(_helixManager).rebalance(tableConfig, 
_rebalanceConfig,
-        TableRebalancer.createUniqueRebalanceJobIdentifier());
+
+    String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
+
+    if (!_rebalanceConfig.isDryRun()) {
+      String rebalanceJobInProgress = 
TableRebalanceManager.rebalanceJobInProgress(tableNameWithType, _propertyStore);
+      if (rebalanceJobInProgress != null) {
+        String errorMsg = "Rebalance job is already in progress for table: " + 
tableNameWithType + ", jobId: "
+            + rebalanceJobInProgress + ". Please wait for the job to complete 
or cancel it before starting a new one.";
+        return new RebalanceResult(jobId, RebalanceResult.Status.FAILED, 
errorMsg, null, null, null, null, null);
+      }
+    }
+
+    ZkBasedTableRebalanceObserver rebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, jobId,
+        TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig), 
_propertyStore);
+
+    return new TableRebalancer(_helixManager, rebalanceObserver, null, null, 
null)
+        .rebalance(tableConfig, _rebalanceConfig, jobId);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to