yashmayya commented on code in PR #16886:
URL: https://github.com/apache/pinot/pull/16886#discussion_r2388987512
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -701,6 +703,46 @@ public SuccessResponse deleteTenant(
Response.Status.INTERNAL_SERVER_ERROR);
}
+ @DELETE
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authenticate(AccessType.DELETE)
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.REBALANCE_TENANT_TABLES)
+ @Path("/tenants/rebalance/{jobId}")
+ @ApiOperation(value = "Cancels a running tenant rebalance job")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success", response =
SuccessResponse.class),
+ @ApiResponse(code = 404, message = "Tenant rebalance job not found"),
+ @ApiResponse(code = 500, message = "Internal server error during
cancelling the rebalance job")
Review Comment:
```suggestion
@ApiResponse(code = 500, message = "Internal server error while
cancelling the rebalance job")
```
nit
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -701,6 +703,46 @@ public SuccessResponse deleteTenant(
Response.Status.INTERNAL_SERVER_ERROR);
}
+ @DELETE
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authenticate(AccessType.DELETE)
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.REBALANCE_TENANT_TABLES)
+ @Path("/tenants/rebalance/{jobId}")
+ @ApiOperation(value = "Cancels a running tenant rebalance job")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success", response =
SuccessResponse.class),
+ @ApiResponse(code = 404, message = "Tenant rebalance job not found"),
+ @ApiResponse(code = 500, message = "Internal server error during
cancelling the rebalance job")
+ })
+ public SuccessResponse cancelRebalance(
+ @ApiParam(value = "Tenant rebalance job id", required = true)
@PathParam("jobId") String jobId) {
+ Map<String, String> jobMetadata =
+ _pinotHelixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
+ if (jobMetadata == null) {
+ throw new ControllerApplicationException(LOGGER, "Failed to cancel
tenant rebalance job: " + jobId,
+ Response.Status.NOT_FOUND);
+ }
+ try {
+ ZkBasedTenantRebalanceObserver observer =
+ new ZkBasedTenantRebalanceObserver(jobId,
jobMetadata.get(CommonConstants.ControllerJob.TENANT_NAME),
+ _pinotHelixResourceManager);
+ Pair<List<String>, Boolean> result = observer.cancelJob(true);
+ if (result.getRight()) {
+ return new SuccessResponse(
+ "Successfully cancelled tenant rebalance job: " + jobId + ".
Cancelled " + result.getLeft().size()
+ + " table rebalance jobs: " + result.getLeft());
+ } else {
+ throw new ControllerApplicationException(LOGGER,
+ "Failed to cancel tenant rebalance job: " + jobId + ", yet " +
result.getLeft().size()
+ + " table rebalance jobs are cancelled successfully: " +
result.getLeft(),
+ Response.Status.INTERNAL_SERVER_ERROR);
Review Comment:
nit: this message will look a bit strange if `result.getLeft().size()` is 0.
I'd suggest rewording it to something like: `Failed to cancel tenant rebalance
job: {jobId}. Number of table rebalance jobs cancelled successfully: {num}`
instead. Also, we're not returning any context about the failure to the users here?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String
tenantName, Set<Strin
@Override
public void onTrigger(Trigger trigger, String tableName, String description)
{
- switch (trigger) {
- case START_TRIGGER:
- _progressStats.setStartTimeMs(System.currentTimeMillis());
- break;
- case REBALANCE_STARTED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
- _progressStats.putTableRebalanceJobId(tableName, description);
- break;
- case REBALANCE_COMPLETED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- case REBALANCE_ERRORED_TRIGGER:
- _progressStats.updateTableStatus(tableName, description);
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- default:
+ }
+
+ public void onStart() {
+ try {
+ updateTenantRebalanceJobMetadataInZk(
+ (ctx, progressStats) ->
progressStats.setStartTimeMs(System.currentTimeMillis()));
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on starting tenant
rebalance", _jobId, e);
+ throw new RuntimeException("Error updating ZK for jobId: " + _jobId + "
on starting tenant rebalance", e);
}
- syncStatsAndContextInZk();
}
@Override
public void onSuccess(String msg) {
- _progressStats.setCompletionStatusMsg(msg);
- _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
_progressStats.getStartTimeMs()) / 1000);
- syncStatsAndContextInZk();
- _isDone = true;
+ onFinish(msg);
}
@Override
public void onError(String errorMsg) {
- _progressStats.setCompletionStatusMsg(errorMsg);
- _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() -
_progressStats.getStartTimeMs());
- syncStatsAndContextInZk();
+ onFinish(errorMsg);
+ }
+
+ private void onFinish(String msg) {
+ try {
+ updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+ if (StringUtils.isEmpty(progressStats.getCompletionStatusMsg())) {
+ progressStats.setCompletionStatusMsg(msg);
+ progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
progressStats.getStartTimeMs()) / 1000);
+ }
+ });
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on successful completion
of tenant rebalance", _jobId, e);
+ throw new RuntimeException(
+ "Error updating ZK for jobId: " + _jobId + " on successful
completion of tenant rebalance", e);
+ }
_isDone = true;
}
- private void syncStatsAndContextInZk() {
+ private Map<String, String> makeJobMetadata(TenantRebalanceContext
tenantRebalanceContext,
+ TenantRebalanceProgressStats progressStats) {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TENANT_REBALANCE.name());
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
- JsonUtils.objectToString(_progressStats));
+ JsonUtils.objectToString(progressStats));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _jobId, e);
}
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
- JsonUtils.objectToString(_tenantRebalanceContext));
+ JsonUtils.objectToString(tenantRebalanceContext));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance context to JSON for persisting
to ZK {}", _jobId, e);
}
- _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
ControllerJobTypes.TENANT_REBALANCE);
- _numUpdatesToZk++;
- LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ",
_numUpdatesToZk, _jobId);
+ return jobMetadata;
+ }
+
+ public TenantRebalancer.TenantTableRebalanceJobContext pollQueue(boolean
isParallel) {
+ final TenantRebalancer.TenantTableRebalanceJobContext[] ret =
Review Comment:
nit: I'd suggest using an `AtomicReference` rather than an array to get
around the lambda's effectively final requirement, makes it cleaner IMO.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java:
##########
@@ -251,20 +259,18 @@ private void
doConsumeTablesFromQueueAndRebalance(Queue<TenantTableRebalanceJobC
if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
LOGGER.info("Completed rebalance for table: {} with table rebalance
job ID: {} in tenant rebalance job: {}",
tableName, rebalanceJobId, observer.getJobId());
- ongoingJobs.remove(jobContext);
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER,
tableName, null);
+ observer.onTableJobDone(jobContext);
} else {
- LOGGER.warn("Rebalance for table: {} with table rebalance job ID: {}
in tenant rebalance job: {} is not done."
+ LOGGER.warn(
+ "Rebalance for table: {} with table rebalance job ID: {} in
tenant rebalance job: {} is not done."
+ "Status: {}, Description: {}", tableName, rebalanceJobId,
observer.getJobId(), result.getStatus(),
result.getDescription());
- ongoingJobs.remove(jobContext);
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER,
tableName,
- result.getDescription());
+ observer.onTableJobError(jobContext, result.getDescription());
}
} catch (Throwable t) {
Review Comment:
Not a new change, but why are we catching `Throwable` and not `Exception`
here? This is generally a bad idea in most situations.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String
tenantName, Set<Strin
@Override
public void onTrigger(Trigger trigger, String tableName, String description)
{
- switch (trigger) {
- case START_TRIGGER:
- _progressStats.setStartTimeMs(System.currentTimeMillis());
- break;
- case REBALANCE_STARTED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
- _progressStats.putTableRebalanceJobId(tableName, description);
- break;
- case REBALANCE_COMPLETED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- case REBALANCE_ERRORED_TRIGGER:
- _progressStats.updateTableStatus(tableName, description);
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- default:
+ }
Review Comment:
TBH we can probably remove the interface itself given that this is the only
implementation and you've refactored away usages of the various triggers.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -227,9 +227,10 @@ RebalanceResult rebalanceTable(String tableNameWithType,
TableConfig tableConfig
* @param tableNameWithType name of the table for which to cancel any
ongoing rebalance job
* @return the list of job IDs that were cancelled
*/
- public List<String> cancelRebalance(String tableNameWithType) {
+ public static List<String> cancelRebalance(String tableNameWithType,
PinotHelixResourceManager resourceManager,
+ RebalanceResult.Status setToStatus) {
Review Comment:
nit: add the new parameter to the Javadoc and also add a precondition check
to verify that the status is one of `CANCELLED` or `ABORTED`.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -701,6 +703,46 @@ public SuccessResponse deleteTenant(
Response.Status.INTERNAL_SERVER_ERROR);
}
+ @DELETE
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authenticate(AccessType.DELETE)
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.REBALANCE_TENANT_TABLES)
+ @Path("/tenants/rebalance/{jobId}")
+ @ApiOperation(value = "Cancels a running tenant rebalance job")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success", response =
SuccessResponse.class),
+ @ApiResponse(code = 404, message = "Tenant rebalance job not found"),
+ @ApiResponse(code = 500, message = "Internal server error during
cancelling the rebalance job")
+ })
+ public SuccessResponse cancelRebalance(
+ @ApiParam(value = "Tenant rebalance job id", required = true)
@PathParam("jobId") String jobId) {
+ Map<String, String> jobMetadata =
+ _pinotHelixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
+ if (jobMetadata == null) {
+ throw new ControllerApplicationException(LOGGER, "Failed to cancel
tenant rebalance job: " + jobId,
Review Comment:
    nit: shouldn't the message also indicate that no tenant rebalance job with
the given `jobId` was found?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String
tenantName, Set<Strin
@Override
public void onTrigger(Trigger trigger, String tableName, String description)
{
- switch (trigger) {
- case START_TRIGGER:
- _progressStats.setStartTimeMs(System.currentTimeMillis());
- break;
- case REBALANCE_STARTED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
- _progressStats.putTableRebalanceJobId(tableName, description);
- break;
- case REBALANCE_COMPLETED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- case REBALANCE_ERRORED_TRIGGER:
- _progressStats.updateTableStatus(tableName, description);
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- default:
+ }
+
+ public void onStart() {
+ try {
+ updateTenantRebalanceJobMetadataInZk(
+ (ctx, progressStats) ->
progressStats.setStartTimeMs(System.currentTimeMillis()));
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on starting tenant
rebalance", _jobId, e);
+ throw new RuntimeException("Error updating ZK for jobId: " + _jobId + "
on starting tenant rebalance", e);
}
- syncStatsAndContextInZk();
}
@Override
public void onSuccess(String msg) {
- _progressStats.setCompletionStatusMsg(msg);
- _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
_progressStats.getStartTimeMs()) / 1000);
- syncStatsAndContextInZk();
- _isDone = true;
+ onFinish(msg);
}
@Override
public void onError(String errorMsg) {
- _progressStats.setCompletionStatusMsg(errorMsg);
- _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() -
_progressStats.getStartTimeMs());
- syncStatsAndContextInZk();
+ onFinish(errorMsg);
+ }
+
+ private void onFinish(String msg) {
+ try {
+ updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+ if (StringUtils.isEmpty(progressStats.getCompletionStatusMsg())) {
+ progressStats.setCompletionStatusMsg(msg);
+ progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
progressStats.getStartTimeMs()) / 1000);
+ }
+ });
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on successful completion
of tenant rebalance", _jobId, e);
+ throw new RuntimeException(
+ "Error updating ZK for jobId: " + _jobId + " on successful
completion of tenant rebalance", e);
+ }
_isDone = true;
}
- private void syncStatsAndContextInZk() {
+ private Map<String, String> makeJobMetadata(TenantRebalanceContext
tenantRebalanceContext,
+ TenantRebalanceProgressStats progressStats) {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TENANT_REBALANCE.name());
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
- JsonUtils.objectToString(_progressStats));
+ JsonUtils.objectToString(progressStats));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _jobId, e);
}
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
- JsonUtils.objectToString(_tenantRebalanceContext));
+ JsonUtils.objectToString(tenantRebalanceContext));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance context to JSON for persisting
to ZK {}", _jobId, e);
}
- _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
ControllerJobTypes.TENANT_REBALANCE);
- _numUpdatesToZk++;
- LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ",
_numUpdatesToZk, _jobId);
+ return jobMetadata;
+ }
+
+ public TenantRebalancer.TenantTableRebalanceJobContext pollQueue(boolean
isParallel) {
+ final TenantRebalancer.TenantTableRebalanceJobContext[] ret =
+ new TenantRebalancer.TenantTableRebalanceJobContext[1];
+ try {
+ updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+ TenantRebalancer.TenantTableRebalanceJobContext polled =
+ isParallel ? ctx.getParallelQueue().poll() :
ctx.getSequentialQueue().poll();
+ if (polled != null) {
+ ctx.getOngoingJobsQueue().add(polled);
+ String tableName = polled.getTableName();
+ String rebalanceJobId = polled.getJobId();
+ progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+ progressStats.putTableRebalanceJobId(tableName, rebalanceJobId);
+ }
+ ret[0] = polled;
+ });
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} while polling from {}
queue", _jobId,
+ isParallel ? "parallel" : "sequential", e);
+ throw new RuntimeException(
+ "Error updating ZK for jobId: " + _jobId + " while polling from " +
(isParallel ? "parallel" : "sequential")
+ + " queue", e);
+ }
+ return ret[0];
+ }
+
+ public TenantRebalancer.TenantTableRebalanceJobContext pollParallel() {
+ return pollQueue(true);
+ }
+
+ public TenantRebalancer.TenantTableRebalanceJobContext pollSequential() {
+ return pollQueue(false);
+ }
+
+ public void onTableJobError(TenantRebalancer.TenantTableRebalanceJobContext
jobContext, String errorMessage) {
+ onTableJobComplete(jobContext, errorMessage);
+ }
+
+ public void onTableJobDone(TenantRebalancer.TenantTableRebalanceJobContext
jobContext) {
+ onTableJobComplete(jobContext,
TenantRebalanceProgressStats.TableStatus.DONE.name());
+ }
+
+ private void
onTableJobComplete(TenantRebalancer.TenantTableRebalanceJobContext jobContext,
String message) {
+ try {
+ updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+ if (ctx.getOngoingJobsQueue().remove(jobContext)) {
+ progressStats.updateTableStatus(jobContext.getTableName(), message);
+ progressStats.setRemainingTables(progressStats.getRemainingTables()
- 1);
+ }
+ });
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on completion of table
rebalance job: {}", _jobId, jobContext, e);
+ throw new RuntimeException(
+ "Error updating ZK for jobId: " + _jobId + " on completion of table
rebalance job: " + jobContext, e);
+ }
+ }
+
+ public Pair<List<String>, Boolean> cancelJob(boolean isCancelledByUser) {
+ List<String> cancelledJobs = new ArrayList<>();
+ try {
+ // Empty the queues first to prevent any new jobs from being picked up.
+ updateTenantRebalanceJobMetadataInZk((tenantRebalanceContext,
progressStats) -> {
+ TenantRebalancer.TenantTableRebalanceJobContext ctx;
+ while ((ctx = tenantRebalanceContext.getParallelQueue().poll()) !=
null) {
+ progressStats.getTableStatusMap()
+ .put(ctx.getTableName(),
TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+ }
+ while ((ctx = tenantRebalanceContext.getSequentialQueue().poll()) !=
null) {
+ progressStats.getTableStatusMap()
+ .put(ctx.getTableName(),
TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+ }
+ });
+ // Try to cancel ongoing jobs with best efforts. There could be some
ongoing jobs that are marked cancelled but
+ // was completed if table rebalance completed right after
TableRebalanceManager marked it.
+ updateTenantRebalanceJobMetadataInZk((tenantRebalanceContext,
progressStats) -> {
+ TenantRebalancer.TenantTableRebalanceJobContext ctx;
+ while ((ctx = tenantRebalanceContext.getOngoingJobsQueue().poll()) !=
null) {
+
cancelledJobs.addAll(TableRebalanceManager.cancelRebalance(ctx.getTableName(),
_pinotHelixResourceManager,
+ isCancelledByUser ? RebalanceResult.Status.CANCELLED :
RebalanceResult.Status.ABORTED));
+ progressStats.getTableStatusMap()
+ .put(ctx.getTableName(), isCancelledByUser ?
TenantRebalanceProgressStats.TableStatus.CANCELLED.name()
+ : TenantRebalanceProgressStats.TableStatus.ABORTED.name());
+ }
+ progressStats.setRemainingTables(0);
+ progressStats.setCompletionStatusMsg(
+ "Tenant rebalance job has been " + (isCancelledByUser ?
"cancelled." : "aborted."));
+ progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
progressStats.getStartTimeMs()) / 1000);
+ });
+ return Pair.of(cancelledJobs, true);
+ } catch (AttemptFailureException e) {
+ return Pair.of(cancelledJobs, false);
+ }
+ }
+
+ private void updateTenantRebalanceJobMetadataInZk(
+ BiConsumer<TenantRebalanceContext, TenantRebalanceProgressStats> updater)
+ throws AttemptFailureException {
+ RetryPolicy retry =
RetryPolicies.fixedDelayRetryPolicy(ZK_UPDATE_MAX_RETRIES,
ZK_UPDATE_RETRY_WAIT_MS);
+ retry.attempt(() -> {
Review Comment:
Are we using this retry mechanism to guard against concurrent updates from
ongoing parallel rebalances? Wouldn't it be better to just synchronize this
instead?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -240,17 +241,19 @@ public List<String> cancelRebalance(String
tableNameWithType) {
return;
}
- LOGGER.info("Cancelling rebalance job: {} for table: {}", jobId,
tableNameWithType);
- jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+ LOGGER.info("Cancelling rebalance job: {} for table: {}, setting
status to: {}", jobId, tableNameWithType,
+ setToStatus);
+ jobStats.setStatus(setToStatus);
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(jobStats));
cancelledJobIds.add(jobId);
} catch (Exception e) {
LOGGER.error("Failed to cancel rebalance job: {} for table: {}",
jobId, tableNameWithType, e);
}
});
- LOGGER.info("Tried to cancel existing rebalance jobs for table: {} at best
effort and done: {}", tableNameWithType,
- updated);
+ LOGGER.info("Tried to cancel existing rebalance jobs for table: {} at best
effort and done: {}. Status set to: {}",
Review Comment:
Same as above (cancel vs abort)
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java:
##########
@@ -251,20 +259,18 @@ private void
doConsumeTablesFromQueueAndRebalance(Queue<TenantTableRebalanceJobC
if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
LOGGER.info("Completed rebalance for table: {} with table rebalance
job ID: {} in tenant rebalance job: {}",
tableName, rebalanceJobId, observer.getJobId());
- ongoingJobs.remove(jobContext);
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER,
tableName, null);
+ observer.onTableJobDone(jobContext);
} else {
- LOGGER.warn("Rebalance for table: {} with table rebalance job ID: {}
in tenant rebalance job: {} is not done."
+ LOGGER.warn(
+ "Rebalance for table: {} with table rebalance job ID: {} in
tenant rebalance job: {} is not done."
+ "Status: {}, Description: {}", tableName, rebalanceJobId,
observer.getJobId(), result.getStatus(),
result.getDescription());
- ongoingJobs.remove(jobContext);
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER,
tableName,
- result.getDescription());
+ observer.onTableJobError(jobContext, result.getDescription());
}
} catch (Throwable t) {
- ongoingJobs.remove(jobContext);
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER,
tableName,
- String.format("Caught exception/error while rebalancing table:
%s", tableName));
+ LOGGER.error("Caught exception while rebalancing table: {} with table
rebalance job ID: {} in tenant "
+ + "rebalance job: {}", tableName, rebalanceJobId,
observer.getJobId(), t);
+ observer.onTableJobError(jobContext, t.getMessage());
Review Comment:
Previously, the error status message was 'Caught exception/error while
rebalancing table', now it's just the exception message without any context. Is
this change intentional?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -240,17 +241,19 @@ public List<String> cancelRebalance(String
tableNameWithType) {
return;
}
- LOGGER.info("Cancelling rebalance job: {} for table: {}", jobId,
tableNameWithType);
- jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+ LOGGER.info("Cancelling rebalance job: {} for table: {}, setting
status to: {}", jobId, tableNameWithType,
Review Comment:
Log line should also distinguish between cancelled and aborted given that
these are user facing statuses exposed through the REST API.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String
tenantName, Set<Strin
@Override
public void onTrigger(Trigger trigger, String tableName, String description)
{
- switch (trigger) {
- case START_TRIGGER:
- _progressStats.setStartTimeMs(System.currentTimeMillis());
- break;
- case REBALANCE_STARTED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
- _progressStats.putTableRebalanceJobId(tableName, description);
- break;
- case REBALANCE_COMPLETED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- case REBALANCE_ERRORED_TRIGGER:
- _progressStats.updateTableStatus(tableName, description);
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- default:
+ }
Review Comment:
I guess we can remove this unused method from the interface now?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String
tenantName, Set<Strin
@Override
public void onTrigger(Trigger trigger, String tableName, String description)
{
- switch (trigger) {
- case START_TRIGGER:
- _progressStats.setStartTimeMs(System.currentTimeMillis());
- break;
- case REBALANCE_STARTED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
- _progressStats.putTableRebalanceJobId(tableName, description);
- break;
- case REBALANCE_COMPLETED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- case REBALANCE_ERRORED_TRIGGER:
- _progressStats.updateTableStatus(tableName, description);
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- default:
+ }
+
+ public void onStart() {
+ try {
+ updateTenantRebalanceJobMetadataInZk(
+ (ctx, progressStats) ->
progressStats.setStartTimeMs(System.currentTimeMillis()));
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on starting tenant
rebalance", _jobId, e);
+ throw new RuntimeException("Error updating ZK for jobId: " + _jobId + "
on starting tenant rebalance", e);
}
- syncStatsAndContextInZk();
}
@Override
public void onSuccess(String msg) {
- _progressStats.setCompletionStatusMsg(msg);
- _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
_progressStats.getStartTimeMs()) / 1000);
- syncStatsAndContextInZk();
- _isDone = true;
+ onFinish(msg);
}
@Override
public void onError(String errorMsg) {
- _progressStats.setCompletionStatusMsg(errorMsg);
- _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() -
_progressStats.getStartTimeMs());
- syncStatsAndContextInZk();
+ onFinish(errorMsg);
+ }
+
+ private void onFinish(String msg) {
+ try {
+ updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+ if (StringUtils.isEmpty(progressStats.getCompletionStatusMsg())) {
+ progressStats.setCompletionStatusMsg(msg);
+ progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
progressStats.getStartTimeMs()) / 1000);
+ }
+ });
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on successful completion
of tenant rebalance", _jobId, e);
+ throw new RuntimeException(
+ "Error updating ZK for jobId: " + _jobId + " on successful
completion of tenant rebalance", e);
+ }
_isDone = true;
}
- private void syncStatsAndContextInZk() {
+ private Map<String, String> makeJobMetadata(TenantRebalanceContext
tenantRebalanceContext,
+ TenantRebalanceProgressStats progressStats) {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TENANT_REBALANCE.name());
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
- JsonUtils.objectToString(_progressStats));
+ JsonUtils.objectToString(progressStats));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _jobId, e);
}
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
- JsonUtils.objectToString(_tenantRebalanceContext));
+ JsonUtils.objectToString(tenantRebalanceContext));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance context to JSON for persisting
to ZK {}", _jobId, e);
}
- _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
ControllerJobTypes.TENANT_REBALANCE);
- _numUpdatesToZk++;
- LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ",
_numUpdatesToZk, _jobId);
+ return jobMetadata;
+ }
+
+ public TenantRebalancer.TenantTableRebalanceJobContext pollQueue(boolean
isParallel) {
+ final TenantRebalancer.TenantTableRebalanceJobContext[] ret =
+ new TenantRebalancer.TenantTableRebalanceJobContext[1];
+ try {
+ updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+ TenantRebalancer.TenantTableRebalanceJobContext polled =
+ isParallel ? ctx.getParallelQueue().poll() :
ctx.getSequentialQueue().poll();
+ if (polled != null) {
+ ctx.getOngoingJobsQueue().add(polled);
+ String tableName = polled.getTableName();
+ String rebalanceJobId = polled.getJobId();
+ progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+ progressStats.putTableRebalanceJobId(tableName, rebalanceJobId);
+ }
+ ret[0] = polled;
+ });
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} while polling from {}
queue", _jobId,
+ isParallel ? "parallel" : "sequential", e);
+ throw new RuntimeException(
+ "Error updating ZK for jobId: " + _jobId + " while polling from " +
(isParallel ? "parallel" : "sequential")
+ + " queue", e);
+ }
+ return ret[0];
+ }
+
+ public TenantRebalancer.TenantTableRebalanceJobContext pollParallel() {
+ return pollQueue(true);
+ }
+
+ public TenantRebalancer.TenantTableRebalanceJobContext pollSequential() {
+ return pollQueue(false);
+ }
+
+ public void onTableJobError(TenantRebalancer.TenantTableRebalanceJobContext
jobContext, String errorMessage) {
+ onTableJobComplete(jobContext, errorMessage);
+ }
+
+ public void onTableJobDone(TenantRebalancer.TenantTableRebalanceJobContext
jobContext) {
+ onTableJobComplete(jobContext,
TenantRebalanceProgressStats.TableStatus.DONE.name());
+ }
+
+ private void
onTableJobComplete(TenantRebalancer.TenantTableRebalanceJobContext jobContext,
String message) {
+ try {
+ updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+ if (ctx.getOngoingJobsQueue().remove(jobContext)) {
+ progressStats.updateTableStatus(jobContext.getTableName(), message);
+ progressStats.setRemainingTables(progressStats.getRemainingTables()
- 1);
+ }
+ });
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on completion of table
rebalance job: {}", _jobId, jobContext, e);
+ throw new RuntimeException(
+ "Error updating ZK for jobId: " + _jobId + " on completion of table
rebalance job: " + jobContext, e);
+ }
+ }
+
+ public Pair<List<String>, Boolean> cancelJob(boolean isCancelledByUser) {
+ List<String> cancelledJobs = new ArrayList<>();
+ try {
+ // Empty the queues first to prevent any new jobs from being picked up.
+ updateTenantRebalanceJobMetadataInZk((tenantRebalanceContext,
progressStats) -> {
+ TenantRebalancer.TenantTableRebalanceJobContext ctx;
+ while ((ctx = tenantRebalanceContext.getParallelQueue().poll()) !=
null) {
+ progressStats.getTableStatusMap()
+ .put(ctx.getTableName(),
TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+ }
+ while ((ctx = tenantRebalanceContext.getSequentialQueue().poll()) !=
null) {
+ progressStats.getTableStatusMap()
+ .put(ctx.getTableName(),
TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+ }
+ });
+ // Try to cancel ongoing jobs on a best-effort basis. There could be some
ongoing jobs that are marked cancelled but
+ // were actually completed, if the table rebalance finished right after
TableRebalanceManager marked them.
+ updateTenantRebalanceJobMetadataInZk((tenantRebalanceContext,
progressStats) -> {
+ TenantRebalancer.TenantTableRebalanceJobContext ctx;
+ while ((ctx = tenantRebalanceContext.getOngoingJobsQueue().poll()) !=
null) {
+
cancelledJobs.addAll(TableRebalanceManager.cancelRebalance(ctx.getTableName(),
_pinotHelixResourceManager,
+ isCancelledByUser ? RebalanceResult.Status.CANCELLED :
RebalanceResult.Status.ABORTED));
+ progressStats.getTableStatusMap()
+ .put(ctx.getTableName(), isCancelledByUser ?
TenantRebalanceProgressStats.TableStatus.CANCELLED.name()
+ : TenantRebalanceProgressStats.TableStatus.ABORTED.name());
+ }
+ progressStats.setRemainingTables(0);
+ progressStats.setCompletionStatusMsg(
+ "Tenant rebalance job has been " + (isCancelledByUser ?
"cancelled." : "aborted."));
+ progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
progressStats.getStartTimeMs()) / 1000);
+ });
+ return Pair.of(cancelledJobs, true);
+ } catch (AttemptFailureException e) {
+ return Pair.of(cancelledJobs, false);
+ }
+ }
+
+ private void updateTenantRebalanceJobMetadataInZk(
+ BiConsumer<TenantRebalanceContext, TenantRebalanceProgressStats> updater)
+ throws AttemptFailureException {
+ RetryPolicy retry =
RetryPolicies.fixedDelayRetryPolicy(ZK_UPDATE_MAX_RETRIES,
ZK_UPDATE_RETRY_WAIT_MS);
+ retry.attempt(() -> {
Review Comment:
Oh, I see that we could have contention across objects because we're
spinning up new instances of the observer for cancelling and aborting tenant
rebalances. In that case, we could use some static locks keyed by `jobId` or a
fixed number of locks accessed by the hash of the `jobId`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]