somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059154741


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment
     }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher) {
+    return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+        lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER);
+  }
+
   /**
    * Returns the next assignment for the table based on the current assignment and the target assignment with regard to
    * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
    * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
    * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
    */
-  @VisibleForTesting
-  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
       Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
-      boolean lowDiskMode) {
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
     return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-        minAvailableReplicas, lowDiskMode)
+        minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher,
+        tableRebalanceLogger)
         : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
-            lowDiskMode);
+            lowDiskMode, batchSizePerServer);
   }
 
   private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
       Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
-      int minAvailableReplicas, boolean lowDiskMode) {
+      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+      Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher,
+      Logger tableRebalanceLogger) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap;
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Don't calculate the partition id to current assignment mapping if batching is disabled since
+      // we want to update the next assignment based on all partitions in this case
+      partitionIdToCurrentAssignmentMap = new TreeMap<>();
+      partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
+    } else {
+      partitionIdToCurrentAssignmentMap =
+          getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher);
+    }
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
     Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
-    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
-      String segmentName = entry.getKey();
-      Map<String, String> currentInstanceStateMap = entry.getValue();
-      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
-      SingleSegmentAssignment assignment =
-          getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas,
-              lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
-      Set<String> assignedInstances = assignment._instanceStateMap.keySet();
-      Set<String> availableInstances = assignment._availableInstances;
-      availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> {
-        if (currentAvailableInstances == null) {
-          // First segment assigned to these instances, use the new assignment and update the available instances
-          nextAssignment.put(segmentName, assignment._instanceStateMap);
-          updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k);
-          return availableInstances;
-        } else {
-          // There are other segments assigned to the same instances, check the available instances to see if adding the
-          // new assignment can still hold the minimum available replicas requirement
-          availableInstances.retainAll(currentAvailableInstances);
-          if (availableInstances.size() >= minAvailableReplicas) {
-            // New assignment can be added
-            nextAssignment.put(segmentName, assignment._instanceStateMap);
-            updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k);
-            return availableInstances;
-          } else {
-            // New assignment cannot be added, use the current instance state map
-            nextAssignment.put(segmentName, currentInstanceStateMap);
-            return currentAvailableInstances;
+
+    Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
+    for (Map<String, Map<String, String>> curAssignment : partitionIdToCurrentAssignmentMap.values()) {
+      Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
+      // All partitions should be assigned to the same set of servers so it is enough to check for whether any server

Review Comment:
   @Jackie-Jiang Is this a safe assumption to make if we decide to use
   `getNextStrictReplicaGroupAssignment` purely based on the RoutingConfig
   rather than on the actual assignment strategy?
   
   ```
   boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
       && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
           tableConfig.getRoutingConfig().getInstanceSelectorType());
   ```
   
   Is it valid to have StrictReplicaGroup routing for an OFFLINE table, or for
   a REALTIME table without upsert enabled?
   
   If it is, then it is not safe to assume that all segments with the same
   partitionId are assigned to the same set of servers. We will still make
   progress, but each step can skip segments that could have been assigned,
   so the rebalance will probably be too conservative.
   
   One option is to split this function into two variants:
   - StrictRealtimeSegmentAssignment based (which is definitely used for
     upsert tables), where we move each partitionId as a whole
   - Based on any other assignment, where we don't try to move all segments
     with the same partition ID together. This would be very similar to the
     non-strict replica group function, except for the extra handling of
     available instances
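
For illustration only, here is a minimal sketch of how the "same partition, same servers" assumption could be checked before picking one of the two variants. `PartitionAssignmentCheck` and `allPartitionsShareInstances` are hypothetical names that do not exist in `TableRebalancer`, and a plain `Map<String, Integer>` stands in for the `Object2IntOpenHashMap<String>` used in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper for discussion; not part of TableRebalancer.
final class PartitionAssignmentCheck {
  private PartitionAssignmentCheck() {
  }

  /**
   * Returns true only if every segment of a given partition is assigned to the same set of instances.
   * Segments without a known partition id make the check fail, since the assumption cannot be confirmed.
   */
  static boolean allPartitionsShareInstances(Map<String, Map<String, String>> currentAssignment,
      Map<String, Integer> segmentToPartitionId) {
    Map<Integer, Set<String>> partitionToInstances = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      Integer partitionId = segmentToPartitionId.get(entry.getKey());
      if (partitionId == null) {
        return false;
      }
      Set<String> instances = entry.getValue().keySet();
      // Remember the first instance set seen for the partition, compare later segments against it
      Set<String> previous = partitionToInstances.putIfAbsent(partitionId, instances);
      if (previous != null && !previous.equals(instances)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> assignment = new TreeMap<>();
    assignment.put("seg_0", Map.of("server1", "ONLINE", "server2", "ONLINE"));
    assignment.put("seg_1", Map.of("server1", "ONLINE", "server3", "ONLINE"));
    // Both segments belong to partition 0 but live on different server sets
    Map<String, Integer> partitionIds = Map.of("seg_0", 0, "seg_1", 0);
    System.out.println(allPartitionsShareInstances(assignment, partitionIds));
  }
}
```

If a check like this returned false, the rebalancer could fall back to the per-segment variant instead of moving whole partitions together.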



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

