This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ba99dc4310 [Logical Table] Support logical tables in MSE. (#15773)
ba99dc4310 is described below

commit ba99dc4310615e4a299c278bf336a74d79dc71ce
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Fri May 30 19:24:31 2025 +0530

    [Logical Table] Support logical tables in MSE. (#15773)
---
 .../api/resources/PinotQueryResource.java          |  54 +++++++--
 .../helix/core/PinotHelixResourceManager.java      |  11 ++
 .../core/data/manager/InstanceDataManager.java     |   6 +
 .../core/data/manager/LogicalTableContext.java     |  55 +++++++++
 .../BaseLogicalTableIntegrationTest.java           | 115 +++++++++++++++++--
 .../apache/pinot/query/catalog/PinotCatalog.java   |  11 +-
 .../planner/physical/DispatchablePlanContext.java  |   7 ++
 .../planner/physical/DispatchablePlanMetadata.java |  26 +++++
 .../planner/physical/DispatchablePlanVisitor.java  |  17 ++-
 .../apache/pinot/query/routing/WorkerManager.java  |  96 +++++++++++++++-
 .../apache/pinot/query/routing/WorkerMetadata.java |  27 ++++-
 .../table/ImplicitHybridTableRouteProvider.java    |  17 ++-
 .../query/routing/table/LogicalTableRouteInfo.java |  53 +++++----
 .../routing/table/LogicalTableRouteProvider.java   |  94 ++++++++++------
 .../routing/table/PhysicalTableRouteProvider.java  |   1 -
 .../timeboundary/MinTimeBoundaryStrategy.java      |  47 ++++----
 .../query/timeboundary/TimeBoundaryStrategy.java   |  12 +-
 .../query/routing/table/BaseTableRouteTest.java    |   2 +-
 ...HybridTableRouteProviderCalculateRouteTest.java |  17 ++-
 ...tHybridTableRouteProviderGetTableRouteTest.java |  65 +++--------
 .../timeboundary/MinTimeBoundaryStrategyTest.java  |   5 +-
 .../plan/server/ServerPlanRequestUtils.java        | 124 ++++++++++++++++++---
 .../starter/helix/HelixInstanceDataManager.java    |  40 +++++++
 23 files changed, 719 insertions(+), 183 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 73d880570e..ceaaa8bf07 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -69,6 +69,7 @@ import 
org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.parser.utils.ParserUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.exception.DatabaseConflictException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.exception.QueryErrorMessage;
@@ -240,12 +241,30 @@ public class PinotQueryResource {
     List<String> instanceIds;
     if (!tableNames.isEmpty()) {
       List<TableConfig> tableConfigList = getListTableConfigs(tableNames, 
database);
-      if (tableConfigList == null || tableConfigList.isEmpty()) {
-        throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException("Unable to find 
table in cluster, table does not exist");
+      List<LogicalTableConfig> logicalTableConfigList = null;
+      // First check for table configs; if some are not found, check for logical table 
configs.
+      if (tableConfigList.size() != tableNames.size()) {
+        logicalTableConfigList = getListLogicalTableConfigs(tableNames, 
database);
+        // If a config is still missing for some tables, collect the names of the tables that 
were not found.
+        if ((tableConfigList.size() + logicalTableConfigList.size()) != 
tableNames.size()) {
+          Set<String> tableNamesFoundSet = new HashSet<>();
+          for (TableConfig tableConfig : tableConfigList) {
+            tableNamesFoundSet.add(tableConfig.getTableName());
+          }
+          for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) 
{
+            tableNamesFoundSet.add(logicalTableConfig.getTableName());
+          }
+
+          List<String> tablesNotFound = tableNames.stream().filter(name -> 
!tableNamesFoundSet.contains(name))
+              .collect(Collectors.toList());
+
+          throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException(
+              "Unable to find table in cluster, table does not exist for 
tables: " + tablesNotFound);
+        }
       }
 
       // find the unions of all the broker tenant tags of the queried tables.
-      Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList);
+      Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList, 
logicalTableConfigList);
       if (brokerTenantsUnion.isEmpty()) {
         throw QueryErrorCode.BROKER_REQUEST_SEND.asException("Unable to find 
broker tenant for tables: " + tableNames);
       }
@@ -324,14 +343,28 @@ public class PinotQueryResource {
       if (_pinotHelixResourceManager.hasOfflineTable(actualTableName)) {
         
tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName)));
       }
-      if (tableConfigList.isEmpty()) {
-        return null;
+      // If no table configs found for the table, skip it.
+      if (!tableConfigList.isEmpty()) {
+        allTableConfigList.addAll(tableConfigList);
       }
-      allTableConfigList.addAll(tableConfigList);
     }
     return allTableConfigList;
   }
 
+  private List<LogicalTableConfig> getListLogicalTableConfigs(List<String> 
tableNames, String database) {
+    List<LogicalTableConfig> allLogicalTableConfigList = new ArrayList<>();
+    for (String tableName : tableNames) {
+      String actualTableName = 
_pinotHelixResourceManager.getActualLogicalTableName(tableName, database);
+      LogicalTableConfig logicalTableConfig =
+          _pinotHelixResourceManager.getLogicalTableConfig(actualTableName);
+      if (logicalTableConfig != null) {
+        allLogicalTableConfigList.add(logicalTableConfig);
+      }
+    }
+    return allLogicalTableConfigList;
+  }
+
+
   private String selectRandomInstanceId(List<String> instanceIds) {
     if (instanceIds.isEmpty()) {
       throw QueryErrorCode.BROKER_RESOURCE_MISSING.asException("No broker 
found for query");
@@ -356,11 +389,18 @@ public class PinotQueryResource {
   }
 
   // return the union of brokerTenants from the tables list.
-  private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList) 
{
+  private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList,
+      @Nullable List<LogicalTableConfig> logicalTableConfigList) {
     Set<String> tableBrokerTenants = new HashSet<>();
     for (TableConfig tableConfig : tableConfigList) {
       tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker());
     }
+
+    if (logicalTableConfigList != null) {
+      for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) {
+        tableBrokerTenants.add(logicalTableConfig.getBrokerTenant());
+      }
+    }
     return tableBrokerTenants;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0a4b2d37f3..ef823d55bf 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -906,6 +906,17 @@ public class PinotHelixResourceManager {
     return actualTableName != null ? actualTableName : tableName;
   }
 
+  /**
+   * Given a logical table name in any case, returns the logical table name as 
defined in Helix/Segment/Schema.
+   * @param logicalTableName logical table name in any case.
+   * @return the logical table name as actually defined in Pinot (matching case) if it 
exists; otherwise, the input name after database translation.
+   */
+  public String getActualLogicalTableName(String logicalTableName, @Nullable 
String databaseName) {
+    logicalTableName = DatabaseUtils.translateTableName(logicalTableName, 
databaseName, _tableCache.isIgnoreCase());
+    String actualTableName = 
_tableCache.getActualLogicalTableName(logicalTableName);
+    return actualTableName != null ? actualTableName : logicalTableName;
+  }
+
   /**
    * Table related APIs
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index d5572dce52..15ef79f465 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -201,4 +201,10 @@ public interface InstanceDataManager {
    * Returns the instance data directory
    */
   String getInstanceDataDir();
+
+  /**
+   * Returns the logical table config and schema for the given logical table 
name.
+   */
+  @Nullable
+  LogicalTableContext getLogicalTableContext(String logicalTableName);
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableContext.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableContext.java
new file mode 100644
index 0000000000..86592f2db7
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableContext.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class LogicalTableContext {
+  private final LogicalTableConfig _logicalTableConfig;
+  private final Schema _logicalTableSchema;
+  private final TableConfig _refOfflineTableConfig;
+  private final TableConfig _refRealtimeTableConfig;
+
+  public LogicalTableContext(LogicalTableConfig logicalTableConfig, Schema 
logicalTableSchema,
+      TableConfig refOfflineTableConfig, TableConfig refRealtimeTableConfig) {
+    _logicalTableConfig = logicalTableConfig;
+    _logicalTableSchema = logicalTableSchema;
+    _refOfflineTableConfig = refOfflineTableConfig;
+    _refRealtimeTableConfig = refRealtimeTableConfig;
+  }
+
+  public LogicalTableConfig getLogicalTableConfig() {
+    return _logicalTableConfig;
+  }
+
+  public Schema getLogicalTableSchema() {
+    return _logicalTableSchema;
+  }
+
+  public TableConfig getRefOfflineTableConfig() {
+    return _refOfflineTableConfig;
+  }
+
+  public TableConfig getRefRealtimeTableConfig() {
+    return _refRealtimeTableConfig;
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 7fb4802cc1..0dcbb3a9f3 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -34,6 +34,7 @@ import 
org.apache.pinot.controller.helix.ControllerRequestClient;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.QueryAssert;
 import org.apache.pinot.integration.tests.QueryGenerator;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -48,6 +49,7 @@ import 
org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
+import org.intellij.lang.annotations.Language;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -67,6 +69,7 @@ public abstract class BaseLogicalTableIntegrationTest extends 
BaseClusterIntegra
   private static final String DEFAULT_TENANT = "DefaultTenant";
   private static final String DEFAULT_LOGICAL_TABLE_NAME = "mytable";
   protected static final String DEFAULT_TABLE_NAME = "physicalTable";
+  protected static final String EMPTY_OFFLINE_TABLE_NAME = "empty_o";
   protected static BaseLogicalTableIntegrationTest _sharedClusterTestSuite = 
null;
   protected List<File> _avroFiles;
 
@@ -111,6 +114,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
       _controllerRequestURLBuilder = 
_sharedClusterTestSuite._controllerRequestURLBuilder;
       _helixResourceManager = _sharedClusterTestSuite._helixResourceManager;
       _kafkaStarters = _sharedClusterTestSuite._kafkaStarters;
+      _controllerBaseApiUrl = _sharedClusterTestSuite._controllerBaseApiUrl;
     }
 
     _avroFiles = getAllAvroFiles();
@@ -164,6 +168,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
+    createLogicalTableWithEmptyOfflineTable();
   }
 
   @AfterClass
@@ -250,6 +255,13 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     return DEFAULT_TENANT;
   }
 
+  // Set up an H2 table with the same name as the logical table.
+  protected void setUpH2Connection(List<File> avroFiles)
+      throws Exception {
+    setUpH2Connection();
+    ClusterIntegrationTestUtils.setUpH2TableWithAvro(avroFiles, 
getLogicalTableName(), _h2Connection);
+  }
+
   /**
    * Creates a new OFFLINE table config.
    */
@@ -324,12 +336,35 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     return LogicalTableConfig.fromString(resp);
   }
 
-  protected void deleteLogicalTable()
+  private void createLogicalTableWithEmptyOfflineTable()
       throws IOException {
-    String deleteLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableDelete(getLogicalTableName());
-    // delete logical table
-    String deleteResponse = 
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
-    assertEquals(deleteResponse, "{\"status\":\"" + getLogicalTableName() + " 
logical table successfully deleted.\"}");
+    Schema schema = createSchema(getSchemaFileName());
+    
schema.setSchemaName(TableNameBuilder.extractRawTableName(EMPTY_OFFLINE_TABLE_NAME));
+    addSchema(schema);
+
+    Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+    TableConfig offlineTableConfig = 
createOfflineTableConfig(EMPTY_OFFLINE_TABLE_NAME);
+    addTableConfig(offlineTableConfig);
+    
physicalTableConfigMap.put(TableNameBuilder.OFFLINE.tableNameWithType(EMPTY_OFFLINE_TABLE_NAME),
+        new PhysicalTableConfig());
+    String refOfflineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(EMPTY_OFFLINE_TABLE_NAME);
+
+    String logicalTableName = EMPTY_OFFLINE_TABLE_NAME + "_logical";
+
+    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
+    Schema logicalTableSchema = createSchema(getSchemaFileName());
+    logicalTableSchema.setSchemaName(logicalTableName);
+    addSchema(logicalTableSchema);
+    LogicalTableConfigBuilder builder =
+        new LogicalTableConfigBuilder().setTableName(logicalTableName)
+            .setBrokerTenant(DEFAULT_TENANT)
+            .setRefOfflineTableName(refOfflineTableName)
+            .setPhysicalTableConfigMap(physicalTableConfigMap);
+
+    String resp =
+        ControllerTest.sendPostRequest(addLogicalTableUrl, 
builder.build().toSingleLineJsonString(), getHeaders());
+    assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" + 
logicalTableName
+        + " logical table successfully added.\"}");
   }
 
   @Override
@@ -414,22 +449,24 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     assertEquals(new HashSet<>(getPhysicalTableNames()), 
logicalTableConfig.getPhysicalTableConfigMap().keySet());
   }
 
-  @Test
-  public void testHardcodedQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testHardcodedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     super.testHardcodedQueries();
   }
 
-  @Test
   public void testQueriesFromQueryFile()
       throws Exception {
+    setUseMultiStageQueryEngine(false);
     super.testQueriesFromQueryFile();
   }
 
-  @Test
-  public void testGeneratedQueries()
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testGeneratedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
-    super.testGeneratedQueries(true, false);
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    super.testGeneratedQueries(true, useMultiStageQueryEngine);
   }
 
   @Test
@@ -558,4 +595,60 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     exceptions = response.get("exceptions");
     assertTrue(exceptions.isEmpty(), "Query should not throw exception");
   }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testLogicalTableWithEmptyOfflineTable(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String logicalTableName = EMPTY_OFFLINE_TABLE_NAME + "_logical";
+    // Query should return empty result
+    JsonNode queryResponse = postQuery("SELECT count(*) FROM " + 
logicalTableName);
+    assertEquals(queryResponse.get("numDocsScanned").asInt(), 0);
+    assertEquals(queryResponse.get("numServersQueried").asInt(), 
useMultiStageQueryEngine ? 1 : 0);
+    assertTrue(queryResponse.get("exceptions").isEmpty());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  void testControllerQuerySubmit(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    @Language("sql")
+    String query = "SELECT count(*) FROM " + getLogicalTableName();
+    JsonNode response = postQueryToController(query);
+    assertNoError(response);
+
+    query = "SELECT count(*) FROM " + getOfflineTableNames().get(0);
+    response = postQueryToController(query);
+    assertNoError(response);
+
+    query = "SELECT count(*) FROM unknown";
+    response = postQueryToController(query);
+    
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+        .containsMessage("TableDoesNotExistError");
+  }
+
+  @Test
+  void testControllerJoinQuerySubmit()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+    @Language("sql")
+    String query = "SELECT count(*) FROM " + getLogicalTableName() + " JOIN " 
+ getPhysicalTableNames().get(0)
+        + " ON " + getLogicalTableName() + ".FlightNum = " + 
getPhysicalTableNames().get(0) + ".FlightNum";
+    JsonNode response = postQueryToController(query);
+    assertNoError(response);
+
+    query = "SELECT count(*) FROM unknown JOIN " + 
getPhysicalTableNames().get(0)
+        + " ON unknown.FlightNum = " + getPhysicalTableNames().get(0) + 
".FlightNum";
+    response = postQueryToController(query);
+    
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+        .containsMessage("TableDoesNotExistError");
+
+    query = "SELECT count(*) FROM " + getLogicalTableName() + " JOIN known  ON 
"
+        + getLogicalTableName() + ".FlightNum = unknown.FlightNum";
+    response = postQueryToController(query);
+    
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+        .containsMessage("TableDoesNotExistError");
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
index da40d6c48c..c4ba328cee 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.catalog;
 import java.util.Collection;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -68,10 +69,14 @@ public class PinotCatalog implements Schema {
     String rawTableName = TableNameBuilder.extractRawTableName(name);
     String physicalTableName = DatabaseUtils.translateTableName(rawTableName, 
_databaseName);
     String tableName = _tableCache.getActualTableName(physicalTableName);
+
     if (tableName == null) {
-      return null;
+      tableName = _tableCache.getActualLogicalTableName(physicalTableName);
     }
 
+    if (tableName == null) {
+      return null;
+    }
     org.apache.pinot.spi.data.Schema schema = _tableCache.getSchema(tableName);
     if (schema == null) {
       return null;
@@ -86,7 +91,9 @@ public class PinotCatalog implements Schema {
    */
   @Override
   public Set<String> getTableNames() {
-    return _tableCache.getTableNameMap().keySet().stream().filter(n -> 
DatabaseUtils.isPartOfDatabase(n, _databaseName))
+    return Stream.concat(_tableCache.getTableNameMap().keySet().stream(),
+            _tableCache.getLogicalTableNameMap().keySet().stream())
+        .filter(n -> DatabaseUtils.isPartOfDatabase(n, _databaseName))
         .collect(Collectors.toSet());
   }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 5aaf277858..fbace37191 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -103,8 +103,12 @@ public class DispatchablePlanContext {
           dispatchablePlanMetadata.getWorkerIdToServerInstanceMap();
       Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap =
           dispatchablePlanMetadata.getWorkerIdToSegmentsMap();
+      Map<Integer, Map<String, List<String>>> workerIdToTableNameSegmentsMap =
+          dispatchablePlanMetadata.getWorkerIdToTableSegmentsMap();
       Map<Integer, Map<Integer, MailboxInfos>> workerIdToMailboxesMap =
           dispatchablePlanMetadata.getWorkerIdToMailboxesMap();
+      Preconditions.checkArgument(workerIdToSegmentsMap == null || 
workerIdToTableNameSegmentsMap == null,
+          "Both workerIdToSegmentsMap and workerIdToTableNameSegmentsMap 
cannot be set at the same time");
       Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdsMap = 
new HashMap<>();
       WorkerMetadata[] workerMetadataArray = new 
WorkerMetadata[workerIdToServerInstanceMap.size()];
       for (Map.Entry<Integer, QueryServerInstance> serverEntry : 
workerIdToServerInstanceMap.entrySet()) {
@@ -115,6 +119,9 @@ public class DispatchablePlanContext {
         if (workerIdToSegmentsMap != null) {
           
workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
         }
+        if (workerIdToTableNameSegmentsMap != null) {
+          
workerMetadata.setLogicalTableSegmentsMap(workerIdToTableNameSegmentsMap.get(workerId));
+        }
         workerMetadataArray[workerId] = workerMetadata;
       }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 0c27906ee6..ec88a8e4f4 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.routing.MailboxInfos;
 import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.table.LogicalTableRouteInfo;
 
 
 /**
@@ -80,6 +81,12 @@ public class DispatchablePlanMetadata implements 
Serializable {
   // Map from workerId -> {planFragmentId -> mailboxes}
   private final Map<Integer, Map<Integer, MailboxInfos>> 
_workerIdToMailboxesMap = new HashMap<>();
 
+  /**
+   * Map from workerId -> {physicalTableName -> segments} is required for 
logical tables.
+   */
+  private Map<Integer, Map<String, List<String>>> _workerIdToTableSegmentsMap;
+  private LogicalTableRouteInfo _logicalTableRouteInfo;
+
   public List<String> getScannedTables() {
     return _scannedTables;
   }
@@ -178,4 +185,23 @@ public class DispatchablePlanMetadata implements 
Serializable {
   public void addUnavailableSegments(String tableName, Collection<String> 
unavailableSegments) {
     _tableToUnavailableSegmentsMap.computeIfAbsent(tableName, k -> new 
HashSet<>()).addAll(unavailableSegments);
   }
+
+  @Nullable
+  public LogicalTableRouteInfo getLogicalTableRouteInfo() {
+    return _logicalTableRouteInfo;
+  }
+
+  public void setLogicalTableRouteInfo(LogicalTableRouteInfo 
logicalTableRouteInfo) {
+    _logicalTableRouteInfo = logicalTableRouteInfo;
+  }
+
+  @Nullable
+  public Map<Integer, Map<String, List<String>>> 
getWorkerIdToTableSegmentsMap() {
+    return _workerIdToTableSegmentsMap;
+  }
+
+  public void setWorkerIdToTableSegmentsMap(
+      Map<Integer, Map<String, List<String>>> workerIdToTableSegmentsMap) {
+    _workerIdToTableSegmentsMap = workerIdToTableSegmentsMap;
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index c79a6bec68..848bea8fbc 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.planner.physical;
 
+import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Set;
@@ -38,6 +39,8 @@ import org.apache.pinot.query.planner.plannode.SortNode;
 import org.apache.pinot.query.planner.plannode.TableScanNode;
 import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.apache.pinot.query.routing.table.LogicalTableRouteInfo;
+import org.apache.pinot.query.routing.table.LogicalTableRouteProvider;
 
 
 public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, 
DispatchablePlanContext> {
@@ -137,7 +140,19 @@ public class DispatchablePlanVisitor implements 
PlanNodeVisitor<Void, Dispatchab
   @Override
   public Void visitTableScan(TableScanNode node, DispatchablePlanContext 
context) {
     DispatchablePlanMetadata dispatchablePlanMetadata = 
getOrCreateDispatchablePlanMetadata(node, context);
-    
dispatchablePlanMetadata.addScannedTable(_tableCache.getActualTableName(node.getTableName()));
+
+    String tableNameInNode = node.getTableName();
+    String tableName = _tableCache.getActualTableName(tableNameInNode);
+    if (tableName == null) {
+      tableName = _tableCache.getActualLogicalTableName(tableNameInNode);
+      Preconditions.checkNotNull(tableName, "Logical table config not found in 
table cache: " + tableNameInNode);
+      LogicalTableRouteProvider tableRouteProvider = new 
LogicalTableRouteProvider();
+      LogicalTableRouteInfo logicalTableRouteInfo = new 
LogicalTableRouteInfo();
+      tableRouteProvider.fillTableConfigMetadata(logicalTableRouteInfo, 
tableName, _tableCache);
+      dispatchablePlanMetadata.setLogicalTableRouteInfo(logicalTableRouteInfo);
+    }
+
+    dispatchablePlanMetadata.addScannedTable(tableName);
     dispatchablePlanMetadata.setTableOptions(
         
node.getNodeHint().getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS));
     return null;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index b474b06b4e..4a28f11632 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -35,18 +35,22 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.calcite.rel.rules.ImmutableTableOptions;
 import org.apache.pinot.calcite.rel.rules.TableOptions;
+import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.ServerRouteInfo;
 import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.transport.TableRouteInfo;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.routing.table.LogicalTableRouteInfo;
+import org.apache.pinot.query.routing.table.LogicalTableRouteProvider;
 import org.apache.pinot.spi.config.table.TableType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -373,7 +377,11 @@ public class WorkerManager {
     DispatchablePlanMetadata metadata = 
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
     Map<String, String> tableOptions = metadata.getTableOptions();
     if (tableOptions == null) {
-      assignWorkersToNonPartitionedLeafFragment(metadata, context);
+      if (metadata.getLogicalTableRouteInfo() != null) {
+        assignWorkersToNonPartitionedLeafFragmentForLogicalTable(metadata, 
context);
+      } else {
+        assignWorkersToNonPartitionedLeafFragment(metadata, context);
+      }
       return;
     }
 
@@ -392,7 +400,11 @@ public class WorkerManager {
     if 
(Boolean.parseBoolean(tableOptions.get(PinotHintOptions.TableHintOptions.IS_REPLICATED)))
 {
       setSegmentsForReplicatedLeafFragment(metadata);
     } else {
-      assignWorkersToNonPartitionedLeafFragment(metadata, context);
+      if (metadata.getLogicalTableRouteInfo() != null) {
+        assignWorkersToNonPartitionedLeafFragmentForLogicalTable(metadata, 
context);
+      } else {
+        assignWorkersToNonPartitionedLeafFragment(metadata, context);
+      }
     }
   }
 
@@ -539,6 +551,86 @@ public class WorkerManager {
         CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" + 
tableNameWithType + "\""));
   }
 
+  /**
+   * Assigns workers to a non-partitioned leaf fragment whose table scan targets a logical table.
+   *
+   * <p>Route metadata is filled in from the routing manager first; a time boundary found during that step is copied
+   * onto {@code metadata}. A {@code SELECT *} broker request is then compiled for each table type the logical table
+   * has, routes are calculated, and the routed segments are handed to {@code assignTableSegmentsToWorkers}.
+   *
+   * @param metadata dispatchable plan metadata of the leaf fragment; must carry a logical table route info
+   * @param context dispatchable plan context, used here only for the request id
+   */
+  private void assignWorkersToNonPartitionedLeafFragmentForLogicalTable(DispatchablePlanMetadata metadata,
+      DispatchablePlanContext context) {
+    LogicalTableRouteInfo routeInfo = Preconditions.checkNotNull(metadata.getLogicalTableRouteInfo());
+    LogicalTableRouteProvider routeProvider = new LogicalTableRouteProvider();
+    routeProvider.fillRouteMetadata(routeInfo, _routingManager);
+
+    TimeBoundaryInfo timeBoundaryInfo = routeInfo.getTimeBoundaryInfo();
+    if (timeBoundaryInfo != null) {
+      metadata.setTimeBoundaryInfo(timeBoundaryInfo);
+    }
+
+    // A side that is absent keeps a null broker request.
+    BrokerRequest offlineRequest = routeInfo.hasOffline()
+        ? CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" + routeInfo.getOfflineTableName() + "\"")
+        : null;
+    BrokerRequest realtimeRequest = routeInfo.hasRealtime()
+        ? CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" + routeInfo.getRealtimeTableName() + "\"")
+        : null;
+
+    routeProvider.calculateRoutes(routeInfo, _routingManager, offlineRequest, realtimeRequest,
+        context.getRequestId());
+
+    assignTableSegmentsToWorkers(routeInfo, metadata);
+  }
+
+  /**
+   * Builds the worker assignment of a logical table from the routing tables of its physical tables.
+   *
+   * <p>The segments of every offline and realtime physical table are first grouped per server. Each server in that
+   * grouping then becomes one worker, with ids handed out sequentially from 0 in the iteration order of the
+   * per-server map. The resulting worker-to-server and worker-to-segments maps are stored on {@code metadata}.
+   *
+   * @param logicalTableRouteInfo route info whose physical tables already have their routes calculated
+   * @param metadata dispatchable plan metadata that receives the worker assignment
+   */
+  private static void assignTableSegmentsToWorkers(LogicalTableRouteInfo logicalTableRouteInfo,
+      DispatchablePlanMetadata metadata) {
+    Map<ServerInstance, Map<String, List<String>>> segmentsByServer = new HashMap<>();
+
+    List<? extends TableRouteInfo> offlineRoutes = logicalTableRouteInfo.getOfflineTables();
+    if (offlineRoutes != null) {
+      for (TableRouteInfo offlineRoute : offlineRoutes) {
+        Map<ServerInstance, ServerRouteInfo> routingTable = offlineRoute.getOfflineRoutingTable();
+        // The routing table may be null if no routing table is found OR there are no segments.
+        if (routingTable == null) {
+          continue;
+        }
+        transferToServerInstanceLogicalSegmentsMap(offlineRoute.getOfflineTableName(), routingTable,
+            segmentsByServer);
+      }
+    }
+
+    List<? extends TableRouteInfo> realtimeRoutes = logicalTableRouteInfo.getRealtimeTables();
+    if (realtimeRoutes != null) {
+      for (TableRouteInfo realtimeRoute : realtimeRoutes) {
+        Map<ServerInstance, ServerRouteInfo> routingTable = realtimeRoute.getRealtimeRoutingTable();
+        // The routing table may be null if no routing table is found OR there are no segments.
+        if (routingTable == null) {
+          continue;
+        }
+        transferToServerInstanceLogicalSegmentsMap(realtimeRoute.getRealtimeTableName(), routingTable,
+            segmentsByServer);
+      }
+    }
+
+    Map<Integer, QueryServerInstance> serversByWorkerId = new HashMap<>();
+    Map<Integer, Map<String, List<String>>> segmentsByWorkerId = new HashMap<>();
+    int nextWorkerId = 0;
+    for (Map.Entry<ServerInstance, Map<String, List<String>>> serverEntry : segmentsByServer.entrySet()) {
+      serversByWorkerId.put(nextWorkerId, new QueryServerInstance(serverEntry.getKey()));
+      segmentsByWorkerId.put(nextWorkerId, serverEntry.getValue());
+      nextWorkerId++;
+    }
+
+    metadata.setWorkerIdToServerInstanceMap(serversByWorkerId);
+    metadata.setWorkerIdToTableSegmentsMap(segmentsByWorkerId);
+  }
+
+  /**
+   * Adds the segments that each server hosts for one physical table to the per-server map.
+   *
+   * @param physicalTableName name of the physical table the routing table belongs to; used as the inner map key
+   * @param segmentsMap routing table of the physical table: server to the route info holding its segments
+   * @param serverInstanceToLogicalSegmentsMap accumulator, updated in place: server to (physical table name to
+   *     segment list)
+   * @throws IllegalStateException if a server already has an entry for {@code physicalTableName}
+   */
+  private static void transferToServerInstanceLogicalSegmentsMap(String physicalTableName,
+      Map<ServerInstance, ServerRouteInfo> segmentsMap,
+      Map<ServerInstance, Map<String, List<String>>> serverInstanceToLogicalSegmentsMap) {
+    for (Map.Entry<ServerInstance, ServerRouteInfo> serverEntry : segmentsMap.entrySet()) {
+      Map<String, List<String>> tableNameToSegmentsMap =
+          serverInstanceToLogicalSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>());
+      // TODO: support optional segments for multi-stage engine.
+      // Preconditions substitutes only "%s" placeholders; "{}" would leave the server and table out of the message.
+      Preconditions.checkState(
+          tableNameToSegmentsMap.put(physicalTableName, serverEntry.getValue().getSegments()) == null,
+          "Entry for server %s and physical table: %s already exist!", serverEntry.getKey(), physicalTableName);
+    }
+  }
+
   // --------------------------------------------------------------------------
   // Partitioned leaf stage assignment
   // --------------------------------------------------------------------------
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
index ab1226862e..b00a31294e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
@@ -43,6 +43,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  */
 public class WorkerMetadata {
   public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";
+  public static final String LOGICAL_TABLE_SEGMENTS_MAP_KEY = 
"logicalTableSegmentsMap";
 
   private final int _workerId;
   private final Map<Integer, MailboxInfos> _mailboxInfosMap;
@@ -75,13 +76,17 @@ public class WorkerMetadata {
 
   @Nullable
   public Map<String, List<String>> getTableSegmentsMap() {
-    String tableSegmentsMapStr = _customProperties.get(TABLE_SEGMENTS_MAP_KEY);
+    return deserializeStringSegmentListMap(TABLE_SEGMENTS_MAP_KEY);
+  }
+
+  private Map<String, List<String>> deserializeStringSegmentListMap(String 
propertyKey) {
+    String tableSegmentsMapStr = _customProperties.get(propertyKey);
     if (tableSegmentsMapStr != null) {
       try {
         return JsonUtils.stringToObject(tableSegmentsMapStr, new 
TypeReference<Map<String, List<String>>>() {
         });
       } catch (IOException e) {
-        throw new RuntimeException("Unable to deserialize table segments map: 
" + tableSegmentsMapStr, e);
+        throw new RuntimeException("Unable to deserialize " + propertyKey + " 
: " + tableSegmentsMapStr, e);
       }
     } else {
       return null;
@@ -89,7 +94,8 @@ public class WorkerMetadata {
   }
 
   public boolean isLeafStageWorker() {
-    return _customProperties.containsKey(TABLE_SEGMENTS_MAP_KEY);
+    return _customProperties.containsKey(TABLE_SEGMENTS_MAP_KEY)
+        || _customProperties.containsKey(LOGICAL_TABLE_SEGMENTS_MAP_KEY);
   }
 
   public void setTableSegmentsMap(Map<String, List<String>> tableSegmentsMap) {
@@ -101,4 +107,19 @@ public class WorkerMetadata {
     }
     _customProperties.put(TABLE_SEGMENTS_MAP_KEY, tableSegmentsMapStr);
   }
+
+  @Nullable
+  public Map<String, List<String>> getLogicalTableSegmentsMap() {
+    return deserializeStringSegmentListMap(LOGICAL_TABLE_SEGMENTS_MAP_KEY);
+  }
+
+  /**
+   * Serializes the given map to JSON and stores it in the custom properties under
+   * {@link #LOGICAL_TABLE_SEGMENTS_MAP_KEY}.
+   *
+   * @param logicalTableSegmentsMap map to store (presumably physical table name to segment names; confirm with the
+   *     caller)
+   * @throws RuntimeException if the map cannot be serialized to JSON
+   */
+  public void setLogicalTableSegmentsMap(Map<String, List<String>> logicalTableSegmentsMap) {
+    String logicalTableSegmentsMapStr;
+    try {
+      logicalTableSegmentsMapStr = JsonUtils.objectToString(logicalTableSegmentsMap);
+    } catch (JsonProcessingException e) {
+      // Name the logical map explicitly so the failure is not mistaken for one in setTableSegmentsMap.
+      throw new RuntimeException("Unable to serialize logical table segments map: " + logicalTableSegmentsMap, e);
+    }
+    _customProperties.put(LOGICAL_TABLE_SEGMENTS_MAP_KEY, logicalTableSegmentsMapStr);
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java
index 63543c41ce..48553c328d 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java
@@ -46,10 +46,18 @@ public class ImplicitHybridTableRouteProvider implements 
TableRouteProvider {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ImplicitHybridTableRouteProvider.class);
 
   @Override
-  public ImplicitHybridTableRouteInfo getTableRouteInfo(String tableName, 
TableCache tableCache,
+  public TableRouteInfo getTableRouteInfo(String tableName, TableCache 
tableCache,
       RoutingManager routingManager) {
     ImplicitHybridTableRouteInfo tableRouteInfo = new 
ImplicitHybridTableRouteInfo();
 
+    fillTableConfigMetadata(tableRouteInfo, tableName, tableCache);
+    fillRouteMetadata(tableRouteInfo, routingManager);
+
+    return tableRouteInfo;
+  }
+
+  public void fillTableConfigMetadata(ImplicitHybridTableRouteInfo 
tableRouteInfo,
+      String tableName, TableCache tableCache) {
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
 
     if (tableType == TableType.OFFLINE) {
@@ -71,19 +79,24 @@ public class ImplicitHybridTableRouteProvider implements 
TableRouteProvider {
     if (realtimeTableName != null) {
       
tableRouteInfo.setRealtimeTableConfig(tableCache.getTableConfig(realtimeTableName));
     }
+  }
 
+  public void fillRouteMetadata(ImplicitHybridTableRouteInfo tableRouteInfo, 
RoutingManager routingManager) {
     if (tableRouteInfo.hasOffline()) {
+      String offlineTableName = tableRouteInfo.getOfflineTableName();
       
tableRouteInfo.setOfflineRouteExists(routingManager.routingExists(offlineTableName));
       
tableRouteInfo.setOfflineTableDisabled(routingManager.isTableDisabled(offlineTableName));
     }
 
     if (tableRouteInfo.hasRealtime()) {
+      String realtimeTableName = tableRouteInfo.getRealtimeTableName();
       
tableRouteInfo.setRealtimeRouteExists(routingManager.routingExists(realtimeTableName));
       
tableRouteInfo.setRealtimeTableDisabled(routingManager.isTableDisabled(realtimeTableName));
     }
 
     // Get TimeBoundaryInfo. If there is no time boundary, then do not 
consider the offline table.
     if (tableRouteInfo.isHybrid()) {
+      String offlineTableName = tableRouteInfo.getOfflineTableName();
       // Time boundary info might be null when there is no segment in the 
offline table, query real-time side only
       TimeBoundaryInfo timeBoundaryInfo = 
routingManager.getTimeBoundaryInfo(offlineTableName);
       if (timeBoundaryInfo == null) {
@@ -95,8 +108,6 @@ public class ImplicitHybridTableRouteProvider implements 
TableRouteProvider {
         tableRouteInfo.setTimeBoundaryInfo(timeBoundaryInfo);
       }
     }
-
-    return tableRouteInfo;
   }
 
   @Override
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
index 04d3b76bed..612f8e79cd 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
@@ -32,22 +32,23 @@ import org.apache.pinot.common.request.TableSegmentsInfo;
 import org.apache.pinot.core.routing.ServerRouteInfo;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.BaseTableRouteInfo;
+import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.core.transport.TableRouteInfo;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 public class LogicalTableRouteInfo extends BaseTableRouteInfo {
-  private final LogicalTableConfig _logicalTable;
-  private List<TableRouteInfo> _offlineTables;
-  private List<TableRouteInfo> _realtimeTables;
+  private String _logicalTableName;
+  private List<ImplicitHybridTableRouteInfo> _offlineTables;
+  private List<ImplicitHybridTableRouteInfo> _realtimeTables;
   private TableConfig _offlineTableConfig;
   private TableConfig _realtimeTableConfig;
   private QueryConfig _queryConfig;
@@ -56,15 +57,9 @@ public class LogicalTableRouteInfo extends 
BaseTableRouteInfo {
 
   private BrokerRequest _offlineBrokerRequest;
   private BrokerRequest _realtimeBrokerRequest;
-  private TimeBoundaryInfo _timeBoundaryInfo;
-
-  LogicalTableRouteInfo() {
-    _logicalTable = null;
-  }
 
-  public LogicalTableRouteInfo(LogicalTableConfig logicalTable) {
-    _logicalTable = logicalTable;
-  }
+  private TimeBoundaryStrategy _timeBoundaryStrategy;
+  private TimeBoundaryInfo _timeBoundaryInfo;
 
   @Override
   public Map<ServerRoutingInstance, InstanceRequest> getRequestMap(long 
requestId, String brokerId, boolean preferTls) {
@@ -139,6 +134,15 @@ public class LogicalTableRouteInfo extends 
BaseTableRouteInfo {
     return instanceRequest;
   }
 
+  /**
+   * Sets the name of the logical table this route info describes. The offline and realtime table names reported by
+   * this class are derived from it.
+   *
+   * @param logicalTableName logical table name
+   */
+  public void setLogicalTableName(String logicalTableName) {
+    _logicalTableName = logicalTableName;
+  }
+
+  /**
+   * Returns the logical table name, or {@code null} if it has not been set.
+   */
+  @Nullable
+  public String getLogicalTableName() {
+    return _logicalTableName;
+  }
+
   @Nullable
   @Override
   public TableConfig getOfflineTableConfig() {
@@ -234,15 +238,15 @@ public class LogicalTableRouteInfo extends 
BaseTableRouteInfo {
   @Nullable
   @Override
   public String getOfflineTableName() {
-    return hasOffline() && _logicalTable != null ? 
TableNameBuilder.OFFLINE.tableNameWithType(
-        _logicalTable.getTableName()) : null;
+    return hasOffline() && _logicalTableName != null ? 
TableNameBuilder.OFFLINE.tableNameWithType(_logicalTableName)
+        : null;
   }
 
   @Nullable
   @Override
   public String getRealtimeTableName() {
-    return hasRealtime() && _logicalTable != null ? 
TableNameBuilder.REALTIME.tableNameWithType(
-        _logicalTable.getTableName()) : null;
+    return hasRealtime() && _logicalTableName != null ? 
TableNameBuilder.REALTIME.tableNameWithType(_logicalTableName)
+        : null;
   }
 
   @Nullable
@@ -356,20 +360,20 @@ public class LogicalTableRouteInfo extends 
BaseTableRouteInfo {
   }
 
   @Nullable
-  public List<TableRouteInfo> getOfflineTables() {
+  public List<ImplicitHybridTableRouteInfo> getOfflineTables() {
     return _offlineTables;
   }
 
-  public void setOfflineTables(List<TableRouteInfo> offlineTables) {
+  public void setOfflineTables(List<ImplicitHybridTableRouteInfo> 
offlineTables) {
     _offlineTables = offlineTables;
   }
 
   @Nullable
-  public List<TableRouteInfo> getRealtimeTables() {
+  public List<ImplicitHybridTableRouteInfo> getRealtimeTables() {
     return _realtimeTables;
   }
 
-  public void setRealtimeTables(List<TableRouteInfo> realtimeTables) {
+  public void setRealtimeTables(List<ImplicitHybridTableRouteInfo> 
realtimeTables) {
     _realtimeTables = realtimeTables;
   }
 
@@ -388,4 +392,13 @@ public class LogicalTableRouteInfo extends 
BaseTableRouteInfo {
   public void setRealtimeBrokerRequest(BrokerRequest realtimeBrokerRequest) {
     _realtimeBrokerRequest = realtimeBrokerRequest;
   }
+
+  /**
+   * Returns the time boundary strategy attached to this route info, or {@code null} if none has been set.
+   */
+  @Nullable
+  public TimeBoundaryStrategy getTimeBoundaryStrategy() {
+    return _timeBoundaryStrategy;
+  }
+
+  /**
+   * Attaches the strategy to be used later for computing the time boundary of this logical table.
+   *
+   * @param timeBoundaryStrategy strategy to store
+   */
+  public void setTimeBoundaryStrategy(TimeBoundaryStrategy timeBoundaryStrategy) {
+    _timeBoundaryStrategy = timeBoundaryStrategy;
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
index fe3937b8b0..bdce62a938 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo;
 import org.apache.pinot.core.transport.TableRouteInfo;
 import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
 import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
@@ -41,20 +42,29 @@ public class LogicalTableRouteProvider implements 
TableRouteProvider {
 
   @Override
   public TableRouteInfo getTableRouteInfo(String tableName, TableCache 
tableCache, RoutingManager routingManager) {
-    LogicalTableConfig logicalTable = 
tableCache.getLogicalTableConfig(tableName);
-    if (logicalTable == null) {
-      return new LogicalTableRouteInfo();
-    }
+    LogicalTableRouteInfo logicalTableRouteInfo = new LogicalTableRouteInfo();
+    fillTableConfigMetadata(logicalTableRouteInfo, tableName, tableCache);
+    fillRouteMetadata(logicalTableRouteInfo, routingManager);
+    return logicalTableRouteInfo;
+  }
 
+  public void fillTableConfigMetadata(LogicalTableRouteInfo 
logicalTableRouteInfo, String tableName,
+      TableCache tableCache) {
+    LogicalTableConfig logicalTableConfig = 
tableCache.getLogicalTableConfig(tableName);
+    if (logicalTableConfig == null) {
+      return;
+    }
+    logicalTableRouteInfo.setLogicalTableName(tableName);
     PhysicalTableRouteProvider routeProvider = new 
PhysicalTableRouteProvider();
 
-    List<TableRouteInfo> offlineTables = new ArrayList<>();
-    List<TableRouteInfo> realtimeTables = new ArrayList<>();
-    for (String physicalTableName : 
logicalTable.getPhysicalTableConfigMap().keySet()) {
+    List<ImplicitHybridTableRouteInfo> offlineTables = new ArrayList<>();
+    List<ImplicitHybridTableRouteInfo> realtimeTables = new ArrayList<>();
+    for (String physicalTableName : 
logicalTableConfig.getPhysicalTableConfigMap().keySet()) {
       TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(physicalTableName);
       Preconditions.checkNotNull(tableType);
-      TableRouteInfo physicalTableInfo =
-          routeProvider.getTableRouteInfo(physicalTableName, tableCache, 
routingManager);
+      ImplicitHybridTableRouteInfo physicalTableInfo = new 
ImplicitHybridTableRouteInfo();
+      routeProvider.fillTableConfigMetadata(physicalTableInfo, 
physicalTableName, tableCache);
+
       if (physicalTableInfo.isExists()) {
         if (tableType == TableType.OFFLINE) {
           offlineTables.add(physicalTableInfo);
@@ -64,37 +74,59 @@ public class LogicalTableRouteProvider implements 
TableRouteProvider {
       }
     }
 
-    LogicalTableRouteInfo routeInfo = new LogicalTableRouteInfo(logicalTable);
     if (!offlineTables.isEmpty()) {
-      TableConfig offlineTableConfig = 
tableCache.getTableConfig(logicalTable.getRefOfflineTableName());
+      TableConfig offlineTableConfig = 
tableCache.getTableConfig(logicalTableConfig.getRefOfflineTableName());
       Preconditions.checkNotNull(offlineTableConfig,
-          "Offline table config not found: " + 
logicalTable.getRefOfflineTableName());
-      routeInfo.setOfflineTables(offlineTables);
-      routeInfo.setOfflineTableConfig(offlineTableConfig);
+          "Offline table config not found: " + 
logicalTableConfig.getRefOfflineTableName());
+      logicalTableRouteInfo.setOfflineTables(offlineTables);
+      logicalTableRouteInfo.setOfflineTableConfig(offlineTableConfig);
     }
+
     if (!realtimeTables.isEmpty()) {
-      TableConfig realtimeTableConfig = 
tableCache.getTableConfig(logicalTable.getRefRealtimeTableName());
+      TableConfig realtimeTableConfig = 
tableCache.getTableConfig(logicalTableConfig.getRefRealtimeTableName());
       Preconditions.checkNotNull(realtimeTableConfig,
-          "Realtime table config not found: " + 
logicalTable.getRefRealtimeTableName());
-      routeInfo.setRealtimeTables(realtimeTables);
-      routeInfo.setRealtimeTableConfig(realtimeTableConfig);
+          "Realtime table config not found: " + 
logicalTableConfig.getRefRealtimeTableName());
+      logicalTableRouteInfo.setRealtimeTables(realtimeTables);
+      logicalTableRouteInfo.setRealtimeTableConfig(realtimeTableConfig);
     }
-    routeInfo.setQueryConfig(logicalTable.getQueryConfig());
 
-    TimeBoundaryInfo timeBoundaryInfo;
     if (!offlineTables.isEmpty() && !realtimeTables.isEmpty()) {
-      String boundaryStrategy = 
logicalTable.getTimeBoundaryConfig().getBoundaryStrategy();
+      String boundaryStrategy = 
logicalTableConfig.getTimeBoundaryConfig().getBoundaryStrategy();
       TimeBoundaryStrategy timeBoundaryStrategy =
           
TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(boundaryStrategy);
-      timeBoundaryInfo = 
timeBoundaryStrategy.computeTimeBoundary(logicalTable, tableCache, 
routingManager);
-      if (timeBoundaryInfo == null) {
-        LOGGER.info("No time boundary info found for logical hybrid table: ");
-        routeInfo.setOfflineTables(null);
-      } else {
-        routeInfo.setTimeBoundaryInfo(timeBoundaryInfo);
+      timeBoundaryStrategy.init(logicalTableConfig, tableCache);
+      logicalTableRouteInfo.setTimeBoundaryStrategy(timeBoundaryStrategy);
+    }
+
+    logicalTableRouteInfo.setQueryConfig(logicalTableConfig.getQueryConfig());
+  }
+
+  /**
+   * Fills in the routing-dependent metadata of a logical table route info.
+   *
+   * <p>Route metadata is filled for every offline and realtime physical table. If the logical table is hybrid and a
+   * time boundary strategy is set, the time boundary is computed: when one is found it is stored on the route info,
+   * otherwise the offline tables are dropped (set to {@code null}).
+   *
+   * @param logicalTableRouteInfo route info to update in place
+   * @param routingManager routing manager queried for route and time boundary information
+   */
+  public void fillRouteMetadata(LogicalTableRouteInfo logicalTableRouteInfo, RoutingManager routingManager) {
+    ImplicitHybridTableRouteProvider tableRouteProvider = new ImplicitHybridTableRouteProvider();
+    if (logicalTableRouteInfo.getOfflineTables() != null) {
+      for (ImplicitHybridTableRouteInfo routeInfo : logicalTableRouteInfo.getOfflineTables()) {
+        tableRouteProvider.fillRouteMetadata(routeInfo, routingManager);
+      }
+    }
+
+    if (logicalTableRouteInfo.getRealtimeTables() != null) {
+      for (ImplicitHybridTableRouteInfo routeInfo : logicalTableRouteInfo.getRealtimeTables()) {
+        tableRouteProvider.fillRouteMetadata(routeInfo, routingManager);
+      }
+    }
+
+    if (logicalTableRouteInfo.isHybrid()) {
+      TimeBoundaryStrategy timeBoundaryStrategy = logicalTableRouteInfo.getTimeBoundaryStrategy();
+      if (timeBoundaryStrategy != null) {
+        TimeBoundaryInfo timeBoundaryInfo = timeBoundaryStrategy.computeTimeBoundary(routingManager);
+        if (timeBoundaryInfo == null) {
+          // Name the table in the log line; the message previously ended at the colon.
+          LOGGER.info("No time boundary info found for logical hybrid table: {}",
+              logicalTableRouteInfo.getLogicalTableName());
+          logicalTableRouteInfo.setOfflineTables(null);
+        } else {
+          logicalTableRouteInfo.setTimeBoundaryInfo(timeBoundaryInfo);
+        }
       }
     }
-    return routeInfo;
   }
 
   @Override
@@ -107,8 +139,7 @@ public class LogicalTableRouteProvider implements 
TableRouteProvider {
 
     if (routeInfo.getOfflineTables() != null) {
       for (TableRouteInfo physicalTableInfo : routeInfo.getOfflineTables()) {
-        routeProvider.calculateRoutes(physicalTableInfo, routingManager, 
offlineBrokerRequest, null,
-            requestId);
+        routeProvider.calculateRoutes(physicalTableInfo, routingManager, 
offlineBrokerRequest, null, requestId);
         numPrunedSegments += physicalTableInfo.getNumPrunedSegmentsTotal();
         if (physicalTableInfo.getUnavailableSegments() != null) {
           
unavailableSegments.addAll(physicalTableInfo.getUnavailableSegments());
@@ -118,8 +149,7 @@ public class LogicalTableRouteProvider implements 
TableRouteProvider {
 
     if (routeInfo.getRealtimeTables() != null) {
       for (TableRouteInfo physicalTableInfo : routeInfo.getRealtimeTables()) {
-        routeProvider.calculateRoutes(physicalTableInfo, routingManager, null, 
realtimeBrokerRequest,
-            requestId);
+        routeProvider.calculateRoutes(physicalTableInfo, routingManager, null, 
realtimeBrokerRequest, requestId);
         numPrunedSegments += physicalTableInfo.getNumPrunedSegmentsTotal();
         if (physicalTableInfo.getUnavailableSegments() != null) {
           
unavailableSegments.addAll(physicalTableInfo.getUnavailableSegments());
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
index 287afcdd7b..04d09fd634 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
@@ -45,7 +45,6 @@ public class PhysicalTableRouteProvider extends 
ImplicitHybridTableRouteProvider
   @Override
   public void calculateRoutes(TableRouteInfo tableRouteInfo, RoutingManager 
routingManager,
       BrokerRequest offlineBrokerRequest, BrokerRequest realtimeBrokerRequest, 
long requestId) {
-    assert (tableRouteInfo.isExists());
     String offlineTableName = tableRouteInfo.getOfflineTableName();
     String realtimeTableName = tableRouteInfo.getRealtimeTableName();
     Map<ServerInstance, ServerRouteInfo> offlineRoutingTable = null;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
index 04be7d3cdf..e204c917da 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.timeboundary;
 
 import com.google.auto.service.AutoService;
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.config.provider.TableCache;
@@ -36,7 +37,27 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 @AutoService(TimeBoundaryStrategy.class)
 public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
 
-  public static final String INCLUDED_TABLES = "includedTables";
+  private static final String INCLUDED_TABLES = "includedTables";
+  Map<String, DateTimeFormatSpec> _dateTimeFormatSpecMap;
+
+  /**
+   * Resolves and caches the time column format of every table that takes part in the time boundary computation.
+   *
+   * <p>The cache is keyed by the table name as returned by {@code getTimeBoundaryTableNames}.
+   *
+   * @throws IllegalArgumentException if a table config, schema, or time column spec cannot be found for a table
+   */
+  @Override
+  public void init(LogicalTableConfig logicalTableConfig, TableCache tableCache) {
+    List<String> boundaryTables = getTimeBoundaryTableNames(logicalTableConfig);
+    _dateTimeFormatSpecMap = new HashMap<>(boundaryTables.size());
+    for (String tableName : boundaryTables) {
+      // The schema is looked up by the raw table name; the table config by the full name.
+      Schema schema = tableCache.getSchema(TableNameBuilder.extractRawTableName(tableName));
+      TableConfig tableConfig = tableCache.getTableConfig(tableName);
+      Preconditions.checkArgument(tableConfig != null, "Table config not found for table: %s", tableName);
+      Preconditions.checkArgument(schema != null, "Schema not found for table: %s", tableName);
+
+      DateTimeFieldSpec timeFieldSpec =
+          schema.getSpecForTimeColumn(tableConfig.getValidationConfig().getTimeColumnName());
+      Preconditions.checkArgument(timeFieldSpec != null, "Time column not found in schema for table: %s",
+          tableName);
+      _dateTimeFormatSpecMap.put(tableName, timeFieldSpec.getFormatSpec());
+    }
+  }
 
   @Override
   public String getName() {
@@ -44,29 +65,13 @@ public class MinTimeBoundaryStrategy implements 
TimeBoundaryStrategy {
   }
 
   @Override
-  public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig 
logicalTableConfig, TableCache tableCache,
-      RoutingManager routingManager) {
+  public TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager) {
     TimeBoundaryInfo minTimeBoundaryInfo = null;
     long minTimeBoundary = Long.MAX_VALUE;
-    Map<String, Object> parameters = 
logicalTableConfig.getTimeBoundaryConfig().getParameters();
-    List<String> includedTables =
-        parameters != null ? (List) parameters.getOrDefault("includedTables", 
List.of()) : List.of();
-    for (String physicalTableName : includedTables) {
-      TimeBoundaryInfo current = 
routingManager.getTimeBoundaryInfo(physicalTableName);
+    for (Map.Entry<String, DateTimeFormatSpec> entry : 
_dateTimeFormatSpecMap.entrySet()) {
+      TimeBoundaryInfo current = 
routingManager.getTimeBoundaryInfo(entry.getKey());
       if (current != null) {
-        String rawTableName = 
TableNameBuilder.extractRawTableName(physicalTableName);
-        Schema schema = tableCache.getSchema(rawTableName);
-        TableConfig tableConfig = tableCache.getTableConfig(physicalTableName);
-        Preconditions.checkArgument(tableConfig != null,
-            "Table config not found for table: %s", physicalTableName);
-        Preconditions.checkArgument(schema != null,
-            "Schema not found for table: %s", physicalTableName);
-        String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
-        DateTimeFieldSpec dateTimeFieldSpec = 
schema.getSpecForTimeColumn(timeColumnName);
-        Preconditions.checkArgument(dateTimeFieldSpec != null,
-            "Time column not found in schema for table: %s", 
physicalTableName);
-        DateTimeFormatSpec specFormatSpec = dateTimeFieldSpec.getFormatSpec();
-        long currentTimeBoundaryMillis = 
specFormatSpec.fromFormatToMillis(current.getTimeValue());
+        long currentTimeBoundaryMillis = 
entry.getValue().fromFormatToMillis(current.getTimeValue());
         if (minTimeBoundaryInfo == null) {
           minTimeBoundaryInfo = current;
           minTimeBoundary = currentTimeBoundaryMillis;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
index 7a4ee21794..f2215b29b2 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
@@ -34,16 +34,20 @@ public interface TimeBoundaryStrategy {
    */
   String getName();
 
+  /**
+   * Initializes the time boundary strategy with the given logical table 
configuration and table cache.
+   * @param logicalTableConfig The logical table configuration to use for 
initialization.
+   * @param tableCache The table cache to use for initialization.
+   */
+  void init(LogicalTableConfig logicalTableConfig, TableCache tableCache);
+
   /**
    * Computes the time boundary for the given physical table names.
    *
-   * @param logicalTableConfig The logical table configuration.
-   * @param tableCache The table cache to use for fetching table metadata.
    * @param routingManager The routing manager to use for computing the time 
boundary.
    * @return The computed time boundary information.
    */
-  TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, 
TableCache tableCache,
-      RoutingManager routingManager);
+  TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager);
 
 
   /**
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
index 1c08dca4ae..71901f8432 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
@@ -170,7 +170,7 @@ public class BaseTableRouteTest {
     TimeBoundaryStrategyService mockService = 
mock(TimeBoundaryStrategyService.class);
     when(TimeBoundaryStrategyService.getInstance()).thenReturn(mockService);
     
when(mockService.getTimeBoundaryStrategy(any())).thenReturn(_timeBoundaryStrategy);
-    when(_timeBoundaryStrategy.computeTimeBoundary(any(), any(), 
any())).thenReturn(mock(TimeBoundaryInfo.class));
+    
when(_timeBoundaryStrategy.computeTimeBoundary(any())).thenReturn(mock(TimeBoundaryInfo.class));
   }
 
   @AfterClass
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
index 6beb5d3ec6..be8606d068 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
@@ -27,9 +27,9 @@ import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.ServerRouteInfo;
-import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.transport.TableRouteInfo;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -59,9 +59,8 @@ public class 
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
     }
   }
 
-  private ImplicitHybridTableRouteInfo getImplicitHybridTableRouteInfo(String 
tableName) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
+  private TableRouteInfo getImplicitHybridTableRouteInfo(String tableName) {
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
     BrokerRequestPair brokerRequestPair =
         getBrokerRequestPair(tableName, routeInfo.hasOffline(), 
routeInfo.hasRealtime(),
             routeInfo.getOfflineTableName(), routeInfo.getRealtimeTableName());
@@ -73,7 +72,7 @@ public class 
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
 
   private void assertTableRoute(String tableName, Map<String, Set<String>> 
expectedOfflineRoutingTable,
       Map<String, Set<String>> expectedRealtimeRoutingTable, boolean 
isOfflineExpected, boolean isRealtimeExpected) {
-    ImplicitHybridTableRouteInfo routeInfo = 
getImplicitHybridTableRouteInfo(tableName);
+    TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
 
     // If a routing table for offline table is expected, then compare it with 
the expected routing table.
     if (isOfflineExpected) {
@@ -281,7 +280,7 @@ public class 
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
   private void assertEqualsTableRouteInfoGetTableRouteResult(String tableName,
       Map<String, Set<String>> expectedOfflineRoutingTable,
       Map<String, Set<String>> expectedRealtimeRoutingTable, boolean 
isOfflineExpected, boolean isRealtimeExpected) {
-    ImplicitHybridTableRouteInfo routeInfo = 
getImplicitHybridTableRouteInfo(tableName);
+    TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
     GetTableRouteResult expectedTableRoute = getTableRouting(tableName, 
_routingManager);
 
     if (isOfflineExpected) {
@@ -332,7 +331,7 @@ public class 
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
 
   @Test(dataProvider = "routeNotExistsProvider")
   void testTableRoutingForRouteNotExists(String tableName) {
-    ImplicitHybridTableRouteInfo routeInfo = 
getImplicitHybridTableRouteInfo(tableName);
+    TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
     GetTableRouteResult expectedTableRoute = getTableRouting(tableName, 
_routingManager);
 
     assertNull(expectedTableRoute._offlineRoutingTable);
@@ -348,7 +347,7 @@ public class 
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
   @Test(dataProvider = "partiallyDisabledTableAndRouteProvider")
   void testTableRoutingForPartiallyDisabledTable(String tableName, Map<String, 
Set<String>> expectedOfflineRoutingTable,
       Map<String, Set<String>> expectedRealtimeRoutingTable) {
-    ImplicitHybridTableRouteInfo routeInfo = 
getImplicitHybridTableRouteInfo(tableName);
+    TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
     GetTableRouteResult expectedTableRoute = getTableRouting(tableName, 
_routingManager);
 
     if (expectedOfflineRoutingTable == null) {
@@ -376,7 +375,7 @@ public class 
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
 
   @Test(dataProvider = "disabledTableProvider")
   void testTableRoutingForDisabledTable(String tableName) {
-    ImplicitHybridTableRouteInfo routeInfo = 
getImplicitHybridTableRouteInfo(tableName);
+    TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
     GetTableRouteResult expectedTableRoute = getTableRouting(tableName, 
_routingManager);
 
     if (expectedTableRoute._offlineTableDisabled) {
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
index 2a5e046f49..ddd34ec9da 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
@@ -21,7 +21,7 @@ package org.apache.pinot.query.routing.table;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo;
+import org.apache.pinot.core.transport.TableRouteInfo;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.exception.QueryErrorCode;
@@ -41,26 +41,21 @@ import static org.testng.Assert.assertTrue;
 public class ImplicitHybridTableRouteProviderGetTableRouteTest extends 
BaseTableRouteTest {
   @Test(dataProvider = "offlineTableProvider")
   public void testOfflineTable(String parameter) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
-
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertTrue(routeInfo.isOffline(), "The table should be offline");
   }
 
   @Test(dataProvider = "realtimeTableProvider")
   public void testRealtimeTable(String parameter) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
-
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertTrue(routeInfo.isRealtime(), "The table should be realtime");
   }
 
   @Test(dataProvider = "hybridTableProvider")
   public void testHybridTable(String parameter) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
 
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertTrue(routeInfo.isHybrid(), "The table should be hybrid");
@@ -71,62 +66,48 @@ public class 
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
    */
   @Test
   public void testWithNoTimeBoundary() {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo("b", _tableCache, 
_routingManager);
-
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo("b", _tableCache, _routingManager);
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertTrue(routeInfo.isRealtime(), "The table should be realtime");
   }
 
   @Test(dataProvider = "nonExistentTableProvider")
   public void testNonExistentTableName(String parameter) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
-
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
     assertFalse(routeInfo.isExists(), "The table should not exist");
   }
 
   @Test(dataProvider = "routeExistsProvider")
   public void testRouteExists(String parameter) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
-
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertTrue(routeInfo.isRouteExists(), "The table should have route");
   }
 
   @Test(dataProvider = "routeNotExistsProvider")
   public void testRouteNotExists(String parameter) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
-
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertFalse(routeInfo.isRouteExists(), "The table should not have route");
   }
 
   @Test(dataProvider = "notDisabledTableProvider")
   public void testNotDisabledTable(String parameter) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
-
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertFalse(routeInfo.isDisabled(), "The table should not be disabled");
   }
 
   @Test(dataProvider = "partiallyDisabledTableProvider")
   public void testPartiallyDisabledTable(String parameter) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
-
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertFalse(routeInfo.isDisabled(), "The table should be disabled");
   }
 
   @Test(dataProvider = "disabledTableProvider")
   public void testDisabledTable(String parameter) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
-
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache, 
_routingManager);
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertTrue(routeInfo.isDisabled(), "The table should not have route");
   }
@@ -158,10 +139,8 @@ public class 
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
     /**
      * Similar if offlineTableName is not null and there is a route in the 
routing routeInfo. Same for
      * realtimeTableName.
-     * @param routeInfo
-     * @return
      */
-    boolean similar(ImplicitHybridTableRouteInfo routeInfo) {
+    boolean similar(TableRouteInfo routeInfo) {
       boolean isEquals = true;
 
       if (_offlineTableName != null) {
@@ -272,13 +251,11 @@ public class 
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
   /**
    * This test checks tables that have a table config and an entry in routing 
manager.
    * It makes sure that getTableNameAndConfig() behaves the same way as 
ImplicitTableRouteComputer.
-   * @param tableName
    */
   @Test(dataProvider = "tableNameAndConfigSuccessProvider")
   public void testTableNameAndConfigSuccess(String tableName) {
     TableNameAndConfig tableNameAndConfig = getTableNameAndConfig(tableName);
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
     assertTrue(tableNameAndConfig.similar(routeInfo), "The table name and 
config should match the hybrid table");
   }
 
@@ -303,13 +280,11 @@ public class 
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
 
   /**
    * This test checks tables that do not have a table config or an entry in 
routing manager.
-   * @param tableName
    */
   @Test(dataProvider = "tableNameAndConfigFailureProvider")
   public void testTableNameAndConfigFailure(String tableName) {
     TableNameAndConfig tableNameAndConfig = getTableNameAndConfig(tableName);
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
 
     // getTableNameAndConfig() returns an error as a BrokerResponse with the 
right error code.
     assertNotNull(tableNameAndConfig._brokerResponse);
@@ -373,12 +348,10 @@ public class 
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
   /**
    * If a table is not disabled, then checkTableDisabled() should return null.
    * ImplicitTableRouteComputer should not be disabled.
-   * @param tableName
    */
   @Test(dataProvider = "notDisabledTableProvider")
   public void testNotDisabledWithCheckDisabled(String tableName) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
 
     ExceptionOrResponse exceptionOrResponse =
         checkTableDisabled(routeInfo.hasOffline() && 
routeInfo.isOfflineTableDisabled(),
@@ -393,12 +366,10 @@ public class 
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
   /**
    * In a hybrid table, if one of the tables is disabled, then 
checkTableDisabled() should return an exception.
    * ImplicitTableRouteComputer should not be disabled.
-   * @param tableName
    */
   @Test(dataProvider = "partiallyDisabledTableProvider")
   public void testPartiallyDisabledWithCheckDisabled(String tableName) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
 
     ExceptionOrResponse exceptionOrResponse =
         checkTableDisabled(routeInfo.hasOffline() && 
routeInfo.isOfflineTableDisabled(),
@@ -416,12 +387,10 @@ public class 
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
    * If a table is disabled, then checkTableDisabled() should return a broker 
response with error code
    * TABLE_IS_DISABLED.
    * ImplicitTableRouteComputer should be disabled.
-   * @param tableName
    */
   @Test(dataProvider = "disabledTableProvider")
   public void testDisabledWithCheckDisabled(String tableName) {
-    ImplicitHybridTableRouteInfo routeInfo =
-        _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
+    TableRouteInfo routeInfo = 
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache, 
_routingManager);
 
     ExceptionOrResponse exceptionOrResponse =
         checkTableDisabled(routeInfo.hasOffline() && 
routeInfo.isOfflineTableDisabled(),
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
index 464ff805d9..7e0ce2d2fe 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
@@ -84,7 +84,6 @@ public class MinTimeBoundaryStrategyTest {
 
     return new Object[][]{
         {timeBoundaryInfoMap, List.of("table3_OFFLINE"), "table3_OFFLINE"},
-        {timeBoundaryInfoMap, List.of("Invalid_OFFLINE"), "Invalid_OFFLINE"},
         {timeBoundaryInfoMap, List.of("table2_OFFLINE", "table3_OFFLINE"), 
"table2_OFFLINE"},
         {timeBoundaryInfoMap, List.of("table3_OFFLINE", "table2_OFFLINE", 
"table4_OFFLINE"), "table2_OFFLINE"},
         {timeBoundaryInfoMap, List.of(), "empty_includedTables_OFFLINE"}
@@ -102,8 +101,8 @@ public class MinTimeBoundaryStrategyTest {
   private void testComputeTimeBoundary(Map<String, TimeBoundaryInfo> 
timeBoundaryInfoMap, String expectedTableName,
       Map<String, Object> parameters) {
     setupMocks(timeBoundaryInfoMap);
-    TimeBoundaryInfo timeBoundaryInfo = 
_minTimeBoundaryStrategy.computeTimeBoundary(
-        createLogicalTableConfig(parameters), _mockTableCache, 
_mockRoutingManager);
+    _minTimeBoundaryStrategy.init(createLogicalTableConfig(parameters), 
_mockTableCache);
+    TimeBoundaryInfo timeBoundaryInfo = 
_minTimeBoundaryStrategy.computeTimeBoundary(_mockRoutingManager);
     assertSame(timeBoundaryInfo, timeBoundaryInfoMap.get(expectedTableName));
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 8bd40d51c0..b0eb2e12d0 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -36,10 +36,12 @@ import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.request.TableSegmentsInfo;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.LogicalTableContext;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -112,8 +114,14 @@ public class ServerPlanRequestUtils {
     // 2. Convert PinotQuery into InstanceRequest list (one for each physical 
table)
     PinotQuery pinotQuery = serverContext.getPinotQuery();
     pinotQuery.setExplain(explain);
-    List<InstanceRequest> instanceRequests =
-        constructServerQueryRequests(executionContext, pinotQuery, 
leafQueryExecutor.getInstanceDataManager());
+    List<InstanceRequest> instanceRequests;
+    if (executionContext.getWorkerMetadata().getLogicalTableSegmentsMap() != 
null) {
+      instanceRequests = 
constructLogicalTableServerQueryRequests(executionContext, pinotQuery,
+          leafQueryExecutor.getInstanceDataManager());
+    } else {
+      instanceRequests =
+          constructServerQueryRequests(executionContext, pinotQuery, 
leafQueryExecutor.getInstanceDataManager());
+    }
     int numRequests = instanceRequests.size();
     List<ServerQueryRequest> serverQueryRequests = new 
ArrayList<>(numRequests);
     for (InstanceRequest instanceRequest : instanceRequests) {
@@ -163,16 +171,20 @@ public class ServerPlanRequestUtils {
         TableDataManager tableDataManager = 
instanceDataManager.getTableDataManager(offlineTableName);
         Preconditions.checkState(tableDataManager != null, "Failed to find 
data manager for table: %s",
             offlineTableName);
-        return List.of(compileInstanceRequest(executionContext, pinotQuery, 
timeBoundary, TableType.OFFLINE, segments,
-            tableDataManager));
+        Pair<TableConfig, Schema> tableConfigAndSchema = 
tableDataManager.getCachedTableConfigAndSchema();
+        return List.of(compileInstanceRequest(executionContext, pinotQuery, 
timeBoundary, TableType.OFFLINE,
+            tableDataManager.getTableName(), tableConfigAndSchema.getLeft(), 
tableConfigAndSchema.getRight(), segments,
+            null));
       } else {
         assert tableType.equals(TableType.REALTIME.name());
         String realtimeTableName = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
         TableDataManager tableDataManager = 
instanceDataManager.getTableDataManager(realtimeTableName);
         Preconditions.checkState(tableDataManager != null, "Failed to find 
data manager for table: %s",
             realtimeTableName);
-        return List.of(compileInstanceRequest(executionContext, pinotQuery, 
timeBoundary, TableType.REALTIME, segments,
-            tableDataManager));
+        Pair<TableConfig, Schema> tableConfigAndSchema = 
tableDataManager.getCachedTableConfigAndSchema();
+        return List.of(compileInstanceRequest(executionContext, pinotQuery, 
timeBoundary, TableType.REALTIME,
+            tableDataManager.getTableName(), tableConfigAndSchema.getLeft(), 
tableConfigAndSchema.getRight(), segments,
+            null));
       }
     } else {
       assert numRequests == 2;
@@ -183,16 +195,21 @@ public class ServerPlanRequestUtils {
       TableDataManager offlineTableDataManager = 
instanceDataManager.getTableDataManager(offlineTableName);
       Preconditions.checkState(offlineTableDataManager != null, "Failed to 
find data manager for table: %s",
           offlineTableName);
+      Pair<TableConfig, Schema> offlineTableConfigAndSchema = 
offlineTableDataManager.getCachedTableConfigAndSchema();
       String realtimeTableName = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
       TableDataManager realtimeTableDataManager = 
instanceDataManager.getTableDataManager(realtimeTableName);
       Preconditions.checkState(realtimeTableDataManager != null, "Failed to 
find data manager for table: %s",
           realtimeTableName);
+      Pair<TableConfig, Schema> realtimeTableConfigAndSchema =
+          realtimeTableDataManager.getCachedTableConfigAndSchema();
       // NOTE: Make a deep copy of PinotQuery for OFFLINE request.
       return List.of(
           compileInstanceRequest(executionContext, new PinotQuery(pinotQuery), 
timeBoundary, TableType.OFFLINE,
-              offlineSegments, offlineTableDataManager),
-          compileInstanceRequest(executionContext, pinotQuery, timeBoundary, 
TableType.REALTIME, realtimeSegments,
-              realtimeTableDataManager));
+              offlineTableDataManager.getTableName(), 
offlineTableConfigAndSchema.getLeft(),
+              offlineTableConfigAndSchema.getRight(), offlineSegments, null),
+          compileInstanceRequest(executionContext, pinotQuery, timeBoundary, 
TableType.REALTIME,
+              realtimeTableDataManager.getTableName(), 
realtimeTableConfigAndSchema.getLeft(),
+              realtimeTableConfigAndSchema.getRight(), realtimeSegments, 
null));
     }
   }
 
@@ -200,13 +217,16 @@ public class ServerPlanRequestUtils {
    * Convert {@link PinotQuery} into an {@link InstanceRequest}.
    */
   private static InstanceRequest 
compileInstanceRequest(OpChainExecutionContext executionContext, PinotQuery 
pinotQuery,
-      @Nullable TimeBoundaryInfo timeBoundaryInfo, TableType tableType, 
List<String> segmentList,
-      TableDataManager tableDataManager) {
+      @Nullable TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
+      String tableNameWithType, TableConfig tableConfig, Schema schema, 
@Nullable List<String> segmentList,
+      @Nullable List<TableSegmentsInfo> tableRouteInfoList) {
+    Preconditions.checkArgument(segmentList == null || tableRouteInfoList == 
null,
+        "Either segmentList OR tableRouteInfoList should be set");
+
     // Making a unique requestId for leaf stages otherwise it causes problem 
on stats/metrics/tracing.
     long requestId = (executionContext.getRequestId() << 16) + ((long) 
executionContext.getStageId() << 8) + (
         tableType == TableType.REALTIME ? 1 : 0);
     // 1. Modify the PinotQuery
-    String tableNameWithType = tableDataManager.getTableName();
     pinotQuery.getDataSource().setTableName(tableNameWithType);
     if (timeBoundaryInfo != null) {
       attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType == 
TableType.OFFLINE);
@@ -214,8 +234,7 @@ public class ServerPlanRequestUtils {
     for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
       pinotQuery = queryRewriter.rewrite(pinotQuery);
     }
-    Pair<TableConfig, Schema> tableConfigAndSchema = 
tableDataManager.getCachedTableConfigAndSchema();
-    QUERY_OPTIMIZER.optimize(pinotQuery, tableConfigAndSchema.getLeft(), 
tableConfigAndSchema.getRight());
+    QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
 
     // 2. Update query options according to requestMetadataMap
     updateQueryOptions(pinotQuery, executionContext);
@@ -233,7 +252,17 @@ public class ServerPlanRequestUtils {
     instanceRequest.setCid(QueryThreadContext.getCid());
     instanceRequest.setBrokerId("unknown");
     instanceRequest.setEnableTrace(executionContext.isTraceEnabled());
-    instanceRequest.setSearchSegments(segmentList);
+    /*
+     * If segmentList is not null, the query targets a single physical table 
and the segments are set directly.
+     * If segmentList is null, the query targets a logical table and the 
TableSegmentsInfo list is set instead.
+     *
+     * Exactly one of segmentList or tableRouteInfoList is expected to be set, 
never both.
+     */
+    if (segmentList != null) {
+      instanceRequest.setSearchSegments(segmentList);
+    } else {
+      instanceRequest.setTableSegmentsInfoList(tableRouteInfoList);
+    }
     instanceRequest.setQuery(brokerRequest);
 
     return instanceRequest;
@@ -386,4 +415,69 @@ public class ServerPlanRequestUtils {
     }
     return expressions;
   }
+
+  private static List<InstanceRequest> 
constructLogicalTableServerQueryRequests(
+      OpChainExecutionContext executionContext, PinotQuery pinotQuery, 
InstanceDataManager instanceDataManager) {
+    StageMetadata stageMetadata = executionContext.getStageMetadata();
+    String logicalTableName = stageMetadata.getTableName();
+    LogicalTableContext logicalTableContext = 
instanceDataManager.getLogicalTableContext(logicalTableName);
+    Preconditions.checkNotNull(logicalTableContext,
+        "LogicalTableManager not found for logical table name: " + 
logicalTableName);
+
+    Map<String, List<String>> logicalTableSegmentsMap =
+        executionContext.getWorkerMetadata().getLogicalTableSegmentsMap();
+    List<TableSegmentsInfo> offlineTableRouteInfoList = new ArrayList<>();
+    List<TableSegmentsInfo> realtimeTableRouteInfoList = new ArrayList<>();
+
+    Preconditions.checkNotNull(logicalTableSegmentsMap);
+    for (Map.Entry<String, List<String>> entry: 
logicalTableSegmentsMap.entrySet()) {
+      String physicalTableName = entry.getKey();
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(physicalTableName);
+      TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo();
+      tableSegmentsInfo.setTableName(physicalTableName);
+      tableSegmentsInfo.setSegments(entry.getValue());
+      if (tableType == TableType.REALTIME) {
+        realtimeTableRouteInfoList.add(tableSegmentsInfo);
+      } else {
+        offlineTableRouteInfoList.add(tableSegmentsInfo);
+      }
+    }
+
+    TimeBoundaryInfo timeBoundaryInfo = stageMetadata.getTimeBoundary();
+
+    if (offlineTableRouteInfoList.isEmpty() || 
realtimeTableRouteInfoList.isEmpty()) {
+      List<TableSegmentsInfo> routeInfoList =
+          offlineTableRouteInfoList.isEmpty() ? realtimeTableRouteInfoList : 
offlineTableRouteInfoList;
+      String tableType = offlineTableRouteInfoList.isEmpty() ? 
TableType.REALTIME.name() : TableType.OFFLINE.name();
+      if (tableType.equals(TableType.OFFLINE.name())) {
+        
Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig());
+        String offlineTableName = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName);
+        return List.of(
+            compileInstanceRequest(executionContext, pinotQuery, 
timeBoundaryInfo, TableType.OFFLINE, offlineTableName,
+                logicalTableContext.getRefOfflineTableConfig(), 
logicalTableContext.getLogicalTableSchema(), null,
+                routeInfoList));
+      } else {
+        
Preconditions.checkNotNull(logicalTableContext.getRefRealtimeTableConfig());
+        String realtimeTableName = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(logicalTableName);
+        return List.of(
+            compileInstanceRequest(executionContext, pinotQuery, 
timeBoundaryInfo, TableType.REALTIME,
+                realtimeTableName, 
logicalTableContext.getRefRealtimeTableConfig(),
+                logicalTableContext.getLogicalTableSchema(), null, 
routeInfoList));
+      }
+    } else {
+      
Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig());
+      
Preconditions.checkNotNull(logicalTableContext.getRefRealtimeTableConfig());
+      String offlineTableName = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName);
+      String realtimeTableName = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(logicalTableName);
+      PinotQuery offlinePinotQuery = pinotQuery.deepCopy();
+      PinotQuery realtimePinotQuery = pinotQuery.deepCopy();
+      return List.of(
+          compileInstanceRequest(executionContext, offlinePinotQuery, 
timeBoundaryInfo, TableType.OFFLINE,
+              offlineTableName, logicalTableContext.getRefOfflineTableConfig(),
+              logicalTableContext.getLogicalTableSchema(), null, 
offlineTableRouteInfoList),
+          compileInstanceRequest(executionContext, realtimePinotQuery, 
timeBoundaryInfo, TableType.REALTIME,
+              realtimeTableName, 
logicalTableContext.getRefRealtimeTableConfig(),
+              logicalTableContext.getLogicalTableSchema(), null, 
realtimeTableRouteInfoList));
+    }
+  }
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 292833ff85..c71321b6d1 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -47,6 +47,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.LogicalTableContext;
 import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
@@ -64,6 +65,7 @@ import 
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.plugin.PluginManager;
@@ -517,4 +519,42 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
       });
     }
   }
+
+  // TODO: LogicalTableContext has to be cached. 
https://github.com/apache/pinot/issues/15859
+  @Nullable // null when the schema, the logical table config, or a referenced table config is not found
+  @Override // builds the context from ZKMetadataProvider lookups on each call; nothing is cached yet
+  public LogicalTableContext getLogicalTableContext(String logicalTableName) {
+    Schema schema = ZKMetadataProvider.getSchema(getPropertyStore(), 
logicalTableName);
+    if (schema == null) { // the schema is looked up under the logical table name itself
+      LOGGER.warn("Failed to find schema for logical table: {}, skipping", 
logicalTableName);
+      return null;
+    }
+    LogicalTableConfig logicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(getPropertyStore(),
+        logicalTableName);
+    if (logicalTableConfig == null) {
+      LOGGER.warn("Failed to find logical table config for logical table: {}, 
skipping", logicalTableName);
+      return null;
+    }
+
+    TableConfig offlineTableConfig = null; // stays null when no reference offline table is named
+    if (logicalTableConfig.getRefOfflineTableName() != null) {
+      offlineTableConfig = 
ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(),
+          logicalTableConfig.getRefOfflineTableName());
+      if (offlineTableConfig == null) { // a reference table is named but its config is absent
+        LOGGER.warn("Failed to find offline table config for logical table: 
{}, skipping", logicalTableName);
+        return null;
+      }
+    }
+
+    TableConfig realtimeTableConfig = null; // stays null when no reference realtime table is named
+    if (logicalTableConfig.getRefRealtimeTableName() != null) {
+      realtimeTableConfig = 
ZKMetadataProvider.getRealtimeTableConfig(getPropertyStore(),
+          logicalTableConfig.getRefRealtimeTableName());
+      if (realtimeTableConfig == null) { // a reference table is named but its config is absent
+        LOGGER.warn("Failed to find realtime table config for logical table: 
{}, skipping", logicalTableName);
+        return null;
+      }
+    }
+    return new LogicalTableContext(logicalTableConfig, schema, 
offlineTableConfig, realtimeTableConfig);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to