somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2083768379
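For readers skimming the thread: the data structure discussed in the first review comment below is a grouping of the table's current assignment by partition id and then by the exact set of instances currently hosting each segment. The following is a minimal, hypothetical sketch of that grouping, not the PR's actual helper; the class name, method name, and the partition-id fetcher are illustrative stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

public class PartitionGroupingSketch {

  /**
   * Groups a (segment -> instance state map) assignment by partitionId, and within each
   * partition by the exact set of instances currently hosting the segment. With
   * StrictRealtimeSegmentAssignment every segment of a partition shares one instance set, so the
   * inner map has a single key; with strict replica-group routing alone there may be several
   * instance-set keys for the same partition.
   */
  static Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> groupByPartitionAndInstances(
      Map<String, Map<String, String>> currentAssignment, ToIntFunction<String> partitionIdFetcher) {
    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> grouped = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      String segmentName = entry.getKey();
      Map<String, String> instanceStateMap = entry.getValue();
      int partitionId = partitionIdFetcher.applyAsInt(segmentName);
      grouped.computeIfAbsent(partitionId, k -> new HashMap<>())
          .computeIfAbsent(instanceStateMap.keySet(), k -> new TreeMap<>())
          .put(segmentName, instanceStateMap);
    }
    return grouped;
  }
}
```

The single-entry inner map in the StrictRealtimeSegmentAssignment case is the point the reviewer and author agree on in the first comment: the key of that inner map is the current set of assigned instances, and today multiple such sets per partition are still allowed for other assignment types.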
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { + return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, + lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * <p> + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, the instances assigned for the same partitionId can be different for different segments. + * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given + * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from + * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment, + * this map will have a single entry for 'unique set of instance assignments'. + * <p> + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side + * will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of + * instances. Special handling to check each group of assigned instances can be removed in that case. */ - @VisibleForTesting - static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { - return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, - minAvailableReplicas, lowDiskMode) + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { + return enableStrictReplicaGroup + ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, + batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, - lowDiskMode); + lowDiskMode, batchSizePerServer); } private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment( Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map<String, Map<String, String>> nextAssignment = new TreeMap<>(); Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); + Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> + partitionIdToAssignedInstancesToCurrentAssignmentMap; + if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled + // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0 + // and a dummy set for the assigned instances. + partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>()); + partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment); + } else { + partitionIdToAssignedInstancesToCurrentAssignmentMap = + getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, + partitionIdFetcher); + } Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>(); Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>(); - for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> currentInstanceStateMap = entry.getValue(); - Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set<String> assignedInstances = assignment._instanceStateMap.keySet(); - Set<String> availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { - if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); - if (availableInstances.size() >= minAvailableReplicas) { - // New assignment can be added + + Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>(); + for (Map<Set<String>, Map<String, Map<String, 
String>>> assignedInstancesToCurrentAssignment + : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) { + boolean anyServerExhaustedBatchSize = false; + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Check if the servers of the first assignment for each unique set of assigned instances has any space left + // to move this partition. If so, let's mark the partitions as to be moved, otherwise we mark the partition + // as a whole as not moveable. + for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) { + Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next(); + // All segments should be assigned to the same set of servers so it is enough to check for whether any server + // for one segment is above the limit or not Review Comment: it's the latter -> current set of assigned instances is the same (key of the map here) I've updated the comment to be clearer though. We need to see how strict replica group evolves for OFFLINE, today you're right that we do allow multiple servers to host the same partitionId ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { + return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, + lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * <p> + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, the instances assigned for the same partitionId can be different for different segments. + * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given + * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from + * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment, + * this map will have a single entry for 'unique set of instance assignments'. + * <p> + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side + * will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of + * instances. 
Special handling to check each group of assigned instances can be removed in that case. */ - @VisibleForTesting - static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { - return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, - minAvailableReplicas, lowDiskMode) + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { + return enableStrictReplicaGroup + ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, + batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, - lowDiskMode); + lowDiskMode, batchSizePerServer); } private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment( Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map<String, Map<String, String>> nextAssignment = new TreeMap<>(); Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); + Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> + partitionIdToAssignedInstancesToCurrentAssignmentMap; + if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled + // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0 + // and a dummy set for the assigned instances. 
+ partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>()); + partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment); + } else { + partitionIdToAssignedInstancesToCurrentAssignmentMap = + getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, + partitionIdFetcher); + } Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>(); Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>(); - for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> currentInstanceStateMap = entry.getValue(); - Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set<String> assignedInstances = assignment._instanceStateMap.keySet(); - Set<String> availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { - if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); - if (availableInstances.size() >= minAvailableReplicas) { - // New assignment can be added + + Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>(); + for (Map<Set<String>, Map<String, Map<String, String>>> assignedInstancesToCurrentAssignment + : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) { + boolean anyServerExhaustedBatchSize = false; + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Check if the servers of the first assignment for each unique set of assigned instances has any space left + // to move this partition. If so, let's mark the partitions as to be moved, otherwise we mark the partition + // as a whole as not moveable. 
+ for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) { + Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next(); + // All segments should be assigned to the same set of servers so it is enough to check for whether any server + // for one segment is above the limit or not + Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue(); + SingleSegmentAssignment firstAssignment = + getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()), + minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap, + firstAssignment._instanceStateMap); + for (String server : serversAdded) { + if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) { + anyServerExhaustedBatchSize = true; + break; + } + } + if (anyServerExhaustedBatchSize) { + break; + } + } + } + for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) { + getNextAssignmentForPartitionIdStrictReplicaGroup(curAssignment, targetAssignment, nextAssignment, + anyServerExhaustedBatchSize, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap, + availableInstancesMap, serverToNumSegmentsAddedSoFar); + } + } + + checkIfAnyServersAssignedMoreSegmentsThanBatchSize(batchSizePerServer, serverToNumSegmentsAddedSoFar, + tableRebalanceLogger); + return nextAssignment; + } + + private static void getNextAssignmentForPartitionIdStrictReplicaGroup(Map<String, Map<String, String>> curAssignment, + Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> nextAssignment, + boolean anyServerExhaustedBatchSize, int minAvailableReplicas, boolean lowDiskMode, + Map<String, Integer> numSegmentsToOffloadMap, Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap, + Map<Set<String>, Set<String>> availableInstancesMap, Map<String, Integer> serverToNumSegmentsAddedSoFar) { + if (anyServerExhaustedBatchSize) { + // Exhausted the batch size for at least 1 server, just copy over the remaining segments as is + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + nextAssignment.put(segmentName, currentInstanceStateMap); + } + } else { + // Process all the partitionIds even if segmentsAddedSoFar becomes larger than batchSizePerServer + // Can only do bestEfforts w.r.t. 
StrictReplicaGroup since a whole partition must be moved together for + // maintaining consistency + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); + SingleSegmentAssignment assignment = + getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, + lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> assignedInstances = assignment._instanceStateMap.keySet(); + Set<String> availableInstances = assignment._availableInstances; + availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { + if (currentAvailableInstances == null) { + // First segment assigned to these instances, use the new assignment and update the available instances nextAssignment.put(segmentName, assignment._instanceStateMap); updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); return availableInstances; } else { - // New assignment cannot be added, use the current instance state map - nextAssignment.put(segmentName, currentInstanceStateMap); - return currentAvailableInstances; + // There are other segments assigned to the same instances, check the available instances to see if + // adding the new assignment can still hold the minimum available replicas requirement + availableInstances.retainAll(currentAvailableInstances); + if (availableInstances.size() >= minAvailableReplicas) { + // New assignment can be added + nextAssignment.put(segmentName, assignment._instanceStateMap); + updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); + return availableInstances; + } else { + // New assignment cannot be added, use the current instance state map + nextAssignment.put(segmentName, currentInstanceStateMap); + return currentAvailableInstances; + } + } + }); + + if (!nextAssignment.get(segmentName).equals(currentInstanceStateMap)) { + Set<String> serversAddedForSegment = getServersAddedInSingleSegmentAssignment(currentInstanceStateMap, + nextAssignment.get(segmentName)); + for (String server : serversAddedForSegment) { + int numSegmentsAdded = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0); + serverToNumSegmentsAddedSoFar.put(server, numSegmentsAdded + 1); } } - }); + } + } + } + + private static void checkIfAnyServersAssignedMoreSegmentsThanBatchSize(int batchSizePerServer, + Map<String, Integer> serverToNumSegmentsAddedSoFar, Logger tableRebalanceLogger) { + int maxSegmentsAddedToAnyServer = serverToNumSegmentsAddedSoFar.isEmpty() ? 
0 + : Collections.max(serverToNumSegmentsAddedSoFar.values()); + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER + && maxSegmentsAddedToAnyServer > batchSizePerServer) { + tableRebalanceLogger.warn("Found at least one server with {} segments added which is larger than " + + "batchSizePerServer: {}", maxSegmentsAddedToAnyServer, batchSizePerServer); Review Comment: done ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { + return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, + lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * <p> + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, the instances assigned for the same partitionId can be different for different segments. + * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given + * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from + * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment, + * this map will have a single entry for 'unique set of instance assignments'. + * <p> + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side + * will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of + * instances. Special handling to check each group of assigned instances can be removed in that case. */ - @VisibleForTesting - static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { - return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, - minAvailableReplicas, lowDiskMode) + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { + return enableStrictReplicaGroup + ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, + batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, - lowDiskMode); + lowDiskMode, batchSizePerServer); } private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment( Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map<String, Map<String, String>> nextAssignment = new TreeMap<>(); Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); + Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> + partitionIdToAssignedInstancesToCurrentAssignmentMap; + if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled + // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0 + // and a dummy set for the assigned instances. + partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>()); + partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment); + } else { + partitionIdToAssignedInstancesToCurrentAssignmentMap = + getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, + partitionIdFetcher); + } Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>(); Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>(); - for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> currentInstanceStateMap = entry.getValue(); - Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set<String> assignedInstances = assignment._instanceStateMap.keySet(); - Set<String> availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { - if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold 
the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); - if (availableInstances.size() >= minAvailableReplicas) { - // New assignment can be added + + Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>(); + for (Map<Set<String>, Map<String, Map<String, String>>> assignedInstancesToCurrentAssignment + : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) { + boolean anyServerExhaustedBatchSize = false; + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Check if the servers of the first assignment for each unique set of assigned instances has any space left + // to move this partition. If so, let's mark the partitions as to be moved, otherwise we mark the partition + // as a whole as not moveable. + for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) { + Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next(); + // All segments should be assigned to the same set of servers so it is enough to check for whether any server + // for one segment is above the limit or not + Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue(); + SingleSegmentAssignment firstAssignment = + getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()), + minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap, + firstAssignment._instanceStateMap); + for (String server : serversAdded) { + if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) { + anyServerExhaustedBatchSize = true; + break; + } + } + if (anyServerExhaustedBatchSize) { + break; + } + } + } + for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) { + getNextAssignmentForPartitionIdStrictReplicaGroup(curAssignment, targetAssignment, nextAssignment, + anyServerExhaustedBatchSize, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap, + availableInstancesMap, serverToNumSegmentsAddedSoFar); + } + } + + checkIfAnyServersAssignedMoreSegmentsThanBatchSize(batchSizePerServer, serverToNumSegmentsAddedSoFar, + tableRebalanceLogger); + return nextAssignment; + } + + private static void getNextAssignmentForPartitionIdStrictReplicaGroup(Map<String, Map<String, String>> curAssignment, + Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> nextAssignment, + boolean anyServerExhaustedBatchSize, int minAvailableReplicas, boolean lowDiskMode, + Map<String, Integer> numSegmentsToOffloadMap, Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap, + Map<Set<String>, Set<String>> availableInstancesMap, Map<String, Integer> serverToNumSegmentsAddedSoFar) { + if (anyServerExhaustedBatchSize) { + // Exhausted the batch size for at least 1 server, just copy over the remaining segments as is + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + nextAssignment.put(segmentName, currentInstanceStateMap); + } + } else { + // Process all the partitionIds even if segmentsAddedSoFar becomes larger than batchSizePerServer + // Can only do bestEfforts w.r.t. 
StrictReplicaGroup since a whole partition must be moved together for + // maintaining consistency + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); + SingleSegmentAssignment assignment = + getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, + lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> assignedInstances = assignment._instanceStateMap.keySet(); + Set<String> availableInstances = assignment._availableInstances; + availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { + if (currentAvailableInstances == null) { + // First segment assigned to these instances, use the new assignment and update the available instances nextAssignment.put(segmentName, assignment._instanceStateMap); updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); return availableInstances; } else { - // New assignment cannot be added, use the current instance state map - nextAssignment.put(segmentName, currentInstanceStateMap); - return currentAvailableInstances; + // There are other segments assigned to the same instances, check the available instances to see if + // adding the new assignment can still hold the minimum available replicas requirement + availableInstances.retainAll(currentAvailableInstances); + if (availableInstances.size() >= minAvailableReplicas) { + // New assignment can be added + nextAssignment.put(segmentName, assignment._instanceStateMap); + updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); + return availableInstances; + } else { + // New assignment cannot be added, use the current instance state map + nextAssignment.put(segmentName, currentInstanceStateMap); + return currentAvailableInstances; + } + } + }); + + if (!nextAssignment.get(segmentName).equals(currentInstanceStateMap)) { + Set<String> serversAddedForSegment = getServersAddedInSingleSegmentAssignment(currentInstanceStateMap, + nextAssignment.get(segmentName)); + for (String server : serversAddedForSegment) { + int numSegmentsAdded = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0); + serverToNumSegmentsAddedSoFar.put(server, numSegmentsAdded + 1); } } - }); + } + } + } + + private static void checkIfAnyServersAssignedMoreSegmentsThanBatchSize(int batchSizePerServer, + Map<String, Integer> serverToNumSegmentsAddedSoFar, Logger tableRebalanceLogger) { + int maxSegmentsAddedToAnyServer = serverToNumSegmentsAddedSoFar.isEmpty() ? 0 + : Collections.max(serverToNumSegmentsAddedSoFar.values()); + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER + && maxSegmentsAddedToAnyServer > batchSizePerServer) { + tableRebalanceLogger.warn("Found at least one server with {} segments added which is larger than " + + "batchSizePerServer: {}", maxSegmentsAddedToAnyServer, batchSizePerServer); + } + } + + /** + * Create a mapping of partitionId to the mapping of assigned instances to the current assignment of segments that + * belong to that partitionId and assigned instances. 
This is to be used for batching purposes for StrictReplicaGroup + * routing, for all segment assignment types: RealtimeSegmentAssignment, StrictRealtimeSegmentAssignment and + * OfflineSegmentAssignment + * @param currentAssignment the current assignment + * @param segmentPartitionIdMap cache to store the partition ids to avoid fetching ZK segment metadata + * @param partitionIdFetcher function to fetch the partition id + * @return a mapping from partitionId to the assigned instances to the segment assignment map of all segments that + * map to that partitionId and assigned instances + */ + private static Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> + getPartitionIdToAssignedInstancesToCurrentAssignmentMap(Map<String, Map<String, String>> currentAssignment, + Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher) { + Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> + partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + + for (Map.Entry<String, Map<String, String>> assignment : currentAssignment.entrySet()) { + String segmentName = assignment.getKey(); + Map<String, String> instanceStateMap = assignment.getValue(); + + int partitionId = + segmentPartitionIdMap.computeIfAbsent(segmentName, v -> partitionIdFetcher.fetch(segmentName)); + Set<String> assignedInstances = instanceStateMap.keySet(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.putIfAbsent(partitionId, new HashMap<>()); + partitionIdToAssignedInstancesToCurrentAssignmentMap.get(partitionId) Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
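Regarding the warning about exceeding batchSizePerServer that the second comment marks as done: the limit is best-effort because, under strict replica groups, a partition moves as a whole, so the per-server count can overshoot the configured batch size within the last partition processed. A minimal sketch of the per-server check, with hypothetical names, assuming the semantics described in the quoted code comments:

```java
import java.util.Map;
import java.util.Set;

public class BatchSizeCheckSketch {

  /**
   * Returns true if any server that would receive new segments for this partition has already
   * been assigned at least batchSizePerServer segments in the current rebalance step. When true,
   * the whole partition keeps its current assignment for this step; otherwise all of its
   * segments move, which is why the final count is only warned about rather than hard-enforced.
   */
  static boolean anyServerExhausted(Set<String> serversToAdd, Map<String, Integer> segmentsAddedSoFar,
      int batchSizePerServer) {
    for (String server : serversToAdd) {
      if (segmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
        return true;
      }
    }
    return false;
  }
}
```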