klsince commented on code in PR #15029: URL: https://github.com/apache/pinot/pull/15029#discussion_r1953238433
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java: ########## @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import org.apache.pinot.controller.ControllerConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RebalancePreCheckerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(RebalancePreCheckerFactory.class); + + private RebalancePreCheckerFactory() { + } + + public static RebalancePreChecker create(ControllerConf controllerConf) { Review Comment: nit: just pass in `String rebalancePreCheckerClassName` to remove deps of ControllerConf, as it's not used anyway other than getting the class name. 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -186,13 +190,21 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase( tableConfig.getRoutingConfig().getInstanceSelectorType()); LOGGER.info( - "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, " - + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, lowDiskMode: {}, " - + "bestEfforts: {}, externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}", - tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime, + "Start rebalancing table: {} with dryRun: {}, preChecks: {}, reassignInstances: {}, includeConsuming: {}, " + + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, " + + "lowDiskMode: {}, bestEfforts: {}, externalViewCheckIntervalInMs: {}, " + + "externalViewStabilizationTimeoutInMs: {}", + tableNameWithType, dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime, minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts, externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs); + // Perform pre-checks if enabled + Map<String, String> preChecksResult = null; + if (preChecks && _pinotHelixResourceManager != null) { Review Comment: It looks like _pinotHelixResourceManager is passed into the rebalancer only to get the pre-checker object and then to be passed to the pre-checker. If so, we could let the pre-checker hold a reference to _pinotHelixResourceManager and just pass the pre-checker object to the rebalancer. 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -186,13 +190,21 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase( tableConfig.getRoutingConfig().getInstanceSelectorType()); LOGGER.info( - "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, " - + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, lowDiskMode: {}, " - + "bestEfforts: {}, externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}", - tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime, + "Start rebalancing table: {} with dryRun: {}, preChecks: {}, reassignInstances: {}, includeConsuming: {}, " + + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, " + + "lowDiskMode: {}, bestEfforts: {}, externalViewCheckIntervalInMs: {}, " + + "externalViewStabilizationTimeoutInMs: {}", + tableNameWithType, dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime, minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts, externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs); + // Perform pre-checks if enabled + Map<String, String> preChecksResult = null; + if (preChecks && _pinotHelixResourceManager != null) { + preChecksResult = _pinotHelixResourceManager.getRebalancePreChecker().doRebalancePreChecks(rebalanceJobId, + tableNameWithType, tableConfig, _pinotHelixResourceManager); Review Comment: I see it was changed to do pre-checks only in dryRun mode. Should this if-block then be moved inside the dryRun if-block, or also check the dryRun flag, so that we don't run the check when dryRun is false? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java: ########## @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import java.util.Map; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.config.table.TableConfig; + + +public interface RebalancePreChecker { + Map<String, String> doRebalancePreChecks(String rebalanceJobId, String tableNameWithType, Review Comment: nit: I'd probably just call this check(), since the class name already specifies that it is for pre-checks. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java: ########## @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.TableMetadataReader; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DefaultRebalancePreChecker implements RebalancePreChecker { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRebalancePreChecker.class); + + public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus"; + public static final String IS_MINIMIZE_DATA_MOVEMENT = "isMinimizeDataMovement"; + + @Override + public Map<String, String> doRebalancePreChecks(String rebalanceJobId, String tableNameWithType, + TableConfig tableConfig, PinotHelixResourceManager pinotHelixResourceManager) { + LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", 
tableNameWithType, rebalanceJobId); + + Map<String, String> preCheckResult = new HashMap<>(); + // Check for reload status + Boolean needsReload = checkReloadNeededOnServers(rebalanceJobId, tableNameWithType, pinotHelixResourceManager); + preCheckResult.put(NEEDS_RELOAD_STATUS, + needsReload == null ? "error" : needsReload ? String.valueOf(true) : String.valueOf(false)); + // Check whether minimizeDataMovement is set in TableConfig + boolean isMinimizeDataMovement = checkIsMinimizeDataMovement(rebalanceJobId, tableNameWithType, tableConfig); + preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT, String.valueOf(isMinimizeDataMovement)); + + LOGGER.info("End pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId); + return preCheckResult; + } + + private static Boolean checkReloadNeededOnServers(String rebalanceJobId, String tableNameWithType, Review Comment: Is this 'needReload' check meant to avoid unexpected segment reprocessing (e.g. to add new indexes) when rebalancing a table? If so, this check might be misleading because it checks segments on servers instead of segments in deepstore. So when segments are relocated to new servers, the new servers will download the raw segments (which are not consistent with the latest index configs or schema) and reprocess them. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java: ########## @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.TableMetadataReader; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DefaultRebalancePreChecker implements RebalancePreChecker { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRebalancePreChecker.class); + + public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus"; + public static final String IS_MINIMIZE_DATA_MOVEMENT = "isMinimizeDataMovement"; + + @Override + public Map<String, String> doRebalancePreChecks(String rebalanceJobId, String tableNameWithType, + TableConfig tableConfig, PinotHelixResourceManager pinotHelixResourceManager) { + LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", 
tableNameWithType, rebalanceJobId); + + Map<String, String> preCheckResult = new HashMap<>(); + // Check for reload status + Boolean needsReload = checkReloadNeededOnServers(rebalanceJobId, tableNameWithType, pinotHelixResourceManager); + preCheckResult.put(NEEDS_RELOAD_STATUS, + needsReload == null ? "error" : needsReload ? String.valueOf(true) : String.valueOf(false)); + // Check whether minimizeDataMovement is set in TableConfig + boolean isMinimizeDataMovement = checkIsMinimizeDataMovement(rebalanceJobId, tableNameWithType, tableConfig); + preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT, String.valueOf(isMinimizeDataMovement)); + + LOGGER.info("End pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId); + return preCheckResult; + } + + private static Boolean checkReloadNeededOnServers(String rebalanceJobId, String tableNameWithType, + PinotHelixResourceManager pinotHelixResourceManager) { + // Use at most 10 threads to get whether reload is needed from servers + LOGGER.info("Fetching whether reload is needed for table: {} with rebalanceJobId: {}", tableNameWithType, + rebalanceJobId); + ExecutorService executorService = Executors.newFixedThreadPool(10); + Boolean needsReload = null; + if (pinotHelixResourceManager == null) { + return needsReload; Review Comment: The executor is not shut down if this branch is taken. It would be better, though, to check whether the `protected ExecutorService _executorService;` inside ControllerStarter can be reused, since it acts as a common executor shared by many tasks. If it cannot be reused, let's name the threads for this executor so they are easier to find in logs when debugging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org