This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9a7e11682a Database query quota (#13544) 9a7e11682a is described below commit 9a7e11682a56761684edb1aae4a7ec5b9984c6b8 Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Mon Aug 5 08:42:26 2024 +0500 Database query quota (#13544) --- .../broker/broker/helix/BaseBrokerStarter.java | 17 ++ ...okerResourceOnlineOfflineStateModelFactory.java | 3 + .../BrokerUserDefinedMessageHandlerFactory.java | 33 +++ .../broker/broker/helix/ClusterChangeMediator.java | 12 +- .../HelixExternalViewBasedQueryQuotaManager.java | 219 ++++++++++++++++++-- .../pinot/broker/queryquota/QueryQuotaManager.java | 7 + .../BaseSingleStageBrokerRequestHandler.java | 10 +- .../MultiStageBrokerRequestHandler.java | 13 +- ...elixExternalViewBasedQueryQuotaManagerTest.java | 222 +++++++++++++++++---- .../BaseSingleStageBrokerRequestHandlerTest.java | 1 + .../messages/DatabaseConfigRefreshMessage.java | 60 ++++++ .../pinot/common/metadata/ZKMetadataProvider.java | 85 ++++++++ .../apache/pinot/common/utils/DatabaseUtils.java | 14 ++ .../resources/PinotDatabaseRestletResource.java | 85 +++++++- .../helix/core/PinotHelixResourceManager.java | 56 ++++++ .../java/org/apache/pinot/core/auth/Actions.java | 2 + .../tests/QueryQuotaClusterIntegrationTest.java | 207 +++++++++++++++++++ .../apache/pinot/spi/config/DatabaseConfig.java | 56 ++++++ .../apache/pinot/spi/utils/CommonConstants.java | 1 + 19 files changed, 1046 insertions(+), 57 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index 04bf6ce921..553228d89c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -107,6 +107,8 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable { protected String _instanceId; private volatile boolean _isStarting = false; private volatile boolean _isShuttingDown = false; + + protected final List<ClusterChangeHandler> _clusterConfigChangeHandlers = new ArrayList<>(); protected final List<ClusterChangeHandler> _idealStateChangeHandlers = new ArrayList<>(); protected final List<ClusterChangeHandler> _externalViewChangeHandlers = new ArrayList<>(); protected final List<ClusterChangeHandler> _instanceConfigChangeHandlers = new ArrayList<>(); @@ -214,6 +216,15 @@ public abstract class BaseBrokerStarter implements ServiceStartable { _instanceConfigChangeHandlers.add(instanceConfigChangeHandler); } + /** + * Adds a cluster config change handler to handle Helix cluster config change callbacks. + * <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change + * handlers from running. For slow change handler, make it asynchronous. + */ + public void addClusterConfigChangeHandler(ClusterChangeHandler clusterConfigChangeHandler) { + _clusterConfigChangeHandlers.add(clusterConfigChangeHandler); + } + /** * Adds a live instance change handler to handle Helix live instance change callbacks. 
* <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change @@ -350,6 +361,10 @@ public abstract class BaseBrokerStarter implements ServiceStartable { _brokerAdminApplication.start(_listenerConfigs); LOGGER.info("Initializing cluster change mediator"); + for (ClusterChangeHandler clusterConfigChangeHandler : _clusterConfigChangeHandlers) { + clusterConfigChangeHandler.init(_spectatorHelixManager); + } + _clusterConfigChangeHandlers.add(queryQuotaManager); for (ClusterChangeHandler idealStateChangeHandler : _idealStateChangeHandlers) { idealStateChangeHandler.init(_spectatorHelixManager); } @@ -368,6 +383,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { liveInstanceChangeHandler.init(_spectatorHelixManager); } Map<ChangeType, List<ClusterChangeHandler>> clusterChangeHandlersMap = new HashMap<>(); + clusterChangeHandlersMap.put(ChangeType.CLUSTER_CONFIG, _clusterConfigChangeHandlers); clusterChangeHandlersMap.put(ChangeType.IDEAL_STATE, _idealStateChangeHandlers); clusterChangeHandlersMap.put(ChangeType.EXTERNAL_VIEW, _externalViewChangeHandlers); clusterChangeHandlersMap.put(ChangeType.INSTANCE_CONFIG, _instanceConfigChangeHandlers); @@ -379,6 +395,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { _spectatorHelixManager.addIdealStateChangeListener(_clusterChangeMediator); _spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator); _spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator); + _spectatorHelixManager.addClusterfigChangeListener(_clusterChangeMediator); if (!_liveInstanceChangeHandlers.isEmpty()) { _spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java index 648a2a43ff..41f9e9ef87 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java @@ -30,6 +30,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +82,8 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE))); + _queryQuotaManager.createDatabaseRateLimiter( + DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableNameWithType)); } catch (Exception e) { LOGGER.error("Caught exception while processing transition from OFFLINE to ONLINE for table: {}", tableNameWithType, e); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java index 4283b10cc9..2c2cc33532 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java @@ -25,9 +25,11 @@ import 
org.apache.helix.messaging.handling.MessageHandlerFactory; import org.apache.helix.model.Message; import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; +import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage; import org.apache.pinot.common.messages.RoutingTableRebuildMessage; import org.apache.pinot.common.messages.SegmentRefreshMessage; import org.apache.pinot.common.messages.TableConfigRefreshMessage; +import org.apache.pinot.common.utils.DatabaseUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,8 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac return new RefreshTableConfigMessageHandler(new TableConfigRefreshMessage(message), context); case RoutingTableRebuildMessage.REBUILD_ROUTING_TABLE_MSG_SUB_TYPE: return new RebuildRoutingTableMessageHandler(new RoutingTableRebuildMessage(message), context); + case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE: + return new RefreshDatabaseConfigMessageHandler(new DatabaseConfigRefreshMessage(message), context); default: // NOTE: Log a warning and return no-op message handler for unsupported message sub-types. This can happen when // a new message sub-type is added, and the sender gets deployed first while receiver is still running the @@ -117,6 +121,9 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac // TODO: Fetch the table config here and pass it into the managers, or consider merging these 2 managers _routingManager.buildRouting(_tableNameWithType); _queryQuotaManager.initOrUpdateTableQueryQuota(_tableNameWithType); + // only create the rate limiter if not present. 
This message has no reason to update the database rate limiter + _queryQuotaManager.createDatabaseRateLimiter( + DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(_tableNameWithType)); HelixTaskResult result = new HelixTaskResult(); result.setSuccess(true); return result; @@ -129,6 +136,32 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac } } + private class RefreshDatabaseConfigMessageHandler extends MessageHandler { + final String _databaseName; + + RefreshDatabaseConfigMessageHandler(DatabaseConfigRefreshMessage databaseConfigRefreshMessage, + NotificationContext context) { + super(databaseConfigRefreshMessage, context); + _databaseName = databaseConfigRefreshMessage.getDatabaseName(); + } + + @Override + public HelixTaskResult handleMessage() { + // only update the existing rate limiter. + // Database rate limiter creation should only be done through table based change triggers + _queryQuotaManager.updateDatabaseRateLimiter(_databaseName); + HelixTaskResult result = new HelixTaskResult(); + result.setSuccess(true); + return result; + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) { + LOGGER.error("Got error while refreshing database config for database: {} (error code: {}, error type: {})", + _databaseName, code, type, e); + } + } + private class RebuildRoutingTableMessageHandler extends MessageHandler { final String _tableNameWithType; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java index 202f1a3f8e..f1e25804bf 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java @@ -26,11 +26,13 @@ import java.util.concurrent.TimeUnit; import org.apache.helix.HelixConstants.ChangeType; import 
org.apache.helix.NotificationContext; import org.apache.helix.api.listeners.BatchMode; +import org.apache.helix.api.listeners.ClusterConfigChangeListener; import org.apache.helix.api.listeners.ExternalViewChangeListener; import org.apache.helix.api.listeners.IdealStateChangeListener; import org.apache.helix.api.listeners.InstanceConfigChangeListener; import org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.api.listeners.PreFetch; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; @@ -55,7 +57,7 @@ import org.slf4j.LoggerFactory; @PreFetch(enabled = false) public class ClusterChangeMediator implements IdealStateChangeListener, ExternalViewChangeListener, InstanceConfigChangeListener, - LiveInstanceChangeListener { + ClusterConfigChangeListener, LiveInstanceChangeListener { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class); // If no change got for 1 hour, proactively check changes @@ -192,6 +194,14 @@ public class ClusterChangeMediator enqueueChange(ChangeType.INSTANCE_CONFIG); } + @Override + public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) { + // Cluster config should be null because Helix pre-fetch is disabled + assert clusterConfig == null; + + enqueueChange(ChangeType.CLUSTER_CONFIG); + } + @Override public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) { // Live instance list should be empty because Helix pre-fetch is disabled diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index dabb95867b..d05de53f3e 100644 --- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -21,14 +21,22 @@ package org.apache.pinot.broker.queryquota; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.RateLimiter; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.collections4.SetUtils; import org.apache.helix.AccessOption; +import org.apache.helix.HelixAdmin; import org.apache.helix.HelixConstants; import org.apache.helix.HelixManager; import org.apache.helix.model.ExternalView; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; @@ -37,6 +45,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.BrokerGauge; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -49,18 +58,30 @@ import org.slf4j.LoggerFactory; /** * This class is to support the qps quota feature. - * It depends on the broker source change to update the dynamic rate limit, - * which means it only gets updated when a new table added or a broker restarted. 
+ * It allows performing qps quota check at table level and database level + * For table level check it depends on the broker source change to update the dynamic rate limit, + * which means it gets updated when a new table added or a broker restarted. + * For database level check it depends on the broker as well as cluster config and database config change + * to update the dynamic rate limit, which means it gets updated when + * - the default query quota at cluster config is updated + * - the database config is updated + * - new table is assigned to the broker (rate limiter is created if not present) + * - broker added or removed from cluster */ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager { private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class); private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1; private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60; + private static final Set<HelixConstants.ChangeType> CHANGE_TYPES_TO_PROCESS = SetUtils.hashSet( + HelixConstants.ChangeType.EXTERNAL_VIEW, HelixConstants.ChangeType.INSTANCE_CONFIG, + HelixConstants.ChangeType.CLUSTER_CONFIG); private final BrokerMetrics _brokerMetrics; private final String _instanceId; private final AtomicInteger _lastKnownBrokerResourceVersion = new AtomicInteger(-1); private final Map<String, QueryQuotaEntity> _rateLimiterMap = new ConcurrentHashMap<>(); + private final Map<String, QueryQuotaEntity> _databaseRateLimiterMap = new ConcurrentHashMap<>(); + private double _defaultQpsQuotaForDatabase; private HelixManager _helixManager; private ZkHelixPropertyStore<ZNRecord> _propertyStore; @@ -76,20 +97,22 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan Preconditions.checkState(_helixManager == null, "HelixExternalViewBasedQueryQuotaManager is already initialized"); _helixManager = helixManager; _propertyStore = 
_helixManager.getHelixPropertyStore(); + _defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase(); getQueryQuotaEnabledFlagFromInstanceConfig(); } @Override public void processClusterChange(HelixConstants.ChangeType changeType) { - Preconditions.checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW - || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType); + Preconditions.checkState(CHANGE_TYPES_TO_PROCESS.contains(changeType), "Illegal change type: " + changeType); if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) { ExternalView brokerResourceEV = HelixHelper .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); processQueryRateLimitingExternalViewChange(brokerResourceEV); - } else { + } else if (changeType == HelixConstants.ChangeType.INSTANCE_CONFIG) { processQueryRateLimitingInstanceConfigChange(); + } else { + processQueryRateLimitingClusterConfigChange(); } } @@ -230,6 +253,118 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } } + /** + * Updates the database rate limiter if it already exists. Will not create a new database rate limiter. 
+ * @param databaseName database name for which rate limiter needs to be updated + */ + public void updateDatabaseRateLimiter(String databaseName) { + if (!_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + // Caller method need not worry about getting lock on _databaseRateLimiterMap + // as this method will do idempotent updates to the database rate limiters + private synchronized void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { + ExternalView brokerResource = HelixHelper + .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + for (String databaseName : databaseNames) { + double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName); + if (databaseQpsQuota < 0) { + buildEmptyOrResetDatabaseRateLimiter(databaseName); + continue; + } + int numOnlineBrokers = getNumOnlineBrokers(databaseName, brokerResource); + double perBrokerQpsQuota = databaseQpsQuota / numOnlineBrokers; + QueryQuotaEntity oldQueryQuotaEntity = _databaseRateLimiterMap.get(databaseName); + if (oldQueryQuotaEntity == null) { + LOGGER.info("Adding new query rate limiter for database {} with rate {}.", databaseName, perBrokerQpsQuota); + QueryQuotaEntity queryQuotaEntity = new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), + new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), + numOnlineBrokers, databaseQpsQuota, -1); + _databaseRateLimiterMap.put(databaseName, queryQuotaEntity); + continue; + } + boolean changeDetected = false; + double oldQuota = oldQueryQuotaEntity.getRateLimiter() != null ? 
oldQueryQuotaEntity.getRateLimiter().getRate() + : -1; + if (oldQueryQuotaEntity.getOverallRate() != databaseQpsQuota) { + changeDetected = true; + LOGGER.info("Overall quota changed for the database from {} to {}", oldQueryQuotaEntity.getOverallRate(), + databaseQpsQuota); + oldQueryQuotaEntity.setOverallRate(databaseQpsQuota); + } + if (oldQueryQuotaEntity.getNumOnlineBrokers() != numOnlineBrokers) { + changeDetected = true; + LOGGER.info("Number of online brokers changed for the database from {} to {}", + oldQueryQuotaEntity.getNumOnlineBrokers(), numOnlineBrokers); + oldQueryQuotaEntity.setNumOnlineBrokers(numOnlineBrokers); + } + if (!changeDetected) { + LOGGER.info("No change detected with the query rate limiter for database {}", databaseName); + continue; + } + LOGGER.info("Updating existing query rate limiter for database {} from rate {} to {}", databaseName, oldQuota, + perBrokerQpsQuota); + oldQueryQuotaEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota)); + } + } + + // Pulling this logic to a separate placeholder method so that the quota split logic + // can be enhanced further in isolation. + private int getNumOnlineBrokers(String databaseName, ExternalView brokerResource) { + // Tables in database can span across broker tags as we don't maintain a broker tag to database mapping as of now. + // Hence, we consider all online brokers for the rate distribution. + // TODO consider computing only the online brokers which serve the tables under the database + return HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size(); + } + + /** + * Utility to get the effective query quota being imposed on a database. + * It is computed based on the default quota set at cluster config and override set at database config + * @param databaseName database name to get the query quota on. 
+ * @return effective query quota limit being applied + */ + private double getEffectiveQueryQuotaOnDatabase(String databaseName) { + DatabaseConfig databaseConfig = + ZKMetadataProvider.getDatabaseConfig(_helixManager.getHelixPropertyStore(), databaseName); + if (databaseConfig != null && databaseConfig.getQuotaConfig() != null + && databaseConfig.getQuotaConfig().getMaxQPS() != -1) { + return databaseConfig.getQuotaConfig().getMaxQPS(); + } + return _defaultQpsQuotaForDatabase; + } + + /** + * Creates a new database rate limiter. Will not update the database rate limiter if it already exists. + * @param databaseName database name for which rate limiter needs to be created + */ + public void createDatabaseRateLimiter(String databaseName) { + if (_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + /** + * Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query + * quota entity. + */ + private void buildEmptyOrResetDatabaseRateLimiter(String databaseName) { + QueryQuotaEntity queryQuotaEntity = _databaseRateLimiterMap.get(databaseName); + if (queryQuotaEntity == null) { + // Create an QueryQuotaEntity object without setting a rate limiter. + queryQuotaEntity = new QueryQuotaEntity(null, new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0); + _databaseRateLimiterMap.put(databaseName, queryQuotaEntity); + } else { + // Set rate limiter to null for an existing QueryQuotaEntity object. + queryQuotaEntity.setRateLimiter(null); + } + } + /** * Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query * quota entity. 
@@ -279,6 +414,20 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } } + @Override + public boolean acquireDatabase(String databaseName) { + // Return true if query quota is disabled in the current broker. + if (isQueryRateLimitDisabled()) { + return true; + } + QueryQuotaEntity queryQuota = _databaseRateLimiterMap.get(databaseName); + if (queryQuota == null) { + return true; + } + LOGGER.debug("Trying to acquire token for database: {}", databaseName); + return tryAcquireToken(databaseName, queryQuota); + } + /** * {@inheritDoc} * <p>Acquires a token from rate limiter based on the table name. @@ -291,7 +440,6 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan if (isQueryRateLimitDisabled()) { return true; } - LOGGER.debug("Trying to acquire token for table: {}", tableName); String offlineTableName = null; String realtimeTableName = null; QueryQuotaEntity offlineTableQueryQuotaEntity = null; @@ -311,21 +459,27 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan realtimeTableQueryQuotaEntity = _rateLimiterMap.get(realtimeTableName); } - boolean offlineQuotaOk = - offlineTableQueryQuotaEntity == null || tryAcquireToken(offlineTableName, offlineTableQueryQuotaEntity); - boolean realtimeQuotaOk = - realtimeTableQueryQuotaEntity == null || tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaEntity); + boolean offlineQuotaOk = true; + if (offlineTableQueryQuotaEntity != null) { + LOGGER.debug("Trying to acquire token for table: {}", offlineTableName); + offlineQuotaOk = tryAcquireToken(offlineTableName, offlineTableQueryQuotaEntity); + } + boolean realtimeQuotaOk = true; + if (realtimeTableQueryQuotaEntity != null) { + LOGGER.debug("Trying to acquire token for table: {}", realtimeTableName); + realtimeQuotaOk = tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaEntity); + } return offlineQuotaOk && realtimeQuotaOk; } /** * Try to acquire token from rate 
limiter. Emit the utilization of the qps quota if broker metric isn't null. - * @param tableNameWithType table name with type. + * @param resourceName resource name to acquire. * @param queryQuotaEntity query quota entity for type-specific table. * @return true if there's no qps quota for that table, or a token is acquired successfully. */ - private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity queryQuotaEntity) { + private boolean tryAcquireToken(String resourceName, QueryQuotaEntity queryQuotaEntity) { // Use hit counter to count the number of hits. queryQuotaEntity.getQpsTracker().hit(); queryQuotaEntity.getMaxQpsTracker().hit(); @@ -340,7 +494,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan // Emit the qps capacity utilization rate. int numHits = queryQuotaEntity.getQpsTracker().getHitCount(); if (!rateLimiter.tryAcquire()) { - LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. Current qps: {}", tableNameWithType, + LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: {}. 
Current qps: {}", resourceName, perBrokerRate, numHits); return false; } @@ -353,6 +507,11 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan return _rateLimiterMap.size(); } + @VisibleForTesting + public Map<String, QueryQuotaEntity> getDatabaseRateLimiterMap() { + return _databaseRateLimiterMap; + } + @VisibleForTesting public QueryQuotaEntity getRateLimiterForTable(String tableNameWithType) { return _rateLimiterMap.get(tableNameWithType); @@ -448,6 +607,19 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan numRebuilt++; } } + + // handle EV change for database query quotas + int onlineBrokerCount = HelixHelper.getOnlineInstanceFromExternalView(currentBrokerResourceEV).size(); + for (Map.Entry<String, QueryQuotaEntity> it : _databaseRateLimiterMap.entrySet()) { + QueryQuotaEntity quota = it.getValue(); + if (quota.getNumOnlineBrokers() != onlineBrokerCount) { + quota.setNumOnlineBrokers(onlineBrokerCount); + } + if (quota.getOverallRate() > 0) { + quota.setRateLimiter(RateLimiter.create(quota.getOverallRate() / onlineBrokerCount)); + } + } + if (isQueryRateLimitDisabled()) { LOGGER.info("Query rate limiting is currently disabled for this broker. 
So it won't take effect immediately."); } @@ -458,6 +630,27 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan numRebuilt, _rateLimiterMap.size()); } + /** + * Process query quota state change when cluster config gets changed + */ + public void processQueryRateLimitingClusterConfigChange() { + double oldDatabaseQpsQuota = _defaultQpsQuotaForDatabase; + _defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase(); + if (oldDatabaseQpsQuota == _defaultQpsQuotaForDatabase) { + return; + } + createOrUpdateDatabaseRateLimiter(new ArrayList<>(_databaseRateLimiterMap.keySet())); + } + + private double getDefaultQueryQuotaForDatabase() { + HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool(); + HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) + .forCluster(_helixManager.getClusterName()).build(); + return Double.parseDouble(helixAdmin.getConfig(configScope, + Collections.singletonList(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND)) + .getOrDefault(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, "-1")); + } + /** * Process query quota state change when instance config gets changed */ diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java index 6fd335e4d3..57faef8778 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java @@ -26,4 +26,11 @@ public interface QueryQuotaManager { * @return {@code true} if the table quota has not been reached, {@code false} otherwise */ boolean acquire(String tableName); + + /** + * Try to acquire a quota for the given database. 
+ * @param databaseName database name + * @return {@code true} if the database quota has not been reached, {@code false} otherwise + */ + boolean acquireDatabase(String databaseName); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 6a839d5e36..dfac99bb82 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -506,6 +506,14 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ } // Validate QPS quota + String database = DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName); + if (!_queryQuotaManager.acquireDatabase(database)) { + String errorMessage = + String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database); + LOGGER.info(errorMessage); + requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); + return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); + } if (!_queryQuotaManager.acquire(tableName)) { String errorMessage = String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, tableName); @@ -531,7 +539,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) { // Check if the query is a v2 supported query - String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders); + database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders); // Attempt to add the query to the compile queue; drop if queue is full if 
(!_multistageCompileQueryQueue.offer(Pair.of(query, database))) { LOGGER.trace("Not compiling query `{}` using the multi-stage query engine because the query queue is full", diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index c8f7c4c2f6..a1e82dbd53 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -129,10 +129,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { long compilationStartTimeNs = System.nanoTime(); long queryTimeoutMs; QueryEnvironment.QueryPlannerResult queryPlanResult; + String database; try { Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions); queryTimeoutMs = timeoutMsFromQueryOption != null ? 
timeoutMsFromQueryOption : _brokerTimeoutMs; - String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders); + database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders); QueryEnvironment queryEnvironment = new QueryEnvironment(database, _tableCache, _workerManager); switch (sqlNodeAndOptions.getSqlNode().getKind()) { case EXPLAIN: @@ -204,7 +205,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { } // Validate QPS quota - if (hasExceededQPSQuota(tableNames, requestContext)) { + if (hasExceededQPSQuota(database, tableNames, requestContext)) { String errorMessage = String.format("Request %d: %s exceeds query quota.", requestId, query); return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); } @@ -327,7 +328,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { /** * Returns true if the QPS quota of the tables has exceeded. 
*/ - private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext requestContext) { + private boolean hasExceededQPSQuota(@Nullable String database, Set<String> tableNames, + RequestContext requestContext) { + if (database != null && !_queryQuotaManager.acquireDatabase(database)) { + LOGGER.warn("Request {}: query exceeds quota for database: {}", requestContext.getRequestId(), database); + requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); + return true; + } for (String tableName : tableNames) { if (!_queryQuotaManager.acquire(tableName)) { LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java index a22f8bdb57..760f3170f9 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.broker.queryquota; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,7 +28,6 @@ import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixManager; -import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -38,6 +38,7 @@ import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.utils.ZkStarter; 
+import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -58,10 +59,17 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { private HelixManager _helixManager; private HelixExternalViewBasedQueryQuotaManager _queryQuotaManager; private ZkStarter.ZookeeperInstance _zookeeperInstance; + private static final Map<String, String> CLUSTER_CONFIG_MAP = new HashMap<>(); private static final String RAW_TABLE_NAME = "testTable"; private static final String OFFLINE_TABLE_NAME = RAW_TABLE_NAME + "_OFFLINE"; private static final String REALTIME_TABLE_NAME = RAW_TABLE_NAME + "_REALTIME"; private static final String BROKER_INSTANCE_ID = "broker_instance_1"; + private static final long TABLE_MAX_QPS = 25; + private static final String TABLE_MAX_QPS_STR = String.valueOf(TABLE_MAX_QPS); + private static final long DATABASE_HIGH_QPS = 40; + private static final String DATABASE_HIGH_QPS_STR = String.valueOf(DATABASE_HIGH_QPS); + private static final long DATABASE_LOW_QPS = 10; + private static final String DATABASE_LOW_QPS_STR = String.valueOf(DATABASE_LOW_QPS); @BeforeTest public void beforeTest() { @@ -82,7 +90,6 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { } public class FakeHelixManager extends ZKHelixManager { - private ZkHelixPropertyStore<ZNRecord> _propertyStore; FakeHelixManager(String clusterName, String instanceName, InstanceType instanceType, String zkAddress) { super(clusterName, instanceName, instanceType, zkAddress); @@ -90,13 +97,6 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); _zkclient.deleteRecursively("/" + clusterName + "/PROPERTYSTORE"); _zkclient.createPersistent("/" + clusterName + "/PROPERTYSTORE", true); - setPropertyStore(clusterName); - } - - void 
setPropertyStore(String clusterName) { - _propertyStore = - new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), "/" + clusterName + "/PROPERTYSTORE", - null); } void closeZkClient() { @@ -119,8 +119,16 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { @Override public Map<String, String> getConfig(HelixConfigScope scope, List<String> keys) { + if (scope.getType().equals(HelixConfigScope.ConfigScopeProperty.CLUSTER)) { + return CLUSTER_CONFIG_MAP; + } return _instanceConfigMap; } + + @Override + public ExternalView getResourceExternalView(String clusterName, String resourceName) { + return generateBrokerResource(OFFLINE_TABLE_NAME); + } } @AfterMethod @@ -129,8 +137,11 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { _testPropertyStore.reset(); ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, OFFLINE_TABLE_NAME); ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, REALTIME_TABLE_NAME); + ZKMetadataProvider.removeDatabaseConfig(_testPropertyStore, CommonConstants.DEFAULT_DATABASE); + CLUSTER_CONFIG_MAP.clear(); } _queryQuotaManager.cleanUpRateLimiterMap(); + _queryQuotaManager.getDatabaseRateLimiterMap().clear(); } @AfterTest @@ -152,17 +163,129 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); // All the request should be passed. 
- runQueries(70, 10); + runQueries(); + + _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + } + + @Test + public void testOfflineTableNotnullQuotaWithHigherDefaultDatabaseQuota() + throws Exception { + ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); + TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); + ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig); + setQps(tableConfig); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + _queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE); + Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 1); + + setDefaultDatabaseQps("40"); + // qps within table and default database qps quota + runQueries(25, false); + // qps exceeding table qps quota but within default database quota + runQueries(40, true); _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); } @Test - public void testOfflineTableWithNullQuotaAndNoRealtimeTableConfig() + public void testOfflineTableNotnullQuotaWithLowerDefaultDatabaseQuota() throws Exception { ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); + ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig); + setQps(tableConfig); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + _queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE); + Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 1); + + setDefaultDatabaseQps(DATABASE_LOW_QPS_STR); + // qps within table and default database qps quota 
+ runQueries(DATABASE_LOW_QPS, false); + // qps within table qps quota but exceeding default database quota + runQueries(TABLE_MAX_QPS, true); + + _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + } + + @Test + public void testOfflineTableNotnullQuotaWithHigherDatabaseQuota() + throws Exception { + ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); + TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); + ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig); + setQps(tableConfig); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + + DatabaseConfig databaseConfig = generateDefaultDatabaseConfig(); + setHigherDatabaseQps(databaseConfig); + // qps within table and database qps quota + runQueries(TABLE_MAX_QPS, false); + // qps exceeding table qps quota but within database quota + runQueries(DATABASE_HIGH_QPS, true); + + _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + } + + @Test + public void testOfflineTableNotnullQuotaWithLowerDatabaseQuota() + throws Exception { + ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); + TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); + ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig); + setQps(tableConfig); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + + DatabaseConfig databaseConfig = generateDefaultDatabaseConfig(); + setLowerDatabaseQps(databaseConfig); + // qps within table and database qps quota + runQueries(DATABASE_LOW_QPS, false); + // qps within table qps quota but exceeding database quota + runQueries(TABLE_MAX_QPS, true); + + 
_queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + } + + @Test + public void testCreateOrUpdateDatabaseRateLimiter() { + List<String> dbList = new ArrayList<>(2); + dbList.add("db1"); + dbList.add("db2"); + dbList.add("db3"); + DatabaseConfig db1 = new DatabaseConfig(dbList.get(0), new QuotaConfig(null, null)); + DatabaseConfig db2 = new DatabaseConfig(dbList.get(1), new QuotaConfig(null, "1")); + DatabaseConfig db3 = new DatabaseConfig(dbList.get(2), new QuotaConfig(null, "2")); + ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1); + ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2); + ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db3); + dbList.forEach(db -> _queryQuotaManager.createDatabaseRateLimiter(db)); + Map<String, QueryQuotaEntity> dbQuotaMap = _queryQuotaManager.getDatabaseRateLimiterMap(); + Assert.assertNull(dbQuotaMap.get(dbList.get(0)).getRateLimiter()); + Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(), 1); + Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(), 2); + db1.setQuotaConfig(new QuotaConfig(null, "1")); + db2.setQuotaConfig(new QuotaConfig(null, "2")); + ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1); + ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2); + dbList.forEach(db -> _queryQuotaManager.updateDatabaseRateLimiter(db)); + Assert.assertEquals(dbQuotaMap.get(dbList.get(0)).getRateLimiter().getRate(), 1); + Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(), 2); + Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(), 2); + } + + @Test + public void testOfflineTableWithNullQuotaAndNoRealtimeTableConfig() { + ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); + TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); 
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME); @@ -194,7 +317,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { @Test public void testOfflineTableWithNullQuotaButWithRealtimeTableConfigNotNullQpsConfig() throws Exception { - QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00"); + QuotaConfig quotaConfig = new QuotaConfig("6G", TABLE_MAX_QPS_STR); TableConfig realtimeTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig) .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND") @@ -220,7 +343,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { brokerResource.setState(REALTIME_TABLE_NAME, BROKER_INSTANCE_ID, "ONLINE"); brokerResource.setState(REALTIME_TABLE_NAME, "broker_instance_2", "OFFLINE"); - QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00"); + QuotaConfig quotaConfig = new QuotaConfig("6G", TABLE_MAX_QPS_STR); TableConfig realtimeTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig) .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND") @@ -241,7 +364,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 2); // Rate limiter generates 1 token every 10 milliseconds, have to make it sleep for a while. - runQueries(70, 10L); + runQueries(); _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); // Since real-time table still has the qps quota, the size of the hash map becomes 1. 
@@ -262,7 +385,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); - runQueries(70, 10L); + runQueries(); _queryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); @@ -278,7 +401,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); - runQueries(70, 10L); + runQueries(); ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, REALTIME_TABLE_NAME); _queryQuotaManager.processQueryRateLimitingExternalViewChange(brokerResource); @@ -315,9 +438,8 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { } @Test - public void testRealtimeTableWithNullQuotaButWithOfflineTableConfigNotNullQpsConfig() - throws Exception { - QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00"); + public void testRealtimeTableWithNullQuotaButWithOfflineTableConfigNotNullQpsConfig() { + QuotaConfig quotaConfig = new QuotaConfig("6G", TABLE_MAX_QPS_STR); TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig) .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND") @@ -376,37 +498,65 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { return builder.build(); } + private DatabaseConfig generateDefaultDatabaseConfig() { + return new DatabaseConfig(CommonConstants.DEFAULT_DATABASE, null); + } + + private void setLowerDatabaseQps(DatabaseConfig databaseConfig) { + setDatabaseQps(databaseConfig, DATABASE_LOW_QPS_STR); + } + + private void setHigherDatabaseQps(DatabaseConfig databaseConfig) { + setDatabaseQps(databaseConfig, DATABASE_HIGH_QPS_STR); + } + + private void 
setDefaultDatabaseQps(String maxQps) { + ZKMetadataProvider.removeDatabaseConfig(_testPropertyStore, CommonConstants.DEFAULT_DATABASE); + CLUSTER_CONFIG_MAP.put(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, maxQps); + _queryQuotaManager.processQueryRateLimitingClusterConfigChange(); + } + + private void setDatabaseQps(DatabaseConfig databaseConfig, String maxQps) { + QuotaConfig quotaConfig = new QuotaConfig(null, maxQps); + databaseConfig.setQuotaConfig(quotaConfig); + ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, databaseConfig); + _queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE); + } + private void setQps(TableConfig tableConfig) { - QuotaConfig quotaConfig = new QuotaConfig(null, "100.00"); + QuotaConfig quotaConfig = new QuotaConfig(null, TABLE_MAX_QPS_STR); tableConfig.setQuotaConfig(quotaConfig); } - private ExternalView generateBrokerResource(String tableName) { + private static ExternalView generateBrokerResource(String tableName) { ExternalView brokerResource = new ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); brokerResource.setState(tableName, BROKER_INSTANCE_ID, "ONLINE"); brokerResource.setState(tableName, "broker_instance_2", "OFFLINE"); return brokerResource; } - private void runQueries(int numOfTimesToRun, long millis) + private void runQueries() throws InterruptedException { - int count = 0; - for (int i = 0; i < numOfTimesToRun; i++) { - Assert.assertTrue(_queryQuotaManager.acquire(RAW_TABLE_NAME)); - count++; - Thread.sleep(millis); - } - Assert.assertEquals(count, numOfTimesToRun); + runQueries(TABLE_MAX_QPS, false); + //increase the qps and some of the queries should be throttled. + runQueries(TABLE_MAX_QPS * 2, true); + } - //Reduce the time of sleeping and some of the queries should be throttled. 
- count = 0; - millis /= 2; - for (int i = 0; i < numOfTimesToRun; i++) { + // try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis + // is not comparable to sleepMillis, else the actual qps would end up being a lot lower than the required qps + private void runQueries(double qps, boolean shouldFail) + throws InterruptedException { + int failCount = 0; + long sleepMillis = (long) (1000 / qps); + for (int i = 0; i < qps; i++) { + if (!_queryQuotaManager.acquireDatabase(CommonConstants.DEFAULT_DATABASE)) { + failCount++; + } if (!_queryQuotaManager.acquire(RAW_TABLE_NAME)) { - count++; + failCount++; } - Thread.sleep(millis); + Thread.sleep(sleepMillis); } - Assert.assertTrue(count > 0 && count < numOfTimesToRun); + Assert.assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && shouldFail)); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java index f1a6dfe33f..7dfbdd75b9 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java @@ -172,6 +172,7 @@ public class BaseSingleStageBrokerRequestHandlerTest { when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt); QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class); when(queryQuotaManager.acquire(anyString())).thenReturn(true); + when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true); CountDownLatch latch = new CountDownLatch(1); long[] testRequestId = {-1}; BrokerMetrics.register(mock(BrokerMetrics.class)); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/DatabaseConfigRefreshMessage.java 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/DatabaseConfigRefreshMessage.java new file mode 100644 index 0000000000..5511f26b27 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/DatabaseConfigRefreshMessage.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.messages; + +import java.util.UUID; +import org.apache.helix.model.Message; +import org.apache.helix.zookeeper.datamodel.ZNRecord; + + +/** + * This (Helix) message is sent from the controller to brokers when a request is received to update the database config. + */ +public class DatabaseConfigRefreshMessage extends Message { + public static final String REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE = "REFRESH_DATABASE_CONFIG"; + + private static final String DATABASE_NAME_KEY = "databaseName"; + + /** + * Constructor for the sender. 
+ */ + public DatabaseConfigRefreshMessage(String databaseName) { + super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); + setMsgSubType(REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE); + // Give it infinite time to process the message, as long as session is alive + setExecutionTimeout(-1); + // Set the Pinot specific fields + ZNRecord znRecord = getRecord(); + znRecord.setSimpleField(DATABASE_NAME_KEY, databaseName); + } + + /** + * Constructor for the receiver. + */ + public DatabaseConfigRefreshMessage(Message message) { + super(message.getRecord()); + if (!message.getMsgSubType().equals(REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE)) { + throw new IllegalArgumentException("Invalid message subtype:" + message.getMsgSubType()); + } + } + + public String getDatabaseName() { + return getRecord().getSimpleField(DATABASE_NAME_KEY); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 0fdf94388a..faf9b6799c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.common.metadata; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -39,11 +41,14 @@ import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.spi.config.ConfigUtils; +import org.apache.pinot.spi.config.DatabaseConfig; +import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import 
org.apache.pinot.spi.config.user.UserConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.StringUtil; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.zookeeper.data.Stat; @@ -62,6 +67,7 @@ public class ZKMetadataProvider { private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS"; private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS"; private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX = "/INSTANCE_PARTITIONS"; + private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX = "/CONFIGS/DATABASE"; private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE"; private static final String PROPERTYSTORE_USER_CONFIGS_PREFIX = "/CONFIGS/USER"; private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE"; @@ -73,6 +79,75 @@ public class ZKMetadataProvider { propertyStore.set(constructPropertyStorePathForUserConfig(username), znRecord, AccessOption.PERSISTENT); } + /** + * Create database config, fail if exists. + * + * @return true if creation is successful. + */ + public static boolean createDatabaseConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, + DatabaseConfig databaseConfig) { + String databaseName = databaseConfig.getDatabaseName(); + String databaseConfigPath = constructPropertyStorePathForDatabaseConfig(databaseName); + ZNRecord databaseConfigZNRecord = toZNRecord(databaseConfig); + return propertyStore.create(databaseConfigPath, databaseConfigZNRecord, AccessOption.PERSISTENT); + } + + /** + * Update database config. + * + * @return true if update is successful. 
+ */ + public static boolean setDatabaseConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, DatabaseConfig databaseConfig) { + String databaseName = databaseConfig.getDatabaseName(); + ZNRecord databaseConfigZNRecord = toZNRecord(databaseConfig); + return propertyStore.set(constructPropertyStorePathForDatabaseConfig(databaseName), databaseConfigZNRecord, + -1, AccessOption.PERSISTENT); + } + + /** + * Remove database config. + */ + @VisibleForTesting + public static void removeDatabaseConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String databaseName) { + propertyStore.remove(constructPropertyStorePathForDatabaseConfig(databaseName), AccessOption.PERSISTENT); + } + + private static ZNRecord toZNRecord(DatabaseConfig databaseConfig) { + ZNRecord databaseConfigZNRecord = new ZNRecord(databaseConfig.getDatabaseName()); + Map<String, String> simpleFields = new HashMap<>(); + simpleFields.put(DatabaseConfig.DATABASE_NAME_KEY, databaseConfig.getDatabaseName()); + QuotaConfig quotaConfig = databaseConfig.getQuotaConfig(); + if (quotaConfig != null) { + simpleFields.put(DatabaseConfig.QUOTA_CONFIG_KEY, quotaConfig.toJsonString()); + } + databaseConfigZNRecord.setSimpleFields(simpleFields); + return databaseConfigZNRecord; + } + + @Nullable + private static DatabaseConfig toDatabaseConfig(@Nullable ZNRecord znRecord) { + if (znRecord == null) { + return null; + } + try { + Map<String, String> simpleFields = znRecord.getSimpleFields(); + + // Mandatory fields + String databaseName = simpleFields.get(DatabaseConfig.DATABASE_NAME_KEY); + + // Optional fields + QuotaConfig quotaConfig = null; + String quotaConfigString = simpleFields.get(DatabaseConfig.QUOTA_CONFIG_KEY); + if (quotaConfigString != null) { + quotaConfig = JsonUtils.stringToObject(quotaConfigString, QuotaConfig.class); + } + return new DatabaseConfig(databaseName, quotaConfig); + } catch (Exception e) { + LOGGER.error("Caught exception while creating database config from ZNRecord: {}", znRecord.getId(), 
e); + return null; + } + } + @Deprecated public static void setTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType, ZNRecord znRecord) { @@ -177,6 +252,10 @@ public class ZKMetadataProvider { return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType); } + public static String constructPropertyStorePathForDatabaseConfig(String resourceName) { + return StringUtil.join("/", PROPERTYSTORE_DATABASE_CONFIGS_PREFIX, resourceName); + } + public static String constructPropertyStorePathForResourceConfig(String resourceName) { return StringUtil.join("/", PROPERTYSTORE_TABLE_CONFIGS_PREFIX, resourceName); } @@ -360,6 +439,12 @@ public class ZKMetadataProvider { .collect(Collectors.toMap(UserConfig::getUsernameWithComponent, u -> u)); } + @Nullable + public static DatabaseConfig getDatabaseConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String databaseName) { + return toDatabaseConfig(propertyStore.get(constructPropertyStorePathForDatabaseConfig(databaseName), null, + AccessOption.PERSISTENT)); + } + @Nullable public static TableConfig getTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType) { return toTableConfig(propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType), null, diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java index 809592bbc7..3691e0063c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java @@ -155,4 +155,18 @@ public class DatabaseUtils { String database = databaseFromHeaders != null ? databaseFromHeaders : databaseFromOptions; return Objects.requireNonNullElse(database, CommonConstants.DEFAULT_DATABASE); } + + /** + * Extract the database name from the prefix of fully qualified table name. 
+ * If no prefix is present "default" database is returned + */ + public static String extractDatabaseFromFullyQualifiedTableName(String fullyQualifiedTableName) { + String[] split = StringUtils.split(fullyQualifiedTableName, '.'); + return split.length == 1 ? CommonConstants.DEFAULT_DATABASE : split[0]; + } + + public static String extractDatabaseFromHttpHeaders(HttpHeaders headers) { + String databaseName = headers.getHeaderString(CommonConstants.DATABASE); + return databaseName == null ? CommonConstants.DEFAULT_DATABASE : databaseName; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java index 9fd4584207..eecf2d0778 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java @@ -27,21 +27,34 @@ import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import javax.inject.Inject; +import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.helix.HelixAdmin; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.pinot.common.utils.DatabaseUtils; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.TargetType; +import org.apache.pinot.spi.config.DatabaseConfig; +import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +63,14 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K @Api(tags = Constants.DATABASE_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)}) -@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name = - HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY, - description = "The format of the key is ```\"Basic <token>\" or \"Bearer <token>\"```"))) +@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { + @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, + key = SWAGGER_AUTHORIZATION_KEY, + description = "The format of the key is ```\"Basic <token>\" or \"Bearer <token>\"```"), + @ApiKeyAuthDefinition(name = CommonConstants.DATABASE, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, + key = CommonConstants.DATABASE, + description = "Database context passed through http header. 
If no context is provided 'default' database " + + "context will be considered.")})) @Path("/") public class PinotDatabaseRestletResource { public static final Logger LOGGER = LoggerFactory.getLogger(PinotDatabaseRestletResource.class); @@ -108,6 +126,67 @@ public class PinotDatabaseRestletResource { } return new DeleteDatabaseResponse(deletedTables, failedTables, dryRun); } + + /** + * API to update the quota configs for database + * If database config is not present it will be created implicitly + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @Path("/databases/{databaseName}/quotas") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_DATABASE_QUOTA) + @ApiOperation(value = "Update database quotas", notes = "Update database quotas") + public SuccessResponse addTable( + @PathParam("databaseName") String databaseName, @QueryParam("maxQueriesPerSecond") String queryQuota, + @Context HttpHeaders httpHeaders) { + if (!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders))) { + throw new ControllerApplicationException(LOGGER, "Database config name and request context does not match", + Response.Status.BAD_REQUEST); + } + try { + DatabaseConfig databaseConfig = _pinotHelixResourceManager.getDatabaseConfig(databaseName); + QuotaConfig quotaConfig = new QuotaConfig(null, queryQuota); + if (databaseConfig == null) { + databaseConfig = new DatabaseConfig(databaseName, quotaConfig); + _pinotHelixResourceManager.addDatabaseConfig(databaseConfig); + } else { + databaseConfig.setQuotaConfig(quotaConfig); + _pinotHelixResourceManager.updateDatabaseConfig(databaseConfig); + } + return new SuccessResponse("Database quotas for database config " + databaseName + " successfully updated"); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * API to get database quota configs. 
+ * Will return the cluster level default quota if database config is not defined, or null if database config is defined without quotas + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/databases/{databaseName}/quotas") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_DATABASE_QUOTA) + @ApiOperation(value = "Get database quota configs", notes = "Get database quota configs") + public QuotaConfig getDatabaseQuota( + @PathParam("databaseName") String databaseName, @Context HttpHeaders httpHeaders) { + if (!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders))) { + throw new ControllerApplicationException(LOGGER, "Database config name and request context does not match", + Response.Status.BAD_REQUEST); + } + DatabaseConfig databaseConfig = _pinotHelixResourceManager.getDatabaseConfig(databaseName); + if (databaseConfig != null) { + return databaseConfig.getQuotaConfig(); + } + HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin(); + HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) + .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build(); + String defaultQueryQuota = helixAdmin.getConfig(configScope, + Collections.singletonList(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND)) + .getOrDefault(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, null); + return new QuotaConfig(null, defaultQueryQuota); + } } class DeleteDatabaseResponse { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 2b835faaae..546f4c0105 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -102,6 +102,7 @@ import
org.apache.pinot.common.lineage.LineageEntryState; import org.apache.pinot.common.lineage.SegmentLineage; import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; import org.apache.pinot.common.lineage.SegmentLineageUtils; +import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage; import org.apache.pinot.common.messages.RoutingTableRebuildMessage; import org.apache.pinot.common.messages.RunPeriodicTaskMessage; import org.apache.pinot.common.messages.SegmentRefreshMessage; @@ -154,6 +155,7 @@ import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObs import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.instance.Instance; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableStats; @@ -1639,6 +1641,29 @@ public class PinotHelixResourceManager { LOGGER.info("Successfully add user:{}", usernamePrefix); } + /** + * Creates database config and sends out a database config refresh message. + * @param databaseConfig database config to be created + */ + public void addDatabaseConfig(DatabaseConfig databaseConfig) { + if (!ZKMetadataProvider.createDatabaseConfig(_propertyStore, databaseConfig)) { + throw new RuntimeException("Failed to create database config for database: " + databaseConfig.getDatabaseName()); + } + sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName()); + } + + /** + * Updates database config and sends out a database config refresh message. 
+ * @param databaseConfig database config to be updated + */ + public void updateDatabaseConfig(DatabaseConfig databaseConfig) { + if (!ZKMetadataProvider.setDatabaseConfig(_propertyStore, databaseConfig)) { + throw new RuntimeException( + "Failed to update database config in Zookeeper for database: " + databaseConfig.getDatabaseName()); + } + sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName()); + } + /** * Performs validations of table config and adds the table to zookeeper * @throws InvalidTableConfigException if validations fail @@ -2755,6 +2780,26 @@ public class PinotHelixResourceManager { } } + private void sendDatabaseConfigRefreshMessage(String databaseName) { + DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new DatabaseConfigRefreshMessage(databaseName); + + // Send database config refresh message to brokers + Criteria recipientCriteria = new Criteria(); + recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + recipientCriteria.setInstanceName("%"); + recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); + recipientCriteria.setSessionSpecific(true); + // Send message with no callback and infinite timeout on the recipient + int numMessagesSent = + _helixZkManager.getMessagingService().send(recipientCriteria, databaseConfigRefreshMessage, null, -1); + if (numMessagesSent > 0) { + LOGGER.info("Sent {} database config refresh messages to brokers for database: {}", numMessagesSent, + databaseName); + } else { + LOGGER.warn("No database config refresh message sent to brokers for database: {}", databaseName); + } + } + private void sendRoutingTableRebuildMessage(String tableNameWithType) { RoutingTableRebuildMessage routingTableRebuildMessage = new RoutingTableRebuildMessage(tableNameWithType); @@ -2982,6 +3027,17 @@ public class PinotHelixResourceManager { return _helixAdmin.getResourceExternalView(_helixClusterName, tableNameWithType); } + /** + * Get the database config for the given database name.
+ * + * @param databaseName database name + * @return Database config + */ + @Nullable + public DatabaseConfig getDatabaseConfig(String databaseName) { + return ZKMetadataProvider.getDatabaseConfig(_propertyStore, databaseName); + } + /** * Get the table config for the given table name with type suffix. * diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java index 51cab16711..62f1b2e4d6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java @@ -70,6 +70,7 @@ public class Actions { public static final String GET_USER = "GetUser"; public static final String GET_VERSION = "GetVersion"; public static final String GET_ZNODE = "GetZnode"; + public static final String GET_DATABASE_QUOTA = "GetDatabaseQuota"; public static final String INGEST_FILE = "IngestFile"; public static final String RECOMMEND_CONFIG = "RecommendConfig"; public static final String RESET_SEGMENT = "ResetSegment"; @@ -86,6 +87,7 @@ public class Actions { public static final String REBALANCE_TENANT_TABLES = "RebalanceTenantTables"; public static final String UPDATE_TIME_INTERVAL = "UpdateTimeInterval"; public static final String UPDATE_USER = "UpdateUser"; + public static final String UPDATE_DATABASE_QUOTA = "UpdateDatabaseQuota"; public static final String UPDATE_ZNODE = "UpdateZnode"; public static final String UPLOAD_SEGMENT = "UploadSegment"; public static final String GET_INSTANCE_PARTITIONS = "GetInstancePartitions"; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java new file mode 100644 index 0000000000..d1fb956f2c --- /dev/null +++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.net.URI; +import java.util.Properties; +import org.apache.pinot.broker.broker.helix.BaseBrokerStarter; +import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest; +import org.apache.pinot.client.ConnectionFactory; +import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory; +import org.apache.pinot.client.PinotClientException; +import org.apache.pinot.client.ResultSetGroup; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.spi.config.table.QuotaConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.apache.pinot.client.Connection.FAIL_ON_EXCEPTIONS; +import static org.testng.Assert.assertTrue; + + +/** + * This test 
suite is focused only on validating that the config changes are propagated properly as expected. + * Validations around different cases arising from cluster config, database config and table config are extensively + * tested as part of {@link HelixExternalViewBasedQueryQuotaManagerTest} + */ +public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest { + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBrokers(1); + startServers(1); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + Properties properties = new Properties(); + properties.put(FAIL_ON_EXCEPTIONS, "FALSE"); + _pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl() + "/" + getHelixClusterName(), + new JsonAsyncHttpPinotClientTransportFactory().withConnectionProperties(getPinotConnectionProperties()) + .buildTransport()); + } + + @AfterMethod + void resetQuotas() + throws Exception { + addQueryQuotaToClusterConfig(null); + addQueryQuotaToDatabaseConfig(null); + addQueryQuotaToTableConfig(null); + } + + @Test + public void testDefaultDatabaseQueryQuota() + throws Exception { + addQueryQuotaToClusterConfig(40); + testQueryRate(40); + } + + @Test + public void testDatabaseConfigQueryQuota() + throws Exception { + addQueryQuotaToDatabaseConfig(10); + testQueryRate(10); + } + + @Test + public void testDefaultDatabaseQueryQuotaOverride() + throws Exception { + addQueryQuotaToClusterConfig(25); + // override lower than default quota + addQueryQuotaToDatabaseConfig(10); + testQueryRate(10); + // override higher than default quota + addQueryQuotaToDatabaseConfig(40); + testQueryRate(40); + } + + @Test + public void testDatabaseQueryQuotaWithTableQueryQuota() + throws Exception 
{ + addQueryQuotaToDatabaseConfig(25); + // table quota within database quota. Queries should fail upon table quota (10 qps) breach + addQueryQuotaToTableConfig(10); + testQueryRate(10); + // table quota more than database quota. Queries should fail upon database quota (25 qps) breach + addQueryQuotaToTableConfig(50); + testQueryRate(25); + } + + @Test + public void testDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker() + throws Exception { + BaseBrokerStarter brokerStarter = null; + try { + addQueryQuotaToDatabaseConfig(25); + addQueryQuotaToTableConfig(10); + // Add one more broker such that quota gets distributed equally among them + brokerStarter = startOneBroker(2); + // to allow change propagation to QueryQuotaManager + Thread.sleep(1000); + testQueryRate(10); + // drop table level quota so that database quota comes into effect + addQueryQuotaToTableConfig(null); + testQueryRate(25); + } finally { + if (brokerStarter != null) { + brokerStarter.stop(); + } + } + } + + /** + * Runs the query load with the max rate that the quota can allow and ensures queries are not failing. + * Then runs the query load with double the max rate and expects queries to fail due to quota breach. + * @param maxRate max rate allowed by the quota + */ + void testQueryRate(int maxRate) + throws Exception { + runQueries(maxRate, false); + //increase the qps and some of the queries should be throttled. 
+ runQueries(maxRate * 2, true); + } + + // try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis + // is not comparable to sleepMillis, else the actual qps would end up being much lower than required qps + private void runQueries(double qps, boolean shouldFail) + throws Exception { + int failCount = 0; + long sleepMillis = (long) (1000 / qps); + for (int i = 0; i < qps * 2; i++) { + ResultSetGroup resultSetGroup = _pinotConnection.execute("SELECT COUNT(*) FROM " + getTableName()); + for (PinotClientException exception : resultSetGroup.getExceptions()) { + if (exception.getMessage().contains("QuotaExceededError")) { + failCount++; + break; + } + } + Thread.sleep(sleepMillis); + } + assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && shouldFail)); + } + + + public void addQueryQuotaToTableConfig(Integer maxQps) + throws Exception { + TableConfig tableConfig = getOfflineTableConfig(); + tableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ? 
null : maxQps.toString())); + updateTableConfig(tableConfig); + // to allow change propagation to QueryQuotaManager + Thread.sleep(1000); + } + + public void addQueryQuotaToDatabaseConfig(Integer maxQps) + throws Exception { + String url = _controllerRequestURLBuilder.getBaseUrl() + "/databases/default/quotas"; + if (maxQps != null) { + url += "?maxQueriesPerSecond=" + maxQps; + } + HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new URI(url), null, null)); + // to allow change propagation to QueryQuotaManager + Thread.sleep(1000); + } + + public void addQueryQuotaToClusterConfig(Integer maxQps) + throws Exception { + if (maxQps == null) { + HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(new URI( + _controllerRequestURLBuilder.forClusterConfigs() + "/" + + CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND))); + } else { + String payload = "{\"" + CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND + "\":\"" + maxQps + "\"}"; + HttpClient.wrapAndThrowHttpException( + _httpClient.sendJsonPostRequest(new URI(_controllerRequestURLBuilder.forClusterConfigs()), payload)); + } + // to allow change propagation to QueryQuotaManager + Thread.sleep(1000); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/DatabaseConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/DatabaseConfig.java new file mode 100644 index 0000000000..c615fa5488 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/DatabaseConfig.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.table.QuotaConfig; + + +public class DatabaseConfig extends BaseJsonConfig { + public static final String QUOTA_CONFIG_KEY = "quota"; + public static final String DATABASE_NAME_KEY = "databaseName"; + private String _databaseName; + @JsonPropertyDescription("Resource quota associated with this database") + private QuotaConfig _quotaConfig; + + public DatabaseConfig(String databaseName, QuotaConfig quotaConfig) { + _databaseName = databaseName; + _quotaConfig = quotaConfig; + } + + public String getDatabaseName() { + return _databaseName; + } + + public void setDatabaseName(String databaseName) { + _databaseName = databaseName; + } + + @JsonProperty(QUOTA_CONFIG_KEY) + @Nullable + public QuotaConfig getQuotaConfig() { + return _quotaConfig; + } + + public void setQuotaConfig(QuotaConfig quotaConfig) { + _quotaConfig = quotaConfig; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index a069031b69..6769ae1894 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -78,6 +78,7 @@ public class CommonConstants { public static final String IS_SHUTDOWN_IN_PROGRESS = "shutdownInProgress"; public static 
final String QUERIES_DISABLED = "queriesDisabled"; public static final String QUERY_RATE_LIMIT_DISABLED = "queryRateLimitDisabled"; + public static final String DATABASE_MAX_QUERIES_PER_SECOND = "databaseMaxQueriesPerSecond"; public static final String INSTANCE_CONNECTED_METRIC_NAME = "helix.connected"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org