somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085595949


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment
     }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher) {
+    return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+        lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER);
+  }
+
   /**
    * Returns the next assignment for the table based on the current assignment and the target assignment with regard to
    * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
    * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
    * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
+   * <p>
+   * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment),
+   * if batching is enabled, the instances assigned for the same partitionId can be different for different segments.
+   * For strict replica group routing with StrictRealtimeSegmentAssignment, on the other hand, the assignment for a
+   * given partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping
+   * from partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment,
+   * this map will have a single entry for 'unique set of instance assignments'.
+   * <p>
+   * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but
+   *       this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side
+   *       will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of
+   *       instances. Special handling to check each group of assigned instances can be removed in that case.
    */
-  @VisibleForTesting
-  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
       Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
-      boolean lowDiskMode) {
-    return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-        minAvailableReplicas, lowDiskMode)
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+    return enableStrictReplicaGroup
+        ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode,
+        batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger)
         : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
-            lowDiskMode);
+            lowDiskMode, batchSizePerServer);
   }
 
   private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
       Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
-      int minAvailableReplicas, boolean lowDiskMode) {
+      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+      Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher,
+      Logger tableRebalanceLogger) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
+        partitionIdToAssignedInstancesToCurrentAssignmentMap;
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled
+      // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0
+      // and a dummy set for the assigned instances.
+      partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+      partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>());
+      partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment);
+    } else {
+      partitionIdToAssignedInstancesToCurrentAssignmentMap =
+          getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap,
+              partitionIdFetcher);
+    }
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
     Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
-    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
-      String segmentName = entry.getKey();
-      Map<String, String> currentInstanceStateMap = entry.getValue();
-      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
-      SingleSegmentAssignment assignment =
-          getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas,
-              lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
-      Set<String> assignedInstances = assignment._instanceStateMap.keySet();
-      Set<String> availableInstances = assignment._availableInstances;
-      availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> {
-        if (currentAvailableInstances == null) {
-          // First segment assigned to these instances, use the new assignment and update the available instances
-          nextAssignment.put(segmentName, assignment._instanceStateMap);
-          updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k);
-          return availableInstances;
-        } else {
-          // There are other segments assigned to the same instances, check the available instances to see if adding the
-          // new assignment can still hold the minimum available replicas requirement
-          availableInstances.retainAll(currentAvailableInstances);
-          if (availableInstances.size() >= minAvailableReplicas) {
-            // New assignment can be added
+
+    Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
+    for (Map<Set<String>, Map<String, Map<String, String>>> assignedInstancesToCurrentAssignment
+        : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) {
+      boolean anyServerExhaustedBatchSize = false;
+      if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+        // Check whether the servers of the first assignment for each unique set of assigned instances have any space
+        // left to move this partition. If so, mark the partition as movable; otherwise mark the partition as a whole
+        // as not movable.
+        for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) {
+          Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
+          // All segments should be assigned to the same set of servers, so it is enough to check whether any server
+          // for one segment is above the limit or not
+          Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue();
+          SingleSegmentAssignment firstAssignment =
+              getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()),
+                  minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
+          Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap,
+              firstAssignment._instanceStateMap);
+          for (String server : serversAdded) {
+            if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
+              anyServerExhaustedBatchSize = true;
+              break;
+            }
+          }
+          if (anyServerExhaustedBatchSize) {
+            break;
+          }
+        }
+      }
+      for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) {
+        getNextAssignmentForPartitionIdStrictReplicaGroup(curAssignment, targetAssignment, nextAssignment,
+            anyServerExhaustedBatchSize, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap,
+            availableInstancesMap, serverToNumSegmentsAddedSoFar);
+      }
+    }
+
+    checkIfAnyServersAssignedMoreSegmentsThanBatchSize(batchSizePerServer, serverToNumSegmentsAddedSoFar,
+        tableRebalanceLogger);
+    return nextAssignment;
+  }
+
+  private static void getNextAssignmentForPartitionIdStrictReplicaGroup(Map<String, Map<String, String>> curAssignment,
+      Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> nextAssignment,
+      boolean anyServerExhaustedBatchSize, int minAvailableReplicas, boolean lowDiskMode,
+      Map<String, Integer> numSegmentsToOffloadMap, Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap,
+      Map<Set<String>, Set<String>> availableInstancesMap, Map<String, Integer> serverToNumSegmentsAddedSoFar) {
+    if (anyServerExhaustedBatchSize) {
+      // Exhausted the batch size for at least 1 server, just copy over the remaining segments as is
+      for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) {
+        String segmentName = entry.getKey();
+        Map<String, String> currentInstanceStateMap = entry.getValue();
+        nextAssignment.put(segmentName, currentInstanceStateMap);
+      }
+    } else {
+      // Process all the partitionIds even if segmentsAddedSoFar becomes larger than batchSizePerServer.
+      // Can only do best-effort w.r.t. StrictReplicaGroup since a whole partition must be moved together to
+      // maintain consistency
+      for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) {

Review Comment:
   **Maybe an example will help:**
   
   **Current implementation:**
   Say I have just 1 partition, p0, with currentAssignment:
   p0s1 -> host1, host2, host3
   p0s2 -> host4, host5, host6
   
   Say my target:
   p0s1 -> host7, host8, host9
   p0s2 -> host7, host8, host9
   
   In the above there are 2 sets of assigned instances in the current assignment. I'll calculate the next single segment assignment for both of them and set the `anyServerExhaustedBatchSize` flag based on both.
   
   Then, using the `anyServerExhaustedBatchSize` that I calculated, I'll walk over both sets of assigned instances and update the next assignment. Note that `anyServerExhaustedBatchSize` will be the same value for both (so either the full partition is allowed to be assigned to the nextAssignment, or the partition as a whole remains on currentAssignment).
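   
   To make the grouping concrete, here's a minimal, self-contained sketch of the partitionId -> unique set of assigned instances -> assignment mapping for this example (hypothetical names, plain JDK collections in place of `Object2IntOpenHashMap`, and instance-state maps reduced to server sets for brevity):
   
   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeMap;
   
   public class StrictReplicaGroupGroupingExample {
     public static void main(String[] args) {
       // currentAssignment from the example: one partition (p0) whose two segments
       // sit on two distinct sets of assigned instances
       Map<String, Set<String>> currentAssignment = new LinkedHashMap<>();
       currentAssignment.put("p0s1", new LinkedHashSet<>(List.of("host1", "host2", "host3")));
       currentAssignment.put("p0s2", new LinkedHashSet<>(List.of("host4", "host5", "host6")));
   
       // partitionId -> unique set of assigned instances -> segments carrying that assignment
       Map<Integer, Map<Set<String>, List<String>>> grouped = new TreeMap<>();
       for (Map.Entry<String, Set<String>> entry : currentAssignment.entrySet()) {
         int partitionId = 0; // both example segments belong to p0
         grouped.computeIfAbsent(partitionId, k -> new LinkedHashMap<>())
             .computeIfAbsent(entry.getValue(), k -> new ArrayList<>())
             .add(entry.getKey());
       }
   
       // Prints {0={[host1, host2, host3]=[p0s1], [host4, host5, host6]=[p0s2]}};
       // with StrictRealtimeSegmentAssignment the inner map would have exactly one entry
       System.out.println(grouped);
     }
   }
   ```
   
   Since `anyServerExhaustedBatchSize` is computed once per outer key (partitionId), both inner entries are then either moved or held back together.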
   
   
   **The alternative:**
   For each unique set of assigned instances in the current assignment, calculate `anyServerExhaustedBatchSize` separately and then update the nextAssignment based on that. Here the batches will be smaller, since we aren't moving the partition as a whole, but just the segments assigned to a given unique set of assigned instances for that partition.
   
   Frankly, I think this should be okay to do as well, but to be safe I decided to move either the full partition or nothing at all. For REALTIME, this will become the eventual invariant once we fix strictReplicaGroup. For OFFLINE we need to see how this evolves for strictReplicaGroup, since technically an OFFLINE table can have the segments of a partition assigned across more than 1 set of servers.
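   
   For contrast, a hedged sketch of the per-group gating described above (hypothetical helper and numbers; the added-server sets are tweaked so the two groups differ, unlike the example target above where both map to host7-9):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   
   public class PerEntryGatingSketch {
     // Hypothetical check: is any server that would receive a segment already at the batch limit?
     static boolean exhausted(Map<String, Integer> addedSoFar, Set<String> serversAdded, int batchSize) {
       return serversAdded.stream().anyMatch(s -> addedSoFar.getOrDefault(s, 0) >= batchSize);
     }
   
     public static void main(String[] args) {
       int batchSizePerServer = 1;
       Map<String, Integer> addedSoFar = new HashMap<>();
       addedSoFar.put("host7", 1); // host7 already received a segment earlier in this step
   
       // Servers each unique set of assigned instances of p0 would add
       Map<String, Set<String>> serversAddedPerGroup = Map.of(
           "p0s1 group", Set.of("host7", "host8"),
           "p0s2 group", Set.of("host9"));
   
       // Current implementation: a single gate for the whole partition -> nothing in p0 moves
       boolean anyServerExhaustedBatchSize = serversAddedPerGroup.values().stream()
           .anyMatch(servers -> exhausted(addedSoFar, servers, batchSizePerServer));
       System.out.println("whole-partition gate: " + anyServerExhaustedBatchSize); // true
   
       // Alternative: gate each group independently -> only the group touching host7 waits
       serversAddedPerGroup.forEach((group, servers) ->
           System.out.println(group + " gated: " + exhausted(addedSoFar, servers, batchSizePerServer)));
     }
   }
   ```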



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

