This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch cache-table-schemas-in-broker in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 8bbed08223f4b461018738dc6ba6afda73292ce3 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Thu Nov 29 15:26:36 2018 -0800 Add guava cache to cache table schema in pinot broker --- .../pinot/broker/broker/BrokerServerBuilder.java | 11 ++- .../broker/broker/helix/HelixBrokerStarter.java | 3 +- .../requesthandler/BaseBrokerRequestHandler.java | 84 +++++++++++++++++++++- .../ConnectionPoolBrokerRequestHandler.java | 10 ++- .../SingleConnectionBrokerRequestHandler.java | 8 ++- .../broker/requesthandler/TableSchemaCache.java | 67 +++++++++++++++++ 6 files changed, 173 insertions(+), 10 deletions(-) diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java index c1e2d59..71ee2f8 100644 --- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java @@ -30,6 +30,8 @@ import com.linkedin.pinot.common.utils.CommonConstants; import com.yammer.metrics.core.MetricsRegistry; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.configuration.Configuration; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +65,7 @@ public class BrokerServerBuilder { private final TimeBoundaryService _timeBoundaryService; private final LiveInstancesChangeListenerImpl _liveInstanceChangeListener; private final TableQueryQuotaManager _tableQueryQuotaManager; + private final ZkHelixPropertyStore<ZNRecord> _propertyStore; private final AccessControlFactory _accessControlFactory; private final MetricsRegistry _metricsRegistry; private final BrokerMetrics _brokerMetrics; @@ -70,7 +73,8 @@ public class BrokerServerBuilder { private final BrokerAdminApiApplication _brokerAdminApplication; public 
BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService, - LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager) { + LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager, + ZkHelixPropertyStore<ZNRecord> propertyStore) { _state.set(State.INIT); _config = config; _delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS); @@ -78,6 +82,7 @@ public class BrokerServerBuilder { _timeBoundaryService = timeBoundaryService; _liveInstanceChangeListener = liveInstanceChangeListener; _tableQueryQuotaManager = tableQueryQuotaManager; + _propertyStore = propertyStore; _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX)); _metricsRegistry = new MetricsRegistry(); MetricsHelper.initializeMetrics(config.subset(METRICS_CONFIG_PREFIX)); @@ -93,11 +98,11 @@ public class BrokerServerBuilder { if (requestHandlerType.equalsIgnoreCase(SINGLE_CONNECTION_REQUEST_HANDLER_TYPE)) { LOGGER.info("Using SingleConnectionBrokerRequestHandler"); return new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, - _accessControlFactory, _tableQueryQuotaManager, _brokerMetrics); + _accessControlFactory, _tableQueryQuotaManager, _propertyStore, _brokerMetrics); } else { LOGGER.info("Using ConnectionPoolBrokerRequestHandler"); return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory, - _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry); + _tableQueryQuotaManager, _propertyStore, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry); } } diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java index 
ac7e6de..fa81795 100644 --- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java @@ -198,7 +198,8 @@ public class HelixBrokerStarter { config = DefaultHelixBrokerConfig.getDefaultBrokerConf(); } BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting, - _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager); + _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager, + _propertyStore); _accessControlFactory = brokerServerBuilder.getAccessControlFactory(); _helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics()); _tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics()); diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 014842c..7a4f424 100644 --- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -23,30 +23,46 @@ import com.linkedin.pinot.broker.routing.RoutingTable; import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest; import com.linkedin.pinot.broker.routing.TimeBoundaryService; import com.linkedin.pinot.common.config.TableNameBuilder; +import com.linkedin.pinot.common.data.Schema; import com.linkedin.pinot.common.exception.QueryException; import com.linkedin.pinot.common.metrics.BrokerMeter; import com.linkedin.pinot.common.metrics.BrokerMetrics; import com.linkedin.pinot.common.metrics.BrokerQueryPhase; +import com.linkedin.pinot.common.request.AggregationInfo; import 
com.linkedin.pinot.common.request.BrokerRequest; import com.linkedin.pinot.common.request.FilterOperator; import com.linkedin.pinot.common.request.FilterQuery; import com.linkedin.pinot.common.request.FilterQueryMap; +import com.linkedin.pinot.common.request.GroupBy; +import com.linkedin.pinot.common.request.Selection; +import com.linkedin.pinot.common.request.transform.TransformExpressionTree; import com.linkedin.pinot.common.response.BrokerResponse; import com.linkedin.pinot.common.response.broker.BrokerResponseNative; import com.linkedin.pinot.common.utils.CommonConstants; +import com.linkedin.pinot.common.utils.request.FilterQueryTree; +import com.linkedin.pinot.common.utils.request.RequestUtils; +import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionType; +import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionUtils; import com.linkedin.pinot.core.query.reduce.BrokerReduceService; import com.linkedin.pinot.pql.parsers.Pql2Compiler; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,11 +81,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final TimeBoundaryService _timeBoundaryService; protected final AccessControlFactory _accessControlFactory; protected final 
TableQueryQuotaManager _tableQueryQuotaManager; + protected final ZkHelixPropertyStore<ZNRecord> _propertyStore; protected final BrokerMetrics _brokerMetrics; protected final AtomicLong _requestIdGenerator = new AtomicLong(); protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer(); protected final BrokerReduceService _brokerReduceService = new BrokerReduceService(); + protected final TableSchemaCache _tableSchemaCache; protected final String _brokerId; protected final long _brokerTimeoutMs; @@ -78,13 +96,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory, - TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) { + TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore, + BrokerMetrics brokerMetrics) { _config = config; _routingTable = routingTable; _timeBoundaryService = timeBoundaryService; _accessControlFactory = accessControlFactory; _tableQueryQuotaManager = tableQueryQuotaManager; + _propertyStore = propertyStore; _brokerMetrics = brokerMetrics; + _tableSchemaCache = new TableSchemaCache(_propertyStore); _brokerId = config.getString(CONFIG_OF_BROKER_ID, getDefaultBrokerId()); _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS); @@ -311,6 +332,67 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { "Value for 'LIMIT' (" + limit + ") exceeds maximum allowed value of " + _queryResponseLimit); } } + + // Checks whether the query contains non-existence columns. + // Table name has already been verified before hitting this line. 
+ String tableName = brokerRequest.getQuerySource().getTableName(); + Schema schema = _tableSchemaCache.getIfTableSchemaPresent(tableName); + if (schema != null) { + Set<String> allColumns = getAllColumnsFromBrokerRequest(brokerRequest); + Set<String> copied = new HashSet<>(allColumns); + copied.removeAll(schema.getColumnNames()); + if (!copied.isEmpty()) { + throw new RuntimeException("Found non-existence columns from the query: " + copied.toString()); + } + } else { + // If the schema isn't cached, skips validation and triggers a load (asynchronous only if CacheLoader.reload() is overridden). + _tableSchemaCache.refreshTableSchema(tableName); + } + } + + /** + * Helper to get all the columns from broker request. + * Returns the set of all the columns. + */ + private Set<String> getAllColumnsFromBrokerRequest(BrokerRequest brokerRequest) { + Set<String> allColumns = new HashSet<>(); + // Filter + FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest); + if (filterQueryTree != null) { + allColumns.addAll(RequestUtils.extractFilterColumns(filterQueryTree)); + } + + // Aggregation + List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo(); + if (aggregationsInfo != null) { + Set<TransformExpressionTree> _aggregationExpressions = new HashSet<>(); + for (AggregationInfo aggregationInfo : aggregationsInfo) { + if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) { + _aggregationExpressions.add( + TransformExpressionTree.compileToExpressionTree(AggregationFunctionUtils.getColumn(aggregationInfo))); + } + } + allColumns.addAll(RequestUtils.extractColumnsFromExpressions(_aggregationExpressions)); + } + + // Group-by + GroupBy groupBy = brokerRequest.getGroupBy(); + if (groupBy != null) { + Set<TransformExpressionTree> groupByExpressions = new HashSet<>(); + for (String expression : groupBy.getExpressions()) { + groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression)); + } + 
allColumns.addAll(RequestUtils.extractColumnsFromExpressions(groupByExpressions)); + } + + + // Selection + Selection selection = brokerRequest.getSelections(); + if (selection != null) { + allColumns.addAll(RequestUtils.extractSelectionColumns(selection)); + } + + return allColumns; } /** diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java index b17eb42..09aa125 100644 --- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java @@ -62,6 +62,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.Configuration; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.thrift.protocol.TCompactProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,9 +87,11 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory, - TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics, - LiveInstancesChangeListenerImpl liveInstanceChangeListener, MetricsRegistry metricsRegistry) { - super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics); + TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore, + BrokerMetrics brokerMetrics, LiveInstancesChangeListenerImpl liveInstanceChangeListener, + MetricsRegistry metricsRegistry) { + super(config, 
routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore, + brokerMetrics); _liveInstanceChangeListener = liveInstanceChangeListener; TransportClientConf transportClientConf = new TransportClientConf(); diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 762b724..3389bcb 100644 --- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -39,6 +39,8 @@ import java.util.Map; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.Configuration; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; /** @@ -51,8 +53,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory, - TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) { - super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics); + TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore, + BrokerMetrics brokerMetrics) { + super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore, + brokerMetrics); _queryRouter = new QueryRouter(_brokerId, brokerMetrics); } diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java new 
file mode 100644 index 0000000..6c8c58c --- /dev/null +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-c...@linkedin.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.pinot.broker.requesthandler; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.linkedin.pinot.common.config.TableNameBuilder; +import com.linkedin.pinot.common.data.Schema; +import com.linkedin.pinot.common.metadata.ZKMetadataProvider; +import java.util.concurrent.TimeUnit; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; + + +public class TableSchemaCache { + private static final long DEFAULT_CACHE_SIZE = 50; + private static final long DEFAULT_CACHE_TIMEOUT_IN_MINUTE = 60; + + private final LoadingCache<String, Schema> _tableSchemaCache; + private final ZkHelixPropertyStore<ZNRecord> _propertyStore; + + TableSchemaCache(ZkHelixPropertyStore<ZNRecord> propertyStore) { + _propertyStore = propertyStore; + _tableSchemaCache = CacheBuilder.newBuilder() + .maximumSize(DEFAULT_CACHE_SIZE) + .expireAfterWrite(DEFAULT_CACHE_TIMEOUT_IN_MINUTE, TimeUnit.MINUTES) + .build(new CacheLoader<String, Schema>() { + @Override + public Schema load(String rawTableName) { + return 
ZKMetadataProvider.getTableSchema(_propertyStore, rawTableName); + } + }); + + } + + /** + * Refreshes table schema. + * @param tableName Table name with or without type suffix. + */ + public void refreshTableSchema(String tableName) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + _tableSchemaCache.refresh(rawTableName); + } + + /** + * Gets table schema if it's present. + * @param tableName Table name with or without type suffix. + */ + public Schema getIfTableSchemaPresent(String tableName) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + return _tableSchemaCache.getIfPresent(rawTableName); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org