shauryachats commented on code in PR #15843:
URL: https://github.com/apache/pinot/pull/15843#discussion_r2106083549


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   */
+  private Pair<Map<String, String>, Map<String, String>> tryAssigning(
+    Set<String> segments,
+    SegmentStates segmentStates,
+    InstancePartitions instancePartitions,
+    int preferredReplicaId) {
+
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null, "Failed to find servers for 
segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the 
segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance 
partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the 
partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the 
same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = 
partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        segmentToSelectedInstanceMap.putAll(
+          getSelectedInstancesForPartition(instanceToSegmentsMap, 
requiredSegments, partition, preferredReplicaId));
+      }
+    }
+
+    return computeOptionalSegments(segmentToSelectedInstanceMap, 
segmentStates);
   }
 
   /**
-   * Returns a map from the segmentName to the corresponding server in the 
given replica-group. If the is not enabled,
-   * we throw an exception.
+   * This method selects the instances for the given partition and segments. 
It iterates over the replica groups
+   * in a round-robin fashion, starting from the preferred replica group. It 
only selects the replica group if all
+   * the segments are available in the selected instances.
+   * If no replica group is found, it throws an exception.
    */
-  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(List<String> segments,
-      SegmentStates segmentStates, InstancePartitions instancePartitions, int 
replicaId) {
-    Set<String> instanceLookUpSet = new HashSet<>();
-    for (int partition = 0; partition < instancePartitions.getNumPartitions(); 
partition++) {
-      List<String> instances = instancePartitions.getInstances(partition, 
replicaId);
-      instanceLookUpSet.addAll(instances);
+  private Map<String, String> getSelectedInstancesForPartition(
+    Map<String, Set<String>> instanceToSegmentsMap,
+    Set<String> requiredSegments,
+    int partitionId,
+    int preferredReplicaId) {
+
+    int numReplicaGroups = _instancePartitions.getNumReplicaGroups();
+
+    for (int i = 0; i < numReplicaGroups; i += 1) {
+      int selectedReplicaGroup = (i + preferredReplicaId) % numReplicaGroups;
+      List<String> selectedInstances = 
_instancePartitions.getInstances(partitionId, selectedReplicaGroup);
+
+      Set<String> segmentsFromSelectedInstances = new HashSet<>();
+      for (String instance : selectedInstances) {
+        Set<String> servedSegments = instanceToSegmentsMap.get(instance);
+        if (servedSegments != null) {
+          segmentsFromSelectedInstances.addAll(servedSegments);
+        }
+      }
+
+      if (segmentsFromSelectedInstances.containsAll(requiredSegments)) {
+        Map<String, String> segmentToInstance = new HashMap<>();
+        for (String segment : requiredSegments) {
+          for (String instance : selectedInstances) {

Review Comment:
   Good catch! Instead of computing segmentToInstances, we could iterate through 
all the selected instances and, for each instance, add a segment -> instance 
mapping to the map for each of its segments that is not already present (e.g. via 
Map.putIfAbsent). This works because we need (and will have) only one instance per 
segment within an instance partition.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   */
+  private Pair<Map<String, String>, Map<String, String>> tryAssigning(
+    Set<String> segments,
+    SegmentStates segmentStates,
+    InstancePartitions instancePartitions,
+    int preferredReplicaId) {
+
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null, "Failed to find servers for 
segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the 
segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance 
partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the 
partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the 
same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = 
partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        segmentToSelectedInstanceMap.putAll(

Review Comment:
   Makes sense, addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to