sajjad-moradi commented on code in PR #8550: URL: https://github.com/apache/pinot/pull/8550#discussion_r851431724
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java: ########## @@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brok @Override Map<String, String> select(List<String> segments, int requestId, - Map<String, List<String>> segmentToEnabledInstancesMap) { + Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) { Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); + // validate queryOptions to get query + String replicaGroup = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS); + int replicaGroupNum = 1; + int currentRequest = 0; Review Comment: The name is a little bit confusing. RequestId is actually the current request! Maybe something like replicaOffset? ########## pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java: ########## @@ -581,6 +585,132 @@ public void testInstanceSelector() { assertTrue(selectionResult.getUnavailableSegments().isEmpty()); } + @Test + public void testReplicaGroupInstanceSelectorNumReplicaGroups() { + String offlineTableName = "testTable_OFFLINE"; + BrokerMetrics brokerMetrics = mock(BrokerMetrics.class); + BrokerRequest brokerRequest = mock(BrokerRequest.class); + PinotQuery pinotQuery = mock(PinotQuery.class); + Map<String, String> queryOptions = new HashMap<>(); + queryOptions.put("numReplicaGroups", "2"); + when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery); + when(pinotQuery.getQueryOptions()).thenReturn(queryOptions); + + ReplicaGroupInstanceSelector replicaGroupInstanceSelector = + new ReplicaGroupInstanceSelector(offlineTableName, brokerMetrics); + + Set<String> enabledInstances = new HashSet<>(); + IdealState idealState = new IdealState(offlineTableName); + Map<String, Map<String, String>> idealStateSegmentAssignment = 
idealState.getRecord().getMapFields(); + ExternalView externalView = new ExternalView(offlineTableName); + Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields(); + Set<String> onlineSegments = new HashSet<>(); + + // 12 online segments with each segment having all 3 instances as online + // replicas are 3 + String instance0 = "instance0"; + String instance1 = "instance1"; + String instance2 = "instance2"; + enabledInstances.add(instance0); + enabledInstances.add(instance1); + enabledInstances.add(instance2); + + String segment0 = "segment0"; + String segment1 = "segment1"; + String segment2 = "segment2"; + String segment3 = "segment3"; + String segment4 = "segment4"; + String segment5 = "segment5"; + String segment6 = "segment6"; + String segment7 = "segment7"; + String segment8 = "segment8"; + String segment9 = "segment9"; + String segment10 = "segment10"; + String segment11 = "segment11"; + + Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>(); + idealStateInstanceStateMap0.put(instance0, ONLINE); + idealStateInstanceStateMap0.put(instance1, ONLINE); + idealStateInstanceStateMap0.put(instance2, ONLINE); + idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap0); Review Comment: You can get rid of the repetition here by creating a list of segment names and looping over it. 
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -2152,8 +2159,16 @@ static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) { throw new IllegalStateException("SQL query should always have response format and group-by mode set to SQL"); } + // throw errors if options is less than 1, rectify if larger that current replicas + if (queryOptions.get(Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS) != null) { + Integer numReplicaGroups = QueryOptionsUtils.getNumReplicaGroups(queryOptions); + if (numReplicaGroups > numReplicas) { Review Comment: If numReplicaGroups is null, you get an NPE for unboxing on the next line. ########## pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java: ########## @@ -59,6 +59,17 @@ public static boolean isSkipUpsert(Map<String, String> queryOptions) { return Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.SKIP_UPSERT)); } + public static Integer getNumReplicaGroups(Map<String, String> queryOptions) { + String numReplicaGroups = queryOptions.get(Request.QueryOptionKey.NUM_REPLICA_GROUPS); + if (numReplicaGroups != null) { + int replicaGroups = Integer.parseInt(numReplicaGroups); + Preconditions.checkState(replicaGroups > 0, "numReplicaGroups must be positive, got: %s", numReplicaGroups); + return replicaGroups; + } else { + return null; Review Comment: Integer.parseInt() throws an exception if the provided string is not a number. Can we treat a null string the same way - as a not-a-number exception - and not special-case it? That way the method is slightly simpler; the function's return value can be a primitive int; and there's no need for the calling function to check for null values. 
########## pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java: ########## @@ -581,6 +585,132 @@ public void testInstanceSelector() { assertTrue(selectionResult.getUnavailableSegments().isEmpty()); } + @Test + public void testReplicaGroupInstanceSelectorNumReplicaGroups() { + String offlineTableName = "testTable_OFFLINE"; + BrokerMetrics brokerMetrics = mock(BrokerMetrics.class); + BrokerRequest brokerRequest = mock(BrokerRequest.class); + PinotQuery pinotQuery = mock(PinotQuery.class); + Map<String, String> queryOptions = new HashMap<>(); + queryOptions.put("numReplicaGroups", "2"); + when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery); + when(pinotQuery.getQueryOptions()).thenReturn(queryOptions); + + ReplicaGroupInstanceSelector replicaGroupInstanceSelector = + new ReplicaGroupInstanceSelector(offlineTableName, brokerMetrics); + + Set<String> enabledInstances = new HashSet<>(); + IdealState idealState = new IdealState(offlineTableName); + Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields(); + ExternalView externalView = new ExternalView(offlineTableName); + Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields(); + Set<String> onlineSegments = new HashSet<>(); + + // 12 online segments with each segment having all 3 instances as online + // replicas are 3 + String instance0 = "instance0"; + String instance1 = "instance1"; + String instance2 = "instance2"; + enabledInstances.add(instance0); + enabledInstances.add(instance1); + enabledInstances.add(instance2); + + String segment0 = "segment0"; + String segment1 = "segment1"; + String segment2 = "segment2"; + String segment3 = "segment3"; + String segment4 = "segment4"; + String segment5 = "segment5"; + String segment6 = "segment6"; + String segment7 = "segment7"; + String segment8 = "segment8"; + String segment9 = "segment9"; + String segment10 = "segment10"; + 
String segment11 = "segment11"; + + Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>(); + idealStateInstanceStateMap0.put(instance0, ONLINE); + idealStateInstanceStateMap0.put(instance1, ONLINE); + idealStateInstanceStateMap0.put(instance2, ONLINE); + idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment2, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment3, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment4, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment5, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment6, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment7, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment8, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment9, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment10, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment11, idealStateInstanceStateMap0); + + Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>(); + externalViewInstanceStateMap0.put(instance0, ONLINE); + externalViewInstanceStateMap0.put(instance1, ONLINE); + externalViewInstanceStateMap0.put(instance2, ONLINE); + externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment2, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment3, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment4, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment5, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment6, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment7, 
externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment8, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment9, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment10, externalViewInstanceStateMap0); + externalViewSegmentAssignment.put(segment11, externalViewInstanceStateMap0); + + + onlineSegments.add(segment0); Review Comment: Ditto ########## pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java: ########## @@ -581,6 +585,132 @@ public void testInstanceSelector() { assertTrue(selectionResult.getUnavailableSegments().isEmpty()); } + @Test + public void testReplicaGroupInstanceSelectorNumReplicaGroups() { + String offlineTableName = "testTable_OFFLINE"; + BrokerMetrics brokerMetrics = mock(BrokerMetrics.class); + BrokerRequest brokerRequest = mock(BrokerRequest.class); + PinotQuery pinotQuery = mock(PinotQuery.class); + Map<String, String> queryOptions = new HashMap<>(); + queryOptions.put("numReplicaGroups", "2"); + when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery); + when(pinotQuery.getQueryOptions()).thenReturn(queryOptions); + + ReplicaGroupInstanceSelector replicaGroupInstanceSelector = + new ReplicaGroupInstanceSelector(offlineTableName, brokerMetrics); + + Set<String> enabledInstances = new HashSet<>(); + IdealState idealState = new IdealState(offlineTableName); + Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields(); + ExternalView externalView = new ExternalView(offlineTableName); + Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields(); + Set<String> onlineSegments = new HashSet<>(); + + // 12 online segments with each segment having all 3 instances as online + // replicas are 3 + String instance0 = "instance0"; + String instance1 = "instance1"; + String instance2 = "instance2"; + enabledInstances.add(instance0); + 
enabledInstances.add(instance1); + enabledInstances.add(instance2); + + String segment0 = "segment0"; + String segment1 = "segment1"; + String segment2 = "segment2"; + String segment3 = "segment3"; + String segment4 = "segment4"; + String segment5 = "segment5"; + String segment6 = "segment6"; + String segment7 = "segment7"; + String segment8 = "segment8"; + String segment9 = "segment9"; + String segment10 = "segment10"; + String segment11 = "segment11"; + + Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>(); + idealStateInstanceStateMap0.put(instance0, ONLINE); + idealStateInstanceStateMap0.put(instance1, ONLINE); + idealStateInstanceStateMap0.put(instance2, ONLINE); + idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment2, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment3, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment4, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment5, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment6, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment7, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment8, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment9, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment10, idealStateInstanceStateMap0); + idealStateSegmentAssignment.put(segment11, idealStateInstanceStateMap0); + + Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>(); + externalViewInstanceStateMap0.put(instance0, ONLINE); + externalViewInstanceStateMap0.put(instance1, ONLINE); + externalViewInstanceStateMap0.put(instance2, ONLINE); + externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0); Review Comment: Ditto -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org