ankitsultana commented on code in PR #15843:
URL: https://github.com/apache/pinot/pull/15843#discussion_r2105708385


##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java:
##########
@@ -111,6 +113,21 @@ public List<String> getInstances(int partitionId, int 
replicaGroupId) {
         .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR 
+ replicaGroupId);
   }
 
+  public Map<String, Integer> getInstanceToPartitionIdMap() {

Review Comment:
   Please add Javadoc for this method.



##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java:
##########
@@ -111,6 +113,21 @@ public List<String> getInstances(int partitionId, int 
replicaGroupId) {
         .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR 
+ replicaGroupId);
   }
 
+  public Map<String, Integer> getInstanceToPartitionIdMap() {
+    Map<String, Integer> instanceToPartitionIdMap = new HashMap<>();
+
+    for (Map.Entry<String, List<String>> entry : 
_partitionToInstancesMap.entrySet()) {
+      Pair<Integer, Integer> partitionIdAndReplicaGroupId = 
getPartitionIdAndReplicaGroupId(entry.getKey());
+      int partitionId = partitionIdAndReplicaGroupId.getLeft();
+

Review Comment:
   Let's keep empty lines to a minimum; these seem arbitrary.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   */
+  private Pair<Map<String, String>, Map<String, String>> tryAssigning(
+    Set<String> segments,
+    SegmentStates segmentStates,
+    InstancePartitions instancePartitions,
+    int preferredReplicaId) {
+
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null, "Failed to find servers for 
segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the 
segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance 
partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the 
partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the 
same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = 
partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        segmentToSelectedInstanceMap.putAll(

Review Comment:
   We have duplicate map insertions here: one within 
`getSelectedInstancesForPartition` and another into 
`segmentToSelectedInstanceMap`. You can simply pass 
`segmentToSelectedInstanceMap` to the method and mutate it directly.
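
A minimal sketch of that suggestion, assuming the replica group (`selectedInstances`) has already been chosen and is known to cover `requiredSegments`. The class and method names are hypothetical, not code from the PR; the point is only that the helper writes into the caller's map rather than returning a temporary one that is then copied with `putAll`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: not the PR's actual code.
final class DirectMutationSketch {
  // Writes segment -> instance entries straight into the caller-owned map,
  // so no intermediate map is built and no putAll is needed afterwards.
  static void assignPartition(Map<String, String> segmentToSelectedInstanceMap,
      Map<String, Set<String>> instanceToSegmentsMap, Set<String> requiredSegments,
      List<String> selectedInstances) {
    for (String instance : selectedInstances) {
      Set<String> servedSegments = instanceToSegmentsMap.get(instance);
      if (servedSegments == null) {
        continue;
      }
      for (String segment : servedSegments) {
        if (requiredSegments.contains(segment)) {
          // Keep the first instance found for a segment.
          segmentToSelectedInstanceMap.putIfAbsent(segment, instance);
        }
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
    instanceToSegmentsMap.put("server1", Set.of("seg1", "seg2"));
    instanceToSegmentsMap.put("server2", Set.of("seg3"));
    Map<String, String> result = new HashMap<>();
    assignPartition(result, instanceToSegmentsMap, Set.of("seg1", "seg3"), List.of("server1", "server2"));
    System.out.println(result);
  }
}
```

One caveat with direct mutation: the completeness check for a replica group has to happen before any write, otherwise a rejected replica group could leave partial entries in the shared map.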



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   */
+  private Pair<Map<String, String>, Map<String, String>> tryAssigning(

Review Comment:
   Can you add an example of what the return value looks like here?
   
   nit: formatting. Pinot code style is to wrap to the next line only when the 
next word or semantic arg doesn't fit on the current line.
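
For illustration, such an example might look like the sketch below. It assumes the left map is segment to selected instance and the right map is optional segment to instance (inferred from `computeOptionalSegments` in the diff). `Map.Entry` stands in for the `Pair` type, and every segment and server name is made up.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: Map.Entry stands in for Pair, and all names are invented.
final class ReturnValueSketch {
  /**
   * Example return value:
   *   left  (segment -> selected instance):  {myTable_0=Server_host1_8098, myTable_1=Server_host2_8098}
   *   right (optional segment -> instance):  {myTable_2=Server_host1_8098}
   */
  static Map.Entry<Map<String, String>, Map<String, String>> exampleReturnValue() {
    Map<String, String> segmentToSelectedInstanceMap = new LinkedHashMap<>();
    segmentToSelectedInstanceMap.put("myTable_0", "Server_host1_8098");
    segmentToSelectedInstanceMap.put("myTable_1", "Server_host2_8098");
    // Assumed meaning: segments that may be served but are not required.
    Map<String, String> optionalSegmentToInstanceMap = new LinkedHashMap<>();
    optionalSegmentToInstanceMap.put("myTable_2", "Server_host1_8098");
    return new SimpleImmutableEntry<>(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
  }

  public static void main(String[] args) {
    Map.Entry<Map<String, String>, Map<String, String>> result = exampleReturnValue();
    System.out.println("selected: " + result.getKey());
    System.out.println("optional: " + result.getValue());
  }
}
```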



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   */
+  private Pair<Map<String, String>, Map<String, String>> tryAssigning(
+    Set<String> segments,
+    SegmentStates segmentStates,
+    InstancePartitions instancePartitions,
+    int preferredReplicaId) {
+
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null, "Failed to find servers for 
segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the 
segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance 
partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the 
partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the 
same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = 
partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        segmentToSelectedInstanceMap.putAll(
+          getSelectedInstancesForPartition(instanceToSegmentsMap, 
requiredSegments, partition, preferredReplicaId));
+      }
+    }
+
+    return computeOptionalSegments(segmentToSelectedInstanceMap, 
segmentStates);
   }
 
   /**
-   * Returns a map from the segmentName to the corresponding server in the 
given replica-group. If the is not enabled,
-   * we throw an exception.
+   * This method selects the instances for the given partition and segments. 
It iterates over the replica groups
+   * in a round-robin fashion, starting from the preferred replica group. It 
only selects the replica group if all
+   * the segments are available in the selected instances.
+   * If no replica group is found, it throws an exception.
    */
-  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(List<String> segments,
-      SegmentStates segmentStates, InstancePartitions instancePartitions, int 
replicaId) {
-    Set<String> instanceLookUpSet = new HashSet<>();
-    for (int partition = 0; partition < instancePartitions.getNumPartitions(); 
partition++) {
-      List<String> instances = instancePartitions.getInstances(partition, 
replicaId);
-      instanceLookUpSet.addAll(instances);
+  private Map<String, String> getSelectedInstancesForPartition(
+    Map<String, Set<String>> instanceToSegmentsMap,
+    Set<String> requiredSegments,
+    int partitionId,
+    int preferredReplicaId) {
+
+    int numReplicaGroups = _instancePartitions.getNumReplicaGroups();
+
+    for (int i = 0; i < numReplicaGroups; i += 1) {
+      int selectedReplicaGroup = (i + preferredReplicaId) % numReplicaGroups;
+      List<String> selectedInstances = 
_instancePartitions.getInstances(partitionId, selectedReplicaGroup);
+
+      Set<String> segmentsFromSelectedInstances = new HashSet<>();
+      for (String instance : selectedInstances) {
+        Set<String> servedSegments = instanceToSegmentsMap.get(instance);
+        if (servedSegments != null) {
+          segmentsFromSelectedInstances.addAll(servedSegments);
+        }
+      }
+
+      if (segmentsFromSelectedInstances.containsAll(requiredSegments)) {
+        Map<String, String> segmentToInstance = new HashMap<>();
+        for (String segment : requiredSegments) {
+          for (String instance : selectedInstances) {

Review Comment:
   This seems very costly. We iterate over all segments and all instances in 
the selected instance partition, and then do a lookup on the 
instance-to-segments map.
   
   A cheaper way might be to compute the segmentToInstances map in L21, and 
then iterate over that map and do a lookup on a set of selectedInstances.
   
   Side note: we should ideally throw if the segment was not added to 
`segmentToInstance`; otherwise bugs can be very hard to catch.
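
A rough sketch of the cheaper approach, under the assumption that a segment-to-candidate-instances map is built once up front. The class and method names are hypothetical. Each required segment costs one map lookup plus a scan of its own candidates against a `HashSet` of the selected instances, and a segment that cannot be assigned fails loudly.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: not the PR's actual code.
final class SegmentFirstLookupSketch {
  static Map<String, String> assign(Map<String, List<String>> segmentToInstancesMap,
      Set<String> requiredSegments, List<String> selectedInstances) {
    // Constant-time membership checks instead of scanning the instance list per segment.
    Set<String> selectedInstanceSet = new HashSet<>(selectedInstances);
    Map<String, String> segmentToInstance = new HashMap<>();
    for (String segment : requiredSegments) {
      List<String> candidates = segmentToInstancesMap.getOrDefault(segment, List.of());
      for (String instance : candidates) {
        if (selectedInstanceSet.contains(instance)) {
          segmentToInstance.put(segment, instance);
          break;
        }
      }
      // Fail fast rather than silently dropping the segment.
      if (!segmentToInstance.containsKey(segment)) {
        throw new IllegalStateException("No selected instance serves segment: " + segment);
      }
    }
    return segmentToInstance;
  }

  public static void main(String[] args) {
    Map<String, List<String>> segmentToInstancesMap = new HashMap<>();
    segmentToInstancesMap.put("seg1", List.of("server1", "server3"));
    segmentToInstancesMap.put("seg2", List.of("server2"));
    System.out.println(assign(segmentToInstancesMap, Set.of("seg1", "seg2"), List.of("server3", "server2")));
  }
}
```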



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   */
+  private Pair<Map<String, String>, Map<String, String>> tryAssigning(
+    Set<String> segments,
+    SegmentStates segmentStates,
+    InstancePartitions instancePartitions,
+    int preferredReplicaId) {
+
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null, "Failed to find servers for 
segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {

Review Comment:
   We need to handle the scenario where the `candidates` list is empty; 
otherwise the segment will be silently ignored.
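
One way this could look, as a sketch only: treat an empty list the same as a missing one and fail. Plain lists of instance names stand in for `SegmentInstanceCandidate`, the class and method names are hypothetical, and whether throwing is the right policy (versus reporting the segment as unavailable) is a design decision this sketch does not settle.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: candidate lists are plain instance names here.
final class EmptyCandidatesSketch {
  static Map<String, Set<String>> buildInstanceToSegmentsMap(Map<String, List<String>> segmentToCandidates,
      Set<String> segments) {
    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
    for (String segment : segments) {
      List<String> candidates = segmentToCandidates.get(segment);
      // An empty list would otherwise skip the inner loop and drop the segment silently.
      if (candidates == null || candidates.isEmpty()) {
        throw new IllegalStateException("Failed to find servers for segment: " + segment);
      }
      for (String instance : candidates) {
        instanceToSegmentsMap.computeIfAbsent(instance, k -> new HashSet<>()).add(segment);
      }
    }
    return instanceToSegmentsMap;
  }

  public static void main(String[] args) {
    Map<String, List<String>> segmentToCandidates = new HashMap<>();
    segmentToCandidates.put("seg1", List.of("server1", "server2"));
    System.out.println(buildInstanceToSegmentsMap(segmentToCandidates, Set.of("seg1")));
  }
}
```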



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,139 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);

Review Comment:
   If a table is being rebalanced, some assumptions about the data 
assignment may break temporarily. During a rebalance, as far as I recall, 
Instance Partitions are updated immediately, while Ideal State is updated 
gradually so that the External View (EV) can change gradually.
   
   In such a state, what would the behavior of this routing strategy be?
   
   The other routing strategies use Ideal State to define replica-groups and 
server/segment assignments, so they'll behave differently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

