This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4cd508d7a8 Cache configs for logical table context in server (#15881) 4cd508d7a8 is described below commit 4cd508d7a84e653b1d85eca93e270988423385a6 Author: Abhishek Bafna <aba...@startree.ai> AuthorDate: Tue Jun 10 12:55:31 2025 +0530 Cache configs for logical table context in server (#15881) --- .../HelixExternalViewBasedQueryQuotaManager.java | 3 +- .../config/provider/LogicalTableMetadataCache.java | 321 +++++++++++++++++++++ .../pinot/common/config/provider/TableCache.java | 15 +- .../pinot/common/metadata/ZKMetadataProvider.java | 6 +- .../PinotHelixPropertyStoreZnRecordProvider.java | 3 +- .../controller/helix/ControllerRequestClient.java | 11 + .../helix/core/PinotHelixResourceManager.java | 6 + .../pinot/controller/helix/ControllerTest.java | 5 + .../helix/LogicalTableMetadataCacheTest.java | 260 +++++++++++++++++ .../BaseLogicalTableIntegrationTest.java | 4 +- .../plan/server/ServerPlanRequestUtils.java | 13 +- .../starter/helix/HelixInstanceDataManager.java | 22 +- .../apache/pinot/spi/data/LogicalTableConfig.java | 25 ++ .../apache/pinot/spi/utils/CommonConstants.java | 10 + 14 files changed, 676 insertions(+), 28 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 7bb8641271..46009aab74 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -51,6 +51,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.utils.CommonConstants; +import 
org.apache.pinot.spi.utils.CommonConstants.ZkPaths; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -960,6 +961,6 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } private String constructLogicalTableConfigPath(String tableName) { - return "/LOGICAL/TABLE/" + tableName; + return ZkPaths.LOGICAL_TABLE_PATH_PREFIX + tableName; } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/LogicalTableMetadataCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/LogicalTableMetadataCache.java new file mode 100644 index 0000000000..62982ea680 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/LogicalTableMetadataCache.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.config.provider; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.helix.AccessOption; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.IZkChildListener; +import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.utils.LogicalTableConfigUtils; +import org.apache.pinot.common.utils.SchemaUtils; +import org.apache.pinot.common.utils.config.TableConfigUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants.ZkPaths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * LogicalTableMetadataCache maintains the cache for logical tables, that includes the logical table configs, + * logical table schemas, and reference offline and realtime table configs. + * It listens to changes in the ZK property store for all the logical table configs and updates the cache accordingly. + * For schema and table configs, it listens to only those configs that are required by the logical tables. 
+ */ +public class LogicalTableMetadataCache { + + private static final Logger LOGGER = LoggerFactory.getLogger(LogicalTableMetadataCache.class); + + private final Map<String, LogicalTableConfig> _logicalTableConfigMap = new ConcurrentHashMap<>(); + private final Map<String, Schema> _schemaMap = new ConcurrentHashMap<>(); + private final Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>(); + private final Map<String, List<String>> _tableNameToLogicalTableNamesMap = new ConcurrentHashMap<>(); + + private ZkHelixPropertyStore<ZNRecord> _propertyStore; + private ZkTableConfigChangeListener _zkTableConfigChangeListener; + private ZkSchemaChangeListener _zkSchemaChangeListener; + private ZkLogicalTableConfigChangeListener _zkLogicalTableConfigChangeListener; + + public void init(ZkHelixPropertyStore<ZNRecord> propertyStore) { + _propertyStore = propertyStore; + _zkTableConfigChangeListener = new ZkTableConfigChangeListener(); + _zkSchemaChangeListener = new ZkSchemaChangeListener(); + _zkLogicalTableConfigChangeListener = new ZkLogicalTableConfigChangeListener(); + + // Add child listeners to the property store for logical table config changes + _propertyStore.subscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH, _zkLogicalTableConfigChangeListener); + + LOGGER.info("Logical table metadata cache initialized"); + } + + public void shutdown() { + // Unsubscribe from the logical table config creation changes + _propertyStore.unsubscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH, _zkLogicalTableConfigChangeListener); + + // Unsubscribe from all logical table config paths, table config paths, and schema paths + unsubscribeDataChanges(_logicalTableConfigMap.keySet(), ZkPaths.LOGICAL_TABLE_PATH_PREFIX, + _zkLogicalTableConfigChangeListener); + unsubscribeDataChanges(_tableConfigMap.keySet(), ZkPaths.TABLE_CONFIG_PATH_PREFIX, _zkTableConfigChangeListener); + unsubscribeDataChanges(_schemaMap.keySet(), ZkPaths.SCHEMA_PATH_PREFIX, _zkSchemaChangeListener); 
+ + // Clear all caches + _logicalTableConfigMap.clear(); + _schemaMap.clear(); + _tableConfigMap.clear(); + _tableNameToLogicalTableNamesMap.clear(); + + LOGGER.info("Logical table metadata cache shutdown"); + } + + private void unsubscribeDataChanges(Set<String> resourceNames, String pathPrefix, + IZkDataListener changeListener) { + for (String resource : resourceNames) { + String logicalTableConfigPath = pathPrefix + resource; + _propertyStore.unsubscribeDataChanges(logicalTableConfigPath, changeListener); + } + } + + @Nullable + public Schema getSchema(String schemaName) { + return _schemaMap.get(schemaName); + } + + @Nullable + public TableConfig getTableConfig(String tableName) { + return _tableConfigMap.get(tableName); + } + + @Nullable + public LogicalTableConfig getLogicalTableConfig(String logicalTableName) { + return _logicalTableConfigMap.get(logicalTableName); + } + + private class ZkTableConfigChangeListener implements IZkDataListener { + + @Override + public synchronized void handleDataChange(String path, Object data) { + if (data != null) { + ZNRecord znRecord = (ZNRecord) data; + try { + TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord); + _tableConfigMap.put(tableConfig.getTableName(), tableConfig); + } catch (Exception e) { + LOGGER.error("Caught exception while refreshing table config for ZNRecord: {}", znRecord.getId(), e); + } + } + } + + @Override + public synchronized void handleDataDeleted(String path) { + // no-op, table config should not be deleted while referenced in the logical table config + } + } + + private class ZkSchemaChangeListener implements IZkDataListener { + + @Override + public synchronized void handleDataChange(String path, Object data) { + if (data != null) { + ZNRecord znRecord = (ZNRecord) data; + try { + Schema schema = SchemaUtils.fromZNRecord(znRecord); + _schemaMap.put(schema.getSchemaName(), schema); + } catch (Exception e) { + LOGGER.error("Caught exception while refreshing schema for ZNRecord: {}", 
znRecord.getId(), e); + } + } + } + + @Override + public synchronized void handleDataDeleted(String path) { + // no-op, schema should not be deleted before the logical table config + } + } + + private class ZkLogicalTableConfigChangeListener implements IZkChildListener, IZkDataListener { + + @Override + public synchronized void handleChildChange(String path, List<String> logicalTableNames) { + if (CollectionUtils.isEmpty(logicalTableNames)) { + return; + } + + // Only process new added logical tables. Changed/removed logical tables are handled by other callbacks. + List<String> pathsToAdd = new ArrayList<>(); + for (String logicalTableName : logicalTableNames) { + if (!_logicalTableConfigMap.containsKey(logicalTableName)) { + pathsToAdd.add(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName); + } + } + if (!pathsToAdd.isEmpty()) { + addLogicalTableConfigs(pathsToAdd); + } + } + + @Override + public synchronized void handleDataChange(String path, Object data) { + if (data != null) { + updateLogicalTableConfig((ZNRecord) data); + } + } + + @Override + public synchronized void handleDataDeleted(String path) { + // NOTE: The path here is the absolute ZK path instead of the relative path to the property store. 
+ String logicalTableName = path.substring(path.lastIndexOf('/') + 1); + removeLogicalTableConfig(logicalTableName); + } + + private synchronized void addLogicalTableConfigs(List<String> pathsToAdd) { + for (String path : pathsToAdd) { + ZNRecord znRecord = _propertyStore.get(path, null, AccessOption.PERSISTENT); + if (znRecord != null) { + try { + LogicalTableConfig logicalTableConfig = LogicalTableConfigUtils.fromZNRecord(znRecord); + String logicalTableName = logicalTableConfig.getTableName(); + + if (logicalTableConfig.getRefOfflineTableName() != null) { + addTableConfig(logicalTableConfig.getRefOfflineTableName(), logicalTableName); + } + if (logicalTableConfig.getRefRealtimeTableName() != null) { + addTableConfig(logicalTableConfig.getRefRealtimeTableName(), logicalTableName); + } + + addSchema(logicalTableName); + _logicalTableConfigMap.put(logicalTableName, logicalTableConfig); + String logicalTableConfigPath = ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName; + _propertyStore.subscribeDataChanges(logicalTableConfigPath, _zkLogicalTableConfigChangeListener); + LOGGER.info("Added the logical table config: {} in cache", logicalTableName); + } catch (Exception e) { + LOGGER.error("Caught exception while refreshing logical table config for ZNRecord: {}", znRecord.getId(), + e); + } + } + } + } + + private synchronized void addTableConfig(String tableName, String logicalTableName) { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableName); + Preconditions.checkArgument(tableConfig != null, "Failed to find table config for table: %s", tableName); + _tableNameToLogicalTableNamesMap.computeIfAbsent(tableName, k -> new ArrayList<>()) + .add(logicalTableName); + _tableConfigMap.put(tableName, tableConfig); + String path = ZkPaths.TABLE_CONFIG_PATH_PREFIX + tableName; + _propertyStore.subscribeDataChanges(path, _zkTableConfigChangeListener); + LOGGER.info("Added the table config: {} in cache for logical table: {}", tableName, 
logicalTableName); + } + + private synchronized void addSchema(String logicalTableName) { + Schema schema = ZKMetadataProvider.getSchema(_propertyStore, logicalTableName); + Preconditions.checkArgument(schema != null, + "Failed to find schema for logical table: %s", logicalTableName); + _schemaMap.put(schema.getSchemaName(), schema); + String schemaPath = ZkPaths.SCHEMA_PATH_PREFIX + schema.getSchemaName(); + _propertyStore.subscribeDataChanges(schemaPath, _zkSchemaChangeListener); + LOGGER.info("Added the schema: {} in cache for logical table: {}", schema.getSchemaName(), logicalTableName); + } + + private synchronized void updateLogicalTableConfig(ZNRecord znRecord) { + try { + LogicalTableConfig logicalTableConfig = LogicalTableConfigUtils.fromZNRecord(znRecord); + String logicalTableName = logicalTableConfig.getTableName(); + LogicalTableConfig oldLogicalTableConfig = _logicalTableConfigMap.put(logicalTableName, logicalTableConfig); + Preconditions.checkArgument(oldLogicalTableConfig != null, + "Logical table config for logical table: %s should have been created before", logicalTableName); + + // Remove the old table configs from the table config map + if (oldLogicalTableConfig.getRefOfflineTableName() != null + && !oldLogicalTableConfig.getRefOfflineTableName().equals(logicalTableConfig.getRefOfflineTableName())) { + removeTableConfig(oldLogicalTableConfig.getRefOfflineTableName(), logicalTableName); + } + if (oldLogicalTableConfig.getRefRealtimeTableName() != null + && !oldLogicalTableConfig.getRefRealtimeTableName().equals(logicalTableConfig.getRefRealtimeTableName())) { + removeTableConfig(oldLogicalTableConfig.getRefRealtimeTableName(), logicalTableName); + } + + // Add the new table configs to the table config map + if (logicalTableConfig.getRefOfflineTableName() != null + && !logicalTableConfig.getRefOfflineTableName().equals(oldLogicalTableConfig.getRefOfflineTableName())) { + addTableConfig(logicalTableConfig.getRefOfflineTableName(), 
logicalTableName); + } + if (logicalTableConfig.getRefRealtimeTableName() != null + && !logicalTableConfig.getRefRealtimeTableName().equals(oldLogicalTableConfig.getRefRealtimeTableName())) { + addTableConfig(logicalTableConfig.getRefRealtimeTableName(), logicalTableName); + } + LOGGER.info("Updated the logical table config: {} in cache", logicalTableName); + } catch (Exception e) { + LOGGER.error("Caught exception while refreshing logical table for ZNRecord: {}", znRecord.getId(), e); + } + } + + private synchronized void removeLogicalTableConfig(String logicalTableName) { + LogicalTableConfig logicalTableConfig = _logicalTableConfigMap.remove(logicalTableName); + if (logicalTableConfig != null) { + // Remove the table configs from the table config map + String offlineTableName = logicalTableConfig.getRefOfflineTableName(); + String realtimeTableName = logicalTableConfig.getRefRealtimeTableName(); + if (offlineTableName != null) { + removeTableConfig(offlineTableName, logicalTableName); + } + if (realtimeTableName != null) { + removeTableConfig(realtimeTableName, logicalTableName); + } + // remove schema + removeSchema(logicalTableConfig); + // Unsubscribe from the logical table config path + String logicalTableConfigPath = ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName; + _propertyStore.unsubscribeDataChanges(logicalTableConfigPath, _zkLogicalTableConfigChangeListener); + LOGGER.info("Removed the logical table config: {} from cache", logicalTableName); + } + } + + private synchronized void removeTableConfig(String tableName, String logicalTableName) { + _tableNameToLogicalTableNamesMap.computeIfPresent(tableName, (k, v) -> { + v.remove(logicalTableName); + if (v.isEmpty()) { + _tableConfigMap.remove(tableName); + String path = ZkPaths.TABLE_CONFIG_PATH_PREFIX + tableName; + _propertyStore.unsubscribeDataChanges(path, _zkTableConfigChangeListener); + LOGGER.info("Removed the table config: {} from cache", tableName); + return null; + } + return v; + }); + } + 
+ private synchronized void removeSchema(LogicalTableConfig logicalTableConfig) { + String schemaName = logicalTableConfig.getTableName(); + _schemaMap.remove(schemaName); + String schemaPath = ZkPaths.SCHEMA_PATH_PREFIX + schemaName; + _propertyStore.unsubscribeDataChanges(schemaPath, _zkSchemaChangeListener); + LOGGER.info("Removed the schema: {} from cache", schemaName); + } + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java index e25948fd61..b2a5ae02e1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java @@ -52,6 +52,7 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn; +import org.apache.pinot.spi.utils.CommonConstants.ZkPaths; import org.apache.pinot.spi.utils.TimestampIndexUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.CalciteSqlParser; @@ -70,8 +71,6 @@ public class TableCache implements PinotConfigProvider { private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/"; private static final String SCHEMA_PARENT_PATH = "/SCHEMAS"; private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/"; - private static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL/TABLE"; - private static final String LOGICAL_TABLE_PATH_PREFIX = "/LOGICAL/TABLE/"; private static final String OFFLINE_TABLE_SUFFIX = "_OFFLINE"; private static final String REALTIME_TABLE_SUFFIX = "_REALTIME"; private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = OFFLINE_TABLE_SUFFIX.toLowerCase(); @@ -138,12 +137,12 @@ public class TableCache implements PinotConfigProvider { synchronized 
(_zkLogicalTableConfigChangeListener) { // Subscribe child changes before reading the data to avoid missing changes - _propertyStore.subscribeChildChanges(LOGICAL_TABLE_PARENT_PATH, _zkLogicalTableConfigChangeListener); + _propertyStore.subscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH, _zkLogicalTableConfigChangeListener); - List<String> tables = _propertyStore.getChildNames(LOGICAL_TABLE_PARENT_PATH, AccessOption.PERSISTENT); + List<String> tables = _propertyStore.getChildNames(ZkPaths.LOGICAL_TABLE_PARENT_PATH, AccessOption.PERSISTENT); if (CollectionUtils.isNotEmpty(tables)) { List<String> pathsToAdd = tables.stream() - .map(rawTableName -> LOGICAL_TABLE_PATH_PREFIX + rawTableName) + .map(rawTableName -> ZkPaths.LOGICAL_TABLE_PATH_PREFIX + rawTableName) .collect(Collectors.toList()); addLogicalTableConfigs(pathsToAdd); } @@ -391,7 +390,7 @@ public class TableCache implements PinotConfigProvider { private void removeLogicalTableConfig(String path) { _propertyStore.unsubscribeDataChanges(path, _zkLogicalTableConfigChangeListener); - String logicalTableName = path.substring(LOGICAL_TABLE_PATH_PREFIX.length()); + String logicalTableName = path.substring(ZkPaths.LOGICAL_TABLE_PATH_PREFIX.length()); _logicalTableConfigInfoMap.remove(logicalTableName); logicalTableName = _ignoreCase ? 
logicalTableName.toLowerCase() : logicalTableName; _logicalTableNameMap.remove(logicalTableName); @@ -616,7 +615,7 @@ public class TableCache implements PinotConfigProvider { List<String> pathsToAdd = new ArrayList<>(); for (String logicalTableName : logicalTableNames) { if (!_logicalTableConfigInfoMap.containsKey(logicalTableName)) { - pathsToAdd.add(LOGICAL_TABLE_PATH_PREFIX + logicalTableName); + pathsToAdd.add(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName); } } if (!pathsToAdd.isEmpty()) { @@ -642,7 +641,7 @@ public class TableCache implements PinotConfigProvider { public synchronized void handleDataDeleted(String path) { // NOTE: The path here is the absolute ZK path instead of the relative path to the property store. String logicalTableName = path.substring(path.lastIndexOf('/') + 1); - removeLogicalTableConfig(LOGICAL_TABLE_PATH_PREFIX + logicalTableName); + removeLogicalTableConfig(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName); notifyLogicalTableConfigChangeListeners(); } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index eff90fcf49..dfb2bbe401 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -52,6 +52,7 @@ import org.apache.pinot.spi.config.user.UserConfig; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.ZkPaths; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.StringUtil; import org.apache.pinot.spi.utils.TableConfigDecoratorRegistry; @@ -73,7 +74,6 @@ public class ZKMetadataProvider { private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS"; private static final String 
PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX = "/PAUSELESS_DEBUG_METADATA"; private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS"; - private static final String PROPERTYSTORE_LOGICAL_PREFIX = "/LOGICAL/TABLE"; private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX = "/INSTANCE_PARTITIONS"; private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX = "/CONFIGS/DATABASE"; private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE"; @@ -312,7 +312,7 @@ public class ZKMetadataProvider { } public static String constructPropertyStorePathForLogical(String tableName) { - return StringUtil.join("/", PROPERTYSTORE_LOGICAL_PREFIX, tableName); + return StringUtil.join("/", ZkPaths.LOGICAL_TABLE_PARENT_PATH, tableName); } public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource, @@ -850,7 +850,7 @@ public class ZKMetadataProvider { public static List<LogicalTableConfig> getAllLogicalTableConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) { List<ZNRecord> znRecords = - propertyStore.getChildren(PROPERTYSTORE_LOGICAL_PREFIX, null, AccessOption.PERSISTENT, 0, 0); + propertyStore.getChildren(ZkPaths.LOGICAL_TABLE_PARENT_PATH, null, AccessOption.PERSISTENT, 0, 0); if (znRecords != null) { return znRecords.stream().map(znRecord -> { try { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java index fc25ef4dd3..542fc9a248 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java @@ -21,6 +21,7 @@ package org.apache.pinot.common.utils.helix; import org.apache.helix.AccessOption; import 
org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.spi.utils.CommonConstants.ZkPaths; public class PinotHelixPropertyStoreZnRecordProvider { @@ -52,7 +53,7 @@ public class PinotHelixPropertyStoreZnRecordProvider { } public static PinotHelixPropertyStoreZnRecordProvider forLogicalTable(ZkHelixPropertyStore<ZNRecord> propertyStore) { - return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, "/LOGICAL/TABLE"); + return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, ZkPaths.LOGICAL_TABLE_PARENT_PATH); } public ZNRecord get(String name) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java index 7969d2aa86..8acf41df87 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java @@ -198,6 +198,17 @@ public class ControllerRequestClient { } } + public void deleteLogicalTable(String logicalTableName) + throws IOException { + try { + HttpClient.wrapAndThrowHttpException( + _httpClient.sendDeleteRequest(new URI(_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName)), + _headers)); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + public TableConfig getTableConfig(String tableName, TableType tableType) throws IOException { try { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index d7bcf53bac..21894be117 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1586,6 +1586,12 @@ public class PinotHelixResourceManager { } updateSchema(schema, oldSchema, forceTableSchemaUpdate); + if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, schemaName)) { + // For logical table schemas, we do not need to reload segments or send schema refresh messages + LOGGER.info("Logical table schema: {} updated, no need to reload segments or send schema refresh messages", + schemaName); + return; + } try { List<String> tableNamesWithType = getExistingTableNamesWithType(schemaName, null); if (reload) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 01092a3137..55e0bb1772 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -800,6 +800,11 @@ public class ControllerTest { getControllerRequestClient().deleteTable(TableNameBuilder.REALTIME.tableNameWithType(tableName)); } + public void dropLogicalTable(String logicalTableName) + throws IOException { + getControllerRequestClient().deleteLogicalTable(logicalTableName); + } + public void waitForEVToAppear(String tableNameWithType) { TestUtils.waitForCondition(aVoid -> _helixResourceManager.getTableExternalView(tableNameWithType) != null, 60_000L, "Failed to create the external view for table: " + tableNameWithType); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java new file mode 100644 index 0000000000..db459e9aff --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java @@ -0,0 +1,260 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.config.provider.LogicalTableMetadataCache; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.PhysicalTableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + + +public class 
LogicalTableMetadataCacheTest { + + private static final ControllerTest INSTANCE = ControllerTest.getInstance(); + private static final LogicalTableMetadataCache CACHE = new LogicalTableMetadataCache(); + + private final String _testTable = "testTable"; + private final String _extraTableName = "testExtraTable"; + private final Schema _testTableSchema = ControllerTest.createDummySchema(_testTable); + private final Schema _extraTableSchema = ControllerTest.createDummySchema(_extraTableName); + private final TableConfig _offlineTableConfig = ControllerTest.createDummyTableConfig(_testTable, TableType.OFFLINE); + private final TableConfig _realtimeTableConfig = + ControllerTest.createDummyTableConfig(_testTable, TableType.REALTIME); + private final TableConfig _extraOfflineTableConfig = + ControllerTest.createDummyTableConfig(_extraTableName, TableType.OFFLINE); + private final TableConfig _extraRealtimeTableConfig = + ControllerTest.createDummyTableConfig(_extraTableName, TableType.REALTIME); + + @BeforeClass + public void setUp() + throws Exception { + INSTANCE.setupSharedStateAndValidate(); + } + + @AfterClass + public void tearDown() + throws Exception { + INSTANCE.stopSharedTestSetup(); + } + + @BeforeMethod + public void beforeMethod() + throws IOException { + ZkHelixPropertyStore<ZNRecord> propertyStore = INSTANCE.getHelixResourceManager().getPropertyStore(); + CACHE.init(propertyStore); + + // Setup schema and table configs in the property store + INSTANCE.addSchema(_testTableSchema); + INSTANCE.addSchema(_extraTableSchema); + INSTANCE.addTableConfig(_offlineTableConfig); + INSTANCE.addTableConfig(_realtimeTableConfig); + INSTANCE.addTableConfig(_extraOfflineTableConfig); + INSTANCE.addTableConfig(_extraRealtimeTableConfig); + + // Ensure the schema and table configs are not loaded into the cache yet + assertNull(CACHE.getSchema(_testTable)); + assertNull(CACHE.getSchema(_extraTableName)); + assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName())); 
+ assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName())); + assertNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName())); + assertNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName())); + } + + @AfterMethod + public void afterMethod() { + CACHE.shutdown(); + INSTANCE.cleanup(); + } + + @Test + public void testLogicalTableCacheWithUpdates() + throws IOException { + String logicalTableName = "testLogicalTable1"; + LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache( + logicalTableName, List.of(_offlineTableConfig.getTableName(), _realtimeTableConfig.getTableName())); + Schema logicalTableSchema = ControllerTest.createDummySchema(logicalTableName); + + // Update logical table config and verify the cache is updated + Map<String, PhysicalTableConfig> physicalTableConfigMap = logicalTableConfig.getPhysicalTableConfigMap(); + physicalTableConfigMap.put(_extraOfflineTableConfig.getTableName(), new PhysicalTableConfig()); + physicalTableConfigMap.put(_extraRealtimeTableConfig.getTableName(), new PhysicalTableConfig()); + assertNotEquals(CACHE.getLogicalTableConfig(logicalTableName), logicalTableConfig); + INSTANCE.updateLogicalTableConfig(logicalTableConfig); + TestUtils.waitForCondition( + aVoid -> CACHE.getLogicalTableConfig(logicalTableName).equals(logicalTableConfig), + 10_000L, "Logical table config not updated in cache"); + assertNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName())); + assertNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName())); + + // Update logical table schema and verify the cache is updated + logicalTableSchema.addField(new MetricFieldSpec("newMetric", FieldSpec.DataType.INT)); + assertNotEquals(CACHE.getSchema(logicalTableName), logicalTableSchema); + INSTANCE.updateSchema(logicalTableSchema); + TestUtils.waitForCondition( + aVoid -> CACHE.getSchema(logicalTableName).equals(logicalTableSchema), + 10_000L, "Logical table schema not updated in cache"); + + // 
Update offline table configs and verify the cache is updated (update retention) + _offlineTableConfig.getValidationConfig().setRetentionTimeValue("10"); + assertNotEquals( + Objects.requireNonNull(CACHE.getTableConfig(_offlineTableConfig.getTableName())) + .getValidationConfig() + .getRetentionTimeValue(), + _offlineTableConfig.getValidationConfig().getRetentionTimeValue()); + INSTANCE.updateTableConfig(_offlineTableConfig); + TestUtils.waitForCondition( + aVoid -> Objects.requireNonNull(CACHE.getTableConfig(_offlineTableConfig.getTableName())) + .getValidationConfig() + .getRetentionTimeValue() + .equals(_offlineTableConfig.getValidationConfig().getRetentionTimeValue()), + 10_000L, "Offline table config not updated in cache"); + + // Update realtime table configs and verify the cache is updated (update retention) + _realtimeTableConfig.getValidationConfig().setRetentionTimeValue("20"); + assertNotEquals( + Objects.requireNonNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName())) + .getValidationConfig() + .getRetentionTimeValue(), + _realtimeTableConfig.getValidationConfig().getRetentionTimeValue()); + INSTANCE.updateTableConfig(_realtimeTableConfig); + TestUtils.waitForCondition( + aVoid -> Objects.requireNonNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName())) + .getValidationConfig().getRetentionTimeValue() + .equals(_realtimeTableConfig.getValidationConfig().getRetentionTimeValue()), + 10_000L, "Realtime table config not updated in cache"); + + // Delete logical table config and verify the cache is removed + INSTANCE.dropLogicalTable(logicalTableName); + TestUtils.waitForCondition( + aVoid -> CACHE.getSchema(logicalTableName) == null, + 10_000L, "Logical table schema not removed from cache"); + assertNull(CACHE.getLogicalTableConfig(logicalTableName)); + assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName())); + assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName())); + } + + @Test + public void 
testLogicalTableUpdateRefTables() + throws IOException { + String logicalTableName = "testLogicalTable2"; + LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache( + logicalTableName, List.of(_offlineTableConfig.getTableName(), _realtimeTableConfig.getTableName())); + + // Update logical table config ref tables with extra tables and verify the cache is updated + logicalTableConfig.setRefOfflineTableName(_extraOfflineTableConfig.getTableName()); + logicalTableConfig.setRefRealtimeTableName(_extraRealtimeTableConfig.getTableName()); + logicalTableConfig.setPhysicalTableConfigMap( + Map.of(_extraOfflineTableConfig.getTableName(), new PhysicalTableConfig(), + _extraRealtimeTableConfig.getTableName(), new PhysicalTableConfig()) + ); + assertNotEquals(CACHE.getLogicalTableConfig(logicalTableName), logicalTableConfig); + + INSTANCE.updateLogicalTableConfig(logicalTableConfig); + + TestUtils.waitForCondition( + aVoid -> CACHE.getLogicalTableConfig(logicalTableName).equals(logicalTableConfig), + 10_000L, "Logical table config not updated in cache"); + assertNotNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName())); + assertNotNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName())); + assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName())); + assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName())); + assertNotNull(CACHE.getSchema(logicalTableName)); + } + + @Test + public void testCacheWithMultipleLogicalTables() + throws IOException { + String logicalTableName = "testLogicalTable3"; + LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache( + logicalTableName, List.of(_offlineTableConfig.getTableName(), _realtimeTableConfig.getTableName())); + + String otherLogicalTableName = "otherLogicalTable"; + LogicalTableConfig otherLogicalTableConfig = addLogicalTableAndValidateCache( + otherLogicalTableName, List.of(_offlineTableConfig.getTableName(), _realtimeTableConfig.getTableName())); + + // 
Delete one logical table config and verify the other is still present + INSTANCE.dropLogicalTable(logicalTableName); + TestUtils.waitForCondition( + aVoid -> CACHE.getSchema(logicalTableName) == null, + 10_000L, "Logical table schema not removed from cache"); + assertNull(CACHE.getLogicalTableConfig(logicalTableName)); + assertNotNull(CACHE.getLogicalTableConfig(otherLogicalTableName)); + assertNotNull(CACHE.getTableConfig(_offlineTableConfig.getTableName())); + assertNotNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName())); + + // Delete the other logical table config and verify the cache is empty + INSTANCE.dropLogicalTable(otherLogicalTableName); + TestUtils.waitForCondition( + aVoid -> CACHE.getSchema(otherLogicalTableName) == null, + 10_000L, "Logical table schema not removed from cache"); + assertNull(CACHE.getLogicalTableConfig(otherLogicalTableName)); + assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName())); + assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName())); + } + + private LogicalTableConfig addLogicalTableAndValidateCache(String logicalTableName, List<String> physicalTableNames) + throws IOException { + // Add logical table config + Schema logicalTableSchema = ControllerTest.createDummySchema(logicalTableName); + LogicalTableConfig logicalTableConfig = ControllerTest.getDummyLogicalTableConfig(logicalTableName, + physicalTableNames, "DefaultTenant"); + INSTANCE.addSchema(logicalTableSchema); + INSTANCE.addLogicalTableConfig(logicalTableConfig); + + // wait for the cache to be updated + TestUtils.waitForCondition( + aVoid -> CACHE.getLogicalTableConfig(logicalTableName) != null, + 10_000L, "Logical table config not loaded into cache"); + + // Verify that the logical table config is loaded into the cache + assertNotNull(CACHE.getSchema(logicalTableName)); + assertEquals(CACHE.getSchema(logicalTableName), logicalTableSchema); + assertNotNull(CACHE.getTableConfig(_offlineTableConfig.getTableName())); + 
assertNotNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName())); + assertNotNull(CACHE.getLogicalTableConfig(logicalTableName)); + assertEquals(CACHE.getLogicalTableConfig(logicalTableName), logicalTableConfig); + + // verify extra schema and table configs are not loaded + assertNull(CACHE.getSchema(_extraTableName)); + assertNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName())); + assertNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName())); + return logicalTableConfig; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java index 0dcbb3a9f3..414b664241 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java @@ -619,7 +619,9 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra JsonNode response = postQueryToController(query); assertNoError(response); - query = "SELECT count(*) FROM " + getOfflineTableNames().get(0); + String tableName = + getOfflineTableNames().isEmpty() ? 
getRealtimeTableNames().get(0) : getOfflineTableNames().get(0); + query = "SELECT count(*) FROM " + tableName; response = postQueryToController(query); assertNoError(response); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index b0eb2e12d0..807a9a9fd6 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -253,10 +253,10 @@ public class ServerPlanRequestUtils { instanceRequest.setBrokerId("unknown"); instanceRequest.setEnableTrace(executionContext.isTraceEnabled()); /* - * If segmentList is not null, it means that the query is for a single table and we can directly set the segments. - * If segmentList is null, it means that the query is for a logical table and we need to set TableSegmentInfoList - * - * Either one of segmentList or tableRouteInfoList has to be set, but not both. + * If segmentList is not null, it means that the query is for a single table and we can directly set the segments. + * If segmentList is null, it means that the query is for a logical table and we need to set TableSegmentInfoList + * + * Either one of segmentList or tableRouteInfoList has to be set, but not both. 
*/ if (segmentList != null) { instanceRequest.setSearchSegments(segmentList); @@ -422,7 +422,8 @@ public class ServerPlanRequestUtils { String logicalTableName = stageMetadata.getTableName(); LogicalTableContext logicalTableContext = instanceDataManager.getLogicalTableContext(logicalTableName); Preconditions.checkNotNull(logicalTableContext, - "LogicalTableManager not found for logical table name: " + logicalTableName); + String.format("LogicalTableContext not found for logical table name: %s, query context id: %s", + logicalTableName, QueryThreadContext.getCid())); Map<String, List<String>> logicalTableSegmentsMap = executionContext.getWorkerMetadata().getLogicalTableSegmentsMap(); @@ -430,7 +431,7 @@ public class ServerPlanRequestUtils { List<TableSegmentsInfo> realtimeTableRouteInfoList = new ArrayList<>(); Preconditions.checkNotNull(logicalTableSegmentsMap); - for (Map.Entry<String, List<String>> entry: logicalTableSegmentsMap.entrySet()) { + for (Map.Entry<String, List<String>> entry : logicalTableSegmentsMap.entrySet()) { String physicalTableName = entry.getKey(); TableType tableType = TableNameBuilder.getTableTypeFromTableName(physicalTableName); TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 07da49bb5f..924e0ff333 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -44,6 +44,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.config.provider.LogicalTableMetadataCache; import 
org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; @@ -85,6 +86,10 @@ public class HelixInstanceDataManager implements InstanceDataManager { private static final Logger LOGGER = LoggerFactory.getLogger(HelixInstanceDataManager.class); private final Map<String, TableDataManager> _tableDataManagerMap = new ConcurrentHashMap<>(); + + // Logical table metadata cache to cache logical table configs, schemas, and offline/realtime table configs. + private final LogicalTableMetadataCache _logicalTableMetadataCache = new LogicalTableMetadataCache(); + // TODO: Consider making segment locks per table instead of per instance private final SegmentLocks _segmentLocks = new SegmentLocks(); @@ -228,6 +233,9 @@ public class HelixInstanceDataManager implements InstanceDataManager { @Override public synchronized void start() { _propertyStore = _helixManager.getHelixPropertyStore(); + // Initialize logical table metadata cache + _logicalTableMetadataCache.init(_propertyStore); + LOGGER.info("Helix instance data manager started"); } @@ -255,6 +263,8 @@ public class HelixInstanceDataManager implements InstanceDataManager { } } SegmentBuildTimeLeaseExtender.shutdownExecutor(); + // shutdown logical table metadata cache + _logicalTableMetadataCache.shutdown(); LOGGER.info("Helix instance data manager shut down"); } @@ -533,17 +543,15 @@ public class HelixInstanceDataManager implements InstanceDataManager { } } - // TODO: LogicalTableContext has to be cached. 
https://github.com/apache/pinot/issues/15859 @Nullable @Override public LogicalTableContext getLogicalTableContext(String logicalTableName) { - Schema schema = ZKMetadataProvider.getSchema(getPropertyStore(), logicalTableName); + Schema schema = _logicalTableMetadataCache.getSchema(logicalTableName); if (schema == null) { LOGGER.warn("Failed to find schema for logical table: {}, skipping", logicalTableName); return null; } - LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(getPropertyStore(), - logicalTableName); + LogicalTableConfig logicalTableConfig = _logicalTableMetadataCache.getLogicalTableConfig(logicalTableName); if (logicalTableConfig == null) { LOGGER.warn("Failed to find logical table config for logical table: {}, skipping", logicalTableName); return null; @@ -551,8 +559,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { TableConfig offlineTableConfig = null; if (logicalTableConfig.getRefOfflineTableName() != null) { - offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(), - logicalTableConfig.getRefOfflineTableName()); + offlineTableConfig = _logicalTableMetadataCache.getTableConfig(logicalTableConfig.getRefOfflineTableName()); if (offlineTableConfig == null) { LOGGER.warn("Failed to find offline table config for logical table: {}, skipping", logicalTableName); return null; @@ -561,8 +568,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { TableConfig realtimeTableConfig = null; if (logicalTableConfig.getRefRealtimeTableName() != null) { - realtimeTableConfig = ZKMetadataProvider.getRealtimeTableConfig(getPropertyStore(), - logicalTableConfig.getRefRealtimeTableName()); + realtimeTableConfig = _logicalTableMetadataCache.getTableConfig(logicalTableConfig.getRefRealtimeTableName()); if (realtimeTableConfig == null) { LOGGER.warn("Failed to find realtime table config for logical table: {}, skipping", logicalTableName); return null; diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java index 2c52148098..973502c543 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.spi.data; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -31,6 +32,26 @@ import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.utils.JsonUtils; +/** + * Represents the configuration for a logical table in Pinot. + * + * <p> + * <ul> + * <li><b>tableName</b>: The name of the logical table.</li> + * <li><b>physicalTableConfigMap</b>: A map of physical table names to their configurations.</li> + * <li><b>brokerTenant</b>: The tenant for the broker.</li> + * <li><b>queryConfig</b>: Configuration for query execution on the logical table.</li> + * <li><b>quotaConfig</b>: Configuration for quota management on the logical table.</li> + * <li><b>refOfflineTableName</b>: The name of the offline table whose table config is referenced by this logical + * table.</li> + * <li><b>refRealtimeTableName</b>: The name of the realtime table whose table config is referenced by this logical + * table.</li> + * <li><b>timeBoundaryConfig</b>: Configuration for time boundaries of the logical table. 
This is used to determine + * the time boundaries for queries on the logical table, especially in hybrid scenarios where both offline and + * realtime data are present.</li> + * </ul> + * </p> + */ public class LogicalTableConfig extends BaseJsonConfig { private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper(); @@ -105,6 +126,7 @@ public class LogicalTableConfig extends BaseJsonConfig { _quotaConfig = quotaConfig; } + @Nullable public String getRefOfflineTableName() { return _refOfflineTableName; } @@ -113,6 +135,7 @@ public class LogicalTableConfig extends BaseJsonConfig { _refOfflineTableName = refOfflineTableName; } + @Nullable public String getRefRealtimeTableName() { return _refRealtimeTableName; } @@ -121,6 +144,7 @@ public class LogicalTableConfig extends BaseJsonConfig { _refRealtimeTableName = refRealtimeTableName; } + @Nullable public TimeBoundaryConfig getTimeBoundaryConfig() { return _timeBoundaryConfig; } @@ -151,6 +175,7 @@ public class LogicalTableConfig extends BaseJsonConfig { } } + @JsonIgnore public boolean isHybridLogicalTable() { return _refOfflineTableName != null && _refRealtimeTableName != null; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 7152d83f2b..86b8200dfc 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1719,4 +1719,14 @@ public class CommonConstants { public static final String GROOVY_QUERY_STATIC_ANALYZER_CONFIG = "pinot.groovy.query.static.analyzer"; public static final String GROOVY_INGESTION_STATIC_ANALYZER_CONFIG = "pinot.groovy.ingestion.static.analyzer"; } + + /** + * ZK paths used by Pinot. 
+ */ + public static class ZkPaths { + public static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL/TABLE"; + public static final String LOGICAL_TABLE_PATH_PREFIX = "/LOGICAL/TABLE/"; + public static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/"; + public static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/"; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org