This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6639f89842 Enable ZK-based progress tracking for SegmentRelocator 
rebalances (#16008)
6639f89842 is described below

commit 6639f89842f082cabf7a72cf4717166b61384402
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Wed Jun 11 17:47:28 2025 +0100

    Enable ZK-based progress tracking for SegmentRelocator rebalances (#16008)
---
 .../api/resources/PinotTableRestletResource.java   |  7 ++-
 .../helix/core/rebalance/RebalanceChecker.java     | 68 ++++++++++++++++++++-
 .../core/rebalance/TableRebalanceContext.java      | 39 ++++++++++--
 .../core/rebalance/TableRebalanceManager.java      | 20 +++++--
 .../rebalance/tenant/DefaultTenantRebalancer.java  |  5 +-
 .../helix/core/relocation/SegmentRelocator.java    |  5 +-
 .../helix/core/rebalance/RebalanceCheckerTest.java | 69 ++++++++++++++++++----
 .../apache/pinot/tools/PinotTableRebalancer.java   |  2 +-
 8 files changed, 184 insertions(+), 31 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index bdb82ce766..7ca3473ffe 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -696,18 +696,19 @@ public class PinotTableRestletResource {
       if (dryRun || preChecks || downtime) {
         // For dry-run, preChecks or rebalance with downtime, it's fine to run 
the rebalance synchronously since it
         // should be a really short operation.
-        return _tableRebalanceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false);
+        return _tableRebalanceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false, false);
       } else {
         // Make a dry-run first to get the target assignment
         rebalanceConfig.setDryRun(true);
         RebalanceResult dryRunResult =
-            _tableRebalanceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false);
+            _tableRebalanceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false, false);
 
         if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
           // If dry-run succeeded, run rebalance asynchronously
           rebalanceConfig.setDryRun(false);
           CompletableFuture<RebalanceResult> rebalanceResultFuture =
-              _tableRebalanceManager.rebalanceTableAsync(tableNameWithType, 
rebalanceConfig, rebalanceJobId, true);
+              _tableRebalanceManager.rebalanceTableAsync(tableNameWithType, 
rebalanceConfig, rebalanceJobId, true,
+                  true);
           rebalanceResultFuture.whenComplete((rebalanceResult, throwable) -> {
             if (throwable != null) {
               String errorMsg = String.format("Caught exception/error while 
rebalancing table: %s", tableNameWithType);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
index b16c6b8197..cac0315393 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -111,6 +111,11 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
   @VisibleForTesting
   void retryRebalanceTable(String tableNameWithType, Map<String, Map<String, 
String>> allJobMetadata)
       throws Exception {
+    // We first check for rebalance jobs that are stuck - i.e., those that are 
IN_PROGRESS but haven't updated their
+    // status in ZK within the heartbeat timeout. This could occur if a 
controller crashes while running a rebalance job
+    // for instance. These stuck jobs are always aborted here. They will also 
be retried if it's allowed as per the
+    // rebalance context.
+    //
     // Skip retry for the table if rebalance job is still running or has 
completed, in specific:
     // 1) Skip retry if any rebalance job is actively running. Being actively 
running means the job is at IN_PROGRESS
     // status, and has updated its status kept in ZK within the heartbeat 
timeout. It's possible that more than one
@@ -118,11 +123,23 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
     // 2) Skip retry if the most recently started rebalance job has completed 
with DONE or NO_OP. It's possible that
     // jobs started earlier may be still running, but they are ignored here.
     //
-    // Otherwise, we can get a list of failed rebalance jobs, i.e. those at 
FAILED status; or IN_PROGRESS status but
-    // haven't updated their status kept in ZK within the heartbeat timeout. 
For those candidate jobs to retry:
+    // Note that it should be very unlikely to have scenarios where there is 
more than one rebalance job running
+    // for a table at the same time, or to have a rebalance job that started 
earlier but completed later than the one
+    // started most recently since we try to prevent new rebalances from being 
triggered while a rebalance is in
+    // progress by checking ZK metadata. Such scenarios can only occur if 
multiple rebalance jobs are triggered at the
+    // same time and the second one is started before the first one updates 
its status in ZK.
+    //
+    // If we detect that a retry is required based on the above criteria, we 
can get a list of failed rebalance jobs,
+    // i.e. those at FAILED status; or IN_PROGRESS status but haven't updated 
their status kept in ZK within the
+    // heartbeat timeout. For those candidate jobs to retry:
     // 1) Firstly, group them by the original jobIds they retry for so that we 
can skip those exceeded maxRetry.
     // 2) For the remaining jobs, we take the one started most recently and 
retry it with its original configs.
     // 3) If configured, we can abort the other rebalance jobs for the table 
by setting their status to FAILED.
+
+    if (hasStuckInProgressJobs(tableNameWithType, allJobMetadata)) {
+      abortExistingJobs(tableNameWithType, _pinotHelixResourceManager);
+    }
+
     Map<String/*original jobId*/, Set<Pair<TableRebalanceContext/*job 
attempts*/, Long
         /*startTime*/>>> candidateJobs = getCandidateJobs(tableNameWithType, 
allJobMetadata);
     if (candidateJobs.isEmpty()) {
@@ -147,7 +164,7 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
           retryDelayMs);
       return;
     }
-    abortExistingJobs(tableNameWithType, _pinotHelixResourceManager);
+
     // Get tableConfig only when the table needs to retry rebalance, and get 
it before submitting rebalance to another
     // thread, in order to avoid unnecessary ZK reads and making too many ZK 
reads in a short time.
     TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
@@ -250,6 +267,47 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
     return candidateJobRun;
   }
 
+  /**
+   * Check if there are any rebalance jobs that are stuck in IN_PROGRESS 
status, i.e., they have not updated their
+   * status in ZK within the configured heartbeat timeout.
+   * @param tableNameWithType the table name with type
+   * @param allJobMetadata the metadata of all rebalance jobs for the table
+   * @return true if there are stuck rebalance jobs, false otherwise
+   */
+  @VisibleForTesting
+  static boolean hasStuckInProgressJobs(String tableNameWithType, Map<String, 
Map<String, String>> allJobMetadata)
+      throws Exception {
+    for (Map.Entry<String, Map<String, String>> entry : 
allJobMetadata.entrySet()) {
+      String jobId = entry.getKey();
+      Map<String, String> jobMetadata = entry.getValue();
+      long statsUpdatedAt = 
Long.parseLong(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+      String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+      if (StringUtils.isEmpty(jobStatsInStr)) {
+        // Skip rebalance job as it has no job progress stats
+        continue;
+      }
+      String jobCtxInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+      if (StringUtils.isEmpty(jobCtxInStr)) {
+        // Skip rebalance job as it has no job context
+        continue;
+      }
+      TableRebalanceProgressStats jobStats = 
JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
+      TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, 
TableRebalanceContext.class);
+
+      if (jobStats.getStatus() == RebalanceResult.Status.IN_PROGRESS) {
+        long heartbeatTimeoutMs = jobCtx.getConfig().getHeartbeatTimeoutInMs();
+        if (System.currentTimeMillis() - statsUpdatedAt >= heartbeatTimeoutMs) 
{
+          LOGGER.info("Found stuck rebalance job: {} for table: {} that has 
not updated its status in ZK within "
+              + "heartbeat timeout: {}", jobId, tableNameWithType, 
heartbeatTimeoutMs);
+          return true;
+        } else {
+          return false;
+        }
+      }
+    }
+    return false;
+  }
+
   @VisibleForTesting
   static Map<String, Set<Pair<TableRebalanceContext, Long>>> 
getCandidateJobs(String tableNameWithType,
       Map<String, Map<String, String>> allJobMetadata)
@@ -279,6 +337,10 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
       }
       TableRebalanceProgressStats jobStats = 
JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
       TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, 
TableRebalanceContext.class);
+      if (!jobCtx.getAllowRetries()) {
+        LOGGER.info("Skip rebalance job: {} as it does not allow retries", 
jobId);
+        continue;
+      }
       long jobStartTimeMs = jobStats.getStartTimeMs();
       if (latestStartedJob == null || latestStartedJob.getRight() < 
jobStartTimeMs) {
         latestStartedJob = Pair.of(jobId, jobStartTimeMs);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
index 48e43558cc..aea3a2ff0d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
@@ -27,24 +27,45 @@ public class TableRebalanceContext {
   private String _originalJobId;
   private RebalanceConfig _config;
   private int _attemptId;
+  // Default to true for all user initiated rebalances, so that they can be 
retried if they fail or get stuck.
+  private boolean _allowRetries = true;
 
-  public static TableRebalanceContext forInitialAttempt(String originalJobId, 
RebalanceConfig config) {
-    return new TableRebalanceContext(originalJobId, config, 
INITIAL_ATTEMPT_ID);
+  /**
+   * Creates a new TableRebalanceContext for the initial attempt of a 
rebalance job.
+   *
+   * @param originalJobId The original job ID for the rebalance job.
+   * @param config The rebalance configuration.
+   * @param allowRetries Whether retries are allowed for this rebalance job. 
This isn't part of {@link RebalanceConfig}
+   *                     because user initiated rebalances should always 
support retries for failed and stuck jobs.
+   * @return A new TableRebalanceContext instance.
+   */
+  public static TableRebalanceContext forInitialAttempt(String originalJobId, 
RebalanceConfig config,
+      boolean allowRetries) {
+    return new TableRebalanceContext(originalJobId, config, 
INITIAL_ATTEMPT_ID, allowRetries);
   }
 
+  /**
+   * Creates a new TableRebalanceContext for a retry attempt of a rebalance 
job.
+   *
+   * @param originalJobId The original job ID for the rebalance job.
+   * @param config The rebalance configuration.
+   * @param attemptId The attempt ID for the retry.
+   * @return A new TableRebalanceContext instance.
+   */
   public static TableRebalanceContext forRetry(String originalJobId, 
RebalanceConfig config, int attemptId) {
-    return new TableRebalanceContext(originalJobId, config, attemptId);
+    return new TableRebalanceContext(originalJobId, config, attemptId, true);
   }
 
   public TableRebalanceContext() {
     // For JSON deserialization.
   }
 
-  private TableRebalanceContext(String originalJobId, RebalanceConfig config, 
int attemptId) {
+  private TableRebalanceContext(String originalJobId, RebalanceConfig config, 
int attemptId, boolean allowRetries) {
     _jobId = createAttemptJobId(originalJobId, attemptId);
     _originalJobId = originalJobId;
     _config = config;
     _attemptId = attemptId;
+    _allowRetries = allowRetries;
   }
 
   public int getAttemptId() {
@@ -79,10 +100,18 @@ public class TableRebalanceContext {
     _config = config;
   }
 
+  public boolean getAllowRetries() {
+    return _allowRetries;
+  }
+
+  public void setAllowRetries(boolean allowRetries) {
+    _allowRetries = allowRetries;
+  }
+
   @Override
   public String toString() {
     return "TableRebalanceContext{" + "_jobId='" + _jobId + '\'' + ", 
_originalJobId='" + _originalJobId + '\''
-        + ", _config=" + _config + ", _attemptId=" + _attemptId + '}';
+        + ", _config=" + _config + ", _attemptId=" + _attemptId + ", 
_allowRetries=" + _allowRetries + "}";
   }
 
   private static String createAttemptJobId(String originalJobId, int 
attemptId) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
index 87de12b3d7..b700582593 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
@@ -79,12 +79,19 @@ public class TableRebalanceManager {
    * @param rebalanceConfig configuration for the rebalance operation
    * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
    * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @param allowRetries whether to allow retries for failed or stuck 
rebalance operations (through
+   *                     {@link RebalanceChecker}). Requires {@code 
trackRebalanceProgress} to be true.
    * @return result of the rebalance operation
    * @throws TableNotFoundException if the table does not exist
+   * @throws RebalanceInProgressException if a rebalance job is already in 
progress for the table (as per ZK metadata)
    */
   public RebalanceResult rebalanceTable(String tableNameWithType, 
RebalanceConfig rebalanceConfig,
-      String rebalanceJobId, boolean trackRebalanceProgress)
+      String rebalanceJobId, boolean trackRebalanceProgress, boolean 
allowRetries)
       throws TableNotFoundException, RebalanceInProgressException {
+    if (allowRetries && !trackRebalanceProgress) {
+      throw new IllegalArgumentException(
+          "Rebalance retries are only supported when rebalance progress is 
tracked in ZK");
+    }
     TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
     if (tableConfig == null) {
       throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
@@ -93,7 +100,7 @@ public class TableRebalanceManager {
     ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
     if (trackRebalanceProgress) {
       zkBasedTableRebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
-          TableRebalanceContext.forInitialAttempt(rebalanceJobId, 
rebalanceConfig),
+          TableRebalanceContext.forInitialAttempt(rebalanceJobId, 
rebalanceConfig, allowRetries),
           _resourceManager.getPropertyStore());
     }
     return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
@@ -109,11 +116,14 @@ public class TableRebalanceManager {
    * @param rebalanceConfig configuration for the rebalance operation
    * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
    * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @param allowRetries whether to allow retries for failed or stuck 
rebalance operations (through
+   *                     {@link RebalanceChecker}). Requires {@code 
trackRebalanceProgress} to be true.
    * @return a CompletableFuture that will complete with the result of the 
rebalance operation
    * @throws TableNotFoundException if the table does not exist
+   * @throws RebalanceInProgressException if a rebalance job is already in 
progress for the table (as per ZK metadata)
    */
   public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType,
-      RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean 
trackRebalanceProgress)
+      RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean 
trackRebalanceProgress, boolean allowRetries)
       throws TableNotFoundException, RebalanceInProgressException {
     TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
     if (tableConfig == null) {
@@ -125,7 +135,8 @@ public class TableRebalanceManager {
     return CompletableFuture.supplyAsync(
         () -> {
           try {
-            return rebalanceTable(tableNameWithType, rebalanceConfig, 
rebalanceJobId, trackRebalanceProgress);
+            return rebalanceTable(tableNameWithType, rebalanceConfig, 
rebalanceJobId, trackRebalanceProgress,
+                allowRetries);
           } catch (TableNotFoundException e) {
             // Should not happen since we already checked for table existence
             throw new RuntimeException(e);
@@ -147,6 +158,7 @@ public class TableRebalanceManager {
    * @param rebalanceConfig configuration for the rebalance operation
    * @param zkBasedTableRebalanceObserver observer to track rebalance progress 
in ZK
    * @return a CompletableFuture that will complete with the result of the 
rebalance operation
+   * @throws RebalanceInProgressException if a rebalance job is already in 
progress for the table (as per ZK metadata)
    */
   public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType, TableConfig tableConfig,
       String rebalanceJobId, RebalanceConfig rebalanceConfig,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index 369024faa6..3f288ef5ff 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -61,7 +61,8 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
         RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
         rebalanceConfig.setDryRun(true);
         rebalanceResult.put(table,
-            _tableRebalanceManager.rebalanceTable(table, rebalanceConfig, 
createUniqueRebalanceJobIdentifier(), false));
+            _tableRebalanceManager.rebalanceTable(table, rebalanceConfig, 
createUniqueRebalanceJobIdentifier(), false,
+                false));
       } catch (TableNotFoundException | RebalanceInProgressException 
exception) {
         rebalanceResult.put(table, new RebalanceResult(null, 
RebalanceResult.Status.FAILED, exception.getMessage(),
             null, null, null, null, null));
@@ -203,7 +204,7 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
       TenantRebalanceObserver observer) {
     try {
       
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, 
tableName, rebalanceJobId);
-      RebalanceResult result = 
_tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true);
+      RebalanceResult result = 
_tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true, 
true);
       if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
         
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, 
tableName, null);
       } else {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index b2144260c0..fd10148ce3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -232,8 +232,11 @@ public class SegmentRelocator extends 
ControllerPeriodicTask<Void> {
 
       // We're not using the async rebalance API here because we want to run 
this on a separate thread pool from the
       // rebalance thread pool that is used for user initiated rebalances.
+
+      // Retries are disabled because SegmentRelocator itself is a periodic 
controller task, so we don't want the
+      // RebalanceChecker to unnecessarily retry any such failed rebalances.
       RebalanceResult rebalance = 
_tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig,
-          TableRebalancer.createUniqueRebalanceJobIdentifier(), false);
+          TableRebalancer.createUniqueRebalanceJobIdentifier(), true, false);
       switch (rebalance.getStatus()) {
         case NO_OP:
           LOGGER.info("All segments are already relocated for table: {}", 
tableNameWithType);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
index d88b1319c1..373815c969 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
@@ -51,10 +51,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.*;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 public class RebalanceCheckerTest {
@@ -87,7 +84,7 @@ public class RebalanceCheckerTest {
     TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
     stats.setStatus(RebalanceResult.Status.FAILED);
     stats.setStartTimeMs(1000);
-    TableRebalanceContext jobCtx = 
TableRebalanceContext.forInitialAttempt("job1", jobCfg);
+    TableRebalanceContext jobCtx = 
TableRebalanceContext.forInitialAttempt("job1", jobCfg, true);
     Map<String, String> jobMetadata = 
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, 
jobCtx);
     allJobMetadata.put("job1", jobMetadata);
     // 3 failed retry runs for job1
@@ -104,7 +101,7 @@ public class RebalanceCheckerTest {
     stats = new TableRebalanceProgressStats();
     stats.setStatus(RebalanceResult.Status.FAILED);
     stats.setStartTimeMs(2000);
-    jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg);
+    jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg, true);
     jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, 
"job2", stats, jobCtx);
     allJobMetadata.put("job2", jobMetadata);
     jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100, 
RebalanceResult.Status.DONE);
@@ -116,7 +113,7 @@ public class RebalanceCheckerTest {
     stats = new TableRebalanceProgressStats();
     stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
     stats.setStartTimeMs(3000);
-    jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg);
+    jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg, true);
     jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, 
"job3", stats, jobCtx);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000");
     allJobMetadata.put("job3", jobMetadata);
@@ -152,11 +149,37 @@ public class RebalanceCheckerTest {
     stats = new TableRebalanceProgressStats();
     stats.setStatus(RebalanceResult.Status.DONE);
     stats.setStartTimeMs(5000);
-    jobCtx = TableRebalanceContext.forInitialAttempt("job5", jobCfg);
+    jobCtx = TableRebalanceContext.forInitialAttempt("job5", jobCfg, true);
     jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, 
"job5", stats, jobCtx);
     allJobMetadata.put("job5", jobMetadata);
     jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
     assertEquals(jobs.size(), 0);
+
+    // Add job6 that doesn't support retries as per its rebalance context 
(used by system initiated rebalances in
+    // practice).
+    jobCfg = new RebalanceConfig();
+    jobCfg.setMaxAttempts(4);
+    stats = new TableRebalanceProgressStats();
+    stats.setStatus(RebalanceResult.Status.FAILED);
+    stats.setStartTimeMs(5000);
+    jobCtx = TableRebalanceContext.forInitialAttempt("job6", jobCfg, false);
+    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, 
"job6", stats, jobCtx);
+    allJobMetadata.put("job6", jobMetadata);
+    jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
+    assertEquals(jobs.size(), 0);
+
+    // Ensure that a job serialized using an older version of 
TableRebalanceContext without the allowRetries field is
+    // retriable by default.
+    jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+        
"{\"jobId\":\"job6\",\"attemptId\":1,\"config\":{\"maxAttempts\":4,\"dryRun\":false,\"preChecks\":false,"
+            + 
"\"bootstrap\":false,\"downtime\":false,\"lowDiskMode\":false,\"bestEfforts\":false,"
+            + 
"\"reassignInstances\":false,\"includeConsuming\":false,\"batchSizePerServer\":-1,"
+            + 
"\"updateTargetTier\":false,\"externalViewCheckIntervalInMs\":1000,\"minAvailableReplicas\":1,"
+            + 
"\"heartbeatIntervalInMs\":300000,\"heartbeatTimeoutInMs\":3600000,\"retryInitialDelayInMs\":300000,"
+            + 
"\"minimizeDataMovement\":\"ENABLE\",\"externalViewStabilizationTimeoutInMs\":3600000},"
+            + "\"originalJobId\":\"job6\"}, tableName=table01}");
+    jobs = RebalanceChecker.getCandidateJobs(tableName, Map.of("job6", 
jobMetadata));
+    assertEquals(jobs.size(), 1);
   }
 
   @Test
@@ -192,6 +215,28 @@ public class RebalanceCheckerTest {
     assertNull(jobTime);
   }
 
+  @Test
+  public void testStuckInProgressJobs()
+      throws Exception {
+    String tableName = "table01";
+    Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+
+    assertFalse(RebalanceChecker.hasStuckInProgressJobs(tableName, 
allJobMetadata));
+
+    RebalanceConfig jobCfg = new RebalanceConfig();
+    jobCfg.setHeartbeatTimeoutInMs(10_000);
+    TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
+    stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
+    // Even though allowRetries is false, we still abort stuck jobs (heartbeat 
timeout exceeded).
+    TableRebalanceContext jobCtx = 
TableRebalanceContext.forInitialAttempt("job1", jobCfg, false);
+    Map<String, String> jobMetadata = 
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, 
jobCtx);
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
+        String.valueOf(System.currentTimeMillis() - 20_000));
+    allJobMetadata.put("job1", jobMetadata);
+
+    assertTrue(RebalanceChecker.hasStuckInProgressJobs(tableName, 
allJobMetadata));
+  }
+
   @Test
   public void testRetryRebalance()
       throws Exception {
@@ -208,7 +253,7 @@ public class RebalanceCheckerTest {
     TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
     stats.setStatus(RebalanceResult.Status.FAILED);
     stats.setStartTimeMs(1000);
-    TableRebalanceContext jobCtx = 
TableRebalanceContext.forInitialAttempt("job1", jobCfg);
+    TableRebalanceContext jobCtx = 
TableRebalanceContext.forInitialAttempt("job1", jobCfg, true);
     Map<String, String> jobMetadata = 
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, 
jobCtx);
     allJobMetadata.put("job1", jobMetadata);
     // 3 failed retry runs for job1
@@ -225,7 +270,7 @@ public class RebalanceCheckerTest {
     stats = new TableRebalanceProgressStats();
     stats.setStatus(RebalanceResult.Status.FAILED);
     stats.setStartTimeMs(2000);
-    jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg);
+    jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg, true);
     jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, 
"job2", stats, jobCtx);
     allJobMetadata.put("job2", jobMetadata);
     jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100, 
RebalanceResult.Status.DONE);
@@ -237,7 +282,7 @@ public class RebalanceCheckerTest {
     stats = new TableRebalanceProgressStats();
     stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
     stats.setStartTimeMs(3000);
-    jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg);
+    jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg, true);
     jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, 
"job3", stats, jobCtx);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000");
     allJobMetadata.put("job3", jobMetadata);
@@ -284,7 +329,7 @@ public class RebalanceCheckerTest {
     TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
     stats.setStatus(RebalanceResult.Status.FAILED);
     stats.setStartTimeMs(nowMs);
-    TableRebalanceContext jobCtx = 
TableRebalanceContext.forInitialAttempt("job1", jobCfg);
+    TableRebalanceContext jobCtx = 
TableRebalanceContext.forInitialAttempt("job1", jobCfg, true);
     Map<String, String> jobMetadata = 
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, 
jobCtx);
     allJobMetadata.put("job1", jobMetadata);
 
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index f935a22ba7..54778de2fe 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -71,7 +71,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
     }
 
     ZkBasedTableRebalanceObserver rebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, jobId,
-        TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig), 
_propertyStore);
+        TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig, 
true), _propertyStore);
 
     return new TableRebalancer(_helixManager, rebalanceObserver, null, null, 
null)
         .rebalance(tableConfig, _rebalanceConfig, jobId);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to