This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f38fd7cd7 Modify strictReplicaGroup rebalance batching to categorize on current+target instances to partitionId to currentAssignment (#15838)
0f38fd7cd7 is described below

commit 0f38fd7cd73198024b79d334b35a9829d68bfd3f
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Tue May 20 12:27:45 2025 -0700

    Modify strictReplicaGroup rebalance batching to categorize on current+target instances to partitionId to currentAssignment (#15838)
    
    * Fix strictReplicaGroup rebalance batching to categorize on current+target instances to partitionId to currentAssignment
    
    * Address review comment: short-circuit return when rebalance batching is disabled
---
 .../helix/core/rebalance/TableRebalancer.java      | 156 ++++++++-----------
 .../helix/core/rebalance/TableRebalancerTest.java  | 167 ++++++++++++++++-----
 .../tests/TableRebalanceIntegrationTest.java       |   1 +
 3 files changed, 199 insertions(+), 125 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index f4e964ceb5..a89f62b41e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -1581,84 +1581,57 @@ public class TableRebalancer {
       Logger tableRebalanceLogger) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
-    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
-        partitionIdToAssignedInstancesToCurrentAssignmentMap;
-    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
-      // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled
-      // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0
-      // and a dummy set for the assigned instances.
-      partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
-      partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>());
-      partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment);
-    } else {
-      partitionIdToAssignedInstancesToCurrentAssignmentMap =
-          getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap,
-              partitionIdFetcher);
-    }
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
     Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
-
     Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
-    for (Map<Set<String>, Map<String, Map<String, String>>> assignedInstancesToCurrentAssignment
-        : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) {
-      boolean anyServerExhaustedBatchSize = false;
-      if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
-        // The number of segments for a given partition, accumulates as we iterate over the assigned instances
-        Map<String, Integer> serverToNumSegmentsToBeAddedForPartitionMap = new HashMap<>();
-
-        // Check if the servers of the first assignment for each unique set of assigned instances has any space left
-        // to move this partition. If so, let's mark the partitions as to be moved, otherwise we mark the partition
-        // as a whole as not moveable.
-        for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) {
-          Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
-          // It is enough to check for whether any server for one segment is above the limit or not since all segments
-          // in curAssignment will have the same assigned instances list
-          Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue();
-          SingleSegmentAssignment firstAssignment =
-              getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()),
-                  minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
-          Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap,
-              firstAssignment._instanceStateMap);
-          for (String server : serversAdded) {
-            // Case I: We already exceeded the batchSizePerServer for this server, cannot add any more segments
-            if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
-              anyServerExhaustedBatchSize = true;
-              break;
-            }
 
-            // All segments assigned to the current instances will be moved, so track segments to be added for the given
-            // server based on this
-            serverToNumSegmentsToBeAddedForPartitionMap.put(server,
-                serverToNumSegmentsToBeAddedForPartitionMap.getOrDefault(server, 0) + curAssignment.size());
-          }
-          if (anyServerExhaustedBatchSize) {
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Directly update the nextAssignment with anyServerExhaustedBatchSize = false and return if batching is disabled
+      updateNextAssignmentForPartitionIdStrictReplicaGroup(currentAssignment, targetAssignment, nextAssignment,
+          false, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap,
+          availableInstancesMap, serverToNumSegmentsAddedSoFar);
+      return nextAssignment;
+    }
+
+    // Batching is enabled, calculate the Pair(current instances, target instances) -> partitionId -> currentAssignment
+    Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>>
+        currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap =
+        getCurrentAndTargetInstancesToPartitionIdToCurrentAssignmentMap(currentAssignment, targetAssignment,
+            segmentPartitionIdMap, partitionIdFetcher);
+
+    // Iterating over the unique pairs of current and target instances
+    for (Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignment
+        : currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap.values()) {
+      // Check if the servers of the first assignment for each unique partition has any space left to move the
+      // segments assigned to the partition and unique assigned instances as a whole. If so, let's mark the partitions
+      // as to be moved, otherwise we mark the partition as a whole as not moveable.
+      // Iterating over the partitionIds with the same unique pair of current and assigned instances
+      for (Map<String, Map<String, String>> curAssignment : partitionIdToCurrentAssignment.values()) {
+        Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
+        // It is enough to check for whether any server for one segment is above the limit or not since all segments
+        // in curAssignment will have the same current and target instances and same partitionId
+        Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue();
+        SingleSegmentAssignment firstAssignment =
+            getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()),
+                minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
+        Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap,
+            firstAssignment._instanceStateMap);
+        boolean anyServerExhaustedBatchSize = false;
+        for (String server : serversAdded) {
+          int segmentsAddedToServerSoFar = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0);
+          // Case I: We already exceeded the batchSizePerServer for this server, cannot add any more segments
+          // Case II: We have not yet exceeded the batchSizePerServer for this server, but we don't have sufficient
+          // space to host the segments for this assignment on the server, and we have allocated some partitions so
+          // far. If the batchSizePerServer is less than the number of segments in a given partitionId, we must host
+          // at least 1 partition and exceed the batchSizePerServer to ensure progress is made. Thus, performing this
+          // check only if segmentsAddedToServerSoFar > 0 is necessary.
+          if ((segmentsAddedToServerSoFar >= batchSizePerServer)
+              || (segmentsAddedToServerSoFar > 0
+              && (segmentsAddedToServerSoFar + curAssignment.size()) > batchSizePerServer)) {
+            anyServerExhaustedBatchSize = true;
             break;
           }
         }
-
-        // Case II: We have not yet exceeded the batchSizePerServer for any server, but we don't have sufficient
-        // space to host the segments for this assignment on some server, and we have allocated some partitions so
-        // far. If the batchSizePerServer is less than the number of segments in a given partitionId, we must host
-        // at least 1 partition and exceed the batchSizePerServer to ensure progress is made. Thus, performing this
-        // check only if segmentsAddedToServerSoFar > 0 is necessary.
-        if (!anyServerExhaustedBatchSize) {
-          for (Map.Entry<String, Integer> serverToNumSegmentsToAdd
-              : serverToNumSegmentsToBeAddedForPartitionMap.entrySet()) {
-            int segmentsAddedToServerSoFar =
-                serverToNumSegmentsAddedSoFar.getOrDefault(serverToNumSegmentsToAdd.getKey(), 0);
-            if (segmentsAddedToServerSoFar > 0
-                && (segmentsAddedToServerSoFar + serverToNumSegmentsToAdd.getValue()) > batchSizePerServer) {
-              anyServerExhaustedBatchSize = true;
-              break;
-            }
-          }
-        }
-      }
-      // TODO: Consider whether we should process the nextAssignment for each unique assigned instances rather than the
-      //       full partition to get a more granular number of segment moves in each step. For now since we expect
-      //       strict replica groups to mostly be used for tables like upserts which require a full partition to be
-      //       moved, we move a full partition at a time.
-      for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) {
         updateNextAssignmentForPartitionIdStrictReplicaGroup(curAssignment, targetAssignment, nextAssignment,
             anyServerExhaustedBatchSize, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap,
             availableInstancesMap, serverToNumSegmentsAddedSoFar);
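
The Case I / Case II logic documented in the comments above boils down to a single per-server budget check. Below is an illustrative, standalone sketch of that decision (not part of the patch; the method name exhaustsBatchSize and the parameter segmentsInThisGroup are invented for this example):

import java.util.Map;
import java.util.Set;

public class BatchBudgetCheckSketch {
  static boolean exhaustsBatchSize(Map<String, Integer> serverToNumSegmentsAddedSoFar, Set<String> serversAdded,
      int segmentsInThisGroup, int batchSizePerServer) {
    for (String server : serversAdded) {
      int addedSoFar = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0);
      // Case I: the server already reached its batchSizePerServer budget.
      // Case II: the server has some segments queued and adding this whole group would exceed the budget.
      //          When nothing has been queued yet, the group is accepted even if it exceeds the budget,
      //          so that progress is always made.
      if (addedSoFar >= batchSizePerServer
          || (addedSoFar > 0 && addedSoFar + segmentsInThisGroup > batchSizePerServer)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // batchSizePerServer = 4: host1 has 3 queued, a group of 2 more would exceed the budget -> true
    System.out.println(exhaustsBatchSize(Map.of("host1", 3), Set.of("host1", "host2"), 2, 4));
    // host1 has 0 queued, a group of 6 exceeds the budget but is still accepted to make progress -> false
    System.out.println(exhaustsBatchSize(Map.of(), Set.of("host1"), 6, 4));
  }
}

Accepting a whole group on a server with nothing queued yet, even when it overshoots the budget, is what guarantees forward progress when batchSizePerServer is smaller than the number of segments in a partition.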
@@ -1728,8 +1701,7 @@ public class TableRebalancer {
       Map<String, Integer> serverToNumSegmentsAddedSoFar, Logger tableRebalanceLogger) {
     int maxSegmentsAddedToAnyServer = serverToNumSegmentsAddedSoFar.isEmpty() ? 0
         : Collections.max(serverToNumSegmentsAddedSoFar.values());
-    if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER
-        && maxSegmentsAddedToAnyServer > batchSizePerServer) {
+    if (maxSegmentsAddedToAnyServer > batchSizePerServer) {
       tableRebalanceLogger.warn("Found at least one server with {} segments added which is larger than "
           + "batchSizePerServer: {}. This is expected for strictReplicaGroup based assignment that needs to move a "
           + "full partition to maintain consistency for queries.", maxSegmentsAddedToAnyServer, batchSizePerServer);
@@ -1737,37 +1709,41 @@ public class TableRebalancer {
   }
 
   /**
-   * Create a mapping of partitionId to the mapping of assigned instances to the current assignment of segments that
-   * belong to that partitionId and assigned instances. This is to be used for batching purposes for StrictReplicaGroup
-   * routing, for all segment assignment types: RealtimeSegmentAssignment, StrictRealtimeSegmentAssignment and
-   * OfflineSegmentAssignment
+   * Create a mapping of Pair(currentInstances, targetInstances) to partitionId to the current assignment of segments.
+   * This is to be used for batching purposes for StrictReplicaGroup routing, for all segment assignment types:
+   * RealtimeSegmentAssignment, StrictRealtimeSegmentAssignment and OfflineSegmentAssignment
    * @param currentAssignment the current assignment
+   * @param targetAssignment the target assignment
    * @param segmentPartitionIdMap cache to store the partition ids to avoid fetching ZK segment metadata
    * @param partitionIdFetcher function to fetch the partition id
-   * @return a mapping from partitionId to the assigned instances to the segment assignment map of all segments that
-   *         map to that partitionId and assigned instances
+   * @return a mapping from Pair(currentInstances, targetInstances) to the partitionId to the segment assignment map of
+   *         all segments that fall in that category
    */
-  private static Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
-  getPartitionIdToAssignedInstancesToCurrentAssignmentMap(Map<String, Map<String, String>> currentAssignment,
-      Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher) {
-    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
-        partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+  private static Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>>
+  getCurrentAndTargetInstancesToPartitionIdToCurrentAssignmentMap(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher) {
+    Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>>
+        currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap = new HashMap<>();
 
     for (Map.Entry<String, Map<String, String>> assignment : currentAssignment.entrySet()) {
       String segmentName = assignment.getKey();
-      Map<String, String> instanceStateMap = assignment.getValue();
-      Collection<String> segmentStates = instanceStateMap.values();
+      Map<String, String> currentInstanceStateMap = assignment.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
 
+      Collection<String> segmentStates = currentInstanceStateMap.values();
       boolean isConsuming = segmentStates.stream().noneMatch(state -> state.equals(SegmentStateModel.ONLINE))
           && segmentStates.stream().anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
       int partitionId =
           segmentPartitionIdMap.computeIfAbsent(segmentName, v -> partitionIdFetcher.fetch(segmentName, isConsuming));
-      Set<String> assignedInstances = instanceStateMap.keySet();
-      partitionIdToAssignedInstancesToCurrentAssignmentMap.computeIfAbsent(partitionId, k -> new HashMap<>())
-          .computeIfAbsent(assignedInstances, k -> new TreeMap<>()).put(segmentName, instanceStateMap);
+      Pair<Set<String>, Set<String>> currentAndTargetInstances =
+          Pair.of(currentInstanceStateMap.keySet(), targetInstanceStateMap.keySet());
+      currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap
+          .computeIfAbsent(currentAndTargetInstances, k -> new TreeMap<>())
+          .computeIfAbsent(partitionId, k -> new TreeMap<>()).put(segmentName, currentInstanceStateMap);
     }
 
-    return partitionIdToAssignedInstancesToCurrentAssignmentMap;
+    return currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap;
   }
 
   @VisibleForTesting
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index efc0d50556..0f184908a1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -1654,7 +1654,8 @@ public class TableRebalancerTest {
     // with batching with batchSizePerServer = 1:
     //
     // The first assignment will move "segment1" and "segment2" by one host at a time, and since the other segments
-    // if added will go beyond batchSizePerServer, they'll be picked up on the next two assignments (host4 and host2):
+    // if added will go beyond batchSizePerServer, they'll be picked up on the next two-three assignments
+    // (host4 and host2):
     // {
     //   "segment__1__0__98347869999L": {
     //     "host1": "ONLINE",
@@ -1677,7 +1678,7 @@ public class TableRebalancerTest {
     //     "host4": "ONLINE"
     //   }
     // }
-    // Second Assignment (host6, host1, host4, and host5 get 1 segment each):
+    // Second Assignment (host6, host1, host4, and host5 get 1 segment each) - non-strictReplicaGroup:
     // {
     //   "segment__1__0__98347869999L": {
     //     "host2": "ONLINE",
@@ -1700,8 +1701,55 @@ public class TableRebalancerTest {
     //     "host5": "ONLINE"
     //   }
     // }
+    // Second Assignment (host1, host6, and host5 get 1 segment each) - strictReplicaGroup:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
     //
-    // The third assignment should reach the target assignment
+    // The third assignment should reach the target assignment for non-strictReplicaGroup, and the fourth for
+    // strictReplicaGroup
+    // Third Assignment (host1 and host4 gets 1 segment each) - strictReplicaGroup:
+    // {
+    //   "segment__1__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__2__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__3__0__98347869999L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__4__0__98347869999L": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
     for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
       Map<String, Map<String, String>> nextAssignment =
           TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
@@ -1718,18 +1766,42 @@ public class TableRebalancerTest {
       nextAssignment =
           TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
               1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
-      assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
-          new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
-      assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
-          new TreeSet<>(Arrays.asList("host1", "host4", "host5")));
-      assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
-          new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
-      assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
-          new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
-
-      nextAssignment =
-          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
-              1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+      if (!enableStrictReplicaGroup) {
+        assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+        assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host4", "host5")));
+        assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+        assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+        nextAssignment =
+            TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
+                1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+      } else {
+        assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+        assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host4", "host5")));
+        assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host6")));
+        assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+        nextAssignment =
+            TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
+                1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+        assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+        assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host4", "host5")));
+        assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+        assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host4", "host5")));
+        nextAssignment =
+            TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
+                1, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
+      }
       assertEquals(nextAssignment, targetAssignment);
     }
 
@@ -1790,27 +1862,27 @@ public class TableRebalancerTest {
     // Next assignment with 2 minimum available replicas with strict replica-group should finish in 2 steps even with
     // batchSizePerServer = 2:
     //
-    // The first assignment will bring "segment1" and "segment3" to the target state. It cannot bring "segment2" and
-    // "segment4" to the target state because "host1" and "host4" might be unavailable for strict replica-group routing,
+    // The first assignment will bring "segment2" and "segment4" to the target state. It cannot bring "segment1" and
+    // "segment3" to the target state because "host1" and "host4" might be unavailable for strict replica-group routing,
     // which breaks the minimum available replicas requirement:
     // {
     //   "segment__1__0__98347869999L": {
     //     "host1": "ONLINE",
-    //     "host3": "ONLINE",
-    //     "host4": "ONLINE"
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
     //   },
     //   "segment__2__0__98347869999L": {
-    //     "host2": "ONLINE",
+    //     "host1": "ONLINE",
     //     "host3": "ONLINE",
     //     "host4": "ONLINE"
     //   },
     //   "segment__3__0__98347869999L": {
     //     "host1": "ONLINE",
-    //     "host3": "ONLINE",
-    //     "host4": "ONLINE"
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
     //   },
     //   "segment__4__0__98347869999L": {
-    //     "host2": "ONLINE",
+    //     "host1": "ONLINE",
     //     "host3": "ONLINE",
     //     "host4": "ONLINE"
     //   }
@@ -1820,13 +1892,13 @@ public class TableRebalancerTest {
     nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true, false,
         2, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
     assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
-        new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
+        new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
     assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
-        new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
-    assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
         new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
+    assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+        new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
     assertEquals(nextAssignment.get("segment__4__0__98347869999L").keySet(),
-        new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+        new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
     nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true, false,
         2, new Object2IntOpenHashMap<>(), SIMPLE_PARTITION_FETCHER);
     assertEquals(nextAssignment, targetAssignment);
@@ -1920,7 +1992,8 @@ public class TableRebalancerTest {
 
     // Next assignment with 2 minimum available replicas without strict replica-group should reach the target
     // assignment after two steps. With strict replica groups it should reach the target assignment immediately since
-    // the full partition must be selected for movement. Batch size = 1, unique partitionIds
+    // the full partition must be selected for movement for a given Pair(currentInstance, targetInstances).
+    // Batch size = 1, unique partitionIds
     for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
       Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>();
       nextAssignment =
@@ -1998,7 +2071,8 @@ public class TableRebalancerTest {
 
     // Next assignment with 2 minimum available replicas without strict replica-group should reach the target
     // assignment after three steps. With strict replica groups it should reach the target assignment in two steps since
-    // the full partition must be selected for movement. Batch size = 1, unique partitionIds
+    // the full partition must be selected for movement for a given Pair(currentInstance, targetInstances).
+    // Batch size = 1, unique partitionIds
     for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
       Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>();
       nextAssignment =
@@ -2165,7 +2239,8 @@ public class TableRebalancerTest {
 
     // Next assignment with 2 minimum available replicas without strict replica-group should reach the target
     // assignment after four steps. With strict replica groups it should reach the target assignment in two steps since
-    // the full partition must be selected for movement. Batch size = 1, unique partitionIds
+    // the full partition must be selected for movement for a given Pair(currentInstance, targetInstances).
+    // Batch size = 1, unique partitionIds
     for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
       Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>();
       nextAssignment =
@@ -2402,9 +2477,9 @@ public class TableRebalancerTest {
     assertEquals((int) numSegmentsToOffloadMap.get("host7"), 1);
 
     // Next assignment with 2 minimum available replicas without strict replica-group should reach the target
-    // assignment after three steps if strict replica group is disabled . With strict replica groups it should reach
-    // the target assignment in two steps since the full partition must be selected for movement.
-    // Batch size = 2, unique partitionIds
+    // assignment after three steps if strict replica group is enabled or disabled. Even for strictReplicaGroup,
+    // though the full Pair(currentAssignment, targetAssignment) + partitionId moves as a block, it can take longer
+    // to move since this can be a smaller granularity. Batch size = 2, unique partitionIds
     for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
       Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>();
       nextAssignment =
@@ -2457,6 +2532,28 @@ public class TableRebalancerTest {
                 2, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
       } else {
        assertNotEquals(nextAssignment, targetAssignment);
+        assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+        assertEquals(nextAssignment.get("segment__1__1__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+        assertEquals(nextAssignment.get("segment__1__2__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
+        assertEquals(nextAssignment.get("segment__2__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+        assertEquals(nextAssignment.get("segment__2__1__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+        assertEquals(nextAssignment.get("segment__2__2__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+        assertEquals(nextAssignment.get("segment__3__0__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+        assertEquals(nextAssignment.get("segment__3__1__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+        assertEquals(nextAssignment.get("segment__3__2__98347869999L").keySet(),
+            new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
+        nextAssignment =
+            TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
+                2, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
+        assertNotEquals(nextAssignment, targetAssignment);
         assertEquals(nextAssignment.get("segment__1__0__98347869999L").keySet(),
             new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
         assertEquals(nextAssignment.get("segment__1__1__98347869999L").keySet(),
@@ -2474,7 +2571,7 @@ public class TableRebalancerTest {
         assertEquals(nextAssignment.get("segment__3__1__98347869999L").keySet(),
             new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
         assertEquals(nextAssignment.get("segment__3__2__98347869999L").keySet(),
-            new TreeSet<>(Arrays.asList("host1", "host3", "host6")));
+            new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
         nextAssignment =
             TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
                 2, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
@@ -2535,7 +2632,7 @@ public class TableRebalancerTest {
         assertEquals(nextAssignment.get("segment__3__1__98347869999L").keySet(),
             new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
         assertEquals(nextAssignment.get("segment__3__2__98347869999L").keySet(),
-            new TreeSet<>(Arrays.asList("host1", "host3", "host6")));
+            new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
         nextAssignment =
             TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
                 2, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 69631e8fb1..424c0f24d8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -73,6 +73,7 @@ public class TableRebalanceIntegrationTest extends HybridClusterIntegrationTest
         + "&bootstrap=" + rebalanceConfig.isBootstrap() + "&downtime=" + rebalanceConfig.isDowntime()
         + "&minAvailableReplicas=" + rebalanceConfig.getMinAvailableReplicas()
         + "&bestEfforts=" + rebalanceConfig.isBestEfforts()
+        + "&batchSizePerServer=" + rebalanceConfig.getBatchSizePerServer()
         + "&externalViewCheckIntervalInMs=" + rebalanceConfig.getExternalViewCheckIntervalInMs()
         + "&externalViewStabilizationTimeoutInMs=" + rebalanceConfig.getExternalViewStabilizationTimeoutInMs()
         + "&updateTargetTier=" + rebalanceConfig.isUpdateTargetTier()

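For completeness, here is a hedged example of how the new parameter can travel with a rebalance request, in the same spirit as the query string assembled by the integration test above. The controller address, table name, endpoint path and parameter values are placeholders/assumptions, not taken from the patch:

import java.net.URI;

public class RebalanceRequestSketch {
  public static void main(String[] args) {
    String controller = "http://localhost:9000";   // assumed controller address
    String table = "myTable";                      // assumed table name
    // Query parameters mirroring those built in TableRebalanceIntegrationTest; values are examples only
    String query = "type=REALTIME"
        + "&dryRun=false"
        + "&minAvailableReplicas=2"
        + "&batchSizePerServer=10"
        + "&externalViewCheckIntervalInMs=1000";
    // The Pinot controller is assumed to expose rebalance under /tables/{tableName}/rebalance
    URI uri = URI.create(controller + "/tables/" + table + "/rebalance?" + query);
    System.out.println("POST " + uri);
  }
}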
