somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2080689742


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
     }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+      boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+    return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+        lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+        LOGGER);
+  }
+
   /**
    * Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
    * the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
    * the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
    * is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but

Review Comment:
   done



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1595,13 +1875,41 @@ private static Map<String, Map<String, String>> 
getNextNonStrictReplicaGroupAssi
       Map<String, String> nextInstanceStateMap =
           getNextSingleSegmentAssignment(currentInstanceStateMap, 
targetInstanceStateMap, minAvailableReplicas,
               lowDiskMode, numSegmentsToOffloadMap, 
assignmentMap)._instanceStateMap;
-      nextAssignment.put(segmentName, nextInstanceStateMap);
-      updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, 
currentInstanceStateMap.keySet(),
-          nextInstanceStateMap.keySet());
+      Set<String> serversAddedForSegment = 
getServersAddedInSingleSegmentAssignment(currentInstanceStateMap,
+          nextInstanceStateMap);
+      boolean anyServerExhaustedBatchSize = false;
+      if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) 
{
+        for (String server : serversAddedForSegment) {
+          if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= 
batchSizePerServer) {
+            anyServerExhaustedBatchSize = true;
+            break;
+          }
+        }
+      }
+      if (anyServerExhaustedBatchSize) {
+        // Exhausted the batch size for at least 1 server, set to existing 
assignment
+        nextAssignment.put(segmentName, currentInstanceStateMap);
+      } else {
+        // Add the next assignment and update the segments added so far counts
+        for (String server : serversAddedForSegment) {
+          int numSegmentsAdded = 
serverToNumSegmentsAddedSoFar.getOrDefault(server, 0);
+          serverToNumSegmentsAddedSoFar.put(server, numSegmentsAdded + 1);
+        }

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to