somandal commented on code in PR #15990:
URL: https://github.com/apache/pinot/pull/15990#discussion_r2136337151


##########
pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java:
##########
@@ -55,7 +57,12 @@ public PinotTableRebalancer(String zkAddress, String 
clusterName, boolean dryRun
   public RebalanceResult rebalance(String tableNameWithType) {
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: " + tableNameWithType);
-    return new TableRebalancer(_helixManager).rebalance(tableConfig, 
_rebalanceConfig,
-        TableRebalancer.createUniqueRebalanceJobIdentifier());
+
+    String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
+    ZkBasedTableRebalanceObserver rebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, jobId,
+        TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig), 
_propertyStore);
+
+    return new TableRebalancer(_helixManager, rebalanceObserver, null, null, 
null)

Review Comment:
   Just for my understanding: you create the `ZkBasedTableRebalanceObserver` 
here so that if someone uses the CLI to issue a rebalance, rebalances issued 
via the API won't go through? But the reverse is not possible, since here we 
cannot check ZK to ensure that a rebalance is not already ongoing?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -203,11 +206,13 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected TaskMetricsEmitter _taskMetricsEmitter;
   protected PoolingHttpClientConnectionManager _connectionManager;
   protected TenantRebalancer _tenantRebalancer;
-  protected ExecutorService _tenantRebalanceExecutorService;
+  protected ExecutorService _rebalancerExecutorService;

Review Comment:
   Perhaps add some comments about the recommended usage (i.e. this should be 
used by code paths that perform user-initiated rebalance)?
   
   Reason for this ask: it can be confusing to decide which executor service 
to pass. For example, `SegmentRelocator` and the rebalance pre-checker are 
rebalance-related and also need an executor service, but they use the 
controller-level executor service (which makes sense IMO).
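
   As a rough sketch of the kind of comment I mean (stand-in class, not the 
actual `BaseControllerStarter`; the `_executorService` field name and the 
constructor are assumptions made only for illustration):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: illustrates possible comment wording on the two executor fields.
class ControllerExecutorsSketch {
  /**
   * Controller-level executor for general background work (hypothetical field name).
   * Rebalance-related helpers that are not user-initiated rebalances, such as
   * SegmentRelocator and the rebalance pre-checker, would keep using this one.
   */
  protected final ExecutorService _executorService;

  /**
   * Dedicated pool for user-initiated rebalances. Its size bounds how many
   * rebalances can run concurrently on this controller, so only code paths that
   * actually execute a user-initiated rebalance should submit work here.
   */
  protected final ExecutorService _rebalancerExecutorService;

  ControllerExecutorsSketch(int numRebalanceThreads) {
    _executorService = Executors.newCachedThreadPool();
    _rebalancerExecutorService = Executors.newFixedThreadPool(numRebalanceThreads);
  }

  /** Shuts down both pools; no tasks are submitted in this sketch. */
  void shutdown() {
    _executorService.shutdown();
    _rebalancerExecutorService.shutdown();
  }

  public static void main(String[] args) {
    ControllerExecutorsSketch sketch = new ControllerExecutorsSketch(2);
    System.out.println("Separate pools: " + (sketch._executorService != sketch._rebalancerExecutorService));
    sketch.shutdown();
  }
}
```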



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single entry point for all table rebalance related operations. This class 
should be used to initiate table rebalance
+ * operations, rather than directly instantiating objects of {@link 
TableRebalancer}.
+ */
+public class TableRebalanceManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableRebalanceManager.class);
+
+  private final PinotHelixResourceManager _resourceManager;
+  private final ControllerMetrics _controllerMetrics;
+  private final RebalancePreChecker _rebalancePreChecker;
+  private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+
+  public TableRebalanceManager(PinotHelixResourceManager resourceManager, 
ControllerMetrics controllerMetrics,
+      RebalancePreChecker rebalancePreChecker, TableSizeReader 
tableSizeReader, ExecutorService executorService) {
+    _resourceManager = resourceManager;
+    _controllerMetrics = controllerMetrics;
+    _rebalancePreChecker = rebalancePreChecker;
+    _tableSizeReader = tableSizeReader;
+    _executorService = executorService;
+  }
+
+  /**
+   * Rebalance the table with the given name and type synchronously. It's the 
responsibility of the caller to ensure
+   * that this rebalance is run on the rebalance thread pool in the controller 
that respects the configuration
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}.
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @return result of the rebalance operation
+   * @throws TableNotFoundException if the table does not exist
+   */
+  public RebalanceResult rebalanceTable(String tableNameWithType, 
RebalanceConfig rebalanceConfig,
+      String rebalanceJobId, boolean trackRebalanceProgress)
+      throws TableNotFoundException {
+    TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
+    }
+    Preconditions.checkState(rebalanceJobId != null, "RebalanceId not 
populated in the rebalanceConfig");
+    ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
+    if (trackRebalanceProgress) {
+      zkBasedTableRebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
+          TableRebalanceContext.forInitialAttempt(rebalanceJobId, 
rebalanceConfig),
+          _resourceManager.getPropertyStore());
+    }
+    return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
+        zkBasedTableRebalanceObserver);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The 
number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @return a CompletableFuture that will complete with the result of the 
rebalance operation
+   * @throws TableNotFoundException if the table does not exist
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType,
+      RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean 
trackRebalanceProgress)
+      throws TableNotFoundException {
+    TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
+    }
+    return CompletableFuture.supplyAsync(
+        () -> {
+          try {
+            return rebalanceTable(tableNameWithType, rebalanceConfig, 
rebalanceJobId, trackRebalanceProgress);
+          } catch (TableNotFoundException e) {
+            // Should not happen since we already checked for table existence
+            throw new RuntimeException(e);
+          }
+        },
+        _executorService);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The 
number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param tableConfig configuration for the table to rebalance
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param zkBasedTableRebalanceObserver observer to track rebalance progress 
in ZK
+   * @return a CompletableFuture that will complete with the result of the 
rebalance operation
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType, TableConfig tableConfig,
+      String rebalanceJobId, RebalanceConfig rebalanceConfig,
+      @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) {
+    return CompletableFuture.supplyAsync(
+        () -> rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
+            zkBasedTableRebalanceObserver),
+        _executorService);
+  }
+
+  @VisibleForTesting
+  RebalanceResult rebalanceTable(String tableNameWithType, TableConfig 
tableConfig, String rebalanceJobId,
+      RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver 
zkBasedTableRebalanceObserver) {
+    String rebalanceJobInProgress = rebalanceJobInProgress(tableNameWithType);
+    if (rebalanceJobInProgress != null) {
+      String errorMsg = "Rebalance job is already in progress for table: " + 
tableNameWithType + ", jobId: "
+          + rebalanceJobInProgress + ". Please wait for the job to complete or 
cancel it before starting a new one.";
+      return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null, null, null);
+    }
+
+    Map<String, Set<String>> tierToSegmentsMap;
+    if (rebalanceConfig.isUpdateTargetTier()) {
+      tierToSegmentsMap = _resourceManager.updateTargetTier(rebalanceJobId, 
tableNameWithType, tableConfig);
+    } else {
+      tierToSegmentsMap = null;
+    }
+    TableRebalancer tableRebalancer =
+        new TableRebalancer(_resourceManager.getHelixZkManager(), 
zkBasedTableRebalanceObserver, _controllerMetrics,
+            _rebalancePreChecker, _tableSizeReader);
+
+    return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, tierToSegmentsMap);
+  }
+
+  /**
+   * Cancels ongoing rebalance jobs (if any) for the given table.
+   *
+   * @param tableNameWithType name of the table for which to cancel any 
ongoing rebalance job
+   * @return the list of job IDs that were cancelled
+   */
+  public List<String> cancelRebalance(String tableNameWithType) {
+    List<String> cancelledJobIds = new ArrayList<>();
+    _resourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobType.TABLE_REBALANCE,
+        jobMetadata -> {
+          String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+          try {
+            String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+            TableRebalanceProgressStats jobStats =
+                JsonUtils.stringToObject(jobStatsInStr, 
TableRebalanceProgressStats.class);
+            if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
+              return;
+            }
+
+            LOGGER.info("Cancelling rebalance job: {} for table: {}", jobId, 
tableNameWithType);
+            jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+            
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+                JsonUtils.objectToString(jobStats));
+            cancelledJobIds.add(jobId);
+          } catch (Exception e) {
+            LOGGER.error("Failed to cancel rebalance job: {} for table: {}", 
jobId, tableNameWithType, e);
+          }
+        });
+    return cancelledJobIds;
+  }
+
+  /**
+   * Checks if there is an ongoing rebalance job for the given table.
+   *
+   * @param tableNameWithType name of the table to check for ongoing rebalance 
jobs
+   * @return jobId of the ongoing rebalance job if one exists, {@code null} 
otherwise
+   */
+  @Nullable
+  private String rebalanceJobInProgress(String tableNameWithType) {

Review Comment:
   Should we allow a rebalance with `dryRun=true` to go through even if an 
existing rebalance job is ongoing? Perhaps we can add a field to the returned 
`RebalanceResult` to indicate that a rebalance is already ongoing in this case 
(though this can be a future improvement and is not needed for this PR).
   
   It might be useful to allow this in case someone just wants to inspect the 
returned value (e.g. the pre-check results, or the instance assignment / 
segment assignment).
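
   A minimal sketch of the gating I have in mind, using stand-in types rather 
than the actual Pinot classes (`DryRunGateSketch`, `Decision`, and `admit` are 
hypothetical names; the real check would sit next to `rebalanceJobInProgress`, 
and the sketch assumes a dry run makes no changes):

```java
// Sketch only: stand-in types, not the actual Pinot classes.
class DryRunGateSketch {
  /** Hypothetical outcome of the "is a rebalance already running?" check. */
  static final class Decision {
    final boolean allowed;
    final String ongoingJobId; // null when no rebalance is in progress

    Decision(boolean allowed, String ongoingJobId) {
      this.allowed = allowed;
      this.ongoingJobId = ongoingJobId;
    }
  }

  /**
   * Decides whether a rebalance request may proceed.
   *
   * @param dryRun whether the incoming request is a dry run
   * @param jobInProgress ID of the in-progress job, or null if there is none
   */
  static Decision admit(boolean dryRun, String jobInProgress) {
    if (jobInProgress == null) {
      return new Decision(true, null);
    }
    // Assumption: a dry run makes no changes, so it is let through, and the
    // ongoing job ID is surfaced so the caller can report it in the result.
    return new Decision(dryRun, jobInProgress);
  }

  public static void main(String[] args) {
    Decision dryRun = admit(true, "job-1");
    Decision real = admit(false, "job-1");
    System.out.println("dryRun allowed=" + dryRun.allowed + ", ongoing=" + dryRun.ongoingJobId);
    System.out.println("real allowed=" + real.allowed + ", ongoing=" + real.ongoingJobId);
  }
}
```

   The `ongoingJobId` here stands in for the extra field on `RebalanceResult` 
mentioned above.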



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single entry point for all table rebalance related operations. This class 
should be used to initiate table rebalance
+ * operations, rather than directly instantiating objects of {@link 
TableRebalancer}.
+ */
+public class TableRebalanceManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableRebalanceManager.class);
+
+  private final PinotHelixResourceManager _resourceManager;
+  private final ControllerMetrics _controllerMetrics;
+  private final RebalancePreChecker _rebalancePreChecker;
+  private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+
+  public TableRebalanceManager(PinotHelixResourceManager resourceManager, 
ControllerMetrics controllerMetrics,
+      RebalancePreChecker rebalancePreChecker, TableSizeReader 
tableSizeReader, ExecutorService executorService) {
+    _resourceManager = resourceManager;
+    _controllerMetrics = controllerMetrics;
+    _rebalancePreChecker = rebalancePreChecker;
+    _tableSizeReader = tableSizeReader;
+    _executorService = executorService;
+  }
+
+  /**
+   * Rebalance the table with the given name and type synchronously. It's the 
responsibility of the caller to ensure
+   * that this rebalance is run on the rebalance thread pool in the controller 
that respects the configuration
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}.
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @return result of the rebalance operation
+   * @throws TableNotFoundException if the table does not exist
+   */
+  public RebalanceResult rebalanceTable(String tableNameWithType, 
RebalanceConfig rebalanceConfig,
+      String rebalanceJobId, boolean trackRebalanceProgress)
+      throws TableNotFoundException {
+    TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
+    }
+    Preconditions.checkState(rebalanceJobId != null, "RebalanceId not 
populated in the rebalanceConfig");
+    ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
+    if (trackRebalanceProgress) {
+      zkBasedTableRebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
+          TableRebalanceContext.forInitialAttempt(rebalanceJobId, 
rebalanceConfig),
+          _resourceManager.getPropertyStore());
+    }
+    return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
+        zkBasedTableRebalanceObserver);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The 
number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @return a CompletableFuture that will complete with the result of the 
rebalance operation
+   * @throws TableNotFoundException if the table does not exist
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType,
+      RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean 
trackRebalanceProgress)
+      throws TableNotFoundException {
+    TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
+    }
+    return CompletableFuture.supplyAsync(
+        () -> {
+          try {
+            return rebalanceTable(tableNameWithType, rebalanceConfig, 
rebalanceJobId, trackRebalanceProgress);
+          } catch (TableNotFoundException e) {
+            // Should not happen since we already checked for table existence
+            throw new RuntimeException(e);
+          }
+        },
+        _executorService);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The 
number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param tableConfig configuration for the table to rebalance
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param zkBasedTableRebalanceObserver observer to track rebalance progress 
in ZK
+   * @return a CompletableFuture that will complete with the result of the 
rebalance operation
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType, TableConfig tableConfig,
+      String rebalanceJobId, RebalanceConfig rebalanceConfig,
+      @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) {
+    return CompletableFuture.supplyAsync(
+        () -> rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
+            zkBasedTableRebalanceObserver),
+        _executorService);
+  }
+
+  @VisibleForTesting
+  RebalanceResult rebalanceTable(String tableNameWithType, TableConfig 
tableConfig, String rebalanceJobId,
+      RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver 
zkBasedTableRebalanceObserver) {
+    String rebalanceJobInProgress = rebalanceJobInProgress(tableNameWithType);
+    if (rebalanceJobInProgress != null) {
+      String errorMsg = "Rebalance job is already in progress for table: " + 
tableNameWithType + ", jobId: "
+          + rebalanceJobInProgress + ". Please wait for the job to complete or 
cancel it before starting a new one.";
+      return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null, null, null);
+    }
+
+    Map<String, Set<String>> tierToSegmentsMap;
+    if (rebalanceConfig.isUpdateTargetTier()) {
+      tierToSegmentsMap = _resourceManager.updateTargetTier(rebalanceJobId, 
tableNameWithType, tableConfig);
+    } else {
+      tierToSegmentsMap = null;
+    }
+    TableRebalancer tableRebalancer =
+        new TableRebalancer(_resourceManager.getHelixZkManager(), 
zkBasedTableRebalanceObserver, _controllerMetrics,
+            _rebalancePreChecker, _tableSizeReader);
+
+    return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, tierToSegmentsMap);
+  }
+
+  /**
+   * Cancels ongoing rebalance jobs (if any) for the given table.
+   *
+   * @param tableNameWithType name of the table for which to cancel any 
ongoing rebalance job
+   * @return the list of job IDs that were cancelled
+   */
+  public List<String> cancelRebalance(String tableNameWithType) {
+    List<String> cancelledJobIds = new ArrayList<>();
+    _resourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobType.TABLE_REBALANCE,

Review Comment:
   Similar to the original code, should we capture the returned boolean and 
log whether it was `true` or `false`?
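
   Roughly like this, as a self-contained sketch (the `updateJobs` helper is a 
hypothetical stand-in for `updateJobsForTable`, and the map keys are made up 
for illustration; the log wording mirrors the message in the removed resource 
code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.logging.Logger;

// Sketch only: stand-in for the cancel path, not the actual TableRebalanceManager.
class CancelLoggingSketch {
  private static final Logger LOGGER = Logger.getLogger(CancelLoggingSketch.class.getName());

  /** Hypothetical stand-in: applies the updater to every job and reports success. */
  static boolean updateJobs(List<Map<String, String>> jobs, Consumer<Map<String, String>> updater) {
    jobs.forEach(updater);
    return true;
  }

  /** Cancels in-progress jobs, logs the outcome of the update, and returns the cancelled IDs. */
  static List<String> cancel(List<Map<String, String>> jobs) {
    List<String> cancelledJobIds = new ArrayList<>();
    // Capture the return value instead of discarding it.
    boolean updated = updateJobs(jobs, job -> {
      if ("IN_PROGRESS".equals(job.get("status"))) {
        job.put("status", "CANCELLED");
        cancelledJobIds.add(job.get("jobId"));
      }
    });
    LOGGER.info("Tried to cancel existing jobs at best effort and done: " + updated);
    return cancelledJobIds;
  }

  public static void main(String[] args) {
    Map<String, String> job = new HashMap<>();
    job.put("jobId", "j1");
    job.put("status", "IN_PROGRESS");
    List<Map<String, String>> jobs = new ArrayList<>();
    jobs.add(job);
    System.out.println("Cancelled: " + cancel(jobs));
  }
}
```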



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1258,6 +1265,41 @@ private void checkRebalanceDryRunSummary(RebalanceResult 
rebalanceResult, Rebala
     assertEquals(numServersUnchanged, 
summaryResult.getServerInfo().getServersUnchanged().size());
   }
 
+  @Test
+  public void testDisallowMultipleConcurrentRebalancesOnSameTable() throws 
Exception {
+    // Manually write an IN_PROGRESS rebalance job to ZK instead of trying to 
collide multiple actual rebalance
+    // attempts which will be prone to race conditions and cause this test to 
be flaky. We only reject a rebalance job
+    // if there is an IN_PROGRESS rebalance job for the same table in ZK, so 
we could actually end up with more than
+    // one active rebalance job if both are started at the exact same time 
since the progress stats are written to ZK
+    // after some initial pre-checks are done. However, rebalances are 
idempotent, and we don't actually care too much
+    // about avoiding this edge case race condition as long as in most cases 
we are able to prevent users from
+    // triggering a rebalance for a table that already has an in-progress 
rebalance job.
+    String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
+    String tableNameWithType = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(getTableName());
+    TableRebalanceProgressStats progressStats = new 
TableRebalanceProgressStats();
+    progressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE);
+    
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+        JsonUtils.objectToString(progressStats));
+    ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, 
jobMetadata, ControllerJobType.TABLE_REBALANCE,
+        prevJobMetadata -> true);
+
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    String response = sendPostRequest(getRebalanceUrl(rebalanceConfig, 
TableType.REALTIME));
+    assertTrue(response.contains("Rebalance job is already in progress for 
table"));

Review Comment:
   nit: can we also assert on the returned status code?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single entry point for all table rebalance related operations. This class 
should be used to initiate table rebalance
+ * operations, rather than directly instantiating objects of {@link 
TableRebalancer}.
+ */
+public class TableRebalanceManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableRebalanceManager.class);
+
+  private final PinotHelixResourceManager _resourceManager;
+  private final ControllerMetrics _controllerMetrics;
+  private final RebalancePreChecker _rebalancePreChecker;
+  private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+
+  public TableRebalanceManager(PinotHelixResourceManager resourceManager, 
ControllerMetrics controllerMetrics,
+      RebalancePreChecker rebalancePreChecker, TableSizeReader 
tableSizeReader, ExecutorService executorService) {
+    _resourceManager = resourceManager;
+    _controllerMetrics = controllerMetrics;
+    _rebalancePreChecker = rebalancePreChecker;
+    _tableSizeReader = tableSizeReader;
+    _executorService = executorService;
+  }
+
+  /**
+   * Rebalance the table with the given name and type synchronously. It's the 
responsibility of the caller to ensure

Review Comment:
   nit: is this comment (`It's the responsibility of the caller to ensure that 
this rebalance is run on the rebalance thread pool in the controller that 
respects the configuration ...`) needed? We do create this class with the 
correct executor based on the `ControllerConf`, right? Is any action needed 
from the caller of this function?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -783,29 +781,7 @@ public List<String> cancelRebalance(
       @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     String tableNameWithType = constructTableNameWithType(tableName, 
tableTypeStr);
-    List<String> cancelledJobIds = new ArrayList<>();
-    boolean updated =
-        _pinotHelixResourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobType.TABLE_REBALANCE,
-            jobMetadata -> {
-              String jobId = 
jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
-              try {
-                String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
-                TableRebalanceProgressStats jobStats =
-                    JsonUtils.stringToObject(jobStatsInStr, 
TableRebalanceProgressStats.class);
-                if (jobStats.getStatus() != 
RebalanceResult.Status.IN_PROGRESS) {
-                  return;
-                }
-                cancelledJobIds.add(jobId);
-                LOGGER.info("Cancel rebalance job: {} for table: {}", jobId, 
tableNameWithType);
-                jobStats.setStatus(RebalanceResult.Status.CANCELLED);
-                
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
-                    JsonUtils.objectToString(jobStats));
-              } catch (Exception e) {
-                LOGGER.error("Failed to cancel rebalance job: {} for table: 
{}", jobId, tableNameWithType, e);
-              }
-            });
-    LOGGER.info("Tried to cancel existing jobs at best effort and done: {}", 
updated);
-    return cancelledJobIds;
+    return _tableRebalanceManager.cancelRebalance(tableNameWithType);
   }
 
   @GET

Review Comment:
   Just wondering: would it make sense to also add a `getRebalanceStatus()` 
function to `_tableRebalanceManager`? Not necessary, but I see you did move 
the cancel logic to that class.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single entry point for all table rebalance related operations. This class should be used to initiate table rebalance
+ * operations, rather than directly instantiating objects of {@link TableRebalancer}.
+ */
+public class TableRebalanceManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableRebalanceManager.class);
+
+  private final PinotHelixResourceManager _resourceManager;
+  private final ControllerMetrics _controllerMetrics;
+  private final RebalancePreChecker _rebalancePreChecker;
+  private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+
+  public TableRebalanceManager(PinotHelixResourceManager resourceManager, ControllerMetrics controllerMetrics,
+      RebalancePreChecker rebalancePreChecker, TableSizeReader tableSizeReader, ExecutorService executorService) {
+    _resourceManager = resourceManager;
+    _controllerMetrics = controllerMetrics;
+    _rebalancePreChecker = rebalancePreChecker;
+    _tableSizeReader = tableSizeReader;
+    _executorService = executorService;
+  }
+
+  /**
+   * Rebalance the table with the given name and type synchronously. It's the responsibility of the caller to ensure
+   * that this rebalance is run on the rebalance thread pool in the controller that respects the configuration
+   * {@link org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}.
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats in ZK
+   * @return result of the rebalance operation
+   * @throws TableNotFoundException if the table does not exist
+   */
+  public RebalanceResult rebalanceTable(String tableNameWithType, RebalanceConfig rebalanceConfig,
+      String rebalanceJobId, boolean trackRebalanceProgress)
+      throws TableNotFoundException {
+    TableConfig tableConfig = _resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType);
+    }
+    Preconditions.checkState(rebalanceJobId != null, "RebalanceId not populated in the rebalanceConfig");
+    ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
+    if (trackRebalanceProgress) {
+      zkBasedTableRebalanceObserver = new ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
+          TableRebalanceContext.forInitialAttempt(rebalanceJobId, rebalanceConfig),
+          _resourceManager.getPropertyStore());
+    }
+    return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, rebalanceConfig,
+        zkBasedTableRebalanceObserver);
+  }
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The number of concurrent rebalances permitted

Review Comment:
   same question here about the comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

