somandal commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2021740211


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -120,16 +130,22 @@
  */
 public class TableRebalancer {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableRebalancer.class);
+  private static final int TOP_N_IN_CONSUMING_SEGMENT_SUMMARY = 10;
   private final HelixManager _helixManager;
   private final HelixDataAccessor _helixDataAccessor;
   private final TableRebalanceObserver _tableRebalanceObserver;
   private final ControllerMetrics _controllerMetrics;
   private final RebalancePreChecker _rebalancePreChecker;
   private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+  private final HttpClientConnectionManager _connectionManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
 
   public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver,
       @Nullable ControllerMetrics controllerMetrics, @Nullable RebalancePreChecker rebalancePreChecker,
-      @Nullable TableSizeReader tableSizeReader) {
+      @Nullable TableSizeReader tableSizeReader, @Nullable ExecutorService executorService,

Review Comment:
   nit: Since we now pass in `PinotHelixResourceManager`, can we get the
`TableSizeReader` from it directly rather than passing it in separately?
Similarly, can you check whether any of the other parameters here can be
fetched from `PinotHelixResourceManager` directly, to keep the constructor's
parameter list cleaner?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -1570,6 +1580,269 @@ public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
     }
   }
 
+  @Test
+  public void testRebalanceConsumingSegmentSummary()

Review Comment:
   For tests, is it possible to update
`OfflineClusterIntegrationTest::testRebalanceDryRunSummary()` with asserts that
the consuming segment summary is null? That would ensure the OFFLINE table case
is covered.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -627,22 +653,42 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, TableConfig tableConfig) {
     LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
         tableNameWithType, rebalanceJobId);
+    boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
     int existingReplicationFactor = 0;
     int newReplicationFactor = 0;
     Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
     Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
+    Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();

     for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
       existingReplicationFactor = entrySet.getValue().size();
-      for (String segmentKey : entrySet.getValue().keySet()) {
-        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      String segmentName = entrySet.getKey();
+      boolean isSegmentConsuming = existingServersToConsumingSegmentMap != null && entrySet.getValue()
+          .values()
+          .stream()
+          .allMatch(state -> state.equals(SegmentStateModel.CONSUMING));

Review Comment:
   I looked into this a bit more today for another change I'm making. Based on 
my understanding, I think the logic should be:
   - If any replica is ONLINE, skip this segment (it is an ONLINE segment)
   - Otherwise, if any replica is CONSUMING, pick this segment
   
   The above logic handles the corner case where no replica is ONLINE but some
replica is in OFFLINE/ERROR state; the current `allMatch` check would not count
such a segment as consuming.
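   A minimal sketch of the proposed check, assuming the input is the
instance-to-state map for one segment (as in `entrySet.getValue()` above). The
class and method names are hypothetical, and the state names are local string
constants standing in for the `SegmentStateModel` constants used in the diff.

```java
import java.util.Map;

final class ConsumingSegmentCheck {
  // Local stand-ins for the SegmentStateModel state names (assumption for this sketch).
  static final String ONLINE = "ONLINE";
  static final String CONSUMING = "CONSUMING";

  private ConsumingSegmentCheck() {
  }

  /**
   * Treats a segment as consuming when no replica is ONLINE and at least one
   * replica is CONSUMING. Replicas in OFFLINE/ERROR do not stop the segment
   * from being counted as consuming.
   */
  static boolean isConsumingSegment(Map<String, String> instanceStateMap) {
    // Any ONLINE replica means this is an ONLINE segment, so skip it.
    if (instanceStateMap.containsValue(ONLINE)) {
      return false;
    }
    // Otherwise pick the segment if at least one replica is CONSUMING.
    return instanceStateMap.containsValue(CONSUMING);
  }
}
```

   For example, a segment whose replicas are `{CONSUMING, OFFLINE}` is counted
as consuming under this rule, whereas the `allMatch` version in the diff would
skip it.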



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -140,10 +156,20 @@ public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserv
     _controllerMetrics = controllerMetrics;
     _rebalancePreChecker = rebalancePreChecker;
     _tableSizeReader = tableSizeReader;
+    _executorService = executorService;
+    _connectionManager = connectionManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+  }
+
+  public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver,

Review Comment:
   nit: is this constructor really needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

