This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ed5e306c0fd Add TenantRebalanceChecker for improved tenant rebalance
monitoring (#16455)
ed5e306c0fd is described below
commit ed5e306c0fd006c0d52e993d4f7c5cb2b11dcab9
Author: Jhow <[email protected]>
AuthorDate: Sat Aug 30 00:41:51 2025 -0700
Add TenantRebalanceChecker for improved tenant rebalance monitoring (#16455)
---
.../pinot/controller/BaseControllerStarter.java | 19 +-
.../apache/pinot/controller/ControllerConf.java | 18 +
.../helix/core/PinotHelixResourceManager.java | 1 +
.../rebalance/tenant/DefaultTenantRebalancer.java | 288 -----------
.../rebalance/tenant/TenantRebalanceChecker.java | 330 ++++++++++++
.../rebalance/tenant/TenantRebalanceContext.java | 130 +++++
.../tenant/TenantRebalanceProgressStats.java | 14 +-
.../core/rebalance/tenant/TenantRebalancer.java | 332 +++++++++++-
.../tenant/ZkBasedTenantRebalanceObserver.java | 61 ++-
...ControllerPeriodicTaskStarterStatelessTest.java | 2 +-
.../tenant/TenantRebalanceCheckerTest.java | 556 +++++++++++++++++++++
.../rebalance/tenant/TenantRebalancerTest.java | 20 +-
12 files changed, 1451 insertions(+), 320 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 8ea4efd2113..8a9a8c7817a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -107,7 +107,7 @@ import
org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker;
import
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
-import
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceChecker;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
@@ -584,7 +584,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
new TableRebalanceManager(_helixResourceManager, _controllerMetrics,
_rebalancePreChecker, _tableSizeReader,
_rebalancerExecutorService);
_tenantRebalancer =
- new DefaultTenantRebalancer(_tableRebalanceManager,
_helixResourceManager, _rebalancerExecutorService);
+ new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_rebalancerExecutorService);
// Setting up periodic tasks
List<PeriodicTask> controllerPeriodicTasks =
setupControllerPeriodicTasks();
@@ -930,6 +930,9 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
PeriodicTask resourceUtilizationChecker = new
ResourceUtilizationChecker(_config, _connectionManager,
_controllerMetrics, _utilizationCheckers, _executorService,
_helixResourceManager);
periodicTasks.add(resourceUtilizationChecker);
+ PeriodicTask tenantRebalanceChecker =
+ new TenantRebalanceChecker(_config, _helixResourceManager,
_tenantRebalancer);
+ periodicTasks.add(tenantRebalanceChecker);
return periodicTasks;
}
@@ -943,12 +946,16 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
LOGGER.info("Creating TaskManager with class: {}", taskManagerClass);
try {
return PluginManager.get().createInstance(taskManagerClass,
- new Class[]{PinotHelixTaskResourceManager.class,
PinotHelixResourceManager.class, LeadControllerManager.class,
+ new Class[]{
+ PinotHelixTaskResourceManager.class,
PinotHelixResourceManager.class, LeadControllerManager.class,
ControllerConf.class, ControllerMetrics.class,
TaskManagerStatusCache.class,
- Executor.class, PoolingHttpClientConnectionManager.class,
ResourceUtilizationManager.class},
- new Object[]{_helixTaskResourceManager, _helixResourceManager,
_leadControllerManager,
+ Executor.class, PoolingHttpClientConnectionManager.class,
ResourceUtilizationManager.class
+ },
+ new Object[]{
+ _helixTaskResourceManager, _helixResourceManager,
_leadControllerManager,
_config, _controllerMetrics, _taskManagerStatusCache,
_executorService,
- _connectionManager, _resourceUtilizationManager});
+ _connectionManager, _resourceUtilizationManager
+ });
} catch (Exception e) {
LOGGER.error("Failed to create task manager with class: {}",
taskManagerClass, e);
throw new RuntimeException("Failed to create task manager with class: "
+ taskManagerClass, e);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index c90f2617363..1a849cab307 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -242,6 +242,10 @@ public class ControllerConf extends PinotConfiguration {
"controller.segmentRelocator.initialDelayInSeconds";
public static final String REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
"controller.rebalanceChecker.initialDelayInSeconds";
+ public static final String TENANT_REBALANCE_CHECKER_FREQUENCY_PERIOD =
+ "controller.tenant.rebalance.checker.frequencyPeriod";
+ public static final String
TENANT_REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
+ "controller.tenant.rebalance.checker.initialDelayInSeconds";
// The flag to indicate if controller periodic job will fix the missing
LLC segment deep store copy.
// Default value is false.
@@ -278,6 +282,7 @@ public class ControllerConf extends PinotConfiguration {
public static final int
DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
public static final int DEFAULT_STATUS_CHECKER_FREQUENCY_IN_SECONDS = 5 *
60; // 5 minutes
public static final int DEFAULT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS = 5
* 60; // 5 minutes
+ public static final int
DEFAULT_TENANT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
public static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS
= 5 * 60; // 5 minutes
public static final int
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
public static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; //
Disabled
@@ -732,6 +737,19 @@ public class ControllerConf extends PinotConfiguration {
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
}
+ public int getTenantRebalanceCheckerFrequencyInSeconds() {
+ return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_FREQUENCY_PERIOD))
+ .filter(period -> isValidPeriodWithLogging(
+
ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_FREQUENCY_PERIOD, period))
+ .map(period -> (int) convertPeriodToSeconds(period))
+
.orElse(ControllerPeriodicTasksConf.DEFAULT_TENANT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS);
+ }
+
+ public long getTenantRebalanceCheckerInitialDelayInSeconds() {
+ return
getProperty(ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS,
+ ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+ }
+
public int getRealtimeConsumerMonitorRunFrequency() {
return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_FREQUENCY_PERIOD))
.map(period -> (int) convertPeriodToSeconds(period))
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0c415a3a48c..97e2cb0a908 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2482,6 +2482,7 @@ public class PinotHelixResourceManager {
* @param jobId job's UUID
* @param jobMetadata the job metadata
* @param jobType the type of the job to figure out where job metadata is
kept in ZK
+ * @param prevJobMetadataChecker an additional check to see if there's a
need to update
* @return boolean representing success / failure of the ZK write step
*/
public boolean addControllerJobToZK(String jobId, Map<String, String>
jobMetadata, ControllerJobType jobType,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
deleted file mode 100644
index 749afc72e2f..00000000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.rebalance.tenant;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.common.exception.TableNotFoundException;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
-import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
-import org.apache.pinot.segment.local.utils.TableConfigUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class DefaultTenantRebalancer implements TenantRebalancer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultTenantRebalancer.class);
- private final TableRebalanceManager _tableRebalanceManager;
- private final PinotHelixResourceManager _pinotHelixResourceManager;
- private final ExecutorService _executorService;
-
- public DefaultTenantRebalancer(TableRebalanceManager tableRebalanceManager,
- PinotHelixResourceManager pinotHelixResourceManager, ExecutorService
executorService) {
- _tableRebalanceManager = tableRebalanceManager;
- _pinotHelixResourceManager = pinotHelixResourceManager;
- _executorService = executorService;
- }
-
- @Override
- public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
- Map<String, RebalanceResult> dryRunResults = new HashMap<>();
-
- // Step 1: Select the tables to include in this rebalance operation
-
- Set<String> tables = getTenantTables(config.getTenantName());
- Set<String> includeTables = config.getIncludeTables();
- if (!includeTables.isEmpty()) {
- tables.retainAll(includeTables);
- }
- tables.removeAll(config.getExcludeTables());
-
- // Step 2: Dry-run over the selected tables to get the dry-run rebalance
results. The result is to be sent as
- // response to the user, and their summaries are needed for scheduling the
job queue later
-
- tables.forEach(table -> {
- try {
- RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
- rebalanceConfig.setDryRun(true);
- dryRunResults.put(table,
- _tableRebalanceManager.rebalanceTableDryRun(table,
rebalanceConfig, createUniqueRebalanceJobIdentifier()));
- } catch (TableNotFoundException exception) {
- dryRunResults.put(table, new RebalanceResult(null,
RebalanceResult.Status.FAILED, exception.getMessage(),
- null, null, null, null, null));
- }
- });
-
- // If dry-run was set, return the dry-run results and the job is done here
- if (config.isDryRun()) {
- return new TenantRebalanceResult(null, dryRunResults,
config.isVerboseResult());
- }
-
- // Step 3: Create two queues--parallel and sequential and schedule the
tables to these queues based on the
- // parallelWhitelist and parallelBlacklist, also their dry-run results.
For each table, a job context is created
- // and put in the queue for the consuming threads to pick up and run the
rebalance operation
-
- String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
- TenantRebalanceObserver observer = new
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
- tables, _pinotHelixResourceManager);
- observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null,
null);
- Pair<ConcurrentLinkedQueue<TenantTableRebalanceJobContext>,
Queue<TenantTableRebalanceJobContext>> queues =
- createParallelAndSequentialQueues(config, dryRunResults,
config.getParallelWhitelist(),
- config.getParallelBlacklist());
- ConcurrentLinkedQueue<TenantTableRebalanceJobContext> parallelQueue =
queues.getLeft();
- Queue<TenantTableRebalanceJobContext> sequentialQueue = queues.getRight();
-
- // Step 4: Spin up threads to consume the parallel queue and sequential
queue.
-
- // ensure atleast 1 thread is created to run the sequential table
rebalance operations
- int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
- AtomicInteger activeThreads = new AtomicInteger(parallelism);
- try {
- for (int i = 0; i < parallelism; i++) {
- _executorService.submit(() -> {
- doConsumeTablesFromQueue(parallelQueue, config, observer);
- if (activeThreads.decrementAndGet() == 0) {
- doConsumeTablesFromQueue(sequentialQueue, config, observer);
- observer.onSuccess(String.format("Successfully rebalanced tenant
%s.", config.getTenantName()));
- }
- });
- }
- } catch (Exception exception) {
- observer.onError(String.format("Failed to rebalance the tenant %s.
Cause: %s", config.getTenantName(),
- exception.getMessage()));
- }
-
- // Step 5: Prepare the rebalance results to be returned to the user. The
rebalance jobs are running in the
- // background asynchronously.
-
- Map<String, RebalanceResult> rebalanceResults = new HashMap<>();
- for (String table : dryRunResults.keySet()) {
- RebalanceResult result = dryRunResults.get(table);
- if (result.getStatus() == RebalanceResult.Status.DONE) {
- rebalanceResults.put(table, new RebalanceResult(result.getJobId(),
RebalanceResult.Status.IN_PROGRESS,
- "In progress, check controller task status for the",
result.getInstanceAssignment(),
- result.getTierInstanceAssignment(), result.getSegmentAssignment(),
result.getPreChecksResult(),
- result.getRebalanceSummaryResult()));
- } else {
- rebalanceResults.put(table, dryRunResults.get(table));
- }
- }
- return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResults,
config.isVerboseResult());
- }
-
- private void doConsumeTablesFromQueue(Queue<TenantTableRebalanceJobContext>
queue, RebalanceConfig config,
- TenantRebalanceObserver observer) {
- while (true) {
- TenantTableRebalanceJobContext jobContext = queue.poll();
- if (jobContext == null) {
- break;
- }
- String table = jobContext.getTableName();
- RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
- rebalanceConfig.setDryRun(false);
- if (jobContext.shouldRebalanceWithDowntime()) {
- rebalanceConfig.setMinAvailableReplicas(0);
- }
- rebalanceTable(table, rebalanceConfig, jobContext.getJobId(), observer);
- }
- }
-
- private Set<String> getDimensionalTables(String tenantName) {
- Set<String> dimTables = new HashSet<>();
- for (String table : _pinotHelixResourceManager.getAllTables()) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(table);
- if (tableConfig == null) {
- LOGGER.error("Unable to retrieve table config for table: {}", table);
- continue;
- }
- if (tenantName.equals(tableConfig.getTenantConfig().getServer()) &&
tableConfig.isDimTable()) {
- dimTables.add(table);
- }
- }
- return dimTables;
- }
-
- private String createUniqueRebalanceJobIdentifier() {
- return UUID.randomUUID().toString();
- }
-
- Set<String> getTenantTables(String tenantName) {
- Set<String> tables = new HashSet<>();
- for (String table : _pinotHelixResourceManager.getAllTables()) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(table);
- if (tableConfig == null) {
- LOGGER.error("Unable to retrieve table config for table: {}", table);
- continue;
- }
- if (TableConfigUtils.isRelevantToTenant(tableConfig, tenantName)) {
- tables.add(table);
- }
- }
- return tables;
- }
-
- private void rebalanceTable(String tableName, RebalanceConfig config, String
rebalanceJobId,
- TenantRebalanceObserver observer) {
- try {
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER,
tableName, rebalanceJobId);
- RebalanceResult result =
_tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true);
- // TODO: For downtime=true rebalance, track if the EV-IS has converged
to move on, otherwise it fundementally
- // breaks the degree of parallelism
- if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER,
tableName, null);
- } else {
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER,
tableName,
- result.getDescription());
- }
- } catch (Throwable t) {
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER,
tableName,
- String.format("Caught exception/error while rebalancing table: %s",
tableName));
- }
- }
-
- private static Set<String> getTablesToRunInParallel(Set<String> tables,
- @Nullable Set<String> parallelWhitelist, @Nullable Set<String>
parallelBlacklist) {
- Set<String> parallelTables = new HashSet<>(tables);
- if (parallelWhitelist != null && !parallelWhitelist.isEmpty()) {
- parallelTables.retainAll(parallelWhitelist);
- }
- if (parallelBlacklist != null && !parallelBlacklist.isEmpty()) {
- parallelTables.removeAll(parallelBlacklist);
- }
- return parallelTables;
- }
-
- @VisibleForTesting
- Pair<ConcurrentLinkedQueue<TenantTableRebalanceJobContext>,
Queue<TenantTableRebalanceJobContext>>
- createParallelAndSequentialQueues(
- TenantRebalanceConfig config, Map<String, RebalanceResult>
dryRunResults, @Nullable Set<String> parallelWhitelist,
- @Nullable Set<String> parallelBlacklist) {
- Set<String> parallelTables =
getTablesToRunInParallel(dryRunResults.keySet(), parallelWhitelist,
parallelBlacklist);
- Map<String, RebalanceResult> parallelTableDryRunResults = new HashMap<>();
- Map<String, RebalanceResult> sequentialTableDryRunResults = new
HashMap<>();
- dryRunResults.forEach((table, result) -> {
- if (parallelTables.contains(table)) {
- parallelTableDryRunResults.put(table, result);
- } else {
- sequentialTableDryRunResults.put(table, result);
- }
- });
- ConcurrentLinkedQueue<TenantTableRebalanceJobContext> parallelQueue =
- createTableQueue(config, parallelTableDryRunResults);
- Queue<TenantTableRebalanceJobContext> sequentialQueue =
createTableQueue(config, sequentialTableDryRunResults);
- return Pair.of(parallelQueue, sequentialQueue);
- }
-
- @VisibleForTesting
- ConcurrentLinkedQueue<TenantTableRebalanceJobContext>
createTableQueue(TenantRebalanceConfig config,
- Map<String, RebalanceResult> dryRunResults) {
- Queue<TenantTableRebalanceJobContext> firstQueue = new LinkedList<>();
- Queue<TenantTableRebalanceJobContext> queue = new LinkedList<>();
- Queue<TenantTableRebalanceJobContext> lastQueue = new LinkedList<>();
- Set<String> dimTables = getDimensionalTables(config.getTenantName());
- dryRunResults.forEach((table, dryRunResult) -> {
- TenantTableRebalanceJobContext jobContext =
- new TenantTableRebalanceJobContext(table, dryRunResult.getJobId(),
dryRunResult.getRebalanceSummaryResult()
- .getSegmentInfo()
- .getReplicationFactor()
- .getExpectedValueAfterRebalance() == 1);
- if (dimTables.contains(table)) {
- // check if the dimension table is a pure scale out or scale in.
- // pure scale out means that only new servers are added and no servers
are removed, vice versa
- RebalanceSummaryResult.ServerInfo serverInfo =
-
dryRunResults.get(table).getRebalanceSummaryResult().getServerInfo();
- if (!serverInfo.getServersAdded().isEmpty() &&
serverInfo.getServersRemoved().isEmpty()) {
- // dimension table's pure scale OUT should be performed BEFORE other
regular tables so that queries involving
- // joining with dimension table won't fail on the new servers
- firstQueue.add(jobContext);
- } else if (serverInfo.getServersAdded().isEmpty() &&
!serverInfo.getServersRemoved().isEmpty()) {
- // dimension table's pure scale IN should be performed AFTER other
regular tables so that queries involving
- // joining with dimension table won't fail on the old servers
- lastQueue.add(jobContext);
- } else {
- // the dimension table is not a pure scale out or scale in, which is
supposed to be rebalanced manually.
- // Pre-check should capture and warn about this case.
- firstQueue.add(jobContext);
- }
- } else {
- queue.add(jobContext);
- }
- });
- ConcurrentLinkedQueue<TenantTableRebalanceJobContext> tableQueue = new
ConcurrentLinkedQueue<>();
- tableQueue.addAll(firstQueue);
- tableQueue.addAll(queue);
- tableQueue.addAll(lastQueue);
- return tableQueue;
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
new file mode 100644
index 00000000000..954a68f5425
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task to check if tenant rebalance jobs are stuck and retry them.
Controller crashes or restarts could
+ * make a tenant rebalance job stuck.
+ * The task checks each tenant rebalance job's metadata in ZK and looks at the
TenantTableRebalanceJobContext in
+ * ongoingJobsQueue, which is a list with table rebalance job ids that the
controller is currently processing, to see
+ * if any of these table rebalance jobs has not updated its progress stats for
longer than the configured heartbeat
+ * timeout. If so, the tenant rebalance job is considered stuck, and the task
will resume the tenant rebalance job by
+ * aborting all the ongoing table rebalance jobs, moving the ongoing
TenantTableRebalanceJobContext back to the head of
+ * the parallel queue, and then triggering the tenant rebalance job again with
the updated context, with an attempt job ID.
+ * <p>
+ * Notice that fundamentally this is not a retry but a resume, since we will
not re-do the table rebalance for those
+ * tables that have already been processed.
+ */
+public class TenantRebalanceChecker extends BasePeriodicTask {
+ private final static String TASK_NAME =
TenantRebalanceChecker.class.getSimpleName();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TenantRebalanceChecker.class);
+ private final TenantRebalancer _tenantRebalancer;
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+ // To avoid multiple retries of tenant rebalance job on one controller, we
only allow one ongoing retry job at a time.
+ private ZkBasedTenantRebalanceObserver _ongoingJobObserver = null;
+
+ public TenantRebalanceChecker(ControllerConf config,
+ PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer
tenantRebalancer) {
+ super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(),
+ config.getTenantRebalanceCheckerInitialDelayInSeconds());
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ _tenantRebalancer = tenantRebalancer;
+ }
+
+ @Override
+ protected void runTask(Properties periodicTaskProperties) {
+ if (_ongoingJobObserver == null || _ongoingJobObserver.isDone()) {
+ checkAndRetryTenantRebalance();
+ } else {
+ LOGGER.info("Skip checking tenant rebalance jobs as there's an ongoing
retry job: {} for tenant: {}",
+ _ongoingJobObserver.getJobId(), _ongoingJobObserver.getTenantName());
+ }
+ }
+
+ private void checkAndRetryTenantRebalance() {
+ Map<String, Map<String, String>> allJobMetadataByJobId =
+
_pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE),
x -> true);
+ for (Map.Entry<String, Map<String, String>> entry :
allJobMetadataByJobId.entrySet()) {
+ String jobId = entry.getKey();
+ Map<String, String> jobZKMetadata = entry.getValue();
+
+ try {
+ // Check if the tenant rebalance job is stuck
+ String tenantRebalanceContextStr =
+
jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+ String tenantRebalanceProgressStatsStr =
+
jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+ if (StringUtils.isEmpty(tenantRebalanceContextStr) ||
StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) {
+ // Skip jobs that have no job context or no progress stats
+ LOGGER.info("Skip checking tenant rebalance job: {} as it has no job
context or progress stats", jobId);
+ continue;
+ }
+ TenantRebalanceContext tenantRebalanceContext =
+ JsonUtils.stringToObject(tenantRebalanceContextStr,
TenantRebalanceContext.class);
+ TenantRebalanceProgressStats progressStats =
+ JsonUtils.stringToObject(tenantRebalanceProgressStatsStr,
TenantRebalanceProgressStats.class);
+ long statsUpdatedAt =
Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+
+ TenantRebalanceContext retryTenantRebalanceContext =
+ prepareRetryIfTenantRebalanceJobStuck(jobZKMetadata,
tenantRebalanceContext, statsUpdatedAt);
+ if (retryTenantRebalanceContext != null) {
+ TenantRebalancer.TenantTableRebalanceJobContext ctx;
+ while ((ctx =
retryTenantRebalanceContext.getOngoingJobsQueue().poll()) != null) {
+ abortTableRebalanceJob(ctx.getTableName());
+ // the existing table rebalance job is aborted, we need to run the
rebalance job with a new job ID.
+ TenantRebalancer.TenantTableRebalanceJobContext newCtx =
+ new TenantRebalancer.TenantTableRebalanceJobContext(
+ ctx.getTableName(), UUID.randomUUID().toString(),
ctx.shouldRebalanceWithDowntime());
+ retryTenantRebalanceContext.getParallelQueue().addFirst(newCtx);
+ }
+ // the retry tenant rebalance job id has been created in ZK, we can
safely mark the original job as
+ // aborted, so that this original job will not be picked up again in
the future.
+ markTenantRebalanceJobAsAborted(jobId, jobZKMetadata,
tenantRebalanceContext, progressStats);
+ retryTenantRebalanceJob(retryTenantRebalanceContext, progressStats);
+ // We only retry one stuck tenant rebalance job at a time to avoid
multiple retries of the same job
+ return;
+ } else {
+ LOGGER.info("Tenant rebalance job: {} is not stuck", jobId);
+ }
+ } catch (JsonProcessingException e) {
+ // If we cannot parse the job metadata, we skip this job
+ LOGGER.warn("Failed to parse tenant rebalance context for job: {},
skipping", jobId);
+ }
+ }
+ }
+
+ private void retryTenantRebalanceJob(TenantRebalanceContext
tenantRebalanceContextForRetry,
+ TenantRebalanceProgressStats progressStats) {
+ ZkBasedTenantRebalanceObserver observer =
+ new
ZkBasedTenantRebalanceObserver(tenantRebalanceContextForRetry.getJobId(),
+ tenantRebalanceContextForRetry.getConfig().getTenantName(),
+ progressStats, tenantRebalanceContextForRetry,
_pinotHelixResourceManager);
+ _ongoingJobObserver = observer;
+ _tenantRebalancer.rebalanceWithContext(tenantRebalanceContextForRetry,
observer);
+ }
+
+ /**
+ * Check if the tenant rebalance job is stuck, and prepare to retry it if
necessary.
+ * A tenant rebalance job is considered stuck if:
+ * <ol>
+ * <li>There are no ongoing jobs, but there are jobs in the parallel or
sequential queue, and the stats have not
+ * been updated for longer than the heartbeat timeout.</li>
+ * <li>There are ongoing table rebalance jobs, and at least one of them
has not updated its status for longer
+ * than the heartbeat timeout.</li>
+ * </ol>
+ * We cannot simply check whether the tenant rebalance job's metadata was
updated within the heartbeat timeout to
+ * determine if this tenant rebalance job is stuck, because a tenant
rebalance job can have no updates for a
+ * longer period if the underlying table rebalance jobs take an extra long time
to finish, while they individually do
+ * update regularly within heartbeat timeout.
+ * The retry is prepared by creating a new tenant rebalance job with an
incremented attempt ID, and persisting it to
+ * ZK.
+ *
+ * @param jobZKMetadata The ZK metadata of the tenant rebalance job.
+ * @param tenantRebalanceContext The context of the tenant rebalance job.
+ * @param statsUpdatedAt The timestamp when the stats were last updated.
+ * @return The TenantRebalanceContext for retry if the tenant rebalance job
is stuck and should be retried, null if
+ * the job is not stuck, or it's stuck but another controller has prepared the
retry first.
+ */
+ private TenantRebalanceContext prepareRetryIfTenantRebalanceJobStuck(
+ Map<String, String> jobZKMetadata, TenantRebalanceContext
tenantRebalanceContext, long statsUpdatedAt) {
+ boolean isStuck = false;
+ String stuckTableRebalanceJobId = null;
+ long heartbeatTimeoutMs =
tenantRebalanceContext.getConfig().getHeartbeatTimeoutInMs();
+ if (tenantRebalanceContext.getOngoingJobsQueue().isEmpty()) {
+ if (!tenantRebalanceContext.getParallelQueue().isEmpty() ||
!tenantRebalanceContext.getSequentialQueue()
+ .isEmpty()) {
+ // If there are no ongoing jobs but in parallel or sequential queue,
it could be because the tenant rebalancer
+ // is in the interval of the previous done job and consumption of the
next job. We need to check the heartbeat
+ // timeout to be sure that it's actually stuck at this state for a
long while.
+ isStuck = (System.currentTimeMillis() - statsUpdatedAt >=
heartbeatTimeoutMs);
+ LOGGER.info(
+ "No ongoing table rebalance jobs for tenant: {}, but there are
jobs in parallel queue size: {}, "
+ + "sequential queue size: {}, stats last updated at: {},
isStuck: {}",
+ tenantRebalanceContext.getConfig().getTenantName(),
tenantRebalanceContext.getParallelQueue().size(),
+ tenantRebalanceContext.getSequentialQueue().size(),
statsUpdatedAt, isStuck);
+ }
+ } else {
+ // Check if there's any stuck ongoing table rebalance jobs
+ for (TenantRebalancer.TenantTableRebalanceJobContext ctx : new
ArrayList<>(
+ tenantRebalanceContext.getOngoingJobsQueue())) {
+ if (isOngoingTableRebalanceJobStuck(ctx.getJobId(), statsUpdatedAt,
heartbeatTimeoutMs)) {
+ isStuck = true;
+ stuckTableRebalanceJobId = ctx.getJobId();
+ break;
+ }
+ }
+ }
+ if (isStuck) {
+ // If any of the table rebalance jobs is stuck, we consider the tenant
rebalance job as stuck.
+ // We need to make sure only one controller instance retries the tenant
rebalance job.
+ TenantRebalanceContext retryTenantRebalanceContext =
+
TenantRebalanceContext.forRetry(tenantRebalanceContext.getOriginalJobId(),
+ tenantRebalanceContext.getConfig(),
tenantRebalanceContext.getAttemptId() + 1,
+ tenantRebalanceContext.getParallelQueue(),
tenantRebalanceContext.getSequentialQueue(),
+ tenantRebalanceContext.getOngoingJobsQueue());
+ try {
+
jobZKMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+ JsonUtils.objectToString(retryTenantRebalanceContext));
+ // this returns false if the retryJob already exists in ZK, which
means another controller has already
+ // prepared the retry, so we should not retry again.
+ boolean shouldRetry =
+
_pinotHelixResourceManager.addControllerJobToZK(retryTenantRebalanceContext.getJobId(),
jobZKMetadata,
+ ControllerJobTypes.TENANT_REBALANCE, Objects::isNull);
+ if (!shouldRetry) {
+ LOGGER.info("Another controller has already prepared the retry job:
{} for tenant: {}. Do not retry.",
+ stuckTableRebalanceJobId,
tenantRebalanceContext.getConfig().getTenantName());
+ return null;
+ }
+ LOGGER.info("Found and prepared to retry stuck table rebalance job: {}
for tenant: {}.",
+ stuckTableRebalanceJobId,
tenantRebalanceContext.getConfig().getTenantName());
+ return retryTenantRebalanceContext;
+ } catch (JsonProcessingException e) {
+ LOGGER.error(
+ "Error serialising rebalance context to JSON for updating
retryTenantRebalanceContext to ZK {}",
+ tenantRebalanceContext.getJobId(), e);
+ }
+ }
+ return null;
+ }
+
+ /**
+  * Decides whether a table rebalance job listed in the tenant job's ongoing queue is stuck.
+  * The decision is purely time based:
+  * <ol>
+  *   <li>If no job metadata exists in ZK for the table rebalance job, it is stuck when the tenant rebalance job
+  *   stats have not been updated for at least the heartbeat timeout.</li>
+  *   <li>If the job metadata exists, it is stuck when the timestamp recorded in the metadata is at least the
+  *   heartbeat timeout old.</li>
+  * </ol>
+  * The status of the table rebalance job is deliberately not inspected. According to the original author's note,
+  * this differs from {@link org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker}: a job that is DONE,
+  * ABORTED, FAILED, or CANCELLED should already have been removed from the ongoing queue if the controller is
+  * working properly, so finding it there past the heartbeat timeout still counts as stuck.
+  *
+  * @param jobId The ID of the table rebalance job.
+  * @param tenantRebalanceJobStatsUpdatedAt The timestamp when the tenant rebalance job stats were last updated.
+  * @param heartbeatTimeoutMs The heartbeat timeout in milliseconds.
+  * @return True if the table rebalance job is stuck, false otherwise.
+  */
+ private boolean isOngoingTableRebalanceJobStuck(String jobId, long tenantRebalanceJobStatsUpdatedAt,
+     long heartbeatTimeoutMs) {
+   Map<String, String> tableJobMetadata =
+       _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobTypes.TABLE_REBALANCE);
+   if (tableJobMetadata == null) {
+     // The metadata has not been created for a job ID that is already in ongoingJobsQueue. If that has lasted
+     // longer than the heartbeat timeout, the controller may have crashed or restarted.
+     long sinceTenantStatsUpdateMs = System.currentTimeMillis() - tenantRebalanceJobStatsUpdatedAt;
+     return sinceTenantStatsUpdateMs >= heartbeatTimeoutMs;
+   }
+   long tableJobUpdatedAt = Long.parseLong(tableJobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+   boolean timedOut = System.currentTimeMillis() - tableJobUpdatedAt >= heartbeatTimeoutMs;
+   if (timedOut) {
+     LOGGER.info("Found stuck table rebalance job: {} that has not updated its status in ZK within "
+         + "heartbeat timeout: {}", jobId, heartbeatTimeoutMs);
+   }
+   return timedOut;
+ }
+
+ /**
+  * Marks every IN_PROGRESS table rebalance job of the given table as ABORTED in its job metadata, at best effort.
+  * Jobs in any other status are left untouched; a failure on one job is logged and does not propagate.
+  *
+  * @param tableNameWithType The table whose rebalance jobs should be aborted.
+  */
+ private void abortTableRebalanceJob(String tableNameWithType) {
+   // TODO: This is a duplicate of a private method in RebalanceChecker, we should refactor it to a common place.
+   boolean updated = _pinotHelixResourceManager.updateJobsForTable(tableNameWithType,
+       ControllerJobTypes.TABLE_REBALANCE, metadata -> {
+         String tableJobId = metadata.get(CommonConstants.ControllerJob.JOB_ID);
+         try {
+           TableRebalanceProgressStats stats = JsonUtils.stringToObject(
+               metadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
+               TableRebalanceProgressStats.class);
+           if (stats.getStatus() == RebalanceResult.Status.IN_PROGRESS) {
+             LOGGER.info("Abort rebalance job: {} for table: {}", tableJobId, tableNameWithType);
+             stats.setStatus(RebalanceResult.Status.ABORTED);
+             metadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+                 JsonUtils.objectToString(stats));
+           }
+         } catch (Exception e) {
+           LOGGER.error("Failed to abort rebalance job: {} for table: {}", tableJobId, tableNameWithType, e);
+         }
+       });
+   LOGGER.info("Tried to abort existing jobs at best effort and done: {}", updated);
+ }
+
+ /**
+  * Marks the tenant rebalance job as aborted and writes the result back to ZK.
+  * A copy of the progress stats is updated so that UNPROCESSED tables become CANCELLED and PROCESSING tables become
+  * ABORTED; all three queues of the given context are cleared; then both are serialised into the job metadata and
+  * the metadata is written to ZK. A serialisation failure is logged and the ZK write still takes place with
+  * whatever entries were successfully put.
+  *
+  * @param jobId The ID of the tenant rebalance job.
+  * @param jobMetadata The metadata of the tenant rebalance job; modified in place.
+  * @param tenantRebalanceContext The context of the tenant rebalance job; its queues are cleared.
+  * @param progressStats The progress stats of the tenant rebalance job; copied, not modified.
+  */
+ private void markTenantRebalanceJobAsAborted(String jobId, Map<String, String> jobMetadata,
+     TenantRebalanceContext tenantRebalanceContext,
+     TenantRebalanceProgressStats progressStats) {
+   LOGGER.info("Marking tenant rebalance job: {} as aborted", jobId);
+   TenantRebalanceProgressStats abortedProgressStats = new TenantRebalanceProgressStats(progressStats);
+   abortedProgressStats.getTableStatusMap().replaceAll((table, status) -> {
+     if (TenantRebalanceProgressStats.TableStatus.UNPROCESSED.name().equals(status)) {
+       return TenantRebalanceProgressStats.TableStatus.CANCELLED.name();
+     }
+     if (TenantRebalanceProgressStats.TableStatus.PROCESSING.name().equals(status)) {
+       return TenantRebalanceProgressStats.TableStatus.ABORTED.name();
+     }
+     return status;
+   });
+   // The queues must be emptied before the context is serialised below.
+   tenantRebalanceContext.getSequentialQueue().clear();
+   tenantRebalanceContext.getParallelQueue().clear();
+   tenantRebalanceContext.getOngoingJobsQueue().clear();
+   try {
+     String statsJson = JsonUtils.objectToString(abortedProgressStats);
+     jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, statsJson);
+   } catch (JsonProcessingException e) {
+     LOGGER.error("Error serialising rebalance stats to JSON for marking tenant rebalance job as aborted {}", jobId,
+         e);
+   }
+   try {
+     String contextJson = JsonUtils.objectToString(tenantRebalanceContext);
+     jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT, contextJson);
+   } catch (JsonProcessingException e) {
+     LOGGER.error("Error serialising rebalance context to JSON for marking tenant rebalance job as aborted {}", jobId,
+         e);
+   }
+   _pinotHelixResourceManager.addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.TENANT_REBALANCE);
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
new file mode 100644
index 00000000000..a5ef73a74a1
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+/**
+ * Context of a tenant rebalance job. It carries the job identity (the original job ID, the attempt ID and the job
+ * ID derived from them), the tenant rebalance config, and the parallel, sequential and ongoing queues of table
+ * rebalance jobs. This context is synchronized to ZK by `ZkBasedTenantRebalanceObserver` to ensure consistency
+ * across controller restarts.
+ */
+public class TenantRebalanceContext {
+  protected static final int INITIAL_ATTEMPT_ID = 1;
+  @JsonProperty("jobId")
+  private final String _jobId;
+  @JsonProperty("originalJobId")
+  private final String _originalJobId;
+  @JsonProperty("config")
+  private final TenantRebalanceConfig _config;
+  @JsonProperty("attemptId")
+  private final int _attemptId;
+  // Ongoing jobs queue and parallel queue are accessed concurrently by multiple threads, where each worker thread
+  // consumes a tenant-table-rebalance-job from the parallel queue, adds it to the ongoing jobs queue, processes it.
+  // On the other hand, only a single thread consumes from the sequential queue.
+  private final ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> _ongoingJobsQueue;
+  private final ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> _parallelQueue;
+  private final Queue<TenantRebalancer.TenantTableRebalanceJobContext> _sequentialQueue;
+
+  // Default constructor for JSON deserialization
+  public TenantRebalanceContext() {
+    _jobId = null;
+    _originalJobId = null;
+    _config = null;
+    _attemptId = INITIAL_ATTEMPT_ID;
+    _parallelQueue = new ConcurrentLinkedDeque<>();
+    _sequentialQueue = new LinkedList<>();
+    _ongoingJobsQueue = new ConcurrentLinkedQueue<>();
+  }
+
+  /**
+   * Creates a context whose job ID is derived from the original job ID and the attempt ID. The given queues are
+   * copied, so later changes to the arguments do not affect this context.
+   */
+  public TenantRebalanceContext(String originalJobId, TenantRebalanceConfig config, int attemptId,
+      ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> parallelQueue,
+      Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue,
+      ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> ongoingJobsQueue) {
+    _jobId = createAttemptJobId(originalJobId, attemptId);
+    _originalJobId = originalJobId;
+    _config = config;
+    _attemptId = attemptId;
+    _parallelQueue = new ConcurrentLinkedDeque<>(parallelQueue);
+    _sequentialQueue = new LinkedList<>(sequentialQueue);
+    _ongoingJobsQueue = new ConcurrentLinkedQueue<>(ongoingJobsQueue);
+  }
+
+  /**
+   * Creates the context for the first attempt of a tenant rebalance job, with an empty ongoing jobs queue.
+   */
+  public static TenantRebalanceContext forInitialRebalance(String originalJobId, TenantRebalanceConfig config,
+      ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> parallelQueue,
+      Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue) {
+    return new TenantRebalanceContext(originalJobId, config, INITIAL_ATTEMPT_ID,
+        parallelQueue, sequentialQueue, new ConcurrentLinkedQueue<>());
+  }
+
+  /**
+   * Creates the context for a retry attempt, carrying over copies of the given queues.
+   */
+  public static TenantRebalanceContext forRetry(String originalJobId, TenantRebalanceConfig config,
+      int attemptId, ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> parallelQueue,
+      Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue,
+      ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> ongoingJobsQueue) {
+    return new TenantRebalanceContext(originalJobId, config, attemptId,
+        parallelQueue, sequentialQueue, ongoingJobsQueue);
+  }
+
+  public ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> getParallelQueue() {
+    return _parallelQueue;
+  }
+
+  public Queue<TenantRebalancer.TenantTableRebalanceJobContext> getSequentialQueue() {
+    return _sequentialQueue;
+  }
+
+  public ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> getOngoingJobsQueue() {
+    return _ongoingJobsQueue;
+  }
+
+  public int getAttemptId() {
+    return _attemptId;
+  }
+
+  public String getOriginalJobId() {
+    return _originalJobId;
+  }
+
+  public String getJobId() {
+    return _jobId;
+  }
+
+  public TenantRebalanceConfig getConfig() {
+    return _config;
+  }
+
+  @Override
+  public String toString() {
+    return "TenantRebalanceContext{" + "jobId='" + getJobId() + '\'' + ", originalJobId='" + getOriginalJobId()
+        + '\'' + ", attemptId=" + getAttemptId() + ", parallelQueueSize="
+        + getParallelQueue().size() + ", sequentialQueueSize=" + getSequentialQueue().size()
+        + ", ongoingJobsQueueSize=" + getOngoingJobsQueue().size() + '}';
+  }
+
+  /**
+   * Derives the job ID of an attempt: the original job ID for the initial attempt, otherwise the original job ID
+   * suffixed with "_" and the attempt ID.
+   */
+  private static String createAttemptJobId(String originalJobId, int attemptId) {
+    if (attemptId == INITIAL_ATTEMPT_ID) {
+      return originalJobId;
+    }
+    return originalJobId + "_" + attemptId;
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
index fb77100ecee..7bad045c0b7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.rebalance.tenant;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -42,12 +43,23 @@ public class TenantRebalanceProgressStats {
}
public TenantRebalanceProgressStats(Set<String> tables) {
+ Preconditions.checkState(tables != null && !tables.isEmpty(), "List of
tables to observe is empty.");
_tableStatusMap = tables.stream()
.collect(Collectors.toMap(Function.identity(), k ->
TableStatus.UNPROCESSED.name()));
_totalTables = tables.size();
_remainingTables = _totalTables;
}
+ /**
+  * Copy constructor. The table status map is copied into a new HashMap and the entries of the table rebalance job
+  * ID map are copied over, so changing the maps of the copy does not change the maps of {@code other}.
+  *
+  * @param other The progress stats to copy from.
+  */
+ public TenantRebalanceProgressStats(TenantRebalanceProgressStats other) {
+   _totalTables = other._totalTables;
+   _remainingTables = other._remainingTables;
+   _startTimeMs = other._startTimeMs;
+   _timeToFinishInSeconds = other._timeToFinishInSeconds;
+   _completionStatusMsg = other._completionStatusMsg;
+   _tableStatusMap = new HashMap<>(other._tableStatusMap);
+   // NOTE(review): relies on _tableRebalanceJobIdMap being initialized at its declaration (not visible here).
+   _tableRebalanceJobIdMap.putAll(other._tableRebalanceJobIdMap);
+ }
+
public Map<String, String> getTableStatusMap() {
return _tableStatusMap;
}
@@ -109,6 +121,6 @@ public class TenantRebalanceProgressStats {
}
public enum TableStatus {
- UNPROCESSED, PROCESSING, PROCESSED
+ UNPROCESSED, PROCESSING, PROCESSED, ABORTED, CANCELLED
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
index 95149868067..1f2cb74f47a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
@@ -18,10 +18,48 @@
*/
package org.apache.pinot.controller.helix.core.rebalance.tenant;
-public interface TenantRebalancer {
- TenantRebalanceResult rebalance(TenantRebalanceConfig config);
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- class TenantTableRebalanceJobContext {
+
+public class TenantRebalancer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TenantRebalancer.class);
+ private final TableRebalanceManager _tableRebalanceManager;
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+ private final ExecutorService _executorService;
+
+ /**
+  * Creates a tenant rebalancer.
+  *
+  * @param tableRebalanceManager Used to dry-run and run the rebalance of each table.
+  * @param pinotHelixResourceManager Used to list tables and read their table configs.
+  * @param executorService Executor on which the table rebalance workers are submitted.
+  */
+ public TenantRebalancer(TableRebalanceManager tableRebalanceManager,
+     PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+   _executorService = executorService;
+   _pinotHelixResourceManager = pinotHelixResourceManager;
+   _tableRebalanceManager = tableRebalanceManager;
+ }
+
+ public static class TenantTableRebalanceJobContext {
private final String _tableName;
private final String _jobId;
// Whether the rebalance should be done with downtime or
minAvailableReplicas=0.
@@ -35,7 +73,9 @@ public interface TenantRebalancer {
* @param withDowntime Whether the rebalance should be done with downtime
or minAvailableReplicas=0.
* @return The result of the rebalance operation.
*/
- public TenantTableRebalanceJobContext(String tableName, String jobId,
boolean withDowntime) {
+ @JsonCreator
+ public TenantTableRebalanceJobContext(@JsonProperty("tableName") String
tableName,
+ @JsonProperty("jobId") String jobId, @JsonProperty("withDowntime")
boolean withDowntime) {
_tableName = tableName;
_jobId = jobId;
_withDowntime = withDowntime;
@@ -49,8 +89,292 @@ public interface TenantRebalancer {
return _tableName;
}
+ @JsonProperty("withDowntime")
public boolean shouldRebalanceWithDowntime() {
return _withDowntime;
}
}
+
+ /**
+  * Rebalances the tables of a tenant.
+  * The tables are selected from the tenant's tables using the include/exclude lists of the config, dry-run one by
+  * one, and, unless the config asks for a dry-run only, scheduled into a parallel and a sequential queue that are
+  * consumed asynchronously by {@link #rebalanceWithContext}.
+  *
+  * @param config The tenant rebalance configuration.
+  * @return For a dry-run, the dry-run results without a job ID. Otherwise the tenant rebalance job ID together with
+  * one result per table: IN_PROGRESS for tables whose dry-run was DONE, and the dry-run result itself for the rest.
+  */
+ public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+   // Step 1: Select the tables to include in this rebalance operation
+
+   Set<String> tables = getTenantTables(config.getTenantName());
+   Set<String> includeTables = config.getIncludeTables();
+   if (!includeTables.isEmpty()) {
+     tables.retainAll(includeTables);
+   }
+   tables.removeAll(config.getExcludeTables());
+
+   // Step 2: Dry-run over the selected tables to get the dry-run rebalance results. The result is to be sent as
+   // response to the user, and their summaries are needed for scheduling the job queue later
+
+   Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+   for (String table : tables) {
+     try {
+       RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+       rebalanceConfig.setDryRun(true);
+       dryRunResults.put(table,
+           _tableRebalanceManager.rebalanceTableDryRun(table, rebalanceConfig, createUniqueRebalanceJobIdentifier()));
+     } catch (TableNotFoundException exception) {
+       dryRunResults.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+           null, null, null, null, null));
+     }
+   }
+
+   // If dry-run was set, return the dry-run results and the job is done here
+   if (config.isDryRun()) {
+     return new TenantRebalanceResult(null, dryRunResults, config.isVerboseResult());
+   }
+
+   // Step 3: Create two queues--parallel and sequential and schedule the tables to these queues based on the
+   // parallelWhitelist and parallelBlacklist, also their dry-run results. For each table, a job context is created
+   // and put in the queue for the consuming threads to pick up and run the rebalance operation
+
+   String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+   Pair<ConcurrentLinkedDeque<TenantTableRebalanceJobContext>, Queue<TenantTableRebalanceJobContext>> queues =
+       createParallelAndSequentialQueues(config, dryRunResults, config.getParallelWhitelist(),
+           config.getParallelBlacklist());
+   ConcurrentLinkedDeque<TenantTableRebalanceJobContext> parallelQueue = queues.getLeft();
+   Queue<TenantTableRebalanceJobContext> sequentialQueue = queues.getRight();
+   TenantRebalanceContext tenantRebalanceContext =
+       TenantRebalanceContext.forInitialRebalance(tenantRebalanceJobId, config, parallelQueue,
+           sequentialQueue);
+
+   ZkBasedTenantRebalanceObserver observer =
+       new ZkBasedTenantRebalanceObserver(tenantRebalanceContext.getJobId(), config.getTenantName(),
+           tables, tenantRebalanceContext, _pinotHelixResourceManager);
+   // Step 4: Spin up threads to consume the parallel queue and sequential queue.
+   rebalanceWithContext(tenantRebalanceContext, observer);
+
+   // Step 5: Prepare the rebalance results to be returned to the user. The rebalance jobs are running in the
+   // background asynchronously.
+
+   Map<String, RebalanceResult> rebalanceResults = new HashMap<>();
+   for (Map.Entry<String, RebalanceResult> entry : dryRunResults.entrySet()) {
+     RebalanceResult result = entry.getValue();
+     if (result.getStatus() == RebalanceResult.Status.DONE) {
+       rebalanceResults.put(entry.getKey(), new RebalanceResult(result.getJobId(),
+           RebalanceResult.Status.IN_PROGRESS,
+           "In progress, check controller task status for the progress", result.getInstanceAssignment(),
+           result.getTierInstanceAssignment(), result.getSegmentAssignment(), result.getPreChecksResult(),
+           result.getRebalanceSummaryResult()));
+     } else {
+       // Dry-run did not succeed for this table; report the dry-run result as is.
+       rebalanceResults.put(entry.getKey(), result);
+     }
+   }
+   return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResults, config.isVerboseResult());
+ }
+
+ /**
+  * Spins up threads to rebalance the tenant with the given context and observer.
+  * The rebalance operation is performed in parallel for the tables in the parallel queue, then, sequentially for the
+  * tables in the sequential queue.
+  * The observer should be initiated with the tenantRebalanceContext in order to track the progress properly.
+  * The work runs asynchronously on the executor service; this method returns once the workers are submitted.
+  *
+  * @param tenantRebalanceContext The context containing the configuration and queues for the rebalance operation.
+  * @param observer The observer to notify about the rebalance progress and results.
+  */
+ public void rebalanceWithContext(TenantRebalanceContext tenantRebalanceContext,
+     ZkBasedTenantRebalanceObserver observer) {
+   LOGGER.info("Starting tenant rebalance with context: {}", tenantRebalanceContext);
+   TenantRebalanceConfig config = tenantRebalanceContext.getConfig();
+   ConcurrentLinkedDeque<TenantTableRebalanceJobContext> parallelQueue = tenantRebalanceContext.getParallelQueue();
+   Queue<TenantTableRebalanceJobContext> sequentialQueue = tenantRebalanceContext.getSequentialQueue();
+   ConcurrentLinkedQueue<TenantTableRebalanceJobContext> ongoingJobs = tenantRebalanceContext.getOngoingJobsQueue();
+
+   observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+
+   // ensure at least 1 thread is created to run the sequential table rebalance operations
+   int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+   LOGGER.info("Spinning up {} threads for tenant rebalance job: {}", parallelism, tenantRebalanceContext.getJobId());
+   AtomicInteger activeThreads = new AtomicInteger(parallelism);
+   try {
+     for (int i = 0; i < parallelism; i++) {
+       _executorService.submit(() -> {
+         try {
+           doConsumeTablesFromQueueAndRebalance(parallelQueue, ongoingJobs, config, observer);
+           // If this is the last thread to finish, start consuming the sequential queue
+           if (activeThreads.decrementAndGet() == 0) {
+             LOGGER.info("All parallel threads completed, starting sequential rebalance for job: {}",
+                 tenantRebalanceContext.getJobId());
+             doConsumeTablesFromQueueAndRebalance(sequentialQueue, ongoingJobs, config, observer);
+             observer.onSuccess(String.format("Successfully rebalanced tenant %s.", config.getTenantName()));
+             LOGGER.info("Completed tenant rebalance job: {}", tenantRebalanceContext.getJobId());
+           }
+         } catch (RuntimeException | Error e) {
+           // The Future returned by submit() is discarded, so a failure here would otherwise vanish silently.
+           // Log it before letting it propagate into the Future as before.
+           LOGGER.error("Caught unexpected exception/error in a worker thread of tenant rebalance job: {}",
+               tenantRebalanceContext.getJobId(), e);
+           throw e;
+         }
+       });
+     }
+   } catch (Exception exception) {
+     // Reached when submitting a worker to the executor fails.
+     observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", config.getTenantName(),
+         exception.getMessage()));
+     LOGGER.error("Caught exception in tenant rebalance job: {}, Cause: {}", tenantRebalanceContext.getJobId(),
+         exception.getMessage(), exception);
+   }
+ }
+
+ /**
+  * Consumes tables from the given queue of the TenantRebalanceContext that is being monitored by the observer and
+  * rebalances them one by one using the provided config, until the queue is empty.
+  * The ongoing jobs are tracked in the ongoingJobs queue, which is also from the monitored TenantRebalanceContext:
+  * a job is added to it when picked up and removed once its rebalance call returns or throws.
+  * A failure of one table is reported to the observer and logged, and does not stop the consumption of the queue.
+  *
+  * @param queue The queue of TenantTableRebalanceJobContext to consume tables from.
+  * @param ongoingJobs The queue to track ongoing rebalance jobs.
+  * @param config The rebalance configuration to use for the rebalancing.
+  * @param observer The observer to notify about the rebalance progress and results, should be initiated with the
+  *                 TenantRebalanceContext that contains `queue` and `ongoingJobs`.
+  */
+ private void doConsumeTablesFromQueueAndRebalance(Queue<TenantTableRebalanceJobContext> queue,
+     Queue<TenantTableRebalanceJobContext> ongoingJobs, RebalanceConfig config,
+     ZkBasedTenantRebalanceObserver observer) {
+   TenantTableRebalanceJobContext jobContext;
+   while ((jobContext = queue.poll()) != null) {
+     ongoingJobs.add(jobContext);
+     String tableName = jobContext.getTableName();
+     String rebalanceJobId = jobContext.getJobId();
+     RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+     rebalanceConfig.setDryRun(false);
+     if (jobContext.shouldRebalanceWithDowntime()) {
+       rebalanceConfig.setMinAvailableReplicas(0);
+     }
+     try {
+       LOGGER.info("Starting rebalance for table: {} with table rebalance job ID: {} in tenant rebalance job: {}",
+           tableName, rebalanceJobId, observer.getJobId());
+       observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, rebalanceJobId);
+       // Disallow TABLE rebalance checker to retry the rebalance job here, since we want TENANT rebalance checker
+       // to do so
+       RebalanceResult result =
+           _tableRebalanceManager.rebalanceTable(tableName, rebalanceConfig, rebalanceJobId, false);
+       // TODO: For downtime=true rebalance, track if the EV-IS has converged to move on, otherwise it fundamentally
+       //  breaks the degree of parallelism
+       if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
+         LOGGER.info("Completed rebalance for table: {} with table rebalance job ID: {} in tenant rebalance job: {}",
+             tableName, rebalanceJobId, observer.getJobId());
+         ongoingJobs.remove(jobContext);
+         observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, tableName, null);
+       } else {
+         LOGGER.warn("Rebalance for table: {} with table rebalance job ID: {} in tenant rebalance job: {} is not done. "
+                 + "Status: {}, Description: {}", tableName, rebalanceJobId, observer.getJobId(), result.getStatus(),
+             result.getDescription());
+         ongoingJobs.remove(jobContext);
+         observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName,
+             result.getDescription());
+       }
+     } catch (Throwable t) {
+       // Keep the cause in the controller log; the observer only receives a short description.
+       LOGGER.error("Caught exception/error while rebalancing table: {} with table rebalance job ID: {} in tenant "
+           + "rebalance job: {}", tableName, rebalanceJobId, observer.getJobId(), t);
+       ongoingJobs.remove(jobContext);
+       observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName,
+           String.format("Caught exception/error while rebalancing table: %s", tableName));
+     }
+   }
+ }
+
+ /**
+  * Collects the dimension tables whose server tenant is the given tenant.
+  * Tables whose table config cannot be retrieved are logged and skipped.
+  *
+  * @param tenantName The server tenant name to match.
+  * @return The names of the matching dimension tables; empty if there are none.
+  */
+ private Set<String> getDimensionalTables(String tenantName) {
+   Set<String> result = new HashSet<>();
+   for (String tableName : _pinotHelixResourceManager.getAllTables()) {
+     TableConfig config = _pinotHelixResourceManager.getTableConfig(tableName);
+     if (config == null) {
+       LOGGER.error("Unable to retrieve table config for table: {}", tableName);
+     } else if (tenantName.equals(config.getTenantConfig().getServer()) && config.isDimTable()) {
+       result.add(tableName);
+     }
+   }
+   return result;
+ }
+
+ /**
+  * Creates a new job identifier from a random UUID.
+  *
+  * @return The string form of a freshly generated random UUID.
+  */
+ private String createUniqueRebalanceJobIdentifier() {
+   UUID jobUuid = UUID.randomUUID();
+   return jobUuid.toString();
+ }
+
+ /**
+  * Collects the tables that are relevant to the given tenant, as decided by
+  * {@code TableConfigUtils.isRelevantToTenant}. Tables whose table config cannot be retrieved are logged and
+  * skipped.
+  *
+  * @param tenantName The tenant name to match.
+  * @return A new, modifiable set of the matching table names; empty if there are none.
+  */
+ Set<String> getTenantTables(String tenantName) {
+   Set<String> tenantTables = new HashSet<>();
+   for (String tableName : _pinotHelixResourceManager.getAllTables()) {
+     TableConfig config = _pinotHelixResourceManager.getTableConfig(tableName);
+     if (config == null) {
+       LOGGER.error("Unable to retrieve table config for table: {}", tableName);
+     } else if (TableConfigUtils.isRelevantToTenant(config, tenantName)) {
+       tenantTables.add(tableName);
+     }
+   }
+   return tenantTables;
+ }
+
+ /**
+  * Selects the tables that may be rebalanced in parallel.
+  * A null or empty whitelist means every table is allowed; a null or empty blacklist means no table is excluded.
+  * The blacklist is applied after the whitelist.
+  *
+  * @param tables The candidate tables; not modified.
+  * @param parallelWhitelist If non-empty, only tables in this set are kept.
+  * @param parallelBlacklist If non-empty, tables in this set are removed.
+  * @return A new set with the tables to run in parallel.
+  */
+ private static Set<String> getTablesToRunInParallel(Set<String> tables,
+     @Nullable Set<String> parallelWhitelist, @Nullable Set<String> parallelBlacklist) {
+   boolean hasWhitelist = parallelWhitelist != null && !parallelWhitelist.isEmpty();
+   boolean hasBlacklist = parallelBlacklist != null && !parallelBlacklist.isEmpty();
+   Set<String> selected = new HashSet<>(tables);
+   if (hasWhitelist) {
+     selected.retainAll(parallelWhitelist);
+   }
+   if (hasBlacklist) {
+     selected.removeAll(parallelBlacklist);
+   }
+   return selected;
+ }
+
+ /**
+  * Splits the dry-run results into the tables to rebalance in parallel and the tables to rebalance sequentially,
+  * and builds one job queue for each group.
+  *
+  * @param config The tenant rebalance configuration.
+  * @param dryRunResults The dry-run result of every table to schedule, keyed by table name.
+  * @param parallelWhitelist If non-empty, only these tables may run in parallel.
+  * @param parallelBlacklist If non-empty, these tables never run in parallel.
+  * @return A pair of the parallel queue (left) and the sequential queue (right).
+  */
+ @VisibleForTesting
+ Pair<ConcurrentLinkedDeque<TenantTableRebalanceJobContext>, Queue<TenantTableRebalanceJobContext>>
+ createParallelAndSequentialQueues(
+     TenantRebalanceConfig config, Map<String, RebalanceResult> dryRunResults, @Nullable Set<String> parallelWhitelist,
+     @Nullable Set<String> parallelBlacklist) {
+   Set<String> parallelTables =
+       getTablesToRunInParallel(dryRunResults.keySet(), parallelWhitelist, parallelBlacklist);
+   Map<String, RebalanceResult> parallelTableDryRunResults = new HashMap<>();
+   Map<String, RebalanceResult> sequentialTableDryRunResults = new HashMap<>();
+   for (Map.Entry<String, RebalanceResult> entry : dryRunResults.entrySet()) {
+     // Every table goes to exactly one of the two groups.
+     Map<String, RebalanceResult> target =
+         parallelTables.contains(entry.getKey()) ? parallelTableDryRunResults : sequentialTableDryRunResults;
+     target.put(entry.getKey(), entry.getValue());
+   }
+   ConcurrentLinkedDeque<TenantTableRebalanceJobContext> parallelQueue =
+       createTableQueue(config, parallelTableDryRunResults);
+   Queue<TenantTableRebalanceJobContext> sequentialQueue = createTableQueue(config, sequentialTableDryRunResults);
+   return Pair.of(parallelQueue, sequentialQueue);
+ }
+
+ /**
+  * Builds the queue of table rebalance jobs from the given dry-run results.
+  * Dimension tables of the tenant are ordered relative to the other tables: a pure scale out goes first, a pure
+  * scale in goes last, and any other dimension table change goes first. All remaining tables go in between.
+  * A job is flagged to rebalance with downtime when the expected replication factor after rebalance is 1.
+  * Tables whose dry-run result carries no rebalance summary (for example a dry-run that failed because the table
+  * was not found) cannot be scheduled and are skipped with a warning.
+  *
+  * @param config The tenant rebalance configuration; only its tenant name is used here.
+  * @param dryRunResults The dry-run result of every table to schedule, keyed by table name.
+  * @return The queue of job contexts in the order they should be consumed.
+  */
+ @VisibleForTesting
+ ConcurrentLinkedDeque<TenantTableRebalanceJobContext> createTableQueue(TenantRebalanceConfig config,
+     Map<String, RebalanceResult> dryRunResults) {
+   Queue<TenantTableRebalanceJobContext> firstQueue = new LinkedList<>();
+   Queue<TenantTableRebalanceJobContext> queue = new LinkedList<>();
+   Queue<TenantTableRebalanceJobContext> lastQueue = new LinkedList<>();
+   Set<String> dimTables = getDimensionalTables(config.getTenantName());
+   dryRunResults.forEach((table, dryRunResult) -> {
+     RebalanceSummaryResult summary = dryRunResult.getRebalanceSummaryResult();
+     if (summary == null) {
+       // Without a summary neither the downtime flag nor the dimension table ordering can be derived.
+       LOGGER.warn("Skipping table: {} in tenant rebalance since its dry-run has no rebalance summary. "
+           + "Status: {}, Description: {}", table, dryRunResult.getStatus(), dryRunResult.getDescription());
+       return;
+     }
+     boolean withDowntime = summary.getSegmentInfo()
+         .getReplicationFactor()
+         .getExpectedValueAfterRebalance() == 1;
+     TenantTableRebalanceJobContext jobContext =
+         new TenantTableRebalanceJobContext(table, dryRunResult.getJobId(), withDowntime);
+     if (dimTables.contains(table)) {
+       // check if the dimension table is a pure scale out or scale in.
+       // pure scale out means that only new servers are added and no servers are removed, vice versa
+       RebalanceSummaryResult.ServerInfo serverInfo = summary.getServerInfo();
+       if (!serverInfo.getServersAdded().isEmpty() && serverInfo.getServersRemoved().isEmpty()) {
+         // dimension table's pure scale OUT should be performed BEFORE other regular tables so that queries involving
+         // joining with dimension table won't fail on the new servers
+         firstQueue.add(jobContext);
+       } else if (serverInfo.getServersAdded().isEmpty() && !serverInfo.getServersRemoved().isEmpty()) {
+         // dimension table's pure scale IN should be performed AFTER other regular tables so that queries involving
+         // joining with dimension table won't fail on the old servers
+         lastQueue.add(jobContext);
+       } else {
+         // the dimension table is not a pure scale out or scale in, which is supposed to be rebalanced manually.
+         // Pre-check should capture and warn about this case.
+         firstQueue.add(jobContext);
+       }
+     } else {
+       queue.add(jobContext);
+     }
+   });
+   ConcurrentLinkedDeque<TenantTableRebalanceJobContext> tableQueue = new ConcurrentLinkedDeque<>();
+   tableQueue.addAll(firstQueue);
+   tableQueue.addAll(queue);
+   tableQueue.addAll(lastQueue);
+   return tableQueue;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 0d89a94d54d..9606db14b4b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -19,12 +19,12 @@
package org.apache.pinot.controller.helix.core.rebalance.tenant;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
@@ -42,20 +42,36 @@ public class ZkBasedTenantRebalanceObserver implements
TenantRebalanceObserver {
private final String _tenantName;
private final List<String> _unprocessedTables;
private final TenantRebalanceProgressStats _progressStats;
+ private final TenantRebalanceContext _tenantRebalanceContext;
// Keep track of number of updates. Useful during debugging.
private int _numUpdatesToZk;
+ private boolean _isDone;
- public ZkBasedTenantRebalanceObserver(String jobId, String tenantName,
Set<String> tables,
+ public ZkBasedTenantRebalanceObserver(String jobId, String tenantName,
TenantRebalanceProgressStats progressStats,
+ TenantRebalanceContext tenantRebalanceContext,
PinotHelixResourceManager pinotHelixResourceManager) {
- Preconditions.checkState(tables != null && !tables.isEmpty(), "List of
tables to observe is empty.");
+ _isDone = false;
_jobId = jobId;
_tenantName = tenantName;
- _unprocessedTables = new ArrayList<>(tables);
+ _unprocessedTables = progressStats.getTableStatusMap()
+ .entrySet()
+ .stream()
+ .filter(entry ->
entry.getValue().equals(TenantRebalanceProgressStats.TableStatus.UNPROCESSED.name()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ _tenantRebalanceContext = tenantRebalanceContext;
_pinotHelixResourceManager = pinotHelixResourceManager;
- _progressStats = new TenantRebalanceProgressStats(tables);
+ _progressStats = progressStats;
_numUpdatesToZk = 0;
}
+ public ZkBasedTenantRebalanceObserver(String jobId, String tenantName,
Set<String> tables,
+ TenantRebalanceContext tenantRebalanceContext,
+ PinotHelixResourceManager pinotHelixResourceManager) {
+ this(jobId, tenantName, new TenantRebalanceProgressStats(tables),
tenantRebalanceContext,
+ pinotHelixResourceManager);
+ }
+
@Override
public void onTrigger(Trigger trigger, String tableName, String description)
{
switch (trigger) {
@@ -78,24 +94,26 @@ public class ZkBasedTenantRebalanceObserver implements
TenantRebalanceObserver {
break;
default:
}
- trackStatsInZk();
+ syncStatsAndContextInZk();
}
@Override
public void onSuccess(String msg) {
_progressStats.setCompletionStatusMsg(msg);
_progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
_progressStats.getStartTimeMs()) / 1000);
- trackStatsInZk();
+ syncStatsAndContextInZk();
+ _isDone = true;
}
+ /**
+ * Records the failure message and the elapsed time in the progress stats,
+ * persists the final stats and context to ZK, then marks this observer done.
+ */
@Override
public void onError(String errorMsg) {
_progressStats.setCompletionStatusMsg(errorMsg);
- _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() -
_progressStats.getStartTimeMs());
- trackStatsInZk();
+ // Elapsed time is in milliseconds; convert to seconds, as onSuccess does.
+ _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
_progressStats.getStartTimeMs()) / 1000);
+ syncStatsAndContextInZk();
+ _isDone = true;
}
- private void trackStatsInZk() {
+ private void syncStatsAndContextInZk() {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
@@ -107,8 +125,31 @@ public class ZkBasedTenantRebalanceObserver implements
TenantRebalanceObserver {
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _jobId, e);
}
+ try {
+ jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+ JsonUtils.objectToString(_tenantRebalanceContext));
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Error serialising rebalance context to JSON for persisting
to ZK {}", _jobId, e);
+ }
_pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
ControllerJobTypes.TENANT_REBALANCE);
_numUpdatesToZk++;
LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ",
_numUpdatesToZk, _jobId);
}
+
+ public boolean isDone() {
+ return _isDone;
+ }
+
+ public String getJobId() {
+ return _jobId;
+ }
+
+ public String getTenantName() {
+ return _tenantName;
+ }
+
+ @VisibleForTesting
+ void setDone(boolean isDone) {
+ _isDone = isDone;
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
index 413d23c383e..c48e16a05f2 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
@@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest
extends ControllerTest {
}
private class MockControllerStarter extends ControllerStarter {
- private static final int NUM_PERIODIC_TASKS = 13;
+ private static final int NUM_PERIODIC_TASKS = 14;
public MockControllerStarter() {
super();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceCheckerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceCheckerTest.java
new file mode 100644
index 00000000000..15c69e63673
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceCheckerTest.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
+import
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class TenantRebalanceCheckerTest extends ControllerTest {
+ private static final String TENANT_NAME = "TestTenant";
+ private static final String JOB_ID = "test-tenant-rebalance-job-123";
+ private static final String JOB_ID_2 = "test-tenant-rebalance-job-456";
+ private static final String ORIGINAL_JOB_ID =
"original-tenant-rebalance-job-123";
+ private static final String TABLE_NAME_1 = "testTable1_OFFLINE";
+ private static final String TABLE_NAME_2 = "testTable2_OFFLINE";
+ private static final String NON_STUCK_TABLE_JOB_ID =
"non-stuck-table-job-456";
+ private static final String STUCK_TABLE_JOB_ID = "stuck-table-job-456";
+ private static final String STUCK_TABLE_JOB_ID_2 = "stuck-table-job-789";
+
+ @Mock
+ private PinotHelixResourceManager _mockPinotHelixResourceManager;
+ @Mock
+ private TenantRebalancer _mockTenantRebalancer;
+ @Mock
+ private ControllerConf _mockControllerConf;
+
+ private TenantRebalanceChecker _tenantRebalanceChecker;
+ private ExecutorService _executorService;
+
+ @BeforeMethod
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ _executorService = Executors.newFixedThreadPool(2);
+
+ // Setup default mock behaviors
+
when(_mockControllerConf.getTenantRebalanceCheckerFrequencyInSeconds()).thenReturn(300);
+
when(_mockControllerConf.getTenantRebalanceCheckerInitialDelayInSeconds()).thenReturn(300L);
+ // mock ZK update success
+ doReturn(true).when(_mockPinotHelixResourceManager)
+ .addControllerJobToZK(anyString(), anyMap(),
eq(ControllerJobTypes.TENANT_REBALANCE));
+ doReturn(true).when(_mockPinotHelixResourceManager)
+ .addControllerJobToZK(anyString(), anyMap(),
eq(ControllerJobTypes.TENANT_REBALANCE), any());
+
+ _tenantRebalanceChecker = new TenantRebalanceChecker(
+ _mockControllerConf,
+ _mockPinotHelixResourceManager,
+ _mockTenantRebalancer
+ );
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ if (_executorService != null) {
+ _executorService.shutdown();
+ }
+ }
+
+ @Test
+ public void testResumeStuckTenantRebalanceJob()
+ throws Exception {
+ // Create a stuck tenant rebalance context
+ TenantRebalanceContext stuckContext = createStuckTenantRebalanceContext();
+ TenantRebalanceProgressStats progressStats = createProgressStats();
+
+ // Mock ZK metadata for the stuck job
+ Map<String, String> jobZKMetadata =
createTenantJobZKMetadata(stuckContext, progressStats);
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+ allJobMetadata.put(JOB_ID, jobZKMetadata);
+
+ // Mock stuck table rebalance job metadata
+ Map<String, String> stuckTableJobMetadata = createStuckTableJobMetadata();
+
+ // Setup mocks
+ doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+ .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+ doReturn(stuckTableJobMetadata).when(_mockPinotHelixResourceManager)
+ .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID),
eq(ControllerJobTypes.TABLE_REBALANCE));
+
+ // Mock the tenant rebalancer to capture the resumed context
+ ArgumentCaptor<TenantRebalanceContext> contextCaptor =
+ ArgumentCaptor.forClass(TenantRebalanceContext.class);
+ ArgumentCaptor<ZkBasedTenantRebalanceObserver> observerCaptor =
+ ArgumentCaptor.forClass(ZkBasedTenantRebalanceObserver.class);
+
+ // Execute the checker
+ _tenantRebalanceChecker.runTask(new Properties());
+
+ // Verify that the tenant rebalancer was called to resume the job
+ verify(_mockTenantRebalancer, times(1)).rebalanceWithContext(
+ contextCaptor.capture(), observerCaptor.capture());
+
+ // Verify the resumed context
+ TenantRebalanceContext resumedContext = contextCaptor.getValue();
+ assertNotNull(resumedContext);
+ assertEquals(resumedContext.getAttemptId(),
TenantRebalanceContext.INITIAL_ATTEMPT_ID + 1);
+ assertEquals(resumedContext.getOriginalJobId(), ORIGINAL_JOB_ID);
+ assertEquals(resumedContext.getAttemptId(), 2); // Should be incremented
from 1
+ assertEquals(resumedContext.getConfig().getTenantName(), TENANT_NAME);
+
+ // Verify that the stuck table job context was moved back to parallel queue
+ TenantRebalancer.TenantTableRebalanceJobContext
firstJobContextInParallelQueue =
+ resumedContext.getParallelQueue().poll();
+ assertNotNull(firstJobContextInParallelQueue);
+ // because the stuck job is aborted, a new job ID is generated
+ assertNotEquals(firstJobContextInParallelQueue.getJobId(),
STUCK_TABLE_JOB_ID);
+ assertEquals(firstJobContextInParallelQueue.getTableName(), TABLE_NAME_1);
+ assertFalse(firstJobContextInParallelQueue.shouldRebalanceWithDowntime());
+ assertTrue(resumedContext.getOngoingJobsQueue().isEmpty());
+
+ // Verify the observer was created with correct parameters
+ ZkBasedTenantRebalanceObserver observer = observerCaptor.getValue();
+ assertNotNull(observer);
+ }
+
+ @Test
+ public void testResumeStuckTenantRebalanceJobWithMultipleStuckTables()
+ throws Exception {
+ // Create a context with multiple stuck table jobs
+ TenantRebalanceContext stuckContext =
createStuckTenantRebalanceContextWithMultipleTables();
+ TenantRebalanceProgressStats progressStats =
createProgressStatsWithMultipleTables();
+
+ // Mock ZK metadata
+ Map<String, String> jobZKMetadata =
createTenantJobZKMetadata(stuckContext, progressStats);
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+ allJobMetadata.put(JOB_ID, jobZKMetadata);
+
+ // Mock stuck table job metadata for both tables
+ Map<String, String> stuckTableJobMetadata1 = createStuckTableJobMetadata();
+ Map<String, String> stuckTableJobMetadata2 = createStuckTableJobMetadata();
+
+ // Setup mocks
+ doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+ .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+ doReturn(stuckTableJobMetadata1).when(_mockPinotHelixResourceManager)
+ .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID),
eq(ControllerJobTypes.TABLE_REBALANCE));
+ doReturn(stuckTableJobMetadata2).when(_mockPinotHelixResourceManager)
+ .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID_2),
eq(ControllerJobTypes.TABLE_REBALANCE));
+
+ // Execute the checker
+ _tenantRebalanceChecker.runTask(new Properties());
+
+ // Verify that the tenant rebalancer was called
+ ArgumentCaptor<TenantRebalanceContext> contextCaptor =
+ ArgumentCaptor.forClass(TenantRebalanceContext.class);
+ verify(_mockTenantRebalancer, times(1)).rebalanceWithContext(
+ contextCaptor.capture(), any(ZkBasedTenantRebalanceObserver.class));
+
+ // Verify that both stuck table jobs were moved back to parallel queue
+ TenantRebalanceContext resumedContext = contextCaptor.getValue();
+ assertEquals(resumedContext.getAttemptId(),
TenantRebalanceContext.INITIAL_ATTEMPT_ID + 1);
+ assertEquals(resumedContext.getParallelQueue().size(), 2);
+ assertTrue(resumedContext.getOngoingJobsQueue().isEmpty());
+ }
+
+ @Test
+ public void testDoNotResumeNonStuckTenantRebalanceJob()
+ throws Exception {
+ // Create a non-stuck tenant rebalance context (no ongoing jobs)
+ TenantRebalanceContext nonStuckContextWithoutOngoing =
createNonStuckTenantRebalanceContextWithoutOngoing();
+ TenantRebalanceProgressStats progressStats = createProgressStats();
+
+ // Mock ZK metadata
+ Map<String, String> tenantJobZKMetadataWithoutOngoing =
+
createTenantJobZKMetadataWithRecentTimestamp(nonStuckContextWithoutOngoing,
progressStats);
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+ allJobMetadata.put(JOB_ID, tenantJobZKMetadataWithoutOngoing);
+
+ // Setup mocks
+ doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+ .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+
+ // Execute the checker
+ _tenantRebalanceChecker.runTask(new Properties());
+
+ // Verify that the tenant rebalancer was NOT called
+ verify(_mockTenantRebalancer, never()).rebalanceWithContext(any(), any());
+ }
+
+ @Test
+ public void testDoNotResumeNonStuckTenantRebalanceJobWithOngoing()
+ throws Exception {
+ // Create a non-stuck tenant rebalance context (with a healthy ongoing job)
+ TenantRebalanceContext nonStuckContextWithOngoing =
createNonStuckTenantRebalanceContextWithOngoing();
+ TenantRebalanceProgressStats progressStats = createProgressStats();
+
+ // Mock ZK metadata
+ Map<String, String> tenantJobZKMetadataWithOngoing =
+
createTenantJobZKMetadataWithRecentTimestamp(nonStuckContextWithOngoing,
progressStats);
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+ allJobMetadata.put(JOB_ID, tenantJobZKMetadataWithOngoing);
+
+ // Mock non-stuck table rebalance job metadata
+ Map<String, String> nonStuckTableJobMetadata =
createNonStuckTableJobMetadata();
+
+ // Setup mocks
+ doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+ .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+ doReturn(nonStuckTableJobMetadata).when(_mockPinotHelixResourceManager)
+ .getControllerJobZKMetadata(eq(NON_STUCK_TABLE_JOB_ID),
eq(ControllerJobTypes.TABLE_REBALANCE));
+
+ // Setup mocks
+ doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+ .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+
+ // Execute the checker
+ _tenantRebalanceChecker.runTask(new Properties());
+
+ // Verify that the tenant rebalancer was NOT called
+ verify(_mockTenantRebalancer, never()).rebalanceWithContext(any(), any());
+ }
+
+ @Test
+ public void testDoNotResumeTenantRebalanceJobWhileZKUpdateFailed()
+ throws Exception {
+
+ // Create a stuck tenant rebalance context
+ TenantRebalanceContext stuckContext = createStuckTenantRebalanceContext();
+ TenantRebalanceProgressStats progressStats = createProgressStats();
+
+ // Mock ZK metadata for the stuck job
+ Map<String, String> jobZKMetadata =
createTenantJobZKMetadata(stuckContext, progressStats);
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+ allJobMetadata.put(JOB_ID, jobZKMetadata);
+
+ // Mock stuck table rebalance job metadata
+ Map<String, String> stuckTableJobMetadata = createStuckTableJobMetadata();
+
+ // Setup mocks
+ doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+ .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+ doReturn(stuckTableJobMetadata).when(_mockPinotHelixResourceManager)
+ .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID),
eq(ControllerJobTypes.TABLE_REBALANCE));
+ doReturn(false).when(_mockPinotHelixResourceManager)
+ .addControllerJobToZK(anyString(), anyMap(),
eq(ControllerJobTypes.TENANT_REBALANCE), any());
+
+ // Mock the tenant rebalancer to capture the resumed context
+ ArgumentCaptor<TenantRebalanceContext> contextCaptor =
+ ArgumentCaptor.forClass(TenantRebalanceContext.class);
+ ArgumentCaptor<ZkBasedTenantRebalanceObserver> observerCaptor =
+ ArgumentCaptor.forClass(ZkBasedTenantRebalanceObserver.class);
+
+ // Execute the checker
+ _tenantRebalanceChecker.runTask(new Properties());
+
+ // Verify that the tenant rebalancer was NOT called because ZK update
failed
+ verify(_mockTenantRebalancer, times(0)).rebalanceWithContext(
+ contextCaptor.capture(), observerCaptor.capture());
+ }
+
+ @Test
+ public void testHandleJsonProcessingException()
+ throws Exception {
+ // Create ZK metadata with invalid JSON
+ Map<String, String> invalidJsonJobZKMetadata = new HashMap<>();
+ invalidJsonJobZKMetadata.put(CommonConstants.ControllerJob.JOB_ID, JOB_ID);
+ invalidJsonJobZKMetadata.put(CommonConstants.ControllerJob.TENANT_NAME,
TENANT_NAME);
+
invalidJsonJobZKMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+ String.valueOf(System.currentTimeMillis()));
+ invalidJsonJobZKMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TENANT_REBALANCE.name());
+
invalidJsonJobZKMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
"invalid json");
+
invalidJsonJobZKMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
"invalid json");
+
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+ allJobMetadata.put(JOB_ID, invalidJsonJobZKMetadata);
+
+ // Setup mocks
+ doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+ .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+
+ // Execute the checker - should not throw exception
+ _tenantRebalanceChecker.runTask(new Properties());
+
+ // Verify that the tenant rebalancer was NOT called
+ verify(_mockTenantRebalancer, never()).rebalanceWithContext(any(), any());
+ }
+
+ @Test
+ public void testDoNotRunMultipleTenantRebalanceRetry()
+ throws Exception {
+
+ // Create a stuck tenant rebalance context
+ TenantRebalanceContext stuckContext = createStuckTenantRebalanceContext();
+ TenantRebalanceProgressStats progressStats = createProgressStats();
+
+ // Mock ZK metadata for the stuck job
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+ allJobMetadata.put(JOB_ID, createTenantJobZKMetadata(stuckContext,
progressStats, JOB_ID));
+ allJobMetadata.put(JOB_ID_2, createTenantJobZKMetadata(stuckContext,
progressStats, JOB_ID_2));
+
+ // Mock stuck table rebalance job metadata
+ Map<String, String> stuckTableJobMetadata = createStuckTableJobMetadata();
+
+ // Setup mocks
+ doReturn(allJobMetadata).when(_mockPinotHelixResourceManager)
+ .getAllJobs(eq(Set.of(ControllerJobTypes.TENANT_REBALANCE)), any());
+ doReturn(stuckTableJobMetadata).when(_mockPinotHelixResourceManager)
+ .getControllerJobZKMetadata(eq(STUCK_TABLE_JOB_ID),
eq(ControllerJobTypes.TABLE_REBALANCE));
+
+ // Mock the tenant rebalancer to capture the resumed context
+ ArgumentCaptor<TenantRebalanceContext> contextCaptor =
+ ArgumentCaptor.forClass(TenantRebalanceContext.class);
+ ArgumentCaptor<ZkBasedTenantRebalanceObserver> observerCaptor =
+ ArgumentCaptor.forClass(ZkBasedTenantRebalanceObserver.class);
+
+ // Execute the checker
+ _tenantRebalanceChecker.runTask(new Properties());
+
+ // Verify that the tenant rebalancer was called to resume the job
+ verify(_mockTenantRebalancer, times(1)).rebalanceWithContext(
+ contextCaptor.capture(), observerCaptor.capture());
+ // The mocked TenantRebalancer never marks the job as done
+ assertFalse(observerCaptor.getValue().isDone());
+
+ _tenantRebalanceChecker.runTask(new Properties());
+ // Since the previous job is not done, the rebalanceWithContext should not
be called again as we have set the limit
+ // to one tenant rebalance retry at a time
+ verify(_mockTenantRebalancer, times(1)).rebalanceWithContext(
+ contextCaptor.capture(), observerCaptor.capture());
+
+ // Mark the job as done and run the checker again - should pick up another
job now
+ observerCaptor.getValue().setDone(true);
+ _tenantRebalanceChecker.runTask(new Properties());
+
+ verify(_mockTenantRebalancer, times(2)).rebalanceWithContext(
+ contextCaptor.capture(), observerCaptor.capture());
+ }
+
+ // Helper methods to create test data
+
+ private TenantRebalanceContext createStuckTenantRebalanceContext() {
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setHeartbeatTimeoutInMs(300000L); // 5 minutes
+
+ ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext>
parallelQueue =
+ new ConcurrentLinkedDeque<>();
+ ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext>
ongoingJobsQueue =
+ new ConcurrentLinkedQueue<>();
+
+ // Add a stuck table job to ongoing queue
+ TenantRebalancer.TenantTableRebalanceJobContext stuckJobContext =
+ new TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_1,
STUCK_TABLE_JOB_ID, false);
+ ongoingJobsQueue.add(stuckJobContext);
+
+ return new TenantRebalanceContext(
+ ORIGINAL_JOB_ID, config, 1, parallelQueue,
+ new ConcurrentLinkedQueue<>(), ongoingJobsQueue);
+ }
+
+ private TenantRebalanceContext
createStuckTenantRebalanceContextWithMultipleTables()
+ throws JsonProcessingException {
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setHeartbeatTimeoutInMs(300000L);
+
+ ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext>
parallelQueue =
+ new ConcurrentLinkedDeque<>();
+ ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext>
ongoingJobsQueue =
+ new ConcurrentLinkedQueue<>();
+
+ // Add multiple stuck table jobs to ongoing queue
+ ongoingJobsQueue.add(new
TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_1,
STUCK_TABLE_JOB_ID, false));
+ ongoingJobsQueue.add(
+ new TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_2,
STUCK_TABLE_JOB_ID_2, false));
+
+ return new TenantRebalanceContext(
+ ORIGINAL_JOB_ID, config, 1, parallelQueue,
+ new ConcurrentLinkedQueue<>(), ongoingJobsQueue);
+ }
+
+ private TenantRebalanceContext
createNonStuckTenantRebalanceContextWithoutOngoing()
+ throws JsonProcessingException {
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setHeartbeatTimeoutInMs(300000L);
+
+ ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext>
parallelQueue =
+ new ConcurrentLinkedDeque<>();
+ // Add some jobs to parallel queue but none to ongoing queue
+ parallelQueue.add(new
TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_1,
NON_STUCK_TABLE_JOB_ID, false));
+
+ return new TenantRebalanceContext(
+ ORIGINAL_JOB_ID, config, 1, parallelQueue,
+ new ConcurrentLinkedQueue<>(), new ConcurrentLinkedQueue<>());
+ }
+
+ private TenantRebalanceContext
createNonStuckTenantRebalanceContextWithOngoing()
+ throws JsonProcessingException {
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setHeartbeatTimeoutInMs(300000L);
+
+ ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext>
ongoing =
+ new ConcurrentLinkedQueue<>();
+ // Add a job to the ongoing queue; the parallel queue stays empty
+ ongoing.add(new
TenantRebalancer.TenantTableRebalanceJobContext(TABLE_NAME_1,
NON_STUCK_TABLE_JOB_ID, false));
+
+ return new TenantRebalanceContext(
+ ORIGINAL_JOB_ID, config, 1, new ConcurrentLinkedDeque<>(),
+ new ConcurrentLinkedQueue<>(), ongoing);
+ }
+
+ private TenantRebalanceContext createRecentTenantRebalanceContext()
+ throws JsonProcessingException {
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setHeartbeatTimeoutInMs(300000L);
+
+ return new TenantRebalanceContext(
+ ORIGINAL_JOB_ID, config, 1,
+ new ConcurrentLinkedDeque<>(), new ConcurrentLinkedQueue<>(), new
ConcurrentLinkedQueue<>());
+ }
+
+ private TenantRebalanceProgressStats createProgressStats() {
+ Set<String> tables = new HashSet<>();
+ tables.add(TABLE_NAME_1);
+ tables.add(TABLE_NAME_2);
+
+ TenantRebalanceProgressStats stats = new
TenantRebalanceProgressStats(tables);
+ stats.setStartTimeMs(System.currentTimeMillis() - 60000); // 1 minute ago
+ stats.updateTableStatus(TABLE_NAME_1,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
+ stats.updateTableStatus(TABLE_NAME_2,
TenantRebalanceProgressStats.TableStatus.UNPROCESSED.name());
+
+ return stats;
+ }
+
+ private TenantRebalanceProgressStats createProgressStatsWithMultipleTables()
{
+ Set<String> tables = new HashSet<>();
+ tables.add(TABLE_NAME_1);
+ tables.add(TABLE_NAME_2);
+
+ TenantRebalanceProgressStats stats = new
TenantRebalanceProgressStats(tables);
+ stats.setStartTimeMs(System.currentTimeMillis() - 60000);
+ stats.updateTableStatus(TABLE_NAME_1,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
+ stats.updateTableStatus(TABLE_NAME_2,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
+
+ return stats;
+ }
+
+ private Map<String, String> createTenantJobZKMetadata(TenantRebalanceContext
context,
+ TenantRebalanceProgressStats progressStats)
+ throws JsonProcessingException {
+ return createTenantJobZKMetadata(context, progressStats, JOB_ID);
+ }
+
+ private Map<String, String> createTenantJobZKMetadata(TenantRebalanceContext
context,
+ TenantRebalanceProgressStats progressStats, String jobId)
+ throws JsonProcessingException {
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+ metadata.put(CommonConstants.ControllerJob.TENANT_NAME, TENANT_NAME);
+ metadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+ String.valueOf(System.currentTimeMillis() - 400000)); // 6+ minutes
ago (beyond heartbeat timeout)
+ metadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TENANT_REBALANCE.name());
+ metadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+ JsonUtils.objectToString(context));
+
metadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+ JsonUtils.objectToString(progressStats));
+
+ return metadata;
+ }
+
+ private Map<String, String>
createTenantJobZKMetadataWithRecentTimestamp(TenantRebalanceContext context,
+ TenantRebalanceProgressStats progressStats)
+ throws JsonProcessingException {
+ Map<String, String> metadata = createTenantJobZKMetadata(context,
progressStats);
+ metadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+ String.valueOf(System.currentTimeMillis() - 60000)); // 1 minute ago
(within heartbeat timeout)
+ return metadata;
+ }
+
+ private Map<String, String> createStuckTableJobMetadata()
+ throws JsonProcessingException {
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put(CommonConstants.ControllerJob.JOB_ID, STUCK_TABLE_JOB_ID);
+ metadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+ String.valueOf(System.currentTimeMillis() - 400000)); // 6+ minutes ago
+
+ // Create stuck table rebalance progress stats
+ TableRebalanceProgressStats tableStats = new TableRebalanceProgressStats();
+ tableStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
+ tableStats.setStartTimeMs(System.currentTimeMillis() - 400000);
+
+ // Create table rebalance context
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setHeartbeatTimeoutInMs(300000L);
+ TableRebalanceContext tableContext =
TableRebalanceContext.forInitialAttempt(
+ "original-table-job", rebalanceConfig, true);
+
+
metadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+ JsonUtils.objectToString(tableStats));
+ metadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+ JsonUtils.objectToString(tableContext));
+
+ return metadata;
+ }
+
+ private Map<String, String> createNonStuckTableJobMetadata()
+ throws JsonProcessingException {
+ Map<String, String> metadata = createStuckTableJobMetadata();
+ metadata.put(CommonConstants.ControllerJob.JOB_ID, NON_STUCK_TABLE_JOB_ID);
+ metadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+ String.valueOf(System.currentTimeMillis() - 60000)); // 1 minute ago
+
+ return metadata;
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index 0b2f099c53e..d033b9c6a4f 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -28,7 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.tuple.Pair;
@@ -92,8 +92,8 @@ public class TenantRebalancerTest extends ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
}
- DefaultTenantRebalancer tenantRebalancer =
- new DefaultTenantRebalancer(_tableRebalanceManager,
_helixResourceManager, _executorService);
+ TenantRebalancer tenantRebalancer =
+ new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_executorService);
// tag all servers and brokers to test tenant
addTenantTagToInstances(TENANT_NAME);
@@ -161,8 +161,8 @@ public class TenantRebalancerTest extends ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
}
- DefaultTenantRebalancer tenantRebalancer =
- new DefaultTenantRebalancer(_tableRebalanceManager,
_helixResourceManager, _executorService);
+ TenantRebalancer tenantRebalancer =
+ new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_executorService);
// tag all servers and brokers to test tenant
addTenantTagToInstances(TENANT_NAME);
@@ -289,8 +289,8 @@ public class TenantRebalancerTest extends ControllerTest {
addDummySchema(tableNameD);
addDummySchema(tableNameE);
- DefaultTenantRebalancer tenantRebalancer =
- new DefaultTenantRebalancer(_tableRebalanceManager,
_helixResourceManager, _executorService);
+ TenantRebalancer tenantRebalancer =
+ new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_executorService);
// table A set tenantConfig.tenants.server to tenantName
// SHOULD be selected as tenant's table
@@ -359,8 +359,8 @@ public class TenantRebalancerTest extends ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
}
- DefaultTenantRebalancer tenantRebalancer =
- new DefaultTenantRebalancer(_tableRebalanceManager,
_helixResourceManager, _executorService);
+ TenantRebalancer tenantRebalancer =
+ new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_executorService);
// tag all servers and brokers to test tenant
addTenantTagToInstances(TENANT_NAME);
@@ -442,7 +442,7 @@ public class TenantRebalancerTest extends ControllerTest {
assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
// set table B in parallel blacklist, so that it ends up in sequential
queue, and table A in parallel queue
-
Pair<ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext>,
+
Pair<ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext>,
Queue<TenantRebalancer.TenantTableRebalanceJobContext>>
queues =
tenantRebalancer.createParallelAndSequentialQueues(config,
dryRunResult.getRebalanceTableResults(), null,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]