ankitsultana commented on code in PR #15203: URL: https://github.com/apache/pinot/pull/15203#discussion_r2085845845
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java: ########## @@ -88,6 +93,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { final ZkHelixPropertyStore<ZNRecord> _propertyStore; final BrokerMetrics _brokerMetrics; final AdaptiveServerSelector _adaptiveServerSelector; + final PriorityGroupInstanceSelector _priorityGroupInstanceSelector; Review Comment: Let's mark this Nullable? ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java: ########## @@ -76,18 +76,17 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int SegmentStates segmentStates, Map<String, String> queryOptions) { if (_adaptiveServerSelector != null) { // Adaptive Server Selection is enabled. - List<String> candidateServers = fetchCandidateServersForQuery(segments, segmentStates); + List<SegmentInstanceCandidate> candidateServers = fetchCandidateServersForQuery(segments, segmentStates); // Fetch serverRankList before looping through all the segments. This is important to make sure that we pick // the least amount of instances for a query by referring to a single snapshot of the rankings. 
- List<Pair<String, Double>> serverRankListWithScores = - _adaptiveServerSelector.fetchServerRankingsWithScores(candidateServers); + List<String> serverRankList = _priorityGroupInstanceSelector.rank(new ServerSelectionContext(queryOptions), + candidateServers); Map<String, Integer> serverRankMap = new HashMap<>(); - for (int idx = 0; idx < serverRankListWithScores.size(); idx++) { - Pair<String, Double> entry = serverRankListWithScores.get(idx); - serverRankMap.put(entry.getLeft(), idx); + for (int idx = 0; idx < serverRankList.size(); idx++) { + serverRankMap.put(serverRankList.get(idx), idx); } - return selectServersUsingAdaptiveServerSelector(segments, requestId, segmentStates, serverRankMap); + return selectServersUsingAdaptiveServerSelector(segments, requestId, segmentStates, serverRankMap, queryOptions); Review Comment: ideally we should pass ServerSelectionContext here instead of queryOptions, since you want to define a broad and extensible context for the entire instance selection? ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -460,6 +460,10 @@ public static class Broker { "pinot.broker.enable.dynamic.filtering.semijoin"; public static final boolean DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN = true; + // When the server instance's pool field is null or the pool contains multi distinguished group value, set the group + // to -1. 
+ public static final int DEFAULT_SERVER_REPLICA_GROUP_OF_BROKER_VIEW = -1; Review Comment: nit: consider renaming this since this is a bit confusing ########## pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java: ########## @@ -359,4 +359,16 @@ default long getRealtimeTotalCpuTimeNs() { * @return Set of tables queried */ Set<String> getTablesQueried(); + + /** + * Set the replica groups queried in the request + * @param replicaGroups + */ + void setReplicaGroups(Set<Integer> replicaGroups); + + /** + * Set the replica groups queried in the request Review Comment: "Set" ==> "Get" ########## pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityGroupInstanceSelectorTest.java: ########## @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.broker.routing.adaptiveserverselector; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.broker.routing.instanceselector.SegmentInstanceCandidate; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.*; + +public class PriorityGroupInstanceSelectorTest { Review Comment: Can you also add an E2E integration test that runs queries with the ordered replica group query option set? We don't need to verify whether the right replica-groups are hit. Only that E2E queries continue to work with the option set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org