This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fbf08426ee Execute Queries on Logical Tables in SSE (#15634)
fbf08426ee is described below

commit fbf08426ee082a8bab503d9807752c4cf424280a
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Thu May 15 15:22:34 2025 +0530

    Execute Queries on Logical Tables in SSE (#15634)
    
    * Add support to execute logical tables.
    
    Add LogicalTableRouteProvider/RouteInfo and tests
    
    Checkpoint while adding support for disabled tables.
    
    Add tests for logical table execution
    
    Fix compilation issues due to 15573
    
    Fix isDisabled unit tests
    
    Fix check
    
    Fix compilation errors
    
    Remove files
    
    Rename to PhysicalTableInfo
    
    Checkstyle
    
    Code cleanup
    
    Use ImplicitHybridTableRouteInfo and Provider
    
    Consolidate checks in BaseTableRouteInfo
    
    Make sure the logical table setup is correct
    
    Checkstyle
    
    Create schema for the logical table
    
    Move QPS back to where it was.
    
    Use Interface
    
    Compilation
    
    * Integrate query options. Add tests for some of them.
    
    * Checkstyle
    
    * Fix tests
    
    * Address review comments.
    
    * Use table config to get server tenant.
    
    * Address review comments.
---
 .../BaseSingleStageBrokerRequestHandler.java       | 125 ++++--
 .../pinot/broker/routing/BrokerRoutingManager.java |   6 +
 .../api/resources/PinotLogicalTableResource.java   |   7 +-
 .../query/executor/LogicalTableExecutionInfo.java  | 174 ++++++++
 .../query/executor/ServerQueryExecutorV1Impl.java  |   4 +
 .../core/query/executor/TableExecutionInfo.java    |  46 +-
 .../apache/pinot/core/routing/RoutingManager.java  |  10 +
 .../pinot/core/transport/BaseTableRouteInfo.java   |  64 +++
 .../transport/ImplicitHybridTableRouteInfo.java    |  65 +--
 .../pinot/core/transport/TableRouteInfo.java       |  23 +-
 .../BaseLogicalTableIntegrationTest.java           | 491 +++++++++++++++++++++
 ...calTableWithOneOfflineTableIntegrationTest.java |  29 ++
 ...ableWithTwelveOfflineTablesIntegrationTest.java |  31 ++
 ...alTableWithTwoOfflineTablesIntegrationTest.java |  29 ++
 .../query/routing/table/LogicalTableRouteInfo.java | 391 ++++++++++++++++
 .../routing/table/LogicalTableRouteProvider.java   | 141 ++++++
 .../routing/table/PhysicalTableRouteProvider.java  | 111 +++++
 .../query/routing/table/BaseTableRouteTest.java    | 166 +++++++
 ...HybridTableRouteProviderCalculateRouteTest.java | 102 -----
 ...tHybridTableRouteProviderGetTableRouteTest.java |  20 -
 ...ogicalTableRouteProviderCalculateRouteTest.java | 126 ++++++
 .../LogicalTableRouteProviderGetRouteTest.java     | 309 +++++++++++++
 .../query/testutils/MockRoutingManagerFactory.java |   6 +
 23 files changed, 2247 insertions(+), 229 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index bea1c14bdd..fd880bdfc6 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -92,6 +92,7 @@ import org.apache.pinot.core.transport.TableRouteInfo;
 import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.query.parser.utils.ParserUtils;
 import org.apache.pinot.query.routing.table.ImplicitHybridTableRouteProvider;
+import org.apache.pinot.query.routing.table.LogicalTableRouteProvider;
 import org.apache.pinot.query.routing.table.TableRouteProvider;
 import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
 import org.apache.pinot.spi.auth.AuthorizationResult;
@@ -100,6 +101,7 @@ import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
@@ -155,6 +157,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
   protected ExecutorService _multistageCompileExecutor;
   protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
   protected ImplicitHybridTableRouteProvider _implicitHybridTableRouteProvider;
+  protected LogicalTableRouteProvider _logicalTableRouteProvider;
 
   public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String 
brokerId,
       BrokerRoutingManager routingManager, AccessControlFactory 
accessControlFactory,
@@ -190,6 +193,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
         Broker.DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA);
 
     _implicitHybridTableRouteProvider = new ImplicitHybridTableRouteProvider();
+    _logicalTableRouteProvider = new LogicalTableRouteProvider();
 
     LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query 
response limit: {}, "
             + "default query limit {}, query log max length: {}, query log max 
rate: {}, query cancellation "
@@ -378,6 +382,8 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     String rawTableName = compileResult._rawTableName;
     PinotQuery pinotQuery = compileResult._pinotQuery;
     PinotQuery serverPinotQuery = compileResult._serverPinotQuery;
+    LogicalTableConfig logicalTableConfig = 
_tableCache.getLogicalTableConfig(rawTableName);
+    String database = 
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
     long compilationEndTimeNs = System.nanoTime();
     // full request compile time = compilationTimeNs + parserTimeNs
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
@@ -388,17 +394,55 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     BrokerRequest brokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
     BrokerRequest serverBrokerRequest =
         serverPinotQuery == pinotQuery ? brokerRequest : 
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
-    AuthorizationResult authorizationResult = 
accessControl.authorize(requesterIdentity, serverBrokerRequest);
 
-    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
-        System.nanoTime() - compilationEndTimeNs);
+    TableRouteProvider routeProvider;
 
-    if (!authorizationResult.hasAccess()) {
-      throwAccessDeniedError(requestId, query, requestContext, tableName, 
authorizationResult);
+    if (logicalTableConfig != null) {
+      Set<String> physicalTableNames = 
logicalTableConfig.getPhysicalTableConfigMap().keySet();
+      AuthorizationResult authorizationResult =
+          hasTableAccess(requesterIdentity, physicalTableNames, 
requestContext, httpHeaders);
+      if (!authorizationResult.hasAccess()) {
+        throwAccessDeniedError(requestId, query, requestContext, tableName, 
authorizationResult);
+      }
+
+      // Validate QPS
+      if (hasExceededQPSQuota(database, physicalTableNames, requestContext)) {
+        String errorMessage = String.format("Request %d: %s exceeds query 
quota.", requestId, query);
+        return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
+      }
+
+      routeProvider = _logicalTableRouteProvider;
+    } else {
+      AuthorizationResult authorizationResult = 
accessControl.authorize(requesterIdentity, serverBrokerRequest);
+
+      _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.AUTHORIZATION,
+          System.nanoTime() - compilationEndTimeNs);
+
+      if (!authorizationResult.hasAccess()) {
+        throwAccessDeniedError(requestId, query, requestContext, tableName, 
authorizationResult);
+      }
+
+      // Validate QPS quota
+      if (!_queryQuotaManager.acquireDatabase(database)) {
+        String errorMessage =
+            String.format("Request %d: %s exceeds query quota for database: 
%s", requestId, query, database);
+        LOGGER.info(errorMessage);
+        requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
+        return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
+      }
+      if (!_queryQuotaManager.acquire(tableName)) {
+        String errorMessage =
+            String.format("Request %d: %s exceeds query quota for table: %s", 
requestId, query, tableName);
+        LOGGER.info(errorMessage);
+        requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
+        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
+        return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
+      }
+
+      routeProvider = _implicitHybridTableRouteProvider;
     }
 
     // Get the tables hit by the request
-    TableRouteProvider routeProvider = _implicitHybridTableRouteProvider;
     TableRouteInfo routeInfo = routeProvider.getTableRouteInfo(tableName, 
_tableCache, _routingManager);
 
     if (!routeInfo.isExists()) {
@@ -418,32 +462,16 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     String realtimeTableName = routeInfo.getRealtimeTableName();
     TableConfig offlineTableConfig = routeInfo.getOfflineTableConfig();
     TableConfig realtimeTableConfig = routeInfo.getRealtimeTableConfig();
+    QueryConfig offlineTableQueryConfig = 
routeInfo.getOfflineTableQueryConfig();
+    QueryConfig realtimeTableQueryConfig = 
routeInfo.getRealtimeTableQueryConfig();
     TimeBoundaryInfo timeBoundaryInfo = routeInfo.getTimeBoundaryInfo();
 
-    HandlerContext handlerContext = getHandlerContext(offlineTableConfig, 
realtimeTableConfig);
+    HandlerContext handlerContext = getHandlerContext(offlineTableQueryConfig, 
realtimeTableQueryConfig);
     validateGroovyScript(serverPinotQuery, handlerContext._disableGroovy);
     if (handlerContext._useApproximateFunction) {
       handleApproximateFunctionOverride(serverPinotQuery);
     }
 
-    // Validate QPS quota
-    String database = 
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
-    if (!_queryQuotaManager.acquireDatabase(database)) {
-      String errorMessage =
-          String.format("Request %d: %s exceeds query quota for database: %s", 
requestId, query, database);
-      LOGGER.info(errorMessage);
-      requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
-      return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
-    }
-    if (!_queryQuotaManager.acquire(tableName)) {
-      String errorMessage =
-          String.format("Request %d: %s exceeds query quota for table: %s", 
requestId, query, tableName);
-      LOGGER.info(errorMessage);
-      requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
-      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
-      return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
-    }
-
     // Validate the request
     try {
       validateRequest(serverPinotQuery, _queryResponseLimit);
@@ -492,8 +520,8 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       realtimeBrokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(realtimePinotQuery);
 
       requestContext.setFanoutType(RequestContext.FanoutType.HYBRID);
-      requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
-      
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
+      
requestContext.setOfflineServerTenant(getServerTenant(offlineTableConfig));
+      
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableConfig));
     } else if (routeInfo.isOffline()) {
       // OFFLINE only
       setTableName(serverBrokerRequest, offlineTableName);
@@ -503,7 +531,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       offlineBrokerRequest = serverBrokerRequest;
 
       requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE);
-      requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
+      
requestContext.setOfflineServerTenant(getServerTenant(offlineTableConfig));
     } else {
       // REALTIME only
       setTableName(serverBrokerRequest, realtimeTableName);
@@ -513,7 +541,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       realtimeBrokerRequest = serverBrokerRequest;
 
       requestContext.setFanoutType(RequestContext.FanoutType.REALTIME);
-      
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
+      
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableConfig));
     }
 
     // Check if response can be sent without server query evaluation.
@@ -623,13 +651,16 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     //       each table type, and broker should wait for both types to return.
     long remainingTimeMs = 0;
     try {
+      Long logicalTableQueryTimeout = logicalTableConfig != null && 
logicalTableConfig.getQueryConfig() != null
+          && logicalTableConfig.getQueryConfig().getTimeoutMs() != null ? 
logicalTableConfig.getQueryConfig()
+          .getTimeoutMs() : null;
       if (offlineBrokerRequest != null) {
-        remainingTimeMs =
-            setQueryTimeout(offlineTableName, 
offlineBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs);
+        remainingTimeMs = setQueryTimeout(offlineTableName, 
logicalTableQueryTimeout,
+            offlineBrokerRequest.getPinotQuery().getQueryOptions(), 
timeSpentMs);
       }
       if (realtimeBrokerRequest != null) {
-        remainingTimeMs = Math.max(remainingTimeMs,
-            setQueryTimeout(realtimeTableName, 
realtimeBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs));
+        remainingTimeMs = Math.max(remainingTimeMs, 
setQueryTimeout(realtimeTableName, logicalTableQueryTimeout,
+            realtimeBrokerRequest.getPinotQuery().getQueryOptions(), 
timeSpentMs));
       }
     } catch (TimeoutException e) {
       String errorMessage = e.getMessage();
@@ -650,7 +681,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     }
     if (offlineBrokerRequest != null) {
       Map<String, String> queryOptions = 
offlineBrokerRequest.getPinotQuery().getQueryOptions();
-      setMaxServerResponseSizeBytes(numServers, queryOptions, 
offlineTableConfig);
+      setMaxServerResponseSizeBytes(numServers, queryOptions, 
offlineTableQueryConfig);
       // Set the query option to directly return final result for single 
server query unless it is explicitly disabled
       if (numServers == 1) {
         // Set the same flag in the original server request to be used in the 
reduce phase for hybrid table
@@ -663,7 +694,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     }
     if (realtimeBrokerRequest != null) {
       Map<String, String> queryOptions = 
realtimeBrokerRequest.getPinotQuery().getQueryOptions();
-      setMaxServerResponseSizeBytes(numServers, queryOptions, 
realtimeTableConfig);
+      setMaxServerResponseSizeBytes(numServers, queryOptions, 
realtimeTableQueryConfig);
       // Set the query option to directly return final result for single 
server query unless it is explicitly disabled
       if (numServers == 1) {
         // Set the same flag in the original server request to be used in the 
reduce phase for hybrid table
@@ -1047,10 +1078,8 @@ public abstract class 
BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
     return TRUE.equals(pinotQuery.getFilterExpression());
   }
 
-  private String getServerTenant(String tableNameWithType) {
-    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+  private String getServerTenant(@Nullable TableConfig tableConfig) {
     if (tableConfig == null) {
-      LOGGER.debug("Table config is not available for table {}", 
tableNameWithType);
       return "unknownTenant";
     }
     return tableConfig.getTenantConfig().getServer();
@@ -1393,12 +1422,11 @@ public abstract class 
BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
     }
   }
 
-  private HandlerContext getHandlerContext(@Nullable TableConfig 
offlineTableConfig,
-      @Nullable TableConfig realtimeTableConfig) {
+  private HandlerContext getHandlerContext(@Nullable QueryConfig 
offlineTableQueryConfig,
+      @Nullable QueryConfig realtimeTableQueryConfig) {
     Boolean disableGroovyOverride = null;
     Boolean useApproximateFunctionOverride = null;
-    if (offlineTableConfig != null && offlineTableConfig.getQueryConfig() != 
null) {
-      QueryConfig offlineTableQueryConfig = 
offlineTableConfig.getQueryConfig();
+    if (offlineTableQueryConfig != null) {
       Boolean disableGroovyOfflineTableOverride = 
offlineTableQueryConfig.getDisableGroovy();
       if (disableGroovyOfflineTableOverride != null) {
         disableGroovyOverride = disableGroovyOfflineTableOverride;
@@ -1408,8 +1436,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
         useApproximateFunctionOverride = 
useApproximateFunctionOfflineTableOverride;
       }
     }
-    if (realtimeTableConfig != null && realtimeTableConfig.getQueryConfig() != 
null) {
-      QueryConfig realtimeTableQueryConfig = 
realtimeTableConfig.getQueryConfig();
+    if (realtimeTableQueryConfig != null) {
       Boolean disableGroovyRealtimeTableOverride = 
realtimeTableQueryConfig.getDisableGroovy();
       if (disableGroovyRealtimeTableOverride != null) {
         if (disableGroovyOverride == null) {
@@ -1840,7 +1867,8 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
    * <p>For the overall query timeout, use query-level timeout (in the query 
options) if exists, or use table-level
    * timeout (in the table config) if exists, or use instance-level timeout 
(in the broker config).
    */
-  private long setQueryTimeout(String tableNameWithType, Map<String, String> 
queryOptions, long timeSpentMs)
+  private long setQueryTimeout(String tableNameWithType, Long 
logicalTableQueryTimeout,
+      Map<String, String> queryOptions, long timeSpentMs)
       throws TimeoutException {
     long queryTimeoutMs;
     Long queryLevelTimeoutMs = QueryOptionsUtils.getTimeoutMs(queryOptions);
@@ -1852,6 +1880,8 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       if (tableLevelTimeoutMs != null) {
         // Use table-level timeout if exists
         queryTimeoutMs = tableLevelTimeoutMs;
+      } else if (logicalTableQueryTimeout != null) {
+        queryTimeoutMs = logicalTableQueryTimeout;
       } else {
         // Use instance-level timeout
         queryTimeoutMs = _brokerTimeoutMs;
@@ -1882,7 +1912,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
    * 6. BrokerConfig -> maxServerResponseSizeBytes
    */
   private void setMaxServerResponseSizeBytes(int numServers, Map<String, 
String> queryOptions,
-      @Nullable TableConfig tableConfig) {
+      @Nullable QueryConfig queryConfig) {
     // QueryOption
     if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null) 
{
       return;
@@ -1895,8 +1925,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     }
 
     // TableConfig
-    if (tableConfig != null && tableConfig.getQueryConfig() != null) {
-      QueryConfig queryConfig = tableConfig.getQueryConfig();
+    if (queryConfig != null) {
       if (queryConfig.getMaxServerResponseSizeBytes() != null) {
         queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
             Long.toString(queryConfig.getMaxServerResponseSizeBytes()));
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 6d2cfae2cb..9c54f57618 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -642,6 +642,12 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   @Override
   public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long 
requestId) {
     String tableNameWithType = brokerRequest.getQuerySource().getTableName();
+    return getRoutingTable(brokerRequest, tableNameWithType, requestId);
+  }
+
+  @Nullable
+  @Override
+  public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String 
tableNameWithType, long requestId) {
     RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
     if (routingEntry == null) {
       return null;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
index cf9cf3b2d4..73265bd832 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
@@ -152,9 +152,10 @@ public class PinotLogicalTableResource {
   @Authenticate(AccessType.UPDATE)
   @ApiOperation(value = "Update a logical table", notes = "Updates a logical 
table")
   @ApiResponses(value = {
-      @ApiResponse(code = 200, message = "Successfully updated schema"), 
@ApiResponse(code = 404, message = "Schema "
-      + "not found"), @ApiResponse(code = 400, message = "Missing or invalid 
request body"), @ApiResponse(code = 500,
-      message = "Internal error")
+      @ApiResponse(code = 200, message = "Successfully updated logical table"),
+      @ApiResponse(code = 404, message = "Logical Table not found"),
+      @ApiResponse(code = 400, message = "Missing or invalid request body"),
+      @ApiResponse(code = 500, message = "Internal error")
   })
   public SuccessResponse updateLogicalTable(
       @ApiParam(value = "Name of the logical table", required = true) 
@PathParam("tableName") String tableName,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java
new file mode 100644
index 0000000000..3a9c63f599
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.executor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.TableSegmentsContext;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LogicalTableExecutionInfo implements TableExecutionInfo {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LogicalTableExecutionInfo.class);
+
+  public static LogicalTableExecutionInfo create(InstanceDataManager 
instanceDataManager,
+      ServerQueryRequest queryRequest, QueryContext queryContext)
+      throws TableNotFoundException {
+    List<TableSegmentsContext> tableSegmentsContexts = 
queryRequest.getTableSegmentsContexts();
+    List<SingleTableExecutionInfo> tableExecutionInfos =
+        new ArrayList<>(Objects.requireNonNull(tableSegmentsContexts).size());
+    for (TableSegmentsContext tableSegmentsContext : tableSegmentsContexts) {
+      SingleTableExecutionInfo singleTableExecutionInfo =
+          SingleTableExecutionInfo.create(instanceDataManager, 
tableSegmentsContext.getTableName(),
+              tableSegmentsContext.getSegments(), 
tableSegmentsContext.getOptionalSegments(), queryContext);
+      tableExecutionInfos.add(singleTableExecutionInfo);
+    }
+
+    return new LogicalTableExecutionInfo(tableExecutionInfos);
+  }
+
+  private final List<SingleTableExecutionInfo> _tableExecutionInfos;
+
+  public LogicalTableExecutionInfo(List<SingleTableExecutionInfo> 
tableExecutionInfos) {
+    _tableExecutionInfos = tableExecutionInfos;
+  }
+
+  @Override
+  public boolean hasRealtime() {
+    return _tableExecutionInfos.stream()
+        .anyMatch(tableExecutionInfo -> 
tableExecutionInfo.getTableDataManager() instanceof RealtimeTableDataManager);
+  }
+
+  @Override
+  public SelectedSegmentsInfo getSelectedSegmentsInfo(QueryContext 
queryContext, TimerContext timerContext,
+      ExecutorService executorService, SegmentPrunerService 
segmentPrunerService) {
+    SelectedSegmentsInfo aggregatedSelectedSegmentsInfo = new 
SelectedSegmentsInfo();
+
+    for (SingleTableExecutionInfo tableExecutionInfo : _tableExecutionInfos) {
+      SelectedSegmentsInfo selectedSegmentsInfo =
+          tableExecutionInfo.getSelectedSegmentsInfo(queryContext, 
timerContext, executorService, segmentPrunerService);
+      aggregatedSelectedSegmentsInfo.aggregate(selectedSegmentsInfo);
+    }
+
+    LOGGER.debug("Matched {} segments after pruning", 
aggregatedSelectedSegmentsInfo.getNumSelectedSegments());
+    return aggregatedSelectedSegmentsInfo;
+  }
+
+  @Override
+  public List<IndexSegment> getIndexSegments() {
+    return _tableExecutionInfos.stream().flatMap(tableExecutionInfo -> 
tableExecutionInfo.getIndexSegments().stream())
+        .collect(Collectors.toList());
+  }
+
+  @Nullable
+  @Override
+  public Map<IndexSegment, SegmentContext> getProvidedSegmentContexts() {
+    Map<IndexSegment, SegmentContext> providedSegmentContexts = null;
+    for (SingleTableExecutionInfo tableExecutionInfo : _tableExecutionInfos) {
+      Map<IndexSegment, SegmentContext> segmentContexts = 
tableExecutionInfo.getProvidedSegmentContexts();
+      if (segmentContexts != null) {
+        if (providedSegmentContexts == null) {
+          // First time we are getting segment contexts
+          providedSegmentContexts = new HashMap<>();
+        }
+        providedSegmentContexts.putAll(segmentContexts);
+      }
+    }
+
+    return providedSegmentContexts;
+  }
+
+  @Override
+  public List<String> getSegmentsToQuery() {
+    return _tableExecutionInfos.stream().flatMap(tableExecutionInfo -> 
tableExecutionInfo.getSegmentsToQuery().stream())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<String> getOptionalSegments() {
+    return _tableExecutionInfos.stream()
+        .flatMap(tableExecutionInfo -> 
tableExecutionInfo.getOptionalSegments().stream()).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<SegmentDataManager> getSegmentDataManagers() {
+    return _tableExecutionInfos.stream()
+        .flatMap(tableExecutionInfo -> 
tableExecutionInfo.getSegmentDataManagers().stream())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void releaseSegmentDataManagers() {
+    for (SingleTableExecutionInfo tableExecutionInfo : _tableExecutionInfos) {
+      tableExecutionInfo.releaseSegmentDataManagers();
+    }
+  }
+
+  @Override
+  public List<SegmentContext> getSegmentContexts(List<IndexSegment> 
selectedSegments,
+      Map<String, String> queryOptions) {
+    return _tableExecutionInfos.stream()
+        .flatMap(tableExecutionInfo -> 
tableExecutionInfo.getSegmentContexts(selectedSegments, queryOptions).stream())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<String> getNotAcquiredSegments() {
+    return _tableExecutionInfos.stream()
+        .flatMap(tableExecutionInfo -> 
tableExecutionInfo.getNotAcquiredSegments().stream())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<String> getMissingSegments() {
+    return _tableExecutionInfos.stream().flatMap(tableExecutionInfo -> 
tableExecutionInfo.getMissingSegments().stream())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public ConsumingSegmentsInfo getConsumingSegmentsInfo() {
+    ConsumingSegmentsInfo consumingSegmentsInfo = new ConsumingSegmentsInfo();
+    for (SingleTableExecutionInfo tableExecutionInfo : _tableExecutionInfos) {
+      
consumingSegmentsInfo.aggregate(tableExecutionInfo.getConsumingSegmentsInfo());
+    }
+    return consumingSegmentsInfo;
+  }
+
+  @Override
+  public int getNumSegmentsAcquired() {
+    return 
_tableExecutionInfos.stream().mapToInt(SingleTableExecutionInfo::getNumSegmentsAcquired).sum();
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 6284a4e6bd..aca9b861f9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -193,9 +193,13 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
 
     TableExecutionInfo executionInfo;
     try {
+      if (queryRequest.getTableSegmentsContexts() != null && 
!queryRequest.getTableSegmentsContexts().isEmpty()) {
+        executionInfo = LogicalTableExecutionInfo.create(_instanceDataManager, 
queryRequest, queryContext);
+      } else {
         executionInfo =
             SingleTableExecutionInfo.create(_instanceDataManager, 
tableNameWithType, queryRequest.getSegmentsToQuery(),
                 queryRequest.getOptionalSegments(), queryContext);
+      }
     } catch (TableNotFoundException exception) {
       String errorMessage =
           "Failed to find table: " + exception.getMessage() + " on server: " + 
_instanceDataManager.getInstanceId();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java
index 4a9b8968dd..a820bade0d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.executor;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -119,10 +120,17 @@ public interface TableExecutionInfo {
    * the number of segments queried, the min index time, the min ingestion 
time and the max end time.
    */
   class ConsumingSegmentsInfo {
-    private final int _numConsumingSegmentsQueried;
-    private final long _minIndexTimeMs;
-    private final long _minIngestionTimeMs;
-    private final long _maxEndTimeMs;
+    private int _numConsumingSegmentsQueried;
+    private long _minIndexTimeMs;
+    private long _minIngestionTimeMs;
+    private long _maxEndTimeMs;
+
+    public ConsumingSegmentsInfo() {
+      _numConsumingSegmentsQueried = 0;
+      _minIndexTimeMs = Long.MAX_VALUE;
+      _minIngestionTimeMs = Long.MAX_VALUE;
+      _maxEndTimeMs = Long.MIN_VALUE;
+    }
 
     public ConsumingSegmentsInfo(int numConsumingSegmentsQueried, long 
minIndexTimeMs, long minIngestionTimeMs,
         long maxEndTimeMs) {
@@ -159,6 +167,13 @@ public interface TableExecutionInfo {
     public long getMaxEndTimeMs() {
       return _maxEndTimeMs;
     }
+
+    public void aggregate(ConsumingSegmentsInfo other) {
+      _numConsumingSegmentsQueried += other.getNumConsumingSegmentsQueried();
+      _minIndexTimeMs = Math.min(_minIndexTimeMs, other.getMinIndexTimeMs());
+      _minIngestionTimeMs = Math.min(_minIngestionTimeMs, 
other.getMinIngestionTimeMs());
+      _maxEndTimeMs = Math.max(_maxEndTimeMs, other.getMaxEndTimeMs());
+    }
   }
 
   /**
@@ -173,6 +188,18 @@ public interface TableExecutionInfo {
     private int _numSelectedSegments;
     private List<SegmentContext> _selectedSegmentContexts;
 
+    public SelectedSegmentsInfo() {
+      _indexSegments = new ArrayList<>();
+      _numTotalDocs = 0;
+      _numTotalSegments = 0;
+      _numSelectedSegments = 0;
+      _selectedSegmentContexts = new ArrayList<>();
+      _prunerStats = new SegmentPrunerStatistics();
+      _prunerStats.setInvalidSegments(0);
+      _prunerStats.setValuePruned(0);
+      _prunerStats.setLimitPruned(0);
+    }
+
     public SelectedSegmentsInfo(List<IndexSegment> indexSegments, long 
numTotalDocs,
         SegmentPrunerStatistics prunerStats, int numTotalSegments, int 
numSelectedSegments,
         List<SegmentContext> selectedSegmentContexts) {
@@ -207,5 +234,16 @@ public interface TableExecutionInfo {
     public List<SegmentContext> getSelectedSegmentContexts() {
       return _selectedSegmentContexts;
     }
+
+    public void aggregate(SelectedSegmentsInfo other) {
+      _indexSegments.addAll(other._indexSegments);
+      _numTotalDocs += other._numTotalDocs;
+      _numTotalSegments += other._numTotalSegments;
+      _numSelectedSegments += other._numSelectedSegments;
+      _selectedSegmentContexts.addAll(other._selectedSegmentContexts);
+      _prunerStats.setInvalidSegments(_prunerStats.getInvalidSegments() + 
other._prunerStats.getInvalidSegments());
+      _prunerStats.setValuePruned(_prunerStats.getValuePruned() + 
other._prunerStats.getValuePruned());
+      _prunerStats.setLimitPruned(_prunerStats.getLimitPruned() + 
other._prunerStats.getLimitPruned());
+    }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index 0fce0a19d8..64154c97ba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -65,6 +65,16 @@ public interface RoutingManager {
   @Nullable
   RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId);
 
+  /**
+   * Get the {@link RoutingTable} for a specific broker request.
+   * @param brokerRequest the broker request constructed from a query.
+   * @param tableNameWithType the name of the table.
+   * @param requestId the request id.
+   * @return the route table.
+   */
+  @Nullable
+  RoutingTable getRoutingTable(BrokerRequest brokerRequest, String 
tableNameWithType, long requestId);
+
   /**
    * Returns the segments that are relevant for the given broker request. 
Returns {@code null} if the table does not
    * exist.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/BaseTableRouteInfo.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/BaseTableRouteInfo.java
new file mode 100644
index 0000000000..ff55ba1586
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/BaseTableRouteInfo.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport;
+
+public abstract class BaseTableRouteInfo implements TableRouteInfo {
+
+  @Override
+  public boolean isExists() {
+    return hasOffline() || hasRealtime();
+  }
+
+  @Override
+  public boolean isHybrid() {
+    return hasOffline() && hasRealtime();
+  }
+
+  @Override
+  public boolean isOffline() {
+    return hasOffline() && !hasRealtime();
+  }
+
+  @Override
+  public boolean isRealtime() {
+    return !hasOffline() && hasRealtime();
+  }
+
+  @Override
+  public boolean isRouteExists() {
+    if (isOffline()) {
+      return isOfflineRouteExists();
+    } else if (isRealtime()) {
+      return isRealtimeRouteExists();
+    } else {
+      return isOfflineRouteExists() || isRealtimeRouteExists();
+    }
+  }
+
+  @Override
+  public boolean isDisabled() {
+    if (isOffline()) {
+      return isOfflineTableDisabled();
+    } else if (isRealtime()) {
+      return isRealtimeTableDisabled();
+    } else {
+      return isOfflineTableDisabled() && isRealtimeTableDisabled();
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ImplicitHybridTableRouteInfo.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ImplicitHybridTableRouteInfo.java
index 1556a2237e..00199c2726 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ImplicitHybridTableRouteInfo.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ImplicitHybridTableRouteInfo.java
@@ -28,13 +28,14 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.routing.ServerRouteInfo;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
-public class ImplicitHybridTableRouteInfo implements TableRouteInfo {
+public class ImplicitHybridTableRouteInfo extends BaseTableRouteInfo {
   private String _offlineTableName = null;
   private boolean _isOfflineRouteExists;
   private TableConfig _offlineTableConfig;
@@ -112,6 +113,18 @@ public class ImplicitHybridTableRouteInfo implements 
TableRouteInfo {
     return _realtimeTableConfig;
   }
 
+  @Nullable
+  @Override
+  public QueryConfig getOfflineTableQueryConfig() {
+    return _offlineTableConfig != null ? _offlineTableConfig.getQueryConfig() 
: null;
+  }
+
+  @Nullable
+  @Override
+  public QueryConfig getRealtimeTableQueryConfig() {
+    return _realtimeTableConfig != null ? 
_realtimeTableConfig.getQueryConfig() : null;
+  }
+
   public void setRealtimeTableConfig(TableConfig realtimeTableConfig) {
     _realtimeTableConfig = realtimeTableConfig;
   }
@@ -160,26 +173,6 @@ public class ImplicitHybridTableRouteInfo implements 
TableRouteInfo {
     _realtimeRoutingTable = realtimeRoutingTable;
   }
 
-  @Override
-  public boolean isExists() {
-    return hasOffline() || hasRealtime();
-  }
-
-  @Override
-  public boolean isHybrid() {
-    return hasOffline() && hasRealtime();
-  }
-
-  @Override
-  public boolean isOffline() {
-    return hasOffline() && !hasRealtime();
-  }
-
-  @Override
-  public boolean isRealtime() {
-    return !hasOffline() && hasRealtime();
-  }
-
   /**
    * Offline if offline table config is present.
    * @return true if there is an OFFLINE table, false otherwise
@@ -198,21 +191,7 @@ public class ImplicitHybridTableRouteInfo implements 
TableRouteInfo {
     return _realtimeTableConfig != null;
   }
 
-  /**
-   * Route exists if at least one of the physical tables has a route.
-   * @return true if a route exists, false otherwise
-   */
   @Override
-  public boolean isRouteExists() {
-    if (isOffline()) {
-      return _isOfflineRouteExists;
-    } else if (isRealtime()) {
-      return _isRealtimeRouteExists;
-    } else {
-      return _isOfflineRouteExists || _isRealtimeRouteExists;
-    }
-  }
-
   public boolean isOfflineRouteExists() {
     return _isOfflineRouteExists;
   }
@@ -221,6 +200,7 @@ public class ImplicitHybridTableRouteInfo implements 
TableRouteInfo {
     _isOfflineRouteExists = offlineRouteExists;
   }
 
+  @Override
   public boolean isRealtimeRouteExists() {
     return _isRealtimeRouteExists;
   }
@@ -247,21 +227,6 @@ public class ImplicitHybridTableRouteInfo implements 
TableRouteInfo {
     _isRealtimeTableDisabled = realtimeTableDisabled;
   }
 
-  /**
-   * Disabled if all physical tables are disabled.
-   * @return true if the table is disabled, false
-   */
-  @Override
-  public boolean isDisabled() {
-    if (isOffline()) {
-      return _isOfflineTableDisabled;
-    } else if (isRealtime()) {
-      return _isRealtimeTableDisabled;
-    } else {
-      return _isOfflineTableDisabled && _isRealtimeTableDisabled;
-    }
-  }
-
   @Nullable
   @Override
   public List<String> getDisabledTableNames() {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/TableRouteInfo.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/TableRouteInfo.java
index 18e0abcee3..b969d20e52 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/TableRouteInfo.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/TableRouteInfo.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.routing.ServerRouteInfo;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 
 
@@ -74,6 +75,20 @@ public interface TableRouteInfo {
   @Nullable
   TableConfig getRealtimeTableConfig();
 
+  /**
+   * Gets the query config for the offline table, if available.
+   * @return the query config for the offline table, or null if not available
+   */
+  @Nullable
+  QueryConfig getOfflineTableQueryConfig();
+
+  /**
+   * Gets the query config for the realtime table, if available.
+   * @return the query config for the realtime table, or null if not available
+   */
+  @Nullable
+  QueryConfig getRealtimeTableQueryConfig();
+
   /**
    * Get the set of servers that will execute the query on the segments of the 
OFFLINE table.
    * @return the set of servers that will execute the query on the segments of 
the OFFLINE table
@@ -155,9 +170,9 @@ public interface TableRouteInfo {
    */
   boolean isRouteExists();
 
-  boolean isOfflineTableDisabled();
+  boolean isOfflineRouteExists();
 
-  boolean isRealtimeTableDisabled();
+  boolean isRealtimeRouteExists();
 
   /**
    * Checks if all the physical tables are disabled.
@@ -166,6 +181,10 @@ public interface TableRouteInfo {
    */
   boolean isDisabled();
 
+  boolean isOfflineTableDisabled();
+
+  boolean isRealtimeTableDisabled();
+
   @Nullable
   List<String> getDisabledTableNames();
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
new file mode 100644
index 0000000000..7405a00ce5
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.QueryGenerator;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+public abstract class BaseLogicalTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(BaseLogicalTableIntegrationTest.class);
+  private static final String DEFAULT_TENANT = "DefaultTenant";
+  private static final String DEFAULT_LOGICAL_TABLE_NAME = "mytable";
+  protected static final String DEFAULT_TABLE_NAME = "physicalTable";
+  private static final int NUM_OFFLINE_SEGMENTS = 12;
+  protected static BaseLogicalTableIntegrationTest _sharedClusterTestSuite = 
null;
+
+  @BeforeSuite
+  public void setUpSuite()
+      throws Exception {
+    LOGGER.info("Setting up integration test suite");
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+    _sharedClusterTestSuite = this;
+
+    // Start the Pinot cluster
+    startZk();
+    LOGGER.info("Start Kafka in the integration test suite");
+    startKafka();
+    startController();
+    startBroker();
+    startServers(2);
+    LOGGER.info("Finished setting up integration test suite");
+  }
+
+  @AfterSuite
+  public void tearDownSuite()
+      throws Exception {
+    LOGGER.info("Tearing down integration test suite");
+    // Stop Kafka
+    LOGGER.info("Stop Kafka in the integration test suite");
+    stopKafka();
+    // Shutdown the Pinot cluster
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+    LOGGER.info("Finished tearing down integration test suite");
+  }
+
+  @Override
+  protected String getTableName() {
+    return DEFAULT_TABLE_NAME;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+    if (_sharedClusterTestSuite != this) {
+      _controllerRequestURLBuilder = 
_sharedClusterTestSuite._controllerRequestURLBuilder;
+      _helixResourceManager = _sharedClusterTestSuite._helixResourceManager;
+    }
+
+    List<File> avroFiles = getAllAvroFiles();
+    int numSegmentsPerTable = NUM_OFFLINE_SEGMENTS / 
getOfflineTableNames().size();
+    int index = 0;
+    for (String tableName : getOfflineTableNames()) {
+      File tarDir = new File(_tarDir, tableName);
+
+      TestUtils.ensureDirectoriesExistAndEmpty(tarDir);
+
+      // Create and upload the schema and table config
+      Schema schema = createSchema(getSchemaFileName());
+      schema.setSchemaName(tableName);
+      addSchema(schema);
+      TableConfig offlineTableConfig = createOfflineTableConfig(tableName);
+      addTableConfig(offlineTableConfig);
+
+      List<File> offlineAvroFiles = new ArrayList<>(numSegmentsPerTable);
+      for (int i = index; i < index + numSegmentsPerTable; i++) {
+        offlineAvroFiles.add(avroFiles.get(i));
+      }
+      index += numSegmentsPerTable;
+
+      // Create and upload segments
+      ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, 
offlineTableConfig, schema, 0, _segmentDir,
+          tarDir);
+      uploadSegments(tableName, tarDir);
+    }
+
+    createLogicalTable();
+
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    cleanup();
+  }
+
+  protected abstract List<String> getOfflineTableNames();
+
+  protected List<String> getPhysicalTableNames() {
+    return 
getOfflineTableNames().stream().map(TableNameBuilder.OFFLINE::tableNameWithType)
+        .collect(Collectors.toList());
+  }
+
+  protected String getLogicalTableName() {
+    return DEFAULT_LOGICAL_TABLE_NAME;
+  }
+
+  protected Map<String, String> getHeaders() {
+    return Map.of();
+  }
+
+  protected String getBrokerTenant() {
+    return DEFAULT_TENANT;
+  }
+
+  /**
+   * Creates a new OFFLINE table config.
+   */
+  protected TableConfig createOfflineTableConfig(String tableName) {
+    // @formatter:off
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(tableName)
+        .setTimeColumnName(getTimeColumnName())
+        .setSortedColumn(getSortedColumn())
+        .setInvertedIndexColumns(getInvertedIndexColumns())
+        .setNoDictionaryColumns(getNoDictionaryColumns())
+        .setRangeIndexColumns(getRangeIndexColumns())
+        .setBloomFilterColumns(getBloomFilterColumns())
+        .setFieldConfigList(getFieldConfigs())
+        .setNumReplicas(getNumReplicas())
+        .setSegmentVersion(getSegmentVersion())
+        .setLoadMode(getLoadMode())
+        .setTaskConfig(getTaskConfig())
+        .setBrokerTenant(getBrokerTenant())
+        .setServerTenant(getServerTenant())
+        .setIngestionConfig(getIngestionConfig())
+        .setQueryConfig(getQueryConfig())
+        .setNullHandlingEnabled(getNullHandlingEnabled())
+        .setSegmentPartitionConfig(getSegmentPartitionConfig())
+        .build();
+    // @formatter:on
+  }
+
+  public static LogicalTableConfig getLogicalTableConfig(String tableName, 
List<String> physicalTableNames,
+      String brokerTenant) {
+    Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+    for (String physicalTableName : physicalTableNames) {
+      physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig());
+    }
+    String offlineTableName =
+        
physicalTableNames.stream().filter(TableNameBuilder::isOfflineTableResource).findFirst().orElse(null);
+    String realtimeTableName =
+        
physicalTableNames.stream().filter(TableNameBuilder::isRealtimeTableResource).findFirst().orElse(null);
+    LogicalTableConfigBuilder builder =
+        new 
LogicalTableConfigBuilder().setTableName(tableName).setBrokerTenant(brokerTenant)
+            
.setRefOfflineTableName(offlineTableName).setRefRealtimeTableName(realtimeTableName)
+            .setPhysicalTableConfigMap(physicalTableConfigMap);
+    return builder.build();
+  }
+
+  protected void createLogicalTable()
+      throws IOException {
+    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
+    Schema logicalTableSchema = createSchema(getSchemaFileName());
+    logicalTableSchema.setSchemaName(getLogicalTableName());
+    addSchema(logicalTableSchema);
+    LogicalTableConfig logicalTable =
+        getLogicalTableConfig(getLogicalTableName(), getPhysicalTableNames(), 
getBrokerTenant());
+    String resp =
+        ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTable.toSingleLineJsonString(), getHeaders());
+    assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" + 
getLogicalTableName()
+        + " logical table successfully added.\"}");
+  }
+
+  protected LogicalTableConfig getLogicalTableConfig(String logicalTableName)
+      throws IOException {
+    String getLogicalTableUrl =
+        _controllerRequestURLBuilder.forLogicalTableGet(logicalTableName);
+    String resp = ControllerTest.sendGetRequest(getLogicalTableUrl, 
getHeaders());
+    return LogicalTableConfig.fromString(resp);
+  }
+
+  protected void updateLogicalTableConfig(String logicalTableName, 
LogicalTableConfig logicalTableConfig)
+      throws IOException {
+    String updateLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName);
+    String resp =
+        ControllerTest.sendPutRequest(updateLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+
+    assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" + 
getLogicalTableName()
+        + " logical table successfully updated.\"}");
+  }
+
+  protected void deleteLogicalTable()
+      throws IOException {
+    String deleteLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableDelete(getLogicalTableName());
+    // delete logical table
+    String deleteResponse = 
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
+    assertEquals(deleteResponse, "{\"status\":\"" + getLogicalTableName() + " 
logical table successfully deleted.\"}");
+  }
+
+  @Override
+  protected void pushAvroIntoKafka(List<File> avroFiles)
+      throws Exception {
+    ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles,
+        "localhost:" + 
_sharedClusterTestSuite._kafkaStarters.get(0).getPort(), getKafkaTopic(),
+        getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), 
getPartitionColumn(), injectTombstones());
+  }
+
+  @Override
+  public String getZkUrl() {
+    if (_sharedClusterTestSuite != this) {
+      return _sharedClusterTestSuite.getZkUrl();
+    }
+    return super.getZkUrl();
+  }
+
+  @Override
+  public ControllerRequestClient getControllerRequestClient() {
+    if (_sharedClusterTestSuite != this) {
+      return _sharedClusterTestSuite.getControllerRequestClient();
+    }
+    return super.getControllerRequestClient();
+  }
+
+  @Override
+  protected String getBrokerBaseApiUrl() {
+    if (_sharedClusterTestSuite != this) {
+      return _sharedClusterTestSuite.getBrokerBaseApiUrl();
+    }
+    return super.getBrokerBaseApiUrl();
+  }
+
+  @Override
+  protected String getBrokerGrpcEndpoint() {
+    if (_sharedClusterTestSuite != this) {
+      return _sharedClusterTestSuite.getBrokerGrpcEndpoint();
+    }
+    return super.getBrokerGrpcEndpoint();
+  }
+
+  @Override
+  public int getControllerPort() {
+    if (_sharedClusterTestSuite != this) {
+      return _sharedClusterTestSuite.getControllerPort();
+    }
+    return super.getControllerPort();
+  }
+
+  @Override
+  public int getRandomBrokerPort() {
+    if (_sharedClusterTestSuite != this) {
+      return _sharedClusterTestSuite.getRandomBrokerPort();
+    }
+    return super.getRandomBrokerPort();
+  }
+
+  @Override
+  public String getHelixClusterName() {
+    return "BaseLogicalTableIntegrationTest";
+  }
+
+  @Override
+  protected void waitForAllDocsLoaded(long timeoutMs)
+      throws Exception {
+    waitForDocsLoaded(timeoutMs, true, getLogicalTableName());
+  }
+
+  @Override
+  protected void setUpQueryGenerator(List<File> avroFiles) {
+    Assert.assertNull(_queryGenerator);
+    String tableName = getLogicalTableName();
+    _queryGenerator = new QueryGenerator(avroFiles, tableName, tableName);
+  }
+
+  @Test
+  public void verifyLogicalTableConfig()
+      throws IOException {
+    LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(getLogicalTableName());
+    assertEquals(logicalTableConfig.getPhysicalTableConfigMap().size(), 
getPhysicalTableNames().size());
+    assertEquals(new HashSet<>(getPhysicalTableNames()), 
logicalTableConfig.getPhysicalTableConfigMap().keySet());
+  }
+
+  @Test
+  public void testHardcodedQueries()
+      throws Exception {
+    super.testHardcodedQueries();
+  }
+
+  @Test
+  public void testQueriesFromQueryFile()
+      throws Exception {
+    super.testQueriesFromQueryFile();
+  }
+
+  @Test
+  public void testGeneratedQueries()
+      throws Exception {
+    super.testGeneratedQueries(true, false);
+  }
+
+  @Test
+  public void testDisableGroovyQueryTableConfigOverride()
+      throws Exception {
+    QueryConfig queryConfig = new QueryConfig(null, false, null, null, null, 
null);
+    LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(getLogicalTableName());
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+
+    String groovyQuery = "SELECT 
GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', "
+        + "'arg0 + arg1', FlightNum, Origin) FROM mytable";
+
+    // Query should not throw exception
+    postQuery(groovyQuery);
+
+    // Disable groovy explicitly
+    queryConfig = new QueryConfig(null, true, null, null, null, null);
+
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+
+    // grpc and http throw different exceptions. So only check error message.
+    Exception athrows = expectThrows(Exception.class, () -> 
postQuery(groovyQuery));
+    assertTrue(athrows.getMessage().contains("Groovy transform functions are 
disabled for queries"));
+
+    // Remove query config
+    logicalTableConfig.setQueryConfig(null);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+
+    athrows = expectThrows(Exception.class, () -> postQuery(groovyQuery));
+    assertTrue(athrows.getMessage().contains("Groovy transform functions are 
disabled for queries"));
+  }
+
+  @Test
+  public void testMaxQueryResponseSizeTableConfig()
+      throws Exception {
+    String starQuery = "SELECT * from mytable";
+
+    QueryConfig queryConfig = new QueryConfig(null, null, null, null, 100L, 
null);
+    LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(getLogicalTableName());
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+
+    JsonNode response = postQuery(starQuery);
+    JsonNode exceptions = response.get("exceptions");
+    assertTrue(!exceptions.isEmpty()
+        && exceptions.get(0).get("errorCode").asInt() == 
QueryErrorCode.QUERY_CANCELLATION.getId());
+
+    // Query Succeeds with a high limit.
+    queryConfig = new QueryConfig(null, null, null, null, 1000000L, null);
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    response = postQuery(starQuery);
+    exceptions = response.get("exceptions");
+    assertTrue(exceptions.isEmpty(), "Query should not throw exception");
+
+    //Reset to null.
+    queryConfig = new QueryConfig(null, null, null, null, null, null);
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    response = postQuery(starQuery);
+    exceptions = response.get("exceptions");
+    assertTrue(exceptions.isEmpty(), "Query should not throw exception");
+  }
+
+  @Test
+  public void testMaxServerResponseSizeTableConfig()
+      throws Exception {
+    String starQuery = "SELECT * from mytable";
+
+    QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, 
1000L);
+    LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(getLogicalTableName());
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    JsonNode response = postQuery(starQuery);
+    JsonNode exceptions = response.get("exceptions");
+    assertTrue(!exceptions.isEmpty()
+        && exceptions.get(0).get("errorCode").asInt() == 
QueryErrorCode.QUERY_CANCELLATION.getId());
+
+    // Query Succeeds with a high limit.
+    queryConfig = new QueryConfig(null, null, null, null, null, 1000000L);
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    response = postQuery(starQuery);
+    exceptions = response.get("exceptions");
+    assertTrue(exceptions.isEmpty(), "Query should not throw exception");
+
+    //Reset to null.
+    queryConfig = new QueryConfig(null, null, null, null, null, null);
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    response = postQuery(starQuery);
+    exceptions = response.get("exceptions");
+    assertTrue(exceptions.isEmpty(), "Query should not throw exception");
+  }
+
+  @Test
+  public void testQueryTimeOut()
+      throws Exception {
+    String starQuery = "SELECT * from mytable";
+    QueryConfig queryConfig = new QueryConfig(1L, null, null, null, null, 
null);
+    LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(getLogicalTableName());
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    JsonNode response = postQuery(starQuery);
+    JsonNode exceptions = response.get("exceptions");
+    assertTrue(
+        !exceptions.isEmpty() && (exceptions.get(0).get("errorCode").asInt() 
== QueryErrorCode.BROKER_TIMEOUT.getId()
+            // Timeout may occur just before submitting the request. Then this 
error code is thrown.
+            || exceptions.get(0).get("errorCode").asInt() == 
QueryErrorCode.SERVER_NOT_RESPONDING.getId()));
+
+    // Query Succeeds with a high limit.
+    queryConfig = new QueryConfig(1000000L, null, null, null, null, null);
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    response = postQuery(starQuery);
+    exceptions = response.get("exceptions");
+    assertTrue(exceptions.isEmpty(), "Query should not throw exception");
+
+    //Reset to null.
+    queryConfig = new QueryConfig(null, null, null, null, null, null);
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    response = postQuery(starQuery);
+    exceptions = response.get("exceptions");
+    assertTrue(exceptions.isEmpty(), "Query should not throw exception");
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineTableIntegrationTest.java
new file mode 100644
index 0000000000..fc9c75ec12
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineTableIntegrationTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.util.List;
+
+
+public class LogicalTableWithOneOfflineTableIntegrationTest extends 
BaseLogicalTableIntegrationTest {
+  @Override
+  protected List<String> getOfflineTableNames() {
+    return List.of("physicalTable");
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineTablesIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineTablesIntegrationTest.java
new file mode 100644
index 0000000000..b6a484d37c
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineTablesIntegrationTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.util.List;
+
+
+public class LogicalTableWithTwelveOfflineTablesIntegrationTest extends 
BaseLogicalTableIntegrationTest {
+  @Override
+  protected List<String> getOfflineTableNames() {
+    return List.of("physicalTable_0", "physicalTable_1", "physicalTable_2", 
"physicalTable_3", "physicalTable_4",
+        "physicalTable_5", "physicalTable_6", "physicalTable_7", 
"physicalTable_8", "physicalTable_9",
+        "physicalTable_10", "physicalTable_11");
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineTablesIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineTablesIntegrationTest.java
new file mode 100644
index 0000000000..46a4f91291
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineTablesIntegrationTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.util.List;
+
+
+public class LogicalTableWithTwoOfflineTablesIntegrationTest extends 
BaseLogicalTableIntegrationTest {
+  @Override
+  protected List<String> getOfflineTableNames() {
+    return List.of("physicalTable_0", "physicalTable_1");
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
new file mode 100644
index 0000000000..04d3b76bed
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing.table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.request.TableSegmentsInfo;
+import org.apache.pinot.core.routing.ServerRouteInfo;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.BaseTableRouteInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.transport.TableRouteInfo;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+public class LogicalTableRouteInfo extends BaseTableRouteInfo {
+  private final LogicalTableConfig _logicalTable;
+  private List<TableRouteInfo> _offlineTables;
+  private List<TableRouteInfo> _realtimeTables;
+  private TableConfig _offlineTableConfig;
+  private TableConfig _realtimeTableConfig;
+  private QueryConfig _queryConfig;
+  private List<String> _unavailableSegments;
+  private int _numPrunedSegments = 0;
+
+  private BrokerRequest _offlineBrokerRequest;
+  private BrokerRequest _realtimeBrokerRequest;
+  private TimeBoundaryInfo _timeBoundaryInfo;
+
+  LogicalTableRouteInfo() {
+    _logicalTable = null;
+  }
+
+  public LogicalTableRouteInfo(LogicalTableConfig logicalTable) {
+    _logicalTable = logicalTable;
+  }
+
+  @Override
+  public Map<ServerRoutingInstance, InstanceRequest> getRequestMap(long 
requestId, String brokerId, boolean preferTls) {
+    Map<ServerInstance, List<TableSegmentsInfo>> offlineTableRouteInfo = new 
HashMap<>();
+    Map<ServerInstance, List<TableSegmentsInfo>> realtimeTableRouteInfo = new 
HashMap<>();
+
+    if (_offlineTables != null) {
+      for (TableRouteInfo physicalTableRoute : _offlineTables) {
+        if (physicalTableRoute.getOfflineRoutingTable() != null) {
+          for (Map.Entry<ServerInstance, ServerRouteInfo> entry : 
physicalTableRoute.getOfflineRoutingTable()
+              .entrySet()) {
+            TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo();
+            
tableSegmentsInfo.setTableName(physicalTableRoute.getOfflineTableName());
+            tableSegmentsInfo.setSegments(entry.getValue().getSegments());
+            if 
(CollectionUtils.isNotEmpty(entry.getValue().getOptionalSegments())) {
+              
tableSegmentsInfo.setOptionalSegments(entry.getValue().getOptionalSegments());
+            }
+
+            offlineTableRouteInfo.computeIfAbsent(entry.getKey(), v -> new 
ArrayList<>()).add(tableSegmentsInfo);
+          }
+        }
+      }
+    }
+
+    if (_realtimeTables != null) {
+      for (TableRouteInfo physicalTableRoute : _realtimeTables) {
+        if (physicalTableRoute.getRealtimeRoutingTable() != null) {
+          for (Map.Entry<ServerInstance, ServerRouteInfo> entry : 
physicalTableRoute.getRealtimeRoutingTable()
+              .entrySet()) {
+            TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo();
+            
tableSegmentsInfo.setTableName(physicalTableRoute.getRealtimeTableName());
+            tableSegmentsInfo.setSegments(entry.getValue().getSegments());
+            if 
(CollectionUtils.isNotEmpty(entry.getValue().getOptionalSegments())) {
+              
tableSegmentsInfo.setOptionalSegments(entry.getValue().getOptionalSegments());
+            }
+
+            realtimeTableRouteInfo.computeIfAbsent(entry.getKey(), v -> new 
ArrayList<>()).add(tableSegmentsInfo);
+          }
+        }
+      }
+    }
+
+    Map<ServerRoutingInstance, InstanceRequest> requestMap = new HashMap<>();
+
+    for (Map.Entry<ServerInstance, List<TableSegmentsInfo>> entry : 
offlineTableRouteInfo.entrySet()) {
+      requestMap.put(
+          new ServerRoutingInstance(entry.getKey().getHostname(), 
entry.getKey().getPort(), TableType.OFFLINE),
+          getInstanceRequest(requestId, brokerId, _offlineBrokerRequest, 
entry.getValue()));
+    }
+
+    for (Map.Entry<ServerInstance, List<TableSegmentsInfo>> entry : 
realtimeTableRouteInfo.entrySet()) {
+      requestMap.put(
+          new ServerRoutingInstance(entry.getKey().getHostname(), 
entry.getKey().getPort(), TableType.REALTIME),
+          getInstanceRequest(requestId, brokerId, _realtimeBrokerRequest, 
entry.getValue()));
+    }
+
+    return requestMap;
+  }
+
+  private InstanceRequest getInstanceRequest(long requestId, String brokerId, 
BrokerRequest brokerRequest,
+      List<TableSegmentsInfo> tableSegmentsInfoList) {
+    InstanceRequest instanceRequest = new InstanceRequest();
+    instanceRequest.setRequestId(requestId);
+    instanceRequest.setCid(QueryThreadContext.getCid());
+    instanceRequest.setQuery(brokerRequest);
+    Map<String, String> queryOptions = 
brokerRequest.getPinotQuery().getQueryOptions();
+    if (queryOptions != null) {
+      
instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)));
+    }
+    instanceRequest.setTableSegmentsInfoList(tableSegmentsInfoList);
+    instanceRequest.setBrokerId(brokerId);
+    return instanceRequest;
+  }
+
+  @Nullable
+  @Override
+  public TableConfig getOfflineTableConfig() {
+    return _offlineTableConfig;
+  }
+
+  public void setOfflineTableConfig(TableConfig offlineTableConfig) {
+    _offlineTableConfig = offlineTableConfig;
+  }
+
+  @Nullable
+  @Override
+  public TableConfig getRealtimeTableConfig() {
+    return _realtimeTableConfig;
+  }
+
+  public void setRealtimeTableConfig(TableConfig realtimeTableConfig) {
+    _realtimeTableConfig = realtimeTableConfig;
+  }
+
+  @Nullable
+  @Override
+  public QueryConfig getOfflineTableQueryConfig() {
+    return _queryConfig;
+  }
+
+  @Nullable
+  @Override
+  public QueryConfig getRealtimeTableQueryConfig() {
+    return _queryConfig;
+  }
+
+  public void setQueryConfig(QueryConfig queryConfig) {
+    _queryConfig = queryConfig;
+  }
+
+  @Override
+  public Set<ServerInstance> getOfflineExecutionServers() {
+    if (hasOffline()) {
+      Set<ServerInstance> offlineExecutionServers = new HashSet<>();
+      for (TableRouteInfo offlineTable : _offlineTables) {
+        if (offlineTable.isOfflineRouteExists()) {
+          Map<ServerInstance, ServerRouteInfo> offlineRoutingTable = 
offlineTable.getOfflineRoutingTable();
+          if (offlineRoutingTable != null) {
+            offlineExecutionServers.addAll(offlineRoutingTable.keySet());
+          }
+        }
+      }
+      return offlineExecutionServers;
+    }
+    return Set.of();
+  }
+
+  @Override
+  public Set<ServerInstance> getRealtimeExecutionServers() {
+    if (hasRealtime()) {
+      Set<ServerInstance> realtimeExecutionServers = new HashSet<>();
+      for (TableRouteInfo realtimeTable : _realtimeTables) {
+        if (realtimeTable.isRealtimeRouteExists()) {
+          Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable = 
realtimeTable.getRealtimeRoutingTable();
+          if (realtimeRoutingTable != null) {
+            realtimeExecutionServers.addAll(realtimeRoutingTable.keySet());
+          }
+        }
+      }
+      return realtimeExecutionServers;
+    }
+    return Set.of();
+  }
+
+  @Nullable
+  @Override
+  public Map<ServerInstance, ServerRouteInfo> getOfflineRoutingTable() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Nullable
+  @Override
+  public Map<ServerInstance, ServerRouteInfo> getRealtimeRoutingTable() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean hasOffline() {
+    return _offlineTables != null && !_offlineTables.isEmpty();
+  }
+
+  @Override
+  public boolean hasRealtime() {
+    return _realtimeTables != null && !_realtimeTables.isEmpty();
+  }
+
+  @Nullable
+  @Override
+  public String getOfflineTableName() {
+    return hasOffline() && _logicalTable != null ? 
TableNameBuilder.OFFLINE.tableNameWithType(
+        _logicalTable.getTableName()) : null;
+  }
+
+  @Nullable
+  @Override
+  public String getRealtimeTableName() {
+    return hasRealtime() && _logicalTable != null ? 
TableNameBuilder.REALTIME.tableNameWithType(
+        _logicalTable.getTableName()) : null;
+  }
+
+  @Nullable
+  @Override
+  public BrokerRequest getOfflineBrokerRequest() {
+    return _offlineBrokerRequest;
+  }
+
+  @Nullable
+  @Override
+  public BrokerRequest getRealtimeBrokerRequest() {
+    return _realtimeBrokerRequest;
+  }
+
+  public boolean isOfflineRouteExists() {
+    if (_offlineTables != null) {
+      for (TableRouteInfo offlineTable : _offlineTables) {
+        if (offlineTable.isRouteExists()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  public boolean isRealtimeRouteExists() {
+    if (_realtimeTables != null) {
+      for (TableRouteInfo realtimeTable : _realtimeTables) {
+        if (realtimeTable.isRouteExists()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean isOfflineTableDisabled() {
+    if (_offlineTables != null) {
+      for (TableRouteInfo offlineTable : _offlineTables) {
+        if (!offlineTable.isDisabled()) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean isRealtimeTableDisabled() {
+    if (_realtimeTables != null) {
+      for (TableRouteInfo realtimeTable : _realtimeTables) {
+        if (!realtimeTable.isDisabled()) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Nullable
+  @Override
+  public List<String> getDisabledTableNames() {
+    List<String> disabledTableNames = null;
+    if (_offlineTables != null) {
+      for (TableRouteInfo offlineTable : _offlineTables) {
+        if (offlineTable.isDisabled()) {
+          if (disabledTableNames == null) {
+            disabledTableNames = new ArrayList<>();
+          }
+          disabledTableNames.add(offlineTable.getOfflineTableName());
+        }
+      }
+    }
+
+    if (_realtimeTables != null) {
+      for (TableRouteInfo realtimeTable : _realtimeTables) {
+        if (realtimeTable.isDisabled()) {
+          if (disabledTableNames == null) {
+            disabledTableNames = new ArrayList<>();
+          }
+          disabledTableNames.add(realtimeTable.getRealtimeTableName());
+        }
+      }
+    }
+
+    return disabledTableNames;
+  }
+
+  // TODO: https://github.com/apache/pinot/issues/15640
+  @Nullable
+  @Override
+  public TimeBoundaryInfo getTimeBoundaryInfo() {
+    return _timeBoundaryInfo;
+  }
+
+  public void setTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo) {
+    _timeBoundaryInfo = timeBoundaryInfo;
+  }
+
+  @Override
+  public List<String> getUnavailableSegments() {
+    return _unavailableSegments;
+  }
+
+  @Override
+  public int getNumPrunedSegmentsTotal() {
+    return _numPrunedSegments;
+  }
+
+  @Nullable
+  public List<TableRouteInfo> getOfflineTables() {
+    return _offlineTables;
+  }
+
+  public void setOfflineTables(List<TableRouteInfo> offlineTables) {
+    _offlineTables = offlineTables;
+  }
+
+  @Nullable
+  public List<TableRouteInfo> getRealtimeTables() {
+    return _realtimeTables;
+  }
+
+  public void setRealtimeTables(List<TableRouteInfo> realtimeTables) {
+    _realtimeTables = realtimeTables;
+  }
+
+  public void setUnavailableSegments(List<String> unavailableSegments) {
+    _unavailableSegments = unavailableSegments;
+  }
+
+  public void setNumPrunedSegments(int numPrunedSegments) {
+    _numPrunedSegments = numPrunedSegments;
+  }
+
+  public void setOfflineBrokerRequest(BrokerRequest offlineBrokerRequest) {
+    _offlineBrokerRequest = offlineBrokerRequest;
+  }
+
+  public void setRealtimeBrokerRequest(BrokerRequest realtimeBrokerRequest) {
+    _realtimeBrokerRequest = realtimeBrokerRequest;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
new file mode 100644
index 0000000000..60ddd345ae
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing.table;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.TableRouteInfo;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LogicalTableRouteProvider implements TableRouteProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LogicalTableRouteProvider.class);
+
+  @Override
+  public TableRouteInfo getTableRouteInfo(String tableName, TableCache 
tableCache, RoutingManager routingManager) {
+    LogicalTableConfig logicalTable = 
tableCache.getLogicalTableConfig(tableName);
+    if (logicalTable == null) {
+      return new LogicalTableRouteInfo();
+    }
+
+    PhysicalTableRouteProvider routeProvider = new 
PhysicalTableRouteProvider();
+
+    List<TableRouteInfo> offlineTables = new ArrayList<>();
+    List<TableRouteInfo> realtimeTables = new ArrayList<>();
+    for (String physicalTableName : 
logicalTable.getPhysicalTableConfigMap().keySet()) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(physicalTableName);
+      Preconditions.checkNotNull(tableType);
+      TableRouteInfo physicalTableInfo =
+          routeProvider.getTableRouteInfo(physicalTableName, tableCache, 
routingManager);
+      if (physicalTableInfo.isExists()) {
+        if (tableType == TableType.OFFLINE) {
+          offlineTables.add(physicalTableInfo);
+        } else {
+          realtimeTables.add(physicalTableInfo);
+        }
+      }
+    }
+
+    LogicalTableRouteInfo routeInfo = new LogicalTableRouteInfo(logicalTable);
+    if (!offlineTables.isEmpty()) {
+      TableConfig offlineTableConfig = 
tableCache.getTableConfig(logicalTable.getRefOfflineTableName());
+      Preconditions.checkNotNull(offlineTableConfig,
+          "Offline table config not found: " + 
logicalTable.getRefOfflineTableName());
+      routeInfo.setOfflineTables(offlineTables);
+      routeInfo.setOfflineTableConfig(offlineTableConfig);
+    }
+    if (!realtimeTables.isEmpty()) {
+      TableConfig realtimeTableConfig = 
tableCache.getTableConfig(logicalTable.getRefRealtimeTableName());
+      Preconditions.checkNotNull(realtimeTableConfig,
+          "Realtime table config not found: " + 
logicalTable.getRefRealtimeTableName());
+      routeInfo.setRealtimeTables(realtimeTables);
+      routeInfo.setRealtimeTableConfig(realtimeTableConfig);
+    }
+    routeInfo.setQueryConfig(logicalTable.getQueryConfig());
+    return routeInfo;
+  }
+
+  @Override
+  public void calculateRoutes(TableRouteInfo tableRouteInfo, RoutingManager 
routingManager,
+      BrokerRequest offlineBrokerRequest, BrokerRequest realtimeBrokerRequest, 
long requestId) {
+    LogicalTableRouteInfo routeInfo = (LogicalTableRouteInfo) tableRouteInfo;
+    int numPrunedSegments = 0;
+    List<String> unavailableSegments = new ArrayList<>();
+    PhysicalTableRouteProvider routeProvider = new 
PhysicalTableRouteProvider();
+
+    if (routeInfo.getOfflineTables() != null) {
+      for (TableRouteInfo physicalTableInfo : routeInfo.getOfflineTables()) {
+        routeProvider.calculateRoutes(physicalTableInfo, routingManager, 
offlineBrokerRequest, null,
+            requestId);
+        numPrunedSegments += physicalTableInfo.getNumPrunedSegmentsTotal();
+        if (physicalTableInfo.getUnavailableSegments() != null) {
+          
unavailableSegments.addAll(physicalTableInfo.getUnavailableSegments());
+        }
+      }
+    }
+
+    if (routeInfo.getRealtimeTables() != null) {
+      for (TableRouteInfo physicalTableInfo : routeInfo.getRealtimeTables()) {
+        routeProvider.calculateRoutes(physicalTableInfo, routingManager, null, 
realtimeBrokerRequest,
+            requestId);
+        numPrunedSegments += physicalTableInfo.getNumPrunedSegmentsTotal();
+        if (physicalTableInfo.getUnavailableSegments() != null) {
+          
unavailableSegments.addAll(physicalTableInfo.getUnavailableSegments());
+        }
+      }
+    }
+
+    TimeBoundaryInfo timeBoundaryInfo = null;
+    if (routeInfo.hasRealtime() && routeInfo.hasOffline()) {
+      timeBoundaryInfo = 
routingManager.getTimeBoundaryInfo(routeInfo.getOfflineTables().get(0).getOfflineTableName());
+      if (timeBoundaryInfo == null) {
+        LOGGER.debug("No time boundary info found for hybrid table: ");
+        routeInfo.setOfflineTables(null);
+      } else {
+        routeInfo.setTimeBoundaryInfo(timeBoundaryInfo);
+      }
+    }
+
+    //Set BrokerRequests to NULL if there is no route.
+    if (routeInfo.getOfflineExecutionServers().isEmpty()) {
+      routeInfo.setOfflineBrokerRequest(null);
+    } else {
+      routeInfo.setOfflineBrokerRequest(offlineBrokerRequest);
+    }
+
+    if (routeInfo.getRealtimeExecutionServers().isEmpty()) {
+      routeInfo.setRealtimeBrokerRequest(null);
+    } else {
+      routeInfo.setRealtimeBrokerRequest(realtimeBrokerRequest);
+    }
+
+    routeInfo.setUnavailableSegments(unavailableSegments);
+    routeInfo.setNumPrunedSegments(numPrunedSegments);
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
new file mode 100644
index 0000000000..287afcdd7b
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing.table;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.ServerRouteInfo;
+import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.transport.TableRouteInfo;
+
+
+/**
+ * PhysicalTableRouteProvider is used to calculate the routing table for a 
physical table.
+ * It differs from ImplicitHybridTableRouteProvider in that it calls a 
different RoutingManager function to get the
+ * routing table.
+ * For Logical Tables, the broker request has the name of the logical table. 
It is inefficient to create a broker
+ * request for every physical table. Therefore the name of the physical table 
is passed explicitly to the
+ * RoutingManager.
+ *
+ * The relevant call is getRoutingTable(BrokerRequest, String, long)
+ */
+public class PhysicalTableRouteProvider extends 
ImplicitHybridTableRouteProvider {
+  @Override
+  public void calculateRoutes(TableRouteInfo tableRouteInfo, RoutingManager 
routingManager,
+      BrokerRequest offlineBrokerRequest, BrokerRequest realtimeBrokerRequest, 
long requestId) {
+    assert (tableRouteInfo.isExists());
+    String offlineTableName = tableRouteInfo.getOfflineTableName();
+    String realtimeTableName = tableRouteInfo.getRealtimeTableName();
+    Map<ServerInstance, ServerRouteInfo> offlineRoutingTable = null;
+    Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable = null;
+    List<String> unavailableSegments = new ArrayList<>();
+    int numPrunedSegmentsTotal = 0;
+
+    if (offlineBrokerRequest != null) {
+      Preconditions.checkNotNull(offlineTableName);
+
+      // NOTE: Routing table might be null if table is just removed
+      RoutingTable routingTable = null;
+      if (!tableRouteInfo.isOfflineTableDisabled()) {
+        routingTable =
+            routingManager.getRoutingTable(offlineBrokerRequest, 
tableRouteInfo.getOfflineTableName(), requestId);
+      }
+      if (routingTable != null) {
+        unavailableSegments.addAll(routingTable.getUnavailableSegments());
+        Map<ServerInstance, ServerRouteInfo> serverInstanceToSegmentsMap =
+            routingTable.getServerInstanceToSegmentsMap();
+        if (!serverInstanceToSegmentsMap.isEmpty()) {
+          offlineRoutingTable = serverInstanceToSegmentsMap;
+        } else {
+          offlineBrokerRequest = null;
+        }
+        numPrunedSegmentsTotal += routingTable.getNumPrunedSegments();
+      } else {
+        offlineBrokerRequest = null;
+      }
+    }
+    if (realtimeBrokerRequest != null) {
+      Preconditions.checkNotNull(realtimeTableName);
+
+      // NOTE: Routing table might be null if table is just removed
+      RoutingTable routingTable = null;
+      if (!tableRouteInfo.isRealtimeTableDisabled()) {
+        routingTable =
+            routingManager.getRoutingTable(realtimeBrokerRequest, 
tableRouteInfo.getRealtimeTableName(), requestId);
+      }
+      if (routingTable != null) {
+        unavailableSegments.addAll(routingTable.getUnavailableSegments());
+        Map<ServerInstance, ServerRouteInfo> serverInstanceToSegmentsMap =
+            routingTable.getServerInstanceToSegmentsMap();
+        if (!serverInstanceToSegmentsMap.isEmpty()) {
+          realtimeRoutingTable = serverInstanceToSegmentsMap;
+        } else {
+          realtimeBrokerRequest = null;
+        }
+        numPrunedSegmentsTotal += routingTable.getNumPrunedSegments();
+      } else {
+        realtimeBrokerRequest = null;
+      }
+    }
+
+    ImplicitHybridTableRouteInfo hybridTableRouteInfo = 
(ImplicitHybridTableRouteInfo) tableRouteInfo;
+    hybridTableRouteInfo.setUnavailableSegments(unavailableSegments);
+    hybridTableRouteInfo.setNumPrunedSegmentsTotal(numPrunedSegmentsTotal);
+    hybridTableRouteInfo.setOfflineBrokerRequest(offlineBrokerRequest);
+    hybridTableRouteInfo.setRealtimeBrokerRequest(realtimeBrokerRequest);
+    hybridTableRouteInfo.setOfflineRoutingTable(offlineRoutingTable);
+    hybridTableRouteInfo.setRealtimeRoutingTable(realtimeRoutingTable);
+  }
+}
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
index ac51f9ebf6..dd8d206b5a 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
@@ -20,19 +20,37 @@ package org.apache.pinot.query.routing.table;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.ServerRouteInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.transport.TableRouteInfo;
 import org.apache.pinot.query.testutils.MockRoutingManagerFactory;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 
 public class BaseTableRouteTest {
   //@formatter:off
@@ -106,6 +124,7 @@ public class BaseTableRouteTest {
   RoutingManager _routingManager;
   TableCache _tableCache;
   ImplicitHybridTableRouteProvider _hybridTableRouteProvider;
+  LogicalTableRouteProvider _logicalTableRouteProvider;
 
 
   @BeforeClass
@@ -134,6 +153,7 @@ public class BaseTableRouteTest {
     _routingManager = factory.buildRoutingManager(null);
     _tableCache = factory.buildTableCache();
     _hybridTableRouteProvider = new ImplicitHybridTableRouteProvider();
+    _logicalTableRouteProvider = new LogicalTableRouteProvider();
   }
 
   @DataProvider(name = "offlineTableProvider")
@@ -198,6 +218,26 @@ public class BaseTableRouteTest {
     //@formatter:on
   }
 
+  @DataProvider(name = "routeExistsProvider")
+  public static Object[][] routeExistsProvider() {
+    //@formatter:off
+    return new Object[][] {
+        {"a"},
+        {"a_REALTIME"},
+        {"b"},
+        {"b_OFFLINE"},
+        {"b_REALTIME"},
+        {"c"},
+        {"c_OFFLINE"},
+        {"d"},
+        {"d_OFFLINE"},
+        {"e"},
+        {"e_OFFLINE"},
+        {"e_REALTIME"}
+    };
+    //@formatter:on
+  }
+
   @DataProvider(name = "routeNotExistsProvider")
   public static Object[][] routeNotExistsProvider() {
     //@formatter:off
@@ -263,4 +303,130 @@ public class BaseTableRouteTest {
     };
     //@formatter:on
   }
+
+  private static final String QUERY_FORMAT = "SELECT col1, col2 FROM %s LIMIT 
10";
+
+  static class BrokerRequestPair {
+    public final BrokerRequest _offlineBrokerRequest;
+    public final BrokerRequest _realtimeBrokerRequest;
+
+    public BrokerRequestPair(BrokerRequest offlineBrokerRequest, BrokerRequest 
realtimeBrokerRequest) {
+      _offlineBrokerRequest = offlineBrokerRequest;
+      _realtimeBrokerRequest = realtimeBrokerRequest;
+    }
+  }
+
+  static BrokerRequestPair getBrokerRequestPair(String tableName, boolean 
hasOffline, boolean hasRealtime,
+      String offlineTableName, String realtimeTableName) {
+    String query = String.format(QUERY_FORMAT, tableName);
+    BrokerRequest brokerRequest =
+        
CalciteSqlCompiler.convertToBrokerRequest(CalciteSqlParser.compileToPinotQuery(query));
+    BrokerRequest offlineBrokerRequest = null;
+    BrokerRequest realtimeBrokerRequest = null;
+
+    if (hasOffline) {
+      PinotQuery offlinePinotQuery = brokerRequest.getPinotQuery().deepCopy();
+      offlinePinotQuery.getDataSource().setTableName(offlineTableName);
+      offlineBrokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(offlinePinotQuery);
+    }
+
+    if (hasRealtime) {
+      PinotQuery realtimePinotQuery = brokerRequest.getPinotQuery().deepCopy();
+      realtimePinotQuery.getDataSource().setTableName(realtimeTableName);
+      realtimeBrokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(realtimePinotQuery);
+    }
+
+    return new BrokerRequestPair(offlineBrokerRequest, realtimeBrokerRequest);
+  }
+
+  @DataProvider(name = "offlineTableAndRouteProvider")
+  public static Object[][] offlineTableAndRouteProvider() {
+    //@formatter:off
+    return new Object[][] {
+        {"b_OFFLINE", Map.of("Server_localhost_2", ImmutableSet.of("b2"))},
+        {"c_OFFLINE", Map.of("Server_localhost_1", ImmutableSet.of("c1"),
+            "Server_localhost_2", ImmutableSet.of("c2", "c3"))},
+        {"d_OFFLINE", Map.of("Server_localhost_1", ImmutableSet.of("d1"),
+            "Server_localhost_2", ImmutableSet.of("d3"))},
+        {"e_OFFLINE", Map.of("Server_localhost_1", ImmutableSet.of("e1"),
+            "Server_localhost_2", ImmutableSet.of("e3"))},
+    };
+    //@formatter:on
+  }
+
+  @DataProvider(name = "realtimeTableAndRouteProvider")
+  public static Object[][] realtimeTableAndRouteProvider() {
+    //@formatter:off
+    return new Object[][] {
+        {"a_REALTIME", Map.of("Server_localhost_1", ImmutableSet.of("a1", 
"a2"),
+            "Server_localhost_2", ImmutableSet.of("a3"))},
+        {"b_REALTIME", Map.of("Server_localhost_1", ImmutableSet.of("b1"))},
+        {"e_REALTIME", Map.of("Server_localhost_2", ImmutableSet.of("e2"))},
+    };
+    //@formatter:on
+  }
+
+  @DataProvider(name = "hybridTableAndRouteProvider")
+  public static Object[][] hybridTableAndRouteProvider() {
+    //@formatter:off
+    return new Object[][] {
+        {"d", Map.of("Server_localhost_1", ImmutableSet.of("d1"),
+            "Server_localhost_2", ImmutableSet.of("d3")), null},
+        {"e", Map.of("Server_localhost_1", ImmutableSet.of("e1"),
+            "Server_localhost_2", ImmutableSet.of("e3")),
+            Map.of("Server_localhost_2", ImmutableSet.of("e2"))},
+    };
+    //@formatter:on
+  }
+
+  @DataProvider(name = "partiallyDisabledTableAndRouteProvider")
+  public static Object[][] partiallyDisabledTableAndRouteProvider() {
+    //@formatter:off
+    return new Object[][] {
+        {"hybrid_o_disabled", null, Map.of("Server_localhost_1", 
ImmutableSet.of("hor1"),
+            "Server_localhost_2", ImmutableSet.of("hor2"))},
+        {"hybrid_r_disabled", Map.of("Server_localhost_1", 
ImmutableSet.of("hro1"),
+            "Server_localhost_2", ImmutableSet.of("hro2")), null},
+    };
+    //@formatter:on
+  }
+
+  protected TableRouteInfo getLogicalTableRouteInfo(String tableName, String 
logicalTableName) {
+    LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder();
+    builder.setTableName(logicalTableName);
+    Map<String, PhysicalTableConfig> tableConfigMap = new HashMap<>();
+    if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
+      // Generate offline and realtime table names
+      
tableConfigMap.put(TableNameBuilder.OFFLINE.tableNameWithType(tableName), new 
PhysicalTableConfig());
+      
tableConfigMap.put(TableNameBuilder.REALTIME.tableNameWithType(tableName), new 
PhysicalTableConfig());
+      
builder.setRefOfflineTableName(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+      
builder.setRefRealtimeTableName(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+    } else if (TableNameBuilder.getTableTypeFromTableName(tableName) == 
TableType.OFFLINE) {
+      tableConfigMap.put(tableName, new PhysicalTableConfig());
+      builder.setRefOfflineTableName(tableName);
+    } else if (TableNameBuilder.getTableTypeFromTableName(tableName) == 
TableType.REALTIME) {
+      tableConfigMap.put(tableName, new PhysicalTableConfig());
+      builder.setRefRealtimeTableName(tableName);
+    } else {
+      throw new IllegalArgumentException("Invalid table type");
+    }
+
+    builder.setPhysicalTableConfigMap(tableConfigMap);
+    builder.setBrokerTenant("brokerTenant");
+    LogicalTableConfig logicalTable = builder.build();
+    
when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
+
+    return _logicalTableRouteProvider.getTableRouteInfo(logicalTableName, 
_tableCache, _routingManager);
+  }
+
+  static void assertRoutingTableEqual(Map<ServerInstance, ServerRouteInfo> 
routeComputer,
+      Map<String, Set<String>> expectedRealtimeRoutingTable) {
+    for (Map.Entry<ServerInstance, ServerRouteInfo> entry : 
routeComputer.entrySet()) {
+      ServerInstance serverInstance = entry.getKey();
+      ServerRouteInfo serverRouteInfo = entry.getValue();
+      Set<String> segments = 
ImmutableSet.copyOf(serverRouteInfo.getSegments());
+      
assertTrue(expectedRealtimeRoutingTable.containsKey(serverInstance.toString()));
+      
assertEquals(expectedRealtimeRoutingTable.get(serverInstance.toString()), 
segments);
+    }
+  }
 }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
index e66c8ce849..6beb5d3ec6 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
@@ -18,14 +18,12 @@
  */
 package org.apache.pinot.query.routing.table;
 
-import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.ServerRouteInfo;
@@ -35,11 +33,8 @@ import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.*;
@@ -49,7 +44,6 @@ import static org.testng.Assert.*;
  * Test class for {@link ImplicitHybridTableRouteProvider} to test the routing 
table calculation (calculateRoutes)
  */
 public class ImplicitHybridTableRouteProviderCalculateRouteTest extends 
BaseTableRouteTest {
-  private static final String QUERY_FORMAT = "SELECT col1, col2 FROM %s LIMIT 
10";
   private QueryThreadContext.CloseableContext _closeableContext;
 
   @BeforeMethod
@@ -65,91 +59,6 @@ public class 
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
     }
   }
 
-  @DataProvider(name = "offlineTableAndRouteProvider")
-  public static Object[][] offlineTableAndRouteProvider() {
-    //@formatter:off
-    return new Object[][] {
-        {"b_OFFLINE", Map.of("Server_localhost_2", ImmutableSet.of("b2"))},
-        {"c_OFFLINE", Map.of("Server_localhost_1", ImmutableSet.of("c1"),
-            "Server_localhost_2", ImmutableSet.of("c2", "c3"))},
-        {"d_OFFLINE", Map.of("Server_localhost_1", ImmutableSet.of("d1"),
-            "Server_localhost_2", ImmutableSet.of("d3"))},
-        {"e_OFFLINE", Map.of("Server_localhost_1", ImmutableSet.of("e1"),
-            "Server_localhost_2", ImmutableSet.of("e3"))},
-    };
-    //@formatter:on
-  }
-
-  @DataProvider(name = "realtimeTableAndRouteProvider")
-  public static Object[][] realtimeTableAndRouteProvider() {
-    //@formatter:off
-    return new Object[][] {
-        {"a_REALTIME", Map.of("Server_localhost_1", ImmutableSet.of("a1", 
"a2"),
-            "Server_localhost_2", ImmutableSet.of("a3"))},
-        {"b_REALTIME", Map.of("Server_localhost_1", ImmutableSet.of("b1"))},
-        {"e_REALTIME", Map.of("Server_localhost_2", ImmutableSet.of("e2"))},
-    };
-    //@formatter:on
-  }
-
-  @DataProvider(name = "hybridTableAndRouteProvider")
-  public static Object[][] hybridTableAndRouteProvider() {
-    //@formatter:off
-    return new Object[][] {
-        {"d", Map.of("Server_localhost_1", ImmutableSet.of("d1"),
-            "Server_localhost_2", ImmutableSet.of("d3")), null},
-        {"e", Map.of("Server_localhost_1", ImmutableSet.of("e1"),
-            "Server_localhost_2", ImmutableSet.of("e3")),
-            Map.of("Server_localhost_2", ImmutableSet.of("e2"))},
-    };
-    //@formatter:on
-  }
-
-  @DataProvider(name = "partiallyDisabledTableAndRouteProvider")
-  public static Object[][] partiallyDisabledTableAndRouteProvider() {
-    //@formatter:off
-    return new Object[][] {
-        {"hybrid_o_disabled", null, Map.of("Server_localhost_1", 
ImmutableSet.of("hor1"),
-            "Server_localhost_2", ImmutableSet.of("hor2"))},
-        {"hybrid_r_disabled", Map.of("Server_localhost_1", 
ImmutableSet.of("hro1"),
-            "Server_localhost_2", ImmutableSet.of("hro2")), null},
-    };
-    //@formatter:on
-  }
-
-  static class BrokerRequestPair {
-    public final BrokerRequest _offlineBrokerRequest;
-    public final BrokerRequest _realtimeBrokerRequest;
-
-    public BrokerRequestPair(BrokerRequest offlineBrokerRequest, BrokerRequest 
realtimeBrokerRequest) {
-      _offlineBrokerRequest = offlineBrokerRequest;
-      _realtimeBrokerRequest = realtimeBrokerRequest;
-    }
-  }
-
-  private static BrokerRequestPair getBrokerRequestPair(String tableName, 
boolean hasOffline, boolean hasRealtime,
-      String offlineTableName, String realtimeTableName) {
-    String query = String.format(QUERY_FORMAT, tableName);
-    BrokerRequest brokerRequest =
-        
CalciteSqlCompiler.convertToBrokerRequest(CalciteSqlParser.compileToPinotQuery(query));
-    BrokerRequest offlineBrokerRequest = null;
-    BrokerRequest realtimeBrokerRequest = null;
-
-    if (hasOffline) {
-      PinotQuery offlinePinotQuery = brokerRequest.getPinotQuery().deepCopy();
-      offlinePinotQuery.getDataSource().setTableName(offlineTableName);
-      offlineBrokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(offlinePinotQuery);
-    }
-
-    if (hasRealtime) {
-      PinotQuery realtimePinotQuery = brokerRequest.getPinotQuery().deepCopy();
-      realtimePinotQuery.getDataSource().setTableName(realtimeTableName);
-      realtimeBrokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(realtimePinotQuery);
-    }
-
-    return new BrokerRequestPair(offlineBrokerRequest, realtimeBrokerRequest);
-  }
-
   private ImplicitHybridTableRouteInfo getImplicitHybridTableRouteInfo(String 
tableName) {
     ImplicitHybridTableRouteInfo routeInfo =
         _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
@@ -201,17 +110,6 @@ public class 
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
     }
   }
 
-  private static void assertRoutingTableEqual(Map<ServerInstance, 
ServerRouteInfo> routeComputer,
-      Map<String, Set<String>> expectedRealtimeRoutingTable) {
-    for (Map.Entry<ServerInstance, ServerRouteInfo> entry : 
routeComputer.entrySet()) {
-      ServerInstance serverInstance = entry.getKey();
-      ServerRouteInfo serverRouteInfo = entry.getValue();
-      Set<String> segments = 
ImmutableSet.copyOf(serverRouteInfo.getSegments());
-      
assertTrue(expectedRealtimeRoutingTable.containsKey(serverInstance.toString()));
-      
assertEquals(expectedRealtimeRoutingTable.get(serverInstance.toString()), 
segments);
-    }
-  }
-
   @Test(dataProvider = "offlineTableAndRouteProvider")
   void testOfflineTableRoute(String tableName, Map<String, Set<String>> 
expectedRoutingTable) {
     assertTableRoute(tableName, expectedRoutingTable, null, true, false);
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
index aa5573e92a..2a5e046f49 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
@@ -86,26 +86,6 @@ public class 
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
     assertFalse(routeInfo.isExists(), "The table should not exist");
   }
 
-  @DataProvider(name = "routeExistsProvider")
-  public static Object[][] routeExistsProvider() {
-    //@formatter:off
-    return new Object[][] {
-        {"a"},
-        {"a_REALTIME"},
-        {"b"},
-        {"b_OFFLINE"},
-        {"b_REALTIME"},
-        {"c"},
-        {"c_OFFLINE"},
-        {"d"},
-        {"d_OFFLINE"},
-        {"e"},
-        {"e_OFFLINE"},
-        {"e_REALTIME"}
-    };
-    //@formatter:on
-  }
-
   @Test(dataProvider = "routeExistsProvider")
   public void testRouteExists(String parameter) {
     ImplicitHybridTableRouteInfo routeInfo =
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
new file mode 100644
index 0000000000..f7280b3319
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing.table;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.core.routing.ServerRouteInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.transport.TableRouteInfo;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class LogicalTableRouteProviderCalculateRouteTest extends 
BaseTableRouteTest {
+  private QueryThreadContext.CloseableContext _closeableContext;
+
+  @BeforeMethod
+  public void setupQueryThreadContext() {
+    _closeableContext = QueryThreadContext.open();
+  }
+
+  @AfterMethod
+  void closeQueryThreadContext() {
+    if (_closeableContext != null) {
+      _closeableContext.close();
+      _closeableContext = null;
+    }
+  }
+
+  private void assertTableRoute(String tableName, String logicalTableName,
+      Map<String, Set<String>> expectedOfflineRoutingTable, Map<String, 
Set<String>> expectedRealtimeRoutingTable,
+      boolean isOfflineExpected, boolean isRealtimeExpected) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
logicalTableName);
+    BrokerRequestPair brokerRequestPair =
+        getBrokerRequestPair(logicalTableName, routeInfo.hasOffline(), 
routeInfo.hasRealtime(),
+            routeInfo.getOfflineTableName(), routeInfo.getRealtimeTableName());
+
+    _logicalTableRouteProvider.calculateRoutes(routeInfo, _routingManager, 
brokerRequestPair._offlineBrokerRequest,
+        brokerRequestPair._realtimeBrokerRequest, 0);
+    LogicalTableRouteInfo logicalTableRouteInfo = (LogicalTableRouteInfo) 
routeInfo;
+
+    if (isOfflineExpected) {
+      assertNotNull(logicalTableRouteInfo.getOfflineTables());
+      assertEquals(logicalTableRouteInfo.getOfflineTables().size(), 1);
+      Map<ServerInstance, ServerRouteInfo> offlineRoutingTable =
+          
logicalTableRouteInfo.getOfflineTables().get(0).getOfflineRoutingTable();
+      assertNotNull(offlineRoutingTable);
+      assertRoutingTableEqual(offlineRoutingTable, 
expectedOfflineRoutingTable);
+    } else {
+      assertTrue(logicalTableRouteInfo.getOfflineExecutionServers().isEmpty());
+    }
+
+    if (isRealtimeExpected) {
+      assertNotNull(logicalTableRouteInfo.getRealtimeTables());
+      assertEquals(logicalTableRouteInfo.getRealtimeTables().size(), 1);
+      Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable =
+          
logicalTableRouteInfo.getRealtimeTables().get(0).getRealtimeRoutingTable();
+      assertNotNull(realtimeRoutingTable);
+      assertRoutingTableEqual(realtimeRoutingTable, 
expectedRealtimeRoutingTable);
+    } else {
+      
assertTrue(logicalTableRouteInfo.getRealtimeExecutionServers().isEmpty());
+    }
+
+    if (!isOfflineExpected && !isRealtimeExpected) {
+      assertTrue(routeInfo.getOfflineBrokerRequest() == null && 
routeInfo.getRealtimeBrokerRequest() == null);
+    } else {
+      assertFalse(routeInfo.getOfflineBrokerRequest() == null && 
routeInfo.getRealtimeBrokerRequest() == null);
+      // Check requestMap
+      Map<ServerRoutingInstance, InstanceRequest> requestMap = 
routeInfo.getRequestMap(0, "broker", false);
+      assertNotNull(requestMap);
+      assertFalse(requestMap.isEmpty());
+    }
+  }
+
+  @Test(dataProvider = "offlineTableAndRouteProvider")
+  void testOfflineTableRoute(String tableName, Map<String, Set<String>> 
expectedRoutingTable) {
+    assertTableRoute(tableName, "offlineTableAndRouteProvider", 
expectedRoutingTable, null, true, false);
+  }
+
+  @Test(dataProvider = "realtimeTableAndRouteProvider")
+  void testRealtimeTableRoute(String tableName, Map<String, Set<String>> 
expectedRoutingTable) {
+    assertTableRoute(tableName, "realtimeTableAndRouteProvider", null, 
expectedRoutingTable, false, true);
+  }
+
+  @Test(dataProvider = "routeNotExistsProvider")
+  void testRouteNotExists(String tableName) {
+    assertTableRoute(tableName, "routeNotExistsProvider", null, null, false, 
false);
+  }
+
+  @Test(dataProvider = "partiallyDisabledTableAndRouteProvider")
+  void testPartiallyDisabledTable(String tableName, Map<String, Set<String>> 
expectedOfflineRoutingTable,
+      Map<String, Set<String>> expectedRealtimeRoutingTable) {
+    assertTableRoute(tableName, "partiallyDisabledTableAndRouteProvider", 
expectedOfflineRoutingTable,
+        expectedRealtimeRoutingTable, expectedOfflineRoutingTable != null, 
expectedRealtimeRoutingTable != null);
+  }
+
+  @Test(dataProvider = "disabledTableProvider")
+  void testDisabledTable(String tableName) {
+    assertTableRoute(tableName, "disabledTableProvider", null, null, false, 
false);
+  }
+}
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
new file mode 100644
index 0000000000..e4417a9daa
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing.table;
+
+import com.google.common.collect.ImmutableList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.transport.TableRouteInfo;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest {
+  @Test
+  void testWithUnknownTable() {
+    String logicalTableName = "testWithUnknownTable";
+    
when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(null);
+    TableRouteInfo routeInfo =
+        _logicalTableRouteProvider.getTableRouteInfo(logicalTableName, 
_tableCache, _routingManager);
+    assertFalse(routeInfo.isExists());
+    assertFalse(routeInfo.isHybrid());
+    assertFalse(routeInfo.isOffline());
+    assertFalse(routeInfo.isRealtime());
+    assertFalse(routeInfo.isRouteExists());
+    assertFalse(routeInfo.isDisabled());
+    assertNull(routeInfo.getOfflineTableName());
+    assertNull(routeInfo.getRealtimeTableName());
+    assertNull(routeInfo.getDisabledTableNames());
+    assertNull(routeInfo.getTimeBoundaryInfo());
+    assertNull(routeInfo.getOfflineTableConfig());
+    assertNull(routeInfo.getRealtimeTableConfig());
+    assertNull(routeInfo.getOfflineBrokerRequest());
+    assertNull(routeInfo.getRealtimeBrokerRequest());
+    assertNull(routeInfo.getUnavailableSegments());
+
+    LogicalTableRouteInfo logicalTableRouteInfo = (LogicalTableRouteInfo) 
routeInfo;
+    assertNull(logicalTableRouteInfo.getOfflineTables());
+    assertNull(logicalTableRouteInfo.getRealtimeTables());
+  }
+
+  @Test(dataProvider = "offlineTableProvider")
+  public void testOfflineTable(String tableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
"testOfflineTable");
+    assertTrue(routeInfo.isExists(), "The table should exist");
+    assertTrue(routeInfo.isOffline(), "The table should be offline");
+  }
+
+  @Test(dataProvider = "realtimeTableProvider")
+  public void testRealtimeTable(String tableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
"testRealtimeTable");
+    assertTrue(routeInfo.isExists(), "The table should exist");
+    assertTrue(routeInfo.isRealtime(), "The table should be realtime");
+  }
+
+  @Test(dataProvider = "hybridTableProvider")
+  public void testHybridTable(String tableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
"testHybridTable");
+    assertTrue(routeInfo.isExists(), "The table should exist");
+    assertTrue(routeInfo.isHybrid(), "The table should be hybrid");
+  }
+
+  @Test(dataProvider = "routeExistsProvider")
+  public void testRouteExists(String tableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
"testRouteExists");
+
+    assertTrue(routeInfo.isExists(), "The table should exist");
+    assertTrue(routeInfo.isRouteExists(), "The table should have route");
+  }
+
+  @Test(dataProvider = "routeNotExistsProvider")
+  public void testRouteNotExists(String tableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
"testRouteNotExists");
+
+    assertTrue(routeInfo.isExists(), "The table should exist");
+    assertFalse(routeInfo.isRouteExists(), "The table should not have route");
+  }
+
+  @DataProvider(name = "offlineTableList")
+  public Object[][] offlineTableList() {
+    return new Object[][]{
+        {ImmutableList.of("b_OFFLINE")}, {ImmutableList.of("b_OFFLINE", 
"c_OFFLINE")}, {
+        ImmutableList.of("b_OFFLINE", "c_OFFLINE", "d_OFFLINE")
+    }, {ImmutableList.of("b_OFFLINE", "c_OFFLINE", "d_OFFLINE", "e_OFFLINE")},
+    };
+  }
+
+  @Test(dataProvider = "notDisabledTableProvider")
+  public void testNotDisabledTable(String tableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
"testNotDisabledTable");
+    assertTrue(routeInfo.isExists(), "The table should exist");
+    assertFalse(routeInfo.isDisabled(), "The table should not be disabled");
+  }
+
+  @Test(dataProvider = "partiallyDisabledTableProvider")
+  public void testPartiallyDisabledTable(String tableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
"testPartiallyDisabledTable");
+    assertTrue(routeInfo.isExists(), "The table should exist");
+    assertFalse(routeInfo.isDisabled(), "The table should be disabled");
+  }
+
+  @Test(dataProvider = "disabledTableProvider")
+  public void testDisabledTable(String tableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
"testDisabledTable");
+    assertTrue(routeInfo.isExists(), "The table should exist");
+    assertTrue(routeInfo.isDisabled(), "The table should not have route");
+  }
+
+  @Test(dataProvider = "offlineTableList")
+  void testWithOfflineTables(List<String> physicalTableNames) {
+    String logicalTableName = "testWithOfflineTables";
+    LogicalTableConfig logicalTable = new LogicalTableConfig();
+    logicalTable.setTableName(logicalTableName);
+    Map<String, PhysicalTableConfig> tableConfigMap = new HashMap<>();
+    for (String tableName : physicalTableNames) {
+      tableConfigMap.put(tableName, new PhysicalTableConfig());
+    }
+    logicalTable.setPhysicalTableConfigMap(tableConfigMap);
+    logicalTable.setBrokerTenant("brokerTenant");
+    logicalTable.setRefOfflineTableName(physicalTableNames.get(0));
+    
when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
+
+    TableRouteInfo routeInfo =
+        _logicalTableRouteProvider.getTableRouteInfo(logicalTableName, 
_tableCache, _routingManager);
+    assertNotNull(routeInfo.getOfflineTableConfig());
+    assertNull(routeInfo.getRealtimeTableConfig());
+    assertTrue(routeInfo.isExists());
+    assertFalse(routeInfo.isHybrid());
+    assertTrue(routeInfo.isOffline());
+    assertEquals(routeInfo.getOfflineTableName(), 
"testWithOfflineTables_OFFLINE");
+    assertNull(routeInfo.getRealtimeTableName());
+    assertTrue(routeInfo.isRouteExists());
+    assertFalse(routeInfo.isDisabled());
+    assertNull(routeInfo.getDisabledTableNames());
+    assertNull(routeInfo.getTimeBoundaryInfo());
+  }
+
+  @DataProvider(name = "realtimeTableList")
+  public Object[][] realtimeTableList() {
+    return new Object[][]{
+        {ImmutableList.of("a_REALTIME")}, {ImmutableList.of("a_REALTIME", 
"b_REALTIME")}, {
+        ImmutableList.of("a_REALTIME", "b_REALTIME", "e_REALTIME")
+    }
+    };
+  }
+
+  @Test(dataProvider = "realtimeTableList")
+  void testWithRealtimeTables(List<String> physicalTableNames) {
+    String logicalTableName = "testWithRealtimeTables";
+    LogicalTableConfig logicalTable = new LogicalTableConfig();
+    logicalTable.setTableName(logicalTableName);
+    Map<String, PhysicalTableConfig> tableConfigMap = new HashMap<>();
+    for (String tableName : physicalTableNames) {
+      tableConfigMap.put(tableName, new PhysicalTableConfig());
+    }
+    logicalTable.setPhysicalTableConfigMap(tableConfigMap);
+    logicalTable.setBrokerTenant("brokerTenant");
+    logicalTable.setRefRealtimeTableName(physicalTableNames.get(0));
+    
when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
+
+    TableRouteInfo routeInfo =
+        
_logicalTableRouteProvider.getTableRouteInfo(logicalTable.getTableName(), 
_tableCache, _routingManager);
+    assertNull(routeInfo.getOfflineTableConfig());
+    assertNotNull(routeInfo.getRealtimeTableConfig());
+    assertTrue(routeInfo.isExists());
+    assertFalse(routeInfo.isHybrid());
+    assertTrue(routeInfo.isRealtime());
+    assertNull(routeInfo.getOfflineTableName());
+    assertEquals(routeInfo.getRealtimeTableName(), 
"testWithRealtimeTables_REALTIME");
+    assertTrue(routeInfo.isRouteExists());
+    assertFalse(routeInfo.isDisabled());
+    assertNull(routeInfo.getDisabledTableNames());
+    assertNull(routeInfo.getTimeBoundaryInfo());
+  }
+
+  @DataProvider(name = "hybridTableName")
+  public Object[][] hybridTableName() {
+    return new Object[][]{
+        {"e"},
+    };
+  }
+
+  @Test(dataProvider = "hybridTableName")
+  void testWithHybridTable(String hybridTableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(hybridTableName, 
"testWithHybridTable");
+    assertNotNull(routeInfo.getOfflineTableConfig());
+    assertNotNull(routeInfo.getRealtimeTableConfig());
+    assertTrue(routeInfo.isExists());
+    assertTrue(routeInfo.isHybrid());
+    assertEquals(routeInfo.getOfflineTableName(), 
"testWithHybridTable_OFFLINE");
+    assertEquals(routeInfo.getRealtimeTableName(), 
"testWithHybridTable_REALTIME");
+    assertTrue(routeInfo.isRouteExists());
+    assertFalse(routeInfo.isDisabled());
+    assertNull(routeInfo.getDisabledTableNames());
+  }
+
+  @Test(dataProvider = "disabledTableProvider")
+  void testWithDisabledPhysicalTable(String tableName) {
+    TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, 
"testWithDisabledPhysicalTable");
+    LogicalTableConfig logicalTableConfig = 
_tableCache.getLogicalTableConfig("testWithDisabledPhysicalTable");
+    assertNotNull(logicalTableConfig);
+    assertNotNull(logicalTableConfig.getPhysicalTableConfigMap());
+    assertTrue(routeInfo.isExists());
+    assertTrue(routeInfo.isDisabled());
+    assertNotNull(routeInfo.getDisabledTableNames());
+    assertEquals(new HashSet<>(routeInfo.getDisabledTableNames()),
+        logicalTableConfig.getPhysicalTableConfigMap().keySet());
+  }
+
+  @DataProvider(name = "offlineTableWithOtherTables")
+  public Object[][] offlineTableMixedList() {
+    return new Object[][]{
+        {ImmutableList.of("b_OFFLINE", "a_REALTIME")}, {
+        ImmutableList.of("b_OFFLINE", "hybrid_o_disabled_REALTIME")
+    }, {ImmutableList.of("b_OFFLINE", "no_route_table_O_REALTIME")}, {
+        ImmutableList.of("b_OFFLINE", "no_route_table_R_OFFLINE")
+    }, {ImmutableList.of("b_OFFLINE", "o_disabled_REALTIME")}, 
{ImmutableList.of("b_OFFLINE", "r_disabled_OFFLINE")},
+    };
+  }
+
+  @Test(dataProvider = "offlineTableWithOtherTables")
+  void testWithOfflineTableWithOtherTables(List<String> physicalTableNames) {
+    String logicalTableName = "testWithOfflineTableWithOtherTables";
+    LogicalTableConfig logicalTable = new LogicalTableConfig();
+    logicalTable.setTableName(logicalTableName);
+    Map<String, PhysicalTableConfig> tableConfigMap = new HashMap<>();
+    for (String tableName : physicalTableNames) {
+      tableConfigMap.put(tableName, new PhysicalTableConfig());
+      if (logicalTable.getRefOfflineTableName() == null && 
TableNameBuilder.isOfflineTableResource(tableName)) {
+        logicalTable.setRefOfflineTableName(tableName);
+      } else if (logicalTable.getRefRealtimeTableName() == null && 
TableNameBuilder.isRealtimeTableResource(
+          tableName)) {
+        logicalTable.setRefRealtimeTableName(tableName);
+      }
+    }
+    logicalTable.setPhysicalTableConfigMap(tableConfigMap);
+    logicalTable.setBrokerTenant("brokerTenant");
+    
when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
+
+    TableRouteInfo routeInfo =
+        
_logicalTableRouteProvider.getTableRouteInfo(logicalTable.getTableName(), 
_tableCache, _routingManager);
+    assertTrue(routeInfo.isExists());
+    assertFalse(routeInfo.isDisabled());
+    assertTrue(routeInfo.isRouteExists());
+    assertTrue(routeInfo.hasOffline());
+  }
+
+  @DataProvider(name = "realTimeTableWithOtherTables")
+  public Object[][] physicalTableMixedList() {
+    return new Object[][]{
+        {ImmutableList.of("a_REALTIME", "b_OFFLINE")}, {
+        ImmutableList.of("a_REALTIME", "hybrid_o_disabled_OFFLINE")
+    }, {ImmutableList.of("a_REALTIME", "no_route_table_O_OFFLINE")}, {
+        ImmutableList.of("a_REALTIME", "no_route_table_R_REALTIME")
+    }, {ImmutableList.of("a_REALTIME", "o_disabled_OFFLINE")}, 
{ImmutableList.of("a_REALTIME", "r_disabled_REALTIME")},
+    };
+  }
+
+  @Test(dataProvider = "realTimeTableWithOtherTables")
+  void testWithRealTimeTableWithOtherTables(List<String> physicalTableNames) {
+    String logicalTableName = "testWithRealTimeTableWithOtherTables";
+    LogicalTableConfig logicalTable = new LogicalTableConfig();
+    logicalTable.setTableName(logicalTableName);
+    Map<String, PhysicalTableConfig> tableConfigMap = new HashMap<>();
+    for (String tableName : physicalTableNames) {
+      tableConfigMap.put(tableName, new PhysicalTableConfig());
+      if (logicalTable.getRefOfflineTableName() == null && 
TableNameBuilder.isOfflineTableResource(tableName)) {
+        logicalTable.setRefOfflineTableName(tableName);
+      } else if (logicalTable.getRefRealtimeTableName() == null && 
TableNameBuilder.isRealtimeTableResource(
+          tableName)) {
+        logicalTable.setRefRealtimeTableName(tableName);
+      }
+    }
+    logicalTable.setPhysicalTableConfigMap(tableConfigMap);
+    logicalTable.setBrokerTenant("brokerTenant");
+    
when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
+
+    TableRouteInfo routeInfo =
+        
_logicalTableRouteProvider.getTableRouteInfo(logicalTable.getTableName(), 
_tableCache, _routingManager);
+    assertTrue(routeInfo.isExists());
+    assertFalse(routeInfo.isDisabled());
+    assertTrue(routeInfo.isRouteExists());
+    assertTrue(routeInfo.hasRealtime());
+  }
+}
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index b54f176630..d6531026be 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -202,6 +202,12 @@ public class MockRoutingManagerFactory {
       return _routingTableMap.get(tableNameWithType);
     }
 
+    @Nullable
+    @Override
+    public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String 
tableNameWithType, long requestId) {
+      return _routingTableMap.get(tableNameWithType);
+    }
+
     @Nullable
     @Override
     public List<String> getSegments(BrokerRequest brokerRequest) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to