somandal commented on code in PR #15175:
URL: https://github.com/apache/pinot/pull/15175#discussion_r1994283360


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableN
       return false;
     }
   }
+
+  private String checkDiskUtilization(String tableNameWithType, Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment,
+      TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double threshold) {
+    boolean isDiskUtilSafe = true;
+    StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe disk util footprint: ");
+    String sep = "";
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
+      for (String segmentKey : entrySet.getValue().keySet()) {
+        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      for (String segmentKey : entrySet.getValue().keySet()) {
+        newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      }
+    }
+
+    long avgSegmentSize = getAverageSegmentSize(tableSubTypeSizeDetails, currentAssignment);
+
+    for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      DiskUsageInfo diskUsage = getDiskUsageInfoOfInstance(server);
+
+      if (diskUsage.getTotalSpaceBytes() < 0) {
+        return "Disk usage info not enabled. Try to set controller.enable.resource.utilization.check=true";
+      }
+
+      Set<String> segmentSet = entry.getValue();
+
+      Set<String> newSegmentSet = new HashSet<>(segmentSet);
+      Set<String> existingSegmentSet = new HashSet<>();
+      Set<String> intersection = new HashSet<>();
+      if (existingServersToSegmentMap.containsKey(server)) {
+        Set<String> segmentSetForServer = existingServersToSegmentMap.get(server);
+        existingSegmentSet.addAll(segmentSetForServer);
+        intersection.addAll(segmentSetForServer);
+        intersection.retainAll(newSegmentSet);
+      }
+      newSegmentSet.removeAll(intersection);
+      Set<String> removedSegmentSet = new HashSet<>(existingSegmentSet);
+      removedSegmentSet.removeAll(intersection);
+
+      long diskUtilizationGain = newSegmentSet.size() * avgSegmentSize;
+      long diskUtilizationLoss = removedSegmentSet.size() * avgSegmentSize;
+
+      long diskUtilizationFootprint = diskUsage.getUsedSpaceBytes() + diskUtilizationGain;
+      double diskUtilizationFootprintRate =

Review Comment:
   nit: rename `diskUtilizationFootprintRate` to `diskUtilizationFootprintPercentage`



##########
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/DiskUsageInfo.java:
##########
@@ -34,6 +34,14 @@ public class DiskUsageInfo {
   private final long _usedSpaceBytes;
   private final long _lastUpdatedTimeInEpochMs;
 
+  public DiskUsageInfo(String instanceId) {

Review Comment:
   nit: add `@JsonProperty("instanceId")` annotation?
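   e.g., a minimal sketch (I'm guessing at the constructor body and at field names other than `_usedSpaceBytes`/`_lastUpdatedTimeInEpochMs`; the import is `com.fasterxml.jackson.annotation.JsonProperty`):
   
   ```
   public DiskUsageInfo(@JsonProperty("instanceId") String instanceId) {
     _instanceId = instanceId;
     // -1 sentinels let callers detect missing data, e.g. getTotalSpaceBytes() < 0
     _totalSpaceBytes = -1L;
     _usedSpaceBytes = -1L;
     _lastUpdatedTimeInEpochMs = -1L;
   }
   ```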



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -572,28 +575,28 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     }
   }
 
-  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
-    long tableSizePerReplicaInBytes = -1;
+  private TableSizeReader.TableSubTypeSizeDetails fetchTableSizeDetails(String tableNameWithType) {
     if (_tableSizeReader == null) {
       LOGGER.warn("tableSizeReader is null, cannot calculate table size for table {}!", tableNameWithType);
-      return tableSizePerReplicaInBytes;
+      return null;
     }
     LOGGER.info("Fetching the table size for rebalance summary for table: {}", tableNameWithType);
     try {
+      // TODO: Consider making the timeoutMs for fetching table size via table rebalancer configurable
-      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
-          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
-      tableSizePerReplicaInBytes = tableSizeDetails._reportedSizePerReplicaInBytes;
+      return _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
     } catch (InvalidConfigException e) {
       LOGGER.error("Caught exception while trying to fetch table size details for table: {}", tableNameWithType, e);
     }

Review Comment:
   can you add the exit log back (want it for debugging):
   
   ```
       LOGGER.info("Fetched the table size details for table: {}", tableNameWithType);
   ```



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -513,6 +533,106 @@ public void testRebalance()
     assertNull(rebalanceResult.getPreChecksResult());
 
     _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+
+    for (int i = 0; i < numServers; i++) {
+      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+    }
+    for (int i = 0; i < numServersToAdd; i++) {
+      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i));
+    }
+    executorService.shutdown();
+  }
+
+  @Test
+  public void testRebalancePreCheckerDiskUtil()
+      throws Exception {
+    int numServers = 3;
+    // Mock disk usage
+    Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
+
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + i;
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+      DiskUsageInfo diskUsageInfo1 =
+          new DiskUsageInfo(instanceId, "", 1000L, 200L, System.currentTimeMillis());
+      diskUsageInfoMap.put(instanceId, diskUsageInfo1);
+    }
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+    preChecker.init(_helixResourceManager, executorService, 0.5);
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker,
+        _helixResourceManager.getTableSizeReader());
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+    // Create the table
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i), null);
+    }
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+    // Add 3 more servers
+    int numServersToAdd = 3;
+    for (int i = 0; i < numServersToAdd; i++) {
+      String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+      DiskUsageInfo diskUsageInfo =
+          new DiskUsageInfo(instanceId, "", 1000L, 200L, System.currentTimeMillis());
+      diskUsageInfoMap.put(instanceId, diskUsageInfo);
+    }
+
+    ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
+
+    // Rebalance in dry-run mode
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setPreChecks(true);
+
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    Map<String, String> preCheckResult = rebalanceResult.getPreChecksResult();
+    assertNotNull(preCheckResult);
+    assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION));
+    // Sending request to servers should fail for all, so needsPreprocess should be set to "error" to indicate that a

Review Comment:
   is this a copy paste error? you don't seem to be checking for reload



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java:
##########
@@ -22,10 +22,33 @@
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 
 
 public interface RebalancePreChecker {
-  void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService);
-  Map<String, String> check(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig);
+  void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService,
+      double diskUtilizationThreshold);
+
+  class TableFacts {
+    public String _rebalanceJobId;
+    public String _tableNameWithType;
+    public TableConfig _tableConfig;
+    public Map<String, Map<String, String>> _currentAssignment;
+    public Map<String, Map<String, String>> _targetAssignment;
+    public TableSizeReader.TableSubTypeSizeDetails _tableSubTypeSizeDetails;
+
+    public TableFacts(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig,

Review Comment:
   nit: are any of these nullable? if so, add `@Nullable` annotation for those
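   e.g., `fetchTableSizeDetails` in `TableRebalancer` can return null, so at minimum the size details parameter looks nullable (sketch, assuming the remaining params stay non-null):
   
   ```
   public TableFacts(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig,
       Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
       @Nullable TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails) {
   ```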



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -42,19 +47,28 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker {
 
   public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus";
   public static final String IS_MINIMIZE_DATA_MOVEMENT = "isMinimizeDataMovement";
+  public static final String DISK_UTILIZATION = "diskUtilization";
+
+  private static double _diskUtilizationThreshold;
 
   protected PinotHelixResourceManager _pinotHelixResourceManager;
   protected ExecutorService _executorService;
 
   @Override
-  public void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService) {
+  public void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService,
+      double diskUtilizationThreshold) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _executorService = executorService;
+    _diskUtilizationThreshold = diskUtilizationThreshold;
   }
 
   @Override
-  public Map<String, String> check(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) {
-    LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId);
+  public Map<String, String> check(TableFacts tableFacts) {
+    LOGGER.info("Start pre-checks. Table fact: {}", tableFacts.toString());

Review Comment:
   what does `tableFacts.toString()` look like? I don't see that you've implemented that so want to make sure this log looks fine and not too long



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableN
       return false;
     }
   }
+
+  private String checkDiskUtilization(String tableNameWithType, Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment,
+      TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double threshold) {
+    boolean isDiskUtilSafe = true;
+    StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe disk util footprint: ");
+    String sep = "";
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
+      for (String segmentKey : entrySet.getValue().keySet()) {

Review Comment:
   nit: not your mistake, but `segmentKey` should be `serverKey` or `instanceName`. I'll be correcting this in other parts of the code as part of other PRs, but it'll be good if you can fix it here 😅



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableN
       return false;
     }
   }
+
+  private String checkDiskUtilization(String tableNameWithType, Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment,
+      TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double threshold) {
+    boolean isDiskUtilSafe = true;
+    StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe disk util footprint: ");
+    String sep = "";
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
+      for (String segmentKey : entrySet.getValue().keySet()) {
+        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      for (String segmentKey : entrySet.getValue().keySet()) {

Review Comment:
   nit: not your mistake, but `segmentKey` should be `serverKey` or `instanceName`. I'll be correcting this in other parts of the code as part of other PRs, but it'll be good if you can fix it here 😅



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -299,18 +292,28 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
               + "rebalance", rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while calculating target assignment: " + e, instancePartitionsMap,
-          tierToInstancePartitionsMap, null, preChecksResult, null);
+          tierToInstancePartitionsMap, null, null, null);
     }
 
     boolean segmentAssignmentUnchanged = currentAssignment.equals(targetAssignment);
     LOGGER.info("For rebalanceId: {}, instancePartitionsUnchanged: {}, tierInstancePartitionsUnchanged: {}, "
             + "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId, instancePartitionsUnchanged,
         tierInstancePartitionsUnchanged, segmentAssignmentUnchanged, tableNameWithType);
 
+    TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails = fetchTableSizeDetails(tableNameWithType);
+
+    Map<String, String> preChecksResult = null;
+    if (preChecks && _rebalancePreChecker != null) {
+      // TODO: consider making an error or warning log when pre-checks are enabled but the pre-checker is not set

Review Comment:
   nit: if you add the recommended log for `_rebalancePreChecker` being null, 
then remove this TODO



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableN
       return false;
     }
   }
+
+  private String checkDiskUtilization(String tableNameWithType, Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment,
+      TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double threshold) {
+    boolean isDiskUtilSafe = true;
+    StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe disk util footprint: ");
+    String sep = "";
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
+      for (String segmentKey : entrySet.getValue().keySet()) {
+        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      for (String segmentKey : entrySet.getValue().keySet()) {
+        newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      }
+    }
+
+    long avgSegmentSize = getAverageSegmentSize(tableSubTypeSizeDetails, currentAssignment);
+
+    for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      DiskUsageInfo diskUsage = getDiskUsageInfoOfInstance(server);
+
+      if (diskUsage.getTotalSpaceBytes() < 0) {
+        return "Disk usage info not enabled. Try to set controller.enable.resource.utilization.check=true";
+      }
+
+      Set<String> segmentSet = entry.getValue();
+
+      Set<String> newSegmentSet = new HashSet<>(segmentSet);
+      Set<String> existingSegmentSet = new HashSet<>();
+      Set<String> intersection = new HashSet<>();
+      if (existingServersToSegmentMap.containsKey(server)) {
+        Set<String> segmentSetForServer = existingServersToSegmentMap.get(server);
+        existingSegmentSet.addAll(segmentSetForServer);
+        intersection.addAll(segmentSetForServer);
+        intersection.retainAll(newSegmentSet);
+      }
+      newSegmentSet.removeAll(intersection);
+      Set<String> removedSegmentSet = new HashSet<>(existingSegmentSet);
+      removedSegmentSet.removeAll(intersection);
+
+      long diskUtilizationGain = newSegmentSet.size() * avgSegmentSize;
+      long diskUtilizationLoss = removedSegmentSet.size() * avgSegmentSize;
+
+      long diskUtilizationFootprint = diskUsage.getUsedSpaceBytes() + diskUtilizationGain;
+      double diskUtilizationFootprintRate =
+          (double) diskUtilizationFootprint / diskUsage.getTotalSpaceBytes();
+
+      if (diskUtilizationFootprintRate >= threshold) {
+        isDiskUtilSafe = false;
+        message.append(sep)
+            .append(server)
+            .append(String.format(" (%d%%)", (short) (diskUtilizationFootprintRate * 100)));
+        sep = ", ";
+      }
+    }
+    return isDiskUtilSafe ? "Within threshold" : message.toString();

Review Comment:
   nit: let's add the threshold value here as well (in case we decide to make 
the threshold configurable via `RebalanceConfig` in the future)



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableN
       return false;
     }
   }
+
+  private String checkDiskUtilization(String tableNameWithType, Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment,
+      TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double threshold) {
+    boolean isDiskUtilSafe = true;
+    StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe disk util footprint: ");
+    String sep = "";

Review Comment:
   nit: what is `sep`? can we use a less confusing name?
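   e.g. rename it to `separator`, or a `java.util.StringJoiner` drops the bookkeeping entirely (sketch):
   
   ```
   StringJoiner unsafeServers = new StringJoiner(", ");
   // in the loop, instead of the message/sep bookkeeping:
   unsafeServers.add(server + String.format(" (%d%%)", (short) (diskUtilizationFootprintRate * 100)));
   // at the end:
   return unsafeServers.length() == 0 ? "Within threshold"
       : "UNSAFE. Servers with unsafe disk util footprint: " + unsafeServers;
   ```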



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java:
##########
@@ -227,6 +227,8 @@ public Map<String, Object> getDefaultControllerConfiguration() {
     properties.put(ControllerConf.DISABLE_GROOVY, false);
     properties.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false);
     properties.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+    // Disable resource util check in test

Review Comment:
   why are we disabling this? can you update the comment to explain?



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -878,9 +878,10 @@ private void checkRebalancePreCheckStatus(RebalanceResult rebalanceResult, Rebal
     assertEquals(rebalanceResult.getStatus(), expectedStatus);
     Map<String, String> preChecksResult = rebalanceResult.getPreChecksResult();
     assertNotNull(preChecksResult);
-    assertEquals(preChecksResult.size(), 2);
+    assertEquals(preChecksResult.size(), 3);
     assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT));
     assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS));
+    assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION));

Review Comment:
   Can you also add an additional check to validate the returned value for this 
check? is it possible to add disk utilization related tests here?
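   e.g. (sketch; the expected string depends on the final message format, and this assumes disk usage info is populated for the test servers):
   
   ```
   assertEquals(preChecksResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION), "Within threshold");
   ```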



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableN
       return false;
     }
   }
+
+  private String checkDiskUtilization(String tableNameWithType, Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment,
+      TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double threshold) {
+    boolean isDiskUtilSafe = true;
+    StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe disk util footprint: ");
+    String sep = "";
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
+      for (String segmentKey : entrySet.getValue().keySet()) {
+        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      for (String segmentKey : entrySet.getValue().keySet()) {
+        newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      }
+    }
+
+    long avgSegmentSize = getAverageSegmentSize(tableSubTypeSizeDetails, currentAssignment);
+
+    for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      DiskUsageInfo diskUsage = getDiskUsageInfoOfInstance(server);
+
+      if (diskUsage.getTotalSpaceBytes() < 0) {
+        return "Disk usage info not enabled. Try to set controller.enable.resource.utilization.check=true";
+      }
+
+      Set<String> segmentSet = entry.getValue();
+
+      Set<String> newSegmentSet = new HashSet<>(segmentSet);
+      Set<String> existingSegmentSet = new HashSet<>();
+      Set<String> intersection = new HashSet<>();
+      if (existingServersToSegmentMap.containsKey(server)) {
+        Set<String> segmentSetForServer = existingServersToSegmentMap.get(server);
+        existingSegmentSet.addAll(segmentSetForServer);
+        intersection.addAll(segmentSetForServer);
+        intersection.retainAll(newSegmentSet);
+      }
+      newSegmentSet.removeAll(intersection);
+      Set<String> removedSegmentSet = new HashSet<>(existingSegmentSet);
+      removedSegmentSet.removeAll(intersection);
+
+      long diskUtilizationGain = newSegmentSet.size() * avgSegmentSize;
+      long diskUtilizationLoss = removedSegmentSet.size() * avgSegmentSize;

Review Comment:
   looks like you don't do anything with `diskUtilizationLoss` here.
   
   Can we have 2 disk utilization pre-checks:
   - Worst case disk utilization during rebalance: 
`diskUsage.getUsedSpaceBytes() + diskUtilizationGain`
   - Actual disk utilization after rebalance were we account for deleted data 
too: `diskUsage.getUsedSpaceBytes() + diskUtilizationGain - diskUtilizationLoss`
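   Roughly (sketch only, naming up to you):
   
   ```
   // Worst case: new segments land before any old ones are deleted
   long worstCaseFootprint = diskUsage.getUsedSpaceBytes() + diskUtilizationGain;
   double worstCaseRate = (double) worstCaseFootprint / diskUsage.getTotalSpaceBytes();
   // Steady state after rebalance: replaced segments have been cleaned up
   long postRebalanceFootprint = worstCaseFootprint - diskUtilizationLoss;
   double postRebalanceRate = (double) postRebalanceFootprint / diskUsage.getTotalSpaceBytes();
   ```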



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -299,18 +292,28 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
               + "rebalance", rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while calculating target assignment: " + e, instancePartitionsMap,
-          tierToInstancePartitionsMap, null, preChecksResult, null);
+          tierToInstancePartitionsMap, null, null, null);
     }
 
     boolean segmentAssignmentUnchanged = currentAssignment.equals(targetAssignment);
     LOGGER.info("For rebalanceId: {}, instancePartitionsUnchanged: {}, tierInstancePartitionsUnchanged: {}, "
             + "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId, instancePartitionsUnchanged,
         tierInstancePartitionsUnchanged, segmentAssignmentUnchanged, tableNameWithType);
 
+    TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails = fetchTableSizeDetails(tableNameWithType);
+
+    Map<String, String> preChecksResult = null;
+    if (preChecks && _rebalancePreChecker != null) {
+      // TODO: consider making an error or warning log when pre-checks are enabled but the pre-checker is not set
+      RebalancePreChecker.TableFacts tableFacts = new RebalancePreChecker.TableFacts(rebalanceJobId, tableNameWithType,
+          tableConfig, currentAssignment, targetAssignment, tableSubTypeSizeDetails);
+      preChecksResult = _rebalancePreChecker.check(tableFacts);
+    }

Review Comment:
   nit: let's add an else for if `preChecks` are enabled but 
`_rebalancePreChecker` is null so that we can debug this scenario from logs
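   e.g. (sketch):
   
   ```
   Map<String, String> preChecksResult = null;
   if (preChecks) {
     if (_rebalancePreChecker != null) {
       RebalancePreChecker.TableFacts tableFacts = new RebalancePreChecker.TableFacts(rebalanceJobId,
           tableNameWithType, tableConfig, currentAssignment, targetAssignment, tableSubTypeSizeDetails);
       preChecksResult = _rebalancePreChecker.check(tableFacts);
     } else {
       LOGGER.warn("Pre-checks are enabled but the pre-checker is not set for table: {} with rebalanceJobId: {}",
           tableNameWithType, rebalanceJobId);
     }
   }
   ```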



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -572,28 +575,28 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     }
   }
 
-  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
-    long tableSizePerReplicaInBytes = -1;
+  private TableSizeReader.TableSubTypeSizeDetails fetchTableSizeDetails(String tableNameWithType) {
     if (_tableSizeReader == null) {
       LOGGER.warn("tableSizeReader is null, cannot calculate table size for table {}!", tableNameWithType);
-      return tableSizePerReplicaInBytes;
+      return null;
     }
     LOGGER.info("Fetching the table size for rebalance summary for table: {}", tableNameWithType);

Review Comment:
   nit: reword this to remove "for rebalance summary" since it will be used for 
both pre-checks and summary



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -513,6 +533,106 @@ public void testRebalance()
     assertNull(rebalanceResult.getPreChecksResult());
 
     _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+
+    for (int i = 0; i < numServers; i++) {
+      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+    }
+    for (int i = 0; i < numServersToAdd; i++) {
+      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i));
+    }
+    executorService.shutdown();
+  }
+
+  @Test
+  public void testRebalancePreCheckerDiskUtil()
+      throws Exception {
+    int numServers = 3;
+    // Mock disk usage
+    Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
+
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + i;
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+      DiskUsageInfo diskUsageInfo1 =
+          new DiskUsageInfo(instanceId, "", 1000L, 200L, System.currentTimeMillis());
+      diskUsageInfoMap.put(instanceId, diskUsageInfo1);
+    }
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+    preChecker.init(_helixResourceManager, executorService, 0.5);
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker,
+        _helixResourceManager.getTableSizeReader());
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+    // Create the table
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i), null);
+    }
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+    // Add 3 more servers
+    int numServersToAdd = 3;
+    for (int i = 0; i < numServersToAdd; i++) {
+      String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+      DiskUsageInfo diskUsageInfo =
+          new DiskUsageInfo(instanceId, "", 1000L, 200L, System.currentTimeMillis());
+      diskUsageInfoMap.put(instanceId, diskUsageInfo);
+    }
+
+    ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
+
+    // Rebalance in dry-run mode
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setPreChecks(true);
+
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    Map<String, String> preCheckResult = rebalanceResult.getPreChecksResult();
+    assertNotNull(preCheckResult);
+    assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION));
+    // Sending request to servers should fail for all, so needsPreprocess should be set to "error" to indicate that a
+    // manual check is needed
+    assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION), "Within threshold");
+
+    for (int i = 0; i < numServers + numServersToAdd; i++) {
+      String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + i;
+      DiskUsageInfo diskUsageInfo =
+          new DiskUsageInfo(instanceId, "", 1000L, 755L, System.currentTimeMillis());
+      diskUsageInfoMap.put(instanceId, diskUsageInfo);
+    }
+
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setPreChecks(true);
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    preCheckResult = rebalanceResult.getPreChecksResult();
+    assertNotNull(preCheckResult);
+    assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION));
+    // Sending request to servers should fail for all, so needsPreprocess should be set to "error" to indicate that a

Review Comment:
   is this a copy paste error? you don't seem to be checking for reload



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableN
       return false;
     }
   }
+
+  private String checkDiskUtilization(String tableNameWithType, Map<String, Map<String, String>> currentAssignment,

Review Comment:
   Just calling out that the pre-check results will be changing a bit as done 
in https://github.com/apache/pinot/pull/15233, you'll have to rebase and pick 
up that change once it is merged



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java:
##########
@@ -22,10 +22,33 @@
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 
 
 public interface RebalancePreChecker {
-  void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService);
-  Map<String, String> check(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig);
+  void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService,
+      double diskUtilizationThreshold);
+
+  class TableFacts {

Review Comment:
   nit: can we rename this to `PreCheckContext` since this is specifically used 
for pre-checks only.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -42,19 +47,28 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker {
 
   public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus";
   public static final String IS_MINIMIZE_DATA_MOVEMENT = "isMinimizeDataMovement";
+  public static final String DISK_UTILIZATION = "diskUtilization";
+
+  private static double _diskUtilizationThreshold;
 
   protected PinotHelixResourceManager _pinotHelixResourceManager;
   protected ExecutorService _executorService;
 
   @Override
-  public void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService) {
+  public void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService,
+      double diskUtilizationThreshold) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _executorService = executorService;
+    _diskUtilizationThreshold = diskUtilizationThreshold;
   }
 
   @Override
-  public Map<String, String> check(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) {
-    LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId);
+  public Map<String, String> check(TableFacts tableFacts) {

Review Comment:
   +1 - I think in the future we might want to make the threshold configurable 
via `RebalanceConfig` in which case having it passed in from `check` is better. 
We can still have the config based `_diskUtilizationThreshold` in init though, 
as that could be the default if no `RebalanceConfig` override is provided. what 
do you folks think?


