kishoreg commented on a change in pull request #5780:
URL: https://github.com/apache/incubator-pinot/pull/5780#discussion_r463424152



##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
##########
@@ -18,190 +18,308 @@
  */
 package org.apache.pinot.common.utils.helix;
 
-import java.util.Collection;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- *  Caches table config and schema of a table.
- *  At the start - loads all the table configs and schemas in map.
- *  sets up a zookeeper listener that watches for any change and updates the 
cache.
- *  TODO: optimize to load only changed table configs/schema on a callback.
- *  TODO: Table deletes are not handled as of now
- *  Goal is to eventually grow this into a PinotClusterDataAccessor
+ * The {@code TableCache} caches all the table configs and schemas within the 
cluster, and listens on ZK changes to keep
+ * them in sync. It also maintains the table name map and the column name map 
for case-insensitive queries.
  */
 public class TableCache {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableCache.class);
+  private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+  private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+  private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
+  private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+  private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline";
+  private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime";
 
-  private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
-  private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = 
"/CONFIGS/TABLE";
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final boolean _caseInsensitive;
+  // For case insensitive, key is lower case table name, value is actual table 
name
+  private final Map<String, String> _tableNameMap;
 
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  TableConfigChangeListener _tableConfigChangeListener;
-  SchemaChangeListener _schemaChangeListener;
+  // Key is table name with type suffix
+  private final TableConfigChangeListener _tableConfigChangeListener = new 
TableConfigChangeListener();
+  private final Map<String, TableConfig> _tableConfigMap = new 
ConcurrentHashMap<>();
+  // Key is raw table name
+  private final SchemaChangeListener _schemaChangeListener = new 
SchemaChangeListener();
+  private final Map<String, SchemaInfo> _schemaInfoMap = new 
ConcurrentHashMap<>();
 
-  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean 
caseInsensitive) {
     _propertyStore = propertyStore;
-    _schemaChangeListener = new SchemaChangeListener();
-    _schemaChangeListener.refresh();
-    _tableConfigChangeListener = new TableConfigChangeListener();
-    _tableConfigChangeListener.refresh();
+    _caseInsensitive = caseInsensitive;
+    _tableNameMap = caseInsensitive ? new ConcurrentHashMap<>() : null;
+
+    synchronized (_tableConfigChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing 
changes
+      _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, 
_tableConfigChangeListener);
+
+      List<String> tables = 
_propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String tableNameWithType : tables) {
+          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+        }
+        addTableConfigs(pathsToAdd);
+      }
+    }
+
+    synchronized (_schemaChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing 
changes
+      _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH, 
_schemaChangeListener);
+
+      List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH, 
AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String rawTableName : tables) {
+          pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+        }
+        addSchemas(pathsToAdd);
+      }
+    }
+
+    LOGGER.info("Initialized TableCache with caseInsensitive: {}", 
caseInsensitive);
   }
 
-  public String getActualTableName(String tableName) {
-    return 
_tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), 
tableName);
+  /**
+   * Returns {@code true} if the TableCache is case-insensitive, {@code false} 
otherwise.
+   */
+  public boolean isCaseInsensitive() {
+    return _caseInsensitive;
   }
 
-  public boolean containsTable(String tableName) {
-    return 
_tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase());
+  /**
+   * For case-insensitive only, returns the actual table name for the given 
case-insensitive table name (with or without
+   * type suffix), or {@code null} if the table does not exist.
+   */
+  @Nullable
+  public String getActualTableName(String caseInsensitiveTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not 
case-insensitive");
+    return _tableNameMap.get(caseInsensitiveTableName.toLowerCase());
   }
 
-  public String getActualColumnName(String tableName, String columnName) {
-    String schemaName = 
_tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase());
-    if (schemaName != null) {
-      String actualColumnName = 
_schemaChangeListener.getColumnName(schemaName, columnName);
-      // If actual column name doesn't exist in schema, then return the origin 
column name.
-      if (actualColumnName == null) {
-        return columnName;
+  /**
+   * For case-insensitive only, returns a map from lower case column name to 
actual column name for the given table, or
+   * {@code null} if the table schema does not exist.
+   */
+  @Nullable
+  public Map<String, String> getColumnNameMap(String rawTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not 
case-insensitive");
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._columnNameMap : null;
+  }
+
+  /**
+   * Returns the table config for the given table, or {@code null} if it does 
not exist.
+   */
+  @Nullable
+  public TableConfig getTableConfig(String tableNameWithType) {
+    return _tableConfigMap.get(tableNameWithType);
+  }
+
+  /**
+   * Returns the schema for the given table, or {@code null} if it does not 
exist.
+   */
+  @Nullable
+  public Schema getSchema(String rawTableName) {
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._schema : null;
+  }
+
+  private void addTableConfigs(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _tableConfigChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, 
AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding table config for 
ZNRecord: {}", znRecord.getId(), e);
+        }
       }
-      return actualColumnName;
     }
-    return columnName;
   }
 
-  public TableConfig getTableConfig(String tableName) {
-    return _tableConfigChangeListener._tableConfigMap.get(tableName);
+  private void putTableConfig(ZNRecord znRecord)
+      throws IOException {
+    TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+    String tableNameWithType = tableConfig.getTableName();
+    _tableConfigMap.put(tableNameWithType, tableConfig);
+    if (_caseInsensitive) {
+      _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
+    }
   }
 
-  class TableConfigChangeListener implements IZkChildListener, IZkDataListener 
{
-
-    Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
-    Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
-    Map<String, String> _table2SchemaConfigMap = new ConcurrentHashMap<>();
-
-    public synchronized void refresh() {
-      try {
-        //always subscribe first before reading, so that we dont miss any 
changes
-        
_propertyStore.subscribeChildChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
_tableConfigChangeListener);
-        
_propertyStore.subscribeDataChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
_tableConfigChangeListener);
-        List<ZNRecord> children =
-            _propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
null, AccessOption.PERSISTENT);
-        if (children != null) {
-          for (ZNRecord znRecord : children) {
-            try {
-              TableConfig tableConfig = 
TableConfigUtils.fromZNRecord(znRecord);
-              String tableNameWithType = tableConfig.getTableName();
-              _tableConfigMap.put(tableNameWithType, tableConfig);
-              String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-              //create case insensitive mapping
-              _tableNameMap.put(tableNameWithType.toLowerCase(), 
tableNameWithType);
-              _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
-              //create case insensitive mapping between table name and 
schemaName
-              _table2SchemaConfigMap.put(tableNameWithType.toLowerCase(), 
rawTableName);
-              _table2SchemaConfigMap.put(rawTableName.toLowerCase(), 
rawTableName);
-            } catch (Exception e) {
-              LOGGER.warn("Exception loading table config for: {}: {}", 
znRecord.getId(), e.getMessage());
-              //ignore
-            }
-          }
+  private void removeTableConfig(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener);
+    String tableNameWithType = 
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
+    _tableConfigMap.remove(tableNameWithType);
+    if (_caseInsensitive) {
+      _tableNameMap.remove(tableNameWithType.toLowerCase());
+      String lowerCaseRawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase();
+      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + 
LOWER_CASE_REALTIME_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
+        }
+      } else {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + 
LOWER_CASE_OFFLINE_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
         }
-      } catch (Exception e) {
-        LOGGER.warn("Exception subscribing/reading tableconfigs", e);
-        //ignore
       }
     }
+  }
+
+  private void addSchemas(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _schemaChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, 
AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putSchema(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding schema for ZNRecord: 
{}", znRecord.getId(), e);
+        }
+      }
+    }
+  }
+
+  private void putSchema(ZNRecord znRecord)
+      throws IOException {
+    Schema schema = SchemaUtils.fromZNRecord(znRecord);
+    String rawTableName = schema.getSchemaName();
+    if (_caseInsensitive) {
+      Map<String, String> columnNameMap = new HashMap<>();
+      for (String columnName : schema.getColumnNames()) {
+        columnNameMap.put(columnName.toLowerCase(), columnName);
+      }
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+    } else {
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+    }
+  }
+
+  private void removeSchema(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _schemaChangeListener);
+    String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length());
+    _schemaInfoMap.remove(rawTableName);
+  }
+
+  private class TableConfigChangeListener implements IZkChildListener, 
IZkDataListener {
 
     @Override
-    public void handleChildChange(String s, List<String> list)
-        throws Exception {
-      refresh();
+    public synchronized void handleChildChange(String path, List<String> 
tables) {

Review comment:
       this will miss watches if there was a new table added while processing 
the callback right?

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
##########
@@ -18,190 +18,308 @@
  */
 package org.apache.pinot.common.utils.helix;
 
-import java.util.Collection;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- *  Caches table config and schema of a table.
- *  At the start - loads all the table configs and schemas in map.
- *  sets up a zookeeper listener that watches for any change and updates the 
cache.
- *  TODO: optimize to load only changed table configs/schema on a callback.
- *  TODO: Table deletes are not handled as of now
- *  Goal is to eventually grow this into a PinotClusterDataAccessor
+ * The {@code TableCache} caches all the table configs and schemas within the 
cluster, and listens on ZK changes to keep
+ * them in sync. It also maintains the table name map and the column name map 
for case-insensitive queries.
  */
 public class TableCache {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableCache.class);
+  private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+  private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+  private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
+  private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+  private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline";
+  private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime";
 
-  private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
-  private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = 
"/CONFIGS/TABLE";
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final boolean _caseInsensitive;
+  // For case insensitive, key is lower case table name, value is actual table 
name
+  private final Map<String, String> _tableNameMap;
 
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  TableConfigChangeListener _tableConfigChangeListener;
-  SchemaChangeListener _schemaChangeListener;
+  // Key is table name with type suffix
+  private final TableConfigChangeListener _tableConfigChangeListener = new 
TableConfigChangeListener();
+  private final Map<String, TableConfig> _tableConfigMap = new 
ConcurrentHashMap<>();
+  // Key is raw table name
+  private final SchemaChangeListener _schemaChangeListener = new 
SchemaChangeListener();
+  private final Map<String, SchemaInfo> _schemaInfoMap = new 
ConcurrentHashMap<>();
 
-  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean 
caseInsensitive) {
     _propertyStore = propertyStore;
-    _schemaChangeListener = new SchemaChangeListener();
-    _schemaChangeListener.refresh();
-    _tableConfigChangeListener = new TableConfigChangeListener();
-    _tableConfigChangeListener.refresh();
+    _caseInsensitive = caseInsensitive;
+    _tableNameMap = caseInsensitive ? new ConcurrentHashMap<>() : null;
+
+    synchronized (_tableConfigChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing 
changes
+      _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, 
_tableConfigChangeListener);
+
+      List<String> tables = 
_propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String tableNameWithType : tables) {
+          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+        }
+        addTableConfigs(pathsToAdd);
+      }
+    }
+
+    synchronized (_schemaChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing 
changes
+      _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH, 
_schemaChangeListener);
+
+      List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH, 
AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String rawTableName : tables) {
+          pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+        }
+        addSchemas(pathsToAdd);
+      }
+    }
+
+    LOGGER.info("Initialized TableCache with caseInsensitive: {}", 
caseInsensitive);
   }
 
-  public String getActualTableName(String tableName) {
-    return 
_tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), 
tableName);
+  /**
+   * Returns {@code true} if the TableCache is case-insensitive, {@code false} 
otherwise.
+   */
+  public boolean isCaseInsensitive() {
+    return _caseInsensitive;
   }
 
-  public boolean containsTable(String tableName) {
-    return 
_tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase());
+  /**
+   * For case-insensitive only, returns the actual table name for the given 
case-insensitive table name (with or without
+   * type suffix), or {@code null} if the table does not exist.
+   */
+  @Nullable
+  public String getActualTableName(String caseInsensitiveTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not 
case-insensitive");
+    return _tableNameMap.get(caseInsensitiveTableName.toLowerCase());
   }
 
-  public String getActualColumnName(String tableName, String columnName) {
-    String schemaName = 
_tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase());
-    if (schemaName != null) {
-      String actualColumnName = 
_schemaChangeListener.getColumnName(schemaName, columnName);
-      // If actual column name doesn't exist in schema, then return the origin 
column name.
-      if (actualColumnName == null) {
-        return columnName;
+  /**
+   * For case-insensitive only, returns a map from lower case column name to 
actual column name for the given table, or
+   * {@code null} if the table schema does not exist.
+   */
+  @Nullable
+  public Map<String, String> getColumnNameMap(String rawTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not 
case-insensitive");
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._columnNameMap : null;
+  }
+
+  /**
+   * Returns the table config for the given table, or {@code null} if it does 
not exist.
+   */
+  @Nullable
+  public TableConfig getTableConfig(String tableNameWithType) {
+    return _tableConfigMap.get(tableNameWithType);
+  }
+
+  /**
+   * Returns the schema for the given table, or {@code null} if it does not 
exist.
+   */
+  @Nullable
+  public Schema getSchema(String rawTableName) {
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._schema : null;
+  }
+
+  private void addTableConfigs(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _tableConfigChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, 
AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding table config for 
ZNRecord: {}", znRecord.getId(), e);
+        }
       }
-      return actualColumnName;
     }
-    return columnName;
   }
 
-  public TableConfig getTableConfig(String tableName) {
-    return _tableConfigChangeListener._tableConfigMap.get(tableName);
+  private void putTableConfig(ZNRecord znRecord)
+      throws IOException {
+    TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+    String tableNameWithType = tableConfig.getTableName();
+    _tableConfigMap.put(tableNameWithType, tableConfig);
+    if (_caseInsensitive) {
+      _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
+    }
   }
 
-  class TableConfigChangeListener implements IZkChildListener, IZkDataListener 
{
-
-    Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
-    Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
-    Map<String, String> _table2SchemaConfigMap = new ConcurrentHashMap<>();
-
-    public synchronized void refresh() {
-      try {
-        //always subscribe first before reading, so that we dont miss any 
changes
-        
_propertyStore.subscribeChildChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
_tableConfigChangeListener);
-        
_propertyStore.subscribeDataChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
_tableConfigChangeListener);
-        List<ZNRecord> children =
-            _propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
null, AccessOption.PERSISTENT);
-        if (children != null) {
-          for (ZNRecord znRecord : children) {
-            try {
-              TableConfig tableConfig = 
TableConfigUtils.fromZNRecord(znRecord);
-              String tableNameWithType = tableConfig.getTableName();
-              _tableConfigMap.put(tableNameWithType, tableConfig);
-              String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-              //create case insensitive mapping
-              _tableNameMap.put(tableNameWithType.toLowerCase(), 
tableNameWithType);
-              _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
-              //create case insensitive mapping between table name and 
schemaName
-              _table2SchemaConfigMap.put(tableNameWithType.toLowerCase(), 
rawTableName);
-              _table2SchemaConfigMap.put(rawTableName.toLowerCase(), 
rawTableName);
-            } catch (Exception e) {
-              LOGGER.warn("Exception loading table config for: {}: {}", 
znRecord.getId(), e.getMessage());
-              //ignore
-            }
-          }
+  private void removeTableConfig(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener);
+    String tableNameWithType = 
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
+    _tableConfigMap.remove(tableNameWithType);
+    if (_caseInsensitive) {
+      _tableNameMap.remove(tableNameWithType.toLowerCase());
+      String lowerCaseRawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase();
+      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + 
LOWER_CASE_REALTIME_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
+        }
+      } else {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + 
LOWER_CASE_OFFLINE_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
         }
-      } catch (Exception e) {
-        LOGGER.warn("Exception subscribing/reading tableconfigs", e);
-        //ignore
       }
     }
+  }
+
+  private void addSchemas(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _schemaChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, 
AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putSchema(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding schema for ZNRecord: 
{}", znRecord.getId(), e);
+        }
+      }
+    }
+  }
+
+  private void putSchema(ZNRecord znRecord)
+      throws IOException {
+    Schema schema = SchemaUtils.fromZNRecord(znRecord);
+    String rawTableName = schema.getSchemaName();
+    if (_caseInsensitive) {
+      Map<String, String> columnNameMap = new HashMap<>();
+      for (String columnName : schema.getColumnNames()) {
+        columnNameMap.put(columnName.toLowerCase(), columnName);
+      }
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+    } else {
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+    }
+  }
+
+  private void removeSchema(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _schemaChangeListener);
+    String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length());
+    _schemaInfoMap.remove(rawTableName);
+  }
+
+  private class TableConfigChangeListener implements IZkChildListener, 
IZkDataListener {
 
     @Override
-    public void handleChildChange(String s, List<String> list)
-        throws Exception {
-      refresh();
+    public synchronized void handleChildChange(String path, List<String> 
tables) {
+      if (CollectionUtils.isEmpty(tables)) {
+        return;
+      }
+
+      // Only process new added table configs. Changed/removed table configs 
are handled by other callbacks.
+      List<String> pathsToAdd = new ArrayList<>();
+      for (String tableNameWithType : tables) {
+        if (!_tableConfigMap.containsKey(tableNameWithType)) {
+          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+        }
+      }
+      if (!pathsToAdd.isEmpty()) {
+        addTableConfigs(pathsToAdd);
+      }
     }
 
     @Override
-    public void handleDataChange(String s, Object o)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        ZNRecord znRecord = (ZNRecord) data;
+        try {
+          putTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while refreshing table config for 
ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
     }
 
     @Override
-    public void handleDataDeleted(String s)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataDeleted(String path) {
+      // NOTE: The path here is the absolute ZK path instead of the relative 
path to the property store.
+      String tableNameWithType = path.substring(path.lastIndexOf('/') + 1);
+      removeTableConfig(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
     }
   }
 
-  class SchemaChangeListener implements IZkChildListener, IZkDataListener {
-    Map<String, Map<String, String>> _schemaColumnMap = new 
ConcurrentHashMap<>();
-
-    public synchronized void refresh() {

Review comment:
       this method was good, only thing we had to handle was calling subscribe 
on each child node. This is a much better way to handle ZK callbacks instead of 
trying to handle every callback with custom logic




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to