somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2063997789


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1434,67 +1445,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment
     }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) {
+    return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+        lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+        LOGGER);
+  }
+
   /**
    * Returns the next assignment for the table based on the current assignment and the target assignment with regard to
    * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
    * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
    * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment),
+   * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group routing with strict replica group
+   * assignment, on the other hand, group the assignment by partitionId, since a partition must move as a whole and
+   * the same servers are assigned across all segments belonging to the same partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but
+   *       this is not enforced in the code today. Once enforcement is added, there will no longer be any need to
+   *       handle strict replica group routing only vs. strict replica group routing + assignment. Remove the
+   *       getNextStrictReplicaGroupRoutingOnlyAssignment() function.
    */
-  @VisibleForTesting
-  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
       Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
-      boolean lowDiskMode) {
-    return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-        minAvailableReplicas, lowDiskMode)
-        : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
-            lowDiskMode);
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+    return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+        ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode,
+        batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger)
+        : enableStrictReplicaGroup
+            ? getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
+            lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger)
+            : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
+            lowDiskMode, batchSizePerServer);
   }
 
   private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
       Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
-      int minAvailableReplicas, boolean lowDiskMode) {
+      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+      Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher,
+      Logger tableRebalanceLogger) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap;
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Don't calculate the partition id to current assignment mapping if batching is disabled since
+      // we want to update the next assignment based on all partitions in this case
+      partitionIdToCurrentAssignmentMap = new TreeMap<>();
+      partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
+    } else {
+      partitionIdToCurrentAssignmentMap =
+          getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher);
+    }
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
     Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
-    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
-      String segmentName = entry.getKey();
-      Map<String, String> currentInstanceStateMap = entry.getValue();
-      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
-      SingleSegmentAssignment assignment =
-          getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas,
-              lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
-      Set<String> assignedInstances = assignment._instanceStateMap.keySet();
-      Set<String> availableInstances = assignment._availableInstances;
-      availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> {
-        if (currentAvailableInstances == null) {
-          // First segment assigned to these instances, use the new assignment and update the available instances
-          nextAssignment.put(segmentName, assignment._instanceStateMap);
-          updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k);
-          return availableInstances;
-        } else {
-          // There are other segments assigned to the same instances, check the available instances to see if adding the
-          // new assignment can still hold the minimum available replicas requirement
-          availableInstances.retainAll(currentAvailableInstances);
-          if (availableInstances.size() >= minAvailableReplicas) {
-            // New assignment can be added
+
+    Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
+    for (Map<String, Map<String, String>> curAssignment : partitionIdToCurrentAssignmentMap.values()) {
+      boolean anyServerExhaustedBatchSize = false;
+      if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+        Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
+        // All partitions should be assigned to the same set of servers so it is enough to check for whether any server
+        // for one segment is above the limit or not
+        Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue();
+        SingleSegmentAssignment firstAssignment =
+            getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()),
+                minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
+        Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap,
+            firstAssignment._instanceStateMap);
+        for (String server : serversAdded) {
+          if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
+            anyServerExhaustedBatchSize = true;
+            break;
+          }
+        }
+      }
+      getNextAssignmentForPartitionIdStrictReplicaGroup(curAssignment, targetAssignment, nextAssignment,
+          anyServerExhaustedBatchSize, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap,
+          availableInstancesMap, serverToNumSegmentsAddedSoFar);
+    }
+
+    checkIfAnyServersAssignedMoreSegmentsThanBatchSize(batchSizePerServer, serverToNumSegmentsAddedSoFar,
+        tableRebalanceLogger);
+    return nextAssignment;
+  }
+
+  /**
+   * Create a mapping of partitionId to the current assignment of segments that belong to that partitionId. This is to
+   * be used for batching purposes for StrictReplicaGroup
+   * @param currentAssignment the current assignment
+   * @param segmentPartitionIdMap cache to store the partition ids to avoid fetching ZK segment metadata
+   * @param partitionIdFetcher function to fetch the partition id
+   * @return a mapping from partitionId to the segment assignment map of all segments that map to that partitionId
+   */
+  private static Map<Integer, Map<String, Map<String, String>>> getPartitionIdToCurrentAssignmentMap(
+      Map<String, Map<String, String>> currentAssignment, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher) {
+    Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap = new TreeMap<>();
+
+    for (Map.Entry<String, Map<String, String>> assignment : currentAssignment.entrySet()) {
+      String segmentName = assignment.getKey();
+      Map<String, String> instanceStateMap = assignment.getValue();
+
+      int partitionId =
+          segmentPartitionIdMap.computeIfAbsent(segmentName, v -> partitionIdFetcher.fetch(segmentName));
+      partitionIdToCurrentAssignmentMap.computeIfAbsent(partitionId,
+          k -> new TreeMap<>()).put(segmentName, instanceStateMap);
+    }
+
+    return partitionIdToCurrentAssignmentMap;
+  }
+
+  private static Map<String, Map<String, String>> getNextStrictReplicaGroupRoutingOnlyAssignment(
+      Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
+      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+      Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher,
+      Logger tableRebalanceLogger) {
+    Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
+    Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
+        partitionIdToAssignedInstancesToCurrentAssignmentMap;
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled
+      // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0
+      // and a dummy set for the assigned instances.
+      partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+      partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>());
+      partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment);
+    } else {
+      partitionIdToAssignedInstancesToCurrentAssignmentMap =
+          getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap,
+              partitionIdFetcher);
+    }
+    Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
+    Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
+
+    Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
+    for (Map<Set<String>, Map<String, Map<String, String>>> assignedInstancesToCurrentAssignment
+        : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) {
+      boolean anyServerExhaustedBatchSize = false;
+      if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+        // Check if the servers of the first assignment for each unique set of assigned instances have any space left
+        // to move this partition. If so, mark the partition as to be moved, otherwise mark the partition
+        // as a whole as not movable.
+        for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) {
+          Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
+          // All segments should be assigned to the same set of servers so it is enough to check for whether any server
+          // for one segment is above the limit or not
+          Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue();
+          SingleSegmentAssignment firstAssignment =
Review Comment:
   @Jackie-Jiang just FYI (same for `getNextStrictReplicaGroupAssignment` as well)
   
   Here we just calculate the list of next servers and don't do the actual
   assignment. I still pass the original `numSegmentsToOffloadMap` and
   `assignmentMap`, but this step does not update `numSegmentsToOffloadMap`,
   which means the servers chosen may not correctly reflect the pending
   offloads. The `assignmentMap` is updated though, and is reused when we
   actually perform the assignment, which I think is correct since it ensures
   the servers we pick during the actual assignment are the same ones we used
   to decide whether to assign or not based on `batchSizePerServer`.
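   To make the gating concrete, here is a minimal standalone sketch (toy names, not the actual `TableRebalancer` code) of the per-server batch check: a partition is only moved if none of the servers that would newly receive its segments have already been assigned `batchSizePerServer` segments in this rebalance step.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy sketch of the per-server batch gate: given the set of servers that a
// candidate move would add segments to, defer the whole partition if any of
// those servers has already reached its per-step budget.
public class BatchGateSketch {
  static boolean anyServerExhausted(Set<String> serversAdded,
      Map<String, Integer> serverToNumSegmentsAddedSoFar, int batchSizePerServer) {
    for (String server : serversAdded) {
      if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
        return true; // this partition must wait for a later rebalance step
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, Integer> addedSoFar = new HashMap<>();
    addedSoFar.put("server1", 2);
    // With batchSizePerServer = 2, server1 is already full, so a partition that
    // would add segments to server1 is deferred; server2 still has room.
    System.out.println(anyServerExhausted(Set.of("server1"), addedSoFar, 2)); // true
    System.out.println(anyServerExhausted(Set.of("server2"), addedSoFar, 2)); // false
  }
}
```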
   
   @Jackie-Jiang Do you see any concerns with this approach? I'm not sure how
   best to incorporate the segments-to-offload piece here without doing an
   actual assignment. Open to brainstorming ideas on how to handle this if you
   think it is a concern.
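   For reference, the partitionId grouping that the batching logic relies on can be sketched as follows. This is a simplified standalone version: the PR's `getPartitionIdToCurrentAssignmentMap` caches partition ids in `segmentPartitionIdMap` and resolves them via a `PartitionIdFetcher`, while here the fetcher is a plain function and the partition-id parsing from the segment name is only illustrative.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

// Toy sketch of bucketing the current assignment by partition id, so that a
// partition can be moved (or held back) as a whole during batched rebalance.
public class PartitionGroupingSketch {
  static Map<Integer, Map<String, Map<String, String>>> groupByPartitionId(
      Map<String, Map<String, String>> currentAssignment, ToIntFunction<String> partitionIdFetcher) {
    Map<Integer, Map<String, Map<String, String>>> grouped = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      int partitionId = partitionIdFetcher.applyAsInt(entry.getKey());
      grouped.computeIfAbsent(partitionId, k -> new TreeMap<>()).put(entry.getKey(), entry.getValue());
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> current = new TreeMap<>();
    current.put("table__0__1__t", Map.of("server1", "ONLINE"));
    current.put("table__1__5__t", Map.of("server2", "ONLINE"));
    current.put("table__0__2__t", Map.of("server1", "ONLINE"));
    // Illustrative parser: take the partition id from the second "__" token
    Map<Integer, Map<String, Map<String, String>>> grouped =
        groupByPartitionId(current, name -> Integer.parseInt(name.split("__")[1]));
    System.out.println(grouped.get(0).keySet()); // [table__0__1__t, table__0__2__t]
    System.out.println(grouped.get(1).keySet()); // [table__1__5__t]
  }
}
```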



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

