ankitsultana commented on code in PR #15843:
URL: https://github.com/apache/pinot/pull/15843#discussion_r2116698769


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,151 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e);
+
+    return assign(Sets.newHashSet(segments), segmentStates, instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the relevant server from other instance partitions.
+   *
+   * @return A pair of maps, where the first map contains the segments that are assigned to a server and the second
+   * map contains the segments that are optional (i.e., the server is not online to serve that segment).
+   * Example:
+   * {
+   *   "required_segments": {
+   *     "segment1": "server1",
+   *     "segment2": "server2"
+   *   },
+   *   "optional_segments": {
+   *     "segment3": "server3",
+   *     "segment4": "server4"
+   *   }
+   * }
+   */
+  private Pair<Map<String, String>, Map<String, String>> assign(Set<String> segments,
+    SegmentStates segmentStates, InstancePartitions instancePartitions, int preferredReplicaId) {
+    Map<String, Integer> instanceToPartitionMap = instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null && !candidates.isEmpty(),
+        "Failed to find servers for segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", _tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        getSelectedInstancesForPartition(instanceToSegmentsMap, requiredSegments, partition, preferredReplicaId,
+          segmentToSelectedInstanceMap);
+      }
+    }
+
+    return computeOptionalSegments(segmentToSelectedInstanceMap, segmentStates);

Review Comment:
   **Note**: Handling optional segments at the end means that an optional segment could itself cause a "no replica-group found" error. (I assume an optional segment is likely to have fewer candidates than the replication factor.)
   
   It seems that was already the case before this change, but we may need to revisit it at some point.
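
To make the concern concrete, here is a minimal sketch, not the actual Pinot code. It assumes that selection within one instance partition needs a single instance that serves every segment mapped to that partition; the real `getSelectedInstancesForPartition` is not shown in this hunk and may behave differently. The class `OptionalSegmentSketch`, the method `pickInstanceCoveringAll`, and the server and segment names are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

public class OptionalSegmentSketch {

  /**
   * Hypothetical stand-in for per-partition selection: returns the first instance
   * (in sorted name order) whose segment set contains every required segment.
   */
  static Optional<String> pickInstanceCoveringAll(Map<String, Set<String>> instanceToSegments,
      Set<String> requiredSegments) {
    // TreeMap gives a deterministic iteration order for the illustration.
    for (Map.Entry<String, Set<String>> entry : new TreeMap<>(instanceToSegments).entrySet()) {
      if (entry.getValue().containsAll(requiredSegments)) {
        return Optional.of(entry.getKey());
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Assumed state of one instance partition with two replicas:
    // seg2 is only online on server2, and newSeg (the optional segment)
    // has so far only come online on server1.
    Map<String, Set<String>> instanceToSegments = new HashMap<>();
    instanceToSegments.put("server1", new HashSet<>(Arrays.asList("seg1", "newSeg")));
    instanceToSegments.put("server2", new HashSet<>(Arrays.asList("seg1", "seg2")));

    // Without the optional segment, server2 can serve everything.
    Set<String> withoutOptional = new HashSet<>(Arrays.asList("seg1", "seg2"));
    System.out.println(pickInstanceCoveringAll(instanceToSegments, withoutOptional));

    // Treating the optional segment as required leaves no instance that covers the set.
    Set<String> withOptional = new HashSet<>(Arrays.asList("seg1", "seg2", "newSeg"));
    System.out.println(pickInstanceCoveringAll(instanceToSegments, withOptional));
  }
}
```

Under these assumptions the first lookup finds `server2` and the second finds nothing, which is the failure mode described above: a segment with fewer candidates than replicas shrinks the set of instances that can cover the partition.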



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

