This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ed5e306c0fd Add TenantRebalanceChecker for improved tenant rebalance 
monitoring (#16455)
ed5e306c0fd is described below

commit ed5e306c0fd006c0d52e993d4f7c5cb2b11dcab9
Author: Jhow <[email protected]>
AuthorDate: Sat Aug 30 00:41:51 2025 -0700

    Add TenantRebalanceChecker for improved tenant rebalance monitoring (#16455)
---
 .../pinot/controller/BaseControllerStarter.java    |  19 +-
 .../apache/pinot/controller/ControllerConf.java    |  18 +
 .../helix/core/PinotHelixResourceManager.java      |   1 +
 .../rebalance/tenant/DefaultTenantRebalancer.java  | 288 -----------
 .../rebalance/tenant/TenantRebalanceChecker.java   | 330 ++++++++++++
 .../rebalance/tenant/TenantRebalanceContext.java   | 130 +++++
 .../tenant/TenantRebalanceProgressStats.java       |  14 +-
 .../core/rebalance/tenant/TenantRebalancer.java    | 332 +++++++++++-
 .../tenant/ZkBasedTenantRebalanceObserver.java     |  61 ++-
 ...ControllerPeriodicTaskStarterStatelessTest.java |   2 +-
 .../tenant/TenantRebalanceCheckerTest.java         | 556 +++++++++++++++++++++
 .../rebalance/tenant/TenantRebalancerTest.java     |  20 +-
 12 files changed, 1451 insertions(+), 320 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 8ea4efd2113..8a9a8c7817a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -107,7 +107,7 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
 import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker;
 import 
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
-import 
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
+import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceChecker;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
 import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
 import org.apache.pinot.controller.helix.core.retention.RetentionManager;
@@ -584,7 +584,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         new TableRebalanceManager(_helixResourceManager, _controllerMetrics, 
_rebalancePreChecker, _tableSizeReader,
             _rebalancerExecutorService);
     _tenantRebalancer =
-        new DefaultTenantRebalancer(_tableRebalanceManager, 
_helixResourceManager, _rebalancerExecutorService);
+        new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, 
_rebalancerExecutorService);
 
     // Setting up periodic tasks
     List<PeriodicTask> controllerPeriodicTasks = 
setupControllerPeriodicTasks();
@@ -930,6 +930,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     PeriodicTask resourceUtilizationChecker = new 
ResourceUtilizationChecker(_config, _connectionManager,
         _controllerMetrics, _utilizationCheckers, _executorService, 
_helixResourceManager);
     periodicTasks.add(resourceUtilizationChecker);
+    PeriodicTask tenantRebalanceChecker =
+        new TenantRebalanceChecker(_config, _helixResourceManager, 
_tenantRebalancer);
+    periodicTasks.add(tenantRebalanceChecker);
 
     return periodicTasks;
   }
@@ -943,12 +946,16 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     LOGGER.info("Creating TaskManager with class: {}", taskManagerClass);
     try {
       return PluginManager.get().createInstance(taskManagerClass,
-          new Class[]{PinotHelixTaskResourceManager.class, 
PinotHelixResourceManager.class, LeadControllerManager.class,
+          new Class[]{
+              PinotHelixTaskResourceManager.class, 
PinotHelixResourceManager.class, LeadControllerManager.class,
               ControllerConf.class, ControllerMetrics.class, 
TaskManagerStatusCache.class,
-              Executor.class, PoolingHttpClientConnectionManager.class, 
ResourceUtilizationManager.class},
-          new Object[]{_helixTaskResourceManager, _helixResourceManager, 
_leadControllerManager,
+              Executor.class, PoolingHttpClientConnectionManager.class, 
ResourceUtilizationManager.class
+          },
+          new Object[]{
+              _helixTaskResourceManager, _helixResourceManager, 
_leadControllerManager,
               _config, _controllerMetrics, _taskManagerStatusCache, 
_executorService,
-              _connectionManager, _resourceUtilizationManager});
+              _connectionManager, _resourceUtilizationManager
+          });
     } catch (Exception e) {
       LOGGER.error("Failed to create task manager with class: {}", 
taskManagerClass, e);
       throw new RuntimeException("Failed to create task manager with class: " 
+ taskManagerClass, e);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index c90f2617363..1a849cab307 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -242,6 +242,10 @@ public class ControllerConf extends PinotConfiguration {
         "controller.segmentRelocator.initialDelayInSeconds";
     public static final String REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
         "controller.rebalanceChecker.initialDelayInSeconds";
+    public static final String TENANT_REBALANCE_CHECKER_FREQUENCY_PERIOD =
+        "controller.tenant.rebalance.checker.frequencyPeriod";
+    public static final String 
TENANT_REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
+        "controller.tenant.rebalance.checker.initialDelayInSeconds";
 
     // The flag to indicate if controller periodic job will fix the missing 
LLC segment deep store copy.
     // Default value is false.
@@ -278,6 +282,7 @@ public class ControllerConf extends PinotConfiguration {
     public static final int 
DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     public static final int DEFAULT_STATUS_CHECKER_FREQUENCY_IN_SECONDS = 5 * 
60; // 5 minutes
     public static final int DEFAULT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS = 5 
* 60; // 5 minutes
+    public static final int 
DEFAULT_TENANT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
     public static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS 
= 5 * 60; // 5 minutes
     public static final int 
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
     public static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // 
Disabled
@@ -732,6 +737,19 @@ public class ControllerConf extends PinotConfiguration {
         ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
   }
 
+  public int getTenantRebalanceCheckerFrequencyInSeconds() {
+    return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_FREQUENCY_PERIOD))
+        .filter(period -> isValidPeriodWithLogging(
+            
ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_FREQUENCY_PERIOD, period))
+        .map(period -> (int) convertPeriodToSeconds(period))
+        
.orElse(ControllerPeriodicTasksConf.DEFAULT_TENANT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS);
+  }
+
+  public long getTenantRebalanceCheckerInitialDelayInSeconds() {
+    return 
getProperty(ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS,
+        ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+  }
+
   public int getRealtimeConsumerMonitorRunFrequency() {
     return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_FREQUENCY_PERIOD))
         .map(period -> (int) convertPeriodToSeconds(period))
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0c415a3a48c..97e2cb0a908 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2482,6 +2482,7 @@ public class PinotHelixResourceManager {
    * @param jobId job's UUID
    * @param jobMetadata the job metadata
    * @param jobType the type of the job to figure out where job metadata is 
kept in ZK
+   * @param prevJobMetadataChecker an additional check to see if there's a 
need to update
    * @return boolean representing success / failure of the ZK write step
    */
   public boolean addControllerJobToZK(String jobId, Map<String, String> 
jobMetadata, ControllerJobType jobType,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
deleted file mode 100644
index 749afc72e2f..00000000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.rebalance.tenant;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.common.exception.TableNotFoundException;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
-import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
-import org.apache.pinot.segment.local.utils.TableConfigUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class DefaultTenantRebalancer implements TenantRebalancer {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultTenantRebalancer.class);
-  private final TableRebalanceManager _tableRebalanceManager;
-  private final PinotHelixResourceManager _pinotHelixResourceManager;
-  private final ExecutorService _executorService;
-
-  public DefaultTenantRebalancer(TableRebalanceManager tableRebalanceManager,
-      PinotHelixResourceManager pinotHelixResourceManager, ExecutorService 
executorService) {
-    _tableRebalanceManager = tableRebalanceManager;
-    _pinotHelixResourceManager = pinotHelixResourceManager;
-    _executorService = executorService;
-  }
-
-  @Override
-  public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
-    Map<String, RebalanceResult> dryRunResults = new HashMap<>();
-
-    // Step 1: Select the tables to include in this rebalance operation
-
-    Set<String> tables = getTenantTables(config.getTenantName());
-    Set<String> includeTables = config.getIncludeTables();
-    if (!includeTables.isEmpty()) {
-      tables.retainAll(includeTables);
-    }
-    tables.removeAll(config.getExcludeTables());
-
-    // Step 2: Dry-run over the selected tables to get the dry-run rebalance 
results. The result is to be sent as
-    // response to the user, and their summaries are needed for scheduling the 
job queue later
-
-    tables.forEach(table -> {
-      try {
-        RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
-        rebalanceConfig.setDryRun(true);
-        dryRunResults.put(table,
-            _tableRebalanceManager.rebalanceTableDryRun(table, 
rebalanceConfig, createUniqueRebalanceJobIdentifier()));
-      } catch (TableNotFoundException exception) {
-        dryRunResults.put(table, new RebalanceResult(null, 
RebalanceResult.Status.FAILED, exception.getMessage(),
-            null, null, null, null, null));
-      }
-    });
-
-    // If dry-run was set, return the dry-run results and the job is done here
-    if (config.isDryRun()) {
-      return new TenantRebalanceResult(null, dryRunResults, 
config.isVerboseResult());
-    }
-
-    // Step 3: Create two queues--parallel and sequential and schedule the 
tables to these queues based on the
-    // parallelWhitelist and parallelBlacklist, also their dry-run results. 
For each table, a job context is created
-    // and put in the queue for the consuming threads to pick up and run the 
rebalance operation
-
-    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
-    TenantRebalanceObserver observer = new 
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
-        tables, _pinotHelixResourceManager);
-    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, 
null);
-    Pair<ConcurrentLinkedQueue<TenantTableRebalanceJobContext>, 
Queue<TenantTableRebalanceJobContext>> queues =
-        createParallelAndSequentialQueues(config, dryRunResults, 
config.getParallelWhitelist(),
-            config.getParallelBlacklist());
-    ConcurrentLinkedQueue<TenantTableRebalanceJobContext> parallelQueue = 
queues.getLeft();
-    Queue<TenantTableRebalanceJobContext> sequentialQueue = queues.getRight();
-
-    // Step 4: Spin up threads to consume the parallel queue and sequential 
queue.
-
-    // ensure atleast 1 thread is created to run the sequential table 
rebalance operations
-    int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
-    AtomicInteger activeThreads = new AtomicInteger(parallelism);
-    try {
-      for (int i = 0; i < parallelism; i++) {
-        _executorService.submit(() -> {
-          doConsumeTablesFromQueue(parallelQueue, config, observer);
-          if (activeThreads.decrementAndGet() == 0) {
-            doConsumeTablesFromQueue(sequentialQueue, config, observer);
-            observer.onSuccess(String.format("Successfully rebalanced tenant 
%s.", config.getTenantName()));
-          }
-        });
-      }
-    } catch (Exception exception) {
-      observer.onError(String.format("Failed to rebalance the tenant %s. 
Cause: %s", config.getTenantName(),
-          exception.getMessage()));
-    }
-
-    // Step 5: Prepare the rebalance results to be returned to the user. The 
rebalance jobs are running in the
-    // background asynchronously.
-
-    Map<String, RebalanceResult> rebalanceResults = new HashMap<>();
-    for (String table : dryRunResults.keySet()) {
-      RebalanceResult result = dryRunResults.get(table);
-      if (result.getStatus() == RebalanceResult.Status.DONE) {
-        rebalanceResults.put(table, new RebalanceResult(result.getJobId(), 
RebalanceResult.Status.IN_PROGRESS,
-            "In progress, check controller task status for the", 
result.getInstanceAssignment(),
-            result.getTierInstanceAssignment(), result.getSegmentAssignment(), 
result.getPreChecksResult(),
-            result.getRebalanceSummaryResult()));
-      } else {
-        rebalanceResults.put(table, dryRunResults.get(table));
-      }
-    }
-    return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResults, 
config.isVerboseResult());
-  }
-
-  private void doConsumeTablesFromQueue(Queue<TenantTableRebalanceJobContext> 
queue, RebalanceConfig config,
-      TenantRebalanceObserver observer) {
-    while (true) {
-      TenantTableRebalanceJobContext jobContext = queue.poll();
-      if (jobContext == null) {
-        break;
-      }
-      String table = jobContext.getTableName();
-      RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
-      rebalanceConfig.setDryRun(false);
-      if (jobContext.shouldRebalanceWithDowntime()) {
-        rebalanceConfig.setMinAvailableReplicas(0);
-      }
-      rebalanceTable(table, rebalanceConfig, jobContext.getJobId(), observer);
-    }
-  }
-
-  private Set<String> getDimensionalTables(String tenantName) {
-    Set<String> dimTables = new HashSet<>();
-    for (String table : _pinotHelixResourceManager.getAllTables()) {
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(table);
-      if (tableConfig == null) {
-        LOGGER.error("Unable to retrieve table config for table: {}", table);
-        continue;
-      }
-      if (tenantName.equals(tableConfig.getTenantConfig().getServer()) && 
tableConfig.isDimTable()) {
-        dimTables.add(table);
-      }
-    }
-    return dimTables;
-  }
-
-  private String createUniqueRebalanceJobIdentifier() {
-    return UUID.randomUUID().toString();
-  }
-
-  Set<String> getTenantTables(String tenantName) {
-    Set<String> tables = new HashSet<>();
-    for (String table : _pinotHelixResourceManager.getAllTables()) {
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(table);
-      if (tableConfig == null) {
-        LOGGER.error("Unable to retrieve table config for table: {}", table);
-        continue;
-      }
-      if (TableConfigUtils.isRelevantToTenant(tableConfig, tenantName)) {
-        tables.add(table);
-      }
-    }
-    return tables;
-  }
-
-  private void rebalanceTable(String tableName, RebalanceConfig config, String 
rebalanceJobId,
-      TenantRebalanceObserver observer) {
-    try {
-      
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, 
tableName, rebalanceJobId);
-      RebalanceResult result = 
_tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true);
-      // TODO: For downtime=true rebalance, track if the EV-IS has converged 
to move on, otherwise it fundementally
-      //  breaks the degree of parallelism
-      if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
-        
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, 
tableName, null);
-      } else {
-        
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, 
tableName,
-            result.getDescription());
-      }
-    } catch (Throwable t) {
-      
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, 
tableName,
-          String.format("Caught exception/error while rebalancing table: %s", 
tableName));
-    }
-  }
-
-  private static Set<String> getTablesToRunInParallel(Set<String> tables,
-      @Nullable Set<String> parallelWhitelist, @Nullable Set<String> 
parallelBlacklist) {
-    Set<String> parallelTables = new HashSet<>(tables);
-    if (parallelWhitelist != null && !parallelWhitelist.isEmpty()) {
-      parallelTables.retainAll(parallelWhitelist);
-    }
-    if (parallelBlacklist != null && !parallelBlacklist.isEmpty()) {
-      parallelTables.removeAll(parallelBlacklist);
-    }
-    return parallelTables;
-  }
-
-  @VisibleForTesting
-  Pair<ConcurrentLinkedQueue<TenantTableRebalanceJobContext>, 
Queue<TenantTableRebalanceJobContext>>
-  createParallelAndSequentialQueues(
-      TenantRebalanceConfig config, Map<String, RebalanceResult> 
dryRunResults, @Nullable Set<String> parallelWhitelist,
-      @Nullable Set<String> parallelBlacklist) {
-    Set<String> parallelTables = 
getTablesToRunInParallel(dryRunResults.keySet(), parallelWhitelist, 
parallelBlacklist);
-    Map<String, RebalanceResult> parallelTableDryRunResults = new HashMap<>();
-    Map<String, RebalanceResult> sequentialTableDryRunResults = new 
HashMap<>();
-    dryRunResults.forEach((table, result) -> {
-      if (parallelTables.contains(table)) {
-        parallelTableDryRunResults.put(table, result);
-      } else {
-        sequentialTableDryRunResults.put(table, result);
-      }
-    });
-    ConcurrentLinkedQueue<TenantTableRebalanceJobContext> parallelQueue =
-        createTableQueue(config, parallelTableDryRunResults);
-    Queue<TenantTableRebalanceJobContext> sequentialQueue = 
createTableQueue(config, sequentialTableDryRunResults);
-    return Pair.of(parallelQueue, sequentialQueue);
-  }
-
-  @VisibleForTesting
-  ConcurrentLinkedQueue<TenantTableRebalanceJobContext> 
createTableQueue(TenantRebalanceConfig config,
-      Map<String, RebalanceResult> dryRunResults) {
-    Queue<TenantTableRebalanceJobContext> firstQueue = new LinkedList<>();
-    Queue<TenantTableRebalanceJobContext> queue = new LinkedList<>();
-    Queue<TenantTableRebalanceJobContext> lastQueue = new LinkedList<>();
-    Set<String> dimTables = getDimensionalTables(config.getTenantName());
-    dryRunResults.forEach((table, dryRunResult) -> {
-      TenantTableRebalanceJobContext jobContext =
-          new TenantTableRebalanceJobContext(table, dryRunResult.getJobId(), 
dryRunResult.getRebalanceSummaryResult()
-              .getSegmentInfo()
-              .getReplicationFactor()
-              .getExpectedValueAfterRebalance() == 1);
-      if (dimTables.contains(table)) {
-        // check if the dimension table is a pure scale out or scale in.
-        // pure scale out means that only new servers are added and no servers 
are removed, vice versa
-        RebalanceSummaryResult.ServerInfo serverInfo =
-            
dryRunResults.get(table).getRebalanceSummaryResult().getServerInfo();
-        if (!serverInfo.getServersAdded().isEmpty() && 
serverInfo.getServersRemoved().isEmpty()) {
-          // dimension table's pure scale OUT should be performed BEFORE other 
regular tables so that queries involving
-          // joining with dimension table won't fail on the new servers
-          firstQueue.add(jobContext);
-        } else if (serverInfo.getServersAdded().isEmpty() && 
!serverInfo.getServersRemoved().isEmpty()) {
-          // dimension table's pure scale IN should be performed AFTER other 
regular tables so that queries involving
-          // joining with dimension table won't fail on the old servers
-          lastQueue.add(jobContext);
-        } else {
-          // the dimension table is not a pure scale out or scale in, which is 
supposed to be rebalanced manually.
-          // Pre-check should capture and warn about this case.
-          firstQueue.add(jobContext);
-        }
-      } else {
-        queue.add(jobContext);
-      }
-    });
-    ConcurrentLinkedQueue<TenantTableRebalanceJobContext> tableQueue = new 
ConcurrentLinkedQueue<>();
-    tableQueue.addAll(firstQueue);
-    tableQueue.addAll(queue);
-    tableQueue.addAll(lastQueue);
-    return tableQueue;
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
new file mode 100644
index 00000000000..954a68f5425
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task to check if tenant rebalance jobs are stuck and retry them. 
Controller crashes or restarts could
+ * make a tenant rebalance job stuck.
+ * The task checks each tenant rebalance job's metadata in ZK and looks at the TenantTableRebalanceJobContexts in
+ * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see
+ * if any of these table rebalance jobs has not updated its progress stats for longer than the configured heartbeat
+ * timeout. If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by
+ * aborting all the ongoing table rebalance jobs, moving the ongoing TenantTableRebalanceJobContexts back to the head
+ * of the parallel queue, and then triggering the tenant rebalance job again with the updated context and a new job ID.
+ * <p>
+ * Notice that fundamentally this is not a retry but a resume, since we will 
not re-do the table rebalance for those
+ * tables that have already been processed.
+ */
+public class TenantRebalanceChecker extends BasePeriodicTask {
+  private final static String TASK_NAME = 
TenantRebalanceChecker.class.getSimpleName();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TenantRebalanceChecker.class);
+  private final TenantRebalancer _tenantRebalancer;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  // To avoid multiple retries of tenant rebalance job on one controller, we 
only allow one ongoing retry job at a time.
+  private ZkBasedTenantRebalanceObserver _ongoingJobObserver = null;
+
+  public TenantRebalanceChecker(ControllerConf config,
+      PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer 
tenantRebalancer) {
+    super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(),
+        config.getTenantRebalanceCheckerInitialDelayInSeconds());
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tenantRebalancer = tenantRebalancer;
+  }
+
+  @Override
+  protected void runTask(Properties periodicTaskProperties) {
+    if (_ongoingJobObserver == null || _ongoingJobObserver.isDone()) {
+      checkAndRetryTenantRebalance();
+    } else {
+      LOGGER.info("Skip checking tenant rebalance jobs as there's an ongoing 
retry job: {} for tenant: {}",
+          _ongoingJobObserver.getJobId(), _ongoingJobObserver.getTenantName());
+    }
+  }
+
+  private void checkAndRetryTenantRebalance() {
+    Map<String, Map<String, String>> allJobMetadataByJobId =
+        
_pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE),
 x -> true);
+    for (Map.Entry<String, Map<String, String>> entry : 
allJobMetadataByJobId.entrySet()) {
+      String jobId = entry.getKey();
+      Map<String, String> jobZKMetadata = entry.getValue();
+
+      try {
+        // Check if the tenant rebalance job is stuck
+        String tenantRebalanceContextStr =
+            
jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+        String tenantRebalanceProgressStatsStr =
+            
jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+        if (StringUtils.isEmpty(tenantRebalanceContextStr) || 
StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) {
+          // Skip rebalance job: {} as it has no job context or progress stats
+          LOGGER.info("Skip checking tenant rebalance job: {} as it has no job 
context or progress stats", jobId);
+          continue;
+        }
+        TenantRebalanceContext tenantRebalanceContext =
+            JsonUtils.stringToObject(tenantRebalanceContextStr, 
TenantRebalanceContext.class);
+        TenantRebalanceProgressStats progressStats =
+            JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, 
TenantRebalanceProgressStats.class);
+        long statsUpdatedAt = 
Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+
+        TenantRebalanceContext retryTenantRebalanceContext =
+            prepareRetryIfTenantRebalanceJobStuck(jobZKMetadata, 
tenantRebalanceContext, statsUpdatedAt);
+        if (retryTenantRebalanceContext != null) {
+          TenantRebalancer.TenantTableRebalanceJobContext ctx;
+          while ((ctx = 
retryTenantRebalanceContext.getOngoingJobsQueue().poll()) != null) {
+            abortTableRebalanceJob(ctx.getTableName());
+            // the existing table rebalance job is aborted, we need to run the 
rebalance job with a new job ID.
+            TenantRebalancer.TenantTableRebalanceJobContext newCtx =
+                new TenantRebalancer.TenantTableRebalanceJobContext(
+                    ctx.getTableName(), UUID.randomUUID().toString(), 
ctx.shouldRebalanceWithDowntime());
+            retryTenantRebalanceContext.getParallelQueue().addFirst(newCtx);
+          }
+          // the retry tenant rebalance job id has been created in ZK, we can 
safely mark the original job as
+          // aborted, so that this original job will not be picked up again in 
the future.
+          markTenantRebalanceJobAsAborted(jobId, jobZKMetadata, 
tenantRebalanceContext, progressStats);
+          retryTenantRebalanceJob(retryTenantRebalanceContext, progressStats);
+          // We only retry one stuck tenant rebalance job at a time to avoid 
multiple retries of the same job
+          return;
+        } else {
+          LOGGER.info("Tenant rebalance job: {} is not stuck", jobId);
+        }
+      } catch (JsonProcessingException e) {
+        // If we cannot parse the job metadata, we skip this job
+        LOGGER.warn("Failed to parse tenant rebalance context for job: {}, 
skipping", jobId);
+      }
+    }
+  }
+
+  private void retryTenantRebalanceJob(TenantRebalanceContext 
tenantRebalanceContextForRetry,
+      TenantRebalanceProgressStats progressStats) {
+    ZkBasedTenantRebalanceObserver observer =
+        new 
ZkBasedTenantRebalanceObserver(tenantRebalanceContextForRetry.getJobId(),
+            tenantRebalanceContextForRetry.getConfig().getTenantName(),
+            progressStats, tenantRebalanceContextForRetry, 
_pinotHelixResourceManager);
+    _ongoingJobObserver = observer;
+    _tenantRebalancer.rebalanceWithContext(tenantRebalanceContextForRetry, 
observer);
+  }
+
+  /**
+   * Check if the tenant rebalance job is stuck, and prepare to retry it if 
necessary.
+   * A tenant rebalance job is considered stuck if:
+   * <ol>
+   *   <li>There are no ongoing jobs, but there are jobs in the parallel or 
sequential queue, and the stats have not
+   *   been updated for longer than the heartbeat timeout.</li>
+   *   <li>There are ongoing table rebalance jobs, and at least one of them 
has not updated its status for longer
+   *   than the heartbeat timeout.</li>
+   * </ol>
+   * We cannot simply check whether the tenant rebalance job's metadata was 
updated within the heartbeat timeout to
+   * determine if this tenant rebalance job is stuck, because a tenant 
rebalance job can have no updates for a
+   * longer period if the underlying table rebalance jobs take extra long time 
to finish, while they individually do
+   * update regularly within heartbeat timeout.
+   * The retry is prepared by creating a new tenant rebalance job with an 
incremented attempt ID, and persisting it to
+   * ZK.
+   *
+   * @param jobZKMetadata The ZK metadata of the tenant rebalance job.
+   * @param tenantRebalanceContext The context of the tenant rebalance job.
+   * @param statsUpdatedAt The timestamp when the stats were last updated.
+   * @return The TenantRebalanceContext for retry if the tenant rebalance job 
is stuck and should be retried, null if
+   * the job is not stuck, or it's stuck but other controller has prepared the 
retry first.
+   */
+  private TenantRebalanceContext prepareRetryIfTenantRebalanceJobStuck(
+      Map<String, String> jobZKMetadata, TenantRebalanceContext 
tenantRebalanceContext, long statsUpdatedAt) {
+    boolean isStuck = false;
+    String stuckTableRebalanceJobId = null;
+    long heartbeatTimeoutMs = 
tenantRebalanceContext.getConfig().getHeartbeatTimeoutInMs();
+    if (tenantRebalanceContext.getOngoingJobsQueue().isEmpty()) {
+      if (!tenantRebalanceContext.getParallelQueue().isEmpty() || 
!tenantRebalanceContext.getSequentialQueue()
+          .isEmpty()) {
+        // If there are no ongoing jobs but in parallel or sequential queue, 
it could be because the tenant rebalancer
+        // is in the interval of the previous done job and consumption of the 
next job. We need to check the heartbeat
+        // timeout to be sure that it's actually stuck at this state for a 
long while.
+        isStuck = (System.currentTimeMillis() - statsUpdatedAt >= 
heartbeatTimeoutMs);
+        LOGGER.info(
+            "No ongoing table rebalance jobs for tenant: {}, but there are 
jobs in parallel queue size: {}, "
+                + "sequential queue size: {}, stats last updated at: {}, 
isStuck: {}",
+            tenantRebalanceContext.getConfig().getTenantName(), 
tenantRebalanceContext.getParallelQueue().size(),
+            tenantRebalanceContext.getSequentialQueue().size(), 
statsUpdatedAt, isStuck);
+      }
+    } else {
+      // Check if there's any stuck ongoing table rebalance jobs
+      for (TenantRebalancer.TenantTableRebalanceJobContext ctx : new 
ArrayList<>(
+          tenantRebalanceContext.getOngoingJobsQueue())) {
+        if (isOngoingTableRebalanceJobStuck(ctx.getJobId(), statsUpdatedAt, 
heartbeatTimeoutMs)) {
+          isStuck = true;
+          stuckTableRebalanceJobId = ctx.getJobId();
+          break;
+        }
+      }
+    }
+    if (isStuck) {
+      // If any of the table rebalance jobs is stuck, we consider the tenant 
rebalance job as stuck.
+      // We need to make sure only one controller instance retries the tenant 
rebalance job.
+      TenantRebalanceContext retryTenantRebalanceContext =
+          
TenantRebalanceContext.forRetry(tenantRebalanceContext.getOriginalJobId(),
+              tenantRebalanceContext.getConfig(), 
tenantRebalanceContext.getAttemptId() + 1,
+              tenantRebalanceContext.getParallelQueue(), 
tenantRebalanceContext.getSequentialQueue(),
+              tenantRebalanceContext.getOngoingJobsQueue());
+      try {
+        
jobZKMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+            JsonUtils.objectToString(retryTenantRebalanceContext));
+        // this returns false if the retryJob already exists in ZK, which 
means another controller has already
+        // prepared the retry, so we should not retry again.
+        boolean shouldRetry =
+            
_pinotHelixResourceManager.addControllerJobToZK(retryTenantRebalanceContext.getJobId(),
 jobZKMetadata,
+                ControllerJobTypes.TENANT_REBALANCE, Objects::isNull);
+        if (!shouldRetry) {
+          LOGGER.info("Another controller has already prepared the retry job: 
{} for tenant: {}. Do not retry.",
+              stuckTableRebalanceJobId, 
tenantRebalanceContext.getConfig().getTenantName());
+          return null;
+        }
+        LOGGER.info("Found and prepared to retry stuck table rebalance job: {} 
for tenant: {}.",
+            stuckTableRebalanceJobId, 
tenantRebalanceContext.getConfig().getTenantName());
+        return retryTenantRebalanceContext;
+      } catch (JsonProcessingException e) {
+        LOGGER.error(
+            "Error serialising rebalance context to JSON for updating 
retryTenantRebalanceContext to ZK {}",
+            tenantRebalanceContext.getJobId(), e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Check if the table rebalance job in tenant job's ongoing queue is stuck.
+   * A table rebalance job is considered stuck if:
+   * <ol>
+   * <li> The job metadata does not exist in ZK, and the tenant rebalance job 
stats have not been updated for longer
+   * than the heartbeat timeout.</li>
+   * <li> The job metadata exists, but it has not been updated for longer than 
the heartbeat timeout.</li>
+   * </ol>
+   * This is different to how we consider a table rebalance job is stuck in
+   * {@link 
org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker}, where we 
consider a table rebalance
+   * job stuck even if it's DONE, ABORTED, FAILED, or CANCELLED for more than 
heartbeat timeout, because they should
+   * have been removed from the ongoing queue once they are DONE or ABORTED 
etc., if the controller is working properly.
+   *
+   * @param jobId The ID of the table rebalance job.
+   * @param tenantRebalanceJobStatsUpdatedAt The timestamp when the tenant 
rebalance job stats were last updated.
+   * @param heartbeatTimeoutMs The heartbeat timeout in milliseconds.
+   * @return True if the table rebalance job is stuck, false otherwise.
+   */
+  private boolean isOngoingTableRebalanceJobStuck(String jobId, long 
tenantRebalanceJobStatsUpdatedAt,
+      long heartbeatTimeoutMs) {
+    Map<String, String> jobMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TABLE_REBALANCE);
+    if (jobMetadata == null) {
+      // if the table rebalance job metadata has not created for the table 
rebalance job id in ongoingJobsQueue
+      // longer than heartbeat timeout, the controller may have crashed or 
restarted
+      return System.currentTimeMillis() - tenantRebalanceJobStatsUpdatedAt >= 
heartbeatTimeoutMs;
+    }
+    long statsUpdatedAt = 
Long.parseLong(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+
+    if (System.currentTimeMillis() - statsUpdatedAt >= heartbeatTimeoutMs) {
+      LOGGER.info("Found stuck table rebalance job: {} that has not updated 
its status in ZK within "
+          + "heartbeat timeout: {}", jobId, heartbeatTimeoutMs);
+      return true;
+    }
+    return false;
+  }
+
+  private void abortTableRebalanceJob(String tableNameWithType) {
+    // TODO: This is a duplicate of a private method in RebalanceChecker, we 
should refactor it to a common place.
+    boolean updated =
+        _pinotHelixResourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobTypes.TABLE_REBALANCE,
+            jobMetadata -> {
+              String jobId = 
jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+              try {
+                String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+                TableRebalanceProgressStats jobStats =
+                    JsonUtils.stringToObject(jobStatsInStr, 
TableRebalanceProgressStats.class);
+                if (jobStats.getStatus() != 
RebalanceResult.Status.IN_PROGRESS) {
+                  return;
+                }
+                LOGGER.info("Abort rebalance job: {} for table: {}", jobId, 
tableNameWithType);
+                jobStats.setStatus(RebalanceResult.Status.ABORTED);
+                
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+                    JsonUtils.objectToString(jobStats));
+              } catch (Exception e) {
+                LOGGER.error("Failed to abort rebalance job: {} for table: 
{}", jobId, tableNameWithType, e);
+              }
+            });
+    LOGGER.info("Tried to abort existing jobs at best effort and done: {}", 
updated);
+  }
+
+  /**
+   * Mark the tenant rebalance job as aborted by updating the progress stats 
and clearing the queues in the context,
+   * then update the updated job metadata to ZK. The tables that are 
unprocessed will be marked as CANCELLED, and the
+   * tables that are processing will be marked as ABORTED in progress stats.
+   *
+   * @param jobId The ID of the tenant rebalance job.
+   * @param jobMetadata The metadata of the tenant rebalance job.
+   * @param tenantRebalanceContext The context of the tenant rebalance job.
+   * @param progressStats The progress stats of the tenant rebalance job.
+   */
+  private void markTenantRebalanceJobAsAborted(String jobId, Map<String, 
String> jobMetadata,
+      TenantRebalanceContext tenantRebalanceContext,
+      TenantRebalanceProgressStats progressStats) {
+    LOGGER.info("Marking tenant rebalance job: {} as aborted", jobId);
+    TenantRebalanceProgressStats abortedProgressStats = new 
TenantRebalanceProgressStats(progressStats);
+    for (Map.Entry<String, String> entry : 
abortedProgressStats.getTableStatusMap().entrySet()) {
+      if (Objects.equals(entry.getValue(), 
TenantRebalanceProgressStats.TableStatus.UNPROCESSED.name())) {
+        
entry.setValue(TenantRebalanceProgressStats.TableStatus.CANCELLED.name());
+      } else if (Objects.equals(entry.getValue(),
+          TenantRebalanceProgressStats.TableStatus.PROCESSING.name())) {
+        
entry.setValue(TenantRebalanceProgressStats.TableStatus.ABORTED.name());
+      }
+    }
+    tenantRebalanceContext.getSequentialQueue().clear();
+    tenantRebalanceContext.getParallelQueue().clear();
+    tenantRebalanceContext.getOngoingJobsQueue().clear();
+    try {
+      
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+          JsonUtils.objectToString(abortedProgressStats));
+    } catch (JsonProcessingException e) {
+      LOGGER.error("Error serialising rebalance stats to JSON for marking 
tenant rebalance job as aborted {}", jobId,
+          e);
+    }
+    try {
+      jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+          JsonUtils.objectToString(tenantRebalanceContext));
+    } catch (JsonProcessingException e) {
+      LOGGER.error("Error serialising rebalance context to JSON for marking 
tenant rebalance job as aborted {}", jobId,
+          e);
+    }
+    _pinotHelixResourceManager.addControllerJobToZK(jobId, jobMetadata, 
ControllerJobTypes.TENANT_REBALANCE);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
new file mode 100644
index 00000000000..a5ef73a74a1
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+/**
+ * Default implementation of TenantRebalanceContext that includes parallel and 
sequential queues
+ * for managing tenant rebalance operations. This context is synchronized to 
ZK by `ZkBasedTenantRebalanceObserver`
+ * to ensure consistency across controller restarts.
+ */
+public class TenantRebalanceContext {
+  protected static final int INITIAL_ATTEMPT_ID = 1;
+  @JsonProperty("jobId")
+  private final String _jobId;
+  @JsonProperty("originalJobId")
+  private final String _originalJobId;
+  @JsonProperty("config")
+  private final TenantRebalanceConfig _config;
+  @JsonProperty("attemptId")
+  private final int _attemptId;
+  // Ongoing jobs queue and parallel queue are accessed concurrently by 
multiple threads, where each worker thread
+  // consumes a tenant-table-rebalance-job from the parallel queue, adds it to 
the ongoing jobs queue, processes it.
+  // On the other hand, only a single thread consumes from the sequential 
queue.
+  private final 
ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> 
_ongoingJobsQueue;
+  private final 
ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> 
_parallelQueue;
+  private final Queue<TenantRebalancer.TenantTableRebalanceJobContext> 
_sequentialQueue;
+
+  // Default constructor for JSON deserialization
+  public TenantRebalanceContext() {
+    _jobId = null;
+    _originalJobId = null;
+    _config = null;
+    _attemptId = INITIAL_ATTEMPT_ID;
+    _parallelQueue = new ConcurrentLinkedDeque<>();
+    _sequentialQueue = new LinkedList<>();
+    _ongoingJobsQueue = new ConcurrentLinkedQueue<>();
+  }
+
+  public TenantRebalanceContext(String originalJobId, TenantRebalanceConfig 
config, int attemptId,
+      ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> 
parallelQueue,
+      Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue,
+      ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> 
ongoingJobsQueue) {
+    _jobId = createAttemptJobId(originalJobId, attemptId);
+    _originalJobId = originalJobId;
+    _config = config;
+    _attemptId = attemptId;
+    _parallelQueue = new ConcurrentLinkedDeque<>(parallelQueue);
+    _sequentialQueue = new LinkedList<>(sequentialQueue);
+    _ongoingJobsQueue = new ConcurrentLinkedQueue<>(ongoingJobsQueue);
+  }
+
+  public static TenantRebalanceContext forInitialRebalance(String 
originalJobId, TenantRebalanceConfig config,
+      ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> 
parallelQueue,
+      Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue) {
+    return new TenantRebalanceContext(originalJobId, config, 
INITIAL_ATTEMPT_ID,
+        parallelQueue, sequentialQueue, new ConcurrentLinkedQueue<>());
+  }
+
+  public static TenantRebalanceContext forRetry(String originalJobId, 
TenantRebalanceConfig config,
+      int attemptId, 
ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> 
parallelQueue,
+      Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue,
+      ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> 
ongoingJobsQueue) {
+    return new TenantRebalanceContext(originalJobId, config, attemptId,
+        parallelQueue, sequentialQueue, ongoingJobsQueue);
+  }
+
+  public 
ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> 
getParallelQueue() {
+    return _parallelQueue;
+  }
+
+  public Queue<TenantRebalancer.TenantTableRebalanceJobContext> 
getSequentialQueue() {
+    return _sequentialQueue;
+  }
+
+  public 
ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> 
getOngoingJobsQueue() {
+    return _ongoingJobsQueue;
+  }
+
+  public int getAttemptId() {
+    return _attemptId;
+  }
+
+  public String getOriginalJobId() {
+    return _originalJobId;
+  }
+
+  public String getJobId() {
+    return _jobId;
+  }
+
+  public TenantRebalanceConfig getConfig() {
+    return _config;
+  }
+
+  public String toString() {
+    return "TenantRebalanceContext{" + "jobId='" + getJobId() + '\'' + ", 
originalJobId='" + getOriginalJobId()
+        + '\'' + ", attemptId=" + getAttemptId() + ", parallelQueueSize="
+        + getParallelQueue().size() + ", sequentialQueueSize=" + 
getSequentialQueue().size() + ", ongoingJobsQueueSize="
+        + getOngoingJobsQueue().size() + '}';
+  }
+
+  private static String createAttemptJobId(String originalJobId, int 
attemptId) {
+    if (attemptId == INITIAL_ATTEMPT_ID) {
+      return originalJobId;
+    }
+    return originalJobId + "_" + attemptId;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
index fb77100ecee..7bad045c0b7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.rebalance.tenant;
 
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -42,12 +43,23 @@ public class TenantRebalanceProgressStats {
   }
 
   public TenantRebalanceProgressStats(Set<String> tables) {
+    Preconditions.checkState(tables != null && !tables.isEmpty(), "List of 
tables to observe is empty.");
     _tableStatusMap = tables.stream()
         .collect(Collectors.toMap(Function.identity(), k -> 
TableStatus.UNPROCESSED.name()));
     _totalTables = tables.size();
     _remainingTables = _totalTables;
   }
 
+  public TenantRebalanceProgressStats(TenantRebalanceProgressStats other) {
+    _tableStatusMap = new HashMap<>(other._tableStatusMap);
+    _tableRebalanceJobIdMap.putAll(other._tableRebalanceJobIdMap);
+    _totalTables = other._totalTables;
+    _remainingTables = other._remainingTables;
+    _startTimeMs = other._startTimeMs;
+    _timeToFinishInSeconds = other._timeToFinishInSeconds;
+    _completionStatusMsg = other._completionStatusMsg;
+  }
+
   public Map<String, String> getTableStatusMap() {
     return _tableStatusMap;
   }
@@ -109,6 +121,6 @@ public class TenantRebalanceProgressStats {
   }
 
   public enum TableStatus {
-    UNPROCESSED, PROCESSING, PROCESSED
+    UNPROCESSED, PROCESSING, PROCESSED, ABORTED, CANCELLED
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
index 95149868067..1f2cb74f47a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
@@ -18,10 +18,48 @@
  */
 package org.apache.pinot.controller.helix.core.rebalance.tenant;
 
-public interface TenantRebalancer {
-  TenantRebalanceResult rebalance(TenantRebalanceConfig config);
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-  class TenantTableRebalanceJobContext {
+
+public class TenantRebalancer {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TenantRebalancer.class);
+  private final TableRebalanceManager _tableRebalanceManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final ExecutorService _executorService;
+
+  public TenantRebalancer(TableRebalanceManager tableRebalanceManager,
+      PinotHelixResourceManager pinotHelixResourceManager, ExecutorService 
executorService) {
+    _tableRebalanceManager = tableRebalanceManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  public static class TenantTableRebalanceJobContext {
     private final String _tableName;
     private final String _jobId;
     // Whether the rebalance should be done with downtime or 
minAvailableReplicas=0.
@@ -35,7 +73,9 @@ public interface TenantRebalancer {
      * @param withDowntime Whether the rebalance should be done with downtime 
or minAvailableReplicas=0.
      * @return The result of the rebalance operation.
      */
-    public TenantTableRebalanceJobContext(String tableName, String jobId, 
boolean withDowntime) {
+    @JsonCreator
+    public TenantTableRebalanceJobContext(@JsonProperty("tableName") String 
tableName,
+        @JsonProperty("jobId") String jobId, @JsonProperty("withDowntime") 
boolean withDowntime) {
       _tableName = tableName;
       _jobId = jobId;
       _withDowntime = withDowntime;
@@ -49,8 +89,292 @@ public interface TenantRebalancer {
       return _tableName;
     }
 
+    @JsonProperty("withDowntime")
     public boolean shouldRebalanceWithDowntime() {
       return _withDowntime;
     }
   }
+
+  public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+    Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+
+    // Step 1: Select the tables to include in this rebalance operation
+
+    Set<String> tables = getTenantTables(config.getTenantName());
+    Set<String> includeTables = config.getIncludeTables();
+    if (!includeTables.isEmpty()) {
+      tables.retainAll(includeTables);
+    }
+    tables.removeAll(config.getExcludeTables());
+
+    // Step 2: Dry-run over the selected tables to get the dry-run rebalance 
results. The result is to be sent as
+    // response to the user, and their summaries are needed for scheduling the 
job queue later
+
+    tables.forEach(table -> {
+      try {
+        RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+        rebalanceConfig.setDryRun(true);
+        dryRunResults.put(table,
+            _tableRebalanceManager.rebalanceTableDryRun(table, 
rebalanceConfig, createUniqueRebalanceJobIdentifier()));
+      } catch (TableNotFoundException exception) {
+        dryRunResults.put(table, new RebalanceResult(null, 
RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null, null, null));
+      }
+    });
+
+    // If dry-run was set, return the dry-run results and the job is done here
+    if (config.isDryRun()) {
+      return new TenantRebalanceResult(null, dryRunResults, 
config.isVerboseResult());
+    }
+
+    // Step 3: Create two queues--parallel and sequential and schedule the 
tables to these queues based on the
+    // parallelWhitelist and parallelBlacklist, also their dry-run results. 
For each table, a job context is created
+    // and put in the queue for the consuming threads to pick up and run the 
rebalance operation
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    Pair<ConcurrentLinkedDeque<TenantTableRebalanceJobContext>, 
Queue<TenantTableRebalanceJobContext>> queues =
+        createParallelAndSequentialQueues(config, dryRunResults, 
config.getParallelWhitelist(),
+            config.getParallelBlacklist());
+    ConcurrentLinkedDeque<TenantTableRebalanceJobContext> parallelQueue = 
queues.getLeft();
+    Queue<TenantTableRebalanceJobContext> sequentialQueue = queues.getRight();
+    TenantRebalanceContext tenantRebalanceContext =
+        TenantRebalanceContext.forInitialRebalance(tenantRebalanceJobId, 
config, parallelQueue,
+            sequentialQueue);
+
+    ZkBasedTenantRebalanceObserver observer =
+        new ZkBasedTenantRebalanceObserver(tenantRebalanceContext.getJobId(), 
config.getTenantName(),
+            tables, tenantRebalanceContext, _pinotHelixResourceManager);
+    // Step 4: Spin up threads to consume the parallel queue and sequential 
queue.
+    rebalanceWithContext(tenantRebalanceContext, observer);
+
+    // Step 5: Prepare the rebalance results to be returned to the user. The 
rebalance jobs are running in the
+    // background asynchronously.
+
+    Map<String, RebalanceResult> rebalanceResults = new HashMap<>();
+    for (String table : dryRunResults.keySet()) {
+      RebalanceResult result = dryRunResults.get(table);
+      if (result.getStatus() == RebalanceResult.Status.DONE) {
+        rebalanceResults.put(table, new RebalanceResult(result.getJobId(), 
RebalanceResult.Status.IN_PROGRESS,
+            "In progress, check controller task status for the", 
result.getInstanceAssignment(),
+            result.getTierInstanceAssignment(), result.getSegmentAssignment(), 
result.getPreChecksResult(),
+            result.getRebalanceSummaryResult()));
+      } else {
+        rebalanceResults.put(table, dryRunResults.get(table));
+      }
+    }
+    return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResults, 
config.isVerboseResult());
+  }
+
+  /**
+   * Spins up threads to rebalance the tenant with the given context and 
observer.
+   * The rebalance operation is performed in parallel for the tables in the 
parallel queue, then, sequentially for the
+   * tables in the sequential queue.
+   * The observer should be initiated with the tenantRebalanceContext in order 
to track the progress properly.
+   *
+   * @param tenantRebalanceContext The context containing the configuration 
and queues for the rebalance operation.
+   * @param observer The observer to notify about the rebalance progress and 
results.
+   */
+  public void rebalanceWithContext(TenantRebalanceContext 
tenantRebalanceContext,
+      ZkBasedTenantRebalanceObserver observer) {
+    LOGGER.info("Starting tenant rebalance with context: {}", 
tenantRebalanceContext);
+    TenantRebalanceConfig config = tenantRebalanceContext.getConfig();
+    ConcurrentLinkedDeque<TenantTableRebalanceJobContext> parallelQueue = 
tenantRebalanceContext.getParallelQueue();
+    Queue<TenantTableRebalanceJobContext> sequentialQueue = 
tenantRebalanceContext.getSequentialQueue();
+    ConcurrentLinkedQueue<TenantTableRebalanceJobContext> ongoingJobs = 
tenantRebalanceContext.getOngoingJobsQueue();
+
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, 
null);
+
+    // ensure atleast 1 thread is created to run the sequential table 
rebalance operations
+    int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+    LOGGER.info("Spinning up {} threads for tenant rebalance job: {}", 
parallelism, tenantRebalanceContext.getJobId());
+    AtomicInteger activeThreads = new AtomicInteger(parallelism);
+    try {
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          doConsumeTablesFromQueueAndRebalance(parallelQueue, ongoingJobs, 
config, observer);
+          // If this is the last thread to finish, start consuming the 
sequential queue
+          if (activeThreads.decrementAndGet() == 0) {
+            LOGGER.info("All parallel threads completed, starting sequential 
rebalance for job: {}",
+                tenantRebalanceContext.getJobId());
+            doConsumeTablesFromQueueAndRebalance(sequentialQueue, ongoingJobs, 
config, observer);
+            observer.onSuccess(String.format("Successfully rebalanced tenant 
%s.", config.getTenantName()));
+            LOGGER.info("Completed tenant rebalance job: {}", 
tenantRebalanceContext.getJobId());
+          }
+        });
+      }
+    } catch (Exception exception) {
+      observer.onError(String.format("Failed to rebalance the tenant %s. 
Cause: %s", config.getTenantName(),
+          exception.getMessage()));
+      LOGGER.error("Caught exception in tenant rebalance job: {}, Cause: {}", 
tenantRebalanceContext.getJobId(),
+          exception.getMessage(), exception);
+    }
+  }
+
+  /**
+   * Consumes tables from the given queue of the {@link TenantRebalanceContext} that is being monitored by the
+   * observer and rebalances them one by one using the provided config, until the queue is empty.
+   * <p>
+   * Each polled job is added to {@code ongoingJobs} while its table rebalance runs. It is removed again right before
+   * the completion/error trigger is fired, so the context persisted by that trigger no longer lists the table as
+   * ongoing. A table whose rebalance does not end with status DONE, or whose rebalance throws, is reported to the
+   * observer as errored, and the loop continues with the next table.
+   *
+   * @param queue The queue of TenantTableRebalanceJobContext to consume tables from.
+   * @param ongoingJobs The queue to track ongoing rebalance jobs.
+   * @param config The rebalance configuration to use for the rebalancing; it is copied per table and never modified.
+   * @param observer The observer to notify about the rebalance progress and results, should be initiated with the
+   *                 TenantRebalanceContext that contains `queue` and `ongoingJobs`.
+   */
+  private void doConsumeTablesFromQueueAndRebalance(Queue<TenantTableRebalanceJobContext> queue,
+      Queue<TenantTableRebalanceJobContext> ongoingJobs, RebalanceConfig config,
+      ZkBasedTenantRebalanceObserver observer) {
+    TenantTableRebalanceJobContext jobContext;
+    while ((jobContext = queue.poll()) != null) {
+      ongoingJobs.add(jobContext);
+      String tableName = jobContext.getTableName();
+      String rebalanceJobId = jobContext.getJobId();
+      RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+      rebalanceConfig.setDryRun(false);
+      if (jobContext.shouldRebalanceWithDowntime()) {
+        // Downtime rebalance (set when the expected replication factor after rebalance is 1): no replica has to
+        // stay available while segments move
+        rebalanceConfig.setMinAvailableReplicas(0);
+      }
+      try {
+        LOGGER.info("Starting rebalance for table: {} with table rebalance job ID: {} in tenant rebalance job: {}",
+            tableName, rebalanceJobId, observer.getJobId());
+        observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, rebalanceJobId);
+        // Disallow TABLE rebalance checker to retry the rebalance job here, since we want TENANT rebalance checker
+        // to do so
+        RebalanceResult result =
+            _tableRebalanceManager.rebalanceTable(tableName, rebalanceConfig, rebalanceJobId, false);
+        // TODO: For downtime=true rebalance, track if the EV-IS has converged to move on, otherwise it fundamentally
+        //  breaks the degree of parallelism
+        if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
+          LOGGER.info("Completed rebalance for table: {} with table rebalance job ID: {} in tenant rebalance job: {}",
+              tableName, rebalanceJobId, observer.getJobId());
+          // Remove before triggering so the persisted context no longer lists this table as ongoing
+          ongoingJobs.remove(jobContext);
+          observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, tableName, null);
+        } else {
+          LOGGER.warn("Rebalance for table: {} with table rebalance job ID: {} in tenant rebalance job: {} is not "
+                  + "done. Status: {}, Description: {}", tableName, rebalanceJobId, observer.getJobId(),
+              result.getStatus(), result.getDescription());
+          ongoingJobs.remove(jobContext);
+          observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName,
+              result.getDescription());
+        }
+      } catch (Throwable t) {
+        // Record the failure (with its cause) and keep processing the remaining tables of the tenant
+        LOGGER.error("Caught exception/error while rebalancing table: {} with table rebalance job ID: {} in tenant "
+            + "rebalance job: {}", tableName, rebalanceJobId, observer.getJobId(), t);
+        ongoingJobs.remove(jobContext);
+        observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName,
+            String.format("Caught exception/error while rebalancing table: %s. Cause: %s", tableName, t.getMessage()));
+      }
+    }
+  }
+
+  /**
+   * Collects the dimension tables whose server tenant equals {@code tenantName}. Tables whose config cannot be
+   * retrieved are logged and skipped.
+   */
+  private Set<String> getDimensionalTables(String tenantName) {
+    Set<String> result = new HashSet<>();
+    for (String candidate : _pinotHelixResourceManager.getAllTables()) {
+      TableConfig candidateConfig = _pinotHelixResourceManager.getTableConfig(candidate);
+      if (candidateConfig == null) {
+        LOGGER.error("Unable to retrieve table config for table: {}", candidate);
+      } else if (tenantName.equals(candidateConfig.getTenantConfig().getServer()) && candidateConfig.isDimTable()) {
+        result.add(candidate);
+      }
+    }
+    return result;
+  }
+
+  /** Returns the string form of a freshly generated random UUID, used as a rebalance job ID. */
+  private String createUniqueRebalanceJobIdentifier() {
+    UUID jobUuid = UUID.randomUUID();
+    return jobUuid.toString();
+  }
+
+  /**
+   * Returns every table whose config {@code TableConfigUtils.isRelevantToTenant} considers relevant to
+   * {@code tenantName}. Tables whose config cannot be retrieved are logged and skipped.
+   */
+  Set<String> getTenantTables(String tenantName) {
+    Set<String> relevantTables = new HashSet<>();
+    for (String tableName : _pinotHelixResourceManager.getAllTables()) {
+      TableConfig config = _pinotHelixResourceManager.getTableConfig(tableName);
+      if (config == null) {
+        LOGGER.error("Unable to retrieve table config for table: {}", tableName);
+      } else if (TableConfigUtils.isRelevantToTenant(config, tenantName)) {
+        relevantTables.add(tableName);
+      }
+    }
+    return relevantTables;
+  }
+
+  /**
+   * Returns a new set with the tables of {@code tables} that may be rebalanced in parallel.
+   * <p>
+   * A null or empty whitelist keeps all tables; otherwise only whitelisted tables are kept. Any table in a non-empty
+   * blacklist is then removed. The input set is not modified.
+   */
+  private static Set<String> getTablesToRunInParallel(Set<String> tables,
+      @Nullable Set<String> parallelWhitelist, @Nullable Set<String> parallelBlacklist) {
+    boolean hasWhitelist = parallelWhitelist != null && !parallelWhitelist.isEmpty();
+    boolean hasBlacklist = parallelBlacklist != null && !parallelBlacklist.isEmpty();
+    Set<String> candidates = new HashSet<>(tables);
+    if (hasWhitelist) {
+      candidates.retainAll(parallelWhitelist);
+    }
+    if (hasBlacklist) {
+      candidates.removeAll(parallelBlacklist);
+    }
+    return candidates;
+  }
+
+  /**
+   * Splits the dry-run results into tables that may run in parallel (per the whitelist/blacklist) and the remaining,
+   * sequential tables. Each group is then turned into an ordered job queue with createTableQueue.
+   *
+   * @return a pair of (parallel queue, sequential queue)
+   */
+  @VisibleForTesting
+  Pair<ConcurrentLinkedDeque<TenantTableRebalanceJobContext>, Queue<TenantTableRebalanceJobContext>>
+  createParallelAndSequentialQueues(
+      TenantRebalanceConfig config, Map<String, RebalanceResult> dryRunResults, @Nullable Set<String> parallelWhitelist,
+      @Nullable Set<String> parallelBlacklist) {
+    Set<String> parallelTables =
+        getTablesToRunInParallel(dryRunResults.keySet(), parallelWhitelist, parallelBlacklist);
+    Map<String, RebalanceResult> parallelResults = new HashMap<>();
+    Map<String, RebalanceResult> sequentialResults = new HashMap<>();
+    for (Map.Entry<String, RebalanceResult> entry : dryRunResults.entrySet()) {
+      Map<String, RebalanceResult> target =
+          parallelTables.contains(entry.getKey()) ? parallelResults : sequentialResults;
+      target.put(entry.getKey(), entry.getValue());
+    }
+    ConcurrentLinkedDeque<TenantTableRebalanceJobContext> parallelQueue = createTableQueue(config, parallelResults);
+    Queue<TenantTableRebalanceJobContext> sequentialQueue = createTableQueue(config, sequentialResults);
+    return Pair.of(parallelQueue, sequentialQueue);
+  }
+
+  /**
+   * Builds the ordered rebalance queue for the given dry-run results.
+   * <p>
+   * The queue has three sections:
+   * <ul>
+   *   <li>First: dimension tables that only gain servers, plus dimension tables that are neither a pure scale out nor
+   *   a pure scale in.</li>
+   *   <li>Middle: regular (non-dimension) tables.</li>
+   *   <li>Last: dimension tables that only lose servers.</li>
+   * </ul>
+   * Each job context reuses the dry-run job ID. It is flagged for downtime rebalance when the expected replication
+   * factor after rebalance is 1.
+   */
+  @VisibleForTesting
+  ConcurrentLinkedDeque<TenantTableRebalanceJobContext> createTableQueue(TenantRebalanceConfig config,
+      Map<String, RebalanceResult> dryRunResults) {
+    Queue<TenantTableRebalanceJobContext> head = new LinkedList<>();
+    Queue<TenantTableRebalanceJobContext> middle = new LinkedList<>();
+    Queue<TenantTableRebalanceJobContext> tail = new LinkedList<>();
+    Set<String> dimTables = getDimensionalTables(config.getTenantName());
+    for (Map.Entry<String, RebalanceResult> entry : dryRunResults.entrySet()) {
+      String table = entry.getKey();
+      RebalanceResult dryRunResult = entry.getValue();
+      RebalanceSummaryResult summary = dryRunResult.getRebalanceSummaryResult();
+      boolean withDowntime = summary.getSegmentInfo().getReplicationFactor().getExpectedValueAfterRebalance() == 1;
+      TenantTableRebalanceJobContext jobContext =
+          new TenantTableRebalanceJobContext(table, dryRunResult.getJobId(), withDowntime);
+      if (!dimTables.contains(table)) {
+        middle.add(jobContext);
+        continue;
+      }
+      RebalanceSummaryResult.ServerInfo serverInfo = summary.getServerInfo();
+      boolean gainsServers = !serverInfo.getServersAdded().isEmpty();
+      boolean losesServers = !serverInfo.getServersRemoved().isEmpty();
+      if (losesServers && !gainsServers) {
+        // Pure scale IN of a dimension table runs AFTER the regular tables so that queries joining with it won't fail
+        // on the old servers
+        tail.add(jobContext);
+      } else {
+        // Pure scale OUT of a dimension table runs BEFORE the regular tables so that queries joining with it won't
+        // fail on the new servers. Dimension tables that are neither a pure scale out nor a pure scale in are
+        // supposed to be rebalanced manually (pre-check should warn about them); they are also scheduled first.
+        head.add(jobContext);
+      }
+    }
+    ConcurrentLinkedDeque<TenantTableRebalanceJobContext> ordered = new ConcurrentLinkedDeque<>(head);
+    ordered.addAll(middle);
+    ordered.addAll(tail);
+    return ordered;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 0d89a94d54d..9606db14b4b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -19,12 +19,12 @@
 package org.apache.pinot.controller.helix.core.rebalance.tenant;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
@@ -42,20 +42,36 @@ public class ZkBasedTenantRebalanceObserver implements 
TenantRebalanceObserver {
   private final String _tenantName;
   private final List<String> _unprocessedTables;
   private final TenantRebalanceProgressStats _progressStats;
+  // Serialized to ZK together with the progress stats on every sync
+  private final TenantRebalanceContext _tenantRebalanceContext;
   // Keep track of number of updates. Useful during debugging.
   private int _numUpdatesToZk;
+  // Becomes true after onSuccess/onError has synced to ZK. It is written by the thread running the tenant rebalance
+  // and read through isDone() by other threads (e.g. a periodic checker deciding whether a retry is still running),
+  // so it is volatile to make the update visible without extra synchronization.
+  private volatile boolean _isDone;
 
-  public ZkBasedTenantRebalanceObserver(String jobId, String tenantName, 
Set<String> tables,
+  /**
+   * Creates an observer on top of the given (possibly previously persisted) progress stats. Only the tables whose
+   * status in {@code progressStats} is UNPROCESSED are tracked as unprocessed by this observer.
+   *
+   * @param jobId ID of the tenant rebalance job; used as the key when writing the job metadata to ZK
+   * @param tenantName name of the tenant being rebalanced
+   * @param progressStats progress stats that this observer updates in place (the instance is not copied)
+   * @param tenantRebalanceContext context serialized to ZK together with the progress stats
+   * @param pinotHelixResourceManager used to write the controller job metadata to ZK
+   */
+  public ZkBasedTenantRebalanceObserver(String jobId, String tenantName, 
TenantRebalanceProgressStats progressStats,
+      TenantRebalanceContext tenantRebalanceContext,
       PinotHelixResourceManager pinotHelixResourceManager) {
-    Preconditions.checkState(tables != null && !tables.isEmpty(), "List of 
tables to observe is empty.");
+    _isDone = false;
     _jobId = jobId;
     _tenantName = tenantName;
-    _unprocessedTables = new ArrayList<>(tables);
+    // Only tables still UNPROCESSED in the given stats count as remaining work for this observer.
+    // NOTE(review): Collectors.toList() makes no guarantee about mutability; if _unprocessedTables is modified later,
+    // confirm this or switch to Collectors.toCollection(ArrayList::new).
+    _unprocessedTables = progressStats.getTableStatusMap()
+        .entrySet()
+        .stream()
+        .filter(entry -> 
entry.getValue().equals(TenantRebalanceProgressStats.TableStatus.UNPROCESSED.name()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+    _tenantRebalanceContext = tenantRebalanceContext;
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    _progressStats = new TenantRebalanceProgressStats(tables);
+    _progressStats = progressStats;
     _numUpdatesToZk = 0;
   }
 
+  /**
+   * Creates an observer whose progress stats are freshly built from {@code tables} via
+   * {@code new TenantRebalanceProgressStats(tables)}. All other arguments are passed through unchanged.
+   */
+  public ZkBasedTenantRebalanceObserver(String jobId, String tenantName, Set<String> tables,
+      TenantRebalanceContext tenantRebalanceContext, PinotHelixResourceManager pinotHelixResourceManager) {
+    this(jobId, tenantName, new TenantRebalanceProgressStats(tables), tenantRebalanceContext,
+        pinotHelixResourceManager);
+  }
+
   @Override
   public void onTrigger(Trigger trigger, String tableName, String description) 
{
     switch (trigger) {
@@ -78,24 +94,26 @@ public class ZkBasedTenantRebalanceObserver implements 
TenantRebalanceObserver {
         break;
       default:
     }
-    trackStatsInZk();
+    syncStatsAndContextInZk();
   }
 
+  /**
+   * Records the success message and the elapsed time in seconds since the recorded start time. It then persists the
+   * stats and context to ZK, and only afterwards marks this observer as done.
+   */
   @Override
   public void onSuccess(String msg) {
     _progressStats.setCompletionStatusMsg(msg);
     _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - 
_progressStats.getStartTimeMs()) / 1000);
-    trackStatsInZk();
+    // The final state is written to ZK before isDone() can report true
+    syncStatsAndContextInZk();
+    _isDone = true;
   }
 
   @Override
   public void onError(String errorMsg) {
     _progressStats.setCompletionStatusMsg(errorMsg);
     _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() - 
_progressStats.getStartTimeMs());
-    trackStatsInZk();
+    syncStatsAndContextInZk();
+    _isDone = true;
   }
 
-  private void trackStatsInZk() {
+  private void syncStatsAndContextInZk() {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
@@ -107,8 +125,31 @@ public class ZkBasedTenantRebalanceObserver implements 
TenantRebalanceObserver {
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance stats to JSON for persisting 
to ZK {}", _jobId, e);
     }
+    try {
+      jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+          JsonUtils.objectToString(_tenantRebalanceContext));
+    } catch (JsonProcessingException e) {
+      LOGGER.error("Error serialising rebalance context to JSON for persisting 
to ZK {}", _jobId, e);
+    }
     _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, 
ControllerJobTypes.TENANT_REBALANCE);
     _numUpdatesToZk++;
     LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", 
_numUpdatesToZk, _jobId);
   }
+
+  /**
+   * Returns whether the observed rebalance has finished, i.e. onSuccess or onError has completed (or setDone(true)
+   * was called).
+   */
+  public boolean isDone() {
+    boolean done = _isDone;
+    return done;
+  }
+
+  /** Returns the ID of the tenant rebalance job this observer reports for. */
+  public String getJobId() {
+    String jobId = _jobId;
+    return jobId;
+  }
+
+  /** Returns the name of the tenant being rebalanced. */
+  public String getTenantName() {
+    String tenantName = _tenantName;
+    return tenantName;
+  }
+
+  /** Overrides the done flag; intended for tests only. */
+  @VisibleForTesting
+  void setDone(boolean isDone) {
+    _isDone = isDone;
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
index 413d23c383e..c48e16a05f2 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
@@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest 
extends ControllerTest {
   }
 
   private class MockControllerStarter extends ControllerStarter {
-    private static final int NUM_PERIODIC_TASKS = 13;
+    private static final int NUM_PERIODIC_TASKS = 14;
 
     public MockControllerStarter() {
       super();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceCheckerTest.java
new file mode 100644
index 00000000000..15c69e63673
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceCheckerTest.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
+import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class TenantRebalanceCheckerTest extends ControllerTest {
+  // Tenant and tenant-level rebalance job identifiers used in the mocked ZK job metadata
+  private static final String TENANT_NAME = "TestTenant";
+  private static final String JOB_ID = "test-tenant-rebalance-job-123";
+  private static final String JOB_ID_2 = "test-tenant-rebalance-job-456";
+  private static final String ORIGINAL_JOB_ID = 
"original-tenant-rebalance-job-123";
+  // Tables and table-level rebalance job IDs referenced from the tenant rebalance contexts
+  private static final String TABLE_NAME_1 = "testTable1_OFFLINE";
+  private static final String TABLE_NAME_2 = "testTable2_OFFLINE";
+  private static final String NON_STUCK_TABLE_JOB_ID = 
"non-stuck-table-job-456";
+  private static final String STUCK_TABLE_JOB_ID = "stuck-table-job-456";
+  private static final String STUCK_TABLE_JOB_ID_2 = "stuck-table-job-789";
+
+  // Collaborators of the checker under test; populated by MockitoAnnotations.openMocks in setUp()
+  @Mock
+  private PinotHelixResourceManager _mockPinotHelixResourceManager;
+  @Mock
+  private TenantRebalancer _mockTenantRebalancer;
+  @Mock
+  private ControllerConf _mockControllerConf;
+
+  // Recreated for every test method in setUp()
+  private TenantRebalanceChecker _tenantRebalanceChecker;
+  // Created in setUp() and shut down in tearDown()
+  private ExecutorService _executorService;
+
+  /**
+   * Initializes the mocks and stubs the defaults shared by all tests, then builds a fresh checker. The defaults are
+   * checker frequency and initial delay of 300 seconds, and successful TENANT_REBALANCE ZK writes.
+   */
+  @BeforeMethod
+  public void setUp() {
+    MockitoAnnotations.openMocks(this);
+    _executorService = Executors.newFixedThreadPool(2);
+
+    // Checker scheduling configuration
+    doReturn(300).when(_mockControllerConf).getTenantRebalanceCheckerFrequencyInSeconds();
+    doReturn(300L).when(_mockControllerConf).getTenantRebalanceCheckerInitialDelayInSeconds();
+
+    // Both addControllerJobToZK overloads report success for TENANT_REBALANCE jobs by default
+    doReturn(true).when(_mockPinotHelixResourceManager)
+        .addControllerJobToZK(anyString(), anyMap(), eq(ControllerJobTypes.TENANT_REBALANCE));
+    doReturn(true).when(_mockPinotHelixResourceManager)
+        .addControllerJobToZK(anyString(), anyMap(), eq(ControllerJobTypes.TENANT_REBALANCE), any());
+
+    _tenantRebalanceChecker =
+        new TenantRebalanceChecker(_mockControllerConf, _mockPinotHelixResourceManager, _mockTenantRebalancer);
+  }
+
+  /** Shuts down the per-test executor, if one was created. */
+  @AfterMethod
+  public void tearDown() {
+    ExecutorService executor = _executorService;
+    if (executor == null) {
+      return;
+    }
+    executor.shutdown();
+  }
+
+  @Test
+  public void testResumeStuckTenantRebalanceJob()
+      throws Exception {
+    // Create a stuck tenant rebalance context: its only table job (STUCK_TABLE_JOB_ID) sits in the ongoing queue
+    TenantRebalanceContext stuckContext = createStuckTenantRebalanceContext();
+    TenantRebalanceProgressStats progressStats = createProgressStats();
+
+    // Mock ZK metadata for the stuck tenant job
+    Map<String, String> jobZKMetadata = createTenantJobZKMetadata(stuckContext, progressStats);
+    Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+    allJobMetadata.put(JOB_ID, jobZKMetadata);
+
+    // Mock stuck table rebalance job metadata
+    Map<String, String> stuckTableJobMetadata = createStuckTableJobMetadata();
+
+    // Setup mocks
+    doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+        .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+    doReturn(stuckTableJobMetadata).when(_mockPinotHelixResourceManager)
+        .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID), eq(ControllerJobTypes.TABLE_REBALANCE));
+
+    // Execute the checker
+    _tenantRebalanceChecker.runTask(new Properties());
+
+    // Verify that the tenant rebalancer was called exactly once to resume the job, capturing its arguments
+    ArgumentCaptor<TenantRebalanceContext> contextCaptor = ArgumentCaptor.forClass(TenantRebalanceContext.class);
+    ArgumentCaptor<ZkBasedTenantRebalanceObserver> observerCaptor =
+        ArgumentCaptor.forClass(ZkBasedTenantRebalanceObserver.class);
+    verify(_mockTenantRebalancer, times(1)).rebalanceWithContext(contextCaptor.capture(), observerCaptor.capture());
+
+    // Verify the resumed context: the attempt ID is bumped by one and the original job ID / config are kept
+    TenantRebalanceContext resumedContext = contextCaptor.getValue();
+    assertNotNull(resumedContext);
+    assertEquals(resumedContext.getAttemptId(), TenantRebalanceContext.INITIAL_ATTEMPT_ID + 1);
+    assertEquals(resumedContext.getOriginalJobId(), ORIGINAL_JOB_ID);
+    assertEquals(resumedContext.getConfig().getTenantName(), TENANT_NAME);
+
+    // Verify that exactly the stuck table job was moved back to the parallel queue and nothing is left ongoing
+    assertEquals(resumedContext.getParallelQueue().size(), 1);
+    TenantRebalancer.TenantTableRebalanceJobContext firstJobContextInParallelQueue =
+        resumedContext.getParallelQueue().poll();
+    assertNotNull(firstJobContextInParallelQueue);
+    // because the stuck job is aborted, a new job ID is generated
+    assertNotEquals(firstJobContextInParallelQueue.getJobId(), STUCK_TABLE_JOB_ID);
+    assertEquals(firstJobContextInParallelQueue.getTableName(), TABLE_NAME_1);
+    assertFalse(firstJobContextInParallelQueue.shouldRebalanceWithDowntime());
+    assertTrue(resumedContext.getOngoingJobsQueue().isEmpty());
+
+    // Verify an observer was handed to the rebalancer
+    ZkBasedTenantRebalanceObserver observer = observerCaptor.getValue();
+    assertNotNull(observer);
+  }
+
+  @Test
+  public void testResumeStuckTenantRebalanceJobWithMultipleStuckTables()
+      throws Exception {
+    // One tenant job whose ongoing queue holds two stuck table jobs
+    TenantRebalanceContext context = createStuckTenantRebalanceContextWithMultipleTables();
+    TenantRebalanceProgressStats stats = createProgressStatsWithMultipleTables();
+    Map<String, Map<String, String>> tenantJobs = new HashMap<>();
+    tenantJobs.put(JOB_ID, createTenantJobZKMetadata(context, stats));
+
+    // Both table jobs report stuck metadata
+    Map<String, String> firstTableJob = createStuckTableJobMetadata();
+    Map<String, String> secondTableJob = createStuckTableJobMetadata();
+
+    doReturn(tenantJobs).when(_mockPinotHelixResourceManager)
+        .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+    doReturn(firstTableJob).when(_mockPinotHelixResourceManager)
+        .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID), eq(ControllerJobTypes.TABLE_REBALANCE));
+    doReturn(secondTableJob).when(_mockPinotHelixResourceManager)
+        .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID_2), eq(ControllerJobTypes.TABLE_REBALANCE));
+
+    _tenantRebalanceChecker.runTask(new Properties());
+
+    ArgumentCaptor<TenantRebalanceContext> captor = ArgumentCaptor.forClass(TenantRebalanceContext.class);
+    verify(_mockTenantRebalancer, times(1))
+        .rebalanceWithContext(captor.capture(), any(ZkBasedTenantRebalanceObserver.class));
+
+    // The attempt is bumped, and both stuck jobs are back in the parallel queue with nothing left ongoing
+    TenantRebalanceContext resumed = captor.getValue();
+    assertEquals(resumed.getAttemptId(), TenantRebalanceContext.INITIAL_ATTEMPT_ID + 1);
+    assertEquals(resumed.getParallelQueue().size(), 2);
+    assertTrue(resumed.getOngoingJobsQueue().isEmpty());
+  }
+
+  @Test
+  public void testDoNotResumeNonStuckTenantRebalanceJob()
+      throws Exception {
+    // A recently updated tenant job with an empty ongoing queue must be left alone
+    TenantRebalanceContext context = createNonStuckTenantRebalanceContextWithoutOngoing();
+    Map<String, Map<String, String>> tenantJobs = new HashMap<>();
+    tenantJobs.put(JOB_ID, createTenantJobZKMetadataWithRecentTimestamp(context, createProgressStats()));
+
+    doReturn(tenantJobs).when(_mockPinotHelixResourceManager)
+        .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+
+    _tenantRebalanceChecker.runTask(new Properties());
+
+    // No retry may be started
+    verify(_mockTenantRebalancer, never()).rebalanceWithContext(any(), any());
+  }
+
+  @Test
+  public void testDoNotResumeNonStuckTenantRebalanceJobWithOngoing()
+      throws Exception {
+    // Create a non-stuck tenant rebalance context that has an ongoing table job (NON_STUCK_TABLE_JOB_ID)
+    TenantRebalanceContext nonStuckContextWithOngoing = createNonStuckTenantRebalanceContextWithOngoing();
+    TenantRebalanceProgressStats progressStats = createProgressStats();
+
+    // Mock ZK metadata with a recent timestamp
+    Map<String, String> tenantJobZKMetadataWithOngoing =
+        createTenantJobZKMetadataWithRecentTimestamp(nonStuckContextWithOngoing, progressStats);
+    Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+    allJobMetadata.put(JOB_ID, tenantJobZKMetadataWithOngoing);
+
+    // Mock non-stuck table rebalance job metadata for the ongoing table job
+    Map<String, String> nonStuckTableJobMetadata = createNonStuckTableJobMetadata();
+
+    // Setup mocks (each stubbed exactly once)
+    doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+        .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+    doReturn(nonStuckTableJobMetadata).when(_mockPinotHelixResourceManager)
+        .getControllerJobZKMetadata(eq(NON_STUCK_TABLE_JOB_ID), eq(ControllerJobTypes.TABLE_REBALANCE));
+
+    // Execute the checker
+    _tenantRebalanceChecker.runTask(new Properties());
+
+    // Verify that the tenant rebalancer was NOT called
+    verify(_mockTenantRebalancer, never()).rebalanceWithContext(any(), any());
+  }
+
+  @Test
+  public void testDoNotResumeTenantRebalanceJobWhileZKUpdateFailed()
+      throws Exception {
+    // Create a stuck tenant rebalance context
+    TenantRebalanceContext stuckContext = createStuckTenantRebalanceContext();
+    TenantRebalanceProgressStats progressStats = createProgressStats();
+
+    // Mock ZK metadata for the stuck job
+    Map<String, String> jobZKMetadata = createTenantJobZKMetadata(stuckContext, progressStats);
+    Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+    allJobMetadata.put(JOB_ID, jobZKMetadata);
+
+    // Mock stuck table rebalance job metadata
+    Map<String, String> stuckTableJobMetadata = createStuckTableJobMetadata();
+
+    // Setup mocks; the 4-argument addControllerJobToZK overload now reports failure
+    doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+        .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+    doReturn(stuckTableJobMetadata).when(_mockPinotHelixResourceManager)
+        .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID), eq(ControllerJobTypes.TABLE_REBALANCE));
+    doReturn(false).when(_mockPinotHelixResourceManager)
+        .addControllerJobToZK(anyString(), anyMap(), eq(ControllerJobTypes.TENANT_REBALANCE), any());
+
+    // Execute the checker
+    _tenantRebalanceChecker.runTask(new Properties());
+
+    // Verify that the tenant rebalancer was NOT called because ZK update failed
+    verify(_mockTenantRebalancer, never()).rebalanceWithContext(any(), any());
+  }
+
+  @Test
+  public void testHandleJsonProcessingException()
+      throws Exception {
+    // A tenant job whose context and progress stats cannot be parsed must be skipped without throwing
+    String invalidJson = "invalid json";
+    Map<String, String> corruptedMetadata = new HashMap<>();
+    corruptedMetadata.put(CommonConstants.ControllerJob.JOB_ID, JOB_ID);
+    corruptedMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, TENANT_NAME);
+    long submissionTimeMs = System.currentTimeMillis();
+    corruptedMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, String.valueOf(submissionTimeMs));
+    corruptedMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TENANT_REBALANCE.name());
+    corruptedMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT, invalidJson);
+    corruptedMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, invalidJson);
+
+    Map<String, Map<String, String>> tenantJobs = new HashMap<>();
+    tenantJobs.put(JOB_ID, corruptedMetadata);
+    doReturn(tenantJobs).when(_mockPinotHelixResourceManager)
+        .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+
+    // Must not throw
+    _tenantRebalanceChecker.runTask(new Properties());
+
+    // No retry may be started
+    verify(_mockTenantRebalancer, never()).rebalanceWithContext(any(), any());
+  }
+
+  @Test
+  public void testDoNotRunMultipleTenantRebalanceRetry()
+      throws Exception {
+    // Two tenant jobs in ZK, both built from the same stuck context
+    TenantRebalanceContext stuckContext = createStuckTenantRebalanceContext();
+    TenantRebalanceProgressStats progressStats = createProgressStats();
+    Map<String, Map<String, String>> tenantJobs = new HashMap<>();
+    for (String tenantJobId : new String[]{JOB_ID, JOB_ID_2}) {
+      tenantJobs.put(tenantJobId, createTenantJobZKMetadata(stuckContext, progressStats, tenantJobId));
+    }
+    Map<String, String> stuckTableJob = createStuckTableJobMetadata();
+
+    doReturn(tenantJobs).when(_mockPinotHelixResourceManager)
+        .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+    doReturn(stuckTableJob).when(_mockPinotHelixResourceManager)
+        .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID), eq(ControllerJobTypes.TABLE_REBALANCE));
+
+    // First run: exactly one retry starts, and the mocked rebalancer never completes it
+    _tenantRebalanceChecker.runTask(new Properties());
+    ArgumentCaptor<ZkBasedTenantRebalanceObserver> observerCaptor =
+        ArgumentCaptor.forClass(ZkBasedTenantRebalanceObserver.class);
+    verify(_mockTenantRebalancer, times(1)).rebalanceWithContext(any(), observerCaptor.capture());
+    ZkBasedTenantRebalanceObserver runningRetry = observerCaptor.getValue();
+    assertFalse(runningRetry.isDone());
+
+    // Second run: the earlier retry is still running and only one tenant rebalance retry may run at a time
+    _tenantRebalanceChecker.runTask(new Properties());
+    verify(_mockTenantRebalancer, times(1)).rebalanceWithContext(any(), any());
+
+    // After the running retry is marked done, the next run picks up another job
+    runningRetry.setDone(true);
+    _tenantRebalanceChecker.runTask(new Properties());
+    verify(_mockTenantRebalancer, times(2)).rebalanceWithContext(any(), any());
+  }
+
+  // Helper methods to create test data
+
+  private TenantRebalanceContext createStuckTenantRebalanceContext() { // One table job (TABLE_NAME_1, STUCK_TABLE_JOB_ID) in the ongoing queue; parallel queue empty
+    TenantRebalanceConfig config = new TenantRebalanceConfig();
+    config.setTenantName(TENANT_NAME);
+    config.setHeartbeatTimeoutInMs(300000L); // 5 minutes
+
+    ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> 
parallelQueue =
+        new ConcurrentLinkedDeque<>();
+    ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> 
ongoingJobsQueue =
+        new ConcurrentLinkedQueue<>();
+
+    // Add a single stuck table job to the ongoing queue
+    TenantRebalancer.TenantTableRebalanceJobContext stuckJobContext =
+        new TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_1, 
STUCK_TABLE_JOB_ID, false);
+    ongoingJobsQueue.add(stuckJobContext);
+
+    return new TenantRebalanceContext( // NOTE(review): meaning of the literal 1 below is not visible here; confirm against the constructor
+        ORIGINAL_JOB_ID, config, 1, parallelQueue,
+        new ConcurrentLinkedQueue<>(), ongoingJobsQueue);
+  }
+
+  private TenantRebalanceContext 
createStuckTenantRebalanceContextWithMultipleTables() // Two stuck table jobs (TABLE_NAME_1, TABLE_NAME_2) in the ongoing queue
+      throws JsonProcessingException { // NOTE(review): nothing below throws this; createStuckTenantRebalanceContext makes the same calls without it
+    TenantRebalanceConfig config = new TenantRebalanceConfig();
+    config.setTenantName(TENANT_NAME);
+    config.setHeartbeatTimeoutInMs(300000L); // 5 minutes
+
+    ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> 
parallelQueue =
+        new ConcurrentLinkedDeque<>();
+    ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> 
ongoingJobsQueue =
+        new ConcurrentLinkedQueue<>();
+
+    // Add two stuck table jobs (TABLE_NAME_1 and TABLE_NAME_2) to the ongoing queue
+    ongoingJobsQueue.add(new 
TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_1, 
STUCK_TABLE_JOB_ID, false));
+    ongoingJobsQueue.add(
+        new TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_2, 
STUCK_TABLE_JOB_ID_2, false));
+
+    return new TenantRebalanceContext(
+        ORIGINAL_JOB_ID, config, 1, parallelQueue,
+        new ConcurrentLinkedQueue<>(), ongoingJobsQueue);
+  }
+
+  private TenantRebalanceContext 
createNonStuckTenantRebalanceContextWithoutOngoing() // One non-stuck job waiting in the parallel queue; ongoing queue empty
+      throws JsonProcessingException { // NOTE(review): nothing below throws this checked exception; the clause looks removable
+    TenantRebalanceConfig config = new TenantRebalanceConfig();
+    config.setTenantName(TENANT_NAME);
+    config.setHeartbeatTimeoutInMs(300000L); // 5 minutes
+
+    ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> 
parallelQueue =
+        new ConcurrentLinkedDeque<>();
+    // Add one job to the parallel queue and none to the ongoing queue
+    parallelQueue.add(new 
TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_1, 
NON_STUCK_TABLE_JOB_ID, false));
+
+    return new TenantRebalanceContext(
+        ORIGINAL_JOB_ID, config, 1, parallelQueue,
+        new ConcurrentLinkedQueue<>(), new ConcurrentLinkedQueue<>());
+  }
+
+  private TenantRebalanceContext 
createNonStuckTenantRebalanceContextWithOngoing() // One non-stuck job in the ongoing queue; parallel queue empty
+      throws JsonProcessingException { // NOTE(review): nothing below throws this checked exception; the clause looks removable
+    TenantRebalanceConfig config = new TenantRebalanceConfig();
+    config.setTenantName(TENANT_NAME);
+    config.setHeartbeatTimeoutInMs(300000L); // 5 minutes
+
+    ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> 
ongoing =
+        new ConcurrentLinkedQueue<>();
+    // Add one job to the ongoing queue; the parallel queue (passed below) stays empty
+    ongoing.add(new 
TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_1, 
NON_STUCK_TABLE_JOB_ID, false));
+
+    return new TenantRebalanceContext(
+        ORIGINAL_JOB_ID, config, 1, new ConcurrentLinkedDeque<>(),
+        new ConcurrentLinkedQueue<>(), ongoing);
+  }
+
+  private TenantRebalanceContext createRecentTenantRebalanceContext() // Context whose job queues are all empty
+      throws JsonProcessingException { // NOTE(review): nothing below throws this checked exception; the clause looks removable
+    TenantRebalanceConfig config = new TenantRebalanceConfig();
+    config.setTenantName(TENANT_NAME);
+    config.setHeartbeatTimeoutInMs(300000L); // 5 minutes
+
+    return new TenantRebalanceContext(
+        ORIGINAL_JOB_ID, config, 1,
+        new ConcurrentLinkedDeque<>(), new ConcurrentLinkedQueue<>(), new 
ConcurrentLinkedQueue<>());
+  }
+
+  private TenantRebalanceProgressStats createProgressStats() { // TABLE_NAME_1 PROCESSING, TABLE_NAME_2 UNPROCESSED, started 1 minute ago
+    Set<String> tables = new HashSet<>();
+    tables.add(TABLE_NAME_1);
+    tables.add(TABLE_NAME_2);
+
+    TenantRebalanceProgressStats stats = new 
TenantRebalanceProgressStats(tables);
+    stats.setStartTimeMs(System.currentTimeMillis() - 60000); // 1 minute ago
+    stats.updateTableStatus(TABLE_NAME_1, 
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
+    stats.updateTableStatus(TABLE_NAME_2, 
TenantRebalanceProgressStats.TableStatus.UNPROCESSED.name());
+
+    return stats;
+  }
+
+  private TenantRebalanceProgressStats createProgressStatsWithMultipleTables() 
{ // Like createProgressStats(), but both TABLE_NAME_1 and TABLE_NAME_2 are PROCESSING
+    Set<String> tables = new HashSet<>();
+    tables.add(TABLE_NAME_1);
+    tables.add(TABLE_NAME_2);
+
+    TenantRebalanceProgressStats stats = new 
TenantRebalanceProgressStats(tables);
+    stats.setStartTimeMs(System.currentTimeMillis() - 60000); // 1 minute ago
+    stats.updateTableStatus(TABLE_NAME_1, 
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
+    stats.updateTableStatus(TABLE_NAME_2, 
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
+
+    return stats;
+  }
+
+  private Map<String, String> createTenantJobZKMetadata(TenantRebalanceContext 
context,
+      TenantRebalanceProgressStats progressStats)
+      throws JsonProcessingException { // Overload that uses JOB_ID as the tenant rebalance job id
+    return createTenantJobZKMetadata(context, progressStats, JOB_ID);
+  }
+
+  private Map<String, String> createTenantJobZKMetadata(TenantRebalanceContext 
context,
+      TenantRebalanceProgressStats progressStats, String jobId)
+      throws JsonProcessingException { // TENANT_REBALANCE job metadata submitted ~400s ago; context and stats stored as JSON strings
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    metadata.put(CommonConstants.ControllerJob.TENANT_NAME, TENANT_NAME);
+    metadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+        String.valueOf(System.currentTimeMillis() - 400000)); // 6+ minutes 
ago (beyond heartbeat timeout)
+    metadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TENANT_REBALANCE.name());
+    metadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+        JsonUtils.objectToString(context));
+    
metadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+        JsonUtils.objectToString(progressStats));
+
+    return metadata;
+  }
+
+  private Map<String, String> 
createTenantJobZKMetadataWithRecentTimestamp(TenantRebalanceContext context,
+      TenantRebalanceProgressStats progressStats)
+      throws JsonProcessingException { // Same as createTenantJobZKMetadata(context, progressStats), but submitted 1 minute ago
+    Map<String, String> metadata = createTenantJobZKMetadata(context, 
progressStats);
+    metadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+        String.valueOf(System.currentTimeMillis() - 60000)); // 1 minute ago 
(within heartbeat timeout)
+    return metadata;
+  }
+
+  private Map<String, String> createStuckTableJobMetadata()
+      throws JsonProcessingException { // Table rebalance job metadata: IN_PROGRESS, submitted and started ~400s ago, 300s heartbeat timeout
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put(CommonConstants.ControllerJob.JOB_ID, STUCK_TABLE_JOB_ID);
+    metadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+        String.valueOf(System.currentTimeMillis() - 400000)); // 6+ minutes ago
+
+    // Create stuck table rebalance progress stats (IN_PROGRESS, started ~400s ago)
+    TableRebalanceProgressStats tableStats = new TableRebalanceProgressStats();
+    tableStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
+    tableStats.setStartTimeMs(System.currentTimeMillis() - 400000);
+
+    // Create table rebalance context with a 300s (5 minute) heartbeat timeout
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setHeartbeatTimeoutInMs(300000L);
+    TableRebalanceContext tableContext = 
TableRebalanceContext.forInitialAttempt(
+        "original-table-job", rebalanceConfig, true);
+
+    
metadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+        JsonUtils.objectToString(tableStats));
+    metadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+        JsonUtils.objectToString(tableContext));
+
+    return metadata;
+  }
+
+  private Map<String, String> createNonStuckTableJobMetadata()
+      throws JsonProcessingException { // Stuck-job metadata re-labelled with NON_STUCK_TABLE_JOB_ID and a 1-minute-old submission time
+    Map<String, String> metadata = createStuckTableJobMetadata(); // NOTE(review): copied progress stats keep status IN_PROGRESS and startTimeMs ~400s ago
+    metadata.put(CommonConstants.ControllerJob.JOB_ID, NON_STUCK_TABLE_JOB_ID);
+    metadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+        String.valueOf(System.currentTimeMillis() - 60000)); // 1 minute ago
+
+    return metadata;
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index 0b2f099c53e..d033b9c6a4f 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.commons.lang3.tuple.Pair;
@@ -92,8 +92,8 @@ public class TenantRebalancerTest extends ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, true);
     }
 
-    DefaultTenantRebalancer tenantRebalancer =
-        new DefaultTenantRebalancer(_tableRebalanceManager, 
_helixResourceManager, _executorService);
+    TenantRebalancer tenantRebalancer =
+        new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, 
_executorService);
 
     // tag all servers and brokers to test tenant
     addTenantTagToInstances(TENANT_NAME);
@@ -161,8 +161,8 @@ public class TenantRebalancerTest extends ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, true);
     }
 
-    DefaultTenantRebalancer tenantRebalancer =
-        new DefaultTenantRebalancer(_tableRebalanceManager, 
_helixResourceManager, _executorService);
+    TenantRebalancer tenantRebalancer =
+        new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, 
_executorService);
 
     // tag all servers and brokers to test tenant
     addTenantTagToInstances(TENANT_NAME);
@@ -289,8 +289,8 @@ public class TenantRebalancerTest extends ControllerTest {
     addDummySchema(tableNameD);
     addDummySchema(tableNameE);
 
-    DefaultTenantRebalancer tenantRebalancer =
-        new DefaultTenantRebalancer(_tableRebalanceManager, 
_helixResourceManager, _executorService);
+    TenantRebalancer tenantRebalancer =
+        new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, 
_executorService);
 
     // table A set tenantConfig.tenants.server to tenantName
     // SHOULD be selected as tenant's table
@@ -359,8 +359,8 @@ public class TenantRebalancerTest extends ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, true);
     }
 
-    DefaultTenantRebalancer tenantRebalancer =
-        new DefaultTenantRebalancer(_tableRebalanceManager, 
_helixResourceManager, _executorService);
+    TenantRebalancer tenantRebalancer =
+        new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, 
_executorService);
 
     // tag all servers and brokers to test tenant
     addTenantTagToInstances(TENANT_NAME);
@@ -442,7 +442,7 @@ public class TenantRebalancerTest extends ControllerTest {
     assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
 
     // set table B in parallel blacklist, so that it ends up in sequential 
queue, and table A in parallel queue
-    
Pair<ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext>,
+    
Pair<ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext>,
         Queue<TenantRebalancer.TenantTableRebalanceJobContext>>
         queues =
         tenantRebalancer.createParallelAndSequentialQueues(config, 
dryRunResult.getRebalanceTableResults(), null,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to