This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d6f3ec3758f [refactor] Refactor TableCache to interface with 
ZkTableCache implementation (#16646)
d6f3ec3758f is described below

commit d6f3ec3758f4ca5b83d5e0cf028ba8f4b03ad514
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Tue Aug 26 06:32:49 2025 -0700

    [refactor] Refactor TableCache to interface with ZkTableCache 
implementation (#16646)
    
    Co-authored-by: Shaurya Chaturvedi <[email protected]>
---
 .../broker/broker/helix/BaseBrokerStarter.java     |   3 +-
 .../pinot/common/config/provider/TableCache.java   | 616 +--------------------
 .../{TableCache.java => ZkTableCache.java}         |  30 +-
 .../helix/core/PinotHelixResourceManager.java      |   3 +-
 .../pinot/controller/helix/TableCacheTest.java     |   3 +-
 5 files changed, 41 insertions(+), 614 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 3d8a42563ac..f361632025a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -59,6 +59,7 @@ import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.NettyConfig;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.config.provider.ZkTableCache;
 import org.apache.pinot.common.cursors.AbstractResponseStore;
 import org.apache.pinot.common.failuredetector.FailureDetector;
 import org.apache.pinot.common.failuredetector.FailureDetectorFactory;
@@ -334,7 +335,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     FunctionRegistry.init();
     boolean caseInsensitive =
         _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, 
Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
-    TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);
+    TableCache tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
 
     LOGGER.info("Initializing Broker Event Listener Factory");
     
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 291e60a387d..6d789e685c6 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -18,158 +18,37 @@
  */
 package org.apache.pinot.common.config.provider;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.helix.AccessOption;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.zkclient.IZkChildListener;
-import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.pinot.common.request.Expression;
-import org.apache.pinot.common.utils.LogicalTableConfigUtils;
-import org.apache.pinot.common.utils.config.SchemaSerDeUtils;
-import org.apache.pinot.common.utils.config.TableConfigSerDeUtils;
 import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
 import org.apache.pinot.spi.config.provider.PinotConfigProvider;
 import org.apache.pinot.spi.config.provider.SchemaChangeListener;
 import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
-import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
-import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
-import org.apache.pinot.spi.utils.TimestampIndexUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
- * An implementation of {@link PinotConfigProvider}
+ * Interface for caching table configs and schemas within the cluster.
  * The {@code TableCache} caches all the table configs and schemas within the 
cluster, and listens on ZK changes to keep
  * them in sync. It also maintains the table name map and the column name map 
for case-insensitive queries.
  */
-public class TableCache implements PinotConfigProvider {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableCache.class);
-  private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
-  private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
-  private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
-  private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
-  private static final String OFFLINE_TABLE_SUFFIX = "_OFFLINE";
-  private static final String REALTIME_TABLE_SUFFIX = "_REALTIME";
-  private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = 
OFFLINE_TABLE_SUFFIX.toLowerCase();
-  private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = 
REALTIME_TABLE_SUFFIX.toLowerCase();
-
-  // NOTE: No need to use concurrent set because it is always accessed within 
the ZK change listener lock
-  private final Set<TableConfigChangeListener> _tableConfigChangeListeners = 
new HashSet<>();
-  private final Set<SchemaChangeListener> _schemaChangeListeners = new 
HashSet<>();
-  private final Set<LogicalTableConfigChangeListener> 
_logicalTableConfigChangeListeners = new HashSet<>();
-
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final boolean _ignoreCase;
-
-  private final ZkTableConfigChangeListener _zkTableConfigChangeListener = new 
ZkTableConfigChangeListener();
-  // Key is table name with type suffix, value is table config info
-  private final Map<String, TableConfigInfo> _tableConfigInfoMap = new 
ConcurrentHashMap<>();
-  // Key is lower case table name (with or without type suffix), value is 
actual table name
-  // For case-insensitive mode only
-  private final Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
-
-  private final ZkSchemaChangeListener _zkSchemaChangeListener = new 
ZkSchemaChangeListener();
-  // Key is schema name, value is schema info
-  private final Map<String, SchemaInfo> _schemaInfoMap = new 
ConcurrentHashMap<>();
-
-  private final ZkLogicalTableConfigChangeListener
-      _zkLogicalTableConfigChangeListener = new 
ZkLogicalTableConfigChangeListener();
-  // Key is table name, value is logical table info
-  private final Map<String, LogicalTableConfigInfo> _logicalTableConfigInfoMap 
= new ConcurrentHashMap<>();
-  // Key is lower case logical table name, value is actual logical table name
-  // For case-insensitive mode only
-  private final Map<String, String> _logicalTableNameMap = new 
ConcurrentHashMap<>();
-
-  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean 
ignoreCase) {
-    _propertyStore = propertyStore;
-    _ignoreCase = ignoreCase;
-
-    synchronized (_zkTableConfigChangeListener) {
-      // Subscribe child changes before reading the data to avoid missing 
changes
-      _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, 
_zkTableConfigChangeListener);
-
-      List<String> tables = 
_propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
-      if (CollectionUtils.isNotEmpty(tables)) {
-        List<String> pathsToAdd = new ArrayList<>(tables.size());
-        for (String tableNameWithType : tables) {
-          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
-        }
-        addTableConfigs(pathsToAdd);
-      }
-    }
-
-    synchronized (_zkSchemaChangeListener) {
-      // Subscribe child changes before reading the data to avoid missing 
changes
-      _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH, 
_zkSchemaChangeListener);
-
-      List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH, 
AccessOption.PERSISTENT);
-      if (CollectionUtils.isNotEmpty(tables)) {
-        List<String> pathsToAdd = new ArrayList<>(tables.size());
-        for (String rawTableName : tables) {
-          pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
-        }
-        addSchemas(pathsToAdd);
-      }
-    }
-
-    synchronized (_zkLogicalTableConfigChangeListener) {
-      // Subscribe child changes before reading the data to avoid missing 
changes
-      _propertyStore.subscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH, 
_zkLogicalTableConfigChangeListener);
-
-      List<String> tables = 
_propertyStore.getChildNames(ZkPaths.LOGICAL_TABLE_PARENT_PATH, 
AccessOption.PERSISTENT);
-      if (CollectionUtils.isNotEmpty(tables)) {
-        List<String> pathsToAdd = tables.stream()
-            .map(rawTableName -> ZkPaths.LOGICAL_TABLE_PATH_PREFIX + 
rawTableName)
-            .collect(Collectors.toList());
-        addLogicalTableConfigs(pathsToAdd);
-      }
-    }
-
-    LOGGER.info("Initialized TableCache with IgnoreCase: {}", ignoreCase);
-  }
-
+public interface TableCache extends PinotConfigProvider {
   /**
    * Returns {@code true} if the TableCache is case-insensitive, {@code false} 
otherwise.
    */
-  public boolean isIgnoreCase() {
-    return _ignoreCase;
-  }
+  boolean isIgnoreCase();
 
   /**
    * Returns the actual table name for the given table name (with or without 
type suffix), or {@code null} if the table
    * does not exist.
    */
   @Nullable
-  public String getActualTableName(String tableName) {
-    if (_ignoreCase) {
-      return _tableNameMap.get(tableName.toLowerCase());
-    } else {
-      return _tableNameMap.get(tableName);
-    }
-  }
+  String getActualTableName(String tableName);
 
   /**
    * Returns the actual logical table name for the given table name, or {@code 
null} if table does not exist.
@@ -177,531 +56,76 @@ public class TableCache implements PinotConfigProvider {
    * @return Actual logical table name
    */
   @Nullable
-  public String getActualLogicalTableName(String logicalTableName) {
-    return _ignoreCase
-        ? _logicalTableNameMap.get(logicalTableName.toLowerCase())
-        : _logicalTableNameMap.get(logicalTableName);
-  }
+  String getActualLogicalTableName(String logicalTableName);
 
   /**
    * Returns a map from table name to actual table name. For case-insensitive 
case, the keys of the map are in lower
    * case.
    */
-  public Map<String, String> getTableNameMap() {
-    return _tableNameMap;
-  }
+  Map<String, String> getTableNameMap();
 
   /**
    * Returns a map from logical table name to actual logical table name. For 
case-insensitive case, the keys of the map
    * are in lower case.
    * @return Map from logical table name to actual logical table name
    */
-  public Map<String, String> getLogicalTableNameMap() {
-    return _logicalTableNameMap;
-  }
+  Map<String, String> getLogicalTableNameMap();
 
   /**
    * Get all dimension table names.
    * @return List of dimension table names
    */
-  public List<String> getAllDimensionTables() {
-    List<String> dimensionTables = new ArrayList<>();
-    for (TableConfigInfo tableConfigInfo : _tableConfigInfoMap.values()) {
-      if (tableConfigInfo._tableConfig.isDimTable()) {
-        dimensionTables.add(tableConfigInfo._tableConfig.getTableName());
-      }
-    }
-    return dimensionTables;
-  }
+  List<String> getAllDimensionTables();
 
   /**
    * Returns a map from column name to actual column name for the given table, 
or {@code null} if the table schema does
    * not exist. For case-insensitive case, the keys of the map are in lower 
case.
    */
   @Nullable
-  public Map<String, String> getColumnNameMap(String rawTableName) {
-    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
-    return schemaInfo != null ? schemaInfo._columnNameMap : null;
-  }
+  Map<String, String> getColumnNameMap(String rawTableName);
 
   /**
    * Returns the expression override map for the given logical or physical 
table, or {@code null} if no override is
    * configured.
    */
   @Nullable
-  public Map<Expression, Expression> getExpressionOverrideMap(String 
physicalOrLogicalTableName) {
-    TableConfigInfo tableConfigInfo = 
_tableConfigInfoMap.get(physicalOrLogicalTableName);
-    if (tableConfigInfo != null) {
-      return tableConfigInfo._expressionOverrideMap;
-    }
-    LogicalTableConfigInfo logicalTableConfigInfo = 
_logicalTableConfigInfoMap.get(physicalOrLogicalTableName);
-    return logicalTableConfigInfo != null ? 
logicalTableConfigInfo._expressionOverrideMap : null;
-  }
+  Map<Expression, Expression> getExpressionOverrideMap(String 
physicalOrLogicalTableName);
 
   /**
    * Returns the timestamp index columns for the given table, or {@code null} 
if table does not exist.
    */
   @Nullable
-  public Set<String> getTimestampIndexColumns(String tableNameWithType) {
-    TableConfigInfo tableConfigInfo = 
_tableConfigInfoMap.get(tableNameWithType);
-    return tableConfigInfo != null ? tableConfigInfo._timestampIndexColumns : 
null;
-  }
+  Set<String> getTimestampIndexColumns(String tableNameWithType);
 
   /**
    * Returns the table config for the given table, or {@code null} if it does 
not exist.
    */
   @Nullable
   @Override
-  public TableConfig getTableConfig(String tableNameWithType) {
-    TableConfigInfo tableConfigInfo = 
_tableConfigInfoMap.get(tableNameWithType);
-    return tableConfigInfo != null ? tableConfigInfo._tableConfig : null;
-  }
+  TableConfig getTableConfig(String tableNameWithType);
 
   @Nullable
   @Override
-  public LogicalTableConfig getLogicalTableConfig(String logicalTableName) {
-    LogicalTableConfigInfo logicalTableConfigInfo = 
_logicalTableConfigInfoMap.get(logicalTableName);
-    return logicalTableConfigInfo != null ? 
logicalTableConfigInfo._logicalTableConfig : null;
-  }
+  LogicalTableConfig getLogicalTableConfig(String logicalTableName);
 
   @Override
-  public boolean registerTableConfigChangeListener(TableConfigChangeListener 
tableConfigChangeListener) {
-    synchronized (_zkTableConfigChangeListener) {
-      boolean added = 
_tableConfigChangeListeners.add(tableConfigChangeListener);
-      if (added) {
-        tableConfigChangeListener.onChange(getTableConfigs());
-      }
-      return added;
-    }
-  }
+  boolean registerTableConfigChangeListener(TableConfigChangeListener 
tableConfigChangeListener);
 
   /**
    * Returns the schema for the given logical or physical table, or {@code 
null} if it does not exist.
    */
   @Nullable
   @Override
-  public Schema getSchema(String rawTableName) {
-    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
-    return schemaInfo != null ? schemaInfo._schema : null;
-  }
+  Schema getSchema(String rawTableName);
 
   @Override
-  public boolean registerSchemaChangeListener(SchemaChangeListener 
schemaChangeListener) {
-    synchronized (_zkSchemaChangeListener) {
-      boolean added = _schemaChangeListeners.add(schemaChangeListener);
-      if (added) {
-        schemaChangeListener.onChange(getSchemas());
-      }
-      return added;
-    }
-  }
-
-  private void addTableConfigs(List<String> paths) {
-    // Subscribe data changes before reading the data to avoid missing changes
-    for (String path : paths) {
-      _propertyStore.subscribeDataChanges(path, _zkTableConfigChangeListener);
-    }
-    List<ZNRecord> znRecords = _propertyStore.get(paths, null, 
AccessOption.PERSISTENT);
-    for (ZNRecord znRecord : znRecords) {
-      if (znRecord != null) {
-        try {
-          putTableConfig(znRecord);
-        } catch (Exception e) {
-          LOGGER.error("Caught exception while adding table config for 
ZNRecord: {}", znRecord.getId(), e);
-        }
-      }
-    }
-  }
-
-  private void addLogicalTableConfigs(List<String> paths) {
-    // Subscribe data changes before reading the data to avoid missing changes
-    for (String path : paths) {
-      _propertyStore.subscribeDataChanges(path, 
_zkLogicalTableConfigChangeListener);
-    }
-    List<ZNRecord> znRecords = _propertyStore.get(paths, null, 
AccessOption.PERSISTENT);
-    for (ZNRecord znRecord : znRecords) {
-      if (znRecord != null) {
-        try {
-          putLogicalTableConfig(znRecord);
-        } catch (Exception e) {
-          LOGGER.error("Caught exception while adding logical table for 
ZNRecord: {}", znRecord.getId(), e);
-        }
-      }
-    }
-  }
-
-  private void putTableConfig(ZNRecord znRecord)
-      throws IOException {
-    TableConfig tableConfig = TableConfigSerDeUtils.fromZNRecord(znRecord);
-    String tableNameWithType = tableConfig.getTableName();
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-    _tableConfigInfoMap.put(tableNameWithType, new 
TableConfigInfo(tableConfig));
-    if (_ignoreCase) {
-      _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
-      _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
-    } else {
-      _tableNameMap.put(tableNameWithType, tableNameWithType);
-      _tableNameMap.put(rawTableName, rawTableName);
-    }
-  }
-
-  private void putLogicalTableConfig(ZNRecord znRecord)
-      throws IOException {
-    LogicalTableConfig logicalTableConfig = 
LogicalTableConfigUtils.fromZNRecord(znRecord);
-    String logicalTableName = logicalTableConfig.getTableName();
-    _logicalTableConfigInfoMap.put(logicalTableName, new 
LogicalTableConfigInfo(logicalTableConfig));
-    if (_ignoreCase) {
-      _logicalTableNameMap.put(logicalTableName.toLowerCase(), 
logicalTableName);
-    } else {
-      _logicalTableNameMap.put(logicalTableName, logicalTableName);
-    }
-  }
-
-  private void removeTableConfig(String path) {
-    _propertyStore.unsubscribeDataChanges(path, _zkTableConfigChangeListener);
-    String tableNameWithType = 
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-    _tableConfigInfoMap.remove(tableNameWithType);
-    if (_ignoreCase) {
-      _tableNameMap.remove(tableNameWithType.toLowerCase());
-      String lowerCaseRawTableName = rawTableName.toLowerCase();
-      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
-        if (!_tableNameMap.containsKey(lowerCaseRawTableName + 
LOWER_CASE_REALTIME_TABLE_SUFFIX)) {
-          _tableNameMap.remove(lowerCaseRawTableName);
-        }
-      } else {
-        if (!_tableNameMap.containsKey(lowerCaseRawTableName + 
LOWER_CASE_OFFLINE_TABLE_SUFFIX)) {
-          _tableNameMap.remove(lowerCaseRawTableName);
-        }
-      }
-    } else {
-      _tableNameMap.remove(tableNameWithType);
-      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
-        if (!_tableNameMap.containsKey(rawTableName + REALTIME_TABLE_SUFFIX)) {
-          _tableNameMap.remove(rawTableName);
-        }
-      } else {
-        if (!_tableNameMap.containsKey(rawTableName + OFFLINE_TABLE_SUFFIX)) {
-          _tableNameMap.remove(rawTableName);
-        }
-      }
-    }
-  }
-
-  private void removeLogicalTableConfig(String path) {
-    _propertyStore.unsubscribeDataChanges(path, 
_zkLogicalTableConfigChangeListener);
-    String logicalTableName = 
path.substring(ZkPaths.LOGICAL_TABLE_PATH_PREFIX.length());
-    _logicalTableConfigInfoMap.remove(logicalTableName);
-    logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : 
logicalTableName;
-    _logicalTableNameMap.remove(logicalTableName);
-  }
-
-  private void addSchemas(List<String> paths) {
-    // Subscribe data changes before reading the data to avoid missing changes
-    for (String path : paths) {
-      _propertyStore.subscribeDataChanges(path, _zkSchemaChangeListener);
-    }
-    List<ZNRecord> znRecords = _propertyStore.get(paths, null, 
AccessOption.PERSISTENT);
-    for (ZNRecord znRecord : znRecords) {
-      if (znRecord != null) {
-        try {
-          putSchema(znRecord);
-        } catch (Exception e) {
-          LOGGER.error("Caught exception while adding schema for ZNRecord: 
{}", znRecord.getId(), e);
-        }
-      }
-    }
-  }
-
-  private void putSchema(ZNRecord znRecord)
-      throws IOException {
-    Schema schema = SchemaSerDeUtils.fromZNRecord(znRecord);
-    addBuiltInVirtualColumns(schema);
-    String schemaName = schema.getSchemaName();
-    Map<String, String> columnNameMap = new HashMap<>();
-    if (_ignoreCase) {
-      for (String columnName : schema.getColumnNames()) {
-        columnNameMap.put(columnName.toLowerCase(), columnName);
-      }
-    } else {
-      for (String columnName : schema.getColumnNames()) {
-        columnNameMap.put(columnName, columnName);
-      }
-    }
-    _schemaInfoMap.put(schemaName, new SchemaInfo(schema, columnNameMap));
-  }
-
-  /**
-   * Adds the built-in virtual columns to the schema.
-   * NOTE: The virtual column provider class is not added.
-   */
-  private static void addBuiltInVirtualColumns(Schema schema) {
-    if (!schema.hasColumn(BuiltInVirtualColumn.DOCID)) {
-      schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.DOCID, 
FieldSpec.DataType.INT, true));
-    }
-    if (!schema.hasColumn(BuiltInVirtualColumn.HOSTNAME)) {
-      schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.HOSTNAME, 
FieldSpec.DataType.STRING, true));
-    }
-    if (!schema.hasColumn(BuiltInVirtualColumn.SEGMENTNAME)) {
-      schema.addField(new DimensionFieldSpec(BuiltInVirtualColumn.SEGMENTNAME, 
FieldSpec.DataType.STRING, true));
-    }
-  }
-
-  private void removeSchema(String path) {
-    _propertyStore.unsubscribeDataChanges(path, _zkSchemaChangeListener);
-    String schemaName = path.substring(SCHEMA_PATH_PREFIX.length());
-    _schemaInfoMap.remove(schemaName);
-  }
-
-  private void notifyTableConfigChangeListeners() {
-    if (!_tableConfigChangeListeners.isEmpty()) {
-      List<TableConfig> tableConfigs = getTableConfigs();
-      for (TableConfigChangeListener tableConfigChangeListener : 
_tableConfigChangeListeners) {
-        tableConfigChangeListener.onChange(tableConfigs);
-      }
-    }
-  }
+  boolean registerSchemaChangeListener(SchemaChangeListener 
schemaChangeListener);
 
-  private void notifyLogicalTableConfigChangeListeners() {
-    if (!_logicalTableConfigChangeListeners.isEmpty()) {
-      List<LogicalTableConfig> logicalTableConfigs = getLogicalTableConfigs();
-      for (LogicalTableConfigChangeListener listener : 
_logicalTableConfigChangeListeners) {
-        listener.onChange(logicalTableConfigs);
-      }
-    }
-  }
+  List<LogicalTableConfig> getLogicalTableConfigs();
 
-  private List<TableConfig> getTableConfigs() {
-    List<TableConfig> tableConfigs = new 
ArrayList<>(_tableConfigInfoMap.size());
-    for (TableConfigInfo tableConfigInfo : _tableConfigInfoMap.values()) {
-      tableConfigs.add(tableConfigInfo._tableConfig);
-    }
-    return tableConfigs;
-  }
-
-  public List<LogicalTableConfig> getLogicalTableConfigs() {
-    return _logicalTableConfigInfoMap.values().stream().map(o -> 
o._logicalTableConfig).collect(Collectors.toList());
-  }
-
-  private void notifySchemaChangeListeners() {
-    if (!_schemaChangeListeners.isEmpty()) {
-      List<Schema> schemas = getSchemas();
-      for (SchemaChangeListener schemaChangeListener : _schemaChangeListeners) 
{
-        schemaChangeListener.onChange(schemas);
-      }
-    }
-  }
-
-  private List<Schema> getSchemas() {
-    List<Schema> schemas = new ArrayList<>(_schemaInfoMap.size());
-    for (SchemaInfo schemaInfo : _schemaInfoMap.values()) {
-      schemas.add(schemaInfo._schema);
-    }
-    return schemas;
-  }
-
-  public boolean isLogicalTable(String logicalTableName) {
-    logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : 
logicalTableName;
-    return _logicalTableConfigInfoMap.containsKey(logicalTableName);
-  }
+  boolean isLogicalTable(String logicalTableName);
 
   @Override
-  public boolean registerLogicalTableConfigChangeListener(
-      LogicalTableConfigChangeListener logicalTableConfigChangeListener) {
-    synchronized (_zkLogicalTableConfigChangeListener) {
-      boolean added = 
_logicalTableConfigChangeListeners.add(logicalTableConfigChangeListener);
-      if (added) {
-        logicalTableConfigChangeListener.onChange(getLogicalTableConfigs());
-      }
-      return added;
-    }
-  }
-
-  private class ZkTableConfigChangeListener implements IZkChildListener, 
IZkDataListener {
-
-    @Override
-    public synchronized void handleChildChange(String path, List<String> 
tableNamesWithType) {
-      if (CollectionUtils.isEmpty(tableNamesWithType)) {
-        return;
-      }
-
-      // Only process new added table configs. Changed/removed table configs 
are handled by other callbacks.
-      List<String> pathsToAdd = new ArrayList<>();
-      for (String tableNameWithType : tableNamesWithType) {
-        if (!_tableConfigInfoMap.containsKey(tableNameWithType)) {
-          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
-        }
-      }
-      if (!pathsToAdd.isEmpty()) {
-        addTableConfigs(pathsToAdd);
-      }
-      notifyTableConfigChangeListeners();
-    }
-
-    @Override
-    public synchronized void handleDataChange(String path, Object data) {
-      if (data != null) {
-        ZNRecord znRecord = (ZNRecord) data;
-        try {
-          putTableConfig(znRecord);
-        } catch (Exception e) {
-          LOGGER.error("Caught exception while refreshing table config for 
ZNRecord: {}", znRecord.getId(), e);
-        }
-        notifyTableConfigChangeListeners();
-      }
-    }
-
-    @Override
-    public synchronized void handleDataDeleted(String path) {
-      // NOTE: The path here is the absolute ZK path instead of the relative 
path to the property store.
-      String tableNameWithType = path.substring(path.lastIndexOf('/') + 1);
-      removeTableConfig(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
-      notifyTableConfigChangeListeners();
-    }
-  }
-
-  private class ZkSchemaChangeListener implements IZkChildListener, 
IZkDataListener {
-
-    @Override
-    public synchronized void handleChildChange(String path, List<String> 
schemaNames) {
-      if (CollectionUtils.isEmpty(schemaNames)) {
-        return;
-      }
-
-      // Only process new added schemas. Changed/removed schemas are handled 
by other callbacks.
-      List<String> pathsToAdd = new ArrayList<>();
-      for (String schemaName : schemaNames) {
-        if (!_schemaInfoMap.containsKey(schemaName)) {
-          pathsToAdd.add(SCHEMA_PATH_PREFIX + schemaName);
-        }
-      }
-      if (!pathsToAdd.isEmpty()) {
-        addSchemas(pathsToAdd);
-      }
-      notifySchemaChangeListeners();
-    }
-
-    @Override
-    public synchronized void handleDataChange(String path, Object data) {
-      if (data != null) {
-        ZNRecord znRecord = (ZNRecord) data;
-        try {
-          putSchema(znRecord);
-        } catch (Exception e) {
-          LOGGER.error("Caught exception while refreshing schema for ZNRecord: 
{}", znRecord.getId(), e);
-        }
-        notifySchemaChangeListeners();
-      }
-    }
-
-    @Override
-    public synchronized void handleDataDeleted(String path) {
-      // NOTE: The path here is the absolute ZK path instead of the relative 
path to the property store.
-      String schemaName = path.substring(path.lastIndexOf('/') + 1);
-      removeSchema(SCHEMA_PATH_PREFIX + schemaName);
-      notifySchemaChangeListeners();
-    }
-  }
-
-  private class ZkLogicalTableConfigChangeListener implements 
IZkChildListener, IZkDataListener {
-
-    @Override
-    public synchronized void handleChildChange(String path, List<String> 
logicalTableNames) {
-      if (CollectionUtils.isEmpty(logicalTableNames)) {
-        return;
-      }
-
-      // Only process new added logical tables. Changed/removed logical tables 
are handled by other callbacks.
-      List<String> pathsToAdd = new ArrayList<>();
-      for (String logicalTableName : logicalTableNames) {
-        if (!_logicalTableConfigInfoMap.containsKey(logicalTableName)) {
-          pathsToAdd.add(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
-        }
-      }
-      if (!pathsToAdd.isEmpty()) {
-        addLogicalTableConfigs(pathsToAdd);
-      }
-      notifyLogicalTableConfigChangeListeners();
-    }
-
-    @Override
-    public synchronized void handleDataChange(String path, Object data) {
-      if (data != null) {
-        ZNRecord znRecord = (ZNRecord) data;
-        try {
-          putLogicalTableConfig(znRecord);
-        } catch (Exception e) {
-          LOGGER.error("Caught exception while refreshing logical table for 
ZNRecord: {}", znRecord.getId(), e);
-        }
-        notifyLogicalTableConfigChangeListeners();
-      }
-    }
-
-    @Override
-    public synchronized void handleDataDeleted(String path) {
-      // NOTE: The path here is the absolute ZK path instead of the relative 
path to the property store.
-      String logicalTableName = path.substring(path.lastIndexOf('/') + 1);
-      removeLogicalTableConfig(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + 
logicalTableName);
-      notifyLogicalTableConfigChangeListeners();
-    }
-  }
-
-  private static Map<Expression, Expression> 
createExpressionOverrideMap(String physicalOrLogicalTableName,
-      QueryConfig queryConfig) {
-    Map<Expression, Expression> expressionOverrideMap = new TreeMap<>();
-    if (queryConfig != null && 
MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) {
-      for (Map.Entry<String, String> entry : 
queryConfig.getExpressionOverrideMap().entrySet()) {
-        try {
-          Expression srcExp = 
CalciteSqlParser.compileToExpression(entry.getKey());
-          Expression destExp = 
CalciteSqlParser.compileToExpression(entry.getValue());
-          expressionOverrideMap.put(srcExp, destExp);
-        } catch (Exception e) {
-          LOGGER.warn("Caught exception while compiling expression override: 
{} -> {} for table: {}, skipping it",
-              entry.getKey(), entry.getValue(), physicalOrLogicalTableName);
-        }
-      }
-      int mapSize = expressionOverrideMap.size();
-      if (mapSize == 1) {
-        Map.Entry<Expression, Expression> entry = 
expressionOverrideMap.entrySet().iterator().next();
-        return Collections.singletonMap(entry.getKey(), entry.getValue());
-      } else if (mapSize > 1) {
-        return expressionOverrideMap;
-      }
-    }
-    return null;
-  }
-
-  private static class TableConfigInfo {
-    final TableConfig _tableConfig;
-    final Map<Expression, Expression> _expressionOverrideMap;
-    // All the timestamp with granularity column names
-    final Set<String> _timestampIndexColumns;
-
-    private TableConfigInfo(TableConfig tableConfig) {
-      _tableConfig = tableConfig;
-      _expressionOverrideMap = 
createExpressionOverrideMap(tableConfig.getTableName(), 
tableConfig.getQueryConfig());
-      _timestampIndexColumns = 
TimestampIndexUtils.extractColumnsWithGranularity(tableConfig);
-    }
-  }
-
-  private static class SchemaInfo {
-    final Schema _schema;
-    final Map<String, String> _columnNameMap;
-
-    private SchemaInfo(Schema schema, Map<String, String> columnNameMap) {
-      _schema = schema;
-      _columnNameMap = columnNameMap;
-    }
-  }
-
-  private static class LogicalTableConfigInfo {
-    final LogicalTableConfig _logicalTableConfig;
-    final Map<Expression, Expression> _expressionOverrideMap;
-
-    private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) {
-      _logicalTableConfig = logicalTableConfig;
-      _expressionOverrideMap = 
createExpressionOverrideMap(logicalTableConfig.getTableName(),
-          logicalTableConfig.getQueryConfig());
-    }
-  }
+  boolean registerLogicalTableConfigChangeListener(
+      LogicalTableConfigChangeListener logicalTableConfigChangeListener);
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java
similarity index 96%
copy from 
pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
copy to 
pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java
index 291e60a387d..8e688468b19 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/ZkTableCache.java
@@ -65,8 +65,8 @@ import org.slf4j.LoggerFactory;
  * The {@code TableCache} caches all the table configs and schemas within the 
cluster, and listens on ZK changes to keep
  * them in sync. It also maintains the table name map and the column name map 
for case-insensitive queries.
  */
-public class TableCache implements PinotConfigProvider {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableCache.class);
+public class ZkTableCache implements TableCache {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ZkTableCache.class);
   private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
   private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
   private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
@@ -96,14 +96,14 @@ public class TableCache implements PinotConfigProvider {
   private final Map<String, SchemaInfo> _schemaInfoMap = new 
ConcurrentHashMap<>();
 
   private final ZkLogicalTableConfigChangeListener
-      _zkLogicalTableConfigChangeListener = new 
ZkLogicalTableConfigChangeListener();
+    _zkLogicalTableConfigChangeListener = new 
ZkLogicalTableConfigChangeListener();
   // Key is table name, value is logical table info
   private final Map<String, LogicalTableConfigInfo> _logicalTableConfigInfoMap 
= new ConcurrentHashMap<>();
   // Key is lower case logical table name, value is actual logical table name
   // For case-insensitive mode only
   private final Map<String, String> _logicalTableNameMap = new 
ConcurrentHashMap<>();
 
-  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean 
ignoreCase) {
+  public ZkTableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean 
ignoreCase) {
     _propertyStore = propertyStore;
     _ignoreCase = ignoreCase;
 
@@ -142,8 +142,8 @@ public class TableCache implements PinotConfigProvider {
       List<String> tables = 
_propertyStore.getChildNames(ZkPaths.LOGICAL_TABLE_PARENT_PATH, 
AccessOption.PERSISTENT);
       if (CollectionUtils.isNotEmpty(tables)) {
         List<String> pathsToAdd = tables.stream()
-            .map(rawTableName -> ZkPaths.LOGICAL_TABLE_PATH_PREFIX + 
rawTableName)
-            .collect(Collectors.toList());
+          .map(rawTableName -> ZkPaths.LOGICAL_TABLE_PATH_PREFIX + 
rawTableName)
+          .collect(Collectors.toList());
         addLogicalTableConfigs(pathsToAdd);
       }
     }
@@ -179,8 +179,8 @@ public class TableCache implements PinotConfigProvider {
   @Nullable
   public String getActualLogicalTableName(String logicalTableName) {
     return _ignoreCase
-        ? _logicalTableNameMap.get(logicalTableName.toLowerCase())
-        : _logicalTableNameMap.get(logicalTableName);
+      ? _logicalTableNameMap.get(logicalTableName.toLowerCase())
+      : _logicalTableNameMap.get(logicalTableName);
   }
 
   /**
@@ -331,7 +331,7 @@ public class TableCache implements PinotConfigProvider {
   }
 
   private void putTableConfig(ZNRecord znRecord)
-      throws IOException {
+    throws IOException {
     TableConfig tableConfig = TableConfigSerDeUtils.fromZNRecord(znRecord);
     String tableNameWithType = tableConfig.getTableName();
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -346,7 +346,7 @@ public class TableCache implements PinotConfigProvider {
   }
 
   private void putLogicalTableConfig(ZNRecord znRecord)
-      throws IOException {
+    throws IOException {
     LogicalTableConfig logicalTableConfig = 
LogicalTableConfigUtils.fromZNRecord(znRecord);
     String logicalTableName = logicalTableConfig.getTableName();
     _logicalTableConfigInfoMap.put(logicalTableName, new 
LogicalTableConfigInfo(logicalTableConfig));
@@ -414,7 +414,7 @@ public class TableCache implements PinotConfigProvider {
   }
 
   private void putSchema(ZNRecord znRecord)
-      throws IOException {
+    throws IOException {
     Schema schema = SchemaSerDeUtils.fromZNRecord(znRecord);
     addBuiltInVirtualColumns(schema);
     String schemaName = schema.getSchemaName();
@@ -507,7 +507,7 @@ public class TableCache implements PinotConfigProvider {
 
   @Override
   public boolean registerLogicalTableConfigChangeListener(
-      LogicalTableConfigChangeListener logicalTableConfigChangeListener) {
+    LogicalTableConfigChangeListener logicalTableConfigChangeListener) {
     synchronized (_zkLogicalTableConfigChangeListener) {
       boolean added = 
_logicalTableConfigChangeListeners.add(logicalTableConfigChangeListener);
       if (added) {
@@ -647,7 +647,7 @@ public class TableCache implements PinotConfigProvider {
   }
 
   private static Map<Expression, Expression> 
createExpressionOverrideMap(String physicalOrLogicalTableName,
-      QueryConfig queryConfig) {
+    QueryConfig queryConfig) {
     Map<Expression, Expression> expressionOverrideMap = new TreeMap<>();
     if (queryConfig != null && 
MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) {
       for (Map.Entry<String, String> entry : 
queryConfig.getExpressionOverrideMap().entrySet()) {
@@ -657,7 +657,7 @@ public class TableCache implements PinotConfigProvider {
           expressionOverrideMap.put(srcExp, destExp);
         } catch (Exception e) {
           LOGGER.warn("Caught exception while compiling expression override: 
{} -> {} for table: {}, skipping it",
-              entry.getKey(), entry.getValue(), physicalOrLogicalTableName);
+            entry.getKey(), entry.getValue(), physicalOrLogicalTableName);
         }
       }
       int mapSize = expressionOverrideMap.size();
@@ -701,7 +701,7 @@ public class TableCache implements PinotConfigProvider {
     private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) {
       _logicalTableConfig = logicalTableConfig;
       _expressionOverrideMap = 
createExpressionOverrideMap(logicalTableConfig.getTableName(),
-          logicalTableConfig.getQueryConfig());
+        logicalTableConfig.getQueryConfig());
     }
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7e0761103e5..0c415a3a48c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -92,6 +92,7 @@ import 
org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.config.provider.ZkTableCache;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
 import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
@@ -325,7 +326,7 @@ public class PinotHelixResourceManager {
         _helixAdmin.getConfig(helixConfigScope, 
Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY));
     boolean caseInsensitive = 
Boolean.parseBoolean(configs.getOrDefault(Helix.ENABLE_CASE_INSENSITIVE_KEY,
         Boolean.toString(Helix.DEFAULT_ENABLE_CASE_INSENSITIVE)));
-    _tableCache = new TableCache(_propertyStore, caseInsensitive);
+    _tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
   }
 
   /**
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index 0f21afc305e..f35d97ccbec 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.config.provider.ZkTableCache;
 import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
 import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
 import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
@@ -73,7 +74,7 @@ public class TableCacheTest {
   @Test(dataProvider = "testTableCacheDataProvider")
   public void testTableCache(boolean isCaseInsensitive)
       throws Exception {
-    TableCache tableCache = new TableCache(TEST_INSTANCE.getPropertyStore(), 
isCaseInsensitive);
+    TableCache tableCache = new ZkTableCache(TEST_INSTANCE.getPropertyStore(), 
isCaseInsensitive);
 
     assertNull(tableCache.getSchema(RAW_TABLE_NAME));
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to