klsince commented on code in PR #15050:
URL: https://github.com/apache/pinot/pull/15050#discussion_r1960684388


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -291,43 +305,58 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
               + "rebalance", rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while calculating target assignment: " + e, 
instancePartitionsMap,
-          tierToInstancePartitionsMap, null, preChecksResult);
+          tierToInstancePartitionsMap, null, preChecksResult, null);
     }
 
     boolean segmentAssignmentUnchanged = 
currentAssignment.equals(targetAssignment);
     LOGGER.info("For rebalanceId: {}, instancePartitionsUnchanged: {}, 
tierInstancePartitionsUnchanged: {}, "
             + "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId, 
instancePartitionsUnchanged,
         tierInstancePartitionsUnchanged, segmentAssignmentUnchanged, 
tableNameWithType);
 
+    RebalanceSummaryResult summaryResult = null;
+    if (summary) {
+      summaryResult = calculateDryRunSummary(currentAssignment, 
targetAssignment, tableNameWithType, rebalanceJobId);
+    }

Review Comment:
   How about calculating the summary after this whole if-block, so we avoid the 
cost of calculating it when the table is already balanced?
   ```
   if (segmentAssignmentUnchanged) {
   ..
   }
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      String errMsg = String.format("Caught exception while trying to fetch 
table size details for table: %s",
+          tableNameWithType);
+      LOGGER.error(errMsg, e);
+    }
+    return tableSizePerReplicaInBytes;
+  }
+
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      existingReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        existingServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
existingServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      newReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        newServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
newServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+    RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
+        = new 
RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, 
newReplicationFactor);
+
+    int existingNumServers = existingServersToSegmentMap.keySet().size();
+    int newNumServers = newServersToSegmentMap.keySet().size();
+    RebalanceSummaryResult.RebalanceChangeInfo numServers
+        = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers, 
newNumServers);
+
+    List<InstanceConfig> instanceConfigs = _helixDataAccessor.getChildValues(
+        _helixDataAccessor.keyBuilder().instanceConfigs(), true);
+    Map<String, List<String>> instanceToTagsMap = new HashMap<>();
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceToTagsMap.put(instanceConfig.getInstanceName(), 
instanceConfig.getTags());
+    }
+
+    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
serverSegmentChangeInfoMap = new HashMap<>();
+    int segmentsNotMoved = 0;
+    int numServersGettingDataAdded = 0;
+    for (Map.Entry<String, Set<String>> entry : 
newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentMap = entry.getValue();
+      int totalNewSegments = segmentMap.size();
+
+      Set<String> newSegmentList = new HashSet<>(segmentMap);
+      Set<String> existingSegmentList = new HashSet<>();
+      int segmentsUnchanged = 0;
+      int totalExistingSegments = 0;
+      RebalanceSummaryResult.ServerStatus serverStatus = 
RebalanceSummaryResult.ServerStatus.ADDED;
+      if (existingServersToSegmentMap.containsKey(server)) {
+        totalExistingSegments = existingServersToSegmentMap.get(server).size();
+        existingSegmentList.addAll(existingServersToSegmentMap.get(server));
+        Set<String> intersection = new 
HashSet<>(existingServersToSegmentMap.get(server));
+        intersection.retainAll(newSegmentList);
+        segmentsUnchanged = intersection.size();
+        segmentsNotMoved += segmentsUnchanged;
+        serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+      }
+      newSegmentList.removeAll(existingSegmentList);
+      int segmentsAdded = newSegmentList.size();
+      int segmentsDeleted = existingSegmentList.size() - segmentsUnchanged;
+      numServersGettingDataAdded += segmentsAdded > 0 ? 1 : 0;
+
+      serverSegmentChangeInfoMap.put(server, new 
RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus,
+          totalNewSegments, totalExistingSegments, segmentsAdded, 
segmentsDeleted, segmentsUnchanged,
+          instanceToTagsMap.getOrDefault(server, null)));
+    }
+
+    for (Map.Entry<String, Set<String>> entry : 
existingServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      if (!serverSegmentChangeInfoMap.containsKey(server)) {
+        serverSegmentChangeInfoMap.put(server, new 
RebalanceSummaryResult.ServerSegmentChangeInfo(
+            RebalanceSummaryResult.ServerStatus.REMOVED, 0, 
entry.getValue().size(), 0, entry.getValue().size(), 0,
+            instanceToTagsMap.getOrDefault(server, null)));
+      }
+    }
+
+    RebalanceSummaryResult.RebalanceChangeInfo numSegmentsInSingleReplica
+        = new 
RebalanceSummaryResult.RebalanceChangeInfo(currentAssignment.size(), 
targetAssignment.size());
+
+    int existingNumberSegmentsTotal = existingReplicationFactor * 
currentAssignment.size();
+    int newNumberSegmentsTotal = newReplicationFactor * 
targetAssignment.size();
+    RebalanceSummaryResult.RebalanceChangeInfo numSegmentsAcrossAllReplicas
+        = new 
RebalanceSummaryResult.RebalanceChangeInfo(existingNumberSegmentsTotal, 
newNumberSegmentsTotal);
+
+    int totalSegmentsToBeMoved = newNumberSegmentsTotal - segmentsNotMoved;
+
+    long tableSizePerReplicaInBytes = 
calculateTableSizePerReplicaInBytes(tableNameWithType);
+    long averageSegmentSizeInBytes = tableSizePerReplicaInBytes <= 0 ? 
tableSizePerReplicaInBytes
+        : tableSizePerReplicaInBytes / ((long) currentAssignment.size());
+    long totalEstimatedDataToBeMovedInBytes = tableSizePerReplicaInBytes <= 0 
? tableSizePerReplicaInBytes
+        : ((long) totalSegmentsToBeMoved) * averageSegmentSizeInBytes;
+    double estimatedTimeToRebalanceInSec = 
getEstimatedTimeToRebalanceInSec(totalEstimatedDataToBeMovedInBytes,
+        numServersGettingDataAdded);
+
+    RebalanceSummaryResult.ServerInfo serverInfo = new 
RebalanceSummaryResult.ServerInfo(numServersGettingDataAdded,
+        numServers, serverSegmentChangeInfoMap);
+    RebalanceSummaryResult.SegmentInfo segmentInfo = new 
RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
+        averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes, 
estimatedTimeToRebalanceInSec,
+        replicationFactor, numSegmentsInSingleReplica, 
numSegmentsAcrossAllReplicas);
+
+    LOGGER.info("Calculated rebalance summary for table: {} with 
rebalanceJobId: {}", tableNameWithType,
+        rebalanceJobId);
+    return new RebalanceSummaryResult(serverInfo, segmentInfo);
+  }
+
+  private static double getEstimatedTimeToRebalanceInSec(long 
totalEstimatedDataToBeMovedInBytes,
+      int numServersGettingDataAdded) {
+    double estimatedTimeToRebalanceInSec = totalEstimatedDataToBeMovedInBytes 
== 0 ? 0.0 : -1.0;
+    if (totalEstimatedDataToBeMovedInBytes > 0 && numServersGettingDataAdded > 
0) {
+      // Do some processing to figure out what the time to download might be
+      // Assume that data is evenly distributed across all servers
+      long totalDataPerServerInBytes = totalEstimatedDataToBeMovedInBytes / 
numServersGettingDataAdded;
+      // TODO: Pick a good threshold to calculate estimated time to rebalance. 
For now assume 100 MB/s data download
+      //       + process rate
+      estimatedTimeToRebalanceInSec = ((double) totalDataPerServerInBytes) / 
(100.0D * 1024.0D * 1024.0D);

Review Comment:
   I think we should skip this field, as it's not easy to estimate accurately 
because cluster setups can vary a lot. The summary now contains the total 
number of segments and data bytes a server may get, so users can estimate the 
duration for their own environments accordingly.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      String errMsg = String.format("Caught exception while trying to fetch 
table size details for table: %s",
+          tableNameWithType);
+      LOGGER.error(errMsg, e);
+    }
+    return tableSizePerReplicaInBytes;
+  }
+
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      existingReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        existingServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
existingServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      newReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        newServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
newServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());

Review Comment:
   Use `computeIfAbsent(...).add(...)` here? And just loop over the keys (server names)?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);

Review Comment:
   Perhaps add LOGGER.info() before/after this method, to know how much time 
the controller may spend getting the table size, as I assume it'd take a while 
for large tables.
   
   And maybe leave a TODO to make the 30s (`30_000`) configurable later.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      String errMsg = String.format("Caught exception while trying to fetch 
table size details for table: %s",
+          tableNameWithType);
+      LOGGER.error(errMsg, e);
+    }
+    return tableSizePerReplicaInBytes;
+  }
+
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      existingReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        existingServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
existingServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      newReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        newServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
newServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+    RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
+        = new 
RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, 
newReplicationFactor);
+
+    int existingNumServers = existingServersToSegmentMap.keySet().size();
+    int newNumServers = newServersToSegmentMap.keySet().size();
+    RebalanceSummaryResult.RebalanceChangeInfo numServers
+        = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers, 
newNumServers);
+
+    List<InstanceConfig> instanceConfigs = _helixDataAccessor.getChildValues(
+        _helixDataAccessor.keyBuilder().instanceConfigs(), true);
+    Map<String, List<String>> instanceToTagsMap = new HashMap<>();
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceToTagsMap.put(instanceConfig.getInstanceName(), 
instanceConfig.getTags());
+    }
+
+    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
serverSegmentChangeInfoMap = new HashMap<>();
+    int segmentsNotMoved = 0;
+    int numServersGettingDataAdded = 0;
+    for (Map.Entry<String, Set<String>> entry : 
newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentMap = entry.getValue();

Review Comment:
   nit: segments or segmentSet



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      String errMsg = String.format("Caught exception while trying to fetch 
table size details for table: %s",
+          tableNameWithType);
+      LOGGER.error(errMsg, e);
+    }
+    return tableSizePerReplicaInBytes;
+  }
+
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      existingReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        existingServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
existingServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());

Review Comment:
   nit: existingServersToSegmentMap.computeIfAbsent().add()



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      String errMsg = String.format("Caught exception while trying to fetch 
table size details for table: %s",
+          tableNameWithType);
+      LOGGER.error(errMsg, e);

Review Comment:
   Inline this as `LOGGER.error("Caught.... {}", tableNameWithType, e)`? 
`errMsg` is not used anywhere else.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      String errMsg = String.format("Caught exception while trying to fetch 
table size details for table: %s",
+          tableNameWithType);
+      LOGGER.error(errMsg, e);
+    }
+    return tableSizePerReplicaInBytes;
+  }
+
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      existingReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {

Review Comment:
   looks like we just need the keys (i.e. server names) in this loop



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      String errMsg = String.format("Caught exception while trying to fetch 
table size details for table: %s",
+          tableNameWithType);
+      LOGGER.error(errMsg, e);
+    }
+    return tableSizePerReplicaInBytes;
+  }
+
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      existingReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        existingServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
existingServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      newReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        newServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
newServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+    RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
+        = new 
RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, 
newReplicationFactor);
+
+    int existingNumServers = existingServersToSegmentMap.keySet().size();
+    int newNumServers = newServersToSegmentMap.keySet().size();

Review Comment:
   nit: simply map.size()



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      String errMsg = String.format("Caught exception while trying to fetch 
table size details for table: %s",
+          tableNameWithType);
+      LOGGER.error(errMsg, e);
+    }
+    return tableSizePerReplicaInBytes;
+  }
+
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      existingReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        existingServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
existingServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      newReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        newServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
newServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+    RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
+        = new 
RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, 
newReplicationFactor);
+
+    int existingNumServers = existingServersToSegmentMap.keySet().size();
+    int newNumServers = newServersToSegmentMap.keySet().size();
+    RebalanceSummaryResult.RebalanceChangeInfo numServers
+        = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers, 
newNumServers);
+
+    List<InstanceConfig> instanceConfigs = _helixDataAccessor.getChildValues(
+        _helixDataAccessor.keyBuilder().instanceConfigs(), true);
+    Map<String, List<String>> instanceToTagsMap = new HashMap<>();
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceToTagsMap.put(instanceConfig.getInstanceName(), 
instanceConfig.getTags());
+    }
+
+    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
serverSegmentChangeInfoMap = new HashMap<>();
+    int segmentsNotMoved = 0;
+    int numServersGettingDataAdded = 0;
+    for (Map.Entry<String, Set<String>> entry : 
newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentMap = entry.getValue();
+      int totalNewSegments = segmentMap.size();
+
+      Set<String> newSegmentList = new HashSet<>(segmentMap);
+      Set<String> existingSegmentList = new HashSet<>();
+      int segmentsUnchanged = 0;
+      int totalExistingSegments = 0;
+      RebalanceSummaryResult.ServerStatus serverStatus = 
RebalanceSummaryResult.ServerStatus.ADDED;
+      if (existingServersToSegmentMap.containsKey(server)) {
+        totalExistingSegments = existingServersToSegmentMap.get(server).size();

Review Comment:
   nit: use a local variable to hold existingServersToSegmentMap.get(server), 
as this map.get() is called 3 times in the loop. Alternatively, call map.get() 
once up front and use a null check on the result as the if-condition.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      String errMsg = String.format("Caught exception while trying to fetch 
table size details for table: %s",
+          tableNameWithType);
+      LOGGER.error(errMsg, e);
+    }
+    return tableSizePerReplicaInBytes;
+  }
+
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      existingReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        existingServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
existingServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      newReplicationFactor = entrySet.getValue().size();
+      for (Map.Entry<String, String> segmentEntrySet : 
entrySet.getValue().entrySet()) {
+        newServersToSegmentMap.putIfAbsent(segmentEntrySet.getKey(), new 
HashSet<>());
+        
newServersToSegmentMap.get(segmentEntrySet.getKey()).add(entrySet.getKey());
+      }
+    }
+    RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
+        = new 
RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, 
newReplicationFactor);
+
+    int existingNumServers = existingServersToSegmentMap.keySet().size();
+    int newNumServers = newServersToSegmentMap.keySet().size();
+    RebalanceSummaryResult.RebalanceChangeInfo numServers
+        = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers, 
newNumServers);
+
+    List<InstanceConfig> instanceConfigs = _helixDataAccessor.getChildValues(
+        _helixDataAccessor.keyBuilder().instanceConfigs(), true);
+    Map<String, List<String>> instanceToTagsMap = new HashMap<>();
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceToTagsMap.put(instanceConfig.getInstanceName(), 
instanceConfig.getTags());
+    }
+
+    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
serverSegmentChangeInfoMap = new HashMap<>();
+    int segmentsNotMoved = 0;
+    int numServersGettingDataAdded = 0;
+    for (Map.Entry<String, Set<String>> entry : 
newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentMap = entry.getValue();
+      int totalNewSegments = segmentMap.size();
+
+      Set<String> newSegmentList = new HashSet<>(segmentMap);
+      Set<String> existingSegmentList = new HashSet<>();

Review Comment:
   nit: perhaps remove the `List` suffix from the var names, since they are 
Set<>, for a bit less confusion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to