This is an automated email from the ASF dual-hosted git repository. somandal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ea60ecf9df Fix up rebalance batching partitionId calculation to account for COMPLETED segment partitioning differently as done in RealtimeSegmentAssignment (#15822) ea60ecf9df is described below commit ea60ecf9df2e4b202ab42853a457314cfca5796d Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Mon May 19 15:08:47 2025 -0700 Fix up rebalance batching partitionId calculation to account for COMPLETED segment partitioning differently as done in RealtimeSegmentAssignment (#15822) * Fix up rebalance batching partitionId calculation to account for COMPLETED segment partitioning differently as done in RealtimeSegmentAssignment * Fix comments --- .../helix/core/rebalance/TableRebalancer.java | 43 +++++++++++++++------- .../helix/core/rebalance/TableRebalancerTest.java | 4 +- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index eb3f12d13d..f4e964ceb5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -1756,9 +1756,12 @@ public class TableRebalancer { for (Map.Entry<String, Map<String, String>> assignment : currentAssignment.entrySet()) { String segmentName = assignment.getKey(); Map<String, String> instanceStateMap = assignment.getValue(); + Collection<String> segmentStates = instanceStateMap.values(); + boolean isConsuming = segmentStates.stream().noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) + && segmentStates.stream().anyMatch(state -> state.equals(SegmentStateModel.CONSUMING)); int partitionId = - segmentPartitionIdMap.computeIfAbsent(segmentName, v -> partitionIdFetcher.fetch(segmentName)); + segmentPartitionIdMap.computeIfAbsent(segmentName, v -> partitionIdFetcher.fetch(segmentName, isConsuming)); Set<String> assignedInstances = instanceStateMap.keySet(); partitionIdToAssignedInstancesToCurrentAssignmentMap.computeIfAbsent(partitionId, k -> new HashMap<>()) .computeIfAbsent(assignedInstances, k -> new TreeMap<>()).put(segmentName, instanceStateMap); @@ -1770,7 +1773,7 @@ public class TableRebalancer { @VisibleForTesting @FunctionalInterface interface PartitionIdFetcher { - int fetch(String segmentName); + int fetch(String segmentName, boolean isConsuming); } private static class PartitionIdFetcherImpl implements PartitionIdFetcher { @@ -1791,7 +1794,7 @@ public class TableRebalancer { } @Override - public int fetch(String segmentName) { + public int fetch(String segmentName, boolean isConsuming) { Integer partitionId; if (_isStrictRealtimeSegmentAssignment) { // This is how partitionId is calculated for StrictRealtimeSegmentAssignment. Here partitionId is mandatory @@ -1801,8 +1804,8 @@ public class TableRebalancer { Preconditions.checkState(partitionId != null, "Failed to find partition id for segment: %s of table: %s", segmentName, _tableNameWithType); } else { - boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(_tableNameWithType) == TableType.OFFLINE; - if (isOfflineTable) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(_tableNameWithType); + if (tableType == TableType.OFFLINE) { InstancePartitions instancePartitions = _instancePartitionsMap.get(InstancePartitionsType.OFFLINE); assert instancePartitions != null; if (_partitionColumn == null || instancePartitions.getNumPartitions() == 1) { @@ -1810,18 +1813,32 @@ public class TableRebalancer { // rebalance without batching partitionId = 0; } else { - // This how partitionId is calculated for OFFLINE tables + // This is how partitionId is calculated for OFFLINE tables partitionId = SegmentAssignmentUtils.getOfflineSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _partitionColumn); } } else { - // This how partitionId is calculated for CONSUMING segments in RealtimeSegmentAssignment - // TODO: Add handling for COMPLETED segments if in the future this is allowed for StrictReplicaGroup and - // the partitionId calculation differs from the CONSUMING segments. For StrictRealtimeSegmentAssignment - // the partitionId is mandated today. If this mandate is maintained then there may be no need to add - // special handling for COMPLETED segments after all - partitionId = SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, - _helixManager, _partitionColumn); + if (isConsuming || !_instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED)) { + // This is how partitionId is calculated for CONSUMING segments and ONLINE segments without COMPLETED + // instance partitions in RealtimeSegmentAssignment + partitionId = SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, + _helixManager, _partitionColumn); + } else { + // This is how partitionId is calculated for ONLINE segments when COMPLETED instance partitions exist + // in RealtimeSegmentAssignment + InstancePartitions instancePartitions = _instancePartitionsMap.get(InstancePartitionsType.COMPLETED); + assert instancePartitions != null; + if (_partitionColumn == null || instancePartitions.getNumPartitions() == 1) { + // Fallback to partitionId 0, in this case batching will not be possible so we will fall back to a full + // rebalance without batching + partitionId = 0; + } else { + // This is how partitionId is calculated for REALTIME tables if a partition column exists and if the + // COMPLETED instance partitions has more than 1 partition + partitionId = SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, + _helixManager, _partitionColumn); + } + } } } return partitionId; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java index 2af9d10fb9..efc0d50556 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java @@ -44,8 +44,8 @@ import static org.testng.Assert.fail; public class TableRebalancerTest { - private static final TableRebalancer.PartitionIdFetcher DUMMY_PARTITION_FETCHER = segmentName -> 0; - private static final TableRebalancer.PartitionIdFetcher SIMPLE_PARTITION_FETCHER = segmentName -> { + private static final TableRebalancer.PartitionIdFetcher DUMMY_PARTITION_FETCHER = (segmentName, isConsuming) -> 0; + private static final TableRebalancer.PartitionIdFetcher SIMPLE_PARTITION_FETCHER = (segmentName, isConsuming) -> { LLCSegmentName name = LLCSegmentName.of(segmentName); return name == null ? -1 : name.getPartitionGroupId(); }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org