This is an automated email from the ASF dual-hosted git repository.

yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e41bdd0c6f [multistage] Add Multi Stage Strict Replica Group Routing 
Strategy (#9808)
e41bdd0c6f is described below

commit e41bdd0c6ff851ca65102fb7bdc83161753e839c
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Tue Nov 29 22:25:23 2022 +0530

    [multistage] Add Multi Stage Strict Replica Group Routing Strategy (#9808)
    
    * Rebase on master
    
    * Cleanup
    
    * Add tests
    
    * Fix test
    
    * Address feedback
    
    * Mark feature is in Beta
    
    * Address feedback
    
    * Address feedback
    
    * Rename to MultiStageReplicaGroup
---
 .../broker/api/resources/PinotBrokerDebug.java     |  15 +-
 .../requesthandler/BaseBrokerRequestHandler.java   |   4 +-
 .../MultiStageBrokerRequestHandler.java            |   2 +-
 .../pinot/broker/routing/BrokerRoutingManager.java |  13 +-
 .../instanceselector/BaseInstanceSelector.java     |  10 +-
 .../routing/instanceselector/InstanceSelector.java |   7 +-
 .../instanceselector/InstanceSelectorFactory.java  |  11 +-
 .../MultiStageReplicaGroupSelector.java            | 150 +++++++++++
 .../broker/broker/HelixBrokerStarterTest.java      |   7 +-
 .../BaseBrokerRequestHandlerTest.java              |   2 +-
 .../instanceselector/InstanceSelectorTest.java     | 277 +++++++++++++++------
 .../apache/pinot/core/routing/RoutingManager.java  |   2 +-
 .../org/apache/pinot/query/QueryEnvironment.java   |  16 +-
 .../pinot/query/planner/logical/StagePlanner.java  |   6 +-
 .../apache/pinot/query/routing/WorkerManager.java  |  16 +-
 .../query/testutils/MockRoutingManagerFactory.java |   2 +-
 .../pinot/spi/config/table/RoutingConfig.java      |   1 +
 17 files changed, 426 insertions(+), 115 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index daa17a0e5c..f9eb55b535 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -30,6 +30,7 @@ import io.swagger.annotations.SwaggerDefinition;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -59,6 +60,9 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
 // TODO: Add APIs to return the RoutingTable (with unavailable segments)
 public class PinotBrokerDebug {
 
+  // Request ID is passed to the RoutingManager to rotate the selected 
replica-group.
+  private final AtomicLong _requestIdGenerator = new AtomicLong();
+
   @Inject
   private BrokerRoutingManager _routingManager;
 
@@ -102,7 +106,7 @@ public class PinotBrokerDebug {
     if (tableType != TableType.REALTIME) {
       String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
       RoutingTable routingTable = _routingManager.getRoutingTable(
-          CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
offlineTableName));
+          CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
offlineTableName), getRequestId());
       if (routingTable != null) {
         result.put(offlineTableName, 
routingTable.getServerInstanceToSegmentsMap());
       }
@@ -110,7 +114,7 @@ public class PinotBrokerDebug {
     if (tableType != TableType.OFFLINE) {
       String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
       RoutingTable routingTable = _routingManager.getRoutingTable(
-          CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
realtimeTableName));
+          CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
realtimeTableName), getRequestId());
       if (routingTable != null) {
         result.put(realtimeTableName, 
routingTable.getServerInstanceToSegmentsMap());
       }
@@ -133,7 +137,8 @@ public class PinotBrokerDebug {
   })
   public Map<ServerInstance, List<String>> getRoutingTableForQuery(
       @ApiParam(value = "SQL query (table name should have type suffix)") 
@QueryParam("query") String query) {
-    RoutingTable routingTable = 
_routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest(query));
+    RoutingTable routingTable = 
_routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest(query),
+        getRequestId());
     if (routingTable != null) {
       return routingTable.getServerInstanceToSegmentsMap();
     } else {
@@ -157,4 +162,8 @@ public class PinotBrokerDebug {
   public String getServerRoutingStats() {
     return _serverRoutingStatsManager.getServerRoutingStatsStr();
   }
+
+  private long getRequestId() {
+    return _requestIdGenerator.getAndIncrement();
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index ade86b61bd..ae88689a51 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -551,7 +551,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     int numPrunedSegmentsTotal = 0;
     if (offlineBrokerRequest != null) {
       // NOTE: Routing table might be null if table is just removed
-      RoutingTable routingTable = 
_routingManager.getRoutingTable(offlineBrokerRequest);
+      RoutingTable routingTable = 
_routingManager.getRoutingTable(offlineBrokerRequest, requestId);
       if (routingTable != null) {
         unavailableSegments.addAll(routingTable.getUnavailableSegments());
         Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = 
routingTable.getServerInstanceToSegmentsMap();
@@ -567,7 +567,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     }
     if (realtimeBrokerRequest != null) {
       // NOTE: Routing table might be null if table is just removed
-      RoutingTable routingTable = 
_routingManager.getRoutingTable(realtimeBrokerRequest);
+      RoutingTable routingTable = 
_routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
       if (routingTable != null) {
         unavailableSegments.addAll(routingTable.getUnavailableSegments());
         Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = 
routingTable.getServerInstanceToSegmentsMap();
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 0b1a67efbb..75e8d8d3f0 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -144,7 +144,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
           return constructMultistageExplainPlan(query, plan);
         case SELECT:
         default:
-          queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions);
+          queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions, 
requestId);
           break;
       }
     } catch (Exception e) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index ee0982606d..883e9cfb02 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -81,7 +81,7 @@ import org.slf4j.LoggerFactory;
  *   <li>{@link #removeRouting(String)}: Removes the routing for a table</li>
  *   <li>{@link #refreshSegment(String, String)}: Refreshes the metadata for a 
segment</li>
  *   <li>{@link #routingExists(String)}: Returns whether the routing exists 
for a table</li>
- *   <li>{@link #getRoutingTable(BrokerRequest)}: Returns the routing table 
for a query</li>
+ *   <li>{@link #getRoutingTable(BrokerRequest, long)}: Returns the routing 
table for a query</li>
  *   <li>{@link #getTimeBoundaryInfo(String)}: Returns the time boundary info 
for a table</li>
  *   <li>{@link #getQueryTimeoutMs(String)}: Returns the table-level query 
timeout in milliseconds for a table</li>
  * </ul>
@@ -437,7 +437,8 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     AdaptiveServerSelector adaptiveServerSelector =
         
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
 _pinotConfig);
     InstanceSelector instanceSelector =
-        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
_brokerMetrics, adaptiveServerSelector);
+        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
_propertyStore, _brokerMetrics,
+            adaptiveServerSelector);
     instanceSelector.init(_routableServers, idealState, externalView, 
preSelectedOnlineSegments);
 
     // Add time boundary manager if both offline and real-time part exist for 
a hybrid table
@@ -567,13 +568,13 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
    */
   @Nullable
   @Override
-  public RoutingTable getRoutingTable(BrokerRequest brokerRequest) {
+  public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long 
requestId) {
     String tableNameWithType = brokerRequest.getQuerySource().getTableName();
     RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
     if (routingEntry == null) {
       return null;
     }
-    InstanceSelector.SelectionResult selectionResult = 
routingEntry.calculateRouting(brokerRequest);
+    InstanceSelector.SelectionResult selectionResult = 
routingEntry.calculateRouting(brokerRequest, requestId);
     Map<String, String> segmentToInstanceMap = 
selectionResult.getSegmentToInstanceMap();
     Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = new 
HashMap<>();
     for (Map.Entry<String, String> entry : segmentToInstanceMap.entrySet()) {
@@ -717,7 +718,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
       }
     }
 
-    InstanceSelector.SelectionResult calculateRouting(BrokerRequest 
brokerRequest) {
+    InstanceSelector.SelectionResult calculateRouting(BrokerRequest 
brokerRequest, long requestId) {
       Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
       int numTotalSelectedSegments = selectedSegments.size();
       if (!selectedSegments.isEmpty()) {
@@ -728,7 +729,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
       int numPrunedSegments = numTotalSelectedSegments - 
selectedSegments.size();
       if (!selectedSegments.isEmpty()) {
         InstanceSelector.SelectionResult selectionResult = 
_instanceSelector.select(brokerRequest,
-            new ArrayList<>(selectedSegments));
+            new ArrayList<>(selectedSegments), requestId);
         selectionResult.setNumPrunedSegments(numPrunedSegments);
         return selectionResult;
       } else {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 9b92d6031c..9325035aca 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -50,9 +49,8 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseInstanceSelector.class);
 
   // To prevent int overflow, reset the request id once it reaches this value
-  private static final int MAX_REQUEST_ID = 1_000_000_000;
+  private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final AtomicLong _requestId = new AtomicLong();
   private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
@@ -267,13 +265,13 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   }
 
   @Override
-  public SelectionResult select(BrokerRequest brokerRequest, List<String> 
segments) {
-    int requestId = (int) (_requestId.getAndIncrement() % MAX_REQUEST_ID);
+  public SelectionResult select(BrokerRequest brokerRequest, List<String> 
segments, long requestId) {
     Map<String, String> queryOptions = (brokerRequest.getPinotQuery() != null
         && brokerRequest.getPinotQuery().getQueryOptions() != null)
         ? brokerRequest.getPinotQuery().getQueryOptions()
         : Collections.emptyMap();
-    Map<String, String> segmentToInstanceMap = select(segments, requestId, 
_segmentToEnabledInstancesMap,
+    int requestIdInt = (int) (requestId % MAX_REQUEST_ID);
+    Map<String, String> segmentToInstanceMap = select(segments, requestIdInt, 
_segmentToEnabledInstancesMap,
         queryOptions);
     Set<String> unavailableSegments = _unavailableSegments;
     if (unavailableSegments.isEmpty()) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index 4edaf69b0a..4c96007fd6 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -55,8 +55,13 @@ public interface InstanceSelector {
    * Selects the server instances for the given segments queried by the given 
broker request, returns a map from segment
    * to selected server instance hosting the segment and a set of unavailable 
segments (no enabled instance or all
    * enabled instances are in ERROR state).
+   *
+   * @param brokerRequest BrokerRequest for the query
+   * @param segments segments for which instance needs to be selected
+   * @param requestId requestId generated by the Broker for a query
+   * @return instance of SelectionResult which describes the instance to pick 
for a given segment
    */
-  SelectionResult select(BrokerRequest brokerRequest, List<String> segments);
+  SelectionResult select(BrokerRequest brokerRequest, List<String> segments, 
long requestId);
 
   class SelectionResult {
     private final Map<String, String> _segmentToInstanceMap;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index 0ef46d7b7a..8cc9f260f8 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.broker.routing.instanceselector;
 
 import javax.annotation.Nullable;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -37,7 +39,8 @@ public class InstanceSelectorFactory {
   public static final String LEGACY_REPLICA_GROUP_OFFLINE_ROUTING = 
"PartitionAwareOffline";
   public static final String LEGACY_REPLICA_GROUP_REALTIME_ROUTING = 
"PartitionAwareRealtime";
 
-  public static InstanceSelector getInstanceSelector(TableConfig tableConfig, 
BrokerMetrics brokerMetrics,
+  public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
+      ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics 
brokerMetrics,
       @Nullable AdaptiveServerSelector adaptiveServerSelector) {
     String tableNameWithType = tableConfig.getTableName();
     RoutingConfig routingConfig = tableConfig.getRoutingConfig();
@@ -55,6 +58,12 @@ public class InstanceSelectorFactory {
         LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
         return new StrictReplicaGroupInstanceSelector(tableNameWithType, 
brokerMetrics, adaptiveServerSelector);
       }
+      if 
(RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
+          routingConfig.getInstanceSelectorType())) {
+        LOGGER.info("Using {} for table: {}", 
routingConfig.getInstanceSelectorType(), tableNameWithType);
+        return new MultiStageReplicaGroupSelector(tableNameWithType, 
propertyStore, brokerMetrics,
+            adaptiveServerSelector);
+      }
     }
     return new BalancedInstanceSelector(tableNameWithType, brokerMetrics, 
adaptiveServerSelector);
   }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
new file mode 100644
index 0000000000..0a6d66510c
--- /dev/null
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Instance selector for multi-stage queries which can ensure that Colocated 
Tables always leverage Colocated Join
+ * whenever possible. To achieve this, this instance-selector uses 
InstancePartitions (IP) to determine replica-groups,
+ * as opposed to IdealState used by other instance-selectors. Moreover, this 
also uses the requestId generated by
+ * Pinot broker to determine the replica-group picked for each table involved 
in the query, as opposed to using a
+ * member variable. There may be scenarios where an instance in the chosen 
replica-group is down. In that case, this
+ * strategy will try to pick another replica-group. For realtime tables, this 
strategy uses only CONSUMING partitions.
+ * This feature is in <strong>Beta</strong>.
+ */
+public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiStageReplicaGroupSelector.class);
+
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private InstancePartitions _instancePartitions;
+
+  public MultiStageReplicaGroupSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector) {
+    super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
+    _tableNameWithType = tableNameWithType;
+    _propertyStore = propertyStore;
+  }
+
+  @Override
+  public void init(Set<String> enabledInstances, IdealState idealState, 
ExternalView externalView,
+      Set<String> onlineSegments) {
+    super.init(enabledInstances, idealState, externalView, onlineSegments);
+    _instancePartitions = getInstancePartitions();
+  }
+
+  @Override
+  public void onInstancesChange(Set<String> enabledInstances, List<String> 
changedInstances) {
+    super.onInstancesChange(enabledInstances, changedInstances);
+    _instancePartitions = getInstancePartitions();
+  }
+
+  @Override
+  public void onAssignmentChange(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments) {
+    super.onAssignmentChange(idealState, externalView, onlineSegments);
+    _instancePartitions = getInstancePartitions();
+  }
+
+  @Override
+  Map<String, String> select(List<String> segments, int requestId,
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, 
String> queryOptions) {
+    // Take a local reference to InstancePartitions so that the 
event-listeners above cannot swap it mid-selection.
+    InstancePartitions instancePartitions = _instancePartitions;
+    int replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
+    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
+      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
+      try {
+        return tryAssigning(segmentToEnabledInstancesMap, instancePartitions, 
replicaGroup);
+      } catch (Exception e) {
+        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+      }
+    }
+    throw new RuntimeException(String.format("Unable to find any replica-group 
to serve table: %s",
+        _tableNameWithType));
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server in the 
given replica-group. If a segment has no
+   * enabled instance in that replica-group, we throw an exception.
+   */
+  private Map<String, String> tryAssigning(Map<String, List<String>> 
segmentToEnabledInstancesMap,
+      InstancePartitions instancePartitions, int replicaId) {
+    Set<String> instanceLookUpSet = new HashSet<>();
+    for (int partition = 0; partition < instancePartitions.getNumPartitions(); 
partition++) {
+      List<String> instances = instancePartitions.getInstances(partition, 
replicaId);
+      instanceLookUpSet.addAll(instances);
+    }
+    Map<String, String> result = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : 
segmentToEnabledInstancesMap.entrySet()) {
+      String segmentName = entry.getKey();
+      boolean found = false;
+      for (String enabledInstanceForSegment : entry.getValue()) {
+        if (instanceLookUpSet.contains(enabledInstanceForSegment)) {
+          found = true;
+          result.put(segmentName, enabledInstanceForSegment);
+          break;
+        }
+      }
+      if (!found) {
+        throw new RuntimeException(String.format("Unable to find an enabled 
instance for segment: %s", segmentName));
+      }
+    }
+    return result;
+  }
+
+  @VisibleForTesting
+  protected InstancePartitions getInstancePartitions() {
+    // TODO: Evaluate whether we need to provide support for COMPLETE 
partitions.
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(_tableNameWithType);
+    Preconditions.checkNotNull(tableType);
+    InstancePartitions instancePartitions = null;
+    if (tableType.equals(TableType.OFFLINE)) {
+      instancePartitions = 
InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+          
InstancePartitionsUtils.getInstancePartitionsName(_tableNameWithType, 
tableType.name()));
+    } else {
+      instancePartitions = 
InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+          InstancePartitionsUtils.getInstancePartitionsName(_tableNameWithType,
+              InstancePartitionsType.CONSUMING.name()));
+    }
+    Preconditions.checkNotNull(instancePartitions);
+    return instancePartitions;
+  }
+}
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index b2abbd59a9..53e75d0557 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -153,7 +153,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
     assertTrue(routingManager.routingExists(REALTIME_TABLE_NAME));
 
     BrokerRequest brokerRequest = 
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
OFFLINE_TABLE_NAME);
-    RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest);
+    RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest, 
0);
     assertNotNull(routingTable);
     assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), 
NUM_SERVERS);
     
assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().size(),
 NUM_OFFLINE_SEGMENTS);
@@ -164,8 +164,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
         SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), 
"downloadUrl");
 
     TestUtils.waitForCondition(aVoid ->
-        
routingManager.getRoutingTable(brokerRequest).getServerInstanceToSegmentsMap().values().iterator().next().size()
-            == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new 
segment into the routing table");
+        routingManager.getRoutingTable(brokerRequest, 
0).getServerInstanceToSegmentsMap()
+            .values().iterator().next().size() == NUM_OFFLINE_SEGMENTS + 1, 
30_000L, "Failed to add the new segment "
+        + "into the routing table");
 
     // Add a new table with different broker tenant
     String newRawTableName = "newTable";
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index cff4eff193..88f72600e5 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -199,7 +199,7 @@ public class BaseBrokerRequestHandlerTest {
     RoutingTable rt = mock(RoutingTable.class);
     when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections
         .singletonMap(new ServerInstance(new InstanceConfig("server01_9000")), 
Collections.singletonList("segment01")));
-    when(routingManager.getRoutingTable(any())).thenReturn(rt);
+    when(routingManager.getRoutingTable(any(), 
Mockito.anyLong())).thenReturn(rt);
     QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
     when(queryQuotaManager.acquire(anyString())).thenReturn(true);
     CountDownLatch latch = new CountDownLatch(1);
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index 5025782a49..07efbe101a 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,7 +31,10 @@ import java.util.Set;
 import java.util.TreeMap;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.PinotQuery;
@@ -41,10 +47,13 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.Segmen
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 
 public class InstanceSelectorTest {
@@ -55,26 +64,27 @@ public class InstanceSelectorTest {
   public void testInstanceSelectorFactory() {
     TableConfig tableConfig = mock(TableConfig.class);
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     AdaptiveServerSelector adaptiveServerSelector = null;
 
     // Routing config is missing
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
brokerMetrics,
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
         adaptiveServerSelector) instanceof BalancedInstanceSelector);
 
     // Instance selector type is not configured
     RoutingConfig routingConfig = mock(RoutingConfig.class);
     when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
brokerMetrics,
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
         adaptiveServerSelector) instanceof BalancedInstanceSelector);
 
     // Replica-group instance selector should be returned
     
when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
brokerMetrics,
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
         adaptiveServerSelector) instanceof ReplicaGroupInstanceSelector);
 
     // Strict replica-group instance selector should be returned
     
when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
brokerMetrics,
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
         adaptiveServerSelector) instanceof StrictReplicaGroupInstanceSelector);
 
     // Should be backward-compatible with legacy config
@@ -82,12 +92,12 @@ public class InstanceSelectorTest {
     when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
     when(routingConfig.getRoutingTableBuilderName()).thenReturn(
         InstanceSelectorFactory.LEGACY_REPLICA_GROUP_OFFLINE_ROUTING);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
brokerMetrics,
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
         adaptiveServerSelector) instanceof ReplicaGroupInstanceSelector);
     when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
     when(routingConfig.getRoutingTableBuilderName()).thenReturn(
         InstanceSelectorFactory.LEGACY_REPLICA_GROUP_REALTIME_ROUTING);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
brokerMetrics,
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
         adaptiveServerSelector) instanceof ReplicaGroupInstanceSelector);
   }
 
@@ -169,6 +179,8 @@ public class InstanceSelectorTest {
     replicaGroupInstanceSelector.init(enabledInstances, idealState, 
externalView, onlineSegments);
     strictReplicaGroupInstanceSelector.init(enabledInstances, idealState, 
externalView, onlineSegments);
 
+    int requestId = 0;
+
     // For the 1st request:
     //   BalancedInstanceSelector:
     //     segment0 -> instance0
@@ -189,7 +201,8 @@ public class InstanceSelectorTest {
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance1);
     expectedBalancedInstanceSelectorResult.put(segment3, instance3);
-    InstanceSelector.SelectionResult selectionResult = 
balancedInstanceSelector.select(brokerRequest, segments);
+    InstanceSelector.SelectionResult selectionResult = 
balancedInstanceSelector.select(brokerRequest, segments,
+        requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     Map<String, String> expectedReplicaGroupInstanceSelectorResult = new 
HashMap<>();
@@ -197,10 +210,10 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment1, instance0);
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -215,12 +228,13 @@ public class InstanceSelectorTest {
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance3
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment0, instance2);
     expectedBalancedInstanceSelectorResult.put(segment1, instance0);
     expectedBalancedInstanceSelectorResult.put(segment2, instance3);
     expectedBalancedInstanceSelectorResult.put(segment3, instance1);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -228,10 +242,10 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -252,12 +266,13 @@ public class InstanceSelectorTest {
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance1
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment0, instance2);
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance1);
     expectedBalancedInstanceSelectorResult.put(segment3, instance3);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -265,10 +280,10 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -283,12 +298,13 @@ public class InstanceSelectorTest {
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance3
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment0, instance2);
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance3);
     expectedBalancedInstanceSelectorResult.put(segment3, instance1);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -296,10 +312,10 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -326,21 +342,22 @@ public class InstanceSelectorTest {
     //     segment2 -> instance1
     //     segment3 -> instance1
     //     segment4 -> null
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance3);
     expectedBalancedInstanceSelectorResult.put(segment3, instance1);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
     expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -355,21 +372,22 @@ public class InstanceSelectorTest {
     //     segment2 -> instance3
     //     segment3 -> instance3
     //     segment4 -> null
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance1);
     expectedBalancedInstanceSelectorResult.put(segment3, instance3);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
     expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -389,12 +407,13 @@ public class InstanceSelectorTest {
     //     segment2 -> instance1
     //     segment3 -> instance1
     //     segment4 -> instance2
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance3);
     expectedBalancedInstanceSelectorResult.put(segment3, instance1);
     expectedBalancedInstanceSelectorResult.put(segment4, instance2);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -402,10 +421,10 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -420,12 +439,13 @@ public class InstanceSelectorTest {
     //     segment2 -> instance3
     //     segment3 -> instance3
     //     segment4 -> instance2
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance1);
     expectedBalancedInstanceSelectorResult.put(segment3, instance3);
     expectedBalancedInstanceSelectorResult.put(segment4, instance2);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -433,10 +453,10 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
     expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -457,12 +477,13 @@ public class InstanceSelectorTest {
     //     segment2 -> instance1
     //     segment3 -> instance1
     //     segment4 -> instance0
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment1, instance0);
     expectedBalancedInstanceSelectorResult.put(segment2, instance3);
     expectedBalancedInstanceSelectorResult.put(segment3, instance1);
     expectedBalancedInstanceSelectorResult.put(segment4, instance2);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -470,10 +491,10 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segment4, instance0);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -488,12 +509,13 @@ public class InstanceSelectorTest {
     //     segment2 -> instance3
     //     segment3 -> instance3
     //     segment4 -> instance2
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance1);
     expectedBalancedInstanceSelectorResult.put(segment3, instance3);
     expectedBalancedInstanceSelectorResult.put(segment4, instance0);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -501,10 +523,10 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
     expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -534,12 +556,13 @@ public class InstanceSelectorTest {
     //     segment2 -> instance1
     //     segment3 -> instance1
     //     segment4 -> instance2
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance3);
     expectedBalancedInstanceSelectorResult.put(segment3, instance1);
     expectedBalancedInstanceSelectorResult.put(segment4, instance2);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -547,7 +570,7 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segment4, instance0);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     Map<String, String> expectedStrictReplicaGroupInstanceSelectorResult = new 
HashMap<>();
@@ -555,7 +578,7 @@ public class InstanceSelectorTest {
     expectedStrictReplicaGroupInstanceSelectorResult.put(segment2, instance1);
     expectedStrictReplicaGroupInstanceSelectorResult.put(segment3, instance1);
     expectedStrictReplicaGroupInstanceSelectorResult.put(segment4, instance2);
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedStrictReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -570,12 +593,13 @@ public class InstanceSelectorTest {
     //     segment2 -> instance3
     //     segment3 -> instance3
     //     segment4 -> instance2
+    requestId++;
     expectedBalancedInstanceSelectorResult = new HashMap<>();
     expectedBalancedInstanceSelectorResult.put(segment1, instance2);
     expectedBalancedInstanceSelectorResult.put(segment2, instance1);
     expectedBalancedInstanceSelectorResult.put(segment3, instance3);
     expectedBalancedInstanceSelectorResult.put(segment4, instance0);
-    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 
requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -583,10 +607,10 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
     expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
     expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
   }
@@ -668,12 +692,11 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segments.get(9), instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segments.get(10), 
instance0);
     expectedReplicaGroupInstanceSelectorResult.put(segments.get(11), 
instance1);
-    InstanceSelector.SelectionResult selectionResult = 
replicaGroupInstanceSelector.select(brokerRequest, segments);
+    InstanceSelector.SelectionResult selectionResult = 
replicaGroupInstanceSelector.select(brokerRequest, segments, 0);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
   }
 
-
   @Test
   public void testReplicaGroupInstanceSelectorNumReplicaGroupsToQueryGreaterThanReplicas() {
     String offlineTableName = "testTable_OFFLINE";
@@ -752,7 +775,7 @@ public class InstanceSelectorTest {
     expectedReplicaGroupInstanceSelectorResult.put(segments.get(9), instance0);
     expectedReplicaGroupInstanceSelectorResult.put(segments.get(10), 
instance1);
     expectedReplicaGroupInstanceSelectorResult.put(segments.get(11), 
instance2);
-    InstanceSelector.SelectionResult selectionResult = 
replicaGroupInstanceSelector.select(brokerRequest, segments);
+    InstanceSelector.SelectionResult selectionResult = 
replicaGroupInstanceSelector.select(brokerRequest, segments, 0);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
   }
@@ -814,14 +837,126 @@ public class InstanceSelectorTest {
     for (String segment: segments) {
       expectedReplicaGroupInstanceSelectorResult.put(segment, instance0);
     }
-    InstanceSelector.SelectionResult selectionResult = 
replicaGroupInstanceSelector.select(brokerRequest, segments);
+    InstanceSelector.SelectionResult selectionResult = 
replicaGroupInstanceSelector.select(brokerRequest, segments, 0);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
 
     for (String segment: segments) {
       expectedReplicaGroupInstanceSelectorResult.put(segment, instance1);
     }
-    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments, 1);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+  }
+
+  @Test
+  public void testMultiStageStrictReplicaGroupSelector() {
+    String offlineTableName = "testTable_OFFLINE";
+    // Create instance-partitions with two replica-groups and 1 partition. Each replica-group has 2 instances.
+    List<String> replicaGroup0 = ImmutableList.of("instance-0", "instance-1");
+    List<String> replicaGroup1 = ImmutableList.of("instance-2", "instance-3");
+    Map<String, List<String>> partitionToInstances = ImmutableMap.of(
+        "0_0", replicaGroup0,
+        "0_1", replicaGroup1);
+    InstancePartitions instancePartitions = new 
InstancePartitions(offlineTableName);
+    instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0"));
+    instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1"));
+    BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+    BrokerRequest brokerRequest = mock(BrokerRequest.class);
+    PinotQuery pinotQuery = mock(PinotQuery.class);
+    Map<String, String> queryOptions = new HashMap<>();
+
+    when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
+    when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
+
+    MultiStageReplicaGroupSelector multiStageSelector =
+        new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, 
brokerMetrics, null);
+    multiStageSelector = spy(multiStageSelector);
+    
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
+
+    List<String> enabledInstances = new ArrayList<>();
+    IdealState idealState = new IdealState(offlineTableName);
+    Map<String, Map<String, String>> idealStateSegmentAssignment = 
idealState.getRecord().getMapFields();
+    ExternalView externalView = new ExternalView(offlineTableName);
+    Map<String, Map<String, String>> externalViewSegmentAssignment = 
externalView.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
+
+    // Mark all instances as enabled
+    for (int i = 0; i < 4; i++) {
+      enabledInstances.add(String.format("instance-%d", i));
+    }
+
+    List<String> segments = getSegments();
+
+    // Create two idealState and externalView maps. One is used for segments with replica-group=0 and the other for rg=1
+    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
+    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+    Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>();
+    Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
+
+    // instance-0 and instance-2 mirror each other in the two replica-groups. Same for instance-1 and instance-3.
+    for (int i = 0; i < 4; i++) {
+      String instance = enabledInstances.get(i);
+      if (i % 2 == 0) {
+        idealStateInstanceStateMap0.put(instance, ONLINE);
+        externalViewInstanceStateMap0.put(instance, ONLINE);
+      } else {
+        idealStateInstanceStateMap1.put(instance, ONLINE);
+        externalViewInstanceStateMap1.put(instance, ONLINE);
+      }
+    }
+
+    // Even numbered segments get assigned to [instance-0, instance-2], and odd numbered segments get assigned to
+    // [instance-1,instance-3].
+    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+      String segment = segments.get(segmentNum);
+      if (segmentNum % 2 == 0) {
+        idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap0);
+        externalViewSegmentAssignment.put(segment, 
externalViewInstanceStateMap0);
+      } else {
+        idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap1);
+        externalViewSegmentAssignment.put(segment, 
externalViewInstanceStateMap1);
+      }
+      onlineSegments.add(segment);
+    }
+
+    multiStageSelector.init(new HashSet<>(enabledInstances), idealState, 
externalView, onlineSegments);
+
+    // Using requestId=0 should select replica-group 0. Even segments get assigned to instance-0 and odd segments get
+    // assigned to instance-1.
+    Map<String, String> expectedReplicaGroupInstanceSelectorResult = new 
HashMap<>();
+    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+      expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), 
replicaGroup0.get(segmentNum % 2));
+    }
+    InstanceSelector.SelectionResult selectionResult = 
multiStageSelector.select(brokerRequest, segments, 0);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+
+    // Using same requestId again should return the same selection
+    selectionResult = multiStageSelector.select(brokerRequest, segments, 0);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+
+    // Using requestId=1 should select replica-group 1
+    expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
+    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+      expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), 
replicaGroup1.get(segmentNum % 2));
+    }
+    selectionResult = multiStageSelector.select(brokerRequest, segments, 1);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+
+    // If instance-0 is down, replica-group 1 should be picked even with requestId=0
+    enabledInstances.remove("instance-0");
+    multiStageSelector.init(new HashSet<>(enabledInstances), idealState, 
externalView, onlineSegments);
+    selectionResult = multiStageSelector.select(brokerRequest, segments, 0);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+
+    // If instance-2 also goes down, no replica-group is eligible
+    enabledInstances.remove("instance-2");
+    multiStageSelector.init(new HashSet<>(enabledInstances), idealState, 
externalView, onlineSegments);
+    try {
+      multiStageSelector.select(brokerRequest, segments, 0);
+      fail("Method call above should have failed");
+    } catch (Exception ignored) {
+    }
   }
 
   @Test
@@ -880,10 +1015,10 @@ public class InstanceSelectorTest {
     PinotQuery pinotQuery = mock(PinotQuery.class);
     when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
     when(pinotQuery.getQueryOptions()).thenReturn(null);
-    InstanceSelector.SelectionResult selectionResult = 
balancedInstanceSelector.select(brokerRequest, segments);
+    InstanceSelector.SelectionResult selectionResult = 
balancedInstanceSelector.select(brokerRequest, segments, 0);
     assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
     assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
-    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments, 0);
     assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
     assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
@@ -904,10 +1039,10 @@ public class InstanceSelectorTest {
       enabledInstances.add(errorInstance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(errorInstance));
       strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(errorInstance));
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
@@ -925,10 +1060,10 @@ public class InstanceSelectorTest {
       enabledInstances.add(instance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance));
       strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance));
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -936,10 +1071,10 @@ public class InstanceSelectorTest {
       idealStateInstanceStateMap.put(instance, ONLINE);
       balancedInstanceSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
       strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, 
externalView, onlineSegments);
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -958,10 +1093,10 @@ public class InstanceSelectorTest {
       externalViewInstanceStateMap1.put(instance, ONLINE);
       balancedInstanceSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
       strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, 
externalView, onlineSegments);
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -969,10 +1104,10 @@ public class InstanceSelectorTest {
       idealStateInstanceStateMap.remove(instance);
       balancedInstanceSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
       strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, 
externalView, onlineSegments);
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
@@ -993,10 +1128,10 @@ public class InstanceSelectorTest {
       externalViewInstanceStateMap1.put(errorInstance, ONLINE);
       balancedInstanceSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
       strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, 
externalView, onlineSegments);
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
@@ -1017,10 +1152,10 @@ public class InstanceSelectorTest {
       externalViewInstanceStateMap1.put(errorInstance, ERROR);
       balancedInstanceSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
       strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, 
externalView, onlineSegments);
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
@@ -1038,10 +1173,10 @@ public class InstanceSelectorTest {
       enabledInstances.remove(instance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance));
       strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance));
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
@@ -1061,10 +1196,10 @@ public class InstanceSelectorTest {
       externalViewInstanceStateMap0.put(errorInstance, ONLINE);
       balancedInstanceSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
       strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, 
externalView, onlineSegments);
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
       assertEquals(selectionResult.getUnavailableSegments(), 
Collections.singletonList(segment1));
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
       assertEquals(selectionResult.getUnavailableSegments(), 
Collections.singletonList(segment1));
 
@@ -1082,10 +1217,10 @@ public class InstanceSelectorTest {
       enabledInstances.remove(errorInstance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(errorInstance));
       strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(errorInstance));
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
@@ -1106,10 +1241,10 @@ public class InstanceSelectorTest {
       externalViewInstanceStateMap1.put(instance, CONSUMING);
       balancedInstanceSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
       strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, 
externalView, onlineSegments);
-      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
-      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index db535dcaa6..857f0207da 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -50,7 +50,7 @@ public interface RoutingManager {
    * @param brokerRequest the broker request constructed from a query.
    * @return the route table.
    */
-  RoutingTable getRoutingTable(BrokerRequest brokerRequest);
+  RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId);
 
   /**
    * Validate routing exist for a table
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 814837ec15..d84a70d3f8 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -125,11 +125,11 @@ public class QueryEnvironment {
    * @param sqlNodeAndOptions parsed SQL query.
    * @return a dispatchable query plan
    */
-  public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions 
sqlNodeAndOptions) {
+  public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions 
sqlNodeAndOptions, long requestId) {
     try (PlannerContext plannerContext = new PlannerContext(_config, 
_catalogReader, _typeFactory, _hepProgram)) {
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
       RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), 
plannerContext);
-      return toDispatchablePlan(relRoot, plannerContext);
+      return toDispatchablePlan(relRoot, plannerContext, requestId);
     } catch (CalciteContextException e) {
       throw new RuntimeException("Error composing query plan for '" + sqlQuery
           + "': " + e.getMessage() + "'", e);
@@ -141,9 +141,9 @@ public class QueryEnvironment {
   /**
    * Explain a SQL query.
    *
-   * Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions)}, this API runs the query compilation.
-   * But it doesn't run the distributed {@link QueryPlan} generation, instead it only returns the explained logical
-   * plan.
+   * Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions, long)}, this API runs the query
+   * compilation. But it doesn't run the distributed {@link QueryPlan} generation, instead it only returns the
+   * explained logical plan.
    *
    * @param sqlQuery SQL query string.
    * @param sqlNodeAndOptions parsed SQL query.
@@ -165,7 +165,7 @@ public class QueryEnvironment {
 
   @VisibleForTesting
   public QueryPlan planQuery(String sqlQuery) {
-    return planQuery(sqlQuery, 
CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery));
+    return planQuery(sqlQuery, 
CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery), 0);
   }
 
   @VisibleForTesting
@@ -219,9 +219,9 @@ public class QueryEnvironment {
     }
   }
 
-  private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext 
plannerContext) {
+  private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext 
plannerContext, long requestId) {
     // 5. construct a dispatchable query plan.
-    StagePlanner queryStagePlanner = new StagePlanner(plannerContext, 
_workerManager);
+    StagePlanner queryStagePlanner = new StagePlanner(plannerContext, 
_workerManager, requestId);
     return queryStagePlanner.makePlan(relRoot);
   }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 2d61856c85..5f46b23d26 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -44,10 +44,12 @@ public class StagePlanner {
   private final PlannerContext _plannerContext;   // DO NOT REMOVE.
   private final WorkerManager _workerManager;
   private int _stageIdCounter;
+  private long _requestId;
 
-  public StagePlanner(PlannerContext plannerContext, WorkerManager 
workerManager) {
+  public StagePlanner(PlannerContext plannerContext, WorkerManager 
workerManager, long requestId) {
     _plannerContext = plannerContext;
     _workerManager = workerManager;
+    _requestId = requestId;
   }
 
   /**
@@ -79,7 +81,7 @@ public class StagePlanner {
 
     // assign workers to each stage.
     for (Map.Entry<Integer, StageMetadata> e : 
queryPlan.getStageMetadataMap().entrySet()) {
-      _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
+      _workerManager.assignWorkerToStage(e.getKey(), e.getValue(), _requestId);
     }
 
     return queryPlan;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 112aec606b..42bb19d269 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -58,12 +58,12 @@ public class WorkerManager {
     _routingManager = routingManager;
   }
 
-  public void assignWorkerToStage(int stageId, StageMetadata stageMetadata) {
+  public void assignWorkerToStage(int stageId, StageMetadata stageMetadata, 
long requestId) {
     List<String> scannedTables = stageMetadata.getScannedTables();
     if (scannedTables.size() == 1) {
       // table scan stage, need to attach server as well as segment info for each physical table type.
       String logicalTableName = scannedTables.get(0);
-      Map<String, RoutingTable> routingTableMap = 
getRoutingTable(logicalTableName);
+      Map<String, RoutingTable> routingTableMap = 
getRoutingTable(logicalTableName, requestId);
       if (routingTableMap.size() == 0) {
         throw new IllegalArgumentException("Unable to find routing entries for 
table: " + logicalTableName);
       }
@@ -124,22 +124,22 @@ public class WorkerManager {
    * @param logicalTableName it can either be a hybrid table name or a physical table name with table type.
    * @return keyed-map from table type(s) to routing table(s).
    */
-  private Map<String, RoutingTable> getRoutingTable(String logicalTableName) {
+  private Map<String, RoutingTable> getRoutingTable(String logicalTableName, 
long requestId) {
     String rawTableName = 
TableNameBuilder.extractRawTableName(logicalTableName);
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(logicalTableName);
     Map<String, RoutingTable> routingTableMap = new HashMap<>();
     RoutingTable routingTable;
     if (tableType == null) {
-      routingTable = getRoutingTable(rawTableName, TableType.OFFLINE);
+      routingTable = getRoutingTable(rawTableName, TableType.OFFLINE, 
requestId);
       if (routingTable != null) {
         routingTableMap.put(TableType.OFFLINE.name(), routingTable);
       }
-      routingTable = getRoutingTable(rawTableName, TableType.REALTIME);
+      routingTable = getRoutingTable(rawTableName, TableType.REALTIME, 
requestId);
       if (routingTable != null) {
         routingTableMap.put(TableType.REALTIME.name(), routingTable);
       }
     } else {
-      routingTable = getRoutingTable(logicalTableName, tableType);
+      routingTable = getRoutingTable(logicalTableName, tableType, requestId);
       if (routingTable != null) {
         routingTableMap.put(tableType.name(), routingTable);
       }
@@ -147,10 +147,10 @@ public class WorkerManager {
     return routingTableMap;
   }
 
-  private RoutingTable getRoutingTable(String tableName, TableType tableType) {
+  private RoutingTable getRoutingTable(String tableName, TableType tableType, 
long requestId) {
     String tableNameWithType = 
TableNameBuilder.forType(tableType).tableNameWithType(
         TableNameBuilder.extractRawTableName(tableName));
     return _routingManager.getRoutingTable(
-        CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
tableNameWithType));
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
tableNameWithType), requestId);
   }
 }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index 9d2d5e1a22..5e75de0455 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -144,7 +144,7 @@ public class MockRoutingManagerFactory {
     }
 
     @Override
-    public RoutingTable getRoutingTable(BrokerRequest brokerRequest) {
+    public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long 
requestId) {
       String tableName = 
brokerRequest.getPinotQuery().getDataSource().getTableName();
       return _routingTableMap.getOrDefault(tableName,
           
_routingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index 2c238aa287..8af5773675 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -31,6 +31,7 @@ public class RoutingConfig extends BaseJsonConfig {
   public static final String EMPTY_SEGMENT_PRUNER_TYPE = "empty";
   public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = 
"replicaGroup";
   public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = 
"strictReplicaGroup";
+  public static final String MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE = 
"multiStageReplicaGroup";
 
   // Replaced by _segmentPrunerTypes and _instanceSelectorType
   @Deprecated


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to