snleee commented on code in PR #8550: URL: https://github.com/apache/pinot/pull/8550#discussion_r851439939
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -363,7 +363,13 @@ private BrokerResponseNative handleSQLRequest(long requestId, String query, Json // Validate the request try { - validateRequest(pinotQuery, _queryResponseLimit); + int numReplicas = 1; + if (offlineTableConfig != null) { + numReplicas = offlineTableConfig.getValidationConfig().getReplicationNumber(); + } else if (realtimeTableConfig != null) { + numReplicas = realtimeTableConfig.getValidationConfig().getReplicationNumber(); Review Comment: For realtime, `getReplicasPerPartitionNumber()` is the correct indicator for the replication factor. ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java: ########## @@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brok @Override Map<String, String> select(List<String> segments, int requestId, - Map<String, List<String>> segmentToEnabledInstancesMap) { + Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) { Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); + // validate queryOptions to get query + String replicaGroup = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS); + int replicaGroupNum = 1; + int currentRequest = 0; + if (replicaGroup != null) { + replicaGroupNum = Integer.parseInt(replicaGroup); Review Comment: `replicaGroupNum`->`numReplicaGroup` ########## pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java: ########## @@ -581,6 +585,132 @@ public void testInstanceSelector() { assertTrue(selectionResult.getUnavailableSegments().isEmpty()); } + @Test + public void testReplicaGroupInstanceSelectorNumReplicaGroups() { + String offlineTableName = "testTable_OFFLINE"; + BrokerMetrics brokerMetrics = mock(BrokerMetrics.class); + BrokerRequest brokerRequest = mock(BrokerRequest.class); + PinotQuery pinotQuery = mock(PinotQuery.class); + Map<String, String> queryOptions = new HashMap<>(); + queryOptions.put("numReplicaGroups", "2"); Review Comment: Can you also cover the case with `numReplicaGroups=4`? ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java: ########## @@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brok @Override Map<String, String> select(List<String> segments, int requestId, - Map<String, List<String>> segmentToEnabledInstancesMap) { + Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) { Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); + // validate queryOptions to get query + String replicaGroup = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS); Review Comment: Instead of re-writing the option based on the table config, you can use `enabledInstances` + `numReplicaGroup`. ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java: ########## @@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brok @Override Map<String, String> select(List<String> segments, int requestId, - Map<String, List<String>> segmentToEnabledInstancesMap) { + Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) { Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); + // validate queryOptions to get query + String replicaGroup = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS); + int replicaGroupNum = 1; + int currentRequest = 0; + if (replicaGroup != null) { + replicaGroupNum = Integer.parseInt(replicaGroup); + } for (String segment : segments) { List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment); // NOTE: enabledInstances can be null when there is no enabled instances for the segment, or the instance selector // has not been updated (we update all components for routing in sequence) if (enabledInstances != null) { int numEnabledInstances = enabledInstances.size(); - segmentToSelectedInstanceMap.put(segment, enabledInstances.get(requestId % numEnabledInstances)); + int instanceToSelect = (requestId + currentRequest++) % numEnabledInstances; Review Comment: Can we add the comment in the header about how the behavior changes when the option is given? ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -2152,8 +2159,16 @@ static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) { throw new IllegalStateException("SQL query should always have response format and group-by mode set to SQL"); } + // throw errors if options is less than 1, rectify if larger that current replicas + if (queryOptions.get(Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS) != null) { Review Comment: I'm not sure if the `validateRequest()` is the correct place to update value for `queryOptions`. Why don't we process this in `select()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org