ankitsultana commented on code in PR #15843:
URL: https://github.com/apache/pinot/pull/15843#discussion_r2105708385

##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java:
##########
@@ -111,6 +113,21 @@ public List<String> getInstances(int partitionId, int replicaGroupId) {
         .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId);
   }
+  public Map<String, Integer> getInstanceToPartitionIdMap() {

Review Comment:
   javadoc

##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java:
##########
@@ -111,6 +113,21 @@ public List<String> getInstances(int partitionId, int replicaGroupId) {
         .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId);
   }
+  public Map<String, Integer> getInstanceToPartitionIdMap() {
+    Map<String, Integer> instanceToPartitionIdMap = new HashMap<>();
+
+    for (Map.Entry<String, List<String>> entry : _partitionToInstancesMap.entrySet()) {
+      Pair<Integer, Integer> partitionIdAndReplicaGroupId = getPartitionIdAndReplicaGroupId(entry.getKey());
+      int partitionId = partitionIdAndReplicaGroupId.getLeft();
+

Review Comment:
   let's try to keep empty lines minimal.
they seem arbitrary ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java: ########## @@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int } else { replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups(); } - for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) { - int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups(); - try { - return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup); - } catch (Exception e) { - LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e); + + return tryAssigning(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected); + } + + /** + * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the + * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions. + */ + private Pair<Map<String, String>, Map<String, String>> tryAssigning( + Set<String> segments, + SegmentStates segmentStates, + InstancePartitions instancePartitions, + int preferredReplicaId) { + + Map<String, Integer> instanceToPartitionMap = instancePartitions.getInstanceToPartitionIdMap(); + Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>(); + + // instanceToSegmentsMap stores the mapping from instance to the active segments it can serve. 
+ for (String segment : segments) { + List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment); + Preconditions.checkState(candidates != null, "Failed to find servers for segment: %s", segment); + for (SegmentInstanceCandidate candidate : candidates) { + instanceToSegmentsMap + .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>()) + .add(segment); } } - throw new RuntimeException( - String.format("Unable to find any replica-group to serve table: %s", _tableNameWithType)); + + // partitionToRequiredSegmentsMap stores the mapping from partition to the segments that need to be served. This + // is necessary to select appropriate replica group at the instance partition level. + Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>(); + for (Map.Entry<String, Set<String>> entry : instanceToSegmentsMap.entrySet()) { + Integer partitionId = instanceToPartitionMap.get(entry.getKey()); + partitionToRequiredSegmentsMap + .computeIfAbsent(partitionId, k -> new HashSet<>()) + .addAll(entry.getValue()); + } + + // Assign segments to instances based on the partitionToRequiredSegmentsMap. This ensures that we select the + // appropriate replica group for each set of segments belonging to the same instance partition. + Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(); + int numPartitions = instancePartitions.getNumPartitions(); + for (int partition = 0; partition < numPartitions; partition++) { + Set<String> requiredSegments = partitionToRequiredSegmentsMap.get(partition); + if (requiredSegments != null) { + segmentToSelectedInstanceMap.putAll( Review Comment: We have duplicate map insertions here, one within `getSelectedInstancesForPartition` and the one for `segmentToSelectedInstanceMap`. 
You can simply pass in `segmentToSelectedInstanceMap` to the method and mutate it directly ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java: ########## @@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int } else { replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups(); } - for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) { - int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups(); - try { - return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup); - } catch (Exception e) { - LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e); + + return tryAssigning(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected); + } + + /** + * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the + * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions. + */ + private Pair<Map<String, String>, Map<String, String>> tryAssigning( Review Comment: Can you add an example of what the return value looks like here? nit: format. pinot code-style is to only spill to next line when the next word or semantic arg doesn't fit in existing line. 
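For illustration, the requested javadoc example could look roughly like the sketch below. It is not taken from the PR. It assumes the left map is segment name to selected server and the right map holds optional segments (inferred only from the `computeOptionalSegments` call in the diff). The segment and server names are made up, and `Map.Entry` stands in for commons-lang3 `Pair` so the snippet runs without dependencies.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the shape returned by tryAssigning; all names are invented.
final class ReturnValueExample {

  // Map.Entry is a dependency-free stand-in for org.apache.commons.lang3.tuple.Pair.
  static Map.Entry<Map<String, String>, Map<String, String>> exampleReturnValue() {
    // Left: segment -> selected server. Every segment of one instance partition is
    // served by the same replica group.
    Map<String, String> segmentToSelectedInstanceMap = new LinkedHashMap<>();
    // Partition 0 is served by the preferred replica group (0).
    segmentToSelectedInstanceMap.put("seg_0", "Server_rg0_p0");
    segmentToSelectedInstanceMap.put("seg_1", "Server_rg0_p0");
    // Partition 1 could not be fully served by replica group 0, so it fell back to group 1.
    segmentToSelectedInstanceMap.put("seg_2", "Server_rg1_p1");

    // Right: optional segment -> server (assumed semantics, not confirmed by the diff).
    Map<String, String> optionalSegmentToInstanceMap = new LinkedHashMap<>();
    optionalSegmentToInstanceMap.put("seg_3", "Server_rg1_p1");

    return new AbstractMap.SimpleImmutableEntry<>(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
  }

  public static void main(String[] args) {
    Map.Entry<Map<String, String>, Map<String, String>> result = exampleReturnValue();
    System.out.println("selected: " + result.getKey());
    System.out.println("optional: " + result.getValue());
  }
}
```

The point of such an example is to make the per-partition fallback visible: servers from different replica groups can appear in one result, but never for segments of the same instance partition.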
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java: ########## @@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int } else { replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups(); } - for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) { - int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups(); - try { - return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup); - } catch (Exception e) { - LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e); + + return tryAssigning(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected); + } + + /** + * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the + * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions. + */ + private Pair<Map<String, String>, Map<String, String>> tryAssigning( + Set<String> segments, + SegmentStates segmentStates, + InstancePartitions instancePartitions, + int preferredReplicaId) { + + Map<String, Integer> instanceToPartitionMap = instancePartitions.getInstanceToPartitionIdMap(); + Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>(); + + // instanceToSegmentsMap stores the mapping from instance to the active segments it can serve. 
+ for (String segment : segments) { + List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment); + Preconditions.checkState(candidates != null, "Failed to find servers for segment: %s", segment); + for (SegmentInstanceCandidate candidate : candidates) { + instanceToSegmentsMap + .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>()) + .add(segment); } } - throw new RuntimeException( - String.format("Unable to find any replica-group to serve table: %s", _tableNameWithType)); + + // partitionToRequiredSegmentsMap stores the mapping from partition to the segments that need to be served. This + // is necessary to select appropriate replica group at the instance partition level. + Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>(); + for (Map.Entry<String, Set<String>> entry : instanceToSegmentsMap.entrySet()) { + Integer partitionId = instanceToPartitionMap.get(entry.getKey()); + partitionToRequiredSegmentsMap + .computeIfAbsent(partitionId, k -> new HashSet<>()) + .addAll(entry.getValue()); + } + + // Assign segments to instances based on the partitionToRequiredSegmentsMap. This ensures that we select the + // appropriate replica group for each set of segments belonging to the same instance partition. + Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(); + int numPartitions = instancePartitions.getNumPartitions(); + for (int partition = 0; partition < numPartitions; partition++) { + Set<String> requiredSegments = partitionToRequiredSegmentsMap.get(partition); + if (requiredSegments != null) { + segmentToSelectedInstanceMap.putAll( + getSelectedInstancesForPartition(instanceToSegmentsMap, requiredSegments, partition, preferredReplicaId)); + } + } + + return computeOptionalSegments(segmentToSelectedInstanceMap, segmentStates); } /** - * Returns a map from the segmentName to the corresponding server in the given replica-group. If the is not enabled, - * we throw an exception. 
+ * This method selects the instances for the given partition and segments. It iterates over the replica groups + * in a round-robin fashion, starting from the preferred replica group. It only selects the replica group if all + * the segments are available in the selected instances. + * If no replica group is found, it throws an exception. */ - private Pair<Map<String, String>, Map<String, String>> tryAssigning(List<String> segments, - SegmentStates segmentStates, InstancePartitions instancePartitions, int replicaId) { - Set<String> instanceLookUpSet = new HashSet<>(); - for (int partition = 0; partition < instancePartitions.getNumPartitions(); partition++) { - List<String> instances = instancePartitions.getInstances(partition, replicaId); - instanceLookUpSet.addAll(instances); + private Map<String, String> getSelectedInstancesForPartition( + Map<String, Set<String>> instanceToSegmentsMap, + Set<String> requiredSegments, + int partitionId, + int preferredReplicaId) { + + int numReplicaGroups = _instancePartitions.getNumReplicaGroups(); + + for (int i = 0; i < numReplicaGroups; i += 1) { + int selectedReplicaGroup = (i + preferredReplicaId) % numReplicaGroups; + List<String> selectedInstances = _instancePartitions.getInstances(partitionId, selectedReplicaGroup); + + Set<String> segmentsFromSelectedInstances = new HashSet<>(); + for (String instance : selectedInstances) { + Set<String> servedSegments = instanceToSegmentsMap.get(instance); + if (servedSegments != null) { + segmentsFromSelectedInstances.addAll(servedSegments); + } + } + + if (segmentsFromSelectedInstances.containsAll(requiredSegments)) { + Map<String, String> segmentToInstance = new HashMap<>(); + for (String segment : requiredSegments) { + for (String instance : selectedInstances) { Review Comment: this seems very costly. We are iterating over all segments and all instances in the selected instance partition and then doing a lookup on the instance to segments map. 
A cheaper way might be to simply compute the segmentToInstances map in L21, and then iterate over that map and do a lookup on a set of selectedInstances. side-note: we should ideally throw if the segment was not added to `segmentToInstance`. otherwise bugs can be very hard to catch ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java: ########## @@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int } else { replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups(); } - for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) { - int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups(); - try { - return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup); - } catch (Exception e) { - LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e); + + return tryAssigning(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected); + } + + /** + * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the + * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions. + */ + private Pair<Map<String, String>, Map<String, String>> tryAssigning( + Set<String> segments, + SegmentStates segmentStates, + InstancePartitions instancePartitions, + int preferredReplicaId) { + + Map<String, Integer> instanceToPartitionMap = instancePartitions.getInstanceToPartitionIdMap(); + Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>(); + + // instanceToSegmentsMap stores the mapping from instance to the active segments it can serve. 
+ for (String segment : segments) { + List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment); + Preconditions.checkState(candidates != null, "Failed to find servers for segment: %s", segment); + for (SegmentInstanceCandidate candidate : candidates) { Review Comment: need to handle scenario when `candidates` list is empty. otherwise the segment will be silently ignored ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java: ########## @@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int } else { replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups(); } - for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) { - int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups(); - try { - return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup); - } catch (Exception e) { - LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e); + + return tryAssigning(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected); Review Comment: If a table is getting rebalanced, some assumptions around the data assignment may break temporarily. During rebalance, afair, Instance Partitions are updated immediately and Ideal State is updated slowly to allow EV to change slowly. In such a state, can you think about what the behavior of this routing strategy will be? The other routing strategies use Ideal State to define replica-groups and server/segment assignments, so they'll have different behaviors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 