shauryachats commented on code in PR #15843:
URL: https://github.com/apache/pinot/pull/15843#discussion_r2106082446


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions.
+   */
+  private Pair<Map<String, String>, Map<String, String>> tryAssigning(
+    Set<String> segments,
+    SegmentStates segmentStates,
+    InstancePartitions instancePartitions,
+    int preferredReplicaId) {
+
+    Map<String, Integer> instanceToPartitionMap = instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null, "Failed to find servers for segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {

Review Comment:
   Good catch, added `candidates.isEmpty()` to the Preconditions check.
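For illustration, the strengthened check can be modeled as shown below. This is a minimal sketch, not the PR's code. A plain `Map<String, List<String>>` stands in for `SegmentStates`/`SegmentInstanceCandidate`. `checkState` is a simplified local stand-in for Guava's `Preconditions.checkState`. `CandidateCheckSketch` and `candidatesFor` are hypothetical names.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a plain map models the broker's segment -> candidate servers lookup.
final class CandidateCheckSketch {

  // Simplified stand-in for Guava's Preconditions.checkState(boolean, String, Object...):
  // throws IllegalStateException with a formatted message when the expression is false.
  static void checkState(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalStateException(String.format(template, args));
    }
  }

  // Returns the candidate servers for a segment, failing fast if none are known.
  static List<String> candidatesFor(Map<String, List<String>> segmentToCandidates, String segment) {
    List<String> candidates = segmentToCandidates.get(segment);
    // Rejects both a missing entry (null) and an empty candidate list.
    checkState(candidates != null && !candidates.isEmpty(),
        "Failed to find servers for segment: %s", segment);
    return candidates;
  }
}
```

Without the `isEmpty()` clause, a segment with an empty candidate list would pass the check and contribute no entries to `instanceToSegmentsMap`. The problem would then only surface later, if at all.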



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions.
+   */
+  private Pair<Map<String, String>, Map<String, String>> tryAssigning(
+    Set<String> segments,
+    SegmentStates segmentStates,
+    InstancePartitions instancePartitions,
+    int preferredReplicaId) {
+
+    Map<String, Integer> instanceToPartitionMap = instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null, "Failed to find servers for segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", _tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        segmentToSelectedInstanceMap.putAll(

Review Comment:
   Addressed.
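The code comments in this hunk describe three steps:

1. Index servers by the segments they can serve.
2. Group the required segments by instance partition.
3. Pick a replica group per partition.

A minimal sketch of that flow is below. The quoted diff is cut off before the per-partition selection, so the fallback here is an assumption based on the Javadoc, not the PR's implementation. The sketch tries `preferredReplicaId` first, then the remaining replica groups in round-robin order, and accepts a group only if its servers cover every required segment of that partition. Plain maps and lists stand in for `SegmentStates` and `InstancePartitions`. `PartitionAwareSelectorSketch`, `assign`, and `tryReplicaGroup` are hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of partition-aware replica-group selection. Plain collections stand in for
// SegmentStates and InstancePartitions; the round-robin fallback order is an assumption.
final class PartitionAwareSelectorSketch {

  /**
   * @param segmentToCandidates segment -> servers currently able to serve it
   * @param instanceToPartition server -> instance-partition id
   * @param replicaGroupLayout  replicaGroupLayout.get(r).get(p) = servers of replica group r in partition p
   * @return segment -> selected server
   */
  static Map<String, String> assign(Set<String> segments, Map<String, List<String>> segmentToCandidates,
      Map<String, Integer> instanceToPartition, List<List<List<String>>> replicaGroupLayout,
      int preferredReplicaId) {
    // Step 1: server -> segments it can serve.
    Map<String, Set<String>> instanceToSegments = new HashMap<>();
    for (String segment : segments) {
      List<String> candidates = segmentToCandidates.get(segment);
      if (candidates == null || candidates.isEmpty()) {
        throw new IllegalStateException("Failed to find servers for segment: " + segment);
      }
      for (String instance : candidates) {
        instanceToSegments.computeIfAbsent(instance, k -> new HashSet<>()).add(segment);
      }
    }

    // Step 2: partition -> segments that must be served from that partition.
    Map<Integer, Set<String>> partitionToRequiredSegments = new HashMap<>();
    for (Map.Entry<String, Set<String>> entry : instanceToSegments.entrySet()) {
      Integer partitionId = instanceToPartition.get(entry.getKey());
      if (partitionId != null) { // servers outside the instance partitions are ignored in this sketch
        partitionToRequiredSegments.computeIfAbsent(partitionId, k -> new HashSet<>()).addAll(entry.getValue());
      }
    }

    // Step 3: per partition, try the preferred replica group first, then the others in order.
    int numReplicaGroups = replicaGroupLayout.size();
    int numPartitions = replicaGroupLayout.get(0).size();
    Map<String, String> segmentToInstance = new HashMap<>();
    for (int partition = 0; partition < numPartitions; partition++) {
      Set<String> required = partitionToRequiredSegments.get(partition);
      if (required == null) {
        continue;
      }
      Map<String, String> chosen = null;
      for (int i = 0; i < numReplicaGroups && chosen == null; i++) {
        int replicaGroup = (preferredReplicaId + i) % numReplicaGroups;
        chosen = tryReplicaGroup(required, instanceToSegments, replicaGroupLayout.get(replicaGroup).get(partition));
      }
      if (chosen == null) {
        throw new IllegalStateException("No replica group can serve partition " + partition);
      }
      segmentToInstance.putAll(chosen);
    }
    return segmentToInstance;
  }

  // Returns segment -> server if the given servers cover every required segment, else null.
  private static Map<String, String> tryReplicaGroup(Set<String> required,
      Map<String, Set<String>> instanceToSegments, List<String> instances) {
    Map<String, String> result = new HashMap<>();
    for (String instance : instances) {
      for (String segment : instanceToSegments.getOrDefault(instance, Set.of())) {
        if (required.contains(segment)) {
          result.putIfAbsent(segment, instance);
        }
      }
    }
    // Keys are a subset of 'required', so equal sizes mean full coverage.
    return result.size() == required.size() ? result : null;
  }
}
```

In this sketch the fallback is decided per partition. An unavailable server therefore only moves its own partition to another replica group. The removed loop instead retried the whole assignment against the next replica group.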



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

