J-HowHuang commented on code in PR #16886:
URL: https://github.com/apache/pinot/pull/16886#discussion_r2389519793
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java:
##########
@@ -251,20 +259,18 @@ private void
doConsumeTablesFromQueueAndRebalance(Queue<TenantTableRebalanceJobC
if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
LOGGER.info("Completed rebalance for table: {} with table rebalance
job ID: {} in tenant rebalance job: {}",
tableName, rebalanceJobId, observer.getJobId());
- ongoingJobs.remove(jobContext);
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER,
tableName, null);
+ observer.onTableJobDone(jobContext);
} else {
- LOGGER.warn("Rebalance for table: {} with table rebalance job ID: {}
in tenant rebalance job: {} is not done."
+ LOGGER.warn(
+ "Rebalance for table: {} with table rebalance job ID: {} in
tenant rebalance job: {} is not done."
+ "Status: {}, Description: {}", tableName, rebalanceJobId,
observer.getJobId(), result.getStatus(),
result.getDescription());
- ongoingJobs.remove(jobContext);
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER,
tableName,
- result.getDescription());
+ observer.onTableJobError(jobContext, result.getDescription());
}
} catch (Throwable t) {
- ongoingJobs.remove(jobContext);
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER,
tableName,
- String.format("Caught exception/error while rebalancing table:
%s", tableName));
+ LOGGER.error("Caught exception while rebalancing table: {} with table
rebalance job ID: {} in tenant "
+ + "rebalance job: {}", tableName, rebalanceJobId,
observer.getJobId(), t);
+ observer.onTableJobError(jobContext, t.getMessage());
Review Comment:
No. I meant to add the error message to the table statuses but missed the
initial line. I've added it now.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +82,211 @@ public ZkBasedTenantRebalanceObserver(String jobId, String
tenantName, Set<Strin
@Override
public void onTrigger(Trigger trigger, String tableName, String description)
{
- switch (trigger) {
- case START_TRIGGER:
- _progressStats.setStartTimeMs(System.currentTimeMillis());
- break;
- case REBALANCE_STARTED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
- _progressStats.putTableRebalanceJobId(tableName, description);
- break;
- case REBALANCE_COMPLETED_TRIGGER:
- _progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- case REBALANCE_ERRORED_TRIGGER:
- _progressStats.updateTableStatus(tableName, description);
- _unprocessedTables.remove(tableName);
- _progressStats.setRemainingTables(_unprocessedTables.size());
- break;
- default:
+ }
+
+ public void onStart() {
+ try {
+ updateTenantRebalanceJobMetadataInZk(
+ (ctx, progressStats) ->
progressStats.setStartTimeMs(System.currentTimeMillis()));
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on starting tenant
rebalance", _jobId, e);
+ throw new RuntimeException("Error updating ZK for jobId: " + _jobId + "
on starting tenant rebalance", e);
}
- syncStatsAndContextInZk();
}
@Override
public void onSuccess(String msg) {
- _progressStats.setCompletionStatusMsg(msg);
- _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
_progressStats.getStartTimeMs()) / 1000);
- syncStatsAndContextInZk();
- _isDone = true;
+ onFinish(msg);
}
@Override
public void onError(String errorMsg) {
- _progressStats.setCompletionStatusMsg(errorMsg);
- _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() -
_progressStats.getStartTimeMs());
- syncStatsAndContextInZk();
+ onFinish(errorMsg);
+ }
+
+ private void onFinish(String msg) {
+ try {
+ updateTenantRebalanceJobMetadataInZk((ctx, progressStats) -> {
+ if (StringUtils.isEmpty(progressStats.getCompletionStatusMsg())) {
+ progressStats.setCompletionStatusMsg(msg);
+ progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() -
progressStats.getStartTimeMs()) / 1000);
+ }
+ });
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error updating ZK for jobId: {} on successful completion
of tenant rebalance", _jobId, e);
+ throw new RuntimeException(
+ "Error updating ZK for jobId: " + _jobId + " on successful
completion of tenant rebalance", e);
+ }
_isDone = true;
}
- private void syncStatsAndContextInZk() {
+ private Map<String, String> makeJobMetadata(TenantRebalanceContext
tenantRebalanceContext,
+ TenantRebalanceProgressStats progressStats) {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobTypes.TENANT_REBALANCE.name());
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
- JsonUtils.objectToString(_progressStats));
+ JsonUtils.objectToString(progressStats));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _jobId, e);
}
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
- JsonUtils.objectToString(_tenantRebalanceContext));
+ JsonUtils.objectToString(tenantRebalanceContext));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance context to JSON for persisting
to ZK {}", _jobId, e);
}
- _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
ControllerJobTypes.TENANT_REBALANCE);
- _numUpdatesToZk++;
- LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ",
_numUpdatesToZk, _jobId);
+ return jobMetadata;
+ }
+
+ public TenantRebalancer.TenantTableRebalanceJobContext pollQueue(boolean
isParallel) {
+ final TenantRebalancer.TenantTableRebalanceJobContext[] ret =
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]