This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch table_cache in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f64b211c9980695ac96aba11f67b026aa1f6ec8c Author: Xiaotian (Jackie) Jiang <jackie....@gmail.com> AuthorDate: Thu Jul 30 18:44:04 2020 -0700 Refactor TableCache --- .../broker/broker/helix/HelixBrokerStarter.java | 35 +- .../requesthandler/BaseBrokerRequestHandler.java | 166 +++++----- .../SingleConnectionBrokerRequestHandler.java | 11 +- .../LiteralOnlyBrokerRequestTest.java | 16 +- .../pinot/common/utils/helix/TableCache.java | 362 ++++++++++++++------- .../helix/core/PinotHelixResourceManager.java | 51 +-- .../pinot/controller/helix/TableCacheTest.java | 168 ++++++++++ pinot-spi/pom.xml | 4 + pom.xml | 10 +- 9 files changed, 558 insertions(+), 265 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java index 3617a82..9ba8207 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java @@ -18,17 +18,14 @@ */ package org.apache.pinot.broker.broker.helix; +import com.google.common.collect.ImmutableList; +import com.yammer.metrics.core.MetricsRegistry; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - import javax.annotation.Nullable; - -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; -import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixConstants.ChangeType; import org.apache.helix.HelixDataAccessor; @@ -61,15 +58,13 @@ import org.apache.pinot.common.utils.CommonConstants.Helix; import org.apache.pinot.common.utils.NetUtil; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.common.utils.helix.TableCache; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.services.ServiceRole; import org.apache.pinot.spi.services.ServiceStartable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; -import com.yammer.metrics.core.MetricsRegistry; - @SuppressWarnings("unused") public class HelixBrokerStarter implements ServiceStartable { @@ -106,7 +101,8 @@ public class HelixBrokerStarter implements ServiceStartable { this(brokerConf, clusterName, zkServer, null); } - public HelixBrokerStarter(PinotConfiguration brokerConf, String clusterName, String zkServer, @Nullable String brokerHost) + public HelixBrokerStarter(PinotConfiguration brokerConf, String clusterName, String zkServer, + @Nullable String brokerHost) throws Exception { _brokerConf = brokerConf; setupHelixSystemProperties(); @@ -190,17 +186,15 @@ public class HelixBrokerStarter implements ServiceStartable { _helixAdmin = _spectatorHelixManager.getClusterManagmentTool(); _propertyStore = _spectatorHelixManager.getHelixPropertyStore(); _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor(); + // Fetch cluster level config from ZK - ConfigAccessor configAccessor = _spectatorHelixManager.getConfigAccessor(); HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_clusterName).build(); - Map<String, String> configMap = configAccessor.get(helixConfigScope, Arrays + Map<String, String> configMap = _helixAdmin.getConfig(helixConfigScope, Arrays .asList(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY, Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY)); - if (Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean - .parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY))) { - _brokerConf.setProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, true); - } + boolean caseInsensitive = Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean + .parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY)); String log2mStr = configMap.get(Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY); if (log2mStr != null) { try { @@ -233,9 +227,10 @@ public class HelixBrokerStarter implements ServiceStartable { queryQuotaManager.init(_spectatorHelixManager); // Initialize FunctionRegistry before starting the broker request handler FunctionRegistry.init(); + TableCache tableCache = new TableCache(_propertyStore, caseInsensitive); _brokerRequestHandler = new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager, - _brokerMetrics, _propertyStore); + tableCache, _brokerMetrics); int brokerQueryPort = _brokerConf.getProperty(Helix.KEY_OF_BROKER_QUERY_PORT, Helix.DEFAULT_BROKER_QUERY_PORT); LOGGER.info("Starting broker admin application on port: {}", brokerQueryPort); @@ -311,8 +306,9 @@ public class HelixBrokerStarter implements ServiceStartable { } } - double minResourcePercentForStartup = _brokerConf.getProperty( - Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START, Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START); + double minResourcePercentForStartup = _brokerConf + .getProperty(Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START, + Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START); LOGGER.info("Registering service status handler"); ServiceStatus.setServiceStatusCallback(_brokerId, new ServiceStatus.MultipleCallbackServiceStatusCallback( @@ -395,7 +391,8 @@ public class HelixBrokerStarter implements ServiceStartable { return _brokerRequestHandler; } - public static HelixBrokerStarter getDefault() throws Exception { + public static HelixBrokerStarter getDefault() + throws Exception { Map<String, Object> properties = new HashMap<>(); properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, 5001); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index e442b5d..7bb8c7a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -18,6 +18,10 @@ */ package org.apache.pinot.broker.requesthandler; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.RateLimiter; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; @@ -31,14 +35,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; - import org.apache.calcite.sql.SqlKind; import org.apache.commons.lang3.StringUtils; -import org.apache.helix.ZNRecord; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.broker.api.RequestStatistics; import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.broker.broker.AccessControlFactory; @@ -85,11 +85,6 @@ import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Splitter; -import com.google.common.util.concurrent.RateLimiter; - @ThreadSafe public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { @@ -99,6 +94,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final RoutingManager _routingManager; protected final AccessControlFactory _accessControlFactory; protected final QueryQuotaManager _queryQuotaManager; + protected final TableCache _tableCache; protected final BrokerMetrics _brokerMetrics; protected final AtomicLong _requestIdGenerator = new AtomicLong(); @@ -115,36 +111,29 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { private final RateLimiter _numDroppedLogRateLimiter; private final AtomicInteger _numDroppedLog; - private final boolean _enableCaseInsensitive; private final boolean _enableQueryLimitOverride; private final int _defaultHllLog2m; - private final TableCache _tableCache; public BaseBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager, - AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, - ZkHelixPropertyStore<ZNRecord> propertyStore) { + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + BrokerMetrics brokerMetrics) { _config = config; _routingManager = routingManager; _accessControlFactory = accessControlFactory; _queryQuotaManager = queryQuotaManager; + _tableCache = tableCache; _brokerMetrics = brokerMetrics; - _enableCaseInsensitive = _config.getProperty(CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY, false); - if (_enableCaseInsensitive) { - _tableCache = new TableCache(propertyStore); - } else { - _tableCache = null; - } - _defaultHllLog2m = _config - .getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M); - + _defaultHllLog2m = _config.getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, + CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M); _enableQueryLimitOverride = _config.getProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false); _brokerId = config.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId()); _brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS); _queryResponseLimit = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT); - _queryLogLength = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH); + _queryLogLength = + config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH); _queryLogRateLimiter = RateLimiter.create(config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND, Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND)); _numDroppedLog = new AtomicInteger(0); @@ -201,7 +190,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { e.getMessage()); } } - updateQuerySource(brokerRequest); + updateTableName(brokerRequest); try { updateColumnNames(brokerRequest); } catch (Exception e) { @@ -442,43 +431,60 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { * * Only update TableName in QuerySource if there is no existing table in the format of [database_name].[table_name], * but only [table_name]. - * - * @param brokerRequest */ - private void updateQuerySource(BrokerRequest brokerRequest) { + private void updateTableName(BrokerRequest brokerRequest) { String tableName = brokerRequest.getQuerySource().getTableName(); - // Check if table is in the format of [database_name].[table_name] - String[] tableNameSplits = StringUtils.split(tableName, ".", 2); - // Update table name if there is no existing table in the format of [database_name].[table_name] but only [table_name] - if (_enableCaseInsensitive) { - if (tableNameSplits.length < 2) { - brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableName)); + + // Use TableCache to handle case-insensitive table name + if (_tableCache.isCaseInsensitive()) { + String actualTableName = _tableCache.getActualTableName(tableName); + if (actualTableName != null) { + setTableName(brokerRequest, actualTableName); return; } - if (_tableCache.containsTable(tableNameSplits[1]) && !_tableCache.containsTable(tableName)) { - // Use TableCache to check case insensitive table name. - brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableNameSplits[1])); + + // Check if table is in the format of [database_name].[table_name] + String[] tableNameSplits = StringUtils.split(tableName, ".", 2); + if (tableNameSplits.length == 2) { + actualTableName = _tableCache.getActualTableName(tableNameSplits[1]); + if (actualTableName != null) { + setTableName(brokerRequest, actualTableName); + return; + } } + return; } - if (tableNameSplits.length < 2) { + + // Check if table is in the format of [database_name].[table_name] + String[] tableNameSplits = StringUtils.split(tableName, ".", 2); + if (tableNameSplits.length != 2) { return; } - // Use RoutingManager to check case sensitive table name. + + // Use RoutingManager to handle case-sensitive table name + // Update table name if there is no existing table in the format of [database_name].[table_name] but only [table_name] if (TableNameBuilder.isTableResource(tableName)) { if (_routingManager.routingExists(tableNameSplits[1]) && !_routingManager.routingExists(tableName)) { - brokerRequest.getQuerySource().setTableName(tableNameSplits[1]); + setTableName(brokerRequest, tableNameSplits[1]); } return; } + if (_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableNameSplits[1])) + && !_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName))) { + setTableName(brokerRequest, tableNameSplits[1]); + return; + } if (_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableNameSplits[1])) && !_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableName))) { - brokerRequest.getQuerySource().setTableName(tableNameSplits[1]); - return; + setTableName(brokerRequest, tableNameSplits[1]); } - if (_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableNameSplits[1])) - && !_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName))) { - brokerRequest.getQuerySource().setTableName(tableNameSplits[1]); + } + + private void setTableName(BrokerRequest brokerRequest, String tableName) { + brokerRequest.getQuerySource().setTableName(tableName); + if (brokerRequest.getPinotQuery() != null) { + brokerRequest.getPinotQuery().getDataSource().setTableName(tableName); } } @@ -672,14 +678,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { * Fixes the column names to the actual column names in the given broker request. */ private void updateColumnNames(BrokerRequest brokerRequest) { - String tableName = brokerRequest.getQuerySource().getTableName(); - //fix columns + String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName()); + Map<String, String> columnNameMap = + _tableCache.isCaseInsensitive() ? _tableCache.getColumnNameMap(rawTableName) : null; + if (brokerRequest.getFilterSubQueryMap() != null) { Collection<FilterQuery> values = brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values(); for (FilterQuery filterQuery : values) { if (filterQuery.getNestedFilterQueryIdsSize() == 0) { - String expression = filterQuery.getColumn(); - filterQuery.setColumn(fixColumnName(tableName, expression)); + filterQuery.setColumn(fixColumnName(rawTableName, filterQuery.getColumn(), columnNameMap)); } } } @@ -688,14 +695,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) { // Always read from backward compatible api in AggregationFunctionUtils. List<String> arguments = AggregationFunctionUtils.getArguments(info); - arguments.replaceAll(e -> fixColumnName(tableName, e)); + arguments.replaceAll(e -> fixColumnName(rawTableName, e, columnNameMap)); info.setExpressions(arguments); } } if (brokerRequest.isSetGroupBy()) { List<String> expressions = brokerRequest.getGroupBy().getExpressions(); for (int i = 0; i < expressions.size(); i++) { - expressions.set(i, fixColumnName(tableName, expressions.get(i))); + expressions.set(i, fixColumnName(rawTableName, expressions.get(i), columnNameMap)); } } } else { @@ -704,7 +711,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { for (int i = 0; i < selectionColumns.size(); i++) { String expression = selectionColumns.get(i); if (!expression.equals("*")) { - selectionColumns.set(i, fixColumnName(tableName, expression)); + selectionColumns.set(i, fixColumnName(rawTableName, expression, columnNameMap)); } } } @@ -712,86 +719,87 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { List<SelectionSort> orderBy = brokerRequest.getOrderBy(); for (SelectionSort selectionSort : orderBy) { String expression = selectionSort.getColumn(); - selectionSort.setColumn(fixColumnName(tableName, expression)); + selectionSort.setColumn(fixColumnName(rawTableName, expression, columnNameMap)); } } PinotQuery pinotQuery = brokerRequest.getPinotQuery(); if (pinotQuery != null) { - pinotQuery.getDataSource().setTableName(tableName); for (Expression expression : pinotQuery.getSelectList()) { - fixColumnName(tableName, expression); + fixColumnName(rawTableName, expression, columnNameMap); } Expression filterExpression = pinotQuery.getFilterExpression(); if (filterExpression != null) { - fixColumnName(tableName, filterExpression); + fixColumnName(rawTableName, filterExpression, columnNameMap); } List<Expression> groupByList = pinotQuery.getGroupByList(); if (groupByList != null) { for (Expression expression : groupByList) { - fixColumnName(tableName, expression); + fixColumnName(rawTableName, expression, columnNameMap); } } List<Expression> orderByList = pinotQuery.getOrderByList(); if (orderByList != null) { for (Expression expression : orderByList) { - fixColumnName(tableName, expression); + fixColumnName(rawTableName, expression, columnNameMap); } } Expression havingExpression = pinotQuery.getHavingExpression(); if (havingExpression != null) { - fixColumnName(tableName, havingExpression); + fixColumnName(rawTableName, havingExpression, columnNameMap); } } } - private String fixColumnName(String tableNameWithType, String expression) { + private String fixColumnName(String rawTableName, String expression, @Nullable Map<String, String> columnNameMap) { TransformExpressionTree expressionTree = TransformExpressionTree.compileToExpressionTree(expression); - fixColumnName(tableNameWithType, expressionTree); + fixColumnName(rawTableName, expressionTree, columnNameMap); return expressionTree.toString(); } - private void fixColumnName(String tableNameWithType, TransformExpressionTree expression) { + private void fixColumnName(String rawTableName, TransformExpressionTree expression, + @Nullable Map<String, String> columnNameMap) { TransformExpressionTree.ExpressionType expressionType = expression.getExpressionType(); if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) { - String identifier = expression.getValue(); - expression.setValue(getActualColumnName(tableNameWithType, identifier)); + expression.setValue(getActualColumnName(rawTableName, expression.getValue(), columnNameMap)); } else if (expressionType == TransformExpressionTree.ExpressionType.FUNCTION) { for (TransformExpressionTree child : expression.getChildren()) { - fixColumnName(tableNameWithType, child); + fixColumnName(rawTableName, child, columnNameMap); } } } - private void fixColumnName(String tableNameWithType, Expression expression) { + private void fixColumnName(String rawTableName, Expression expression, @Nullable Map<String, String> columnNameMap) { ExpressionType expressionType = expression.getType(); if (expressionType == ExpressionType.IDENTIFIER) { Identifier identifier = expression.getIdentifier(); - identifier.setName(getActualColumnName(tableNameWithType, identifier.getName())); + identifier.setName(getActualColumnName(rawTableName, identifier.getName(), columnNameMap)); } else if (expressionType == ExpressionType.FUNCTION) { for (Expression operand : expression.getFunctionCall().getOperands()) { - fixColumnName(tableNameWithType, operand); + fixColumnName(rawTableName, operand, columnNameMap); } } } - private String getActualColumnName(String tableNameWithType, String columnName) { + private String getActualColumnName(String rawTableName, String columnName, + @Nullable Map<String, String> columnNameMap) { + // Check if column is in the format of [table_name].[column_name] String[] splits = StringUtils.split(columnName, ".", 2); - if (_enableCaseInsensitive) { - if (splits.length == 2) { - if (TableNameBuilder.extractRawTableName(tableNameWithType).equalsIgnoreCase(splits[0])) { - return _tableCache.getActualColumnName(tableNameWithType, splits[1]); - } + if (_tableCache.isCaseInsensitive()) { + if (splits.length == 2 && rawTableName.equalsIgnoreCase(splits[0])) { + columnName = splits[1]; + } + if (columnNameMap != null) { + return columnNameMap.getOrDefault(columnName, columnName); + } else { + return columnName; } - return _tableCache.getActualColumnName(tableNameWithType, columnName); } else { - if (splits.length == 2) { - if (TableNameBuilder.extractRawTableName(tableNameWithType).equals(splits[0])) { - return splits[1]; - } + if (splits.length == 2 && rawTableName.equals(splits[0])) { + columnName = splits[1]; } + return columnName; } - return columnName; } private static Map<String, String> getOptionsFromJson(JsonNode request, String optionsKey) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 420177a..1f67d88 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -21,12 +21,8 @@ package org.apache.pinot.broker.requesthandler; import java.util.HashMap; import java.util.List; import java.util.Map; - import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; - -import org.apache.helix.ZNRecord; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.broker.api.RequestStatistics; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; @@ -41,6 +37,7 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.QueryProcessingException; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.common.utils.HashUtil; +import org.apache.pinot.common.utils.helix.TableCache; import org.apache.pinot.core.transport.AsyncQueryResponse; import org.apache.pinot.core.transport.QueryRouter; import org.apache.pinot.core.transport.ServerInstance; @@ -59,9 +56,9 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl private final QueryRouter _queryRouter; public SingleConnectionBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager, - AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, - ZkHelixPropertyStore<ZNRecord> propertyStore) { - super(config, routingManager, accessControlFactory, queryQuotaManager, brokerMetrics, propertyStore); + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + BrokerMetrics brokerMetrics) { + super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); _queryRouter = new QueryRouter(_brokerId, brokerMetrics); } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java index 0dbb2e2..18f9e86 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.broker.requesthandler; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yammer.metrics.core.MetricsRegistry; import java.util.Random; - import org.apache.pinot.broker.api.RequestStatistics; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.response.broker.BrokerResponseNative; @@ -30,10 +32,6 @@ import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import org.testng.Assert; import org.testng.annotations.Test; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.yammer.metrics.core.MetricsRegistry; - public class LiteralOnlyBrokerRequestTest { private static final Random RANDOM = new Random(System.currentTimeMillis()); @@ -92,8 +90,8 @@ public class LiteralOnlyBrokerRequestTest { public void testBrokerRequestHandler() throws Exception { SingleConnectionBrokerRequestHandler requestHandler = - new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null, - new BrokerMetrics("", new MetricsRegistry(), false), null); + new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null, null, + new BrokerMetrics("", new MetricsRegistry(), false)); long randNum = RANDOM.nextLong(); byte[] randBytes = new byte[12]; RANDOM.nextBytes(randBytes); @@ -119,8 +117,8 @@ public class LiteralOnlyBrokerRequestTest { public void testBrokerRequestHandlerWithAsFunction() throws Exception { SingleConnectionBrokerRequestHandler requestHandler = - new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null, - new BrokerMetrics("", new MetricsRegistry(), false), null); + new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null, null, + new BrokerMetrics("", new MetricsRegistry(), false)); long currentTsMin = System.currentTimeMillis(); JsonNode request = new ObjectMapper().readTree( "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}"); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java index 4e3dd4d..dbd360d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java @@ -18,19 +18,23 @@ */ package org.apache.pinot.common.utils.helix; -import java.util.Collection; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; +import org.apache.commons.collections.CollectionUtils; import org.apache.helix.AccessOption; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; @@ -38,170 +42,284 @@ import org.slf4j.LoggerFactory; /** - * Caches table config and schema of a table. - * At the start - loads all the table configs and schemas in map. - * sets up a zookeeper listener that watches for any change and updates the cache. - * TODO: optimize to load only changed table configs/schema on a callback. - * TODO: Table deletes are not handled as of now - * Goal is to eventually grow this into a PinotClusterDataAccessor + * The {@code TableCache} caches all the table configs and schemas within the cluster, and listens on ZK changes to keep + * them in sync. It also maintains the table name map and the column name map for case-insensitive queries. */ public class TableCache { private static final Logger LOGGER = LoggerFactory.getLogger(TableCache.class); + private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE"; + private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/"; + private static final String SCHEMA_PARENT_PATH = "/SCHEMAS"; + private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/"; + private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline"; + private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime"; - private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS"; - private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE"; + private final ZkHelixPropertyStore<ZNRecord> _propertyStore; + private final boolean _caseInsensitive; + // For case insensitive, key is lower case table name, value is actual table name + private final Map<String, String> _tableNameMap; - private ZkHelixPropertyStore<ZNRecord> _propertyStore; - TableConfigChangeListener _tableConfigChangeListener; - SchemaChangeListener _schemaChangeListener; + // Key is table name with type suffix + private final TableConfigChangeListener _tableConfigChangeListener = new TableConfigChangeListener(); + private final Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>(); + // Key is raw table name + private final SchemaChangeListener _schemaChangeListener = new SchemaChangeListener(); + private final Map<String, SchemaInfo> _schemaInfoMap = new ConcurrentHashMap<>(); - public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore) { + public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean caseInsensitive) { _propertyStore = propertyStore; - _schemaChangeListener = new SchemaChangeListener(); - _schemaChangeListener.refresh(); - _tableConfigChangeListener = new TableConfigChangeListener(); - _tableConfigChangeListener.refresh(); + _caseInsensitive = caseInsensitive; + _tableNameMap = caseInsensitive ? new ConcurrentHashMap<>() : null; + + synchronized (_tableConfigChangeListener) { + // Subscribe child changes before reading the data to avoid missing changes + _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, _tableConfigChangeListener); + + List<String> tables = _propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT); + if (CollectionUtils.isNotEmpty(tables)) { + List<String> pathsToAdd = new ArrayList<>(tables.size()); + for (String tableNameWithType : tables) { + pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType); + } + addTableConfigs(pathsToAdd); + } + } + + synchronized (_schemaChangeListener) { + // Subscribe child changes before reading the data to avoid missing changes + _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH, _schemaChangeListener); + + List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH, AccessOption.PERSISTENT); + if (CollectionUtils.isNotEmpty(tables)) { + List<String> pathsToAdd = new ArrayList<>(tables.size()); + for (String rawTableName : tables) { + pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName); + } + addSchemas(pathsToAdd); + } + } + + LOGGER.info("Initialized TableCache with caseInsensitive: {}", caseInsensitive); } - public String getActualTableName(String tableName) { - return _tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), tableName); + /** + * Returns {@code true} if the TableCache is case-insensitive, {@code false} otherwise. + */ + public boolean isCaseInsensitive() { + return _caseInsensitive; } - public boolean containsTable(String tableName) { - return _tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase()); + /** + * For case-insensitive only, returns the actual table name for the given case-insensitive table name (with or without + * type suffix), or {@code null} if the table does not exist. + */ + @Nullable + public String getActualTableName(String caseInsensitiveTableName) { + Preconditions.checkState(_caseInsensitive, "TableCache is not case-insensitive"); + return _tableNameMap.get(caseInsensitiveTableName.toLowerCase()); } - public String getActualColumnName(String tableName, String columnName) { - String schemaName = _tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase()); - if (schemaName != null) { - String actualColumnName = _schemaChangeListener.getColumnName(schemaName, columnName); - // If actual column name doesn't exist in schema, then return the origin column name. - if (actualColumnName == null) { - return columnName; + /** + * For case-insensitive only, returns a map from lower case column name to actual column name for the given table, or + * {@code null} if the table schema does not exist. + */ + @Nullable + public Map<String, String> getColumnNameMap(String rawTableName) { + Preconditions.checkState(_caseInsensitive, "TableCache is not case-insensitive"); + SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName); + return schemaInfo != null ? schemaInfo._columnNameMap : null; + } + + /** + * Returns the table config for the given table, or {@code null} if it does not exist. + */ + @Nullable + public TableConfig getTableConfig(String tableNameWithType) { + return _tableConfigMap.get(tableNameWithType); + } + + /** + * Returns the schema for the given table, or {@code null} if it does not exist. + */ + @Nullable + public Schema getSchema(String rawTableName) { + SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName); + return schemaInfo != null ? schemaInfo._schema : null; + } + + private void addTableConfigs(List<String> paths) { + // Subscribe data changes before reading the data to avoid missing changes + for (String path : paths) { + _propertyStore.subscribeDataChanges(path, _tableConfigChangeListener); + } + List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT); + for (ZNRecord znRecord : znRecords) { + if (znRecord != null) { + try { + putTableConfig(znRecord); + } catch (Exception e) { + LOGGER.error("Caught exception while adding table config for ZNRecord: {}", znRecord.getId(), e); + } } - return actualColumnName; } - return columnName; } - public TableConfig getTableConfig(String tableName) { - return _tableConfigChangeListener._tableConfigMap.get(tableName); + private void putTableConfig(ZNRecord znRecord) + throws IOException { + TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord); + String tableNameWithType = tableConfig.getTableName(); + _tableConfigMap.put(tableNameWithType, tableConfig); + if (_caseInsensitive) { + _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + _tableNameMap.put(rawTableName.toLowerCase(), rawTableName); + } } - class TableConfigChangeListener implements IZkChildListener, IZkDataListener { - - Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>(); - Map<String, String> _tableNameMap = new ConcurrentHashMap<>(); - Map<String, String> _table2SchemaConfigMap = new ConcurrentHashMap<>(); - - public synchronized void refresh() { - try { - //always subscribe first before reading, so that we dont miss any changes - _propertyStore.subscribeChildChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, _tableConfigChangeListener); - _propertyStore.subscribeDataChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, _tableConfigChangeListener); - List<ZNRecord> children = - _propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, null, AccessOption.PERSISTENT); - if (children != null) { - for (ZNRecord znRecord : children) { - try { - TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord); - String tableNameWithType = tableConfig.getTableName(); - _tableConfigMap.put(tableNameWithType, tableConfig); - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - //create case insensitive mapping - _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType); - _tableNameMap.put(rawTableName.toLowerCase(), rawTableName); - //create case insensitive mapping between table name and schemaName - _table2SchemaConfigMap.put(tableNameWithType.toLowerCase(), rawTableName); - _table2SchemaConfigMap.put(rawTableName.toLowerCase(), rawTableName); - } catch (Exception e) { - LOGGER.warn("Exception loading table config for: {}: {}", znRecord.getId(), e.getMessage()); - //ignore - } - } + private void removeTableConfig(String path) { + _propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener); + String tableNameWithType = path.substring(TABLE_CONFIG_PATH_PREFIX.length()); + _tableConfigMap.remove(tableNameWithType); + if (_caseInsensitive) { + _tableNameMap.remove(tableNameWithType.toLowerCase()); + String lowerCaseRawTableName = TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase(); + if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) { + if (!_tableNameMap.containsKey(lowerCaseRawTableName + LOWER_CASE_REALTIME_TABLE_SUFFIX)) { + _tableNameMap.remove(lowerCaseRawTableName); + } + } else { + if (!_tableNameMap.containsKey(lowerCaseRawTableName + LOWER_CASE_OFFLINE_TABLE_SUFFIX)) { + _tableNameMap.remove(lowerCaseRawTableName); } - } catch (Exception e) { - LOGGER.warn("Exception subscribing/reading tableconfigs", e); - //ignore } } + } + + private void addSchemas(List<String> paths) { + // Subscribe data changes before reading the data to avoid missing changes + for (String path : paths) { + _propertyStore.subscribeDataChanges(path, _schemaChangeListener); + } + List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT); + for (ZNRecord znRecord : znRecords) { + if (znRecord != null) { + try { + putSchema(znRecord); + } catch (Exception e) { + LOGGER.error("Caught exception while adding schema for ZNRecord: {}", znRecord.getId(), e); + } + } + } + } + + private void putSchema(ZNRecord znRecord) + throws IOException { + Schema schema = SchemaUtils.fromZNRecord(znRecord); + String rawTableName = schema.getSchemaName(); + if (_caseInsensitive) { + Map<String, String> columnNameMap = new HashMap<>(); + for (String columnName : schema.getColumnNames()) { + columnNameMap.put(columnName.toLowerCase(), columnName); + } + _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap)); + } else { + _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null)); + } + } + + private void removeSchema(String path) { + _propertyStore.unsubscribeDataChanges(path, _schemaChangeListener); + String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length()); + _schemaInfoMap.remove(rawTableName); + } + + private class TableConfigChangeListener implements IZkChildListener, IZkDataListener { @Override - public void handleChildChange(String s, List<String> list) - throws Exception { - refresh(); + public synchronized void handleChildChange(String path, List<String> tables) { + if (CollectionUtils.isEmpty(tables)) { + return; + } + + // Only process new added table configs. Changed/removed table configs are handled by other callbacks. + List<String> pathsToAdd = new ArrayList<>(); + for (String tableNameWithType : tables) { + if (!_tableConfigMap.containsKey(tableNameWithType)) { + pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType); + } + } + if (!pathsToAdd.isEmpty()) { + addTableConfigs(pathsToAdd); + } } @Override - public void handleDataChange(String s, Object o) - throws Exception { - refresh(); + public synchronized void handleDataChange(String path, Object data) { + if (data != null) { + ZNRecord znRecord = (ZNRecord) data; + try { + putTableConfig(znRecord); + } catch (Exception e) { + LOGGER.error("Caught exception while refreshing table config for ZNRecord: {}", znRecord.getId(), e); + } + } } @Override - public void handleDataDeleted(String s) - throws Exception { - refresh(); + public synchronized void handleDataDeleted(String path) { + // NOTE: The path here is the absolute ZK path instead of the relative path to the property store. + String tableNameWithType = path.substring(path.lastIndexOf('/') + 1); + removeTableConfig(TABLE_CONFIG_PATH_PREFIX + tableNameWithType); } } - class SchemaChangeListener implements IZkChildListener, IZkDataListener { - Map<String, Map<String, String>> _schemaColumnMap = new ConcurrentHashMap<>(); - - public synchronized void refresh() { - try { - //always subscribe first before reading, so that we dont miss any changes between reading and setting the watcher again - _propertyStore.subscribeChildChanges(PROPERTYSTORE_SCHEMAS_PREFIX, _schemaChangeListener); - _propertyStore.subscribeDataChanges(PROPERTYSTORE_SCHEMAS_PREFIX, _schemaChangeListener); - List<ZNRecord> children = - _propertyStore.getChildren(PROPERTYSTORE_SCHEMAS_PREFIX, null, AccessOption.PERSISTENT); - if (children != null) { - for (ZNRecord znRecord : children) { - try { - Schema schema = SchemaUtils.fromZNRecord(znRecord); - String schemaNameLowerCase = schema.getSchemaName().toLowerCase(); - Collection<FieldSpec> allFieldSpecs = schema.getAllFieldSpecs(); - ConcurrentHashMap<String, String> columnNameMap = new ConcurrentHashMap<>(); - _schemaColumnMap.put(schemaNameLowerCase, columnNameMap); - for (FieldSpec fieldSpec : allFieldSpecs) { - columnNameMap.put(fieldSpec.getName().toLowerCase(), fieldSpec.getName()); - } - } catch (Exception e) { - LOGGER.warn("Exception loading schema for: {}: {}", znRecord.getId(), e.getMessage()); - //ignore - } - } - } - } catch (Exception e) { - LOGGER.warn("Exception subscribing/reading schemas", e); - //ignore + private class SchemaChangeListener implements IZkChildListener, IZkDataListener { + + @Override + public synchronized void handleChildChange(String path, List<String> tables) { + if (CollectionUtils.isEmpty(tables)) { + return; } - } - String getColumnName(String schemaName, String columnName) { - Map<String, String> columnNameMap = _schemaColumnMap.get(schemaName.toLowerCase()); - if (columnNameMap != null) { - return columnNameMap.get(columnName.toLowerCase()); + // Only process new added schemas. Changed/removed schemas are handled by other callbacks. + List<String> pathsToAdd = new ArrayList<>(); + for (String rawTableName : tables) { + if (!_schemaInfoMap.containsKey(rawTableName)) { + pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName); + } + } + if (!pathsToAdd.isEmpty()) { + addSchemas(pathsToAdd); } - return columnName; } @Override - public void handleChildChange(String s, List<String> list) - throws Exception { - refresh(); + public synchronized void handleDataChange(String path, Object data) { + if (data != null) { + ZNRecord znRecord = (ZNRecord) data; + try { + putSchema(znRecord); + } catch (Exception e) { + LOGGER.error("Caught exception while refreshing schema for ZNRecord: {}", znRecord.getId(), e); + } + } } @Override - public void handleDataChange(String s, Object o) - throws Exception { - refresh(); + public synchronized void handleDataDeleted(String path) { + // NOTE: The path here is the absolute ZK path instead of the relative path to the property store. + String rawTableName = path.substring(path.lastIndexOf('/') + 1); + removeSchema(SCHEMA_PATH_PREFIX + rawTableName); } + } - @Override - public void handleDataDeleted(String s) - throws Exception { - refresh(); + private static class SchemaInfo { + final Schema _schema; + final Map<String, String> _columnNameMap; + + private SchemaInfo(Schema schema, Map<String, String> columnNameMap) { + _schema = schema; + _columnNameMap = columnNameMap; } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 7d5bd61..5cb528c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -27,6 +27,7 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -204,7 +205,15 @@ public class PinotHelixResourceManager { addInstanceGroupTagIfNeeded(); _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore); ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster); - _tableCache = new TableCache(_propertyStore); + + // Initialize TableCache + HelixConfigScope helixConfigScope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build(); + Map<String, String> configs = _helixAdmin.getConfig(helixConfigScope, + Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY)); + boolean caseInsensitive = Boolean.parseBoolean(configs.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean + .parseBoolean(configs.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY)); + _tableCache = new TableCache(_propertyStore, caseInsensitive); } /** @@ -467,28 +476,23 @@ public class PinotHelixResourceManager { * @return tableName actually defined in Pinot (matches case) and exists ,else, return the input value */ public String getActualTableName(String tableName) { - return _tableCache.getActualTableName(tableName); - } - - /** - * Given a column name in any case, returns the column name as defined in Schema - * If table has no schema, it just returns the input value - * @param tableName - * @param columnName - * @return - */ - public String getActualColumnName(String tableName, String columnName) { - return _tableCache.getActualColumnName(tableName, columnName); + if (_tableCache.isCaseInsensitive()) { + String actualTableName = _tableCache.getActualTableName(tableName); + return actualTableName != null ? actualTableName : tableName; + } else { + return tableName; + } } /** - * Given a table name in any case, returns crypter class name defined in table config - * @param tableName table name in any case + * Returns the crypter class name defined in the table config for the given table. + * + * @param tableNameWithType Table name with type suffix * @return crypter class name */ - public String getCrypterClassNameFromTableConfig(String tableName) { - TableConfig tableConfig = _tableCache.getTableConfig(tableName); - Preconditions.checkNotNull(tableConfig, "Table config is not available for table '%s'", tableName); + public String getCrypterClassNameFromTableConfig(String tableNameWithType) { + TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + Preconditions.checkNotNull(tableConfig, "Table config is not available for table '%s'", tableNameWithType); return tableConfig.getValidationConfig().getCrypterClassName(); } @@ -1761,8 +1765,7 @@ public class PinotHelixResourceManager { propToUpdate.put(Helix.QUERY_RATE_LIMIT_DISABLED, Boolean.toString("DISABLE".equals(state))); HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, _helixClusterName) - .forParticipant(brokerInstanceName) - .build(); + .forParticipant(brokerInstanceName).build(); _helixAdmin.setConfig(scope, propToUpdate); } @@ -2264,10 +2267,10 @@ public class PinotHelixResourceManager { LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId); // Check that any segment from 'segmentsFrom' does not appear twice. - Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String - .format("It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from " - + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType, - lineageEntry.getSegmentsFrom(), segmentsFrom)); + Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String.format( + "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from " + + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType, + lineageEntry.getSegmentsFrom(), segmentsFrom)); // Check that merged segments name cannot be the same. Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format( diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java new file mode 100644 index 0000000..9ba206d --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix; + +import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.utils.helix.TableCache; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class TableCacheTest extends ControllerTest { + + @BeforeClass + public void setUp() + throws Exception { + startZk(); + startController(); + addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); + addFakeServerInstancesToAutoJoinHelixCluster(1, true); + } + + @Test + public void testTableCache() + throws Exception { + TableCache tableCache = new TableCache(_propertyStore, true); + + assertTrue(tableCache.isCaseInsensitive()); + assertNull(tableCache.getActualTableName("testTable")); + assertNull(tableCache.getColumnNameMap("testTable")); + assertNull(tableCache.getTableConfig("testTable_OFFLINE")); + assertNull(tableCache.getSchema("testTable")); + + // Add a table config + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build(); + _helixResourceManager.addTable(tableConfig); + // Wait for at most 10 seconds for the callback to add the table config to the cache + TestUtils.waitForCondition(aVoid -> tableCache.getTableConfig("testTable_OFFLINE") != null, 10_000L, + "Failed to add the table config to the cache"); + assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable"); + assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), "testTable_OFFLINE"); + assertNull(tableCache.getActualTableName("testTable_REALTIME")); + assertNull(tableCache.getColumnNameMap("testTable")); + assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig); + assertNull(tableCache.getSchema("testTable")); + + // Update the table config + tableConfig.getIndexingConfig().setCreateInvertedIndexDuringSegmentGeneration(true); + _helixResourceManager.updateTableConfig(tableConfig); + // Wait for at most 10 seconds for the callback to update the table config in the cache + // NOTE: Table config should never be null during the transitioning + TestUtils.waitForCondition( + aVoid -> Preconditions.checkNotNull(tableCache.getTableConfig("testTable_OFFLINE")).equals(tableConfig), + 10_000L, "Failed to update the table config in the cache"); + assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable"); + assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), "testTable_OFFLINE"); + assertNull(tableCache.getActualTableName("testTable_REALTIME")); + assertNull(tableCache.getColumnNameMap("testTable")); + assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig); + assertNull(tableCache.getSchema("testTable")); + + // Add a schema + Schema schema = + new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("testColumn", DataType.INT) + .build(); + _helixResourceManager.addSchema(schema, false); + // Wait for at most 10 seconds for the callback to add the schema to the cache + TestUtils.waitForCondition(aVoid -> tableCache.getSchema("testTable") != null, 10_000L, + "Failed to add the schema to the cache"); + assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable"); + assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), "testTable_OFFLINE"); + assertNull(tableCache.getActualTableName("testTable_REALTIME")); + assertEquals(tableCache.getColumnNameMap("testTable"), Collections.singletonMap("testcolumn", "testColumn")); + assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig); + assertEquals(tableCache.getSchema("testTable"), schema); + + // Update the schema + schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true)); + _helixResourceManager.updateSchema(schema, false); + // Wait for at most 10 seconds for the callback to update the schema in the cache + // NOTE: schema should never be null during the transitioning + TestUtils.waitForCondition(aVoid -> Preconditions.checkNotNull(tableCache.getSchema("testTable")).equals(schema), + 10_000L, "Failed to update the schema in the cache"); + assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable"); + assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), "testTable_OFFLINE"); + assertNull(tableCache.getActualTableName("testTable_REALTIME")); + Map<String, String> expectedColumnMap = new HashMap<>(); + expectedColumnMap.put("testcolumn", "testColumn"); + expectedColumnMap.put("newcolumn", "newColumn"); + assertEquals(tableCache.getColumnNameMap("testTable"), expectedColumnMap); + assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig); + assertEquals(tableCache.getSchema("testTable"), schema); + + // Create a new case-sensitive TableCache which should load the existing table config and schema + TableCache caseSensitiveTableCache = new TableCache(_propertyStore, false); + assertFalse(caseSensitiveTableCache.isCaseInsensitive()); + // Getting actual table name or column name map should throw exception for case-sensitive TableCache + try { + caseSensitiveTableCache.getActualTableName("testTable"); + fail(); + } catch (Exception e) { + // Expected + } + try { + caseSensitiveTableCache.getColumnNameMap("testTable"); + fail(); + } catch (Exception e) { + // Expected + } + assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig); + assertEquals(tableCache.getSchema("testTable"), schema); + + // Remove the table config + _helixResourceManager.deleteOfflineTable("testTable"); + // Wait for at most 10 seconds for the callback to remove the table config from the cache + TestUtils.waitForCondition(aVoid -> tableCache.getTableConfig("testTable_OFFLINE") == null, 10_000L, + "Failed to remove the table config from the cache"); + // Case-insensitive table name are handled based on the table config instead of the schema + assertNull(tableCache.getActualTableName("testTable")); + assertEquals(tableCache.getColumnNameMap("testTable"), expectedColumnMap); + assertNull(tableCache.getTableConfig("testTable_OFFLINE")); + assertEquals(tableCache.getSchema("testTable"), schema); + + // Remove the schema + _helixResourceManager.deleteSchema(schema); + // Wait for at most 10 seconds for the callback to remove the schema from the cache + TestUtils.waitForCondition(aVoid -> tableCache.getSchema("testTable") == null, 10_000L, + "Failed to remove the schema from the cache"); + assertNull(tableCache.getActualTableName("testTable")); + assertNull(tableCache.getColumnNameMap("testTable")); + assertNull(tableCache.getTableConfig("testTable_OFFLINE")); + assertNull(tableCache.getSchema("testTable")); + } + + @AfterClass + public void tearDown() { + stopController(); + stopZk(); + } +} diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml index ab433e7..9e4535e 100644 --- a/pinot-spi/pom.xml +++ b/pinot-spi/pom.xml @@ -92,6 +92,10 @@ <artifactId>commons-lang3</artifactId> </dependency> <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> diff --git a/pom.xml b/pom.xml index c59189a..a3f23b7 100644 --- a/pom.xml +++ b/pom.xml @@ -402,11 +402,11 @@ <artifactId>commons-lang3</artifactId> <version>3.5</version> </dependency> - <dependency> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - <version>3.2.1</version> - </dependency> + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + <version>3.2.1</version> + </dependency> <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org