This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6639f89842 Enable ZK-based progress tracking for SegmentRelocator rebalances (#16008) 6639f89842 is described below commit 6639f89842f082cabf7a72cf4717166b61384402 Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Wed Jun 11 17:47:28 2025 +0100 Enable ZK-based progress tracking for SegmentRelocator rebalances (#16008) --- .../api/resources/PinotTableRestletResource.java | 7 ++- .../helix/core/rebalance/RebalanceChecker.java | 68 ++++++++++++++++++++- .../core/rebalance/TableRebalanceContext.java | 39 ++++++++++-- .../core/rebalance/TableRebalanceManager.java | 20 +++++-- .../rebalance/tenant/DefaultTenantRebalancer.java | 5 +- .../helix/core/relocation/SegmentRelocator.java | 5 +- .../helix/core/rebalance/RebalanceCheckerTest.java | 69 ++++++++++++++++++---- .../apache/pinot/tools/PinotTableRebalancer.java | 2 +- 8 files changed, 184 insertions(+), 31 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index bdb82ce766..7ca3473ffe 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -696,18 +696,19 @@ public class PinotTableRestletResource { if (dryRun || preChecks || downtime) { // For dry-run, preChecks or rebalance with downtime, it's fine to run the rebalance synchronously since it // should be a really short operation. 
- return _tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false); + return _tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false, false); } else { // Make a dry-run first to get the target assignment rebalanceConfig.setDryRun(true); RebalanceResult dryRunResult = - _tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false); + _tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false, false); if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) { // If dry-run succeeded, run rebalance asynchronously rebalanceConfig.setDryRun(false); CompletableFuture<RebalanceResult> rebalanceResultFuture = - _tableRebalanceManager.rebalanceTableAsync(tableNameWithType, rebalanceConfig, rebalanceJobId, true); + _tableRebalanceManager.rebalanceTableAsync(tableNameWithType, rebalanceConfig, rebalanceJobId, true, + true); rebalanceResultFuture.whenComplete((rebalanceResult, throwable) -> { if (throwable != null) { String errorMsg = String.format("Caught exception/error while rebalancing table: %s", tableNameWithType); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java index b16c6b8197..cac0315393 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java @@ -111,6 +111,11 @@ public class RebalanceChecker extends ControllerPeriodicTask<Void> { @VisibleForTesting void retryRebalanceTable(String tableNameWithType, Map<String, Map<String, String>> allJobMetadata) throws Exception { + // We first check for rebalance jobs that are stuck - i.e., those that are IN_PROGRESS but haven't updated their + // 
status in ZK within the heartbeat timeout. This could occur if a controller crashes while running a rebalance job + // for instance. Stuck jobs that are detected here are aborted. They will also be retried if it's allowed as per the + // rebalance context. + // // Skip retry for the table if rebalance job is still running or has completed, in specific: // 1) Skip retry if any rebalance job is actively running. Being actively running means the job is at IN_PROGRESS // status, and has updated its status kept in ZK within the heartbeat timeout. It's possible that more than one @@ -118,11 +123,23 @@ public class RebalanceChecker extends ControllerPeriodicTask<Void> { // 2) Skip retry if the most recently started rebalance job has completed with DONE or NO_OP. It's possible that // jobs started earlier may be still running, but they are ignored here. // - // Otherwise, we can get a list of failed rebalance jobs, i.e. those at FAILED status; or IN_PROGRESS status but - // haven't updated their status kept in ZK within the heartbeat timeout. For those candidate jobs to retry: + // Note that it should be very unlikely to have scenarios where there is more than one rebalance job running + // for a table at the same time, or to have a rebalance job that started earlier but completed later than the one + // started most recently, since we try to prevent new rebalances from being triggered while a rebalance is in + // progress by checking ZK metadata. Such scenarios can only occur if multiple rebalance jobs are triggered at the + // same time and the second one is started before the first one updates its status in ZK. + // + // If we detect that a retry is required based on the above criteria, we can get a list of failed rebalance jobs, + // i.e. those at FAILED status; or IN_PROGRESS status but haven't updated their status kept in ZK within the + //
For those candidate jobs to retry: // 1) Firstly, group them by the original jobIds they retry for so that we can skip those exceeded maxRetry. // 2) For the remaining jobs, we take the one started most recently and retry it with its original configs. // 3) If configured, we can abort the other rebalance jobs for the table by setting their status to FAILED. + + if (hasStuckInProgressJobs(tableNameWithType, allJobMetadata)) { + abortExistingJobs(tableNameWithType, _pinotHelixResourceManager); + } + Map<String/*original jobId*/, Set<Pair<TableRebalanceContext/*job attempts*/, Long /*startTime*/>>> candidateJobs = getCandidateJobs(tableNameWithType, allJobMetadata); if (candidateJobs.isEmpty()) { @@ -147,7 +164,7 @@ public class RebalanceChecker extends ControllerPeriodicTask<Void> { retryDelayMs); return; } - abortExistingJobs(tableNameWithType, _pinotHelixResourceManager); + // Get tableConfig only when the table needs to retry rebalance, and get it before submitting rebalance to another // thread, in order to avoid unnecessary ZK reads and making too many ZK reads in a short time. TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); @@ -250,6 +267,47 @@ public class RebalanceChecker extends ControllerPeriodicTask<Void> { return candidateJobRun; } + /** + * Check if there are any rebalance jobs that are stuck in IN_PROGRESS status, i.e., they have not updated their + * status in ZK within the configured heartbeat timeout. 
+ * @param tableNameWithType the table name with type + * @param allJobMetadata the metadata of all rebalance jobs for the table + * @return true if the first IN_PROGRESS job found has not updated its ZK status within its heartbeat timeout + */ + @VisibleForTesting + static boolean hasStuckInProgressJobs(String tableNameWithType, Map<String, Map<String, String>> allJobMetadata) + throws Exception { + for (Map.Entry<String, Map<String, String>> entry : allJobMetadata.entrySet()) { + String jobId = entry.getKey(); + Map<String, String> jobMetadata = entry.getValue(); + long statsUpdatedAt = Long.parseLong(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); + String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + if (StringUtils.isEmpty(jobStatsInStr)) { + // Skip rebalance job as it has no job progress stats + continue; + } + String jobCtxInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); + if (StringUtils.isEmpty(jobCtxInStr)) { + // Skip rebalance job as it has no job context + continue; + } + TableRebalanceProgressStats jobStats = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class); + TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, TableRebalanceContext.class); + // NOTE(review): only the first IN_PROGRESS job reached is checked; result depends on map iteration order + if (jobStats.getStatus() == RebalanceResult.Status.IN_PROGRESS) { + long heartbeatTimeoutMs = jobCtx.getConfig().getHeartbeatTimeoutInMs(); + if (System.currentTimeMillis() - statsUpdatedAt >= heartbeatTimeoutMs) { + LOGGER.info("Found stuck rebalance job: {} for table: {} that has not updated its status in ZK within " + + "heartbeat timeout: {}", jobId, tableNameWithType, heartbeatTimeoutMs); + return true; + } else { + return false; + } + } + } + return false; + } + @VisibleForTesting static Map<String, Set<Pair<TableRebalanceContext, Long>>> getCandidateJobs(String tableNameWithType, Map<String, Map<String, String>> allJobMetadata) @@ -279,6 +337,10 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> { } TableRebalanceProgressStats jobStats = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class); TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, TableRebalanceContext.class); + if (!jobCtx.getAllowRetries()) { + LOGGER.info("Skip rebalance job: {} as it does not allow retries", jobId); + continue; + } long jobStartTimeMs = jobStats.getStartTimeMs(); if (latestStartedJob == null || latestStartedJob.getRight() < jobStartTimeMs) { latestStartedJob = Pair.of(jobId, jobStartTimeMs); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java index 48e43558cc..aea3a2ff0d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java @@ -27,24 +27,45 @@ public class TableRebalanceContext { private String _originalJobId; private RebalanceConfig _config; private int _attemptId; + // Default to true for all user initiated rebalances, so that they can be retried if they fail or get stuck. + private boolean _allowRetries = true; - public static TableRebalanceContext forInitialAttempt(String originalJobId, RebalanceConfig config) { - return new TableRebalanceContext(originalJobId, config, INITIAL_ATTEMPT_ID); + /** + * Creates a new TableRebalanceContext for the initial attempt of a rebalance job. + * + * @param originalJobId The original job ID for the rebalance job. + * @param config The rebalance configuration. + * @param allowRetries Whether retries are allowed for this rebalance job. This isn't part of {@link RebalanceConfig} + * because user initiated rebalances should always support retries for failed and stuck jobs. + * @return A new TableRebalanceContext instance. 
+ */ + public static TableRebalanceContext forInitialAttempt(String originalJobId, RebalanceConfig config, + boolean allowRetries) { + return new TableRebalanceContext(originalJobId, config, INITIAL_ATTEMPT_ID, allowRetries); } + /** + * Creates a new TableRebalanceContext for a retry attempt of a rebalance job. + * + * @param originalJobId The original job ID for the rebalance job. + * @param config The rebalance configuration. + * @param attemptId The attempt ID for the retry. + * @return A new TableRebalanceContext instance. + */ public static TableRebalanceContext forRetry(String originalJobId, RebalanceConfig config, int attemptId) { - return new TableRebalanceContext(originalJobId, config, attemptId); + return new TableRebalanceContext(originalJobId, config, attemptId, true); } public TableRebalanceContext() { // For JSON deserialization. } - private TableRebalanceContext(String originalJobId, RebalanceConfig config, int attemptId) { + private TableRebalanceContext(String originalJobId, RebalanceConfig config, int attemptId, boolean allowRetries) { _jobId = createAttemptJobId(originalJobId, attemptId); _originalJobId = originalJobId; _config = config; _attemptId = attemptId; + _allowRetries = allowRetries; } public int getAttemptId() { @@ -79,10 +100,18 @@ public class TableRebalanceContext { _config = config; } + public boolean getAllowRetries() { + return _allowRetries; + } + + public void setAllowRetries(boolean allowRetries) { + _allowRetries = allowRetries; + } + @Override public String toString() { return "TableRebalanceContext{" + "_jobId='" + _jobId + '\'' + ", _originalJobId='" + _originalJobId + '\'' - + ", _config=" + _config + ", _attemptId=" + _attemptId + '}'; + + ", _config=" + _config + ", _attemptId=" + _attemptId + ", _allowRetries=" + _allowRetries + "}"; } private static String createAttemptJobId(String originalJobId, int attemptId) { diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java index 87de12b3d7..b700582593 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java @@ -79,12 +79,19 @@ public class TableRebalanceManager { * @param rebalanceConfig configuration for the rebalance operation * @param rebalanceJobId ID of the rebalance job, which is used to track the progress of the rebalance operation * @param trackRebalanceProgress whether to track rebalance progress stats in ZK + * @param allowRetries whether to allow retries for failed or stuck rebalance operations (through + * {@link RebalanceChecker}). Requires {@code trackRebalanceProgress} to be true. * @return result of the rebalance operation * @throws TableNotFoundException if the table does not exist + * @throws RebalanceInProgressException if a rebalance job is already in progress for the table (as per ZK metadata) */ public RebalanceResult rebalanceTable(String tableNameWithType, RebalanceConfig rebalanceConfig, - String rebalanceJobId, boolean trackRebalanceProgress) + String rebalanceJobId, boolean trackRebalanceProgress, boolean allowRetries) throws TableNotFoundException, RebalanceInProgressException { + if (allowRetries && !trackRebalanceProgress) { + throw new IllegalArgumentException( + "Rebalance retries are only supported when rebalance progress is tracked in ZK"); + } TableConfig tableConfig = _resourceManager.getTableConfig(tableNameWithType); if (tableConfig == null) { throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType); @@ -93,7 +100,7 @@ public class TableRebalanceManager { ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null; 
if (trackRebalanceProgress) { zkBasedTableRebalanceObserver = new ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId, - TableRebalanceContext.forInitialAttempt(rebalanceJobId, rebalanceConfig), + TableRebalanceContext.forInitialAttempt(rebalanceJobId, rebalanceConfig, allowRetries), _resourceManager.getPropertyStore()); } return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, rebalanceConfig, @@ -109,11 +116,14 @@ public class TableRebalanceManager { * @param rebalanceConfig configuration for the rebalance operation * @param rebalanceJobId ID of the rebalance job, which is used to track the progress of the rebalance operation * @param trackRebalanceProgress whether to track rebalance progress stats in ZK + * @param allowRetries whether to allow retries for failed or stuck rebalance operations (through + * {@link RebalanceChecker}). Requires {@code trackRebalanceProgress} to be true. * @return a CompletableFuture that will complete with the result of the rebalance operation * @throws TableNotFoundException if the table does not exist + * @throws RebalanceInProgressException if a rebalance job is already in progress for the table (as per ZK metadata) */ public CompletableFuture<RebalanceResult> rebalanceTableAsync(String tableNameWithType, - RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean trackRebalanceProgress) + RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean trackRebalanceProgress, boolean allowRetries) throws TableNotFoundException, RebalanceInProgressException { TableConfig tableConfig = _resourceManager.getTableConfig(tableNameWithType); if (tableConfig == null) { @@ -125,7 +135,8 @@ public class TableRebalanceManager { return CompletableFuture.supplyAsync( () -> { try { - return rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, trackRebalanceProgress); + return rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, trackRebalanceProgress, + allowRetries); } catch 
(TableNotFoundException e) { // Should not happen since we already checked for table existence throw new RuntimeException(e); @@ -147,6 +158,7 @@ public class TableRebalanceManager { * @param rebalanceConfig configuration for the rebalance operation * @param zkBasedTableRebalanceObserver observer to track rebalance progress in ZK * @return a CompletableFuture that will complete with the result of the rebalance operation + * @throws RebalanceInProgressException if a rebalance job is already in progress for the table (as per ZK metadata) */ public CompletableFuture<RebalanceResult> rebalanceTableAsync(String tableNameWithType, TableConfig tableConfig, String rebalanceJobId, RebalanceConfig rebalanceConfig, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java index 369024faa6..3f288ef5ff 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java @@ -61,7 +61,8 @@ public class DefaultTenantRebalancer implements TenantRebalancer { RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); rebalanceConfig.setDryRun(true); rebalanceResult.put(table, - _tableRebalanceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(), false)); + _tableRebalanceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(), false, + false)); } catch (TableNotFoundException | RebalanceInProgressException exception) { rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), null, null, null, null, null)); @@ -203,7 +204,7 @@ public class DefaultTenantRebalancer implements TenantRebalancer { 
TenantRebalanceObserver observer) { try { observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, rebalanceJobId); - RebalanceResult result = _tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true); + RebalanceResult result = _tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true, true); if (result.getStatus().equals(RebalanceResult.Status.DONE)) { observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, tableName, null); } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java index b2144260c0..fd10148ce3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java @@ -232,8 +232,11 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { // We're not using the async rebalance API here because we want to run this on a separate thread pool from the // rebalance thread pool that is used for user initiated rebalances. + + // Retries are disabled because SegmentRelocator itself is a periodic controller task, so we don't want the + // RebalanceChecker to unnecessarily retry any such failed rebalances. 
RebalanceResult rebalance = _tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig, - TableRebalancer.createUniqueRebalanceJobIdentifier(), false); + TableRebalancer.createUniqueRebalanceJobIdentifier(), true, false); switch (rebalance.getStatus()) { case NO_OP: LOGGER.info("All segments are already relocated for table: {}", tableNameWithType); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java index d88b1319c1..373815c969 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java @@ -51,10 +51,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; public class RebalanceCheckerTest { @@ -87,7 +84,7 @@ public class RebalanceCheckerTest { TableRebalanceProgressStats stats = new TableRebalanceProgressStats(); stats.setStatus(RebalanceResult.Status.FAILED); stats.setStartTimeMs(1000); - TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg); + TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg, true); Map<String, String> jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, jobCtx); allJobMetadata.put("job1", jobMetadata); // 3 failed retry runs for job1 @@ -104,7 +101,7 @@ public class RebalanceCheckerTest { stats = new TableRebalanceProgressStats(); 
stats.setStatus(RebalanceResult.Status.FAILED); stats.setStartTimeMs(2000); - jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg); + jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg, true); jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job2", stats, jobCtx); allJobMetadata.put("job2", jobMetadata); jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100, RebalanceResult.Status.DONE); @@ -116,7 +113,7 @@ public class RebalanceCheckerTest { stats = new TableRebalanceProgressStats(); stats.setStatus(RebalanceResult.Status.IN_PROGRESS); stats.setStartTimeMs(3000); - jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg); + jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg, true); jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job3", stats, jobCtx); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000"); allJobMetadata.put("job3", jobMetadata); @@ -152,11 +149,37 @@ public class RebalanceCheckerTest { stats = new TableRebalanceProgressStats(); stats.setStatus(RebalanceResult.Status.DONE); stats.setStartTimeMs(5000); - jobCtx = TableRebalanceContext.forInitialAttempt("job5", jobCfg); + jobCtx = TableRebalanceContext.forInitialAttempt("job5", jobCfg, true); jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job5", stats, jobCtx); allJobMetadata.put("job5", jobMetadata); jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata); assertEquals(jobs.size(), 0); + + // Add job6 that doesn't support retries as per its rebalance context (used by system initiated rebalances in + // practice). 
+ jobCfg = new RebalanceConfig(); + jobCfg.setMaxAttempts(4); + stats = new TableRebalanceProgressStats(); + stats.setStatus(RebalanceResult.Status.FAILED); + stats.setStartTimeMs(5000); + jobCtx = TableRebalanceContext.forInitialAttempt("job6", jobCfg, false); + jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job6", stats, jobCtx); + allJobMetadata.put("job6", jobMetadata); + jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata); + assertEquals(jobs.size(), 0); + + // Ensure that a job serialized using an older version of TableRebalanceContext without the allowRetries field is + // retriable by default. + jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT, + "{\"jobId\":\"job6\",\"attemptId\":1,\"config\":{\"maxAttempts\":4,\"dryRun\":false,\"preChecks\":false," + + "\"bootstrap\":false,\"downtime\":false,\"lowDiskMode\":false,\"bestEfforts\":false," + + "\"reassignInstances\":false,\"includeConsuming\":false,\"batchSizePerServer\":-1," + + "\"updateTargetTier\":false,\"externalViewCheckIntervalInMs\":1000,\"minAvailableReplicas\":1," + + "\"heartbeatIntervalInMs\":300000,\"heartbeatTimeoutInMs\":3600000,\"retryInitialDelayInMs\":300000," + + "\"minimizeDataMovement\":\"ENABLE\",\"externalViewStabilizationTimeoutInMs\":3600000}," + + "\"originalJobId\":\"job6\"}, tableName=table01}"); + jobs = RebalanceChecker.getCandidateJobs(tableName, Map.of("job6", jobMetadata)); + assertEquals(jobs.size(), 1); } @Test @@ -192,6 +215,28 @@ public class RebalanceCheckerTest { assertNull(jobTime); } + @Test + public void testStuckInProgressJobs() + throws Exception { + String tableName = "table01"; + Map<String, Map<String, String>> allJobMetadata = new HashMap<>(); + + assertFalse(RebalanceChecker.hasStuckInProgressJobs(tableName, allJobMetadata)); + + RebalanceConfig jobCfg = new RebalanceConfig(); + jobCfg.setHeartbeatTimeoutInMs(10_000); + TableRebalanceProgressStats stats = new TableRebalanceProgressStats(); + 
stats.setStatus(RebalanceResult.Status.IN_PROGRESS); + // Even though allowRetries is false, we still abort stuck jobs (heartbeat timeout exceeded). + TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg, false); + Map<String, String> jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, jobCtx); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, + String.valueOf(System.currentTimeMillis() - 20_000)); + allJobMetadata.put("job1", jobMetadata); + + assertTrue(RebalanceChecker.hasStuckInProgressJobs(tableName, allJobMetadata)); + } + @Test public void testRetryRebalance() throws Exception { @@ -208,7 +253,7 @@ public class RebalanceCheckerTest { TableRebalanceProgressStats stats = new TableRebalanceProgressStats(); stats.setStatus(RebalanceResult.Status.FAILED); stats.setStartTimeMs(1000); - TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg); + TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg, true); Map<String, String> jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, jobCtx); allJobMetadata.put("job1", jobMetadata); // 3 failed retry runs for job1 @@ -225,7 +270,7 @@ public class RebalanceCheckerTest { stats = new TableRebalanceProgressStats(); stats.setStatus(RebalanceResult.Status.FAILED); stats.setStartTimeMs(2000); - jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg); + jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg, true); jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job2", stats, jobCtx); allJobMetadata.put("job2", jobMetadata); jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100, RebalanceResult.Status.DONE); @@ -237,7 +282,7 @@ public class RebalanceCheckerTest { stats = new TableRebalanceProgressStats(); stats.setStatus(RebalanceResult.Status.IN_PROGRESS); stats.setStartTimeMs(3000); - jobCtx = 
TableRebalanceContext.forInitialAttempt("job3", jobCfg); + jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg, true); jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job3", stats, jobCtx); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000"); allJobMetadata.put("job3", jobMetadata); @@ -284,7 +329,7 @@ public class RebalanceCheckerTest { TableRebalanceProgressStats stats = new TableRebalanceProgressStats(); stats.setStatus(RebalanceResult.Status.FAILED); stats.setStartTimeMs(nowMs); - TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg); + TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg, true); Map<String, String> jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, jobCtx); allJobMetadata.put("job1", jobMetadata); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java index f935a22ba7..54778de2fe 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java @@ -71,7 +71,7 @@ public class PinotTableRebalancer extends PinotZKChanger { } ZkBasedTableRebalanceObserver rebalanceObserver = new ZkBasedTableRebalanceObserver(tableNameWithType, jobId, - TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig), _propertyStore); + TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig, true), _propertyStore); return new TableRebalancer(_helixManager, rebalanceObserver, null, null, null) .rebalance(tableConfig, _rebalanceConfig, jobId); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org