Jackie-Jiang commented on code in PR #8550:
URL: https://github.com/apache/pinot/pull/8550#discussion_r852334030


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -265,7 +265,11 @@ private List<String> calculateEnabledInstancesForSegment(String segment, List<St
   @Override
   public SelectionResult select(BrokerRequest brokerRequest, List<String> segments) {
     int requestId = (int) (_requestId.getAndIncrement() % MAX_REQUEST_ID);
-    Map<String, String> segmentToInstanceMap = select(segments, requestId, _segmentToEnabledInstancesMap);
+    Map<String, String> queryOptions = brokerRequest.getPinotQuery().getQueryOptions();
+    if (queryOptions == null) {
+      queryOptions = new HashMap<>();

Review Comment:
   Reduce garbage
   ```suggestion
         queryOptions = Collections.emptyMap();
   ```
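A minimal sketch of the suggested null handling, assuming a hypothetical helper named `orEmpty` (it is not part of the PR). `Collections.emptyMap()` returns an immutable map and is not required to allocate a new object per call, which is why it produces less garbage than `new HashMap<>()` on the query path.

```java
import java.util.Collections;
import java.util.Map;

public class QueryOptionsDefaults {
  // Hypothetical helper: fall back to an immutable empty map when the
  // query carries no options, instead of allocating a new HashMap.
  public static Map<String, String> orEmpty(Map<String, String> queryOptions) {
    return queryOptions != null ? queryOptions : Collections.emptyMap();
  }

  public static void main(String[] args) {
    Map<String, String> options = orEmpty(null);
    // Lookups on the empty map simply return null.
    System.out.println(options.get("numReplicaGroups"));
  }
}
```

Because the empty map is immutable, this only works if nothing downstream tries to `put` into `queryOptions`.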



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -2133,10 +2139,11 @@ static void validateRequest(BrokerRequest brokerRequest, int queryResponseLimit)
    * <ul>
    *   <li>Value for 'LIMIT' <= configured value</li>
    *   <li>Query options must be set to SQL mode</li>
+   *   <li>Check if numReplicaGroups option provided is valid</li>
    * </ul>
    */
   @VisibleForTesting
-  static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) {
+  static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit, int numReplicas) {

Review Comment:
   I don't think we need to validate this. `ReplicaGroupInstanceSelector`
   should be able to handle the case of configuring more replicas than the
   available servers. If the option is misconfigured, the load might not be
   evenly distributed, but the behavior should be similar to the transitioning
   phase and won't cause hotspot servers.
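A rough sketch of why an oversized value stays safe, assuming every segment has the same number of enabled instances and that selection uses modulo indexing as in the diff under review. The class and method names are hypothetical.

```java
public class WrapAroundSelection {
  // Returns the instance index chosen for each segment. The modulo on
  // numEnabledInstances keeps every index in range even when
  // numReplicaGroupsToQuery exceeds the number of enabled instances.
  public static int[] selectedIndexes(int requestId, int numSegments,
      int numReplicaGroupsToQuery, int numEnabledInstances) {
    int[] selected = new int[numSegments];
    int offset = 0;
    for (int i = 0; i < numSegments; i++) {
      selected[i] = (requestId + offset) % numEnabledInstances;
      offset = (offset + 1) % numReplicaGroupsToQuery;
    }
    return selected;
  }

  public static void main(String[] args) {
    // 5 replica groups requested, only 2 enabled instances per segment.
    int[] picks = selectedIndexes(0, 6, 5, 2);
    System.out.println(java.util.Arrays.toString(picks)); // [0, 1, 0, 1, 0, 0]
  }
}
```

In this example instance 0 serves four of the six segments, so the split is uneven, but no index goes out of bounds and both instances are used.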



##########
pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java:
##########
@@ -59,6 +59,17 @@ public static boolean isSkipUpsert(Map<String, String> queryOptions) {
     return Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.SKIP_UPSERT));
   }
 
+  public static Integer getNumReplicaGroups(Map<String, String> queryOptions) {
+    String numReplicaGroups = queryOptions.get(Request.QueryOptionKey.NUM_REPLICA_GROUPS);
+    if (numReplicaGroups != null) {
+      int replicaGroups = Integer.parseInt(numReplicaGroups);
+      Preconditions.checkState(replicaGroups > 0, "numReplicaGroups must be positive, got: %s", numReplicaGroups);
+      return replicaGroups;
+    } else {
+      return null;

Review Comment:
   @sajjad-moradi I think we need the `null` return value to indicate that this 
option is not set, and create the routing table in the default way
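A self-contained sketch of how a caller could branch on the `null` return, assuming plain Java in place of Guava's `Preconditions`. The class name and the `describeRouting` method are hypothetical and only illustrate the "unset means default routing" contract.

```java
import java.util.HashMap;
import java.util.Map;

public class NumReplicaGroupsOption {
  // Option key as it appears in the diff under review.
  static final String KEY = "numReplicaGroups";

  // Returns null when the option is absent, so callers can tell
  // "not set" apart from any explicit value.
  public static Integer getNumReplicaGroups(Map<String, String> queryOptions) {
    String value = queryOptions.get(KEY);
    if (value == null) {
      return null;
    }
    int parsed = Integer.parseInt(value);
    if (parsed <= 0) {
      throw new IllegalStateException("numReplicaGroups must be positive, got: " + value);
    }
    return parsed;
  }

  // Hypothetical caller: default routing when the option is unset.
  public static String describeRouting(Map<String, String> queryOptions) {
    Integer numReplicaGroups = getNumReplicaGroups(queryOptions);
    return numReplicaGroups == null
        ? "default routing"
        : "routing across " + numReplicaGroups + " replica group(s)";
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    System.out.println(describeRouting(options));
    options.put(KEY, "2");
    System.out.println(describeRouting(options));
  }
}
```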



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -247,6 +247,7 @@ public static class QueryOptionKey {
         public static final String MAX_EXECUTION_THREADS = "maxExecutionThreads";
         public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "minSegmentGroupTrimSize";
         public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize";
+        public static final String NUM_REPLICA_GROUPS = "numReplicaGroups";

Review Comment:
   Make it more specific
   ```suggestion
           public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery";
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java:
##########
@@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brok
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+    // validate queryOptions to get query
+    String replicaGroup = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS);
+    int replicaGroupNum = 1;
+    int currentRequest = 0;
+    if (replicaGroup != null) {
+      replicaGroupNum = Integer.parseInt(replicaGroup);
+    }
     for (String segment : segments) {
       List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
       // NOTE: enabledInstances can be null when there is no enabled instances for the segment, or the instance selector
       // has not been updated (we update all components for routing in sequence)
       if (enabledInstances != null) {
         int numEnabledInstances = enabledInstances.size();
-        segmentToSelectedInstanceMap.put(segment, enabledInstances.get(requestId % numEnabledInstances));
+        int instanceToSelect = (requestId + currentRequest++) % numEnabledInstances;
+        segmentToSelectedInstanceMap.put(segment, enabledInstances.get(instanceToSelect));
+        if (currentRequest % replicaGroupNum == 0) {
+          currentRequest = 0;
+        }

Review Comment:
   This `if` check is redundant. For readability, I'd suggest removing the `++`
   on line 68 and changing the code to
   ```suggestion
           currentRequest = (currentRequest + 1) % numReplicaGroupsToQuery;
   ```
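A small sketch comparing the two forms, to check that the single modulo update yields the same offset sequence as the `++` plus `if` reset. The class and method names are hypothetical; `numReplicaGroupsToQuery` follows the naming in the suggestion.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetCycle {
  // Original form from the diff: post-increment, then reset via an if check.
  public static List<Integer> withIfCheck(int numSegments, int numReplicaGroupsToQuery) {
    List<Integer> offsets = new ArrayList<>();
    int currentRequest = 0;
    for (int i = 0; i < numSegments; i++) {
      offsets.add(currentRequest++);
      if (currentRequest % numReplicaGroupsToQuery == 0) {
        currentRequest = 0;
      }
    }
    return offsets;
  }

  // Suggested form: one modulo update, no ++ and no if check.
  public static List<Integer> withModulo(int numSegments, int numReplicaGroupsToQuery) {
    List<Integer> offsets = new ArrayList<>();
    int currentRequest = 0;
    for (int i = 0; i < numSegments; i++) {
      offsets.add(currentRequest);
      currentRequest = (currentRequest + 1) % numReplicaGroupsToQuery;
    }
    return offsets;
  }

  public static void main(String[] args) {
    System.out.println(withIfCheck(7, 3)); // [0, 1, 2, 0, 1, 2, 0]
    System.out.println(withModulo(7, 3));  // [0, 1, 2, 0, 1, 2, 0]
  }
}
```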



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java:
##########
@@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brok
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+    // validate queryOptions to get query
+    String replicaGroup = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS);

Review Comment:
   Why not use `QueryOptionsUtils` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

