This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f220b398c Add support for performing pre-checks for TableRebalance 
(#15029)
5f220b398c is described below

commit 5f220b398cda1fde87a286492e05521d48923634
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Fri Feb 14 21:47:03 2025 -0800

    Add support for performing pre-checks for TableRebalance (#15029)
    
    * Add support for performing pre-checks for TableRebalance
---
 .../pinot/controller/BaseControllerStarter.java    |   6 +-
 .../apache/pinot/controller/ControllerConf.java    |  11 ++
 .../api/resources/PinotSegmentRestletResource.java |   2 +-
 .../api/resources/PinotTableRestletResource.java   |  14 +-
 .../helix/core/PinotHelixResourceManager.java      |  31 ++++-
 .../core/rebalance/DefaultRebalancePreChecker.java | 142 +++++++++++++++++++++
 .../helix/core/rebalance/RebalanceConfig.java      |  23 +++-
 .../helix/core/rebalance/RebalancePreChecker.java  |  31 +++++
 .../core/rebalance/RebalancePreCheckerFactory.java |  42 ++++++
 .../helix/core/rebalance/RebalanceResult.java      |  11 +-
 .../helix/core/rebalance/TableRebalancer.java      |  70 ++++++----
 .../rebalance/tenant/DefaultTenantRebalancer.java  |   4 +-
 .../rebalance/tenant/TenantRebalanceResult.java    |   2 +-
 .../util/ServerSegmentMetadataReader.java          |  24 +++-
 .../pinot/controller/util/TableMetadataReader.java |  35 ++++-
 .../TableRebalancerClusterStatelessTest.java       |  15 ++-
 .../tests/OfflineClusterIntegrationTest.java       | 119 +++++++++++++++++
 17 files changed, 529 insertions(+), 53 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index a0ce503d41..e95e0f5d87 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -252,11 +252,11 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       // queries)
       FunctionRegistry.init();
       _adminApp = createControllerAdminApp();
+      // This executor service is used to do async tasks from multiget util or 
table rebalancing.
+      _executorService = 
createExecutorService(_config.getControllerExecutorNumThreads(), 
"async-task-thread-%d");
       // Do not use this before the invocation of {@link 
PinotHelixResourceManager::start()}, which happens in {@link
       // ControllerStarter::start()}
       _helixResourceManager = createHelixResourceManager();
-      // This executor service is used to do async tasks from multiget util or 
table rebalancing.
-      _executorService = 
createExecutorService(_config.getControllerExecutorNumThreads(), 
"async-task-thread-%d");
       _tenantRebalanceExecutorService = 
createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
           "tenant-rebalance-thread-%d");
       _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, 
_tenantRebalanceExecutorService);
@@ -324,7 +324,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
    * @return A new instance of PinotHelixResourceManager.
    */
   protected PinotHelixResourceManager createHelixResourceManager() {
-    return new PinotHelixResourceManager(_config);
+    return new PinotHelixResourceManager(_config, _executorService);
   }
 
   public PinotHelixResourceManager getHelixResourceManager() {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 2957ca81fd..5ea1fd20cf 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -293,6 +293,7 @@ public class ControllerConf extends PinotConfiguration {
   public static final String ACCESS_CONTROL_USERNAME = 
"access.control.init.username";
   public static final String ACCESS_CONTROL_PASSWORD = 
"access.control.init.password";
   public static final String LINEAGE_MANAGER_CLASS = 
"controller.lineage.manager.class";
+  public static final String REBALANCE_PRE_CHECKER_CLASS = 
"controller.rebalance.pre.checker.class";
   // Amount of the time the segment can take from the beginning of upload to 
the end of upload. Used when parallel push
   // protection is enabled. If the upload does not finish within the timeout, 
next upload can override the previous one.
   private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 
"controller.segment.upload.timeoutInMillis";
@@ -316,6 +317,8 @@ public class ControllerConf extends PinotConfiguration {
   private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin";
   private static final String DEFAULT_LINEAGE_MANAGER =
       "org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager";
+  private static final String DEFAULT_REBALANCE_PRE_CHECKER =
+      
"org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker";
   private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 
600_000L; // 10 minutes
   private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION 
= -1;
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 
64;
@@ -952,6 +955,14 @@ public class ControllerConf extends PinotConfiguration {
     setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass);
   }
 
+  public String getRebalancePreCheckerClass() {
+    return getProperty(REBALANCE_PRE_CHECKER_CLASS, 
DEFAULT_REBALANCE_PRE_CHECKER);
+  }
+
+  public void setRebalancePreCheckerClass(String rebalancePreCheckerClass) {
+    setProperty(REBALANCE_PRE_CHECKER_CLASS, rebalancePreCheckerClass);
+  }
+
   public long getSegmentUploadTimeoutInMillis() {
     return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, 
DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 37e365bc7f..600c75b718 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -948,7 +948,7 @@ public class PinotSegmentRestletResource {
           new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
       Map<String, JsonNode> needReloadMetadata =
           
tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType,
-              _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+              _controllerConf.getServerAdminRequestTimeoutSeconds() * 
1000).getServerReloadJsonResponses();
       boolean needReload =
           needReloadMetadata.values().stream().anyMatch(value -> 
value.get("needReload").booleanValue());
       Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new 
HashMap<>();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 638849df46..74361a62a8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -604,6 +604,8 @@ public class PinotTableRestletResource {
       @ApiParam(value = "OFFLINE|REALTIME", required = true) 
@QueryParam("type") String tableTypeStr,
       @ApiParam(value = "Whether to rebalance table in dry-run mode") 
@DefaultValue("false") @QueryParam("dryRun")
       boolean dryRun,
+      @ApiParam(value = "Whether to enable pre-checks for table, must be in 
dry-run mode to enable")
+      @DefaultValue("false") @QueryParam("preChecks") boolean preChecks,
       @ApiParam(value = "Whether to reassign instances before reassigning 
segments") @DefaultValue("false")
       @QueryParam("reassignInstances") boolean reassignInstances,
       @ApiParam(value = "Whether to reassign CONSUMING segments for real-time 
table") @DefaultValue("false")
@@ -644,6 +646,7 @@ public class PinotTableRestletResource {
     String tableNameWithType = constructTableNameWithType(tableName, 
tableTypeStr);
     RebalanceConfig rebalanceConfig = new RebalanceConfig();
     rebalanceConfig.setDryRun(dryRun);
+    rebalanceConfig.setPreChecks(preChecks);
     rebalanceConfig.setReassignInstances(reassignInstances);
     rebalanceConfig.setIncludeConsuming(includeConsuming);
     rebalanceConfig.setBootstrap(bootstrap);
@@ -663,8 +666,9 @@ public class PinotTableRestletResource {
     String rebalanceJobId = 
TableRebalancer.createUniqueRebalanceJobIdentifier();
 
     try {
-      if (dryRun || downtime) {
-        // For dry-run or rebalance with downtime, directly return the 
rebalance result as it should return immediately
+      if (dryRun || preChecks || downtime) {
+        // For dry-run, preChecks or rebalance with downtime, directly return 
the rebalance result as it should return
+        // immediately
         return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false);
       } else {
         // Make a dry-run first to get the target assignment
@@ -682,7 +686,8 @@ public class PinotTableRestletResource {
             } catch (Throwable t) {
               String errorMsg = String.format("Caught exception/error while 
rebalancing table: %s", tableNameWithType);
               LOGGER.error(errorMsg, t);
-              return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null);
+              return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null,
+                  null);
             }
           });
           boolean isJobIdPersisted = waitForRebalanceToPersist(
@@ -702,7 +707,8 @@ public class PinotTableRestletResource {
 
           return new RebalanceResult(dryRunResult.getJobId(), 
RebalanceResult.Status.IN_PROGRESS,
               "In progress, check controller logs for updates", 
dryRunResult.getInstanceAssignment(),
-              dryRunResult.getTierInstanceAssignment(), 
dryRunResult.getSegmentAssignment());
+              dryRunResult.getTierInstanceAssignment(), 
dryRunResult.getSegmentAssignment(),
+              dryRunResult.getPreChecksResult());
         } else {
           // If dry-run failed or is no-op, return the dry-run result
           return dryRunResult;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1f983ce671..748f9d3c28 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -48,6 +48,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -151,6 +152,8 @@ import 
org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker;
+import 
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
@@ -237,10 +240,12 @@ public class PinotHelixResourceManager {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private TableCache _tableCache;
   private final LineageManager _lineageManager;
+  private final RebalancePreChecker _rebalancePreChecker;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, 
@Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, int 
deletedSegmentsRetentionInDays,
-      boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
+      boolean enableTieredSegmentAssignment, LineageManager lineageManager, 
RebalancePreChecker rebalancePreChecker,
+      @Nullable ExecutorService executorService) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _dataDir = dataDir;
@@ -263,13 +268,24 @@ public class PinotHelixResourceManager {
       _lineageUpdaterLocks[i] = new Object();
     }
     _lineageManager = lineageManager;
+    _rebalancePreChecker = rebalancePreChecker;
+    _rebalancePreChecker.init(this, executorService);
+  }
+
+  public PinotHelixResourceManager(ControllerConf controllerConf, @Nullable 
ExecutorService executorService) {
+    this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), 
controllerConf.getDataDir(),
+        controllerConf.tenantIsolationEnabled(), 
controllerConf.getEnableBatchMessageMode(),
+        controllerConf.getDeletedSegmentsRetentionInDays(), 
controllerConf.tieredSegmentAssignmentEnabled(),
+        LineageManagerFactory.create(controllerConf),
+        
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()),
 executorService);
   }
 
   public PinotHelixResourceManager(ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), 
controllerConf.getDataDir(),
         controllerConf.tenantIsolationEnabled(), 
controllerConf.getEnableBatchMessageMode(),
         controllerConf.getDeletedSegmentsRetentionInDays(), 
controllerConf.tieredSegmentAssignmentEnabled(),
-        LineageManagerFactory.create(controllerConf));
+        LineageManagerFactory.create(controllerConf),
+        
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()),
 null);
   }
 
   /**
@@ -423,6 +439,15 @@ public class PinotHelixResourceManager {
     return _lineageManager;
   }
 
+  /**
+   * Get the rebalance pre-checker
+   *
+   * @return rebalance pre-checker
+   */
+  public RebalancePreChecker getRebalancePreChecker() {
+    return _rebalancePreChecker;
+  }
+
   /**
    * Instance related APIs
    */
@@ -3587,7 +3612,7 @@ public class PinotHelixResourceManager {
       tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, 
tableConfig);
     }
     TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, 
_controllerMetrics);
+        new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, 
_controllerMetrics, _rebalancePreChecker);
     return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, tierToSegmentsMap);
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
new file mode 100644
index 0000000000..945a98e48e
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultRebalancePreChecker implements RebalancePreChecker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRebalancePreChecker.class);
+
+  public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus";
+  public static final String IS_MINIMIZE_DATA_MOVEMENT = "isMinimizeDataMovement";
+
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+  private ExecutorService _executorService;
+
+  @Override
+  public void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public Map<String, String> check(String rebalanceJobId, String tableNameWithType,
+      TableConfig tableConfig) {
+    LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId);
+
+    Map<String, String> preCheckResult = new HashMap<>();
+    // Reload status is reported as "true"/"false", or "error" when it could not be determined
+    Boolean needsReload = checkReloadNeededOnServers(rebalanceJobId, tableNameWithType);
+    preCheckResult.put(NEEDS_RELOAD_STATUS, needsReload == null ? "error" : String.valueOf(needsReload));
+    // Check whether minimizeDataMovement is set in TableConfig
+    boolean isMinimizeDataMovement = checkIsMinimizeDataMovement(rebalanceJobId, tableNameWithType, tableConfig);
+    preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT, String.valueOf(isMinimizeDataMovement));
+
+    LOGGER.info("End pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId);
+    return preCheckResult;
+  }
+
+  /**
+   * Returns whether the current segments on any server need a reload (table config or schema change that hasn't been
+   * applied yet), or null if unknown. This check does not guarantee that the segments in deep store are up to date.
+   * TODO: Add an API to check for whether segments in deep store are up to date with the table configs and schema
+   *       and add a pre-check here to call that API.
+   */
+  private Boolean checkReloadNeededOnServers(String rebalanceJobId, String tableNameWithType) {
+    LOGGER.info("Fetching whether reload is needed for table: {} with rebalanceJobId: {}", tableNameWithType,
+        rebalanceJobId);
+    Boolean needsReload = null;
+    if (_executorService == null) {
+      LOGGER.warn("Executor service is null, skipping needsReload check for table: {} rebalanceJobId: {}",
+          tableNameWithType, rebalanceJobId);
+      return needsReload;
+    }
+    try (PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager()) {
+      TableMetadataReader metadataReader = new TableMetadataReader(_executorService, connectionManager,
+          _pinotHelixResourceManager);
+      TableMetadataReader.TableReloadJsonResponse needsReloadMetadataPair =
+          metadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType, 30_000);
+      Map<String, JsonNode> needsReloadMetadata = needsReloadMetadataPair.getServerReloadJsonResponses();
+      int failedResponses = needsReloadMetadataPair.getNumFailedResponses();
+      LOGGER.info("Received {} needs reload responses and {} failed responses from servers for table: {} with "
+              + "rebalanceJobId: {}", needsReloadMetadata.size(), failedResponses, tableNameWithType, rebalanceJobId);
+      needsReload = needsReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue());
+      if (needsReload) {
+        return needsReload;
+      }
+      if (failedResponses > 0) {
+        LOGGER.warn("Received {} failed responses from servers and needsReload is false from returned responses, "
+            + "check needsReload status manually", failedResponses);
+        needsReload = null;
+      }
+    } catch (InvalidConfigException | IOException | RuntimeException e) {
+      LOGGER.warn("Caught exception while trying to fetch reload status from servers", e);
+    }
+    // null here means the status is undetermined (caught exception, or failed responses with no reload reported)
+    return needsReload;
+  }
+
+  /**
+   * Returns whether minimizeDataMovement is set in the table's instance assignment config(s); false if unreadable.
+   */
+  private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableNameWithType,
+      TableConfig tableConfig) {
+    LOGGER.info("Checking whether minimizeDataMovement is set for table: {} with rebalanceJobId: {}", tableNameWithType,
+        rebalanceJobId);
+    try {
+      if (tableConfig.getTableType() == TableType.OFFLINE) {
+        InstanceAssignmentConfig instanceAssignmentConfig =
+            InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.OFFLINE);
+        return instanceAssignmentConfig.isMinimizeDataMovement();
+      } else {
+        InstanceAssignmentConfig instanceAssignmentConfigConsuming =
+            InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.CONSUMING);
+        // For REALTIME tables need to check for both CONSUMING and COMPLETED segments if relocation is enabled
+        if (!InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
+          return instanceAssignmentConfigConsuming.isMinimizeDataMovement();
+        }
+
+        InstanceAssignmentConfig instanceAssignmentConfigCompleted =
+            InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.COMPLETED);
+        return instanceAssignmentConfigConsuming.isMinimizeDataMovement()
+            && instanceAssignmentConfigCompleted.isMinimizeDataMovement();
+      }
+    } catch (RuntimeException e) {
+      LOGGER.warn("Error while trying to fetch instance assignment config, assuming minimizeDataMovement is false", e);
+      return false;
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index b6b471cfbc..e5bd39f0ec 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -34,6 +34,12 @@ public class RebalanceConfig {
   @ApiModelProperty(example = "false")
   private boolean _dryRun = false;
 
+  // Whether to perform pre-checks for rebalance. This only returns the status 
of each pre-check and does not fail
+  // rebalance
+  @JsonProperty("preChecks")
+  @ApiModelProperty(example = "false")
+  private boolean _preChecks = false;
+
   // Whether to reassign instances before reassigning segments
   @JsonProperty("reassignInstances")
   @ApiModelProperty(example = "false")
@@ -118,6 +124,14 @@ public class RebalanceConfig {
     _dryRun = dryRun;
   }
 
+  public boolean isPreChecks() {
+    return _preChecks;
+  }
+
+  public void setPreChecks(boolean preChecks) {
+    _preChecks = preChecks;
+  }
+
   public boolean isReassignInstances() {
     return _reassignInstances;
   }
@@ -232,10 +246,10 @@ public class RebalanceConfig {
 
   @Override
   public String toString() {
-    return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", _reassignInstances=" 
+ _reassignInstances
-        + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" + 
_bootstrap + ", _downtime=" + _downtime
-        + ", _minAvailableReplicas=" + _minAvailableReplicas + ", 
_bestEfforts=" + _bestEfforts
-        + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
+    return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" + 
_preChecks + ", _reassignInstances="
+        + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", 
_bootstrap=" + _bootstrap
+        + ", _downtime=" + _downtime + ", _minAvailableReplicas=" + 
_minAvailableReplicas + ", _bestEfforts="
+        + _bestEfforts + ", _externalViewCheckIntervalInMs=" + 
_externalViewCheckIntervalInMs
         + ", _externalViewStabilizationTimeoutInMs=" + 
_externalViewStabilizationTimeoutInMs + ", _updateTargetTier="
         + _updateTargetTier + ", _heartbeatIntervalInMs=" + 
_heartbeatIntervalInMs + ", _heartbeatTimeoutInMs="
         + _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ", 
_retryInitialDelayInMs="
@@ -245,6 +259,7 @@ public class RebalanceConfig {
   public static RebalanceConfig copy(RebalanceConfig cfg) {
     RebalanceConfig rc = new RebalanceConfig();
     rc._dryRun = cfg._dryRun;
+    rc._preChecks = cfg._preChecks;
     rc._reassignInstances = cfg._reassignInstances;
     rc._includeConsuming = cfg._includeConsuming;
     rc._bootstrap = cfg._bootstrap;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
new file mode 100644
index 0000000000..54f15e8b33
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+public interface RebalancePreChecker {
+  void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService);
+  Map<String, String> check(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig);
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java
new file mode 100644
index 0000000000..286942dc78
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RebalancePreCheckerFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RebalancePreCheckerFactory.class);
+
+  private RebalancePreCheckerFactory() {
+  }
+
+  public static RebalancePreChecker create(String rebalancePreCheckerClassName) {
+    try {
+      LOGGER.info("Trying to create rebalance pre-checker object for class: {}", rebalancePreCheckerClassName);
+      return (RebalancePreChecker) Class.forName(rebalancePreCheckerClassName).getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to create rebalance pre-checker for class: %s",
+          rebalancePreCheckerClassName);
+      LOGGER.error(errMsg, e);
+      throw new RuntimeException(errMsg, e);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index 2be7fc7753..2b81d8d78b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -40,6 +40,8 @@ public class RebalanceResult {
   @JsonInclude(JsonInclude.Include.NON_NULL)
   private final Map<String, Map<String, String>> _segmentAssignment;
   private final String _description;
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final Map<String, String> _preChecksResult;
 
   @JsonCreator
   public RebalanceResult(@JsonProperty(value = "jobId", required = true) 
String jobId,
@@ -47,13 +49,15 @@ public class RebalanceResult {
       @JsonProperty(value = "description", required = true) String description,
       @JsonProperty("instanceAssignment") @Nullable 
Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
       @JsonProperty("tierInstanceAssignment") @Nullable Map<String, 
InstancePartitions> tierInstanceAssignment,
-      @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, 
String>> segmentAssignment) {
+      @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, 
String>> segmentAssignment,
+      @JsonProperty("preChecksResult") @Nullable Map<String, String> 
preChecksResult) {
     _jobId = jobId;
     _status = status;
     _description = description;
     _instanceAssignment = instanceAssignment;
     _tierInstanceAssignment = tierInstanceAssignment;
     _segmentAssignment = segmentAssignment;
+    _preChecksResult = preChecksResult;
   }
 
   @JsonProperty
@@ -86,6 +90,11 @@ public class RebalanceResult {
     return _segmentAssignment;
   }
 
+  /**
+   * Returns the results of the rebalance pre-checks, keyed by check name.
+   *
+   * @return the pre-check results, or {@code null} when none were supplied (e.g. pre-checks were not run); a
+   *         {@code null} value is omitted from the JSON output
+   */
+  @JsonProperty
+  public Map<String, String> getPreChecksResult() {
+    return _preChecksResult;
+  }
+
   public enum Status {
     // FAILED if the job has ended with known exceptions;
     // ABORTED if the job is stopped by others but retry is still allowed;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index facd904f21..5dc7c4d8ea 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -119,9 +119,10 @@ public class TableRebalancer {
   private final HelixDataAccessor _helixDataAccessor;
   private final TableRebalanceObserver _tableRebalanceObserver;
   private final ControllerMetrics _controllerMetrics;
+  private final RebalancePreChecker _rebalancePreChecker;
 
   public TableRebalancer(HelixManager helixManager, @Nullable 
TableRebalanceObserver tableRebalanceObserver,
-      @Nullable ControllerMetrics controllerMetrics) {
+      @Nullable ControllerMetrics controllerMetrics, @Nullable 
RebalancePreChecker rebalancePreChecker) {
     _helixManager = helixManager;
     if (tableRebalanceObserver != null) {
       _tableRebalanceObserver = tableRebalanceObserver;
@@ -130,10 +131,11 @@ public class TableRebalancer {
     }
     _helixDataAccessor = helixManager.getHelixDataAccessor();
     _controllerMetrics = controllerMetrics;
+    _rebalancePreChecker = rebalancePreChecker;
   }
 
   public TableRebalancer(HelixManager helixManager) {
-    this(helixManager, null, null);
+    this(helixManager, null, null, null);
   }
 
   public static String createUniqueRebalanceJobIdentifier() {
@@ -173,6 +175,7 @@ public class TableRebalancer {
       rebalanceJobId = createUniqueRebalanceJobIdentifier();
     }
     boolean dryRun = rebalanceConfig.isDryRun();
+    boolean preChecks = rebalanceConfig.isPreChecks();
     boolean reassignInstances = rebalanceConfig.isReassignInstances();
     boolean includeConsuming = rebalanceConfig.isIncludeConsuming();
     boolean bootstrap = rebalanceConfig.isBootstrap();
@@ -186,13 +189,29 @@ public class TableRebalancer {
         && 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     LOGGER.info(
-        "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, 
includeConsuming: {}, bootstrap: {}, "
-            + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, 
enableStrictReplicaGroup: {}, lowDiskMode: {}, "
-            + "bestEfforts: {}, externalViewCheckIntervalInMs: {}, 
externalViewStabilizationTimeoutInMs: {}",
-        tableNameWithType, dryRun, reassignInstances, includeConsuming, 
bootstrap, downtime,
+        "Start rebalancing table: {} with dryRun: {}, preChecks: {}, 
reassignInstances: {}, includeConsuming: {}, "
+            + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: 
{}, enableStrictReplicaGroup: {}, "
+            + "lowDiskMode: {}, bestEfforts: {}, 
externalViewCheckIntervalInMs: {}, "
+            + "externalViewStabilizationTimeoutInMs: {}",
+        tableNameWithType, dryRun, preChecks, reassignInstances, 
includeConsuming, bootstrap, downtime,
         minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, 
lowDiskMode, bestEfforts,
         externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
 
+    // Perform pre-checks if enabled
+    Map<String, String> preChecksResult = null;
+    if (preChecks) {
+      if (!dryRun) {
+        // Dry-run must be enabled to run pre-checks
+        String errorMsg = String.format("Pre-checks can only be enabled in 
dry-run mode, not triggering rebalance for "
+            + "table: %s with rebalanceJobId: %s", tableNameWithType, 
rebalanceJobId);
+        LOGGER.error(errorMsg);
+        return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null, null);
+      }
+      if (_rebalancePreChecker != null) {
+        preChecksResult = _rebalancePreChecker.check(rebalanceJobId, 
tableNameWithType, tableConfig);
+      }
+    }
+
     // Fetch ideal state
     PropertyKey idealStatePropertyKey = 
_helixDataAccessor.keyBuilder().idealStates(tableNameWithType);
     IdealState currentIdealState;
@@ -203,21 +222,21 @@ public class TableRebalancer {
           "For rebalanceId: %s, caught exception while fetching IdealState for 
table: %s, aborting the rebalance",
           rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-          "Caught exception while fetching IdealState: " + e, null, null, 
null);
+          "Caught exception while fetching IdealState: " + e, null, null, 
null, preChecksResult);
     }
     if (currentIdealState == null) {
       onReturnFailure(
           String.format("For rebalanceId: %s, cannot find the IdealState for 
table: %s, aborting the rebalance",
               rebalanceJobId, tableNameWithType), null);
       return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, "Cannot find the IdealState for table",
-          null, null, null);
+          null, null, null, preChecksResult);
     }
     if (!currentIdealState.isEnabled() && !downtime) {
       onReturnFailure(String.format(
           "For rebalanceId: %s, cannot rebalance disabled table: %s without 
downtime, aborting the rebalance",
           rebalanceJobId, tableNameWithType), null);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-          "Cannot rebalance disabled table without downtime", null, null, 
null);
+          "Cannot rebalance disabled table without downtime", null, null, 
null, preChecksResult);
     }
 
     LOGGER.info("For rebalanceId: {}, processing instance partitions for 
table: {}", rebalanceJobId, tableNameWithType);
@@ -235,7 +254,7 @@ public class TableRebalancer {
           "For rebalanceId: %s, caught exception while fetching/calculating 
instance partitions for table: %s, "
               + "aborting the rebalance", rebalanceJobId, tableNameWithType), 
e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-          "Caught exception while fetching/calculating instance partitions: " 
+ e, null, null, null);
+          "Caught exception while fetching/calculating instance partitions: " 
+ e, null, null, null, preChecksResult);
     }
 
     // Calculate instance partitions for tiers if configured
@@ -253,7 +272,8 @@ public class TableRebalancer {
           "For rebalanceId: %s, caught exception while fetching/calculating 
tier instance partitions for table: %s, "
               + "aborting the rebalance", rebalanceJobId, tableNameWithType), 
e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-          "Caught exception while fetching/calculating tier instance 
partitions: " + e, null, null, null);
+          "Caught exception while fetching/calculating tier instance 
partitions: " + e, null, null, null,
+          preChecksResult);
     }
 
     LOGGER.info("For rebalanceId: {}, calculating the target assignment for 
table: {}", rebalanceJobId,
@@ -271,7 +291,7 @@ public class TableRebalancer {
               + "rebalance", rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while calculating target assignment: " + e, 
instancePartitionsMap,
-          tierToInstancePartitionsMap, null);
+          tierToInstancePartitionsMap, null, preChecksResult);
     }
 
     boolean segmentAssignmentUnchanged = 
currentAssignment.equals(targetAssignment);
@@ -286,19 +306,19 @@ public class TableRebalancer {
             String.format("For rebalanceId: %s, instance unchanged and table: 
%s is already balanced", rebalanceJobId,
                 tableNameWithType));
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.NO_OP, "Table is already balanced",
-            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment, preChecksResult);
       } else {
         if (dryRun) {
           return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.DONE,
               "Instance reassigned in dry-run mode, table is already 
balanced", instancePartitionsMap,
-              tierToInstancePartitionsMap, targetAssignment);
+              tierToInstancePartitionsMap, targetAssignment, preChecksResult);
         } else {
           _tableRebalanceObserver.onSuccess(
               String.format("For rebalanceId: %s, instance reassigned but 
table: %s is already balanced",
                   rebalanceJobId, tableNameWithType));
           return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.DONE,
               "Instance reassigned, table is already balanced", 
instancePartitionsMap, tierToInstancePartitionsMap,
-              targetAssignment);
+              targetAssignment, preChecksResult);
         }
       }
     }
@@ -307,7 +327,7 @@ public class TableRebalancer {
       LOGGER.info("For rebalanceId: {}, rebalancing table: {} in dry-run mode, 
returning the target assignment",
           rebalanceJobId, tableNameWithType);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, 
"Dry-run mode", instancePartitionsMap,
-          tierToInstancePartitionsMap, targetAssignment);
+          tierToInstancePartitionsMap, targetAssignment, preChecksResult);
     }
 
     if (downtime) {
@@ -332,14 +352,14 @@ public class TableRebalancer {
         return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
             "Success with downtime (replaced IdealState with the target 
segment assignment, ExternalView might not "
                 + "reach the target segment assignment yet)", 
instancePartitionsMap, tierToInstancePartitionsMap,
-            targetAssignment);
+            targetAssignment, preChecksResult);
       } catch (Exception e) {
         onReturnFailure(String.format(
             "For rebalanceId: %s, caught exception while updating IdealState 
for table: %s, aborting the rebalance",
             rebalanceJobId, tableNameWithType), e);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Caught exception while updating IdealState: " + e, 
instancePartitionsMap, tierToInstancePartitionsMap,
-            targetAssignment);
+            targetAssignment, preChecksResult);
       }
     }
 
@@ -371,7 +391,7 @@ public class TableRebalancer {
             minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas), 
null);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Illegal min available replicas config", instancePartitionsMap, 
tierToInstancePartitionsMap,
-            targetAssignment);
+            targetAssignment, preChecksResult);
       }
       minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
     } else {
@@ -412,12 +432,12 @@ public class TableRebalancer {
         if (_tableRebalanceObserver.isStopped()) {
           return new RebalanceResult(rebalanceJobId, 
_tableRebalanceObserver.getStopStatus(),
               "Caught exception while waiting for ExternalView to converge: " 
+ e, instancePartitionsMap,
-              tierToInstancePartitionsMap, targetAssignment);
+              tierToInstancePartitionsMap, targetAssignment, preChecksResult);
         }
         _tableRebalanceObserver.onError(errorMsg);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Caught exception while waiting for ExternalView to converge: " + 
e, instancePartitionsMap,
-            tierToInstancePartitionsMap, targetAssignment);
+            tierToInstancePartitionsMap, targetAssignment, preChecksResult);
       }
 
       // Re-calculate the target assignment if IdealState changed while 
waiting for ExternalView to converge
@@ -466,7 +486,7 @@ public class TableRebalancer {
                     + "aborting the rebalance", rebalanceJobId, 
tableNameWithType), e);
             return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
                 "Caught exception while re-calculating the target assignment: 
" + e, instancePartitionsMap,
-                tierToInstancePartitionsMap, targetAssignment);
+                tierToInstancePartitionsMap, targetAssignment, 
preChecksResult);
           }
         } else {
           LOGGER.info("For rebalanceId:{}, no state change found for segments 
to be moved, "
@@ -490,7 +510,7 @@ public class TableRebalancer {
         return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
             "Success with minAvailableReplicas: " + minAvailableReplicas
                 + " (both IdealState and ExternalView should reach the target 
segment assignment)",
-            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment, preChecksResult);
       }
 
       // Record change of current ideal state and the new target
@@ -499,7 +519,7 @@ public class TableRebalancer {
       if (_tableRebalanceObserver.isStopped()) {
         return new RebalanceResult(rebalanceJobId, 
_tableRebalanceObserver.getStopStatus(),
             "Rebalance has stopped already before updating the IdealState", 
instancePartitionsMap,
-            tierToInstancePartitionsMap, targetAssignment);
+            tierToInstancePartitionsMap, targetAssignment, preChecksResult);
       }
       Map<String, Map<String, String>> nextAssignment =
           getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
@@ -530,7 +550,7 @@ public class TableRebalancer {
             + "aborting the rebalance", rebalanceJobId, tableNameWithType), e);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Caught exception while updating IdealState: " + e, 
instancePartitionsMap, tierToInstancePartitionsMap,
-            targetAssignment);
+            targetAssignment, preChecksResult);
       }
 
       segmentsToMonitor = new HashSet<>(segmentsToMove);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index ec55c38b95..e115476529 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -60,7 +60,7 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
                 false));
       } catch (TableNotFoundException exception) {
         rebalanceResult.put(table, new RebalanceResult(null, 
RebalanceResult.Status.FAILED, exception.getMessage(),
-            null, null, null));
+            null, null, null, null));
       }
     });
     if (config.isDryRun()) {
@@ -71,7 +71,7 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
         if (result.getStatus() == RebalanceResult.Status.DONE) {
           rebalanceResult.put(table, new RebalanceResult(result.getJobId(), 
RebalanceResult.Status.IN_PROGRESS,
               "In progress, check controller task status for the", 
result.getInstanceAssignment(),
-              result.getTierInstanceAssignment(), 
result.getSegmentAssignment()));
+              result.getTierInstanceAssignment(), 
result.getSegmentAssignment(), result.getPreChecksResult()));
         }
       }
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
index 57c17054d7..c96c25b250 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
@@ -36,7 +36,7 @@ public class TenantRebalanceResult {
       _rebalanceTableResults = new HashMap<>();
       rebalanceTableResults.forEach((table, result) -> {
         _rebalanceTableResults.put(table, new 
RebalanceResult(result.getJobId(), result.getStatus(),
-            result.getDescription(), null, null, null));
+            result.getDescription(), null, null, null, null));
       });
     }
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 8dde7f08fe..0376b90dac 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -222,8 +222,8 @@ public class ServerSegmentMetadataReader {
    * This method will return metadata of all the servers along with need 
reload flag.
    * In future additional details like segments list can also be added
    */
-  public List<String> getCheckReloadSegmentsFromServer(String 
tableNameWithType, Set<String> serverInstances,
-      BiMap<String, String> endpoints, int timeoutMs) {
+  public TableReloadResponse getCheckReloadSegmentsFromServer(String 
tableNameWithType,
+      Set<String> serverInstances, BiMap<String, String> endpoints, int 
timeoutMs) {
     LOGGER.debug("Checking if reload is needed on segments from servers for 
table {}.", tableNameWithType);
     List<String> serverURLs = new ArrayList<>();
     for (String serverInstance : serverInstances) {
@@ -250,7 +250,7 @@ public class ServerSegmentMetadataReader {
     }
 
     LOGGER.debug("Retrieved metadata of reload check from servers.");
-    return serversNeedReloadResponses;
+    return new TableReloadResponse(serviceResponse._failedResponseCount, 
serversNeedReloadResponses);
   }
 
   /**
@@ -505,4 +505,22 @@ public class ServerSegmentMetadataReader {
     tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8);
     return String.format("%s/tables/%s/segments/isStale", endpoint, 
tableNameWithType);
   }
+
+  /**
+   * Outcome of a segment reload check across a table's servers: the number of server requests that failed,
+   * plus the per-server response strings that were collected.
+   *
+   * <p>Declared {@code static} because it carries no reference to the enclosing reader.
+   */
+  public static class TableReloadResponse {
+    private final int _numFailedResponses;
+    private final List<String> _serverReloadResponses;
+
+    TableReloadResponse(int numFailedResponses, List<String> serverReloadResponses) {
+      _numFailedResponses = numFailedResponses;
+      _serverReloadResponses = serverReloadResponses;
+    }
+
+    /** @return number of server requests that did not produce a successful response */
+    public int getNumFailedResponses() {
+      return _numFailedResponses;
+    }
+
+    /** @return the collected per-server reload-check response strings */
+    public List<String> getServerReloadResponses() {
+      return _serverReloadResponses;
+    }
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index 48f53577a8..c6ba7aa59f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -59,19 +59,26 @@ public class TableMetadataReader {
     _pinotHelixResourceManager = helixResourceManager;
   }
 
-  public Map<String, JsonNode> getServerCheckSegmentsReloadMetadata(String 
tableNameWithType, int timeoutMs)
+  /**
+   * Check if segments need a reload on the servers hosting the table.
+   * @return a {@link TableReloadJsonResponse} holding a) the number of failed server responses and b) the
+   *         reload-check JSON responses keyed by server instance id
+   */
+  public TableReloadJsonResponse getServerCheckSegmentsReloadMetadata(String 
tableNameWithType,
+      int timeoutMs)
       throws InvalidConfigException, IOException {
-    List<String> segmentsMetadata = getReloadCheckResponses(tableNameWithType, 
timeoutMs);
+    ServerSegmentMetadataReader.TableReloadResponse segmentsMetadataPair =
+        getReloadCheckResponses(tableNameWithType, timeoutMs);
+    List<String> segmentsMetadata = 
segmentsMetadataPair.getServerReloadResponses();
     Map<String, JsonNode> response = new HashMap<>();
     for (String segmentMetadata : segmentsMetadata) {
       JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
       response.put(responseJson.get("instanceId").asText(), responseJson);
     }
-    return response;
+    return new 
TableReloadJsonResponse(segmentsMetadataPair.getNumFailedResponses(), response);
   }
 
-  public List<String> getReloadCheckResponses(String tableNameWithType, int 
timeoutMs)
-      throws InvalidConfigException {
+  public ServerSegmentMetadataReader.TableReloadResponse 
getReloadCheckResponses(String tableNameWithType,
+      int timeoutMs) throws InvalidConfigException {
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     List<String> serverInstances = 
_pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, 
tableType);
     Set<String> serverInstanceSet = new HashSet<>(serverInstances);
@@ -213,4 +220,22 @@ public class TableMetadataReader {
     return 
serverSegmentMetadataReader.getStaleSegmentsFromServer(tableNameWithType, 
serverInstanceSet, endpoints,
         timeoutMs);
   }
+
+  /**
+   * Parsed outcome of a segment reload check: the number of server requests that failed, plus the JSON
+   * responses of the servers that answered, keyed by server instance id.
+   *
+   * <p>Declared {@code static} because it carries no reference to the enclosing reader.
+   */
+  public static class TableReloadJsonResponse {
+    private final int _numFailedResponses;
+    private final Map<String, JsonNode> _serverReloadJsonResponses;
+
+    TableReloadJsonResponse(int numFailedResponses, Map<String, JsonNode> serverReloadJsonResponses) {
+      _numFailedResponses = numFailedResponses;
+      _serverReloadJsonResponses = serverReloadJsonResponses;
+    }
+
+    /** @return number of server requests that did not produce a successful response */
+    public int getNumFailedResponses() {
+      return _numFailedResponses;
+    }
+
+    /** @return parsed reload-check responses keyed by server instance id */
+    public Map<String, JsonNode> getServerReloadJsonResponses() {
+      return _serverReloadJsonResponses;
+    }
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index c5ac9e82bd..de83ec6eee 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
@@ -90,7 +91,9 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, true);
     }
 
-    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+    DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+    preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10));
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
@@ -137,8 +140,18 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     // Rebalance in dry-run mode
     RebalanceConfig rebalanceConfig = new RebalanceConfig();
     rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setPreChecks(true);
     rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    Map<String, String> preCheckResult = rebalanceResult.getPreChecksResult();
+    assertNotNull(preCheckResult);
+    assertEquals(preCheckResult.size(), 2);
+    
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS));
+    
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT));
+    // Sending request to servers should fail for all, so needsPreprocess 
should be set to "error" to indicate that a
+    // manual check is needed
+    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS),
 "error");
+    
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT),
 "false");
 
     // All servers should be assigned to the table
     instanceAssignment = rebalanceResult.getInstanceAssignment();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 054d8393cf..6750a7340d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -34,10 +34,13 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
@@ -61,10 +64,16 @@ import 
org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
@@ -73,6 +82,10 @@ import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
+import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -161,6 +174,9 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   // Once this value is set, assert that table size always gets back to this 
value after removing the added indices.
   private long _tableSize;
 
+  private PinotHelixResourceManager _resourceManager;
+  private TableRebalancer _tableRebalancer;
+
   protected int getNumBrokers() {
     return NUM_BROKERS;
   }
@@ -268,6 +284,11 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     waitForAllDocsLoaded(600_000L);
 
     _tableSize = getTableSize(getTableName());
+
+    _resourceManager = _controllerStarter.getHelixResourceManager();
+    DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+    preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10));
+    _tableRebalancer = new 
TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker);
   }
 
   private void reloadAllSegments(String testQuery, boolean forceDownload, long 
numTotalDocs)
@@ -777,6 +798,104 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     }, 60_000L, "Failed to execute query");
   }
 
+  @Test
+  public void testRebalancePreChecks()
+      throws Exception {
+    // Set up a dry-run rebalance config; pre-checks are switched on further below
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+
+    TableConfig tableConfig = getOfflineTableConfig();
+
+    // Ensure pre-check status is null if not enabled
+    RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertNull(rebalanceResult.getPreChecksResult());
+
+    // Enable pre-checks, nothing is set
+    rebalanceConfig.setPreChecks(true);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, false, false);
+
+    // Enable minimizeDataMovement
+    tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap());
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, true, false);
+
+    // Undo minimizeDataMovement, update the table config to add a column to bloom filter
+    tableConfig.getIndexingConfig().getBloomFilterColumns().add("Quarter");
+    tableConfig.setInstanceAssignmentConfigMap(null);
+    updateTableConfig(tableConfig);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, false, true);
+
+    // Undo tableConfig change
+    tableConfig.getIndexingConfig().getBloomFilterColumns().remove("Quarter");
+    updateTableConfig(tableConfig);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, false, false);
+
+    // Add a schema change
+    Schema schema = createSchema();
+    schema.addField(new MetricFieldSpec("NewAddedIntMetric", DataType.INT, 1));
+    updateSchema(schema);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, false, true);
+
+    // Keep schema change and update table config to add minimizeDataMovement
+    tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap());
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, true, true);
+
+    // Add a new server (to force change in instance assignment). It is stopped and dropped in the finally block so
+    // that a failed assertion below does not leave an extra server behind for the other tests sharing this cluster.
+    BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS);
+    try {
+      // Enable reassignInstances
+      rebalanceConfig.setReassignInstances(true);
+      tableConfig.setInstanceAssignmentConfigMap(null);
+      rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+      checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, false, true);
+
+      // Disable dry-run while pre-checks stay enabled: no pre-check result is reported and the rebalance is expected
+      // to fail (presumably because pre-checks are only supported together with dry-run; confirm in TableRebalancer)
+      rebalanceConfig.setDryRun(false);
+      rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+      assertNull(rebalanceResult.getPreChecksResult());
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+    } finally {
+      // Stop the added server and retry until it can be dropped from the cluster
+      serverStarter1.stop();
+      TestUtils.waitForCondition(
+          aVoid -> _resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful(), 60_000L,
+          "Failed to drop added server");
+    }
+  }
+
+  /**
+   * Asserts the overall rebalance status and the two entries reported by {@link DefaultRebalancePreChecker}.
+   *
+   * @param rebalanceResult result returned by the table rebalancer
+   * @param expectedStatus expected overall rebalance status
+   * @param expectedMinimizeDataMovement expected value of the minimize-data-movement pre-check entry
+   * @param expectedNeedsReloadStatus expected value of the needs-reload pre-check entry
+   */
+  private void checkRebalancePreCheckStatus(RebalanceResult rebalanceResult, RebalanceResult.Status expectedStatus,
+      boolean expectedMinimizeDataMovement, boolean expectedNeedsReloadStatus) {
+    // Check the overall outcome before looking at individual pre-check entries
+    assertEquals(rebalanceResult.getStatus(), expectedStatus);
+
+    Map<String, String> checks = rebalanceResult.getPreChecksResult();
+    assertNotNull(checks);
+    // Exactly these two pre-check entries are expected, nothing more
+    assertEquals(checks.size(), 2);
+
+    String minimizeKey = DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT;
+    String reloadKey = DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS;
+    // Presence is asserted separately so a missing key fails with a clearer message than a value mismatch
+    assertTrue(checks.containsKey(minimizeKey));
+    assertTrue(checks.containsKey(reloadKey));
+    // Values are reported as strings, so compare against the stringified booleans
+    assertEquals(checks.get(minimizeKey), String.valueOf(expectedMinimizeDataMovement));
+    assertEquals(checks.get(reloadKey), String.valueOf(expectedNeedsReloadStatus));
+  }
+
+  /**
+   * Builds a mutable instance assignment config map holding a single {@code "OFFLINE"} entry. The pre-check test uses
+   * it to turn minimizeDataMovement on; the trailing {@code true} constructor arguments are presumably the
+   * minimizeDataMovement flags (confirm against the config classes).
+   */
+  private Map<String, InstanceAssignmentConfig> createInstanceAssignmentConfigMap() {
+    InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig("tag", false, 1, null);
+    InstanceConstraintConfig constraintConfig = new InstanceConstraintConfig(new ArrayList<>(List.of("constraints1")));
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 1, 1, 1, 1, 1, true, null);
+    InstanceAssignmentConfig offlineAssignmentConfig =
+        new InstanceAssignmentConfig(tagPoolConfig, constraintConfig, replicaGroupPartitionConfig, null, true);
+
+    Map<String, InstanceAssignmentConfig> assignmentConfigs = new HashMap<>();
+    assignmentConfigs.put("OFFLINE", offlineAssignmentConfig);
+    return assignmentConfigs;
+  }
+
   @Test(dataProvider = "useBothQueryEngines")
   public void testRegexpReplace(boolean useMultiStageQueryEngine)
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to