sajjad-moradi commented on code in PR #8550:
URL: https://github.com/apache/pinot/pull/8550#discussion_r851431724


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java:
##########
@@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brok
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+    // validate queryOptions to get query
+    String replicaGroup = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS);
+    int replicaGroupNum = 1;
+    int currentRequest = 0;

Review Comment:
   The name currentRequest is a little confusing, since requestId already
   identifies the current request. Maybe something like replicaOffset?



##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java:
##########
@@ -581,6 +585,132 @@ public void testInstanceSelector() {
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
   }
 
+  @Test
+  public void testReplicaGroupInstanceSelectorNumReplicaGroups() {
+    String offlineTableName = "testTable_OFFLINE";
+    BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+    BrokerRequest brokerRequest = mock(BrokerRequest.class);
+    PinotQuery pinotQuery = mock(PinotQuery.class);
+    Map<String, String> queryOptions = new HashMap<>();
+    queryOptions.put("numReplicaGroups", "2");
+    when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
+    when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
+
+    ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
+        new ReplicaGroupInstanceSelector(offlineTableName, brokerMetrics);
+
+    Set<String> enabledInstances = new HashSet<>();
+    IdealState idealState = new IdealState(offlineTableName);
+    Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields();
+    ExternalView externalView = new ExternalView(offlineTableName);
+    Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
+
+    // 12 online segments with each segment having all 3 instances as online
+    // replicas are 3
+    String instance0 = "instance0";
+    String instance1 = "instance1";
+    String instance2 = "instance2";
+    enabledInstances.add(instance0);
+    enabledInstances.add(instance1);
+    enabledInstances.add(instance2);
+
+    String segment0 = "segment0";
+    String segment1 = "segment1";
+    String segment2 = "segment2";
+    String segment3 = "segment3";
+    String segment4 = "segment4";
+    String segment5 = "segment5";
+    String segment6 = "segment6";
+    String segment7 = "segment7";
+    String segment8 = "segment8";
+    String segment9 = "segment9";
+    String segment10 = "segment10";
+    String segment11 = "segment11";
+
+    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
+    idealStateInstanceStateMap0.put(instance0, ONLINE);
+    idealStateInstanceStateMap0.put(instance1, ONLINE);
+    idealStateInstanceStateMap0.put(instance2, ONLINE);
+    idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap0);

Review Comment:
   You can get rid of the repetition here by creating a list of segment names
   and looping over it.
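
   A minimal sketch of that suggestion, assuming the segments differ only by a
   numeric suffix. The class and helper names (SegmentAssignmentSketch,
   segmentNames, assignAll) are hypothetical and not part of the PR, and ONLINE
   is redefined locally so the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SegmentAssignmentSketch {
  // Local stand-in for the ONLINE constant used by the test (assumption).
  static final String ONLINE = "ONLINE";

  // Hypothetical helper: builds "segment0" .. "segment<numSegments - 1>".
  static List<String> segmentNames(int numSegments) {
    List<String> segments = new ArrayList<>(numSegments);
    for (int i = 0; i < numSegments; i++) {
      segments.add("segment" + i);
    }
    return segments;
  }

  // Hypothetical helper: points every segment at the same instance-state map,
  // as the repeated put(...) calls in the test do.
  static void assignAll(Map<String, Map<String, String>> assignment, List<String> segments,
      Map<String, String> instanceStateMap) {
    for (String segment : segments) {
      assignment.put(segment, instanceStateMap);
    }
  }

  public static void main(String[] args) {
    Map<String, String> instanceStateMap = new TreeMap<>();
    for (int i = 0; i < 3; i++) {
      instanceStateMap.put("instance" + i, ONLINE);
    }
    Map<String, Map<String, String>> idealStateSegmentAssignment = new HashMap<>();
    assignAll(idealStateSegmentAssignment, segmentNames(12), instanceStateMap);
    System.out.println(idealStateSegmentAssignment.size() + " segments assigned");
  }
}
```

   The same list can then be reused for the external view map and for
   onlineSegments (for example with addAll), so the segment count is stated
   once.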



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -2152,8 +2159,16 @@ static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) {
       throw new IllegalStateException("SQL query should always have response format and group-by mode set to SQL");
     }
 
+    // throw errors if options is less than 1, rectify if larger that current replicas
+    if (queryOptions.get(Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS) != null) {
+      Integer numReplicaGroups = QueryOptionsUtils.getNumReplicaGroups(queryOptions);
+      if (numReplicaGroups > numReplicas) {

Review Comment:
   If numReplicaGroups is null, you get an NPE from unboxing on the next line.
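
   For illustration, a standalone sketch of the unboxing behaviour referred to
   here. The class and method names are hypothetical; only the language rule
   (comparing a null Integer with an int throws NullPointerException) is taken
   as given.

```java
final class UnboxingSketch {
  // Comparing an Integer with an int auto-unboxes the Integer via intValue(),
  // which throws NullPointerException when the reference is null.
  static boolean exceeds(Integer value, int limit) {
    return value > limit;
  }

  // Null-safe variant: the null check short-circuits before unboxing happens.
  static boolean exceedsNullSafe(Integer value, int limit) {
    return value != null && value > limit;
  }

  public static void main(String[] args) {
    System.out.println(exceedsNullSafe(null, 3));
    try {
      exceeds(null, 3);
    } catch (NullPointerException e) {
      System.out.println("NPE from unboxing a null Integer");
    }
  }
}
```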



##########
pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java:
##########
@@ -59,6 +59,17 @@ public static boolean isSkipUpsert(Map<String, String> queryOptions) {
     return Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.SKIP_UPSERT));
   }
 
+  public static Integer getNumReplicaGroups(Map<String, String> queryOptions) {
+    String numReplicaGroups = queryOptions.get(Request.QueryOptionKey.NUM_REPLICA_GROUPS);
+    if (numReplicaGroups != null) {
+      int replicaGroups = Integer.parseInt(numReplicaGroups);
+      Preconditions.checkState(replicaGroups > 0, "numReplicaGroups must be positive, got: %s", numReplicaGroups);
+      return replicaGroups;
+    } else {
+      return null;

Review Comment:
   Integer.parseInt() throws an exception if the provided string is not a
   number. Can we treat a null string the same way (as a NumberFormatException)
   instead of special-casing it? That way the method is slightly simpler, its
   return type can be a primitive int, and the caller does not need to check
   for null.
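
   A rough sketch of what that simplification could look like, not the PR's
   actual code. The class name is hypothetical, the option key is hard-coded to
   the "numReplicaGroups" string used in the test, and a plain
   IllegalStateException stands in for Guava's Preconditions.checkState so the
   snippet has no dependencies.

```java
import java.util.HashMap;
import java.util.Map;

final class NumReplicaGroupsSketch {
  // Assumed to match Request.QueryOptionKey.NUM_REPLICA_GROUPS.
  static final String NUM_REPLICA_GROUPS = "numReplicaGroups";

  // Returns a primitive int. A missing option is not special-cased:
  // Integer.parseInt(null) throws NumberFormatException, just like "abc".
  static int getNumReplicaGroups(Map<String, String> queryOptions) {
    int replicaGroups = Integer.parseInt(queryOptions.get(NUM_REPLICA_GROUPS));
    if (replicaGroups <= 0) {
      throw new IllegalStateException("numReplicaGroups must be positive, got: " + replicaGroups);
    }
    return replicaGroups;
  }

  public static void main(String[] args) {
    Map<String, String> queryOptions = new HashMap<>();
    queryOptions.put(NUM_REPLICA_GROUPS, "2");
    System.out.println(getNumReplicaGroups(queryOptions));
  }
}
```

   With this shape, a caller that wants the option to stay optional still has
   to check whether the key is present before calling the method.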



##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java:
##########
@@ -581,6 +585,132 @@ public void testInstanceSelector() {
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
   }
 
+  @Test
+  public void testReplicaGroupInstanceSelectorNumReplicaGroups() {
+    String offlineTableName = "testTable_OFFLINE";
+    BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+    BrokerRequest brokerRequest = mock(BrokerRequest.class);
+    PinotQuery pinotQuery = mock(PinotQuery.class);
+    Map<String, String> queryOptions = new HashMap<>();
+    queryOptions.put("numReplicaGroups", "2");
+    when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
+    when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
+
+    ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
+        new ReplicaGroupInstanceSelector(offlineTableName, brokerMetrics);
+
+    Set<String> enabledInstances = new HashSet<>();
+    IdealState idealState = new IdealState(offlineTableName);
+    Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields();
+    ExternalView externalView = new ExternalView(offlineTableName);
+    Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
+
+    // 12 online segments with each segment having all 3 instances as online
+    // replicas are 3
+    String instance0 = "instance0";
+    String instance1 = "instance1";
+    String instance2 = "instance2";
+    enabledInstances.add(instance0);
+    enabledInstances.add(instance1);
+    enabledInstances.add(instance2);
+
+    String segment0 = "segment0";
+    String segment1 = "segment1";
+    String segment2 = "segment2";
+    String segment3 = "segment3";
+    String segment4 = "segment4";
+    String segment5 = "segment5";
+    String segment6 = "segment6";
+    String segment7 = "segment7";
+    String segment8 = "segment8";
+    String segment9 = "segment9";
+    String segment10 = "segment10";
+    String segment11 = "segment11";
+
+    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
+    idealStateInstanceStateMap0.put(instance0, ONLINE);
+    idealStateInstanceStateMap0.put(instance1, ONLINE);
+    idealStateInstanceStateMap0.put(instance2, ONLINE);
+    idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment2, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment3, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment4, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment5, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment6, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment7, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment8, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment9, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment10, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment11, idealStateInstanceStateMap0);
+
+    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+    externalViewInstanceStateMap0.put(instance0, ONLINE);
+    externalViewInstanceStateMap0.put(instance1, ONLINE);
+    externalViewInstanceStateMap0.put(instance2, ONLINE);
+    externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment2, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment3, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment4, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment5, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment6, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment7, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment8, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment9, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment10, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment11, externalViewInstanceStateMap0);
+
+
+    onlineSegments.add(segment0);

Review Comment:
   Ditto: loop over a list of segment names here as well.



##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java:
##########
@@ -581,6 +585,132 @@ public void testInstanceSelector() {
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
   }
 
+  @Test
+  public void testReplicaGroupInstanceSelectorNumReplicaGroups() {
+    String offlineTableName = "testTable_OFFLINE";
+    BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+    BrokerRequest brokerRequest = mock(BrokerRequest.class);
+    PinotQuery pinotQuery = mock(PinotQuery.class);
+    Map<String, String> queryOptions = new HashMap<>();
+    queryOptions.put("numReplicaGroups", "2");
+    when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
+    when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
+
+    ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
+        new ReplicaGroupInstanceSelector(offlineTableName, brokerMetrics);
+
+    Set<String> enabledInstances = new HashSet<>();
+    IdealState idealState = new IdealState(offlineTableName);
+    Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields();
+    ExternalView externalView = new ExternalView(offlineTableName);
+    Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
+
+    // 12 online segments with each segment having all 3 instances as online
+    // replicas are 3
+    String instance0 = "instance0";
+    String instance1 = "instance1";
+    String instance2 = "instance2";
+    enabledInstances.add(instance0);
+    enabledInstances.add(instance1);
+    enabledInstances.add(instance2);
+
+    String segment0 = "segment0";
+    String segment1 = "segment1";
+    String segment2 = "segment2";
+    String segment3 = "segment3";
+    String segment4 = "segment4";
+    String segment5 = "segment5";
+    String segment6 = "segment6";
+    String segment7 = "segment7";
+    String segment8 = "segment8";
+    String segment9 = "segment9";
+    String segment10 = "segment10";
+    String segment11 = "segment11";
+
+    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
+    idealStateInstanceStateMap0.put(instance0, ONLINE);
+    idealStateInstanceStateMap0.put(instance1, ONLINE);
+    idealStateInstanceStateMap0.put(instance2, ONLINE);
+    idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment2, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment3, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment4, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment5, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment6, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment7, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment8, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment9, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment10, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment11, idealStateInstanceStateMap0);
+
+    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+    externalViewInstanceStateMap0.put(instance0, ONLINE);
+    externalViewInstanceStateMap0.put(instance1, ONLINE);
+    externalViewInstanceStateMap0.put(instance2, ONLINE);
+    externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0);

Review Comment:
   Ditto: loop over a list of segment names here as well.


