This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ea60ecf9df Fix up rebalance batching partitionId calculation to account for COMPLETED segment partitioning differently as done in RealtimeSegmentAssignment (#15822)
ea60ecf9df is described below

commit ea60ecf9df2e4b202ab42853a457314cfca5796d
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Mon May 19 15:08:47 2025 -0700

    Fix up rebalance batching partitionId calculation to account for COMPLETED segment partitioning differently as done in RealtimeSegmentAssignment (#15822)
    
    * Fix up rebalance batching partitionId calculation to account for COMPLETED segment partitioning differently as done in RealtimeSegmentAssignment
    
    * Fix comments
---
 .../helix/core/rebalance/TableRebalancer.java      | 43 +++++++++++++++-------
 .../helix/core/rebalance/TableRebalancerTest.java  |  4 +-
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index eb3f12d13d..f4e964ceb5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -1756,9 +1756,12 @@ public class TableRebalancer {
     for (Map.Entry<String, Map<String, String>> assignment : 
currentAssignment.entrySet()) {
       String segmentName = assignment.getKey();
       Map<String, String> instanceStateMap = assignment.getValue();
+      Collection<String> segmentStates = instanceStateMap.values();
 
+      boolean isConsuming = segmentStates.stream().noneMatch(state -> 
state.equals(SegmentStateModel.ONLINE))
+          && segmentStates.stream().anyMatch(state -> 
state.equals(SegmentStateModel.CONSUMING));
       int partitionId =
-          segmentPartitionIdMap.computeIfAbsent(segmentName, v -> 
partitionIdFetcher.fetch(segmentName));
+          segmentPartitionIdMap.computeIfAbsent(segmentName, v -> 
partitionIdFetcher.fetch(segmentName, isConsuming));
       Set<String> assignedInstances = instanceStateMap.keySet();
       
partitionIdToAssignedInstancesToCurrentAssignmentMap.computeIfAbsent(partitionId,
 k -> new HashMap<>())
           .computeIfAbsent(assignedInstances, k -> new 
TreeMap<>()).put(segmentName, instanceStateMap);
@@ -1770,7 +1773,7 @@ public class TableRebalancer {
   @VisibleForTesting
   @FunctionalInterface
   interface PartitionIdFetcher {
-    int fetch(String segmentName);
+    int fetch(String segmentName, boolean isConsuming);
   }
 
   private static class PartitionIdFetcherImpl implements PartitionIdFetcher {
@@ -1791,7 +1794,7 @@ public class TableRebalancer {
     }
 
     @Override
-    public int fetch(String segmentName) {
+    public int fetch(String segmentName, boolean isConsuming) {
       Integer partitionId;
       if (_isStrictRealtimeSegmentAssignment) {
         // This is how partitionId is calculated for 
StrictRealtimeSegmentAssignment. Here partitionId is mandatory
@@ -1801,8 +1804,8 @@ public class TableRebalancer {
         Preconditions.checkState(partitionId != null, "Failed to find 
partition id for segment: %s of table: %s",
             segmentName, _tableNameWithType);
       } else {
-        boolean isOfflineTable = 
TableNameBuilder.getTableTypeFromTableName(_tableNameWithType) == 
TableType.OFFLINE;
-        if (isOfflineTable) {
+        TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(_tableNameWithType);
+        if (tableType == TableType.OFFLINE) {
           InstancePartitions instancePartitions = 
_instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
           assert instancePartitions != null;
           if (_partitionColumn == null || 
instancePartitions.getNumPartitions() == 1) {
@@ -1810,18 +1813,32 @@ public class TableRebalancer {
             // rebalance without batching
             partitionId = 0;
           } else {
-            // This how partitionId is calculated for OFFLINE tables
+            // This is how partitionId is calculated for OFFLINE tables
             partitionId = 
SegmentAssignmentUtils.getOfflineSegmentPartitionId(segmentName, 
_tableNameWithType,
                 _helixManager, _partitionColumn);
           }
         } else {
-          // This how partitionId is calculated for CONSUMING segments in 
RealtimeSegmentAssignment
-          // TODO: Add handling for COMPLETED segments if in the future this 
is allowed for StrictReplicaGroup and
-          //       the partitionId calculation differs from the CONSUMING 
segments. For StrictRealtimeSegmentAssignment
-          //       the partitionId is mandated today. If this mandate is 
maintained then there may be no need to add
-          //       special handling for COMPLETED segments after all
-          partitionId = 
SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_tableNameWithType,
-              _helixManager, _partitionColumn);
+          if (isConsuming || 
!_instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED)) {
+            // This is how partitionId is calculated for CONSUMING segments 
and ONLINE segments without COMPLETED
+            // instance partitions in RealtimeSegmentAssignment
+            partitionId = 
SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_tableNameWithType,
+                _helixManager, _partitionColumn);
+          } else {
+            // This is how partitionId is calculated for ONLINE segments when 
COMPLETED instance partitions exist
+            // in RealtimeSegmentAssignment
+            InstancePartitions instancePartitions = 
_instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+            assert instancePartitions != null;
+            if (_partitionColumn == null || 
instancePartitions.getNumPartitions() == 1) {
+              // Fallback to partitionId 0, in this case batching will not be 
possible so we will fall back to a full
+              // rebalance without batching
+              partitionId = 0;
+            } else {
+              // This is how partitionId is calculated for REALTIME tables if 
a partition column exists and if the
+              // COMPLETED instance partitions has more than 1 partition
+              partitionId = 
SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_tableNameWithType,
+                  _helixManager, _partitionColumn);
+            }
+          }
         }
       }
       return partitionId;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index 2af9d10fb9..efc0d50556 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -44,8 +44,8 @@ import static org.testng.Assert.fail;
 
 public class TableRebalancerTest {
 
-  private static final TableRebalancer.PartitionIdFetcher 
DUMMY_PARTITION_FETCHER = segmentName -> 0;
-  private static final TableRebalancer.PartitionIdFetcher 
SIMPLE_PARTITION_FETCHER = segmentName -> {
+  private static final TableRebalancer.PartitionIdFetcher 
DUMMY_PARTITION_FETCHER = (segmentName, isConsuming) -> 0;
+  private static final TableRebalancer.PartitionIdFetcher 
SIMPLE_PARTITION_FETCHER = (segmentName, isConsuming) -> {
     LLCSegmentName name = LLCSegmentName.of(segmentName);
     return name == null ? -1 : name.getPartitionGroupId();
   };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to