shauryachats commented on code in PR #15843: URL: https://github.com/apache/pinot/pull/15843#discussion_r2106082446
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java: ########## @@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int } else { replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups(); } - for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) { - int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups(); - try { - return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup); - } catch (Exception e) { - LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e); + + return tryAssigning(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected); + } + + /** + * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the + * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions. + */ + private Pair<Map<String, String>, Map<String, String>> tryAssigning( + Set<String> segments, + SegmentStates segmentStates, + InstancePartitions instancePartitions, + int preferredReplicaId) { + + Map<String, Integer> instanceToPartitionMap = instancePartitions.getInstanceToPartitionIdMap(); + Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>(); + + // instanceToSegmentsMap stores the mapping from instance to the active segments it can serve. + for (String segment : segments) { + List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment); + Preconditions.checkState(candidates != null, "Failed to find servers for segment: %s", segment); + for (SegmentInstanceCandidate candidate : candidates) { Review Comment: Good catch. I added a `!candidates.isEmpty()` condition to the `Preconditions.checkState` call, so a segment with an empty candidate list now fails the check just like a missing (`null`) one.
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java: ########## @@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int } else { replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups(); } - for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) { - int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups(); - try { - return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup); - } catch (Exception e) { - LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e); + + return tryAssigning(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected); + } + + /** + * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the + * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions. + */ + private Pair<Map<String, String>, Map<String, String>> tryAssigning( + Set<String> segments, + SegmentStates segmentStates, + InstancePartitions instancePartitions, + int preferredReplicaId) { + + Map<String, Integer> instanceToPartitionMap = instancePartitions.getInstanceToPartitionIdMap(); + Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>(); + + // instanceToSegmentsMap stores the mapping from instance to the active segments it can serve. 
+ for (String segment : segments) { + List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment); + Preconditions.checkState(candidates != null, "Failed to find servers for segment: %s", segment); + for (SegmentInstanceCandidate candidate : candidates) { + instanceToSegmentsMap + .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>()) + .add(segment); } } - throw new RuntimeException( - String.format("Unable to find any replica-group to serve table: %s", _tableNameWithType)); + + // partitionToRequiredSegmentsMap stores the mapping from partition to the segments that need to be served. This + // is necessary to select appropriate replica group at the instance partition level. + Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>(); + for (Map.Entry<String, Set<String>> entry : instanceToSegmentsMap.entrySet()) { + Integer partitionId = instanceToPartitionMap.get(entry.getKey()); + partitionToRequiredSegmentsMap + .computeIfAbsent(partitionId, k -> new HashSet<>()) + .addAll(entry.getValue()); + } + + // Assign segments to instances based on the partitionToRequiredSegmentsMap. This ensures that we select the + // appropriate replica group for each set of segments belonging to the same instance partition. + Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(); + int numPartitions = instancePartitions.getNumPartitions(); + for (int partition = 0; partition < numPartitions; partition++) { + Set<String> requiredSegments = partitionToRequiredSegmentsMap.get(partition); + if (requiredSegments != null) { + segmentToSelectedInstanceMap.putAll( Review Comment: Addressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org