This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d6f3ec3758f [refactor] Refactor TableCache to interface with
ZkTableCache implementation (#16646)
d6f3ec3758f is described below
commit d6f3ec3758f4ca5b83d5e0cf028ba8f4b03ad514
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Tue Aug 26 06:32:49 2025 -0700
[refactor] Refactor TableCache to interface with ZkTableCache
implementation (#16646)
Co-authored-by: Shaurya Chaturvedi <[email protected]>
---
.../broker/broker/helix/BaseBrokerStarter.java | 3 +-
.../pinot/common/config/provider/TableCache.java | 616 +--------------------
.../{TableCache.java => ZkTableCache.java} | 30 +-
.../helix/core/PinotHelixResourceManager.java | 3 +-
.../pinot/controller/helix/TableCacheTest.java | 3 +-
5 files changed, 41 insertions(+), 614 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 3d8a42563ac..f361632025a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -59,6 +59,7 @@ import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.config.provider.ZkTableCache;
import org.apache.pinot.common.cursors.AbstractResponseStore;
import org.apache.pinot.common.failuredetector.FailureDetector;
import org.apache.pinot.common.failuredetector.FailureDetectorFactory;
@@ -334,7 +335,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
FunctionRegistry.init();
boolean caseInsensitive =
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY,
Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
- TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);
+ TableCache tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
LOGGER.info("Initializing Broker Event Listener Factory");
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 291e60a387d..6d789e685c6 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -18,158 +18,37 @@
*/
package org.apache.pinot.common.config.provider;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.helix.AccessOption;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.zkclient.IZkChildListener;
-import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.pinot.common.request.Expression;
-import org.apache.pinot.common.utils.LogicalTableConfigUtils;
-import org.apache.pinot.common.utils.config.SchemaSerDeUtils;
-import org.apache.pinot.common.utils.config.TableConfigSerDeUtils;
import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
import org.apache.pinot.spi.config.provider.PinotConfigProvider;
import org.apache.pinot.spi.config.provider.SchemaChangeListener;
import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
-import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
-import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
-import org.apache.pinot.spi.utils.TimestampIndexUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * An implementation of {@link PinotConfigProvider}
+ * Interface for caching table configs and schemas within the cluster.
* The {@code TableCache} caches all the table configs and schemas within the
cluster, and listens on ZK changes to keep
* them in sync. It also maintains the table name map and the column name map
for case-insensitive queries.
*/
-public class TableCache implements PinotConfigProvider {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TableCache.class);
- private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
- private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
- private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
- private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
- private static final String OFFLINE_TABLE_SUFFIX = "_OFFLINE";
- private static final String REALTIME_TABLE_SUFFIX = "_REALTIME";
- private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX =
OFFLINE_TABLE_SUFFIX.toLowerCase();
- private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX =
REALTIME_TABLE_SUFFIX.toLowerCase();
-
- // NOTE: No need to use concurrent set because it is always accessed within
the ZK change listener lock
- private final Set<TableConfigChangeListener> _tableConfigChangeListeners =
new HashSet<>();
- private final Set<SchemaChangeListener> _schemaChangeListeners = new
HashSet<>();
- private final Set<LogicalTableConfigChangeListener>
_logicalTableConfigChangeListeners = new HashSet<>();
-
- private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private final boolean _ignoreCase;
-
- private final ZkTableConfigChangeListener _zkTableConfigChangeListener = new
ZkTableConfigChangeListener();
- // Key is table name with type suffix, value is table config info
- private final Map<String, TableConfigInfo> _tableConfigInfoMap = new
ConcurrentHashMap<>();
- // Key is lower case table name (with or without type suffix), value is
actual table name
- // For case-insensitive mode only
- private final Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
-
- private final ZkSchemaChangeListener _zkSchemaChangeListener = new
ZkSchemaChangeListener();
- // Key is schema name, value is schema info
- private final Map<String, SchemaInfo> _schemaInfoMap = new
ConcurrentHashMap<>();
-
- private final ZkLogicalTableConfigChangeListener
- _zkLogicalTableConfigChangeListener = new
ZkLogicalTableConfigChangeListener();
- // Key is table name, value is logical table info
- private final Map<String, LogicalTableConfigInfo> _logicalTableConfigInfoMap
= new ConcurrentHashMap<>();
- // Key is lower case logical table name, value is actual logical table name
- // For case-insensitive mode only
- private final Map<String, String> _logicalTableNameMap = new
ConcurrentHashMap<>();
-
- public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean
ignoreCase) {
- _propertyStore = propertyStore;
- _ignoreCase = ignoreCase;
-
- synchronized (_zkTableConfigChangeListener) {
- // Subscribe child changes before reading the data to avoid missing
changes
- _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH,
_zkTableConfigChangeListener);
-
- List<String> tables =
_propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
- if (CollectionUtils.isNotEmpty(tables)) {
- List<String> pathsToAdd = new ArrayList<>(tables.size());
- for (String tableNameWithType : tables) {
- pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
- }
- addTableConfigs(pathsToAdd);
- }
- }
-
- synchronized (_zkSchemaChangeListener) {
- // Subscribe child changes before reading the data to avoid missing
changes
- _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH,
_zkSchemaChangeListener);
-
- List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH,
AccessOption.PERSISTENT);
- if (CollectionUtils.isNotEmpty(tables)) {
- List<String> pathsToAdd = new ArrayList<>(tables.size());
- for (String rawTableName : tables) {
- pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
- }
- addSchemas(pathsToAdd);
- }
- }
-
- synchronized (_zkLogicalTableConfigChangeListener) {
- // Subscribe child changes before reading the data to avoid missing
changes
- _propertyStore.subscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH,
_zkLogicalTableConfigChangeListener);
-
- List<String> tables =
_propertyStore.getChildNames(ZkPaths.LOGICAL_TABLE_PARENT_PATH,
AccessOption.PERSISTENT);
- if (CollectionUtils.isNotEmpty(tables)) {
- List<String> pathsToAdd = tables.stream()
- .map(rawTableName -> ZkPaths.LOGICAL_TABLE_PATH_PREFIX +
rawTableName)
- .collect(Collectors.toList());
- addLogicalTableConfigs(pathsToAdd);
- }
- }
-
- LOGGER.info("Initialized TableCache with IgnoreCase: {}", ignoreCase);
- }
-
+public interface TableCache extends PinotConfigProvider {
/**
* Returns {@code true} if the TableCache is case-insensitive, {@code false}
otherwise.
*/
- public boolean isIgnoreCase() {
- return _ignoreCase;
- }
+ boolean isIgnoreCase();
/**
* Returns the actual table name for the given table name (with or without
type suffix), or {@code null} if the table
* does not exist.
*/
@Nullable
- public String getActualTableName(String tableName) {
- if (_ignoreCase) {
- return _tableNameMap.get(tableName.toLowerCase());
- } else {
- return _tableNameMap.get(tableName);
- }
- }
+ String getActualTableName(String tableName);
/**
* Returns the actual logical table name for the given table name, or {@code
null} if table does not exist.
@@ -177,531 +56,76 @@ public class TableCache implements PinotConfigProvider {
* @return Actual logical table name
*/
@Nullable
- public String getActualLogicalTableName(String logicalTableName) {
- return _ignoreCase
- ? _logicalTableNameMap.get(logicalTableName.toLowerCase())
- : _logicalTableNameMap.get(logicalTableName);
- }
+ String getActualLogicalTableName(String logicalTableName);
/**
* Returns a map from table name to actual table name. For case-insensitive
case, the keys of the map are in lower
* case.
*/
- public Map<String, String> getTableNameMap() {
- return _tableNameMap;
- }
+ Map<String, String> getTableNameMap();
/**
* Returns a map from logical table name to actual logical table name. For
case-insensitive case, the keys of the map
* are in lower case.
* @return Map from logical table name to actual logical table name
*/
- public Map<String, String> getLogicalTableNameMap() {
- return _logicalTableNameMap;
- }
+ Map<String, String> getLogicalTableNameMap();
/**
* Get all dimension table names.
* @return List of dimension table names
*/
- public List<String> getAllDimensionTables() {
- List<String> dimensionTables = new ArrayList<>();
- for (TableConfigInfo tableConfigInfo : _tableConfigInfoMap.values()) {
- if (tableConfigInfo._tableConfig.isDimTable()) {
- dimensionTables.add(tableConfigInfo._tableConfig.getTableName());
- }
- }
- return dimensionTables;
- }
+ List<String> getAllDimensionTables();
/**
* Returns a map from column name to actual column name for the given table,
or {@code null} if the table schema does
* not exist. For case-insensitive case, the keys of the map are in lower
case.
*/
@Nullable
- public Map<String, String> getColumnNameMap(String rawTableName) {
- SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
- return schemaInfo != null ? schemaInfo._columnNameMap : null;
- }
+ Map<String, String> getColumnNameMap(String rawTableName);
/**
* Returns the expression override map for the given logical or physical
table, or {@code null} if no override is
* configured.
*/
@Nullable
- public Map<Expression, Expression> getExpressionOverrideMap(String
physicalOrLogicalTableName) {
- TableConfigInfo tableConfigInfo =
_tableConfigInfoMap.get(physicalOrLogicalTableName);
- if (tableConfigInfo != null) {
- return tableConfigInfo._expressionOverrideMap;
- }
- LogicalTableConfigInfo logicalTableConfigInfo =
_logicalTableConfigInfoMap.get(physicalOrLogicalTableName);
- return logicalTableConfigInfo != null ?
logicalTableConfigInfo._expressionOverrideMap : null;
- }
+ Map<Expression, Expression> getExpressionOverrideMap(String
physicalOrLogicalTableName);
/**
* Returns the timestamp index columns for the given table, or {@code null}
if table does not exist.
*/
@Nullable
- public Set<String> getTimestampIndexColumns(String tableNameWithType) {
- TableConfigInfo tableConfigInfo =
_tableConfigInfoMap.get(tableNameWithType);
- return tableConfigInfo != null ? tableConfigInfo._timestampIndexColumns :
null;
- }
+ Set<String> getTimestampIndexColumns(String tableNameWithType);
/**
* Returns the table config for the given table, or {@code null} if it does
not exist.
*/
@Nullable
@Override
- public TableConfig getTableConfig(String tableNameWithType) {
- TableConfigInfo tableConfigInfo =
_tableConfigInfoMap.get(tableNameWithType);
- return tableConfigInfo != null ? tableConfigInfo._tableConfig : null;
- }
+ TableConfig getTableConfig(String tableNameWithType);
@Nullable
@Override
- public LogicalTableConfig getLogicalTableConfig(String logicalTableName) {
- LogicalTableConfigInfo logicalTableConfigInfo =
_logicalTableConfigInfoMap.get(logicalTableName);
- return logicalTableConfigInfo != null ?
logicalTableConfigInfo._logicalTableConfig : null;
- }
+ LogicalTableConfig getLogicalTableConfig(String logicalTableName);
@Override
- public boolean registerTableConfigChangeListener(TableConfigChangeListener
tableConfigChangeListener) {
- synchronized (_zkTableConfigChangeListener) {
- boolean added =
_tableConfigChangeListeners.add(tableConfigChangeListener);
- if (added) {
- tableConfigChangeListener.onChange(getTableConfigs());
- }
- return added;
- }
- }
+ boolean registerTableConfigChangeListener(TableConfigChangeListener
tableConfigChangeListener);
/**
* Returns the schema for the given logical or physical table, or {@code
null} if it does not exist.
*/
@Nullable
@Override
- public Schema getSchema(String rawTableName) {
- SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
- return schemaInfo != null ? schemaInfo._schema : null;
- }
+ Schema getSchema(String rawTableName);
@Override
- public boolean registerSchemaChangeListener(SchemaChangeListener
schemaChangeListener) {
- synchronized (_zkSchemaChangeListener) {
- boolean added = _schemaChangeListeners.add(schemaChangeListener);
- if (added) {
- schemaChangeListener.onChange(getSchemas());
- }
- return added;
- }
- }
-
- private void addTableConfigs(List<String> paths) {
- // Subscribe data changes before reading the data to avoid missing changes
- for (String path : paths) {
- _propertyStore.subscribeDataChanges(path, _zkTableConfigChangeListener);
- }
- List<ZNRecord> znRecords = _propertyStore.get(paths, null,
AccessOption.PERSISTENT);
- for (ZNRecord znRecord : znRecords) {
- if (znRecord != null) {
- try {
- putTableConfig(znRecord);
- } catch (Exception e) {
- LOGGER.error("Caught exception while adding table config for
ZNRecord: {}", znRecord.getId(), e);
- }
- }
- }
- }
-
- private void addLogicalTableConfigs(List<String> paths) {
- // Subscribe data changes before reading the data to avoid missing changes
- for (String path : paths) {
- _propertyStore.subscribeDataChanges(path,
_zkLogicalTableConfigChangeListener);
- }
- List<ZNRecord> znRecords = _propertyStore.get(paths, null,
AccessOption.PERSISTENT);
- for (ZNRecord znRecord : znRecords) {
- if (znRecord != null) {
- try {
- putLogicalTableConfig(znRecord);
- } catch (Exception e) {
- LOGGER.error("Caught exception while adding logical table for
ZNRecord: {}", znRecord.getId(), e);
- }
- }
- }
- }
-
- private void putTableConfig(ZNRecord znRecord)
- throws IOException {
- TableConfig tableConfig = TableConfigSerDeUtils.fromZNRecord(znRecord);
- String tableNameWithType = tableConfig.getTableName();
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- _tableConfigInfoMap.put(tableNameWithType, new
TableConfigInfo(tableConfig));
- if (_ignoreCase) {
- _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
- _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
- } else {
- _tableNameMap.put(tableNameWithType, tableNameWithType);
- _tableNameMap.put(rawTableName, rawTableName);
- }
- }
-
- private void putLogicalTableConfig(ZNRecord znRecord)
- throws IOException {
- LogicalTableConfig logicalTableConfig =
LogicalTableConfigUtils.fromZNRecord(znRecord);
- String logicalTableName = logicalTableConfig.getTableName();
- _logicalTableConfigInfoMap.put(logicalTableName, new
LogicalTableConfigInfo(logicalTableConfig));
- if (_ignoreCase) {
- _logicalTableNameMap.put(logicalTableName.toLowerCase(),
logicalTableName);
- } else {
- _logicalTableNameMap.put(logicalTableName, logicalTableName);
- }
- }
-
- private void removeTableConfig(String path) {
- _propertyStore.unsubscribeDataChanges(path, _zkTableConfigChangeListener);
- String tableNameWithType =
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- _tableConfigInfoMap.remove(tableNameWithType);
- if (_ignoreCase) {
- _tableNameMap.remove(tableNameWithType.toLowerCase());
- String lowerCaseRawTableName = rawTableName.toLowerCase();
- if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
- if (!_tableNameMap.containsKey(lowerCaseRawTableName +
LOWER_CASE_REALTIME_TABLE_SUFFIX)) {
- _tableNameMap.remove(lowerCaseRawTableName);
- }
- } else {
- if (!_tableNameMap.containsKey(lowerCaseRawTableName +
LOWER_CASE_OFFLINE_TABLE_SUFFIX)) {
- _tableNameMap.remove(lowerCaseRawTableName);
- }
- }
- } else {
- _tableNameMap.remove(tableNameWithType);
- if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
- if (!_tableNameMap.containsKey(rawTableName + REALTIME_TABLE_SUFFIX)) {
- _tableNameMap.remove(rawTableName);
- }
- } else {
- if (!_tableNameMap.containsKey(rawTableName + OFFLINE_TABLE_SUFFIX)) {
- _tableNameMap.remove(rawTableName);
- }
- }
- }
- }
-
- private void removeLogicalTableConfig(String path) {
- _propertyStore.unsubscribeDataChanges(path,
_zkLogicalTableConfigChangeListener);
- String logicalTableName =
path.substring(ZkPaths.LOGICAL_TABLE_PATH_PREFIX.length());
- _logicalTableConfigInfoMap.remove(logicalTableName);
- logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() :
logicalTableName;
- _logicalTableNameMap.remove(logicalTableName);
- }
-
- private void addSchemas(List<String> paths) {
- // Subscribe data changes before reading the data to avoid missing changes
- for (String path : paths) {
- _propertyStore.subscribeDataChanges(path, _zkSchemaChangeListener);
- }
- List<ZNRecord> znRecords = _propertyStore.get(paths, null,
AccessOption.PERSISTENT);
- for (ZNRecord znRecord : znRecords) {
- if (znRecord != null) {
- try {
- putSchema(znRecord);
- } catch (Exception e) {
- LOGGER.error("Caught exception while adding schema for ZNRecord:
{}", znRecord.getId(), e);
- }
- }
- }
- }
-
- private void putSchema(ZNRecord znRecord)
- throws IOException {
- Schema schema = SchemaSerDeUtils.fromZNRecord(znRecord);
- addBuiltInVirtualColumns(schema);
- String schemaName = schema.getSchemaName();
- Map<String, String> columnNameMap = new HashMap<>();
- if (_ignoreCase) {
- for (String columnName : schema.getColumnNames()) {
- columnNameMap.put(columnName.toLowerCase(), columnName);
- }
- } else {
- for (String columnName : schema.getColumnNames()) {
- columnNameMap.put(columnName, columnName);
- }
- }
- _schemaInfoMap.put(schemaName, new SchemaInfo(schema, columnNameMap));
- }
-
- /**
- * Adds the built-in virtual columns to the schema.
- * NOTE: The virtual column provider class is not added.
- */
- private static void addBuiltInVirtualColumns(Schema schema) {
- if (!schema.hasColumn(BuiltInVirtualColumn.DOCID)) {
- schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.DOCID,
FieldSpec.DataType.INT, true));
- }
- if (!schema.hasColumn(BuiltInVirtualColumn.HOSTNAME)) {
- schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.HOSTNAME,
FieldSpec.DataType.STRING, true));
- }
- if (!schema.hasColumn(BuiltInVirtualColumn.SEGMENTNAME)) {
- schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.SEGMENTNAME,
FieldSpec.DataType.STRING, true));
- }
- }
-
- private void removeSchema(String path) {
- _propertyStore.unsubscribeDataChanges(path, _zkSchemaChangeListener);
- String schemaName = path.substring(SCHEMA_PATH_PREFIX.length());
- _schemaInfoMap.remove(schemaName);
- }
-
- private void notifyTableConfigChangeListeners() {
- if (!_tableConfigChangeListeners.isEmpty()) {
- List<TableConfig> tableConfigs = getTableConfigs();
- for (TableConfigChangeListener tableConfigChangeListener :
_tableConfigChangeListeners) {
- tableConfigChangeListener.onChange(tableConfigs);
- }
- }
- }
+ boolean registerSchemaChangeListener(SchemaChangeListener
schemaChangeListener);
- private void notifyLogicalTableConfigChangeListeners() {
- if (!_logicalTableConfigChangeListeners.isEmpty()) {
- List<LogicalTableConfig> logicalTableConfigs = getLogicalTableConfigs();
- for (LogicalTableConfigChangeListener listener :
_logicalTableConfigChangeListeners) {
- listener.onChange(logicalTableConfigs);
- }
- }
- }
+ List<LogicalTableConfig> getLogicalTableConfigs();
- private List<TableConfig> getTableConfigs() {
- List<TableConfig> tableConfigs = new
ArrayList<>(_tableConfigInfoMap.size());
- for (TableConfigInfo tableConfigInfo : _tableConfigInfoMap.values()) {
- tableConfigs.add(tableConfigInfo._tableConfig);
- }
- return tableConfigs;
- }
-
- public List<LogicalTableConfig> getLogicalTableConfigs() {
- return _logicalTableConfigInfoMap.values().stream().map(o ->
o._logicalTableConfig).collect(Collectors.toList());
- }
-
- private void notifySchemaChangeListeners() {
- if (!_schemaChangeListeners.isEmpty()) {
- List<Schema> schemas = getSchemas();
- for (SchemaChangeListener schemaChangeListener : _schemaChangeListeners)
{
- schemaChangeListener.onChange(schemas);
- }
- }
- }
-
- private List<Schema> getSchemas() {
- List<Schema> schemas = new ArrayList<>(_schemaInfoMap.size());
- for (SchemaInfo schemaInfo : _schemaInfoMap.values()) {
- schemas.add(schemaInfo._schema);
- }
- return schemas;
- }
-
- public boolean isLogicalTable(String logicalTableName) {
- logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() :
logicalTableName;
- return _logicalTableConfigInfoMap.containsKey(logicalTableName);
- }
+ boolean isLogicalTable(String logicalTableName);
@Override
- public boolean registerLogicalTableConfigChangeListener(
- LogicalTableConfigChangeListener logicalTableConfigChangeListener) {
- synchronized (_zkLogicalTableConfigChangeListener) {
- boolean added =
_logicalTableConfigChangeListeners.add(logicalTableConfigChangeListener);
- if (added) {
- logicalTableConfigChangeListener.onChange(getLogicalTableConfigs());
- }
- return added;
- }
- }
-
- private class ZkTableConfigChangeListener implements IZkChildListener,
IZkDataListener {
-
- @Override
- public synchronized void handleChildChange(String path, List<String>
tableNamesWithType) {
- if (CollectionUtils.isEmpty(tableNamesWithType)) {
- return;
- }
-
- // Only process new added table configs. Changed/removed table configs
are handled by other callbacks.
- List<String> pathsToAdd = new ArrayList<>();
- for (String tableNameWithType : tableNamesWithType) {
- if (!_tableConfigInfoMap.containsKey(tableNameWithType)) {
- pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
- }
- }
- if (!pathsToAdd.isEmpty()) {
- addTableConfigs(pathsToAdd);
- }
- notifyTableConfigChangeListeners();
- }
-
- @Override
- public synchronized void handleDataChange(String path, Object data) {
- if (data != null) {
- ZNRecord znRecord = (ZNRecord) data;
- try {
- putTableConfig(znRecord);
- } catch (Exception e) {
- LOGGER.error("Caught exception while refreshing table config for
ZNRecord: {}", znRecord.getId(), e);
- }
- notifyTableConfigChangeListeners();
- }
- }
-
- @Override
- public synchronized void handleDataDeleted(String path) {
- // NOTE: The path here is the absolute ZK path instead of the relative
path to the property store.
- String tableNameWithType = path.substring(path.lastIndexOf('/') + 1);
- removeTableConfig(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
- notifyTableConfigChangeListeners();
- }
- }
-
- private class ZkSchemaChangeListener implements IZkChildListener,
IZkDataListener {
-
- @Override
- public synchronized void handleChildChange(String path, List<String>
schemaNames) {
- if (CollectionUtils.isEmpty(schemaNames)) {
- return;
- }
-
- // Only process new added schemas. Changed/removed schemas are handled
by other callbacks.
- List<String> pathsToAdd = new ArrayList<>();
- for (String schemaName : schemaNames) {
- if (!_schemaInfoMap.containsKey(schemaName)) {
- pathsToAdd.add(SCHEMA_PATH_PREFIX + schemaName);
- }
- }
- if (!pathsToAdd.isEmpty()) {
- addSchemas(pathsToAdd);
- }
- notifySchemaChangeListeners();
- }
-
- @Override
- public synchronized void handleDataChange(String path, Object data) {
- if (data != null) {
- ZNRecord znRecord = (ZNRecord) data;
- try {
- putSchema(znRecord);
- } catch (Exception e) {
- LOGGER.error("Caught exception while refreshing schema for ZNRecord:
{}", znRecord.getId(), e);
- }
- notifySchemaChangeListeners();
- }
- }
-
- @Override
- public synchronized void handleDataDeleted(String path) {
- // NOTE: The path here is the absolute ZK path instead of the relative
path to the property store.
- String schemaName = path.substring(path.lastIndexOf('/') + 1);
- removeSchema(SCHEMA_PATH_PREFIX + schemaName);
- notifySchemaChangeListeners();
- }
- }
-
- private class ZkLogicalTableConfigChangeListener implements
IZkChildListener, IZkDataListener {
-
- @Override
- public synchronized void handleChildChange(String path, List<String>
logicalTableNames) {
- if (CollectionUtils.isEmpty(logicalTableNames)) {
- return;
- }
-
- // Only process new added logical tables. Changed/removed logical tables
are handled by other callbacks.
- List<String> pathsToAdd = new ArrayList<>();
- for (String logicalTableName : logicalTableNames) {
- if (!_logicalTableConfigInfoMap.containsKey(logicalTableName)) {
- pathsToAdd.add(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
- }
- }
- if (!pathsToAdd.isEmpty()) {
- addLogicalTableConfigs(pathsToAdd);
- }
- notifyLogicalTableConfigChangeListeners();
- }
-
- @Override
- public synchronized void handleDataChange(String path, Object data) {
- if (data != null) {
- ZNRecord znRecord = (ZNRecord) data;
- try {
- putLogicalTableConfig(znRecord);
- } catch (Exception e) {
- LOGGER.error("Caught exception while refreshing logical table for
ZNRecord: {}", znRecord.getId(), e);
- }
- notifyLogicalTableConfigChangeListeners();
- }
- }
-
- @Override
- public synchronized void handleDataDeleted(String path) {
- // NOTE: The path here is the absolute ZK path instead of the relative
path to the property store.
- String logicalTableName = path.substring(path.lastIndexOf('/') + 1);
- removeLogicalTableConfig(ZkPaths.LOGICAL_TABLE_PATH_PREFIX +
logicalTableName);
- notifyLogicalTableConfigChangeListeners();
- }
- }
-
- private static Map<Expression, Expression>
createExpressionOverrideMap(String physicalOrLogicalTableName,
- QueryConfig queryConfig) {
- Map<Expression, Expression> expressionOverrideMap = new TreeMap<>();
- if (queryConfig != null &&
MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) {
- for (Map.Entry<String, String> entry :
queryConfig.getExpressionOverrideMap().entrySet()) {
- try {
- Expression srcExp =
CalciteSqlParser.compileToExpression(entry.getKey());
- Expression destExp =
CalciteSqlParser.compileToExpression(entry.getValue());
- expressionOverrideMap.put(srcExp, destExp);
- } catch (Exception e) {
- LOGGER.warn("Caught exception while compiling expression override:
{} -> {} for table: {}, skipping it",
- entry.getKey(), entry.getValue(), physicalOrLogicalTableName);
- }
- }
- int mapSize = expressionOverrideMap.size();
- if (mapSize == 1) {
- Map.Entry<Expression, Expression> entry =
expressionOverrideMap.entrySet().iterator().next();
- return Collections.singletonMap(entry.getKey(), entry.getValue());
- } else if (mapSize > 1) {
- return expressionOverrideMap;
- }
- }
- return null;
- }
-
- private static class TableConfigInfo {
- final TableConfig _tableConfig;
- final Map<Expression, Expression> _expressionOverrideMap;
- // All the timestamp with granularity column names
- final Set<String> _timestampIndexColumns;
-
- private TableConfigInfo(TableConfig tableConfig) {
- _tableConfig = tableConfig;
- _expressionOverrideMap =
createExpressionOverrideMap(tableConfig.getTableName(),
tableConfig.getQueryConfig());
- _timestampIndexColumns =
TimestampIndexUtils.extractColumnsWithGranularity(tableConfig);
- }
- }
-
- private static class SchemaInfo {
- final Schema _schema;
- final Map<String, String> _columnNameMap;
-
- private SchemaInfo(Schema schema, Map<String, String> columnNameMap) {
- _schema = schema;
- _columnNameMap = columnNameMap;
- }
- }
-
- private static class LogicalTableConfigInfo {
- final LogicalTableConfig _logicalTableConfig;
- final Map<Expression, Expression> _expressionOverrideMap;
-
- private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) {
- _logicalTableConfig = logicalTableConfig;
- _expressionOverrideMap =
createExpressionOverrideMap(logicalTableConfig.getTableName(),
- logicalTableConfig.getQueryConfig());
- }
- }
+ boolean registerLogicalTableConfigChangeListener(
+ LogicalTableConfigChangeListener logicalTableConfigChangeListener);
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java
similarity index 96%
copy from
pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
copy to
pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java
index 291e60a387d..8e688468b19 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java
@@ -65,8 +65,8 @@ import org.slf4j.LoggerFactory;
* The {@code TableCache} caches all the table configs and schemas within the
cluster, and listens on ZK changes to keep
* them in sync. It also maintains the table name map and the column name map
for case-insensitive queries.
*/
-public class TableCache implements PinotConfigProvider {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TableCache.class);
+public class ZkTableCache implements TableCache {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ZkTableCache.class);
private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
@@ -96,14 +96,14 @@ public class TableCache implements PinotConfigProvider {
private final Map<String, SchemaInfo> _schemaInfoMap = new
ConcurrentHashMap<>();
private final ZkLogicalTableConfigChangeListener
- _zkLogicalTableConfigChangeListener = new
ZkLogicalTableConfigChangeListener();
+ _zkLogicalTableConfigChangeListener = new
ZkLogicalTableConfigChangeListener();
// Key is table name, value is logical table info
private final Map<String, LogicalTableConfigInfo> _logicalTableConfigInfoMap
= new ConcurrentHashMap<>();
// Key is lower case logical table name, value is actual logical table name
// For case-insensitive mode only
private final Map<String, String> _logicalTableNameMap = new
ConcurrentHashMap<>();
- public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean
ignoreCase) {
+ public ZkTableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean
ignoreCase) {
_propertyStore = propertyStore;
_ignoreCase = ignoreCase;
@@ -142,8 +142,8 @@ public class TableCache implements PinotConfigProvider {
List<String> tables =
_propertyStore.getChildNames(ZkPaths.LOGICAL_TABLE_PARENT_PATH,
AccessOption.PERSISTENT);
if (CollectionUtils.isNotEmpty(tables)) {
List<String> pathsToAdd = tables.stream()
- .map(rawTableName -> ZkPaths.LOGICAL_TABLE_PATH_PREFIX +
rawTableName)
- .collect(Collectors.toList());
+ .map(rawTableName -> ZkPaths.LOGICAL_TABLE_PATH_PREFIX +
rawTableName)
+ .collect(Collectors.toList());
addLogicalTableConfigs(pathsToAdd);
}
}
@@ -179,8 +179,8 @@ public class TableCache implements PinotConfigProvider {
@Nullable
public String getActualLogicalTableName(String logicalTableName) {
return _ignoreCase
- ? _logicalTableNameMap.get(logicalTableName.toLowerCase())
- : _logicalTableNameMap.get(logicalTableName);
+ ? _logicalTableNameMap.get(logicalTableName.toLowerCase())
+ : _logicalTableNameMap.get(logicalTableName);
}
/**
@@ -331,7 +331,7 @@ public class TableCache implements PinotConfigProvider {
}
private void putTableConfig(ZNRecord znRecord)
- throws IOException {
+ throws IOException {
TableConfig tableConfig = TableConfigSerDeUtils.fromZNRecord(znRecord);
String tableNameWithType = tableConfig.getTableName();
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -346,7 +346,7 @@ public class TableCache implements PinotConfigProvider {
}
private void putLogicalTableConfig(ZNRecord znRecord)
- throws IOException {
+ throws IOException {
LogicalTableConfig logicalTableConfig =
LogicalTableConfigUtils.fromZNRecord(znRecord);
String logicalTableName = logicalTableConfig.getTableName();
_logicalTableConfigInfoMap.put(logicalTableName, new
LogicalTableConfigInfo(logicalTableConfig));
@@ -414,7 +414,7 @@ public class TableCache implements PinotConfigProvider {
}
private void putSchema(ZNRecord znRecord)
- throws IOException {
+ throws IOException {
Schema schema = SchemaSerDeUtils.fromZNRecord(znRecord);
addBuiltInVirtualColumns(schema);
String schemaName = schema.getSchemaName();
@@ -507,7 +507,7 @@ public class TableCache implements PinotConfigProvider {
@Override
public boolean registerLogicalTableConfigChangeListener(
- LogicalTableConfigChangeListener logicalTableConfigChangeListener) {
+ LogicalTableConfigChangeListener logicalTableConfigChangeListener) {
synchronized (_zkLogicalTableConfigChangeListener) {
boolean added =
_logicalTableConfigChangeListeners.add(logicalTableConfigChangeListener);
if (added) {
@@ -647,7 +647,7 @@ public class TableCache implements PinotConfigProvider {
}
private static Map<Expression, Expression>
createExpressionOverrideMap(String physicalOrLogicalTableName,
- QueryConfig queryConfig) {
+ QueryConfig queryConfig) {
Map<Expression, Expression> expressionOverrideMap = new TreeMap<>();
if (queryConfig != null &&
MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) {
for (Map.Entry<String, String> entry :
queryConfig.getExpressionOverrideMap().entrySet()) {
@@ -657,7 +657,7 @@ public class TableCache implements PinotConfigProvider {
expressionOverrideMap.put(srcExp, destExp);
} catch (Exception e) {
LOGGER.warn("Caught exception while compiling expression override:
{} -> {} for table: {}, skipping it",
- entry.getKey(), entry.getValue(), physicalOrLogicalTableName);
+ entry.getKey(), entry.getValue(), physicalOrLogicalTableName);
}
}
int mapSize = expressionOverrideMap.size();
@@ -701,7 +701,7 @@ public class TableCache implements PinotConfigProvider {
private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) {
_logicalTableConfig = logicalTableConfig;
_expressionOverrideMap =
createExpressionOverrideMap(logicalTableConfig.getTableName(),
- logicalTableConfig.getQueryConfig());
+ logicalTableConfig.getQueryConfig());
}
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7e0761103e5..0c415a3a48c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -92,6 +92,7 @@ import
org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.config.provider.ZkTableCache;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
@@ -325,7 +326,7 @@ public class PinotHelixResourceManager {
_helixAdmin.getConfig(helixConfigScope,
Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY));
boolean caseInsensitive =
Boolean.parseBoolean(configs.getOrDefault(Helix.ENABLE_CASE_INSENSITIVE_KEY,
Boolean.toString(Helix.DEFAULT_ENABLE_CASE_INSENSITIVE)));
- _tableCache = new TableCache(_propertyStore, caseInsensitive);
+ _tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
}
/**
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index 0f21afc305e..f35d97ccbec 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.config.provider.ZkTableCache;
import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
@@ -73,7 +74,7 @@ public class TableCacheTest {
@Test(dataProvider = "testTableCacheDataProvider")
public void testTableCache(boolean isCaseInsensitive)
throws Exception {
- TableCache tableCache = new TableCache(TEST_INSTANCE.getPropertyStore(),
isCaseInsensitive);
+ TableCache tableCache = new ZkTableCache(TEST_INSTANCE.getPropertyStore(),
isCaseInsensitive);
assertNull(tableCache.getSchema(RAW_TABLE_NAME));
assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]