ankitsultana commented on code in PR #15843: URL: https://github.com/apache/pinot/pull/15843#discussion_r2116698769
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java: ########## @@ -99,58 +100,151 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int } else { replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups(); } - for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) { - int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups(); - try { - return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup); - } catch (Exception e) { - LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e); + + return assign(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected); + } + + /** + * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the + * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions. + * + * @return A pair of maps, where the first map contains the segments that are assigned to a server and the second + * map contains the segments that are optional (i.e., the server is not online to serve that segment). + * Example: + * { + * "required_segments": { + * "segment1": "server1", + * "segment2": "server2" + * }, + * "optional_segments": { + * "segment3": "server3", + * "segment4": "server4" + * } + * } + */ + private Pair<Map<String, String>, Map<String, String>> assign(Set<String> segments, + SegmentStates segmentStates, InstancePartitions instancePartitions, int preferredReplicaId) { + Map<String, Integer> instanceToPartitionMap = instancePartitions.getInstanceToPartitionIdMap(); + Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>(); + + // instanceToSegmentsMap stores the mapping from instance to the active segments it can serve. 
+ for (String segment : segments) { + List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment); + Preconditions.checkState(candidates != null && !candidates.isEmpty(), + "Failed to find servers for segment: %s", segment); + for (SegmentInstanceCandidate candidate : candidates) { + instanceToSegmentsMap + .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>()) + .add(segment); } } - throw new RuntimeException( - String.format("Unable to find any replica-group to serve table: %s", _tableNameWithType)); + + // partitionToRequiredSegmentsMap stores the mapping from partition to the segments that need to be served. This + // is necessary to select appropriate replica group at the instance partition level. + Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>(); + for (Map.Entry<String, Set<String>> entry : instanceToSegmentsMap.entrySet()) { + Integer partitionId = instanceToPartitionMap.get(entry.getKey()); + partitionToRequiredSegmentsMap + .computeIfAbsent(partitionId, k -> new HashSet<>()) + .addAll(entry.getValue()); + } + + // Assign segments to instances based on the partitionToRequiredSegmentsMap. This ensures that we select the + // appropriate replica group for each set of segments belonging to the same instance partition. + Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(); + int numPartitions = instancePartitions.getNumPartitions(); + for (int partition = 0; partition < numPartitions; partition++) { + Set<String> requiredSegments = partitionToRequiredSegmentsMap.get(partition); + if (requiredSegments != null) { + getSelectedInstancesForPartition(instanceToSegmentsMap, requiredSegments, partition, preferredReplicaId, + segmentToSelectedInstanceMap); + } + } + + return computeOptionalSegments(segmentToSelectedInstanceMap, segmentStates); Review Comment: **Note**: Handling optional segments at the end means that an optional segment could cause a "no replica-group found" error. 
(I assume an optional segment is likely to have fewer candidates than the replication factor.) That seems to have been the case before too, but we might have to revisit this at some point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org