This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch table_cache
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f64b211c9980695ac96aba11f67b026aa1f6ec8c
Author: Xiaotian (Jackie) Jiang <jackie....@gmail.com>
AuthorDate: Thu Jul 30 18:44:04 2020 -0700

    Refactor TableCache
---
 .../broker/broker/helix/HelixBrokerStarter.java    |  35 +-
 .../requesthandler/BaseBrokerRequestHandler.java   | 166 +++++-----
 .../SingleConnectionBrokerRequestHandler.java      |  11 +-
 .../LiteralOnlyBrokerRequestTest.java              |  16 +-
 .../pinot/common/utils/helix/TableCache.java       | 362 ++++++++++++++-------
 .../helix/core/PinotHelixResourceManager.java      |  51 +--
 .../pinot/controller/helix/TableCacheTest.java     | 168 ++++++++++
 pinot-spi/pom.xml                                  |   4 +
 pom.xml                                            |  10 +-
 9 files changed, 558 insertions(+), 265 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 3617a82..9ba8207 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -18,17 +18,14 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
+import com.google.common.collect.ImmutableList;
+import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import javax.annotation.Nullable;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
@@ -61,15 +58,13 @@ import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.TableCache;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.services.ServiceStartable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableList;
-import com.yammer.metrics.core.MetricsRegistry;
-
 
 @SuppressWarnings("unused")
 public class HelixBrokerStarter implements ServiceStartable {
@@ -106,7 +101,8 @@ public class HelixBrokerStarter implements ServiceStartable 
{
     this(brokerConf, clusterName, zkServer, null);
   }
 
-  public HelixBrokerStarter(PinotConfiguration brokerConf, String clusterName, 
String zkServer, @Nullable String brokerHost)
+  public HelixBrokerStarter(PinotConfiguration brokerConf, String clusterName, 
String zkServer,
+      @Nullable String brokerHost)
       throws Exception {
     _brokerConf = brokerConf;
     setupHelixSystemProperties();
@@ -190,17 +186,15 @@ public class HelixBrokerStarter implements 
ServiceStartable {
     _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
     _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
     _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
+
     // Fetch cluster level config from ZK
-    ConfigAccessor configAccessor = _spectatorHelixManager.getConfigAccessor();
     HelixConfigScope helixConfigScope =
         new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_clusterName).build();
-    Map<String, String> configMap = configAccessor.get(helixConfigScope, Arrays
+    Map<String, String> configMap = _helixAdmin.getConfig(helixConfigScope, 
Arrays
         .asList(Helix.ENABLE_CASE_INSENSITIVE_KEY, 
Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY,
             Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, 
Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY));
-    if (Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) 
|| Boolean
-        
.parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY))) {
-      _brokerConf.setProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, true);
-    }
+    boolean caseInsensitive = 
Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || 
Boolean
+        
.parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
     String log2mStr = configMap.get(Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY);
     if (log2mStr != null) {
       try {
@@ -233,9 +227,10 @@ public class HelixBrokerStarter implements 
ServiceStartable {
     queryQuotaManager.init(_spectatorHelixManager);
     // Initialize FunctionRegistry before starting the broker request handler
     FunctionRegistry.init();
+    TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);
     _brokerRequestHandler =
         new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, 
_accessControlFactory, queryQuotaManager,
-            _brokerMetrics, _propertyStore);
+            tableCache, _brokerMetrics);
 
     int brokerQueryPort = 
_brokerConf.getProperty(Helix.KEY_OF_BROKER_QUERY_PORT, 
Helix.DEFAULT_BROKER_QUERY_PORT);
     LOGGER.info("Starting broker admin application on port: {}", 
brokerQueryPort);
@@ -311,8 +306,9 @@ public class HelixBrokerStarter implements ServiceStartable 
{
       }
     }
 
-    double minResourcePercentForStartup = _brokerConf.getProperty(
-        Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START, 
Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
+    double minResourcePercentForStartup = _brokerConf
+        .getProperty(Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
+            Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
 
     LOGGER.info("Registering service status handler");
     ServiceStatus.setServiceStatusCallback(_brokerId, new 
ServiceStatus.MultipleCallbackServiceStatusCallback(
@@ -395,7 +391,8 @@ public class HelixBrokerStarter implements ServiceStartable 
{
     return _brokerRequestHandler;
   }
 
-  public static HelixBrokerStarter getDefault() throws Exception {
+  public static HelixBrokerStarter getDefault()
+      throws Exception {
     Map<String, Object> properties = new HashMap<>();
 
     properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, 5001);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index e442b5d..7bb8c7a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.broker.requesthandler;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.RateLimiter;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,14 +35,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-
 import org.apache.calcite.sql.SqlKind;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.broker.broker.AccessControlFactory;
@@ -85,11 +85,6 @@ import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Splitter;
-import com.google.common.util.concurrent.RateLimiter;
-
 
 @ThreadSafe
 public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler 
{
@@ -99,6 +94,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   protected final RoutingManager _routingManager;
   protected final AccessControlFactory _accessControlFactory;
   protected final QueryQuotaManager _queryQuotaManager;
+  protected final TableCache _tableCache;
   protected final BrokerMetrics _brokerMetrics;
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
@@ -115,36 +111,29 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   private final RateLimiter _numDroppedLogRateLimiter;
   private final AtomicInteger _numDroppedLog;
 
-  private final boolean _enableCaseInsensitive;
   private final boolean _enableQueryLimitOverride;
   private final int _defaultHllLog2m;
-  private final TableCache _tableCache;
 
   public BaseBrokerRequestHandler(PinotConfiguration config, RoutingManager 
routingManager,
-      AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, BrokerMetrics brokerMetrics,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+      AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
+      BrokerMetrics brokerMetrics) {
     _config = config;
     _routingManager = routingManager;
     _accessControlFactory = accessControlFactory;
     _queryQuotaManager = queryQuotaManager;
+    _tableCache = tableCache;
     _brokerMetrics = brokerMetrics;
 
-    _enableCaseInsensitive = 
_config.getProperty(CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY, false);
-    if (_enableCaseInsensitive) {
-      _tableCache = new TableCache(propertyStore);
-    } else {
-      _tableCache = null;
-    }
-    _defaultHllLog2m = _config
-        .getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, 
CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
-
+    _defaultHllLog2m = 
_config.getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
+        CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
     _enableQueryLimitOverride = 
_config.getProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false);
 
     _brokerId = config.getProperty(Broker.CONFIG_OF_BROKER_ID, 
getDefaultBrokerId());
     _brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 
Broker.DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit =
         config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, 
Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
-    _queryLogLength = 
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, 
Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH);
+    _queryLogLength =
+        config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, 
Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH);
     _queryLogRateLimiter = 
RateLimiter.create(config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND,
         Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
     _numDroppedLog = new AtomicInteger(0);
@@ -201,7 +190,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
                 e.getMessage());
       }
     }
-    updateQuerySource(brokerRequest);
+    updateTableName(brokerRequest);
     try {
       updateColumnNames(brokerRequest);
     } catch (Exception e) {
@@ -442,43 +431,60 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
    *
    * Only update TableName in QuerySource if there is no existing table in the 
format of [database_name].[table_name],
    * but only [table_name].
-   *
-   * @param brokerRequest
    */
-  private void updateQuerySource(BrokerRequest brokerRequest) {
+  private void updateTableName(BrokerRequest brokerRequest) {
     String tableName = brokerRequest.getQuerySource().getTableName();
-    // Check if table is in the format of [database_name].[table_name]
-    String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
-    // Update table name if there is no existing table in the format of 
[database_name].[table_name] but only [table_name]
-    if (_enableCaseInsensitive) {
-      if (tableNameSplits.length < 2) {
-        
brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableName));
+
+    // Use TableCache to handle case-insensitive table name
+    if (_tableCache.isCaseInsensitive()) {
+      String actualTableName = _tableCache.getActualTableName(tableName);
+      if (actualTableName != null) {
+        setTableName(brokerRequest, actualTableName);
         return;
       }
-      if (_tableCache.containsTable(tableNameSplits[1]) && 
!_tableCache.containsTable(tableName)) {
-        // Use TableCache to check case insensitive table name.
-        
brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableNameSplits[1]));
+
+      // Check if table is in the format of [database_name].[table_name]
+      String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
+      if (tableNameSplits.length == 2) {
+        actualTableName = _tableCache.getActualTableName(tableNameSplits[1]);
+        if (actualTableName != null) {
+          setTableName(brokerRequest, actualTableName);
+          return;
+        }
       }
+
       return;
     }
-    if (tableNameSplits.length < 2) {
+
+    // Check if table is in the format of [database_name].[table_name]
+    String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
+    if (tableNameSplits.length != 2) {
       return;
     }
-    // Use RoutingManager to check case sensitive table name.
+
+    // Use RoutingManager to handle case-sensitive table name
+    // Update table name if there is no existing table in the format of 
[database_name].[table_name] but only [table_name]
     if (TableNameBuilder.isTableResource(tableName)) {
       if (_routingManager.routingExists(tableNameSplits[1]) && 
!_routingManager.routingExists(tableName)) {
-        brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+        setTableName(brokerRequest, tableNameSplits[1]);
       }
       return;
     }
+    if 
(_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableNameSplits[1]))
+        && 
!_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName)))
 {
+      setTableName(brokerRequest, tableNameSplits[1]);
+      return;
+    }
     if 
(_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableNameSplits[1]))
         && 
!_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableName)))
 {
-      brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
-      return;
+      setTableName(brokerRequest, tableNameSplits[1]);
     }
-    if 
(_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableNameSplits[1]))
-        && 
!_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName)))
 {
-      brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+  }
+
+  private void setTableName(BrokerRequest brokerRequest, String tableName) {
+    brokerRequest.getQuerySource().setTableName(tableName);
+    if (brokerRequest.getPinotQuery() != null) {
+      brokerRequest.getPinotQuery().getDataSource().setTableName(tableName);
     }
   }
 
@@ -672,14 +678,15 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
    * Fixes the column names to the actual column names in the given broker 
request.
    */
   private void updateColumnNames(BrokerRequest brokerRequest) {
-    String tableName = brokerRequest.getQuerySource().getTableName();
-    //fix columns
+    String rawTableName = 
TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
+    Map<String, String> columnNameMap =
+        _tableCache.isCaseInsensitive() ? 
_tableCache.getColumnNameMap(rawTableName) : null;
+
     if (brokerRequest.getFilterSubQueryMap() != null) {
       Collection<FilterQuery> values = 
brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values();
       for (FilterQuery filterQuery : values) {
         if (filterQuery.getNestedFilterQueryIdsSize() == 0) {
-          String expression = filterQuery.getColumn();
-          filterQuery.setColumn(fixColumnName(tableName, expression));
+          filterQuery.setColumn(fixColumnName(rawTableName, 
filterQuery.getColumn(), columnNameMap));
         }
       }
     }
@@ -688,14 +695,14 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
         if 
(!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName()))
 {
           // Always read from backward compatible api in 
AggregationFunctionUtils.
           List<String> arguments = AggregationFunctionUtils.getArguments(info);
-          arguments.replaceAll(e -> fixColumnName(tableName, e));
+          arguments.replaceAll(e -> fixColumnName(rawTableName, e, 
columnNameMap));
           info.setExpressions(arguments);
         }
       }
       if (brokerRequest.isSetGroupBy()) {
         List<String> expressions = brokerRequest.getGroupBy().getExpressions();
         for (int i = 0; i < expressions.size(); i++) {
-          expressions.set(i, fixColumnName(tableName, expressions.get(i)));
+          expressions.set(i, fixColumnName(rawTableName, expressions.get(i), 
columnNameMap));
         }
       }
     } else {
@@ -704,7 +711,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       for (int i = 0; i < selectionColumns.size(); i++) {
         String expression = selectionColumns.get(i);
         if (!expression.equals("*")) {
-          selectionColumns.set(i, fixColumnName(tableName, expression));
+          selectionColumns.set(i, fixColumnName(rawTableName, expression, 
columnNameMap));
         }
       }
     }
@@ -712,86 +719,87 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       List<SelectionSort> orderBy = brokerRequest.getOrderBy();
       for (SelectionSort selectionSort : orderBy) {
         String expression = selectionSort.getColumn();
-        selectionSort.setColumn(fixColumnName(tableName, expression));
+        selectionSort.setColumn(fixColumnName(rawTableName, expression, 
columnNameMap));
       }
     }
 
     PinotQuery pinotQuery = brokerRequest.getPinotQuery();
     if (pinotQuery != null) {
-      pinotQuery.getDataSource().setTableName(tableName);
       for (Expression expression : pinotQuery.getSelectList()) {
-        fixColumnName(tableName, expression);
+        fixColumnName(rawTableName, expression, columnNameMap);
       }
       Expression filterExpression = pinotQuery.getFilterExpression();
       if (filterExpression != null) {
-        fixColumnName(tableName, filterExpression);
+        fixColumnName(rawTableName, filterExpression, columnNameMap);
       }
       List<Expression> groupByList = pinotQuery.getGroupByList();
       if (groupByList != null) {
         for (Expression expression : groupByList) {
-          fixColumnName(tableName, expression);
+          fixColumnName(rawTableName, expression, columnNameMap);
         }
       }
       List<Expression> orderByList = pinotQuery.getOrderByList();
       if (orderByList != null) {
         for (Expression expression : orderByList) {
-          fixColumnName(tableName, expression);
+          fixColumnName(rawTableName, expression, columnNameMap);
         }
       }
       Expression havingExpression = pinotQuery.getHavingExpression();
       if (havingExpression != null) {
-        fixColumnName(tableName, havingExpression);
+        fixColumnName(rawTableName, havingExpression, columnNameMap);
       }
     }
   }
 
-  private String fixColumnName(String tableNameWithType, String expression) {
+  private String fixColumnName(String rawTableName, String expression, 
@Nullable Map<String, String> columnNameMap) {
     TransformExpressionTree expressionTree = 
TransformExpressionTree.compileToExpressionTree(expression);
-    fixColumnName(tableNameWithType, expressionTree);
+    fixColumnName(rawTableName, expressionTree, columnNameMap);
     return expressionTree.toString();
   }
 
-  private void fixColumnName(String tableNameWithType, TransformExpressionTree 
expression) {
+  private void fixColumnName(String rawTableName, TransformExpressionTree 
expression,
+      @Nullable Map<String, String> columnNameMap) {
     TransformExpressionTree.ExpressionType expressionType = 
expression.getExpressionType();
     if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) {
-      String identifier = expression.getValue();
-      expression.setValue(getActualColumnName(tableNameWithType, identifier));
+      expression.setValue(getActualColumnName(rawTableName, 
expression.getValue(), columnNameMap));
     } else if (expressionType == 
TransformExpressionTree.ExpressionType.FUNCTION) {
       for (TransformExpressionTree child : expression.getChildren()) {
-        fixColumnName(tableNameWithType, child);
+        fixColumnName(rawTableName, child, columnNameMap);
       }
     }
   }
 
-  private void fixColumnName(String tableNameWithType, Expression expression) {
+  private void fixColumnName(String rawTableName, Expression expression, 
@Nullable Map<String, String> columnNameMap) {
     ExpressionType expressionType = expression.getType();
     if (expressionType == ExpressionType.IDENTIFIER) {
       Identifier identifier = expression.getIdentifier();
-      identifier.setName(getActualColumnName(tableNameWithType, 
identifier.getName()));
+      identifier.setName(getActualColumnName(rawTableName, 
identifier.getName(), columnNameMap));
     } else if (expressionType == ExpressionType.FUNCTION) {
       for (Expression operand : expression.getFunctionCall().getOperands()) {
-        fixColumnName(tableNameWithType, operand);
+        fixColumnName(rawTableName, operand, columnNameMap);
       }
     }
   }
 
-  private String getActualColumnName(String tableNameWithType, String 
columnName) {
+  private String getActualColumnName(String rawTableName, String columnName,
+      @Nullable Map<String, String> columnNameMap) {
+    // Check if column is in the format of [table_name].[column_name]
     String[] splits = StringUtils.split(columnName, ".", 2);
-    if (_enableCaseInsensitive) {
-      if (splits.length == 2) {
-        if 
(TableNameBuilder.extractRawTableName(tableNameWithType).equalsIgnoreCase(splits[0]))
 {
-          return _tableCache.getActualColumnName(tableNameWithType, splits[1]);
-        }
+    if (_tableCache.isCaseInsensitive()) {
+      if (splits.length == 2 && rawTableName.equalsIgnoreCase(splits[0])) {
+        columnName = splits[1];
+      }
+      if (columnNameMap != null) {
+        return columnNameMap.getOrDefault(columnName, columnName);
+      } else {
+        return columnName;
       }
-      return _tableCache.getActualColumnName(tableNameWithType, columnName);
     } else {
-      if (splits.length == 2) {
-        if 
(TableNameBuilder.extractRawTableName(tableNameWithType).equals(splits[0])) {
-          return splits[1];
-        }
+      if (splits.length == 2 && rawTableName.equals(splits[0])) {
+        columnName = splits[1];
       }
+      return columnName;
     }
-    return columnName;
   }
 
   private static Map<String, String> getOptionsFromJson(JsonNode request, 
String optionsKey) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 420177a..1f67d88 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -21,12 +21,8 @@ package org.apache.pinot.broker.requesthandler;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
@@ -41,6 +37,7 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.helix.TableCache;
 import org.apache.pinot.core.transport.AsyncQueryResponse;
 import org.apache.pinot.core.transport.QueryRouter;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -59,9 +56,9 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseBrokerRequestHandl
   private final QueryRouter _queryRouter;
 
   public SingleConnectionBrokerRequestHandler(PinotConfiguration config, 
RoutingManager routingManager,
-      AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, BrokerMetrics brokerMetrics,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    super(config, routingManager, accessControlFactory, queryQuotaManager, 
brokerMetrics, propertyStore);
+      AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
+      BrokerMetrics brokerMetrics) {
+    super(config, routingManager, accessControlFactory, queryQuotaManager, 
tableCache, brokerMetrics);
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics);
   }
 
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index 0dbb2e2..18f9e86 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.broker.requesthandler;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.yammer.metrics.core.MetricsRegistry;
 import java.util.Random;
-
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -30,10 +32,6 @@ import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.yammer.metrics.core.MetricsRegistry;
-
 
 public class LiteralOnlyBrokerRequestTest {
   private static final Random RANDOM = new Random(System.currentTimeMillis());
@@ -92,8 +90,8 @@ public class LiteralOnlyBrokerRequestTest {
   public void testBrokerRequestHandler()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
null, null, null,
-            new BrokerMetrics("", new MetricsRegistry(), false), null);
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
null, null, null, null,
+            new BrokerMetrics("", new MetricsRegistry(), false));
     long randNum = RANDOM.nextLong();
     byte[] randBytes = new byte[12];
     RANDOM.nextBytes(randBytes);
@@ -119,8 +117,8 @@ public class LiteralOnlyBrokerRequestTest {
   public void testBrokerRequestHandlerWithAsFunction()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
null, null, null,
-            new BrokerMetrics("", new MetricsRegistry(), false), null);
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
null, null, null, null,
+            new BrokerMetrics("", new MetricsRegistry(), false));
     long currentTsMin = System.currentTimeMillis();
     JsonNode request = new ObjectMapper().readTree(
         "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 
'yyyy-MM-dd z') as firstDayOf2020\"}");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 4e3dd4d..dbd360d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -18,19 +18,23 @@
  */
 package org.apache.pinot.common.utils.helix;
 
-import java.util.Collection;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -38,170 +42,284 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- *  Caches table config and schema of a table.
- *  At the start - loads all the table configs and schemas in map.
- *  sets up a zookeeper listener that watches for any change and updates the 
cache.
- *  TODO: optimize to load only changed table configs/schema on a callback.
- *  TODO: Table deletes are not handled as of now
- *  Goal is to eventually grow this into a PinotClusterDataAccessor
+ * The {@code TableCache} caches all the table configs and schemas within the 
cluster, and listens on ZK changes to keep
+ * them in sync. It also maintains the table name map and the column name map 
for case-insensitive queries.
  */
 public class TableCache {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableCache.class);
+  private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+  private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+  private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
+  private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+  private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline";
+  private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime";
 
-  private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
-  private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = 
"/CONFIGS/TABLE";
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final boolean _caseInsensitive;
+  // For case insensitive, key is lower case table name, value is actual table 
name
+  private final Map<String, String> _tableNameMap;
 
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  TableConfigChangeListener _tableConfigChangeListener;
-  SchemaChangeListener _schemaChangeListener;
+  // Key is table name with type suffix
+  private final TableConfigChangeListener _tableConfigChangeListener = new 
TableConfigChangeListener();
+  private final Map<String, TableConfig> _tableConfigMap = new 
ConcurrentHashMap<>();
+  // Key is raw table name
+  private final SchemaChangeListener _schemaChangeListener = new 
SchemaChangeListener();
+  private final Map<String, SchemaInfo> _schemaInfoMap = new 
ConcurrentHashMap<>();
 
-  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean 
caseInsensitive) {
     _propertyStore = propertyStore;
-    _schemaChangeListener = new SchemaChangeListener();
-    _schemaChangeListener.refresh();
-    _tableConfigChangeListener = new TableConfigChangeListener();
-    _tableConfigChangeListener.refresh();
+    _caseInsensitive = caseInsensitive;
+    _tableNameMap = caseInsensitive ? new ConcurrentHashMap<>() : null;
+
+    synchronized (_tableConfigChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing 
changes
+      _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, 
_tableConfigChangeListener);
+
+      List<String> tables = 
_propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String tableNameWithType : tables) {
+          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+        }
+        addTableConfigs(pathsToAdd);
+      }
+    }
+
+    synchronized (_schemaChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing 
changes
+      _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH, 
_schemaChangeListener);
+
+      List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH, 
AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String rawTableName : tables) {
+          pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+        }
+        addSchemas(pathsToAdd);
+      }
+    }
+
+    LOGGER.info("Initialized TableCache with caseInsensitive: {}", 
caseInsensitive);
   }
 
-  public String getActualTableName(String tableName) {
-    return 
_tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), 
tableName);
+  /**
+   * Returns {@code true} if the TableCache is case-insensitive, {@code false} 
otherwise.
+   */
+  public boolean isCaseInsensitive() {
+    return _caseInsensitive;
   }
 
-  public boolean containsTable(String tableName) {
-    return 
_tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase());
+  /**
+   * For case-insensitive only, returns the actual table name for the given 
case-insensitive table name (with or without
+   * type suffix), or {@code null} if the table does not exist.
+   */
+  @Nullable
+  public String getActualTableName(String caseInsensitiveTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not 
case-insensitive");
+    return _tableNameMap.get(caseInsensitiveTableName.toLowerCase());
   }
 
-  public String getActualColumnName(String tableName, String columnName) {
-    String schemaName = 
_tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase());
-    if (schemaName != null) {
-      String actualColumnName = 
_schemaChangeListener.getColumnName(schemaName, columnName);
-      // If actual column name doesn't exist in schema, then return the origin 
column name.
-      if (actualColumnName == null) {
-        return columnName;
+  /**
+   * For case-insensitive only, returns a map from lower case column name to 
actual column name for the given table, or
+   * {@code null} if the table schema does not exist.
+   */
+  @Nullable
+  public Map<String, String> getColumnNameMap(String rawTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not 
case-insensitive");
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._columnNameMap : null;
+  }
+
+  /**
+   * Returns the table config for the given table, or {@code null} if it does 
not exist.
+   */
+  @Nullable
+  public TableConfig getTableConfig(String tableNameWithType) {
+    return _tableConfigMap.get(tableNameWithType);
+  }
+
+  /**
+   * Returns the schema for the given table, or {@code null} if it does not 
exist.
+   */
+  @Nullable
+  public Schema getSchema(String rawTableName) {
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._schema : null;
+  }
+
+  private void addTableConfigs(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _tableConfigChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, 
AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding table config for 
ZNRecord: {}", znRecord.getId(), e);
+        }
       }
-      return actualColumnName;
     }
-    return columnName;
   }
 
-  public TableConfig getTableConfig(String tableName) {
-    return _tableConfigChangeListener._tableConfigMap.get(tableName);
+  private void putTableConfig(ZNRecord znRecord)
+      throws IOException {
+    TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+    String tableNameWithType = tableConfig.getTableName();
+    _tableConfigMap.put(tableNameWithType, tableConfig);
+    if (_caseInsensitive) {
+      _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
+    }
   }
 
-  class TableConfigChangeListener implements IZkChildListener, IZkDataListener 
{
-
-    Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
-    Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
-    Map<String, String> _table2SchemaConfigMap = new ConcurrentHashMap<>();
-
-    public synchronized void refresh() {
-      try {
-        //always subscribe first before reading, so that we dont miss any 
changes
-        
_propertyStore.subscribeChildChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
_tableConfigChangeListener);
-        
_propertyStore.subscribeDataChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
_tableConfigChangeListener);
-        List<ZNRecord> children =
-            _propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
null, AccessOption.PERSISTENT);
-        if (children != null) {
-          for (ZNRecord znRecord : children) {
-            try {
-              TableConfig tableConfig = 
TableConfigUtils.fromZNRecord(znRecord);
-              String tableNameWithType = tableConfig.getTableName();
-              _tableConfigMap.put(tableNameWithType, tableConfig);
-              String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-              //create case insensitive mapping
-              _tableNameMap.put(tableNameWithType.toLowerCase(), 
tableNameWithType);
-              _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
-              //create case insensitive mapping between table name and 
schemaName
-              _table2SchemaConfigMap.put(tableNameWithType.toLowerCase(), 
rawTableName);
-              _table2SchemaConfigMap.put(rawTableName.toLowerCase(), 
rawTableName);
-            } catch (Exception e) {
-              LOGGER.warn("Exception loading table config for: {}: {}", 
znRecord.getId(), e.getMessage());
-              //ignore
-            }
-          }
+  private void removeTableConfig(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener);
+    String tableNameWithType = 
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
+    _tableConfigMap.remove(tableNameWithType);
+    if (_caseInsensitive) {
+      _tableNameMap.remove(tableNameWithType.toLowerCase());
+      String lowerCaseRawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase();
+      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + 
LOWER_CASE_REALTIME_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
+        }
+      } else {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + 
LOWER_CASE_OFFLINE_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
         }
-      } catch (Exception e) {
-        LOGGER.warn("Exception subscribing/reading tableconfigs", e);
-        //ignore
       }
     }
+  }
+
+  private void addSchemas(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _schemaChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, 
AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putSchema(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding schema for ZNRecord: 
{}", znRecord.getId(), e);
+        }
+      }
+    }
+  }
+
+  private void putSchema(ZNRecord znRecord)
+      throws IOException {
+    Schema schema = SchemaUtils.fromZNRecord(znRecord);
+    String rawTableName = schema.getSchemaName();
+    if (_caseInsensitive) {
+      Map<String, String> columnNameMap = new HashMap<>();
+      for (String columnName : schema.getColumnNames()) {
+        columnNameMap.put(columnName.toLowerCase(), columnName);
+      }
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+    } else {
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+    }
+  }
+
+  private void removeSchema(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _schemaChangeListener);
+    String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length());
+    _schemaInfoMap.remove(rawTableName);
+  }
+
+  private class TableConfigChangeListener implements IZkChildListener, 
IZkDataListener {
 
     @Override
-    public void handleChildChange(String s, List<String> list)
-        throws Exception {
-      refresh();
+    public synchronized void handleChildChange(String path, List<String> 
tables) {
+      if (CollectionUtils.isEmpty(tables)) {
+        return;
+      }
+
+      // Only process newly added table configs. Changed/removed table configs 
are handled by other callbacks.
+      List<String> pathsToAdd = new ArrayList<>();
+      for (String tableNameWithType : tables) {
+        if (!_tableConfigMap.containsKey(tableNameWithType)) {
+          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+        }
+      }
+      if (!pathsToAdd.isEmpty()) {
+        addTableConfigs(pathsToAdd);
+      }
     }
 
     @Override
-    public void handleDataChange(String s, Object o)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        ZNRecord znRecord = (ZNRecord) data;
+        try {
+          putTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while refreshing table config for 
ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
     }
 
     @Override
-    public void handleDataDeleted(String s)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataDeleted(String path) {
+      // NOTE: The path here is the absolute ZK path instead of the relative 
path to the property store.
+      String tableNameWithType = path.substring(path.lastIndexOf('/') + 1);
+      removeTableConfig(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
     }
   }
 
-  class SchemaChangeListener implements IZkChildListener, IZkDataListener {
-    Map<String, Map<String, String>> _schemaColumnMap = new 
ConcurrentHashMap<>();
-
-    public synchronized void refresh() {
-      try {
-        //always subscribe first before reading, so that we dont miss any 
changes between reading and setting the watcher again
-        _propertyStore.subscribeChildChanges(PROPERTYSTORE_SCHEMAS_PREFIX, 
_schemaChangeListener);
-        _propertyStore.subscribeDataChanges(PROPERTYSTORE_SCHEMAS_PREFIX, 
_schemaChangeListener);
-        List<ZNRecord> children =
-            _propertyStore.getChildren(PROPERTYSTORE_SCHEMAS_PREFIX, null, 
AccessOption.PERSISTENT);
-        if (children != null) {
-          for (ZNRecord znRecord : children) {
-            try {
-              Schema schema = SchemaUtils.fromZNRecord(znRecord);
-              String schemaNameLowerCase = 
schema.getSchemaName().toLowerCase();
-              Collection<FieldSpec> allFieldSpecs = schema.getAllFieldSpecs();
-              ConcurrentHashMap<String, String> columnNameMap = new 
ConcurrentHashMap<>();
-              _schemaColumnMap.put(schemaNameLowerCase, columnNameMap);
-              for (FieldSpec fieldSpec : allFieldSpecs) {
-                columnNameMap.put(fieldSpec.getName().toLowerCase(), 
fieldSpec.getName());
-              }
-            } catch (Exception e) {
-              LOGGER.warn("Exception loading schema for: {}: {}", 
znRecord.getId(), e.getMessage());
-              //ignore
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.warn("Exception subscribing/reading schemas", e);
-        //ignore
+  private class SchemaChangeListener implements IZkChildListener, 
IZkDataListener {
+
+    @Override
+    public synchronized void handleChildChange(String path, List<String> 
tables) {
+      if (CollectionUtils.isEmpty(tables)) {
+        return;
       }
-    }
 
-    String getColumnName(String schemaName, String columnName) {
-      Map<String, String> columnNameMap = 
_schemaColumnMap.get(schemaName.toLowerCase());
-      if (columnNameMap != null) {
-        return columnNameMap.get(columnName.toLowerCase());
+      // Only process newly added schemas. Changed/removed schemas are handled 
by other callbacks.
+      List<String> pathsToAdd = new ArrayList<>();
+      for (String rawTableName : tables) {
+        if (!_schemaInfoMap.containsKey(rawTableName)) {
+          pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+        }
+      }
+      if (!pathsToAdd.isEmpty()) {
+        addSchemas(pathsToAdd);
       }
-      return columnName;
     }
 
     @Override
-    public void handleChildChange(String s, List<String> list)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        ZNRecord znRecord = (ZNRecord) data;
+        try {
+          putSchema(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while refreshing schema for ZNRecord: 
{}", znRecord.getId(), e);
+        }
+      }
     }
 
     @Override
-    public void handleDataChange(String s, Object o)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataDeleted(String path) {
+      // NOTE: The path here is the absolute ZK path instead of the relative 
path to the property store.
+      String rawTableName = path.substring(path.lastIndexOf('/') + 1);
+      removeSchema(SCHEMA_PATH_PREFIX + rawTableName);
     }
+  }
 
-    @Override
-    public void handleDataDeleted(String s)
-        throws Exception {
-      refresh();
+  private static class SchemaInfo {
+    final Schema _schema;
+    final Map<String, String> _columnNameMap;
+
+    private SchemaInfo(Schema schema, Map<String, String> columnNameMap) {
+      _schema = schema;
+      _columnNameMap = columnNameMap;
     }
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7d5bd61..5cb528c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -27,6 +27,7 @@ import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -204,7 +205,15 @@ public class PinotHelixResourceManager {
     addInstanceGroupTagIfNeeded();
     _segmentDeletionManager = new SegmentDeletionManager(_dataDir, 
_helixAdmin, _helixClusterName, _propertyStore);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, 
_isSingleTenantCluster);
-    _tableCache = new TableCache(_propertyStore);
+
+    // Initialize TableCache
+    HelixConfigScope helixConfigScope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build();
+    Map<String, String> configs = _helixAdmin.getConfig(helixConfigScope,
+        Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY, 
Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
+    boolean caseInsensitive = 
Boolean.parseBoolean(configs.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean
+        
.parseBoolean(configs.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
+    _tableCache = new TableCache(_propertyStore, caseInsensitive);
   }
 
   /**
@@ -467,28 +476,23 @@ public class PinotHelixResourceManager {
    * @return tableName actually defined in Pinot (matches case) and exists 
,else, return the input value
    */
   public String getActualTableName(String tableName) {
-    return _tableCache.getActualTableName(tableName);
-  }
-
-  /**
-   *  Given a column name in any case, returns the column name as defined in 
Schema
-   *  If table has no schema, it just returns the input value
-   * @param tableName
-   * @param columnName
-   * @return
-   */
-  public String getActualColumnName(String tableName, String columnName) {
-    return _tableCache.getActualColumnName(tableName, columnName);
+    if (_tableCache.isCaseInsensitive()) {
+      String actualTableName = _tableCache.getActualTableName(tableName);
+      return actualTableName != null ? actualTableName : tableName;
+    } else {
+      return tableName;
+    }
   }
 
   /**
-   * Given a table name in any case, returns crypter class name defined in 
table config
-   * @param tableName table name in any case
+   * Returns the crypter class name defined in the table config for the given 
table.
+   *
+   * @param tableNameWithType Table name with type suffix
    * @return crypter class name
    */
-  public String getCrypterClassNameFromTableConfig(String tableName) {
-    TableConfig tableConfig = _tableCache.getTableConfig(tableName);
-    Preconditions.checkNotNull(tableConfig, "Table config is not available for 
table '%s'", tableName);
+  public String getCrypterClassNameFromTableConfig(String tableNameWithType) {
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    Preconditions.checkNotNull(tableConfig, "Table config is not available for 
table '%s'", tableNameWithType);
     return tableConfig.getValidationConfig().getCrypterClassName();
   }
 
@@ -1761,8 +1765,7 @@ public class PinotHelixResourceManager {
     propToUpdate.put(Helix.QUERY_RATE_LIMIT_DISABLED, 
Boolean.toString("DISABLE".equals(state)));
     HelixConfigScope scope =
         new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, 
_helixClusterName)
-            .forParticipant(brokerInstanceName)
-            .build();
+            .forParticipant(brokerInstanceName).build();
     _helixAdmin.setConfig(scope, propToUpdate);
   }
 
@@ -2264,10 +2267,10 @@ public class PinotHelixResourceManager {
           LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
 
           // Check that any segment from 'segmentsFrom' does not appear twice.
-          
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(),
 segmentsFrom), String
-              .format("It is not allowed to merge segments that are already 
merged. (tableName = %s, segmentsFrom from "
-                      + "existing lineage entry = %s, requested segmentsFrom = 
%s)", tableNameWithType,
-                  lineageEntry.getSegmentsFrom(), segmentsFrom));
+          
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(),
 segmentsFrom), String.format(
+              "It is not allowed to merge segments that are already merged. 
(tableName = %s, segmentsFrom from "
+                  + "existing lineage entry = %s, requested segmentsFrom = 
%s)", tableNameWithType,
+              lineageEntry.getSegmentsFrom(), segmentsFrom));
 
           // Check that merged segments name cannot be the same.
           
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), 
segmentsTo), String.format(
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
new file mode 100644
index 0000000..9ba206d
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.utils.helix.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TableCacheTest extends ControllerTest {
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+    startController();
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+  }
+
+  @Test
+  public void testTableCache()
+      throws Exception {
+    TableCache tableCache = new TableCache(_propertyStore, true);
+
+    assertTrue(tableCache.isCaseInsensitive());
+    assertNull(tableCache.getActualTableName("testTable"));
+    assertNull(tableCache.getColumnNameMap("testTable"));
+    assertNull(tableCache.getTableConfig("testTable_OFFLINE"));
+    assertNull(tableCache.getSchema("testTable"));
+
+    // Add a table config
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+    _helixResourceManager.addTable(tableConfig);
+    // Wait for at most 10 seconds for the callback to add the table config to 
the cache
+    TestUtils.waitForCondition(aVoid -> 
tableCache.getTableConfig("testTable_OFFLINE") != null, 10_000L,
+        "Failed to add the table config to the cache");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), 
"testTable_OFFLINE");
+    assertNull(tableCache.getActualTableName("testTable_REALTIME"));
+    assertNull(tableCache.getColumnNameMap("testTable"));
+    assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertNull(tableCache.getSchema("testTable"));
+
+    // Update the table config
+    
tableConfig.getIndexingConfig().setCreateInvertedIndexDuringSegmentGeneration(true);
+    _helixResourceManager.updateTableConfig(tableConfig);
+    // Wait for at most 10 seconds for the callback to update the table config 
in the cache
+    // NOTE: Table config should never be null during the transition
+    TestUtils.waitForCondition(
+        aVoid -> 
Preconditions.checkNotNull(tableCache.getTableConfig("testTable_OFFLINE")).equals(tableConfig),
+        10_000L, "Failed to update the table config in the cache");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), 
"testTable_OFFLINE");
+    assertNull(tableCache.getActualTableName("testTable_REALTIME"));
+    assertNull(tableCache.getColumnNameMap("testTable"));
+    assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertNull(tableCache.getSchema("testTable"));
+
+    // Add a schema
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("testColumn",
 DataType.INT)
+            .build();
+    _helixResourceManager.addSchema(schema, false);
+    // Wait for at most 10 seconds for the callback to add the schema to the 
cache
+    TestUtils.waitForCondition(aVoid -> tableCache.getSchema("testTable") != 
null, 10_000L,
+        "Failed to add the schema to the cache");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), 
"testTable_OFFLINE");
+    assertNull(tableCache.getActualTableName("testTable_REALTIME"));
+    assertEquals(tableCache.getColumnNameMap("testTable"), 
Collections.singletonMap("testcolumn", "testColumn"));
+    assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertEquals(tableCache.getSchema("testTable"), schema);
+
+    // Update the schema
+    schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
+    _helixResourceManager.updateSchema(schema, false);
+    // Wait for at most 10 seconds for the callback to update the schema in 
the cache
+    // NOTE: Schema should never be null during the transition
+    TestUtils.waitForCondition(aVoid -> 
Preconditions.checkNotNull(tableCache.getSchema("testTable")).equals(schema),
+        10_000L, "Failed to update the schema in the cache");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), 
"testTable_OFFLINE");
+    assertNull(tableCache.getActualTableName("testTable_REALTIME"));
+    Map<String, String> expectedColumnMap = new HashMap<>();
+    expectedColumnMap.put("testcolumn", "testColumn");
+    expectedColumnMap.put("newcolumn", "newColumn");
+    assertEquals(tableCache.getColumnNameMap("testTable"), expectedColumnMap);
+    assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertEquals(tableCache.getSchema("testTable"), schema);
+
+    // Create a new case-sensitive TableCache which should load the existing 
table config and schema
+    TableCache caseSensitiveTableCache = new TableCache(_propertyStore, false);
+    assertFalse(caseSensitiveTableCache.isCaseInsensitive());
+    // Getting actual table name or column name map should throw exception for 
case-sensitive TableCache
+    try {
+      caseSensitiveTableCache.getActualTableName("testTable");
+      fail();
+    } catch (Exception e) {
+      // Expected
+    }
+    try {
+      caseSensitiveTableCache.getColumnNameMap("testTable");
+      fail();
+    } catch (Exception e) {
+      // Expected
+    }
+    assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertEquals(tableCache.getSchema("testTable"), schema);
+
+    // Remove the table config
+    _helixResourceManager.deleteOfflineTable("testTable");
+    // Wait for at most 10 seconds for the callback to remove the table config 
from the cache
+    TestUtils.waitForCondition(aVoid -> 
tableCache.getTableConfig("testTable_OFFLINE") == null, 10_000L,
+        "Failed to remove the table config from the cache");
+    // Case-insensitive table names are handled based on the table config 
instead of the schema
+    assertNull(tableCache.getActualTableName("testTable"));
+    assertEquals(tableCache.getColumnNameMap("testTable"), expectedColumnMap);
+    assertNull(tableCache.getTableConfig("testTable_OFFLINE"));
+    assertEquals(tableCache.getSchema("testTable"), schema);
+
+    // Remove the schema
+    _helixResourceManager.deleteSchema(schema);
+    // Wait for at most 10 seconds for the callback to remove the schema from 
the cache
+    TestUtils.waitForCondition(aVoid -> tableCache.getSchema("testTable") == 
null, 10_000L,
+        "Failed to remove the schema from the cache");
+    assertNull(tableCache.getActualTableName("testTable"));
+    assertNull(tableCache.getColumnNameMap("testTable"));
+    assertNull(tableCache.getTableConfig("testTable_OFFLINE"));
+    assertNull(tableCache.getSchema("testTable"));
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopController();
+    stopZk();
+  }
+}
diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml
index ab433e7..9e4535e 100644
--- a/pinot-spi/pom.xml
+++ b/pinot-spi/pom.xml
@@ -92,6 +92,10 @@
       <artifactId>commons-lang3</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/pom.xml b/pom.xml
index c59189a..a3f23b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -402,11 +402,11 @@
         <artifactId>commons-lang3</artifactId>
         <version>3.5</version>
       </dependency>
-    <dependency>
-      <groupId>commons-collections</groupId>
-      <artifactId>commons-collections</artifactId>
-      <version>3.2.1</version>
-    </dependency>
+      <dependency>
+        <groupId>commons-collections</groupId>
+        <artifactId>commons-collections</artifactId>
+        <version>3.2.1</version>
+      </dependency>
       <dependency>
         <groupId>commons-configuration</groupId>
         <artifactId>commons-configuration</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to