somandal commented on code in PR #15368: URL: https://github.com/apache/pinot/pull/15368#discussion_r2021740211
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -120,16 +130,22 @@ */ public class TableRebalancer { private static final Logger LOGGER = LoggerFactory.getLogger(TableRebalancer.class); + private static final int TOP_N_IN_CONSUMING_SEGMENT_SUMMARY = 10; private final HelixManager _helixManager; private final HelixDataAccessor _helixDataAccessor; private final TableRebalanceObserver _tableRebalanceObserver; private final ControllerMetrics _controllerMetrics; private final RebalancePreChecker _rebalancePreChecker; private final TableSizeReader _tableSizeReader; + private final ExecutorService _executorService; + private final HttpClientConnectionManager _connectionManager; + private final PinotHelixResourceManager _pinotHelixResourceManager; public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver, @Nullable ControllerMetrics controllerMetrics, @Nullable RebalancePreChecker rebalancePreChecker, - @Nullable TableSizeReader tableSizeReader) { + @Nullable TableSizeReader tableSizeReader, @Nullable ExecutorService executorService, Review Comment: nit: Since we do pass in `PinotHelixResourceManager` now, can we get the `TableSizeReader` from that directly rather than passing it in separately? Similarly check if any of the other parameters here can be fetched from `PinotHelixResourceManager` directly, just to keep the parameters passed in cleaner? ########## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java: ########## @@ -1570,6 +1580,269 @@ public void testRebalanceWithMinimizeDataMovementInstanceAssignments() } } + @Test + public void testRebalanceConsumingSegmentSummary() Review Comment: For tests, is it possible to update `OfflineClusterIntegrationTest::testRebalanceDryRunSummary()` with asserts that the consuming segment summary is null? 
just to ensure we have that covered for OFFLINE tables? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -627,22 +653,42 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, TableConfig tableConfig) { LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId); + boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE; int existingReplicationFactor = 0; int newReplicationFactor = 0; Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>(); Map<String, Set<String>> newServersToSegmentMap = new HashMap<>(); + Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>(); + Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>(); for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) { existingReplicationFactor = entrySet.getValue().size(); - for (String segmentKey : entrySet.getValue().keySet()) { - existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey()); + String segmentName = entrySet.getKey(); + boolean isSegmentConsuming = existingServersToConsumingSegmentMap != null && entrySet.getValue() + .values() + .stream() + .allMatch(state -> state.equals(SegmentStateModel.CONSUMING)); Review Comment: I looked into this a bit more today for another change I'm making. 
Based on my understanding, I think the logic should be: - If any replica's state matches ONLINE, skip this segment (as it is an ONLINE segment) - Otherwise, if any replica's state matches CONSUMING, pick this segment. The above logic handles the corner case where no replica is ONLINE, but some replica is in the OFFLINE/ERROR state. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -140,10 +156,20 @@ public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserv _controllerMetrics = controllerMetrics; _rebalancePreChecker = rebalancePreChecker; _tableSizeReader = tableSizeReader; + _executorService = executorService; + _connectionManager = connectionManager; + _pinotHelixResourceManager = pinotHelixResourceManager; + } + + public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver, Review Comment: nit: is this constructor really needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org