This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cd508d7a8 Cache configs for logical table context in server (#15881)
4cd508d7a8 is described below

commit 4cd508d7a84e653b1d85eca93e270988423385a6
Author: Abhishek Bafna <aba...@startree.ai>
AuthorDate: Tue Jun 10 12:55:31 2025 +0530

    Cache configs for logical table context in server (#15881)
---
 .../HelixExternalViewBasedQueryQuotaManager.java   |   3 +-
 .../config/provider/LogicalTableMetadataCache.java | 321 +++++++++++++++++++++
 .../pinot/common/config/provider/TableCache.java   |  15 +-
 .../pinot/common/metadata/ZKMetadataProvider.java  |   6 +-
 .../PinotHelixPropertyStoreZnRecordProvider.java   |   3 +-
 .../controller/helix/ControllerRequestClient.java  |  11 +
 .../helix/core/PinotHelixResourceManager.java      |   6 +
 .../pinot/controller/helix/ControllerTest.java     |   5 +
 .../helix/LogicalTableMetadataCacheTest.java       | 260 +++++++++++++++++
 .../BaseLogicalTableIntegrationTest.java           |   4 +-
 .../plan/server/ServerPlanRequestUtils.java        |  13 +-
 .../starter/helix/HelixInstanceDataManager.java    |  22 +-
 .../apache/pinot/spi/data/LogicalTableConfig.java  |  25 ++
 .../apache/pinot/spi/utils/CommonConstants.java    |  10 +
 14 files changed, 676 insertions(+), 28 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 7bb8641271..46009aab74 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -960,6 +961,6 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
   }
 
   private String constructLogicalTableConfigPath(String tableName) {
-    return "/LOGICAL/TABLE/" + tableName;
+    return ZkPaths.LOGICAL_TABLE_PATH_PREFIX + tableName;
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/LogicalTableMetadataCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/LogicalTableMetadataCache.java
new file mode 100644
index 0000000000..62982ea680
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/LogicalTableMetadataCache.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config.provider;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.LogicalTableConfigUtils;
+import org.apache.pinot.common.utils.SchemaUtils;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * LogicalTableMetadataCache maintains the cache for logical tables, that 
includes the logical table configs,
+ * logical table schemas, and reference offline and realtime table configs.
+ * It listens to changes in the ZK property store for all the logical table 
configs and updates the cache accordingly.
+ * For schema and table configs, it listens to only those configs that are 
required by the logical tables.
+ */
+public class LogicalTableMetadataCache {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LogicalTableMetadataCache.class);
+
+  private final Map<String, LogicalTableConfig> _logicalTableConfigMap = new 
ConcurrentHashMap<>();
+  private final Map<String, Schema> _schemaMap = new ConcurrentHashMap<>();
+  private final Map<String, TableConfig> _tableConfigMap = new 
ConcurrentHashMap<>();
+  private final Map<String, List<String>> _tableNameToLogicalTableNamesMap = 
new ConcurrentHashMap<>();
+
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private ZkTableConfigChangeListener _zkTableConfigChangeListener;
+  private ZkSchemaChangeListener _zkSchemaChangeListener;
+  private ZkLogicalTableConfigChangeListener 
_zkLogicalTableConfigChangeListener;
+
+  public void init(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _propertyStore = propertyStore;
+    _zkTableConfigChangeListener = new ZkTableConfigChangeListener();
+    _zkSchemaChangeListener = new ZkSchemaChangeListener();
+    _zkLogicalTableConfigChangeListener = new 
ZkLogicalTableConfigChangeListener();
+
+    // Add child listeners to the property store for logical table config 
changes
+    _propertyStore.subscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH, 
_zkLogicalTableConfigChangeListener);
+
+    LOGGER.info("Logical table metadata cache initialized");
+  }
+
+  public void shutdown() {
+    // Unsubscribe from the logical table config creation changes
+    _propertyStore.unsubscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH, 
_zkLogicalTableConfigChangeListener);
+
+    // Unsubscribe from all logical table config paths, table config paths, 
and schema paths
+    unsubscribeDataChanges(_logicalTableConfigMap.keySet(), 
ZkPaths.LOGICAL_TABLE_PATH_PREFIX,
+        _zkLogicalTableConfigChangeListener);
+    unsubscribeDataChanges(_tableConfigMap.keySet(), 
ZkPaths.TABLE_CONFIG_PATH_PREFIX, _zkTableConfigChangeListener);
+    unsubscribeDataChanges(_schemaMap.keySet(), ZkPaths.SCHEMA_PATH_PREFIX, 
_zkSchemaChangeListener);
+
+    // Clear all caches
+    _logicalTableConfigMap.clear();
+    _schemaMap.clear();
+    _tableConfigMap.clear();
+    _tableNameToLogicalTableNamesMap.clear();
+
+    LOGGER.info("Logical table metadata cache shutdown");
+  }
+
+  private void unsubscribeDataChanges(Set<String> resourceNames, String 
pathPrefix,
+      IZkDataListener changeListener) {
+    for (String resource : resourceNames) {
+      String logicalTableConfigPath = pathPrefix + resource;
+      _propertyStore.unsubscribeDataChanges(logicalTableConfigPath, 
changeListener);
+    }
+  }
+
+  @Nullable
+  public Schema getSchema(String schemaName) {
+    return _schemaMap.get(schemaName);
+  }
+
+  @Nullable
+  public TableConfig getTableConfig(String tableName) {
+    return _tableConfigMap.get(tableName);
+  }
+
+  @Nullable
+  public LogicalTableConfig getLogicalTableConfig(String logicalTableName) {
+    return _logicalTableConfigMap.get(logicalTableName);
+  }
+
+  private class ZkTableConfigChangeListener implements IZkDataListener {
+
+    @Override
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        ZNRecord znRecord = (ZNRecord) data;
+        try {
+          TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+          _tableConfigMap.put(tableConfig.getTableName(), tableConfig);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while refreshing table config for 
ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
+    }
+
+    @Override
+    public synchronized void handleDataDeleted(String path) {
+      // no-op, table config should not be deleted while referenced in the 
logical table config
+    }
+  }
+
+  private class ZkSchemaChangeListener implements IZkDataListener {
+
+    @Override
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        ZNRecord znRecord = (ZNRecord) data;
+        try {
+          Schema schema = SchemaUtils.fromZNRecord(znRecord);
+          _schemaMap.put(schema.getSchemaName(), schema);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while refreshing schema for ZNRecord: 
{}", znRecord.getId(), e);
+        }
+      }
+    }
+
+    @Override
+    public synchronized void handleDataDeleted(String path) {
+      // no-op, schema should not be deleted before the logical table config
+    }
+  }
+
+  private class ZkLogicalTableConfigChangeListener implements 
IZkChildListener, IZkDataListener {
+
+    @Override
+    public synchronized void handleChildChange(String path, List<String> 
logicalTableNames) {
+      if (CollectionUtils.isEmpty(logicalTableNames)) {
+        return;
+      }
+
+      // Only process new added logical tables. Changed/removed logical tables 
are handled by other callbacks.
+      List<String> pathsToAdd = new ArrayList<>();
+      for (String logicalTableName : logicalTableNames) {
+        if (!_logicalTableConfigMap.containsKey(logicalTableName)) {
+          pathsToAdd.add(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+        }
+      }
+      if (!pathsToAdd.isEmpty()) {
+        addLogicalTableConfigs(pathsToAdd);
+      }
+    }
+
+    @Override
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        updateLogicalTableConfig((ZNRecord) data);
+      }
+    }
+
+    @Override
+    public synchronized void handleDataDeleted(String path) {
+      // NOTE: The path here is the absolute ZK path instead of the relative 
path to the property store.
+      String logicalTableName = path.substring(path.lastIndexOf('/') + 1);
+      removeLogicalTableConfig(logicalTableName);
+    }
+
+    private synchronized void addLogicalTableConfigs(List<String> pathsToAdd) {
+      for (String path : pathsToAdd) {
+        ZNRecord znRecord = _propertyStore.get(path, null, 
AccessOption.PERSISTENT);
+        if (znRecord != null) {
+          try {
+            LogicalTableConfig logicalTableConfig = 
LogicalTableConfigUtils.fromZNRecord(znRecord);
+            String logicalTableName = logicalTableConfig.getTableName();
+
+            if (logicalTableConfig.getRefOfflineTableName() != null) {
+              addTableConfig(logicalTableConfig.getRefOfflineTableName(), 
logicalTableName);
+            }
+            if (logicalTableConfig.getRefRealtimeTableName() != null) {
+              addTableConfig(logicalTableConfig.getRefRealtimeTableName(), 
logicalTableName);
+            }
+
+            addSchema(logicalTableName);
+            _logicalTableConfigMap.put(logicalTableName, logicalTableConfig);
+            String logicalTableConfigPath = ZkPaths.LOGICAL_TABLE_PATH_PREFIX 
+ logicalTableName;
+            _propertyStore.subscribeDataChanges(logicalTableConfigPath, 
_zkLogicalTableConfigChangeListener);
+            LOGGER.info("Added the logical table config: {} in cache", 
logicalTableName);
+          } catch (Exception e) {
+            LOGGER.error("Caught exception while refreshing logical table 
config for ZNRecord: {}", znRecord.getId(),
+                e);
+          }
+        }
+      }
+    }
+
+    private synchronized void addTableConfig(String tableName, String 
logicalTableName) {
+      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableName);
+      Preconditions.checkArgument(tableConfig != null, "Failed to find table 
config for table: %s", tableName);
+      _tableNameToLogicalTableNamesMap.computeIfAbsent(tableName, k -> new 
ArrayList<>())
+          .add(logicalTableName);
+      _tableConfigMap.put(tableName, tableConfig);
+      String path = ZkPaths.TABLE_CONFIG_PATH_PREFIX + tableName;
+      _propertyStore.subscribeDataChanges(path, _zkTableConfigChangeListener);
+      LOGGER.info("Added the table config: {} in cache for logical table: {}", 
tableName, logicalTableName);
+    }
+
+    private synchronized void addSchema(String logicalTableName) {
+      Schema schema = ZKMetadataProvider.getSchema(_propertyStore, 
logicalTableName);
+      Preconditions.checkArgument(schema != null,
+          "Failed to find schema for logical table: %s", logicalTableName);
+      _schemaMap.put(schema.getSchemaName(), schema);
+      String schemaPath = ZkPaths.SCHEMA_PATH_PREFIX + schema.getSchemaName();
+      _propertyStore.subscribeDataChanges(schemaPath, _zkSchemaChangeListener);
+      LOGGER.info("Added the schema: {} in cache for logical table: {}", 
schema.getSchemaName(), logicalTableName);
+    }
+
+    private synchronized void updateLogicalTableConfig(ZNRecord znRecord) {
+      try {
+        LogicalTableConfig logicalTableConfig = 
LogicalTableConfigUtils.fromZNRecord(znRecord);
+        String logicalTableName = logicalTableConfig.getTableName();
+        LogicalTableConfig oldLogicalTableConfig = 
_logicalTableConfigMap.put(logicalTableName, logicalTableConfig);
+        Preconditions.checkArgument(oldLogicalTableConfig != null,
+            "Logical table config for logical table: %s should have been 
created before", logicalTableName);
+
+        // Remove the old table configs from the table config map
+        if (oldLogicalTableConfig.getRefOfflineTableName() != null
+            && 
!oldLogicalTableConfig.getRefOfflineTableName().equals(logicalTableConfig.getRefOfflineTableName()))
 {
+          removeTableConfig(oldLogicalTableConfig.getRefOfflineTableName(), 
logicalTableName);
+        }
+        if (oldLogicalTableConfig.getRefRealtimeTableName() != null
+            && 
!oldLogicalTableConfig.getRefRealtimeTableName().equals(logicalTableConfig.getRefRealtimeTableName()))
 {
+          removeTableConfig(oldLogicalTableConfig.getRefRealtimeTableName(), 
logicalTableName);
+        }
+
+        // Add the new table configs to the table config map
+        if (logicalTableConfig.getRefOfflineTableName() != null
+            && 
!logicalTableConfig.getRefOfflineTableName().equals(oldLogicalTableConfig.getRefOfflineTableName()))
 {
+          addTableConfig(logicalTableConfig.getRefOfflineTableName(), 
logicalTableName);
+        }
+        if (logicalTableConfig.getRefRealtimeTableName() != null
+            && 
!logicalTableConfig.getRefRealtimeTableName().equals(oldLogicalTableConfig.getRefRealtimeTableName()))
 {
+          addTableConfig(logicalTableConfig.getRefRealtimeTableName(), 
logicalTableName);
+        }
+        LOGGER.info("Updated the logical table config: {} in cache", 
logicalTableName);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while refreshing logical table for 
ZNRecord: {}", znRecord.getId(), e);
+      }
+    }
+
+    private synchronized void removeLogicalTableConfig(String 
logicalTableName) {
+      LogicalTableConfig logicalTableConfig = 
_logicalTableConfigMap.remove(logicalTableName);
+      if (logicalTableConfig != null) {
+        // Remove the table configs from the table config map
+        String offlineTableName = logicalTableConfig.getRefOfflineTableName();
+        String realtimeTableName = 
logicalTableConfig.getRefRealtimeTableName();
+        if (offlineTableName != null) {
+          removeTableConfig(offlineTableName, logicalTableName);
+        }
+        if (realtimeTableName != null) {
+          removeTableConfig(realtimeTableName, logicalTableName);
+        }
+        // remove schema
+        removeSchema(logicalTableConfig);
+        // Unsubscribe from the logical table config path
+        String logicalTableConfigPath = ZkPaths.LOGICAL_TABLE_PATH_PREFIX + 
logicalTableName;
+        _propertyStore.unsubscribeDataChanges(logicalTableConfigPath, 
_zkLogicalTableConfigChangeListener);
+        LOGGER.info("Removed the logical table config: {} from cache", 
logicalTableName);
+      }
+    }
+
+    private synchronized void removeTableConfig(String tableName, String 
logicalTableName) {
+      _tableNameToLogicalTableNamesMap.computeIfPresent(tableName, (k, v) -> {
+        v.remove(logicalTableName);
+        if (v.isEmpty()) {
+          _tableConfigMap.remove(tableName);
+          String path = ZkPaths.TABLE_CONFIG_PATH_PREFIX + tableName;
+          _propertyStore.unsubscribeDataChanges(path, 
_zkTableConfigChangeListener);
+          LOGGER.info("Removed the table config: {} from cache", tableName);
+          return null;
+        }
+        return v;
+      });
+    }
+
+    private synchronized void removeSchema(LogicalTableConfig 
logicalTableConfig) {
+      String schemaName = logicalTableConfig.getTableName();
+      _schemaMap.remove(schemaName);
+      String schemaPath = ZkPaths.SCHEMA_PATH_PREFIX + schemaName;
+      _propertyStore.unsubscribeDataChanges(schemaPath, 
_zkSchemaChangeListener);
+      LOGGER.info("Removed the schema: {} from cache", schemaName);
+    }
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index e25948fd61..b2a5ae02e1 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -52,6 +52,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
 import org.apache.pinot.spi.utils.TimestampIndexUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -70,8 +71,6 @@ public class TableCache implements PinotConfigProvider {
   private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
   private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
   private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
-  private static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL/TABLE";
-  private static final String LOGICAL_TABLE_PATH_PREFIX = "/LOGICAL/TABLE/";
   private static final String OFFLINE_TABLE_SUFFIX = "_OFFLINE";
   private static final String REALTIME_TABLE_SUFFIX = "_REALTIME";
   private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = 
OFFLINE_TABLE_SUFFIX.toLowerCase();
@@ -138,12 +137,12 @@ public class TableCache implements PinotConfigProvider {
 
     synchronized (_zkLogicalTableConfigChangeListener) {
       // Subscribe child changes before reading the data to avoid missing 
changes
-      _propertyStore.subscribeChildChanges(LOGICAL_TABLE_PARENT_PATH, 
_zkLogicalTableConfigChangeListener);
+      _propertyStore.subscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH, 
_zkLogicalTableConfigChangeListener);
 
-      List<String> tables = 
_propertyStore.getChildNames(LOGICAL_TABLE_PARENT_PATH, 
AccessOption.PERSISTENT);
+      List<String> tables = 
_propertyStore.getChildNames(ZkPaths.LOGICAL_TABLE_PARENT_PATH, 
AccessOption.PERSISTENT);
       if (CollectionUtils.isNotEmpty(tables)) {
         List<String> pathsToAdd = tables.stream()
-            .map(rawTableName -> LOGICAL_TABLE_PATH_PREFIX + rawTableName)
+            .map(rawTableName -> ZkPaths.LOGICAL_TABLE_PATH_PREFIX + 
rawTableName)
             .collect(Collectors.toList());
         addLogicalTableConfigs(pathsToAdd);
       }
@@ -391,7 +390,7 @@ public class TableCache implements PinotConfigProvider {
 
   private void removeLogicalTableConfig(String path) {
     _propertyStore.unsubscribeDataChanges(path, 
_zkLogicalTableConfigChangeListener);
-    String logicalTableName = 
path.substring(LOGICAL_TABLE_PATH_PREFIX.length());
+    String logicalTableName = 
path.substring(ZkPaths.LOGICAL_TABLE_PATH_PREFIX.length());
     _logicalTableConfigInfoMap.remove(logicalTableName);
     logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : 
logicalTableName;
     _logicalTableNameMap.remove(logicalTableName);
@@ -616,7 +615,7 @@ public class TableCache implements PinotConfigProvider {
       List<String> pathsToAdd = new ArrayList<>();
       for (String logicalTableName : logicalTableNames) {
         if (!_logicalTableConfigInfoMap.containsKey(logicalTableName)) {
-          pathsToAdd.add(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+          pathsToAdd.add(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
         }
       }
       if (!pathsToAdd.isEmpty()) {
@@ -642,7 +641,7 @@ public class TableCache implements PinotConfigProvider {
     public synchronized void handleDataDeleted(String path) {
       // NOTE: The path here is the absolute ZK path instead of the relative 
path to the property store.
       String logicalTableName = path.substring(path.lastIndexOf('/') + 1);
-      removeLogicalTableConfig(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+      removeLogicalTableConfig(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + 
logicalTableName);
       notifyLogicalTableConfigChangeListeners();
     }
   }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index eff90fcf49..dfb2bbe401 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -52,6 +52,7 @@ import org.apache.pinot.spi.config.user.UserConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.StringUtil;
 import org.apache.pinot.spi.utils.TableConfigDecoratorRegistry;
@@ -73,7 +74,6 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
   private static final String PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX = 
"/PAUSELESS_DEBUG_METADATA";
   private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
-  private static final String PROPERTYSTORE_LOGICAL_PREFIX = "/LOGICAL/TABLE";
   private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX = 
"/INSTANCE_PARTITIONS";
   private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX = 
"/CONFIGS/DATABASE";
   private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = 
"/CONFIGS/TABLE";
@@ -312,7 +312,7 @@ public class ZKMetadataProvider {
   }
 
   public static String constructPropertyStorePathForLogical(String tableName) {
-    return StringUtil.join("/", PROPERTYSTORE_LOGICAL_PREFIX, tableName);
+    return StringUtil.join("/", ZkPaths.LOGICAL_TABLE_PARENT_PATH, tableName);
   }
 
   public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String resourceNameForResource,
@@ -850,7 +850,7 @@ public class ZKMetadataProvider {
 
   public static List<LogicalTableConfig> 
getAllLogicalTableConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) {
     List<ZNRecord> znRecords =
-        propertyStore.getChildren(PROPERTYSTORE_LOGICAL_PREFIX, null, 
AccessOption.PERSISTENT, 0, 0);
+        propertyStore.getChildren(ZkPaths.LOGICAL_TABLE_PARENT_PATH, null, 
AccessOption.PERSISTENT, 0, 0);
     if (znRecords != null) {
       return znRecords.stream().map(znRecord -> {
         try {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
index fc25ef4dd3..542fc9a248 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils.helix;
 import org.apache.helix.AccessOption;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
 
 
 public class PinotHelixPropertyStoreZnRecordProvider {
@@ -52,7 +53,7 @@ public class PinotHelixPropertyStoreZnRecordProvider {
   }
 
   public static PinotHelixPropertyStoreZnRecordProvider 
forLogicalTable(ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, 
"/LOGICAL/TABLE");
+    return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, 
ZkPaths.LOGICAL_TABLE_PARENT_PATH);
   }
 
   public ZNRecord get(String name) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 7969d2aa86..8acf41df87 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -198,6 +198,17 @@ public class ControllerRequestClient {
     }
   }
 
+  public void deleteLogicalTable(String logicalTableName)
+      throws IOException {
+    try {
+      HttpClient.wrapAndThrowHttpException(
+          _httpClient.sendDeleteRequest(new 
URI(_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName)),
+              _headers));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   public TableConfig getTableConfig(String tableName, TableType tableType)
       throws IOException {
     try {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d7bcf53bac..21894be117 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1586,6 +1586,12 @@ public class PinotHelixResourceManager {
     }
 
     updateSchema(schema, oldSchema, forceTableSchemaUpdate);
+    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, schemaName)) {
+      // For logical table schemas, we do not need to reload segments or send 
schema refresh messages
+      LOGGER.info("Logical table schema: {} updated, no need to reload 
segments or send schema refresh messages",
+          schemaName);
+      return;
+    }
     try {
       List<String> tableNamesWithType = 
getExistingTableNamesWithType(schemaName, null);
       if (reload) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 01092a3137..55e0bb1772 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -800,6 +800,11 @@ public class ControllerTest {
     
getControllerRequestClient().deleteTable(TableNameBuilder.REALTIME.tableNameWithType(tableName));
   }
 
+  public void dropLogicalTable(String logicalTableName)
+      throws IOException {
+    getControllerRequestClient().deleteLogicalTable(logicalTableName);
+  }
+
   public void waitForEVToAppear(String tableNameWithType) {
     TestUtils.waitForCondition(aVoid -> 
_helixResourceManager.getTableExternalView(tableNameWithType) != null, 60_000L,
         "Failed to create the external view for table: " + tableNameWithType);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
new file mode 100644
index 0000000000..db459e9aff
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.config.provider.LogicalTableMetadataCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+
+public class LogicalTableMetadataCacheTest {
+
+  private static final ControllerTest INSTANCE = ControllerTest.getInstance();
+  private static final LogicalTableMetadataCache CACHE = new 
LogicalTableMetadataCache();
+
+  private final String _testTable = "testTable";
+  private final String _extraTableName = "testExtraTable";
+  private final Schema _testTableSchema = 
ControllerTest.createDummySchema(_testTable);
+  private final Schema _extraTableSchema = 
ControllerTest.createDummySchema(_extraTableName);
+  private final TableConfig _offlineTableConfig = 
ControllerTest.createDummyTableConfig(_testTable, TableType.OFFLINE);
+  private final TableConfig _realtimeTableConfig =
+      ControllerTest.createDummyTableConfig(_testTable, TableType.REALTIME);
+  private final TableConfig _extraOfflineTableConfig =
+      ControllerTest.createDummyTableConfig(_extraTableName, 
TableType.OFFLINE);
+  private final TableConfig _extraRealtimeTableConfig =
+      ControllerTest.createDummyTableConfig(_extraTableName, 
TableType.REALTIME);
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    INSTANCE.setupSharedStateAndValidate();
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    INSTANCE.stopSharedTestSetup();
+  }
+
+  @BeforeMethod
+  public void beforeMethod()
+      throws IOException {
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
INSTANCE.getHelixResourceManager().getPropertyStore();
+    CACHE.init(propertyStore);
+
+    // Setup schema and table configs in the property store
+    INSTANCE.addSchema(_testTableSchema);
+    INSTANCE.addSchema(_extraTableSchema);
+    INSTANCE.addTableConfig(_offlineTableConfig);
+    INSTANCE.addTableConfig(_realtimeTableConfig);
+    INSTANCE.addTableConfig(_extraOfflineTableConfig);
+    INSTANCE.addTableConfig(_extraRealtimeTableConfig);
+
+    // Ensure the schema and table configs are not loaded into the cache yet
+    assertNull(CACHE.getSchema(_testTable));
+    assertNull(CACHE.getSchema(_extraTableName));
+    assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+    assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+    assertNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName()));
+    assertNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName()));
+  }
+
+  @AfterMethod
+  public void afterMethod() {
+    CACHE.shutdown();
+    INSTANCE.cleanup();
+  }
+
+  @Test
+  public void testLogicalTableCacheWithUpdates()
+      throws IOException {
+    String logicalTableName = "testLogicalTable1";
+    LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache(
+        logicalTableName, List.of(_offlineTableConfig.getTableName(), 
_realtimeTableConfig.getTableName()));
+    Schema logicalTableSchema = 
ControllerTest.createDummySchema(logicalTableName);
+
+    // Update logical table config and verify the cache is updated
+    Map<String, PhysicalTableConfig> physicalTableConfigMap = 
logicalTableConfig.getPhysicalTableConfigMap();
+    physicalTableConfigMap.put(_extraOfflineTableConfig.getTableName(), new 
PhysicalTableConfig());
+    physicalTableConfigMap.put(_extraRealtimeTableConfig.getTableName(), new 
PhysicalTableConfig());
+    assertNotEquals(CACHE.getLogicalTableConfig(logicalTableName), 
logicalTableConfig);
+    INSTANCE.updateLogicalTableConfig(logicalTableConfig);
+    TestUtils.waitForCondition(
+        aVoid -> 
CACHE.getLogicalTableConfig(logicalTableName).equals(logicalTableConfig),
+        10_000L, "Logical table config not updated in cache");
+    assertNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName()));
+    assertNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName()));
+
+    // Update logical table schema and verify the cache is updated
+    logicalTableSchema.addField(new MetricFieldSpec("newMetric", 
FieldSpec.DataType.INT));
+    assertNotEquals(CACHE.getSchema(logicalTableName), logicalTableSchema);
+    INSTANCE.updateSchema(logicalTableSchema);
+    TestUtils.waitForCondition(
+        aVoid -> CACHE.getSchema(logicalTableName).equals(logicalTableSchema),
+        10_000L, "Logical table schema not updated in cache");
+
+    // Update offline table configs and verify the cache is updated (update 
retention)
+    _offlineTableConfig.getValidationConfig().setRetentionTimeValue("10");
+    assertNotEquals(
+        
Objects.requireNonNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()))
+            .getValidationConfig()
+            .getRetentionTimeValue(),
+        _offlineTableConfig.getValidationConfig().getRetentionTimeValue());
+    INSTANCE.updateTableConfig(_offlineTableConfig);
+    TestUtils.waitForCondition(
+        aVoid -> 
Objects.requireNonNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()))
+            .getValidationConfig()
+            .getRetentionTimeValue()
+            
.equals(_offlineTableConfig.getValidationConfig().getRetentionTimeValue()),
+        10_000L, "Offline table config not updated in cache");
+
+    // Update realtime table configs and verify the cache is updated (update 
retention)
+    _realtimeTableConfig.getValidationConfig().setRetentionTimeValue("20");
+    assertNotEquals(
+        
Objects.requireNonNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()))
+            .getValidationConfig()
+            .getRetentionTimeValue(),
+        _realtimeTableConfig.getValidationConfig().getRetentionTimeValue());
+    INSTANCE.updateTableConfig(_realtimeTableConfig);
+    TestUtils.waitForCondition(
+        aVoid -> 
Objects.requireNonNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()))
+            .getValidationConfig().getRetentionTimeValue()
+            
.equals(_realtimeTableConfig.getValidationConfig().getRetentionTimeValue()),
+        10_000L, "Realtime table config not updated in cache");
+
+    // Delete logical table config and verify the cache is removed
+    INSTANCE.dropLogicalTable(logicalTableName);
+    TestUtils.waitForCondition(
+        aVoid -> CACHE.getSchema(logicalTableName) == null,
+        10_000L, "Logical table schema not removed from cache");
+    assertNull(CACHE.getLogicalTableConfig(logicalTableName));
+    assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+    assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+  }
+
+  @Test
+  public void testLogicalTableUpdateRefTables()
+      throws IOException {
+    String logicalTableName = "testLogicalTable2";
+    LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache(
+        logicalTableName, List.of(_offlineTableConfig.getTableName(), 
_realtimeTableConfig.getTableName()));
+
+    // Update logical table config ref tables with extra tables and verify the 
cache is updated
+    
logicalTableConfig.setRefOfflineTableName(_extraOfflineTableConfig.getTableName());
+    
logicalTableConfig.setRefRealtimeTableName(_extraRealtimeTableConfig.getTableName());
+    logicalTableConfig.setPhysicalTableConfigMap(
+        Map.of(_extraOfflineTableConfig.getTableName(), new 
PhysicalTableConfig(),
+            _extraRealtimeTableConfig.getTableName(), new 
PhysicalTableConfig())
+    );
+    assertNotEquals(CACHE.getLogicalTableConfig(logicalTableName), 
logicalTableConfig);
+
+    INSTANCE.updateLogicalTableConfig(logicalTableConfig);
+
+    TestUtils.waitForCondition(
+        aVoid -> 
CACHE.getLogicalTableConfig(logicalTableName).equals(logicalTableConfig),
+        10_000L, "Logical table config not updated in cache");
+    
assertNotNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName()));
+    
assertNotNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName()));
+    assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+    assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+    assertNotNull(CACHE.getSchema(logicalTableName));
+  }
+
+  @Test
+  public void testCacheWithMultipleLogicalTables()
+      throws IOException {
+    String logicalTableName = "testLogicalTable3";
+    LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache(
+        logicalTableName, List.of(_offlineTableConfig.getTableName(), 
_realtimeTableConfig.getTableName()));
+
+    String otherLogicalTableName = "otherLogicalTable";
+    LogicalTableConfig otherLogicalTableConfig = 
addLogicalTableAndValidateCache(
+        otherLogicalTableName, List.of(_offlineTableConfig.getTableName(), 
_realtimeTableConfig.getTableName()));
+
+    // Delete one logical table config and verify the other is still present
+    INSTANCE.dropLogicalTable(logicalTableName);
+    TestUtils.waitForCondition(
+        aVoid -> CACHE.getSchema(logicalTableName) == null,
+        10_000L, "Logical table schema not removed from cache");
+    assertNull(CACHE.getLogicalTableConfig(logicalTableName));
+    assertNotNull(CACHE.getLogicalTableConfig(otherLogicalTableName));
+    assertNotNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+    assertNotNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+
+    // Delete the other logical table config and verify the cache is empty
+    INSTANCE.dropLogicalTable(otherLogicalTableName);
+    TestUtils.waitForCondition(
+        aVoid -> CACHE.getSchema(otherLogicalTableName) == null,
+        10_000L, "Logical table schema not removed from cache");
+    assertNull(CACHE.getLogicalTableConfig(otherLogicalTableName));
+    assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+    assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+  }
+
+  private LogicalTableConfig addLogicalTableAndValidateCache(String 
logicalTableName, List<String> physicalTableNames)
+      throws IOException {
+    // Add logical table config
+    Schema logicalTableSchema = 
ControllerTest.createDummySchema(logicalTableName);
+    LogicalTableConfig logicalTableConfig = 
ControllerTest.getDummyLogicalTableConfig(logicalTableName,
+        physicalTableNames, "DefaultTenant");
+    INSTANCE.addSchema(logicalTableSchema);
+    INSTANCE.addLogicalTableConfig(logicalTableConfig);
+
+    // wait for the cache to be updated
+    TestUtils.waitForCondition(
+        aVoid -> CACHE.getLogicalTableConfig(logicalTableName) != null,
+        10_000L, "Logical table config not loaded into cache");
+
+    // Verify that the logical table config is loaded into the cache
+    assertNotNull(CACHE.getSchema(logicalTableName));
+    assertEquals(CACHE.getSchema(logicalTableName), logicalTableSchema);
+    assertNotNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+    assertNotNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+    assertNotNull(CACHE.getLogicalTableConfig(logicalTableName));
+    assertEquals(CACHE.getLogicalTableConfig(logicalTableName), 
logicalTableConfig);
+
+    // verify extra schema and table configs are not loaded
+    assertNull(CACHE.getSchema(_extraTableName));
+    assertNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName()));
+    assertNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName()));
+    return logicalTableConfig;
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 0dcbb3a9f3..414b664241 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -619,7 +619,9 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     JsonNode response = postQueryToController(query);
     assertNoError(response);
 
-    query = "SELECT count(*) FROM " + getOfflineTableNames().get(0);
+    String tableName =
+        getOfflineTableNames().isEmpty() ? getRealtimeTableNames().get(0) : 
getOfflineTableNames().get(0);
+    query = "SELECT count(*) FROM " + tableName;
     response = postQueryToController(query);
     assertNoError(response);
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index b0eb2e12d0..807a9a9fd6 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -253,10 +253,10 @@ public class ServerPlanRequestUtils {
     instanceRequest.setBrokerId("unknown");
     instanceRequest.setEnableTrace(executionContext.isTraceEnabled());
     /*
-      * If segmentList is not null, it means that the query is for a single 
table and we can directly set the segments.
-      * If segmentList is null, it means that the query is for a logical table 
and we need to set TableSegmentInfoList
-      *
-      * Either one of segmentList or tableRouteInfoList has to be set, but not 
both.
+     * If segmentList is not null, it means that the query is for a single 
table and we can directly set the segments.
+     * If segmentList is null, it means that the query is for a logical table 
and we need to set TableSegmentInfoList
+     *
+     * Either one of segmentList or tableRouteInfoList has to be set, but not 
both.
      */
     if (segmentList != null) {
       instanceRequest.setSearchSegments(segmentList);
@@ -422,7 +422,8 @@ public class ServerPlanRequestUtils {
     String logicalTableName = stageMetadata.getTableName();
     LogicalTableContext logicalTableContext = 
instanceDataManager.getLogicalTableContext(logicalTableName);
     Preconditions.checkNotNull(logicalTableContext,
-        "LogicalTableManager not found for logical table name: " + 
logicalTableName);
+        String.format("LogicalTableContext not found for logical table name: 
%s, query context id: %s",
+            logicalTableName, QueryThreadContext.getCid()));
 
     Map<String, List<String>> logicalTableSegmentsMap =
         executionContext.getWorkerMetadata().getLogicalTableSegmentsMap();
@@ -430,7 +431,7 @@ public class ServerPlanRequestUtils {
     List<TableSegmentsInfo> realtimeTableRouteInfoList = new ArrayList<>();
 
     Preconditions.checkNotNull(logicalTableSegmentsMap);
-    for (Map.Entry<String, List<String>> entry: 
logicalTableSegmentsMap.entrySet()) {
+    for (Map.Entry<String, List<String>> entry : 
logicalTableSegmentsMap.entrySet()) {
       String physicalTableName = entry.getKey();
       TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(physicalTableName);
       TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo();
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 07da49bb5f..924e0ff333 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -44,6 +44,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.config.provider.LogicalTableMetadataCache;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
@@ -85,6 +86,10 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixInstanceDataManager.class);
 
   private final Map<String, TableDataManager> _tableDataManagerMap = new 
ConcurrentHashMap<>();
+
+  // Logical table metadata cache to cache logical table configs, schemas, and 
offline/realtime table configs.
+  private final LogicalTableMetadataCache _logicalTableMetadataCache = new 
LogicalTableMetadataCache();
+
   // TODO: Consider making segment locks per table instead of per instance
   private final SegmentLocks _segmentLocks = new SegmentLocks();
 
@@ -228,6 +233,9 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   @Override
   public synchronized void start() {
     _propertyStore = _helixManager.getHelixPropertyStore();
+    // Initialize logical table metadata cache
+    _logicalTableMetadataCache.init(_propertyStore);
+
     LOGGER.info("Helix instance data manager started");
   }
 
@@ -255,6 +263,8 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
       }
     }
     SegmentBuildTimeLeaseExtender.shutdownExecutor();
+    // shutdown logical table metadata cache
+    _logicalTableMetadataCache.shutdown();
     LOGGER.info("Helix instance data manager shut down");
   }
 
@@ -533,17 +543,15 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     }
   }
 
-  // TODO: LogicalTableContext has to be cached. 
https://github.com/apache/pinot/issues/15859
   @Nullable
   @Override
   public LogicalTableContext getLogicalTableContext(String logicalTableName) {
-    Schema schema = ZKMetadataProvider.getSchema(getPropertyStore(), 
logicalTableName);
+    Schema schema = _logicalTableMetadataCache.getSchema(logicalTableName);
     if (schema == null) {
       LOGGER.warn("Failed to find schema for logical table: {}, skipping", 
logicalTableName);
       return null;
     }
-    LogicalTableConfig logicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(getPropertyStore(),
-        logicalTableName);
+    LogicalTableConfig logicalTableConfig = 
_logicalTableMetadataCache.getLogicalTableConfig(logicalTableName);
     if (logicalTableConfig == null) {
       LOGGER.warn("Failed to find logical table config for logical table: {}, 
skipping", logicalTableName);
       return null;
@@ -551,8 +559,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
 
     TableConfig offlineTableConfig = null;
     if (logicalTableConfig.getRefOfflineTableName() != null) {
-      offlineTableConfig = 
ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(),
-          logicalTableConfig.getRefOfflineTableName());
+      offlineTableConfig = 
_logicalTableMetadataCache.getTableConfig(logicalTableConfig.getRefOfflineTableName());
       if (offlineTableConfig == null) {
         LOGGER.warn("Failed to find offline table config for logical table: 
{}, skipping", logicalTableName);
         return null;
@@ -561,8 +568,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
 
     TableConfig realtimeTableConfig = null;
     if (logicalTableConfig.getRefRealtimeTableName() != null) {
-      realtimeTableConfig = 
ZKMetadataProvider.getRealtimeTableConfig(getPropertyStore(),
-          logicalTableConfig.getRefRealtimeTableName());
+      realtimeTableConfig = 
_logicalTableMetadataCache.getTableConfig(logicalTableConfig.getRefRealtimeTableName());
       if (realtimeTableConfig == null) {
         LOGGER.warn("Failed to find realtime table config for logical table: 
{}, skipping", logicalTableName);
         return null;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
index 2c52148098..973502c543 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.data;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -31,6 +32,26 @@ import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
+/**
+ * Represents the configuration for a logical table in Pinot.
+ *
+ * <p>
+ * <ul>
+ *   <li><b>tableName</b>: The name of the logical table.</li>
+ *   <li><b>physicalTableConfigMap</b>: A map of physical table names to their 
configurations.</li>
+ *   <li><b>brokerTenant</b>: The tenant for the broker.</li>
+ *   <li><b>queryConfig</b>: Configuration for query execution on the logical 
table.</li>
+ *   <li><b>quotaConfig</b>: Configuration for quota management on the logical 
table.</li>
+ *   <li><b>refOfflineTableName</b>: The name of the offline table whose table 
config is referenced by this logical
+ *   table.</li>
+ *   <li><b>refRealtimeTableName</b>: The name of the realtime table whose 
table config is referenced by this logical
+ *   table.</li>
+ *   <li><b>timeBoundaryConfig</b>: Configuration for time boundaries of the 
logical table. This is used to determine
+ *   the time boundaries for queries on the logical table, especially in 
hybrid scenarios where both offline and
+ *   realtime data are present.</li>
+ * </ul>
+ * </p>
+ */
 public class LogicalTableConfig extends BaseJsonConfig {
 
   private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
@@ -105,6 +126,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
     _quotaConfig = quotaConfig;
   }
 
+  @Nullable
   public String getRefOfflineTableName() {
     return _refOfflineTableName;
   }
@@ -113,6 +135,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
     _refOfflineTableName = refOfflineTableName;
   }
 
+  @Nullable
   public String getRefRealtimeTableName() {
     return _refRealtimeTableName;
   }
@@ -121,6 +144,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
     _refRealtimeTableName = refRealtimeTableName;
   }
 
+  @Nullable
   public TimeBoundaryConfig getTimeBoundaryConfig() {
     return _timeBoundaryConfig;
   }
@@ -151,6 +175,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
     }
   }
 
+  @JsonIgnore
   public boolean isHybridLogicalTable() {
     return _refOfflineTableName != null && _refRealtimeTableName != null;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7152d83f2b..86b8200dfc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1719,4 +1719,14 @@ public class CommonConstants {
     public static final String GROOVY_QUERY_STATIC_ANALYZER_CONFIG = 
"pinot.groovy.query.static.analyzer";
     public static final String GROOVY_INGESTION_STATIC_ANALYZER_CONFIG = 
"pinot.groovy.ingestion.static.analyzer";
   }
+
+  /**
+   * ZK paths used by Pinot.
+   */
+  public static class ZkPaths {
+    public static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL/TABLE";
+    public static final String LOGICAL_TABLE_PATH_PREFIX = "/LOGICAL/TABLE/";
+    public static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+    public static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to