Copilot commented on code in PR #16886:
URL: https://github.com/apache/pinot/pull/16886#discussion_r2379456674


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -20,31 +20,34 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.internal.StringUtil;

Review Comment:
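   Using `io.netty.util.internal.StringUtil` from Netty's internal package violates encapsulation and may break in future Netty versions. Consider using `org.apache.commons.lang3.StringUtils.isEmpty()`, which, like `StringUtil.isNullOrEmpty()`, returns `true` for both `null` and empty strings, or a similar method from a stable public API instead. The call site in `onFinish` would then become `StringUtils.isEmpty(progressStats.getCompletionStatusMsg())`.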
   ```suggestion
   import org.apache.commons.lang3.StringUtils;
   ```
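A JDK-only sketch of the first-writer-wins guard in `onFinish`, under simplifying assumptions: `ProgressStats` is a hypothetical stand-in for `TenantRebalanceProgressStats`, and the local `isEmpty` helper mirrors the semantics of commons-lang3 `StringUtils.isEmpty()` (true for `null` and `""`, false for `" "`). It illustrates that swapping Netty's `isNullOrEmpty` for `StringUtils.isEmpty` should not change which completion message gets recorded.

```java
public class CompletionGuardSketch {
  // Hypothetical, simplified stand-in for TenantRebalanceProgressStats.
  static class ProgressStats {
    private String completionStatusMsg;
    private long startTimeMs;
    private long timeToFinishInSeconds;

    String getCompletionStatusMsg() { return completionStatusMsg; }
    void setCompletionStatusMsg(String msg) { completionStatusMsg = msg; }
    long getStartTimeMs() { return startTimeMs; }
    void setStartTimeMs(long ms) { startTimeMs = ms; }
    long getTimeToFinishInSeconds() { return timeToFinishInSeconds; }
    void setTimeToFinishInSeconds(long s) { timeToFinishInSeconds = s; }
  }

  // Same semantics as commons-lang3 StringUtils.isEmpty: true for null or "", false otherwise.
  static boolean isEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Records only the first terminal message; later calls leave the stats untouched.
  static void onFinish(ProgressStats stats, String msg, long nowMs) {
    if (isEmpty(stats.getCompletionStatusMsg())) {
      stats.setCompletionStatusMsg(msg);
      stats.setTimeToFinishInSeconds((nowMs - stats.getStartTimeMs()) / 1000);
    }
  }

  public static void main(String[] args) {
    ProgressStats stats = new ProgressStats();
    stats.setStartTimeMs(1_000L);
    onFinish(stats, "Successfully rebalanced", 6_500L);
    onFinish(stats, "Late error", 9_000L); // ignored: a message is already set
    System.out.println(stats.getCompletionStatusMsg() + " in " + stats.getTimeToFinishInSeconds() + "s");
  }
}
```

Running `main` prints `Successfully rebalanced in 5s`; the later error message is ignored because a completion message is already present.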



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -74,66 +71,166 @@ public ZkBasedTenantRebalanceObserver(String jobId, String tenantName, Set<Strin
 
   @Override
   public void onTrigger(Trigger trigger, String tableName, String description) {
-    switch (trigger) {
-      case START_TRIGGER:
-        _progressStats.setStartTimeMs(System.currentTimeMillis());
-        break;
-      case REBALANCE_STARTED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
-        _progressStats.putTableRebalanceJobId(tableName, description);
-        break;
-      case REBALANCE_COMPLETED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      case REBALANCE_ERRORED_TRIGGER:
-        _progressStats.updateTableStatus(tableName, description);
-        _unprocessedTables.remove(tableName);
-        _progressStats.setRemainingTables(_unprocessedTables.size());
-        break;
-      default:
+  }
+
+  public void onStart() {
+    try {
+      updateTenantRebalanceContextInZk(
+          (ctx, progressStats) -> progressStats.setStartTimeMs(System.currentTimeMillis()));
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on starting tenant rebalance", _jobId, e);
+      throw new RuntimeException("Error updating ZK for jobId: " + _jobId + " on starting tenant rebalance", e);
     }
-    syncStatsAndContextInZk();
   }
 
   @Override
   public void onSuccess(String msg) {
-    _progressStats.setCompletionStatusMsg(msg);
-    _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - _progressStats.getStartTimeMs()) / 1000);
-    syncStatsAndContextInZk();
-    _isDone = true;
+    onFinish(msg);
   }
 
   @Override
   public void onError(String errorMsg) {
-    _progressStats.setCompletionStatusMsg(errorMsg);
-    _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() - _progressStats.getStartTimeMs());
-    syncStatsAndContextInZk();
+    onFinish(errorMsg);
+  }
+
+  private void onFinish(String msg) {
+    try {
+      updateTenantRebalanceContextInZk((ctx, progressStats) -> {
+        if (StringUtil.isNullOrEmpty(progressStats.getCompletionStatusMsg())) {
+          progressStats.setCompletionStatusMsg(msg);
+          progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - progressStats.getStartTimeMs()) / 1000);
+        }
+      });
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error updating ZK for jobId: {} on successful completion of tenant rebalance", _jobId, e);
+      throw new RuntimeException(
+          "Error updating ZK for jobId: " + _jobId + " on successful completion of tenant rebalance", e);
+    }
     _isDone = true;
   }
 
-  private void syncStatsAndContextInZk() {
+  private Map<String, String> makeJobMetadata(TenantRebalanceContext tenantRebalanceContext,
+      TenantRebalanceProgressStats progressStats) {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TENANT_REBALANCE.name());
     try {
       jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
-          JsonUtils.objectToString(_progressStats));
+          JsonUtils.objectToString(progressStats));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", _jobId, e);
     }
     try {
       jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
-          JsonUtils.objectToString(_tenantRebalanceContext));
+          JsonUtils.objectToString(tenantRebalanceContext));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance context to JSON for persisting to ZK {}", _jobId, e);
     }
-    _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, ControllerJobTypes.TENANT_REBALANCE);
-    _numUpdatesToZk++;
-    LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", _numUpdatesToZk, _jobId);
+    return jobMetadata;
+  }
+
+  public TenantRebalancer.TenantTableRebalanceJobContext pollQueue(boolean isParallel) {
+    final TenantRebalancer.TenantTableRebalanceJobContext[] ret =
+        new TenantRebalancer.TenantTableRebalanceJobContext[1];

Review Comment:
   Using a single-element array to work around Java's rule that lambdas may only capture effectively final local variables is a code smell. Consider refactoring this method so the update helper returns the lambda's result directly, or make the intent explicit with an `AtomicReference`.
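A minimal sketch of both alternatives, under loud assumptions: `updateInZk` and `updateInZkAndGet` are hypothetical stand-ins for `updateTenantRebalanceContextInZk` that operate on an in-memory `Deque<String>` of table names instead of ZK and `TenantTableRebalanceJobContext`; the real method's signature and retry behavior may differ. The point is only that the lambda writes into an explicitly named holder, or returns its value, instead of using `ret[0]`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

public class PollQueueSketch {
  // Hypothetical in-memory stand-in for the ZK-backed queue of pending tables.
  private final Deque<String> _pendingTables = new ArrayDeque<>();

  PollQueueSketch(List<String> tables) {
    _pendingTables.addAll(tables);
  }

  // Hypothetical stand-in for updateTenantRebalanceContextInZk. A ZK-backed version
  // might re-read state and re-run the updater on a version conflict.
  private void updateInZk(Consumer<Deque<String>> updater) {
    updater.accept(_pendingTables);
  }

  // Option 1: capture the result in an explicitly named holder instead of ret[0].
  String pollWithHolder() {
    AtomicReference<String> polled = new AtomicReference<>();
    // set() overwrites, so if the updater ran again only the last value would remain.
    updateInZk(queue -> polled.set(queue.pollFirst()));
    return polled.get();
  }

  // Option 2: a helper that returns whatever the lambda computes, so no holder is needed.
  private <T> T updateInZkAndGet(Function<Deque<String>, T> updater) {
    return updater.apply(_pendingTables);
  }

  String pollDirect() {
    return updateInZkAndGet(queue -> queue.pollFirst());
  }

  public static void main(String[] args) {
    PollQueueSketch sketch = new PollQueueSketch(List.of("tableA_OFFLINE", "tableB_REALTIME"));
    System.out.println(sketch.pollWithHolder()); // tableA_OFFLINE
    System.out.println(sketch.pollDirect());     // tableB_REALTIME
    System.out.println(sketch.pollWithHolder()); // null (queue drained)
  }
}
```

Option 2 removes the mutable holder entirely. With Option 1, `AtomicReference` is used here for readability as a named mutable cell, not for thread safety.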



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java:
##########
@@ -295,19 +275,24 @@ private void abortTableRebalanceJob(String tableNameWithType) {
    * @param tenantRebalanceContext The context of the tenant rebalance job.
    * @param progressStats The progress stats of the tenant rebalance job.
    */
-  private void markTenantRebalanceJobAsAborted(String jobId, Map<String, String> jobMetadata,
+  public static void markTenantRebalanceJobAsCancelled(String jobId, Map<String, String> jobMetadata,
       TenantRebalanceContext tenantRebalanceContext,
-      TenantRebalanceProgressStats progressStats) {
+      TenantRebalanceProgressStats progressStats, PinotHelixResourceManager resourceManager,
+      boolean isCancelledByUser) {
     LOGGER.info("Marking tenant rebalance job: {} as aborted", jobId);

Review Comment:
   The log message says 'aborted', but the method name and behavior now cover both user-initiated cancellation and abort. Update the log message to be more generic, such as 'Marking tenant rebalance job: {} as cancelled', or include the actual status being set.
   ```suggestion
       LOGGER.info("Marking tenant rebalance job: {} as {}", jobId, isCancelledByUser ? "cancelled" : "aborted");
   ```
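One way to "include the actual status being set", sketched under assumptions: derive both the persisted status and the log wording from a single value so they cannot drift apart. `TerminalStatus`, `from`, `label`, and `describe` are hypothetical names, and `String.format` stands in for the SLF4J `LOGGER.info` call used in the real code.

```java
import java.util.Locale;

public class CancelStatusSketch {
  // Hypothetical terminal states for a tenant rebalance job.
  enum TerminalStatus {
    CANCELLED, ABORTED;

    // Picks the state from the flag already passed to markTenantRebalanceJobAsCancelled.
    static TerminalStatus from(boolean isCancelledByUser) {
      return isCancelledByUser ? CANCELLED : ABORTED;
    }

    // Lower-case wording for log messages.
    String label() {
      return name().toLowerCase(Locale.ROOT);
    }
  }

  // Builds the same text the suggested LOGGER.info call would emit.
  static String describe(String jobId, boolean isCancelledByUser) {
    TerminalStatus status = TerminalStatus.from(isCancelledByUser);
    return String.format("Marking tenant rebalance job: %s as %s", jobId, status.label());
  }

  public static void main(String[] args) {
    System.out.println(describe("job-123", true));  // Marking tenant rebalance job: job-123 as cancelled
    System.out.println(describe("job-456", false)); // Marking tenant rebalance job: job-456 as aborted
  }
}
```

If the same `TerminalStatus` value were also what gets written to the job metadata, the log line would always match the stored status.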



-- 
This is an automated message from the Apache Git Service.