Copilot commented on code in PR #15990:
URL: https://github.com/apache/pinot/pull/15990#discussion_r2126107656


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java:
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.util;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.zookeeper.data.Stat;
+
+
+public class ControllerZkHelixUtils {
+
+  private ControllerZkHelixUtils() {
+    // Utility class
+  }
+
+  /**
+   * Adds a new job metadata entry for a controller job like table rebalance 
or segment reload into ZK
+   *
+   * @param propertyStore the ZK property store to write to
+   * @param jobId job's UUID
+   * @param jobMetadata the job metadata
+   * @param jobType the type of the job to figure out where the job metadata 
is kept in ZK
+   * @param prevJobMetadataChecker to check the previous job metadata before 
adding new one
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public static boolean addControllerJobToZK(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String jobId,
+      Map<String, String> jobMetadata, String jobType, Predicate<Map<String, 
String>> prevJobMetadataChecker) {
+    
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)
 != null,
+        CommonConstants.ControllerJob.SUBMISSION_TIME_MS
+            + " in JobMetadata record not set. Cannot expire these records");
+    String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+    Stat stat = new Stat();
+    ZNRecord jobsZnRecord = propertyStore.get(jobResourcePath, stat, 
AccessOption.PERSISTENT);
+    if (jobsZnRecord != null) {
+      Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
+      Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId);
+      if (!prevJobMetadataChecker.test(prevJobMetadata)) {
+        return false;
+      }
+      jobMetadataMap.put(jobId, jobMetadata);
+      if (jobMetadataMap.size() > 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
+        jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) -> 
Long.compare(
+                
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
+                
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
+            .collect(Collectors.toList()).subList(0, 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
+            .stream().collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+      }
+      jobsZnRecord.setMapFields(jobMetadataMap);
+      return propertyStore.set(jobResourcePath, jobsZnRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
+    } else {
+      jobsZnRecord = new ZNRecord(jobResourcePath);

Review Comment:
   [nitpick] Using the full ZK path as the ZNRecord ID may be confusing; 
consider using the leaf node name (e.g., jobType) as the record ID and rely on 
the property store path for hierarchy.
   ```suggestion
         jobsZnRecord = new ZNRecord(jobType);
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java:
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.util;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.zookeeper.data.Stat;
+
+
+public class ControllerZkHelixUtils {
+
+  private ControllerZkHelixUtils() {
+    // Utility class
+  }
+
+  /**
+   * Adds a new job metadata entry for a controller job like table rebalance 
or segment reload into ZK
+   *
+   * @param propertyStore the ZK property store to write to
+   * @param jobId job's UUID
+   * @param jobMetadata the job metadata
+   * @param jobType the type of the job to figure out where the job metadata 
is kept in ZK
+   * @param prevJobMetadataChecker to check the previous job metadata before 
adding new one
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public static boolean addControllerJobToZK(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String jobId,
+      Map<String, String> jobMetadata, String jobType, Predicate<Map<String, 
String>> prevJobMetadataChecker) {
+    
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)
 != null,
+        CommonConstants.ControllerJob.SUBMISSION_TIME_MS
+            + " in JobMetadata record not set. Cannot expire these records");
+    String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+    Stat stat = new Stat();
+    ZNRecord jobsZnRecord = propertyStore.get(jobResourcePath, stat, 
AccessOption.PERSISTENT);
+    if (jobsZnRecord != null) {
+      Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
+      Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId);
+      if (!prevJobMetadataChecker.test(prevJobMetadata)) {
+        return false;
+      }
+      jobMetadataMap.put(jobId, jobMetadata);
+      if (jobMetadataMap.size() > 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
+        jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) -> 
Long.compare(
+                
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
+                
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
+            .collect(Collectors.toList()).subList(0, 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
+            .stream().collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));

Review Comment:
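   [nitpick] The trimming logic uses an intermediate list and subList; you 
could simplify by streaming entries, limiting to the max, and collecting into a 
`LinkedHashMap` so that the sorted (newest-first) order of the retained entries 
is preserved and the code is easier to read. Note that this suggestion also 
requires adding an import for `java.util.LinkedHashMap`, which the file does 
not currently import.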
   ```suggestion
           jobMetadataMap = jobMetadataMap.entrySet().stream()
               .sorted((v1, v2) -> Long.compare(
                   
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
                   
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
               
.limit(CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
               .collect(Collectors.toMap(
                   Map.Entry::getKey,
                   Map.Entry::getValue,
                   (e1, e2) -> e1, // Merge function, not needed as there are 
no duplicate keys
                   LinkedHashMap::new));
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java:
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single entry point for all table rebalance related operations. This class 
should be used to initiate table rebalance
+ * operations, rather than directly instantiating objects of {@link 
TableRebalancer}.
+ */
+public class TableRebalanceManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableRebalanceManager.class);
+
+  private final PinotHelixResourceManager _resourceManager;
+  private final ControllerMetrics _controllerMetrics;
+  private final RebalancePreChecker _rebalancePreChecker;
+  private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+
+  public TableRebalanceManager(PinotHelixResourceManager resourceManager, 
ControllerMetrics controllerMetrics,
+      RebalancePreChecker rebalancePreChecker, TableSizeReader 
tableSizeReader, ExecutorService executorService) {
+    _resourceManager = resourceManager;
+    _controllerMetrics = controllerMetrics;
+    _rebalancePreChecker = rebalancePreChecker;
+    _tableSizeReader = tableSizeReader;
+    _executorService = executorService;
+  }
+
+  /**
+   * Rebalance the table with the given name and type synchronously. It's the 
responsibility of the caller to ensure
+   * that this rebalance is run on the rebalance thread pool in the controller 
that respects the configuration
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}.
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @return result of the rebalance operation
+   * @throws TableNotFoundException if the table does not exist
+   */
+  public RebalanceResult rebalanceTable(String tableNameWithType, 
RebalanceConfig rebalanceConfig,
+      String rebalanceJobId, boolean trackRebalanceProgress)
+      throws TableNotFoundException {
+    TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
+    }
+    Preconditions.checkState(rebalanceJobId != null, "RebalanceId not 
populated in the rebalanceConfig");
+    ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
+    if (trackRebalanceProgress) {
+      zkBasedTableRebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
+          TableRebalanceContext.forInitialAttempt(rebalanceJobId, 
rebalanceConfig),
+          _resourceManager.getPropertyStore());
+    }
+    return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
+        zkBasedTableRebalanceObserver);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The 
number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param trackRebalanceProgress whether to track rebalance progress stats 
in ZK
+   * @return a CompletableFuture that will complete with the result of the 
rebalance operation
+   * @throws TableNotFoundException if the table does not exist
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType,
+      RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean 
trackRebalanceProgress)
+      throws TableNotFoundException {
+    TableConfig tableConfig = 
_resourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
+    }
+    return CompletableFuture.supplyAsync(
+        () -> {
+          try {
+            return rebalanceTable(tableNameWithType, rebalanceConfig, 
rebalanceJobId, trackRebalanceProgress);
+          } catch (TableNotFoundException e) {
+            // Should not happen since we already checked for table existence
+            throw new RuntimeException(e);
+          }
+        },
+        _executorService);
+  }
+
+  /**
+   * Rebalance the table with the given name and type asynchronously. The 
number of concurrent rebalances permitted
+   * on this controller is configured by
+   * {@link 
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+   *
+   * @param tableNameWithType name of the table to rebalance
+   * @param tableConfig configuration for the table to rebalance
+   * @param rebalanceJobId ID of the rebalance job, which is used to track the 
progress of the rebalance operation
+   * @param rebalanceConfig configuration for the rebalance operation
+   * @param zkBasedTableRebalanceObserver observer to track rebalance progress 
in ZK
+   * @return a CompletableFuture that will complete with the result of the 
rebalance operation
+   */
+  public CompletableFuture<RebalanceResult> rebalanceTableAsync(String 
tableNameWithType, TableConfig tableConfig,
+      String rebalanceJobId, RebalanceConfig rebalanceConfig,
+      @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) {
+    return CompletableFuture.supplyAsync(
+        () -> rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
+            zkBasedTableRebalanceObserver),
+        _executorService);
+  }
+
+  @VisibleForTesting
+  RebalanceResult rebalanceTable(String tableNameWithType, TableConfig 
tableConfig, String rebalanceJobId,
+      RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver 
zkBasedTableRebalanceObserver) {
+    String rebalanceJobInProgress = rebalanceJobInProgress(tableNameWithType);
+    if (rebalanceJobInProgress != null) {
+      String errorMsg = "Rebalance job is already in progress for table: " + 
tableNameWithType + ", jobId: "
+          + rebalanceJobInProgress + ". Please wait for the job to complete or 
cancel it before starting a new one.";
+      return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null, null, null);
+    }
+
+    Map<String, Set<String>> tierToSegmentsMap;
+    if (rebalanceConfig.isUpdateTargetTier()) {
+      tierToSegmentsMap = _resourceManager.updateTargetTier(rebalanceJobId, 
tableNameWithType, tableConfig);
+    } else {
+      tierToSegmentsMap = null;
+    }
+    TableRebalancer tableRebalancer =
+        new TableRebalancer(_resourceManager.getHelixZkManager(), 
zkBasedTableRebalanceObserver, _controllerMetrics,
+            _rebalancePreChecker, _tableSizeReader);
+
+    return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, tierToSegmentsMap);
+  }
+
+  /**
+   * Cancels ongoing rebalance jobs (if any) for the given table.
+   *
+   * @param tableNameWithType name of the table for which to cancel any 
ongoing rebalance job
+   * @return the list of job IDs that were cancelled
+   */
+  public List<String> cancelRebalance(String tableNameWithType) {

Review Comment:
   The `cancelRebalance` method marks jobs as CANCELLED in ZK but does not stop 
any in-flight rebalancing tasks; ensure the running rebalancer checks the 
status and aborts if cancellation is requested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to