yashmayya commented on code in PR #16886:
URL: https://github.com/apache/pinot/pull/16886#discussion_r2388987512


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -701,6 +703,46 @@ public SuccessResponse deleteTenant(
         Response.Status.INTERNAL_SERVER_ERROR);
   }
 
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.DELETE)
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.REBALANCE_TENANT_TABLES)
+  @Path("/tenants/rebalance/{jobId}")
+  @ApiOperation(value = "Cancels a running tenant rebalance job")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success", response = 
SuccessResponse.class),
+      @ApiResponse(code = 404, message = "Tenant rebalance job not found"),
+      @ApiResponse(code = 500, message = "Internal server error during 
cancelling the rebalance job")

Review Comment:
   ```suggestion
         @ApiResponse(code = 500, message = "Internal server error while 
cancelling the rebalance job")
   ```
   nit



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -701,6 +703,46 @@ public SuccessResponse deleteTenant(
         Response.Status.INTERNAL_SERVER_ERROR);
   }
 
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.DELETE)
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.REBALANCE_TENANT_TABLES)
+  @Path("/tenants/rebalance/{jobId}")
+  @ApiOperation(value = "Cancels a running tenant rebalance job")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success", response = 
SuccessResponse.class),
+      @ApiResponse(code = 404, message = "Tenant rebalance job not found"),
+      @ApiResponse(code = 500, message = "Internal server error during 
cancelling the rebalance job")
+  })
+  public SuccessResponse cancelRebalance(
+      @ApiParam(value = "Tenant rebalance job id", required = true) 
@PathParam("jobId") String jobId) {
+    Map<String, String> jobMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
+    if (jobMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to cancel 
tenant rebalance job: " + jobId,
+          Response.Status.NOT_FOUND);
+    }
+    try {
+      ZkBasedTenantRebalanceObserver observer =
+          new ZkBasedTenantRebalanceObserver(jobId, 
jobMetadata.get(CommonConstants.ControllerJob.TENANT_NAME),
+              _pinotHelixResourceManager);
+      Pair<List<String>, Boolean> result = observer.cancelJob(true);
+      if (result.getRight()) {
+        return new SuccessResponse(
+            "Successfully cancelled tenant rebalance job: " + jobId + ". 
Cancelled " + result.getLeft().size()
+                + " table rebalance jobs: " + result.getLeft());
+      } else {
+        throw new ControllerApplicationException(LOGGER,
+            "Failed to cancel tenant rebalance job: " + jobId + ", yet " + 
result.getLeft().size()
+                + " table rebalance jobs are cancelled successfully: " + 
result.getLeft(),
+            Response.Status.INTERNAL_SERVER_ERROR);

Review Comment:
   nit: this message will look a bit strange if `result.getLeft().size()` is 0. I'd 
suggest rewording it to something like: `Failed to cancel tenant rebalance job: 
{jobId}. Number of table rebalance jobs cancelled successfully: {num}` instead. 
Also, we aren't returning any context about the cause of the failure to the users 
here — should we include the underlying error?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String 
tenantName, Set<Strin
 
   @Override
   public void onTrigger(Trigger trigger, String tableName, String description) 
{
-    switch (trigger) {
-      case START_TRIGGER:
-        _progressStats.setStartTimeMs(System.currentTimeMillis());
-        break;
-      case REBALANCE_STARTED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
-        _progressStats.putTableRebalanceJobId(tableName, description);
-        break;
-      case REBALANCE_COMPLETED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      case REBALANCE_ERRORED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, description);
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      default:
+  }
+
+  public void onStart() {
+    try {
+      updateTenantRebalanceJobMetadataInZk(
+          (ctx, progressStats) -> 
progressStats.setStartTimeMs(System.currentTimeMillis()));
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on starting tenant 
rebalance", _jobId, e);
+      throw new RuntimeException("Error updating ZK for jobId: " + _jobId + " 
on starting tenant rebalance", e);
     }
-    syncStatsAndContextInZk();
   }
 
   @Override
   public void onSuccess(String msg) {
-    _progressStats.setCompletionStatusMsg(msg);
-    _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - 
_progressStats.getStartTimeMs()) / 1000);
-    syncStatsAndContextInZk();
-    _isDone = true;
+    onFinish(msg);
   }
 
   @Override
   public void onError(String errorMsg) {
-    _progressStats.setCompletionStatusMsg(errorMsg);
-    _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() - 
_progressStats.getStartTimeMs());
-    syncStatsAndContextInZk();
+    onFinish(errorMsg);
+  }
+
+  private void onFinish(String msg) {
+    try {
+      updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+        if (StringUtils.isEmpty(progressStats.getCompletionStatusMsg())) {
+          progressStats.setCompletionStatusMsg(msg);
+          progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - 
progressStats.getStartTimeMs()) / 1000);
+        }
+      });
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on successful completion 
of tenant rebalance", _jobId, e);
+      throw new RuntimeException(
+          "Error updating ZK for jobId: " + _jobId + " on successful 
completion of tenant rebalance", e);
+    }
     _isDone = true;
   }
 
-  private void syncStatsAndContextInZk() {
+  private Map<String, String> makeJobMetadata(TenantRebalanceContext 
tenantRebalanceContext,
+      TenantRebalanceProgressStats progressStats) {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TENANT_REBALANCE.name());
     try {
       
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
-          JsonUtils.objectToString(_progressStats));
+          JsonUtils.objectToString(progressStats));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance stats to JSON for persisting 
to ZK {}", _jobId, e);
     }
     try {
       jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
-          JsonUtils.objectToString(_tenantRebalanceContext));
+          JsonUtils.objectToString(tenantRebalanceContext));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance context to JSON for persisting 
to ZK {}", _jobId, e);
     }
-    _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, 
ControllerJobTypes.TENANT_REBALANCE);
-    _numUpdatesToZk++;
-    LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", 
_numUpdatesToZk, _jobId);
+    return jobMetadata;
+  }
+
+  public TenantRebalancer.TenantTableRebalanceJobContext pollQueue(boolean 
isParallel) {
+    final TenantRebalancer.TenantTableRebalanceJobContext[] ret =

Review Comment:
   nit: I'd suggest using an `AtomicReference` rather than an array to get 
around the lambda's effectively-final requirement; it's cleaner IMO.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java:
##########
@@ -251,20 +259,18 @@ private void 
doConsumeTablesFromQueueAndRebalance(Queue<TenantTableRebalanceJobC
         if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
           LOGGER.info("Completed rebalance for table: {} with table rebalance 
job ID: {} in tenant rebalance job: {}",
               tableName, rebalanceJobId, observer.getJobId());
-          ongoingJobs.remove(jobContext);
-          
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, 
tableName, null);
+          observer.onTableJobDone(jobContext);
         } else {
-          LOGGER.warn("Rebalance for table: {} with table rebalance job ID: {} 
in tenant rebalance job: {} is not done."
+          LOGGER.warn(
+              "Rebalance for table: {} with table rebalance job ID: {} in 
tenant rebalance job: {} is not done."
                   + "Status: {}, Description: {}", tableName, rebalanceJobId, 
observer.getJobId(), result.getStatus(),
               result.getDescription());
-          ongoingJobs.remove(jobContext);
-          
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, 
tableName,
-              result.getDescription());
+          observer.onTableJobError(jobContext, result.getDescription());
         }
       } catch (Throwable t) {

Review Comment:
   Not a new change, but why are we catching `Throwable` and not `Exception` 
here? This is generally a bad idea in most situations.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String 
tenantName, Set<Strin
 
   @Override
   public void onTrigger(Trigger trigger, String tableName, String description) 
{
-    switch (trigger) {
-      case START_TRIGGER:
-        _progressStats.setStartTimeMs(System.currentTimeMillis());
-        break;
-      case REBALANCE_STARTED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
-        _progressStats.putTableRebalanceJobId(tableName, description);
-        break;
-      case REBALANCE_COMPLETED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      case REBALANCE_ERRORED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, description);
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      default:
+  }

Review Comment:
   TBH we can probably remove the interface itself given that this is the only 
implementation and you've refactored away usages of the various triggers.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -227,9 +227,10 @@ RebalanceResult rebalanceTable(String tableNameWithType, 
TableConfig tableConfig
    * @param tableNameWithType name of the table for which to cancel any 
ongoing rebalance job
    * @return the list of job IDs that were cancelled
    */
-  public List<String> cancelRebalance(String tableNameWithType) {
+  public static List<String> cancelRebalance(String tableNameWithType, 
PinotHelixResourceManager resourceManager,
+      RebalanceResult.Status setToStatus) {

Review Comment:
   nit: add the new parameter to the Javadoc and also add a precondition check 
to verify that the status is one of `CANCELLED` or `ABORTED`.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -701,6 +703,46 @@ public SuccessResponse deleteTenant(
         Response.Status.INTERNAL_SERVER_ERROR);
   }
 
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.DELETE)
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.REBALANCE_TENANT_TABLES)
+  @Path("/tenants/rebalance/{jobId}")
+  @ApiOperation(value = "Cancels a running tenant rebalance job")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success", response = 
SuccessResponse.class),
+      @ApiResponse(code = 404, message = "Tenant rebalance job not found"),
+      @ApiResponse(code = 500, message = "Internal server error during 
cancelling the rebalance job")
+  })
+  public SuccessResponse cancelRebalance(
+      @ApiParam(value = "Tenant rebalance job id", required = true) 
@PathParam("jobId") String jobId) {
+    Map<String, String> jobMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
+    if (jobMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to cancel 
tenant rebalance job: " + jobId,

Review Comment:
   nit: shouldn't the message also indicate that no tenant rebalance job with the 
given `jobId` was found?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String 
tenantName, Set<Strin
 
   @Override
   public void onTrigger(Trigger trigger, String tableName, String description) 
{
-    switch (trigger) {
-      case START_TRIGGER:
-        _progressStats.setStartTimeMs(System.currentTimeMillis());
-        break;
-      case REBALANCE_STARTED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
-        _progressStats.putTableRebalanceJobId(tableName, description);
-        break;
-      case REBALANCE_COMPLETED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      case REBALANCE_ERRORED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, description);
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      default:
+  }
+
+  public void onStart() {
+    try {
+      updateTenantRebalanceJobMetadataInZk(
+          (ctx, progressStats) -> 
progressStats.setStartTimeMs(System.currentTimeMillis()));
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on starting tenant 
rebalance", _jobId, e);
+      throw new RuntimeException("Error updating ZK for jobId: " + _jobId + " 
on starting tenant rebalance", e);
     }
-    syncStatsAndContextInZk();
   }
 
   @Override
   public void onSuccess(String msg) {
-    _progressStats.setCompletionStatusMsg(msg);
-    _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - 
_progressStats.getStartTimeMs()) / 1000);
-    syncStatsAndContextInZk();
-    _isDone = true;
+    onFinish(msg);
   }
 
   @Override
   public void onError(String errorMsg) {
-    _progressStats.setCompletionStatusMsg(errorMsg);
-    _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() - 
_progressStats.getStartTimeMs());
-    syncStatsAndContextInZk();
+    onFinish(errorMsg);
+  }
+
+  private void onFinish(String msg) {
+    try {
+      updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+        if (StringUtils.isEmpty(progressStats.getCompletionStatusMsg())) {
+          progressStats.setCompletionStatusMsg(msg);
+          progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - 
progressStats.getStartTimeMs()) / 1000);
+        }
+      });
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on successful completion 
of tenant rebalance", _jobId, e);
+      throw new RuntimeException(
+          "Error updating ZK for jobId: " + _jobId + " on successful 
completion of tenant rebalance", e);
+    }
     _isDone = true;
   }
 
-  private void syncStatsAndContextInZk() {
+  private Map<String, String> makeJobMetadata(TenantRebalanceContext 
tenantRebalanceContext,
+      TenantRebalanceProgressStats progressStats) {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TENANT_REBALANCE.name());
     try {
       
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
-          JsonUtils.objectToString(_progressStats));
+          JsonUtils.objectToString(progressStats));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance stats to JSON for persisting 
to ZK {}", _jobId, e);
     }
     try {
       jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
-          JsonUtils.objectToString(_tenantRebalanceContext));
+          JsonUtils.objectToString(tenantRebalanceContext));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance context to JSON for persisting 
to ZK {}", _jobId, e);
     }
-    _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, 
ControllerJobTypes.TENANT_REBALANCE);
-    _numUpdatesToZk++;
-    LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", 
_numUpdatesToZk, _jobId);
+    return jobMetadata;
+  }
+
+  public TenantRebalancer.TenantTableRebalanceJobContext pollQueue(boolean 
isParallel) {
+    final TenantRebalancer.TenantTableRebalanceJobContext[] ret =
+        new TenantRebalancer.TenantTableRebalanceJobContext[1];
+    try {
+      updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+        TenantRebalancer.TenantTableRebalanceJobContext polled =
+            isParallel ? ctx.getParallelQueue().poll() : 
ctx.getSequentialQueue().poll();
+        if (polled != null) {
+          ctx.getOngoingJobsQueue().add(polled);
+          String tableName = polled.getTableName();
+          String rebalanceJobId = polled.getJobId();
+          progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+          progressStats.putTableRebalanceJobId(tableName, rebalanceJobId);
+        }
+        ret[0] = polled;
+      });
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} while polling from {} 
queue", _jobId,
+          isParallel ? "parallel" : "sequential", e);
+      throw new RuntimeException(
+          "Error updating ZK for jobId: " + _jobId + " while polling from " + 
(isParallel ? "parallel" : "sequential")
+              + " queue", e);
+    }
+    return ret[0];
+  }
+
+  public TenantRebalancer.TenantTableRebalanceJobContext pollParallel() {
+    return pollQueue(true);
+  }
+
+  public TenantRebalancer.TenantTableRebalanceJobContext pollSequential() {
+    return pollQueue(false);
+  }
+
+  public void onTableJobError(TenantRebalancer.TenantTableRebalanceJobContext 
jobContext, String errorMessage) {
+    onTableJobComplete(jobContext, errorMessage);
+  }
+
+  public void onTableJobDone(TenantRebalancer.TenantTableRebalanceJobContext 
jobContext) {
+    onTableJobComplete(jobContext, 
TenantRebalanceProgressStats.TableStatus.DONE.name());
+  }
+
+  private void 
onTableJobComplete(TenantRebalancer.TenantTableRebalanceJobContext jobContext, 
String message) {
+    try {
+      updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+        if (ctx.getOngoingJobsQueue().remove(jobContext)) {
+          progressStats.updateTableStatus(jobContext.getTableName(), message);
+          progressStats.setRemainingTables(progressStats.getRemainingTables() 
- 1);
+        }
+      });
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on completion of table 
rebalance job: {}", _jobId, jobContext, e);
+      throw new RuntimeException(
+          "Error updating ZK for jobId: " + _jobId + " on completion of table 
rebalance job: " + jobContext, e);
+    }
+  }
+
+  public Pair<List<String>, Boolean> cancelJob(boolean isCancelledByUser) {
+    List<String> cancelledJobs = new ArrayList<>();
+    try {
+      // Empty the queues first to prevent any new jobs from being picked up.
+      updateTenantRebalanceJobMetadataInZk((tenantRebalanceContext, 
progressStats) -> {
+        TenantRebalancer.TenantTableRebalanceJobContext ctx;
+        while ((ctx = tenantRebalanceContext.getParallelQueue().poll()) != 
null) {
+          progressStats.getTableStatusMap()
+              .put(ctx.getTableName(), 
TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+        }
+        while ((ctx = tenantRebalanceContext.getSequentialQueue().poll()) != 
null) {
+          progressStats.getTableStatusMap()
+              .put(ctx.getTableName(), 
TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+        }
+      });
+      // Try to cancel ongoing jobs with best efforts. There could be some 
ongoing jobs that are marked cancelled but
+      // was completed if table rebalance completed right after 
TableRebalanceManager marked it.
+      updateTenantRebalanceJobMetadataInZk((tenantRebalanceContext, 
progressStats) -> {
+        TenantRebalancer.TenantTableRebalanceJobContext ctx;
+        while ((ctx = tenantRebalanceContext.getOngoingJobsQueue().poll()) != 
null) {
+          
cancelledJobs.addAll(TableRebalanceManager.cancelRebalance(ctx.getTableName(), 
_pinotHelixResourceManager,
+              isCancelledByUser ? RebalanceResult.Status.CANCELLED : 
RebalanceResult.Status.ABORTED));
+          progressStats.getTableStatusMap()
+              .put(ctx.getTableName(), isCancelledByUser ? 
TenantRebalanceProgressStats.TableStatus.CANCELLED.name()
+                  : TenantRebalanceProgressStats.TableStatus.ABORTED.name());
+        }
+        progressStats.setRemainingTables(0);
+        progressStats.setCompletionStatusMsg(
+            "Tenant rebalance job has been " + (isCancelledByUser ? 
"cancelled." : "aborted."));
+        progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - 
progressStats.getStartTimeMs()) / 1000);
+      });
+      return Pair.of(cancelledJobs, true);
+    } catch (AttemptFailureException e) {
+      return Pair.of(cancelledJobs, false);
+    }
+  }
+
+  private void updateTenantRebalanceJobMetadataInZk(
+      BiConsumer<TenantRebalanceContext, TenantRebalanceProgressStats> updater)
+      throws AttemptFailureException {
+    RetryPolicy retry = 
RetryPolicies.fixedDelayRetryPolicy(ZK_UPDATE_MAX_RETRIES, 
ZK_UPDATE_RETRY_WAIT_MS);
+    retry.attempt(() -> {

Review Comment:
   Are we using this retry mechanism to guard against concurrent updates from 
ongoing parallel rebalances? Wouldn't it be better to just synchronize this 
instead?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -240,17 +241,19 @@ public List<String> cancelRebalance(String 
tableNameWithType) {
               return;
             }
 
-            LOGGER.info("Cancelling rebalance job: {} for table: {}", jobId, 
tableNameWithType);
-            jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+            LOGGER.info("Cancelling rebalance job: {} for table: {}, setting 
status to: {}", jobId, tableNameWithType,
+                setToStatus);
+            jobStats.setStatus(setToStatus);
             
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
                 JsonUtils.objectToString(jobStats));
             cancelledJobIds.add(jobId);
           } catch (Exception e) {
             LOGGER.error("Failed to cancel rebalance job: {} for table: {}", 
jobId, tableNameWithType, e);
           }
         });
-    LOGGER.info("Tried to cancel existing rebalance jobs for table: {} at best 
effort and done: {}", tableNameWithType,
-        updated);
+    LOGGER.info("Tried to cancel existing rebalance jobs for table: {} at best 
effort and done: {}. Status set to: {}",

Review Comment:
   Same as above (cancel vs abort)



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java:
##########
@@ -251,20 +259,18 @@ private void 
doConsumeTablesFromQueueAndRebalance(Queue<TenantTableRebalanceJobC
         if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
           LOGGER.info("Completed rebalance for table: {} with table rebalance 
job ID: {} in tenant rebalance job: {}",
               tableName, rebalanceJobId, observer.getJobId());
-          ongoingJobs.remove(jobContext);
-          
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, 
tableName, null);
+          observer.onTableJobDone(jobContext);
         } else {
-          LOGGER.warn("Rebalance for table: {} with table rebalance job ID: {} 
in tenant rebalance job: {} is not done."
+          LOGGER.warn(
+              "Rebalance for table: {} with table rebalance job ID: {} in 
tenant rebalance job: {} is not done."
                   + "Status: {}, Description: {}", tableName, rebalanceJobId, 
observer.getJobId(), result.getStatus(),
               result.getDescription());
-          ongoingJobs.remove(jobContext);
-          
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, 
tableName,
-              result.getDescription());
+          observer.onTableJobError(jobContext, result.getDescription());
         }
       } catch (Throwable t) {
-        ongoingJobs.remove(jobContext);
-        
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, 
tableName,
-            String.format("Caught exception/error while rebalancing table: 
%s", tableName));
+        LOGGER.error("Caught exception while rebalancing table: {} with table 
rebalance job ID: {} in tenant "
+            + "rebalance job: {}", tableName, rebalanceJobId, 
observer.getJobId(), t);
+        observer.onTableJobError(jobContext, t.getMessage());

Review Comment:
   Previously, the error status message was 'Caught exception/error while 
rebalancing table', now it's just the exception message without any context. Is 
this change intentional?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -240,17 +241,19 @@ public List<String> cancelRebalance(String 
tableNameWithType) {
               return;
             }
 
-            LOGGER.info("Cancelling rebalance job: {} for table: {}", jobId, 
tableNameWithType);
-            jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+            LOGGER.info("Cancelling rebalance job: {} for table: {}, setting 
status to: {}", jobId, tableNameWithType,

Review Comment:
   The log line should also distinguish between cancelled and aborted, given that 
these are user-facing statuses exposed through the REST API.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String 
tenantName, Set<Strin
 
   @Override
   public void onTrigger(Trigger trigger, String tableName, String description) 
{
-    switch (trigger) {
-      case START_TRIGGER:
-        _progressStats.setStartTimeMs(System.currentTimeMillis());
-        break;
-      case REBALANCE_STARTED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
-        _progressStats.putTableRebalanceJobId(tableName, description);
-        break;
-      case REBALANCE_COMPLETED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      case REBALANCE_ERRORED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, description);
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      default:
+  }

Review Comment:
   I guess we can remove this unused method from the interface now?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String 
tenantName, Set<Strin
 
   @Override
   public void onTrigger(Trigger trigger, String tableName, String description) 
{
-    switch (trigger) {
-      case START_TRIGGER:
-        _progressStats.setStartTimeMs(System.currentTimeMillis());
-        break;
-      case REBALANCE_STARTED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
-        _progressStats.putTableRebalanceJobId(tableName, description);
-        break;
-      case REBALANCE_COMPLETED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      case REBALANCE_ERRORED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, description);
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      default:
+  }
+
+  public void onStart() {
+    try {
+      updateTenantRebalanceJobMetadataInZk(
+          (ctx, progressStats) -> 
progressStats.setStartTimeMs(System.currentTimeMillis()));
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on starting tenant 
rebalance", _jobId, e);
+      throw new RuntimeException("Error updating ZK for jobId: " + _jobId + " 
on starting tenant rebalance", e);
     }
-    syncStatsAndContextInZk();
   }
 
   @Override
   public void onSuccess(String msg) {
-    _progressStats.setCompletionStatusMsg(msg);
-    _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - 
_progressStats.getStartTimeMs()) / 1000);
-    syncStatsAndContextInZk();
-    _isDone = true;
+    onFinish(msg);
   }
 
   @Override
   public void onError(String errorMsg) {
-    _progressStats.setCompletionStatusMsg(errorMsg);
-    _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() - 
_progressStats.getStartTimeMs());
-    syncStatsAndContextInZk();
+    onFinish(errorMsg);
+  }
+
+  private void onFinish(String msg) {
+    try {
+      updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+        if (StringUtils.isEmpty(progressStats.getCompletionStatusMsg())) {
+          progressStats.setCompletionStatusMsg(msg);
+          progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - 
progressStats.getStartTimeMs()) / 1000);
+        }
+      });
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on completion of tenant rebalance", _jobId, e);
+      throw new RuntimeException(
+          "Error updating ZK for jobId: " + _jobId + " on completion of tenant rebalance", e);
+    }
     _isDone = true;
   }
 
-  private void syncStatsAndContextInZk() {
+  private Map<String, String> makeJobMetadata(TenantRebalanceContext 
tenantRebalanceContext,
+      TenantRebalanceProgressStats progressStats) {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TENANT_REBALANCE.name());
     try {
       
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
-          JsonUtils.objectToString(_progressStats));
+          JsonUtils.objectToString(progressStats));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance stats to JSON for persisting 
to ZK {}", _jobId, e);
     }
     try {
       jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
-          JsonUtils.objectToString(_tenantRebalanceContext));
+          JsonUtils.objectToString(tenantRebalanceContext));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance context to JSON for persisting 
to ZK {}", _jobId, e);
     }
-    _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, 
ControllerJobTypes.TENANT_REBALANCE);
-    _numUpdatesToZk++;
-    LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", 
_numUpdatesToZk, _jobId);
+    return jobMetadata;
+  }
+
+  public TenantRebalancer.TenantTableRebalanceJobContext pollQueue(boolean 
isParallel) {
+    final TenantRebalancer.TenantTableRebalanceJobContext[] ret =
+        new TenantRebalancer.TenantTableRebalanceJobContext[1];
+    try {
+      updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+        TenantRebalancer.TenantTableRebalanceJobContext polled =
+            isParallel ? ctx.getParallelQueue().poll() : 
ctx.getSequentialQueue().poll();
+        if (polled != null) {
+          ctx.getOngoingJobsQueue().add(polled);
+          String tableName = polled.getTableName();
+          String rebalanceJobId = polled.getJobId();
+          progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+          progressStats.putTableRebalanceJobId(tableName, rebalanceJobId);
+        }
+        ret[0] = polled;
+      });
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} while polling from {} 
queue", _jobId,
+          isParallel ? "parallel" : "sequential", e);
+      throw new RuntimeException(
+          "Error updating ZK for jobId: " + _jobId + " while polling from " + 
(isParallel ? "parallel" : "sequential")
+              + " queue", e);
+    }
+    return ret[0];
+  }
+
+  public TenantRebalancer.TenantTableRebalanceJobContext pollParallel() {
+    return pollQueue(true);
+  }
+
+  public TenantRebalancer.TenantTableRebalanceJobContext pollSequential() {
+    return pollQueue(false);
+  }
+
+  public void onTableJobError(TenantRebalancer.TenantTableRebalanceJobContext 
jobContext, String errorMessage) {
+    onTableJobComplete(jobContext, errorMessage);
+  }
+
+  public void onTableJobDone(TenantRebalancer.TenantTableRebalanceJobContext 
jobContext) {
+    onTableJobComplete(jobContext, 
TenantRebalanceProgressStats.TableStatus.DONE.name());
+  }
+
+  private void 
onTableJobComplete(TenantRebalancer.TenantTableRebalanceJobContext jobContext, 
String message) {
+    try {
+      updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+        if (ctx.getOngoingJobsQueue().remove(jobContext)) {
+          progressStats.updateTableStatus(jobContext.getTableName(), message);
+          progressStats.setRemainingTables(progressStats.getRemainingTables() 
- 1);
+        }
+      });
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on completion of table 
rebalance job: {}", _jobId, jobContext, e);
+      throw new RuntimeException(
+          "Error updating ZK for jobId: " + _jobId + " on completion of table 
rebalance job: " + jobContext, e);
+    }
+  }
+
+  public Pair<List<String>, Boolean> cancelJob(boolean isCancelledByUser) {
+    List<String> cancelledJobs = new ArrayList<>();
+    try {
+      // Empty the queues first to prevent any new jobs from being picked up.
+      updateTenantRebalanceJobMetadataInZk((tenantRebalanceContext, 
progressStats) -> {
+        TenantRebalancer.TenantTableRebalanceJobContext ctx;
+        while ((ctx = tenantRebalanceContext.getParallelQueue().poll()) != 
null) {
+          progressStats.getTableStatusMap()
+              .put(ctx.getTableName(), 
TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+        }
+        while ((ctx = tenantRebalanceContext.getSequentialQueue().poll()) != 
null) {
+          progressStats.getTableStatusMap()
+              .put(ctx.getTableName(), 
TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+        }
+      });
+      // Try to cancel ongoing jobs on a best-effort basis. Some ongoing jobs may be marked cancelled
+      // but actually completed, if the table rebalance finished right after TableRebalanceManager marked them.
+      updateTenantRebalanceJobMetadataInZk((tenantRebalanceContext, 
progressStats) -> {
+        TenantRebalancer.TenantTableRebalanceJobContext ctx;
+        while ((ctx = tenantRebalanceContext.getOngoingJobsQueue().poll()) != 
null) {
+          
cancelledJobs.addAll(TableRebalanceManager.cancelRebalance(ctx.getTableName(), 
_pinotHelixResourceManager,
+              isCancelledByUser ? RebalanceResult.Status.CANCELLED : 
RebalanceResult.Status.ABORTED));
+          progressStats.getTableStatusMap()
+              .put(ctx.getTableName(), isCancelledByUser ? 
TenantRebalanceProgressStats.TableStatus.CANCELLED.name()
+                  : TenantRebalanceProgressStats.TableStatus.ABORTED.name());
+        }
+        progressStats.setRemainingTables(0);
+        progressStats.setCompletionStatusMsg(
+            "Tenant rebalance job has been " + (isCancelledByUser ? 
"cancelled." : "aborted."));
+        progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - 
progressStats.getStartTimeMs()) / 1000);
+      });
+      return Pair.of(cancelledJobs, true);
+    } catch (AttemptFailureException e) {
+      return Pair.of(cancelledJobs, false);
+    }
+  }
+
+  private void updateTenantRebalanceJobMetadataInZk(
+      BiConsumer<TenantRebalanceContext, TenantRebalanceProgressStats> updater)
+      throws AttemptFailureException {
+    RetryPolicy retry = 
RetryPolicies.fixedDelayRetryPolicy(ZK_UPDATE_MAX_RETRIES, 
ZK_UPDATE_RETRY_WAIT_MS);
+    retry.attempt(() -> {

Review Comment:
   Oh, I see that we could have contention across objects because we're 
spinning up new instances of the observer for cancelling and aborting tenant 
rebalances. In that case, we could use some static locks keyed by `jobId` or a 
fixed number of locks accessed by the hash of the `jobId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to