klsince commented on code in PR #15029:
URL: https://github.com/apache/pinot/pull/15029#discussion_r1953238433


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java:
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import org.apache.pinot.controller.ControllerConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RebalancePreCheckerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RebalancePreCheckerFactory.class);
+
+  private RebalancePreCheckerFactory() {
+  }
+
+  public static RebalancePreChecker create(ControllerConf controllerConf) {

Review Comment:
   nit: just pass in `String rebalancePreCheckerClassName` to remove the 
dependency on `ControllerConf`, since it's only used to get the class name.
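
   A minimal sketch of what that could look like, assuming the factory 
instantiates the checker reflectively through a no-arg constructor. 
`PreChecker`, `DefaultPreChecker` and `PreCheckerFactory` are hypothetical, 
trimmed-down stand-ins for the PR's types, not the actual Pinot classes.

```java
import java.util.Map;

// Hypothetical stand-in for the RebalancePreChecker interface in this PR.
interface PreChecker {
  Map<String, String> check(String tableNameWithType);
}

// Hypothetical default implementation with an implicit no-arg constructor.
class DefaultPreChecker implements PreChecker {
  @Override
  public Map<String, String> check(String tableNameWithType) {
    return Map.of("table", tableNameWithType);
  }
}

final class PreCheckerFactory {
  private PreCheckerFactory() {
  }

  // Takes only the class name, so the factory has no dependency on ControllerConf.
  static PreChecker create(String preCheckerClassName) {
    try {
      return (PreChecker) Class.forName(preCheckerClassName).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Failed to instantiate pre-checker: " + preCheckerClassName, e);
    }
  }
}
```

   The caller would then read the class name from its config and pass it in, 
e.g. `PreCheckerFactory.create(DefaultPreChecker.class.getName())`.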



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -186,13 +190,21 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
         && 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     LOGGER.info(
-        "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, 
includeConsuming: {}, bootstrap: {}, "
-            + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, 
enableStrictReplicaGroup: {}, lowDiskMode: {}, "
-            + "bestEfforts: {}, externalViewCheckIntervalInMs: {}, 
externalViewStabilizationTimeoutInMs: {}",
-        tableNameWithType, dryRun, reassignInstances, includeConsuming, 
bootstrap, downtime,
+        "Start rebalancing table: {} with dryRun: {}, preChecks: {}, 
reassignInstances: {}, includeConsuming: {}, "
+            + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: 
{}, enableStrictReplicaGroup: {}, "
+            + "lowDiskMode: {}, bestEfforts: {}, 
externalViewCheckIntervalInMs: {}, "
+            + "externalViewStabilizationTimeoutInMs: {}",
+        tableNameWithType, dryRun, preChecks, reassignInstances, 
includeConsuming, bootstrap, downtime,
         minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, 
lowDiskMode, bestEfforts,
         externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
 
+    // Perform pre-checks if enabled
+    Map<String, String> preChecksResult = null;
+    if (preChecks && _pinotHelixResourceManager != null) {

Review Comment:
   It looks like `_pinotHelixResourceManager` is passed into the rebalancer 
only to get the pre-checker object and then to be passed to the pre-checker. If 
so, we could let the pre-checker hold a reference to 
`_pinotHelixResourceManager` and pass just the pre-checker object to the 
rebalancer.
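
   Roughly what I have in mind, as a sketch only. `ResourceManager`, 
`PreChecker`, `ManagerBackedPreChecker` and `Rebalancer` are hypothetical 
stand-ins for `PinotHelixResourceManager`, `RebalancePreChecker`, 
`DefaultRebalancePreChecker` and `TableRebalancer`, and `hasTable` is an 
invented method used purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for PinotHelixResourceManager.
class ResourceManager {
  boolean hasTable(String tableNameWithType) {
    return tableNameWithType.endsWith("_OFFLINE");
  }
}

// Hypothetical stand-in for RebalancePreChecker: no resource manager in the signature.
interface PreChecker {
  Map<String, String> check(String rebalanceJobId, String tableNameWithType);
}

// The pre-checker owns the resource manager reference.
class ManagerBackedPreChecker implements PreChecker {
  private final ResourceManager _resourceManager;

  ManagerBackedPreChecker(ResourceManager resourceManager) {
    _resourceManager = resourceManager;
  }

  @Override
  public Map<String, String> check(String rebalanceJobId, String tableNameWithType) {
    Map<String, String> result = new HashMap<>();
    result.put("tableExists", String.valueOf(_resourceManager.hasTable(tableNameWithType)));
    return result;
  }
}

// The rebalancer only knows about the pre-checker, which may be null.
class Rebalancer {
  private final PreChecker _preChecker;

  Rebalancer(PreChecker preChecker) {
    _preChecker = preChecker;
  }

  Map<String, String> rebalance(String rebalanceJobId, String tableNameWithType, boolean preChecks) {
    if (preChecks && _preChecker != null) {
      return _preChecker.check(rebalanceJobId, tableNameWithType);
    }
    return null;
  }
}
```

   This keeps the rebalancer's constructor free of the resource manager and 
makes it easy to plug in a fake pre-checker in tests.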



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -186,13 +190,21 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
         && 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     LOGGER.info(
-        "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, 
includeConsuming: {}, bootstrap: {}, "
-            + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, 
enableStrictReplicaGroup: {}, lowDiskMode: {}, "
-            + "bestEfforts: {}, externalViewCheckIntervalInMs: {}, 
externalViewStabilizationTimeoutInMs: {}",
-        tableNameWithType, dryRun, reassignInstances, includeConsuming, 
bootstrap, downtime,
+        "Start rebalancing table: {} with dryRun: {}, preChecks: {}, 
reassignInstances: {}, includeConsuming: {}, "
+            + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: 
{}, enableStrictReplicaGroup: {}, "
+            + "lowDiskMode: {}, bestEfforts: {}, 
externalViewCheckIntervalInMs: {}, "
+            + "externalViewStabilizationTimeoutInMs: {}",
+        tableNameWithType, dryRun, preChecks, reassignInstances, 
includeConsuming, bootstrap, downtime,
         minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, 
lowDiskMode, bestEfforts,
         externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
 
+    // Perform pre-checks if enabled
+    Map<String, String> preChecksResult = null;
+    if (preChecks && _pinotHelixResourceManager != null) {
+      preChecksResult = 
_pinotHelixResourceManager.getRebalancePreChecker().doRebalancePreChecks(rebalanceJobId,
+          tableNameWithType, tableConfig, _pinotHelixResourceManager);

Review Comment:
   I see this was changed to run pre-checks only in dryRun mode. Should this 
if-block then be moved inside the dryRun if-block, or also check the `dryRun` 
flag, so that we skip the check when `dryRun` is false?
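
   A sketch of the second option (checking the flag in the same condition). 
`DryRunPreChecks` and `maybeRunPreChecks` are hypothetical names, and the 
`Supplier` stands in for the actual pre-checker call.

```java
import java.util.Map;
import java.util.function.Supplier;

final class DryRunPreChecks {
  private DryRunPreChecks() {
  }

  // Runs the pre-checker only for a dry run with pre-checks enabled; returns null otherwise.
  static Map<String, String> maybeRunPreChecks(boolean dryRun, boolean preChecks,
      Supplier<Map<String, String>> preChecker) {
    if (dryRun && preChecks && preChecker != null) {
      return preChecker.get();
    }
    return null;
  }
}
```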



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java:
##########
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+public interface RebalancePreChecker {
+  Map<String, String> doRebalancePreChecks(String rebalanceJobId, String 
tableNameWithType,

Review Comment:
   nit: I'd probably just call this `check()`, as the name 
`RebalancePreChecker` already says it's for pre-checks.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultRebalancePreChecker implements RebalancePreChecker {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultRebalancePreChecker.class);
+
+  public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus";
+  public static final String IS_MINIMIZE_DATA_MOVEMENT = 
"isMinimizeDataMovement";
+
+  @Override
+  public Map<String, String> doRebalancePreChecks(String rebalanceJobId, 
String tableNameWithType,
+      TableConfig tableConfig, PinotHelixResourceManager 
pinotHelixResourceManager) {
+    LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", 
tableNameWithType, rebalanceJobId);
+
+    Map<String, String> preCheckResult = new HashMap<>();
+    // Check for reload status
+    Boolean needsReload = checkReloadNeededOnServers(rebalanceJobId, 
tableNameWithType, pinotHelixResourceManager);
+    preCheckResult.put(NEEDS_RELOAD_STATUS,
+        needsReload == null ? "error" : needsReload ? String.valueOf(true) : 
String.valueOf(false));
+    // Check whether minimizeDataMovement is set in TableConfig
+    boolean isMinimizeDataMovement = 
checkIsMinimizeDataMovement(rebalanceJobId, tableNameWithType, tableConfig);
+    preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT, 
String.valueOf(isMinimizeDataMovement));
+
+    LOGGER.info("End pre-checks for table: {} with rebalanceJobId: {}", 
tableNameWithType, rebalanceJobId);
+    return preCheckResult;
+  }
+
+  private static Boolean checkReloadNeededOnServers(String rebalanceJobId, 
String tableNameWithType,

Review Comment:
   Is this `needsReload` check meant to avoid unexpected segment reprocessing 
(e.g. to add new indexes) when rebalancing a table?
   
   If so, the check might be misleading, because it looks at the segments on 
the servers rather than the segments in deep store. When segments are relocated 
to new servers, those servers download the raw segments (which may not be 
consistent with the latest index configs or schema) and reprocess them.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultRebalancePreChecker implements RebalancePreChecker {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultRebalancePreChecker.class);
+
+  public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus";
+  public static final String IS_MINIMIZE_DATA_MOVEMENT = 
"isMinimizeDataMovement";
+
+  @Override
+  public Map<String, String> doRebalancePreChecks(String rebalanceJobId, 
String tableNameWithType,
+      TableConfig tableConfig, PinotHelixResourceManager 
pinotHelixResourceManager) {
+    LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", 
tableNameWithType, rebalanceJobId);
+
+    Map<String, String> preCheckResult = new HashMap<>();
+    // Check for reload status
+    Boolean needsReload = checkReloadNeededOnServers(rebalanceJobId, 
tableNameWithType, pinotHelixResourceManager);
+    preCheckResult.put(NEEDS_RELOAD_STATUS,
+        needsReload == null ? "error" : needsReload ? String.valueOf(true) : 
String.valueOf(false));
+    // Check whether minimizeDataMovement is set in TableConfig
+    boolean isMinimizeDataMovement = 
checkIsMinimizeDataMovement(rebalanceJobId, tableNameWithType, tableConfig);
+    preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT, 
String.valueOf(isMinimizeDataMovement));
+
+    LOGGER.info("End pre-checks for table: {} with rebalanceJobId: {}", 
tableNameWithType, rebalanceJobId);
+    return preCheckResult;
+  }
+
+  private static Boolean checkReloadNeededOnServers(String rebalanceJobId, 
String tableNameWithType,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    // Use at most 10 threads to get whether reload is needed from servers
+    LOGGER.info("Fetching whether reload is needed for table: {} with 
rebalanceJobId: {}", tableNameWithType,
+        rebalanceJobId);
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Boolean needsReload = null;
+    if (pinotHelixResourceManager == null) {
+      return needsReload;

Review Comment:
   The executor is not shut down if this branch is taken.
   
   Better still, check whether the `protected ExecutorService _executorService;` 
inside `ControllerStarter` can be reused, since it serves as a common executor 
for many things. If it can't be reused, let's name the threads of this 
executor so they're easier to find in logs when debugging.
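
   If a dedicated executor stays, something along these lines would cover both 
points: return before creating the pool, shut it down in a `finally`, and name 
the threads. This is a sketch using only `java.util.concurrent`; 
`PreCheckExecutors`, the `rebalance-pre-check` prefix, the `Object` parameter 
(standing in for `PinotHelixResourceManager`) and the `Callable` (standing in 
for the server calls) are all hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class PreCheckExecutors {
  private PreCheckExecutors() {
  }

  // Names threads "<prefix>-0", "<prefix>-1", ... so they are easy to spot in logs.
  static ThreadFactory namedThreadFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }

  // Returns null when the check cannot be performed, mirroring the "error" case in the PR.
  static Boolean checkReloadNeeded(Object resourceManager, Callable<Boolean> serverCall) {
    if (resourceManager == null) {
      // Bail out before the executor is created, so there is nothing to leak.
      return null;
    }
    ExecutorService executor = Executors.newFixedThreadPool(10, namedThreadFactory("rebalance-pre-check"));
    try {
      return executor.submit(serverCall).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } catch (ExecutionException e) {
      return null;
    } finally {
      // Always release the threads, on success and on failure.
      executor.shutdown();
    }
  }
}
```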



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

