J-HowHuang commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2058977515
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -479,6 +482,16 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb externalViewStabilizationTimeoutInMs); int expectedVersion = currentIdealState.getRecord().getVersion(); + // Cache segment partition id to avoid ZK reads. Similar behavior as cache used in StrictReplicaGroupAssignment + // NOTE: + // 1. This cache is used for table rebalance only, but not segment assignment. During rebalance, rebalanceTable() + // can be invoked multiple times when the ideal state changes during the rebalance process. + // 2. The cache won't be refreshed when an existing segment is replaced with a segment from a different partition. + // Replacing a segment with a segment from a different partition should not be allowed for upsert table because + // it will cause the segment being served by the wrong servers. If this happens during the table rebalance, + // another rebalance might be needed to fix the assignment. + Object2IntOpenHashMap<String> segmentPartitionIdMap = new Object2IntOpenHashMap<>(); Review Comment: Is it possible to get the same cache map object from `segmentAssignment` if it's instance of `StrictRealtimeSegmentAssignment`? IIUC the cache would be available already while computing next assignment ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { + return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, + lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, - minAvailableReplicas, lowDiskMode) + minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, + tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, - lowDiskMode); + lowDiskMode, batchSizePerServer); } private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment( Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map<String, Map<String, String>> nextAssignment = new TreeMap<>(); Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); + Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap; + if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); + } else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); + } Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>(); Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>(); - for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> currentInstanceStateMap = entry.getValue(); - Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set<String> assignedInstances = assignment._instanceStateMap.keySet(); - Set<String> availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { - if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); - if (availableInstances.size() >= minAvailableReplicas) { - // New assignment can be added - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // New assignment cannot be added, use the current instance state map - nextAssignment.put(segmentName, currentInstanceStateMap); - return currentAvailableInstances; + + Map<String, Integer> 
serverToNumSegmentsAddedSoFar = new HashMap<>(); + for (Map<String, Map<String, String>> curAssignment : partitionIdToCurrentAssignmentMap.values()) { + Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next(); + // All partitions should be assigned to the same set of servers so it is enough to check for whether any server + // for one segment is above the limit or not + Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue(); + SingleSegmentAssignment firstAssignment = + getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()), + minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap, + firstAssignment._instanceStateMap); + boolean anyServerExhaustedBatchSize = false; Review Comment: This flag can be moved outside the partition loop and only need to re-calculate if it's false (once it's true it wouldn't change back to false) ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { + return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, + lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, - minAvailableReplicas, lowDiskMode) + minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, + tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, - lowDiskMode); + lowDiskMode, batchSizePerServer); } private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment( Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map<String, Map<String, String>> nextAssignment = new TreeMap<>(); Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); + Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap; + if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); + } else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); + } Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>(); Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>(); - for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> currentInstanceStateMap = entry.getValue(); - Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set<String> assignedInstances = assignment._instanceStateMap.keySet(); - Set<String> availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { - if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); - if (availableInstances.size() >= minAvailableReplicas) { - // New assignment can be added - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // New assignment cannot be added, use the current instance state map - nextAssignment.put(segmentName, currentInstanceStateMap); - return currentAvailableInstances; + + Map<String, Integer> 
serverToNumSegmentsAddedSoFar = new HashMap<>(); + for (Map<String, Map<String, String>> curAssignment : partitionIdToCurrentAssignmentMap.values()) { + Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next(); + // All partitions should be assigned to the same set of servers so it is enough to check for whether any server + // for one segment is above the limit or not + Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue(); + SingleSegmentAssignment firstAssignment = + getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()), + minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap, + firstAssignment._instanceStateMap); + boolean anyServerExhaustedBatchSize = false; + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + for (String server : serversAdded) { + if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) { + anyServerExhaustedBatchSize = true; + break; } } - }); + } + if (anyServerExhaustedBatchSize) { + // Exhausted the batch size for at least 1 server, just copy over the remaining segments as is + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + nextAssignment.put(segmentName, currentInstanceStateMap); + } + } else { + // Process all the partitionIds even if segmentsAddedSoFar becomes larger than batchSizePerServer + // Can only do bestEfforts w.r.t. StrictReplicaGroup since a whole partition must be moved together for + // maintaining consistency + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); + SingleSegmentAssignment assignment = + getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, + lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> assignedInstances = assignment._instanceStateMap.keySet(); + Set<String> availableInstances = assignment._availableInstances; + availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { + if (currentAvailableInstances == null) { + // First segment assigned to these instances, use the new assignment and update the available instances + nextAssignment.put(segmentName, assignment._instanceStateMap); + updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); + return availableInstances; + } else { + // There are other segments assigned to the same instances, check the available instances to see if adding + // the new assignment can still hold the minimum available replicas requirement + availableInstances.retainAll(currentAvailableInstances); + if (availableInstances.size() >= minAvailableReplicas) { + // New assignment can be added + nextAssignment.put(segmentName, assignment._instanceStateMap); + updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); + return availableInstances; + } else { + // New assignment cannot be added, use the current instance state map + nextAssignment.put(segmentName, currentInstanceStateMap); + return currentAvailableInstances; + } + } + }); 
+ + if (!nextAssignment.get(segmentName).equals(currentInstanceStateMap)) { + Set<String> serversAddedForSegment = getServersAddedInSingleSegmentAssignment(currentInstanceStateMap, + nextAssignment.get(segmentName)); + for (String server : serversAddedForSegment) { + int numSegmentsAdded = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0); + serverToNumSegmentsAddedSoFar.put(server, numSegmentsAdded + 1); + } + } + } + } + } + + int maxSegmentsAddedToAnyServer = serverToNumSegmentsAddedSoFar.isEmpty() ? 0 + : Collections.max(serverToNumSegmentsAddedSoFar.values()); + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER + && maxSegmentsAddedToAnyServer > batchSizePerServer) { + tableRebalanceLogger.warn("Found at least one server with {} segments added which is larger than " + + "batchSizePerServer: {}", maxSegmentsAddedToAnyServer, batchSizePerServer); } return nextAssignment; } + /** + * Create a mapping of partitionId to the current assignment of segments that belong to that partitionId. This is to + * be used for batching purposes for StrictReplicaGroup + * @param currentAssignment the current assignment + * @param segmentPartitionIdMap cache to store the partition ids to avoid fetching ZK segment metadata + * @param partitionIdFetcher function to fetch the partition id + * @return a mapping from partitionId to the segment assignment map of all segments that map to that partition + */ + private static Map<Integer, Map<String, Map<String, String>>> getPartitionIdToCurrentAssignmentMap( + Map<String, Map<String, String>> currentAssignment, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { + Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap = new TreeMap<>(); + + for (Map.Entry<String, Map<String, String>> assignment : currentAssignment.entrySet()) { + String segmentName = assignment.getKey(); + Map<String, String> instanceStateMap = assignment.getValue(); + + int partitionId = + segmentPartitionIdMap.computeIntIfAbsent(segmentName, v -> partitionIdFetcher.fetch(segmentName)); + partitionIdToCurrentAssignmentMap.putIfAbsent(partitionId, new TreeMap<>()); + partitionIdToCurrentAssignmentMap.get(partitionId).put(segmentName, instanceStateMap); Review Comment: ```suggestion partitionIdToCurrentAssignmentMap.computeIfAbsent(partitionId, k -> new TreeMap<>()).put(segmentName, instanceStateMap); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { + return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, + lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. 
For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, - minAvailableReplicas, lowDiskMode) + minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, + tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, - lowDiskMode); + lowDiskMode, batchSizePerServer); } private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment( Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map<String, Map<String, String>> nextAssignment = new TreeMap<>(); Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); + Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap; + if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); + } else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); + } Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>(); Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>(); - for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> currentInstanceStateMap = entry.getValue(); - Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set<String> assignedInstances = assignment._instanceStateMap.keySet(); - Set<String> availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { - if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, 
assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); - if (availableInstances.size() >= minAvailableReplicas) { - // New assignment can be added - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // New assignment cannot be added, use the current instance state map - nextAssignment.put(segmentName, currentInstanceStateMap); - return currentAvailableInstances; + + Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>(); + for (Map<String, Map<String, String>> curAssignment : partitionIdToCurrentAssignmentMap.values()) { + Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next(); + // All partitions should be assigned to the same set of servers so it is enough to check for whether any server + // for one segment is above the limit or not + Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue(); + SingleSegmentAssignment firstAssignment = + getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()), + minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap, + firstAssignment._instanceStateMap); + boolean anyServerExhaustedBatchSize = false; + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + for (String server : serversAdded) { + if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) { + anyServerExhaustedBatchSize = true; + break; } } - }); + } + if (anyServerExhaustedBatchSize) { + // Exhausted the batch size for at least 1 server, just copy over the remaining segments as is + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + nextAssignment.put(segmentName, currentInstanceStateMap); + } + } else { + // Process all the partitionIds even if segmentsAddedSoFar becomes larger than batchSizePerServer + // Can only do bestEfforts w.r.t. StrictReplicaGroup since a whole partition must be moved together for + // maintaining consistency + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); + SingleSegmentAssignment assignment = + getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, + lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> assignedInstances = assignment._instanceStateMap.keySet(); Review Comment: This would be the same across `curAssignment` because we have the assumption ``` // All partitions should be assigned to the same set of servers so it is enough to check for whether any server // for one segment is above the limit or not ``` Thus `availableInstancesMap` is unnecessary? 
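For reference, a minimal standalone sketch of the simplification this comment is hinting at, assuming (as the quoted comment in the diff states) that every segment of a partition is hosted by the same current server set and moves to the same target server set. Under that assumption the available-instance intersection is identical for every segment of the partition, so it could be computed once per partition instead of being tracked per assigned-instance-set. The class and method names below are hypothetical and are not part of the PR:

```java
import java.util.HashSet;
import java.util.Set;

public class PerPartitionAvailabilityCheck {

  /**
   * Hypothetical per-partition check: servers that host the partition both before and after the
   * candidate move are the ones that keep serving it while the move is in progress. If that
   * intersection still holds at least minAvailableReplicas servers, the whole partition can be
   * moved in this step; otherwise the partition would keep its current assignment.
   */
  static boolean canMovePartition(Set<String> currentServers, Set<String> nextServers,
      int minAvailableReplicas) {
    Set<String> availableServers = new HashSet<>(currentServers);
    availableServers.retainAll(nextServers);
    return availableServers.size() >= minAvailableReplicas;
  }

  public static void main(String[] args) {
    Set<String> currentServers = Set.of("server_1", "server_2", "server_3");
    // server_2 and server_3 keep hosting the partition during the move -> 2 >= 2 -> true
    System.out.println(canMovePartition(currentServers, Set.of("server_2", "server_3", "server_4"), 2));
    // Only server_3 stays hosting the partition -> 1 < 2 -> false
    System.out.println(canMovePartition(currentServers, Set.of("server_3", "server_4", "server_5"), 2));
  }
}
```

This only covers the within-partition case; segments from different partitions that land on the same target servers may still need the cross-segment tracking that `availableInstancesMap` provides, which is presumably why the question is phrased tentatively.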
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { + return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, + lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, + private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, - minAvailableReplicas, lowDiskMode) + minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, + tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, - lowDiskMode); + lowDiskMode, batchSizePerServer); } private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment( Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map<String, Map<String, String>> nextAssignment = new TreeMap<>(); Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); + Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap; + if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); + } else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); + } Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>(); Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>(); - for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> currentInstanceStateMap = entry.getValue(); - Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set<String> assignedInstances = assignment._instanceStateMap.keySet(); - Set<String> availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { - if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); - if (availableInstances.size() >= minAvailableReplicas) { - // New assignment can be added - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; - } else { - // New assignment cannot be added, use the current instance state map - nextAssignment.put(segmentName, currentInstanceStateMap); - return currentAvailableInstances; + + Map<String, Integer> 
serverToNumSegmentsAddedSoFar = new HashMap<>(); + for (Map<String, Map<String, String>> curAssignment : partitionIdToCurrentAssignmentMap.values()) { + Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next(); + // All partitions should be assigned to the same set of servers so it is enough to check for whether any server + // for one segment is above the limit or not + Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue(); + SingleSegmentAssignment firstAssignment = + getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()), + minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap, + firstAssignment._instanceStateMap); + boolean anyServerExhaustedBatchSize = false; + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + for (String server : serversAdded) { + if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) { + anyServerExhaustedBatchSize = true; + break; } } - }); + } + if (anyServerExhaustedBatchSize) { + // Exhausted the batch size for at least 1 server, just copy over the remaining segments as is + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + nextAssignment.put(segmentName, currentInstanceStateMap); + } + } else { + // Process all the partitionIds even if segmentsAddedSoFar becomes larger than batchSizePerServer + // Can only do bestEfforts w.r.t. StrictReplicaGroup since a whole partition must be moved together for + // maintaining consistency + for (Map.Entry<String, Map<String, String>> entry : curAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> currentInstanceStateMap = entry.getValue(); + Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName); + SingleSegmentAssignment assignment = + getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, + lowDiskMode, numSegmentsToOffloadMap, assignmentMap); + Set<String> assignedInstances = assignment._instanceStateMap.keySet(); + Set<String> availableInstances = assignment._availableInstances; + availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { + if (currentAvailableInstances == null) { + // First segment assigned to these instances, use the new assignment and update the available instances + nextAssignment.put(segmentName, assignment._instanceStateMap); + updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); + return availableInstances; + } else { + // There are other segments assigned to the same instances, check the available instances to see if adding + // the new assignment can still hold the minimum available replicas requirement + availableInstances.retainAll(currentAvailableInstances); + if (availableInstances.size() >= minAvailableReplicas) { + // New assignment can be added + nextAssignment.put(segmentName, assignment._instanceStateMap); + updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); + return availableInstances; + } else { + // New assignment cannot be added, use the current instance state map + nextAssignment.put(segmentName, currentInstanceStateMap); + return currentAvailableInstances; + } + } + }); 
+ + if (!nextAssignment.get(segmentName).equals(currentInstanceStateMap)) { + Set<String> serversAddedForSegment = getServersAddedInSingleSegmentAssignment(currentInstanceStateMap, + nextAssignment.get(segmentName)); + for (String server : serversAddedForSegment) { + int numSegmentsAdded = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0); + serverToNumSegmentsAddedSoFar.put(server, numSegmentsAdded + 1); + } + } + } + } + } + + int maxSegmentsAddedToAnyServer = serverToNumSegmentsAddedSoFar.isEmpty() ? 0 + : Collections.max(serverToNumSegmentsAddedSoFar.values()); + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER + && maxSegmentsAddedToAnyServer > batchSizePerServer) { + tableRebalanceLogger.warn("Found at least one server with {} segments added which is larger than " + + "batchSizePerServer: {}", maxSegmentsAddedToAnyServer, batchSizePerServer); } return nextAssignment; } + /** + * Create a mapping of partitionId to the current assignment of segments that belong to that partitionId. This is to + * be used for batching purposes for StrictReplicaGroup + * @param currentAssignment the current assignment + * @param segmentPartitionIdMap cache to store the partition ids to avoid fetching ZK segment metadata + * @param partitionIdFetcher function to fetch the partition id + * @return a mapping from partitionId to the segment assignment map of all segments that map to that partition + */ + private static Map<Integer, Map<String, Map<String, String>>> getPartitionIdToCurrentAssignmentMap( + Map<String, Map<String, String>> currentAssignment, Object2IntOpenHashMap<String> segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { + Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap = new TreeMap<>(); + + for (Map.Entry<String, Map<String, String>> assignment : currentAssignment.entrySet()) { + String segmentName = assignment.getKey(); + Map<String, String> instanceStateMap = assignment.getValue(); + + int partitionId = + segmentPartitionIdMap.computeIntIfAbsent(segmentName, v -> partitionIdFetcher.fetch(segmentName)); Review Comment: Is this method deprecated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org