This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new abb38c3f26 Add support for application-level query quota. (#14226) abb38c3f26 is described below commit abb38c3f26ac27f13302417dd298db4b7eb3d90b Author: Bolek Ziobrowski <26925920+bziobrow...@users.noreply.github.com> AuthorDate: Tue Oct 22 15:55:53 2024 +0200 Add support for application-level query quota. (#14226) Adds a way to throttle queries (executed with both v1 or v2 engine) based on applicationName query option. Queries such as : ``` set applicationName='test'; select * from tables ``` --- .../broker/api/resources/PinotBrokerDebug.java | 11 + .../BrokerUserDefinedMessageHandlerFactory.java | 27 ++ .../HelixExternalViewBasedQueryQuotaManager.java | 284 ++++++++++++++++++--- .../pinot/broker/queryquota/QueryQuotaManager.java | 14 + .../requesthandler/BaseBrokerRequestHandler.java | 13 + .../MultiStageBrokerRequestHandler.java | 2 +- ...elixExternalViewBasedQueryQuotaManagerTest.java | 153 ++++++++++- .../BaseSingleStageBrokerRequestHandlerTest.java | 1 + .../ApplicationQpsQuotaRefreshMessage.java | 61 +++++ .../pinot/common/metadata/ZKMetadataProvider.java | 72 ++++++ .../pinot/controller/api/resources/Constants.java | 1 + .../PinotApplicationQuotaRestletResource.java | 139 ++++++++++ .../resources/PinotDatabaseRestletResource.java | 2 +- .../helix/core/PinotHelixResourceManager.java | 44 ++++ .../java/org/apache/pinot/core/auth/Actions.java | 2 + .../tests/BaseClusterIntegrationTest.java | 29 ++- .../tests/QueryQuotaClusterIntegrationTest.java | 168 ++++++++++-- .../apache/pinot/spi/utils/CommonConstants.java | 4 + 18 files changed, 955 insertions(+), 72 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java index 78a6dd324f..a220bc53a5 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java +++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java @@ -323,4 +323,15 @@ public class PinotBrokerDebug { @ApiParam(value = "Name of the database") @PathParam("databaseName") String databaseName) { return String.valueOf(_queryQuotaManager.getDatabaseQueryQuota(databaseName)); } + + @GET + @Path("debug/applicationQuotas/{applicationName}") + @Produces(MediaType.TEXT_PLAIN) + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_APPLICATION_QUERY_QUOTA) + @ApiOperation(value = "Get the active query quota being imposed on the application", notes = "This is a debug " + + "endpoint, and won't maintain backward compatibility") + public String getApplicationQueryQuota( + @ApiParam(value = "Name of the application") @PathParam("applicationName") String applicationName) { + return String.valueOf(_queryQuotaManager.getApplicationQueryQuota(applicationName)); + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java index 2c2cc33532..f4da13621e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java @@ -25,6 +25,7 @@ import org.apache.helix.messaging.handling.MessageHandlerFactory; import org.apache.helix.model.Message; import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; +import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage; import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage; import org.apache.pinot.common.messages.RoutingTableRebuildMessage; import org.apache.pinot.common.messages.SegmentRefreshMessage; @@ -65,6 +66,8 @@ public class 
BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac return new RebuildRoutingTableMessageHandler(new RoutingTableRebuildMessage(message), context); case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE: return new RefreshDatabaseConfigMessageHandler(new DatabaseConfigRefreshMessage(message), context); + case ApplicationQpsQuotaRefreshMessage.REFRESH_APP_QUOTA_MSG_SUB_TYPE: + return new RefreshApplicationQpsQuotaMessageHandler(new ApplicationQpsQuotaRefreshMessage(message), context); default: // NOTE: Log a warning and return no-op message handler for unsupported message sub-types. This can happen when // a new message sub-type is added, and the sender gets deployed first while receiver is still running the @@ -162,6 +165,30 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac } } + private class RefreshApplicationQpsQuotaMessageHandler extends MessageHandler { + final String _applicationName; + + RefreshApplicationQpsQuotaMessageHandler(ApplicationQpsQuotaRefreshMessage applicationQpsAuotaRefreshMessage, + NotificationContext context) { + super(applicationQpsAuotaRefreshMessage, context); + _applicationName = applicationQpsAuotaRefreshMessage.getApplicationName(); + } + + @Override + public HelixTaskResult handleMessage() { + _queryQuotaManager.createOrUpdateApplicationRateLimiter(_applicationName); + HelixTaskResult result = new HelixTaskResult(); + result.setSuccess(true); + return result; + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) { + LOGGER.error("Got error while refreshing query quota for application: {} (error code: {}, error type: {})", + _applicationName, code, type, e); + } + } + private class RebuildRoutingTableMessageHandler extends MessageHandler { final String _tableNameWithType; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index b0684d0bc8..48c5c33d0a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory; /** * This class is to support the qps quota feature. - * It allows performing qps quota check at table level and database level + * It allows performing qps quota check at table level, database and application level. * For table level check it depends on the broker source change to update the dynamic rate limit, * which means it gets updated when a new table added or a broker restarted. * For database level check it depends on the broker as well as cluster config and database config change @@ -67,6 +67,11 @@ import org.slf4j.LoggerFactory; * - the database config is updated * - new table is assigned to the broker (rate limiter is created if not present) * - broker added or removed from cluster + * For application level check it depends on the broker as well as cluster config and application quota change + * to update the dynamic rate limit, which means it gets updated when + * - the default query quota at cluster config is updated + * - the application quota is updated (e.g. 
via rest api) + * - broker added or removed from cluster */ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager { private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class); @@ -81,7 +86,9 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan private final AtomicInteger _lastKnownBrokerResourceVersion = new AtomicInteger(-1); private final Map<String, QueryQuotaEntity> _rateLimiterMap = new ConcurrentHashMap<>(); private final Map<String, QueryQuotaEntity> _databaseRateLimiterMap = new ConcurrentHashMap<>(); + private final Map<String, QueryQuotaEntity> _applicationRateLimiterMap = new ConcurrentHashMap<>(); private double _defaultQpsQuotaForDatabase; + private double _defaultQpsQuotaForApplication; private HelixManager _helixManager; private ZkHelixPropertyStore<ZNRecord> _propertyStore; @@ -98,29 +105,66 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan _helixManager = helixManager; _propertyStore = _helixManager.getHelixPropertyStore(); _defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase(); + _defaultQpsQuotaForApplication = getDefaultQueryQuotaForApplication(); getQueryQuotaEnabledFlagFromInstanceConfig(); + + initializeApplicationQpsQuotas(); + } + + // read all app quotas from ZK and create rate limiters + private void initializeApplicationQpsQuotas() { + Map<String, Double> quotas = + ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); + + if (quotas == null || quotas.isEmpty()) { + return; + } + + ExternalView brokerResource = getBrokerResource(); + int numOnlineBrokers = getNumOnlineBrokers(brokerResource); + + for (Map.Entry<String, Double> entry : quotas.entrySet()) { + if (entry.getKey() == null) { + continue; + } + + String appName = entry.getKey(); + double appQpsQuota = + entry.getValue() != null && entry.getValue() != -1.0d ? 
entry.getValue() : _defaultQpsQuotaForApplication; + + if (appQpsQuota < 0) { + buildEmptyOrResetApplicationRateLimiter(appName); + continue; + } + + double perBrokerQpsQuota = appQpsQuota / numOnlineBrokers; + LOGGER.info("Adding new query rate limiter for application {} with rate {}.", appName, perBrokerQpsQuota); + QueryQuotaEntity queryQuotaEntity = + new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), numOnlineBrokers, appQpsQuota, -1); + _applicationRateLimiterMap.put(appName, queryQuotaEntity); + } + + return; } @Override public void processClusterChange(HelixConstants.ChangeType changeType) { Preconditions.checkState(CHANGE_TYPES_TO_PROCESS.contains(changeType), "Illegal change type: " + changeType); if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) { - ExternalView brokerResourceEV = HelixHelper - .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), - CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + ExternalView brokerResourceEV = getBrokerResource(); processQueryRateLimitingExternalViewChange(brokerResourceEV); } else if (changeType == HelixConstants.ChangeType.INSTANCE_CONFIG) { processQueryRateLimitingInstanceConfigChange(); } else { processQueryRateLimitingClusterConfigChange(); + processApplicationQueryRateLimitingClusterConfigChange(); } } public void initOrUpdateTableQueryQuota(String tableNameWithType) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); - ExternalView brokerResourceEV = HelixHelper - .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), - CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + ExternalView brokerResourceEV = getBrokerResource(); initOrUpdateTableQueryQuota(tableConfig, brokerResourceEV); } @@ -264,52 +308,103 @@ public class 
HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); } + /** + * Updates the application rate limiter if it already exists. It won't create a new rate limiter. + * + * @param applicationName application name for which rate limiter needs to be updated + */ + public void updateApplicationRateLimiter(String applicationName) { + if (!_applicationRateLimiterMap.containsKey(applicationName)) { + return; + } + createOrUpdateApplicationRateLimiter(applicationName); + } + // Caller method need not worry about getting lock on _databaseRateLimiterMap // as this method will do idempotent updates to the database rate limiters private synchronized void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { - ExternalView brokerResource = HelixHelper - .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), - CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + ExternalView brokerResource = getBrokerResource(); for (String databaseName : databaseNames) { - double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName); - if (databaseQpsQuota < 0) { + double qpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName); + if (qpsQuota < 0) { buildEmptyOrResetDatabaseRateLimiter(databaseName); continue; } int numOnlineBrokers = getNumOnlineBrokers(databaseName, brokerResource); - double perBrokerQpsQuota = databaseQpsQuota / numOnlineBrokers; - QueryQuotaEntity oldQueryQuotaEntity = _databaseRateLimiterMap.get(databaseName); - if (oldQueryQuotaEntity == null) { + double perBrokerQpsQuota = qpsQuota / numOnlineBrokers; + QueryQuotaEntity oldEntity = _databaseRateLimiterMap.get(databaseName); + if (oldEntity == null) { LOGGER.info("Adding new query rate limiter for database {} with rate {}.", databaseName, perBrokerQpsQuota); - QueryQuotaEntity queryQuotaEntity = new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), - new 
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), - numOnlineBrokers, databaseQpsQuota, -1); + QueryQuotaEntity queryQuotaEntity = + new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), + new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), + numOnlineBrokers, qpsQuota, -1); _databaseRateLimiterMap.put(databaseName, queryQuotaEntity); continue; } - boolean changeDetected = false; - double oldQuota = oldQueryQuotaEntity.getRateLimiter() != null ? oldQueryQuotaEntity.getRateLimiter().getRate() - : -1; - if (oldQueryQuotaEntity.getOverallRate() != databaseQpsQuota) { - changeDetected = true; - LOGGER.info("Overall quota changed for the database from {} to {}", oldQueryQuotaEntity.getOverallRate(), - databaseQpsQuota); - oldQueryQuotaEntity.setOverallRate(databaseQpsQuota); - } - if (oldQueryQuotaEntity.getNumOnlineBrokers() != numOnlineBrokers) { - changeDetected = true; - LOGGER.info("Number of online brokers changed for the database from {} to {}", - oldQueryQuotaEntity.getNumOnlineBrokers(), numOnlineBrokers); - oldQueryQuotaEntity.setNumOnlineBrokers(numOnlineBrokers); + checkQueryQuotaChanged(databaseName, oldEntity, qpsQuota, "database", numOnlineBrokers, perBrokerQpsQuota); + } + } + + public synchronized void createOrUpdateApplicationRateLimiter(String applicationName) { + createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName)); + } + + // Caller method need not worry about getting lock on _applicationRateLimiterMap + // as this method will do idempotent updates to the application rate limiters + private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames) { + ExternalView brokerResource = getBrokerResource(); + for (String appName : applicationNames) { + double qpsQuota = getEffectiveQueryQuotaOnApplication(appName); + if (qpsQuota < 0) { + buildEmptyOrResetApplicationRateLimiter(appName); + 
continue; } - if (!changeDetected) { - LOGGER.info("No change detected with the query rate limiter for database {}", databaseName); + int numOnlineBrokers = getNumOnlineBrokers(brokerResource); + double perBrokerQpsQuota = qpsQuota / numOnlineBrokers; + QueryQuotaEntity oldEntity = _applicationRateLimiterMap.get(appName); + if (oldEntity == null) { + LOGGER.info("Adding new query rate limiter for application {} with rate {}.", appName, perBrokerQpsQuota); + QueryQuotaEntity queryQuotaEntity = + new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), numOnlineBrokers, qpsQuota, + -1); + _applicationRateLimiterMap.put(appName, queryQuotaEntity); continue; } - LOGGER.info("Updating existing query rate limiter for database {} from rate {} to {}", databaseName, oldQuota, - perBrokerQpsQuota); - oldQueryQuotaEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota)); + checkQueryQuotaChanged(appName, oldEntity, qpsQuota, "application", numOnlineBrokers, perBrokerQpsQuota); + } + } + + private void checkQueryQuotaChanged(String appName, QueryQuotaEntity oldEntity, double qpsQuota, String quotaType, + int numOnlineBrokers, double perBrokerQpsQuota) { + boolean isChange = false; + double oldQuota = oldEntity.getRateLimiter() != null ? 
oldEntity.getRateLimiter().getRate() : -1; + if (oldEntity.getOverallRate() != qpsQuota) { + isChange = true; + LOGGER.info("Overall quota changed for the {} {} from {} to {}", quotaType, appName, oldEntity.getOverallRate(), + qpsQuota); + oldEntity.setOverallRate(qpsQuota); + } + if (oldEntity.getNumOnlineBrokers() != numOnlineBrokers) { + isChange = true; + LOGGER.info("Number of online brokers changed for the {} {} from {} to {}", + quotaType, appName, oldEntity.getNumOnlineBrokers(), numOnlineBrokers); + oldEntity.setNumOnlineBrokers(numOnlineBrokers); + } + if (!isChange) { + LOGGER.info("No change detected with the query rate limiter for {} {}", quotaType, appName); + return; } + LOGGER.info("Updating existing query rate limiter for {} {} from rate {} to {}", quotaType, appName, oldQuota, + perBrokerQpsQuota); + oldEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota)); + } + + private ExternalView getBrokerResource() { + return HelixHelper.getExternalViewForResource(_helixManager.getClusterManagmentTool(), + _helixManager.getClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); } // Pulling this logic to a separate placeholder method so that the quota split logic @@ -321,6 +416,10 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan return HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size(); } + private int getNumOnlineBrokers(ExternalView brokerResource) { + return HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size(); + } + /** * Utility to get the effective query quota being imposed on a database. * It is computed based on the default quota set at cluster config and override set at database config @@ -337,6 +436,22 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan return _defaultQpsQuotaForDatabase; } + /** + * Utility to get the effective query quota being imposed on an application. 
It is the application-specific quota + * stored in ZK when one is set (and is not -1), otherwise the default quota set at cluster config. + * + * @param applicationName application name to get the query quota on. + * @return effective query quota limit being applied + */ + private double getEffectiveQueryQuotaOnApplication(String applicationName) { + Map<String, Double> quotas = + ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); + if (quotas != null && quotas.get(applicationName) != null && quotas.get(applicationName) != -1.0d) { + return quotas.get(applicationName); + } + return _defaultQpsQuotaForApplication; + } + /** * Creates a new database rate limiter. Will not update the database rate limiter if it already exists. * @param databaseName database name for which rate limiter needs to be created @@ -348,6 +463,18 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); } + /** + * Creates a new application rate limiter. Will not update the application rate limiter if it already exists. + * + * @param applicationName application name for which rate limiter needs to be created + */ + public void createApplicationRateLimiter(String applicationName) { + if (_applicationRateLimiterMap.containsKey(applicationName)) { + return; + } + createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName)); + } + /** * Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query * quota entity. @@ -365,6 +492,23 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } } + /** + * Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query + * quota entity. 
+ */ + private void buildEmptyOrResetApplicationRateLimiter(String applicationName) { + QueryQuotaEntity quotaEntity = _applicationRateLimiterMap.get(applicationName); + if (quotaEntity == null) { + // Create an QueryQuotaEntity object without setting a rate limiter. + quotaEntity = new QueryQuotaEntity(null, new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0); + _applicationRateLimiterMap.put(applicationName, quotaEntity); + } else { + // Set rate limiter to null for an existing QueryQuotaEntity object. + quotaEntity.setRateLimiter(null); + } + } + /** * Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query * quota entity. @@ -428,6 +572,25 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan return tryAcquireToken(databaseName, queryQuota); } + @Override + public boolean acquireApplication(String applicationName) { + if (isQueryRateLimitDisabled()) { + return true; + } + QueryQuotaEntity queryQuota = _applicationRateLimiterMap.get(applicationName); + if (queryQuota == null) { + if (getDefaultQueryQuotaForApplication() < 0) { + return true; + } else { + createOrUpdateApplicationRateLimiter(applicationName); + queryQuota = _applicationRateLimiterMap.get(applicationName); + } + } + + LOGGER.debug("Trying to acquire token for application: {}", applicationName); + return tryAcquireToken(applicationName, queryQuota); + } + @Override public double getTableQueryQuota(String tableNameWithType) { return getQueryQuota(_rateLimiterMap.get(tableNameWithType)); @@ -438,6 +601,11 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan return getQueryQuota(_databaseRateLimiterMap.get(databaseName)); } + @Override + public double getApplicationQueryQuota(String applicationName) { + return getQueryQuota(_applicationRateLimiterMap.get(applicationName)); + } + private double getQueryQuota(QueryQuotaEntity 
quotaEntity) { return quotaEntity == null || quotaEntity.getRateLimiter() == null ? 0 : quotaEntity.getRateLimiter().getRate(); } @@ -503,11 +671,11 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan if (rateLimiter == null) { return true; } - double perBrokerRate = rateLimiter.getRate(); // Emit the qps capacity utilization rate. - int numHits = queryQuotaEntity.getQpsTracker().getHitCount(); if (!rateLimiter.tryAcquire()) { + int numHits = queryQuotaEntity.getQpsTracker().getHitCount(); + double perBrokerRate = rateLimiter.getRate(); LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: {}. Current qps: {}", resourceName, perBrokerRate, numHits); return false; @@ -526,6 +694,11 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan return _databaseRateLimiterMap; } + @VisibleForTesting + public Map<String, QueryQuotaEntity> getApplicationRateLimiterMap() { + return _applicationRateLimiterMap; + } + @VisibleForTesting public void cleanUpRateLimiterMap() { _rateLimiterMap.clear(); @@ -625,7 +798,20 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan quota.setNumOnlineBrokers(onlineBrokerCount); } if (quota.getOverallRate() > 0) { - quota.setRateLimiter(RateLimiter.create(quota.getOverallRate() / onlineBrokerCount)); + double qpsQuota = quota.getOverallRate() / onlineBrokerCount; + quota.setRateLimiter(RateLimiter.create(qpsQuota)); + } + } + + // handle EV change for application query quotas + for (Map.Entry<String, QueryQuotaEntity> it : _applicationRateLimiterMap.entrySet()) { + QueryQuotaEntity quota = it.getValue(); + if (quota.getNumOnlineBrokers() != onlineBrokerCount) { + quota.setNumOnlineBrokers(onlineBrokerCount); + } + if (quota.getOverallRate() > 0) { + double qpsQuota = quota.getOverallRate() / onlineBrokerCount; + quota.setRateLimiter(RateLimiter.create(qpsQuota)); } } @@ -651,6 +837,15 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan createOrUpdateDatabaseRateLimiter(new ArrayList<>(_databaseRateLimiterMap.keySet())); } + public void processApplicationQueryRateLimitingClusterConfigChange() { + double oldQpsQuota = _defaultQpsQuotaForApplication; + _defaultQpsQuotaForApplication = getDefaultQueryQuotaForApplication(); + if (oldQpsQuota == _defaultQpsQuotaForApplication) { + return; + } + createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet())); + } + private double getDefaultQueryQuotaForDatabase() { HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool(); HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) @@ -660,6 +855,15 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan .getOrDefault(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, "-1")); } + private double getDefaultQueryQuotaForApplication() { + HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool(); + HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( + _helixManager.getClusterName()).build(); + return Double.parseDouble(helixAdmin.getConfig(configScope, + Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND)) + .getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1")); + } + /** * Process query quota state change when instance config gets changed */ diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java index 50d2a8c7ae..70c3ef7588 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java @@ -34,6 +34,13 @@ public interface QueryQuotaManager { */ boolean acquireDatabase(String databaseName); + /** 
+ * Try to acquire a quota for the given application. + * @param applicationName application name + * @return {@code true} if the application quota has not been reached, {@code false} otherwise + */ + boolean acquireApplication(String applicationName); + /** * Get the QPS quota in effect for the table * @param tableNameWithType table name with type @@ -47,4 +54,11 @@ public interface QueryQuotaManager { * @return effective quota qps. 0 if no qps quota is set. */ double getDatabaseQueryQuota(String databaseName); + + /** + * Get the QPS quota in effect for the application + * @param applicationName application name + * @return effective quota qps. 0 if no qps quota is set. + */ + double getApplicationQueryQuota(String applicationName); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 406d3d032a..9a5e0e94a4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -53,10 +53,13 @@ import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.CommonConstants.Broker; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ThreadSafe public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(BaseBrokerRequestHandler.class); protected final PinotConfiguration _config; protected final String _brokerId; protected final BrokerRoutingManager _routingManager; @@ -145,6 +148,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } } + // check app qps before doing anything + String application = 
sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.APPLICATION_NAME); + if (application != null && !_queryQuotaManager.acquireApplication(application)) { + String errorMessage = + "Request " + requestId + ": " + query + " exceeds query quota for application: " + application; + LOGGER.info(errorMessage); + requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); + return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); + } + // Add null handling option from broker config only if there is no override in the query if (_enableNullHandling != null) { sqlNodeAndOptions.getOptions() diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 70dadd2f24..8aef51ebd1 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -349,7 +349,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { } /** - * Returns true if the QPS quota of the tables has exceeded. + * Returns true if the QPS quota of query tables, database or application has been exceeded. 
*/ private boolean hasExceededQPSQuota(@Nullable String database, Set<String> tableNames, RequestContext requestContext) { diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java index a9ac37f544..131faee022 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java @@ -60,6 +60,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { private HelixExternalViewBasedQueryQuotaManager _queryQuotaManager; private ZkStarter.ZookeeperInstance _zookeeperInstance; private static final Map<String, String> CLUSTER_CONFIG_MAP = new HashMap<>(); + private static final String APP_NAME = "app"; private static final String RAW_TABLE_NAME = "testTable"; private static final String OFFLINE_TABLE_NAME = RAW_TABLE_NAME + "_OFFLINE"; private static final String REALTIME_TABLE_NAME = RAW_TABLE_NAME + "_REALTIME"; @@ -138,10 +139,12 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, OFFLINE_TABLE_NAME); ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, REALTIME_TABLE_NAME); ZKMetadataProvider.removeDatabaseConfig(_testPropertyStore, CommonConstants.DEFAULT_DATABASE); + ZKMetadataProvider.removeApplicationQuotas(_testPropertyStore); CLUSTER_CONFIG_MAP.clear(); } _queryQuotaManager.cleanUpRateLimiterMap(); _queryQuotaManager.getDatabaseRateLimiterMap().clear(); + _queryQuotaManager.getApplicationRateLimiterMap().clear(); } @AfterTest @@ -255,6 +258,112 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); } + @Test + public void 
testWhenNoTableOrDatabaseOrApplicationQuotasSetQueriesRunWild() + throws InterruptedException { + ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); + TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); + ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE); + _queryQuotaManager.createApplicationRateLimiter(APP_NAME); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 1); + Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(), 1); + + setDefaultDatabaseQps("-1"); + setDefaultApplicationQps("-1"); + + runQueries(25, false); + runQueries(40, false); + runQueries(100, false); + + _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + } + + @Test + public void testWhenOnlySpecificAppQuotaIsSetItAffectsQueriesWithAppOption() + throws InterruptedException { + ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); + TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); + ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE); + + ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, APP_NAME, 50d); + _queryQuotaManager.createApplicationRateLimiter(APP_NAME); + + setDefaultDatabaseQps("-1"); + setDefaultApplicationQps("-1"); + + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 1); + 
Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(), 1); + + runQueries(50, false); + runQueries(100, true); + + _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + } + + @Test + public void testWhenOnlyDefaultAppQuotaIsSetItAffectsAllApplications() + throws InterruptedException { + ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); + TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); + ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE); + + setDefaultDatabaseQps("-1"); + setDefaultApplicationQps("50"); + + ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, "someApp", 100d); + _queryQuotaManager.createApplicationRateLimiter("someApp"); + + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 1); + Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(), 1); + + runQueries(100, true, APP_NAME); + runQueries(100, true, "otherApp"); + runQueries(100, false, "someApp"); + runQueries(201, true, "someApp"); + + Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(), 3); + _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + } + + @Test + public void tesCreateAndUpdateAppRateLimiterChangesRateLimiterMap() { + Map<String, Double> apps = new HashMap<>(); + apps.put("app1", null); + apps.put("app2", 1d); + apps.put("app3", 2d); + + apps.entrySet().stream().forEach(e -> { + ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, e.getKey(), e.getValue()); + }); + apps.entrySet().forEach(app -> 
_queryQuotaManager.createApplicationRateLimiter(app.getKey())); + Map<String, QueryQuotaEntity> appQuotaMap = _queryQuotaManager.getApplicationRateLimiterMap(); + + Assert.assertNull(appQuotaMap.get("app1").getRateLimiter()); + Assert.assertEquals(appQuotaMap.get("app2").getRateLimiter().getRate(), 1); + Assert.assertEquals(appQuotaMap.get("app3").getRateLimiter().getRate(), 2); + + ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, "app1", 1d); + ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, "app2", 2d); + + apps.entrySet().forEach(e -> _queryQuotaManager.updateApplicationRateLimiter(e.getKey())); + + Assert.assertEquals(appQuotaMap.get("app1").getRateLimiter().getRate(), 1); + Assert.assertEquals(appQuotaMap.get("app2").getRateLimiter().getRate(), 2); + Assert.assertEquals(appQuotaMap.get("app3").getRateLimiter().getRate(), 2); + } + @Test public void testCreateOrUpdateDatabaseRateLimiter() { List<String> dbList = new ArrayList<>(2); @@ -264,19 +373,23 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { DatabaseConfig db1 = new DatabaseConfig(dbList.get(0), new QuotaConfig(null, null)); DatabaseConfig db2 = new DatabaseConfig(dbList.get(1), new QuotaConfig(null, "1")); DatabaseConfig db3 = new DatabaseConfig(dbList.get(2), new QuotaConfig(null, "2")); + ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1); ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2); ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db3); + dbList.forEach(db -> _queryQuotaManager.createDatabaseRateLimiter(db)); Map<String, QueryQuotaEntity> dbQuotaMap = _queryQuotaManager.getDatabaseRateLimiterMap(); Assert.assertNull(dbQuotaMap.get(dbList.get(0)).getRateLimiter()); Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(), 1); Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(), 2); + db1.setQuotaConfig(new QuotaConfig(null, "1")); db2.setQuotaConfig(new QuotaConfig(null, "2")); 
ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1); ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2); dbList.forEach(db -> _queryQuotaManager.updateDatabaseRateLimiter(db)); + Assert.assertEquals(dbQuotaMap.get(dbList.get(0)).getRateLimiter().getRate(), 1); Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(), 2); Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(), 2); @@ -509,6 +622,11 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { _queryQuotaManager.processQueryRateLimitingClusterConfigChange(); } + private void setDefaultApplicationQps(String maxQps) { + CLUSTER_CONFIG_MAP.put(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, maxQps); + _queryQuotaManager.processApplicationQueryRateLimitingClusterConfigChange(); + } + private void setDatabaseQps(DatabaseConfig databaseConfig, String maxQps) { QuotaConfig quotaConfig = new QuotaConfig(null, maxQps); databaseConfig.setQuotaConfig(quotaConfig); @@ -516,11 +634,21 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { _queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE); } + private void setApplicationQps(String appName, Double maxQps) { + ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, appName, maxQps); + _queryQuotaManager.createApplicationRateLimiter(appName); + } + private void setQps(TableConfig tableConfig) { QuotaConfig quotaConfig = new QuotaConfig(null, TABLE_MAX_QPS_STR); tableConfig.setQuotaConfig(quotaConfig); } + private void setQps(TableConfig tableConfig, String value) { + QuotaConfig quotaConfig = new QuotaConfig(null, value); + tableConfig.setQuotaConfig(quotaConfig); + } + private static ExternalView generateBrokerResource(String tableName) { ExternalView brokerResource = new ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); brokerResource.setState(tableName, BROKER_INSTANCE_ID, "ONLINE"); @@ -531,17 +659,29 @@ public class 
HelixExternalViewBasedQueryQuotaManagerTest { private void runQueries() throws InterruptedException { runQueries(TABLE_MAX_QPS, false); - //increase the qps and some of the queries should be throttled. - runQueries(TABLE_MAX_QPS * 2, true); + // increase the qps and some of the queries should be throttled. + // keep in mind that permits are 'regenerated' on every call based on how much time elapsed since last one + // that means for 25 QPS we get new permit every 40 ms or 0.5 every 20 ms + // if we start with 25 permits at time t1 then if we want to exceed the qps in the next second we've to do more + // double requests, because 25 will regenerate + runQueries(TABLE_MAX_QPS * 2 + 1, true); + } + + private void runQueries(double qps, boolean shouldFail) + throws InterruptedException { + runQueries(qps, shouldFail, APP_NAME); } // try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis // is not comparable to sleepMillis, else the actual qps would end being lot lower than required qps - private void runQueries(double qps, boolean shouldFail) + private void runQueries(double qps, boolean shouldFail, String appName) throws InterruptedException { int failCount = 0; long sleepMillis = (long) (1000 / qps); for (int i = 0; i < qps; i++) { + if (!_queryQuotaManager.acquireApplication(appName)) { + failCount++; + } if (!_queryQuotaManager.acquireDatabase(CommonConstants.DEFAULT_DATABASE)) { failCount++; } @@ -550,6 +690,11 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { } Thread.sleep(sleepMillis); } - Assert.assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && shouldFail)); + + if (shouldFail) { + Assert.assertTrue(failCount != 0, "Expected failure with qps: " + qps + " and app :" + appName); + } else { + Assert.assertTrue(failCount == 0, "Expected no failure with qps: " + qps + " and app :" + appName); + } } } diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java index 0677d9dc5d..df4b1b6bf8 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java @@ -173,6 +173,7 @@ public class BaseSingleStageBrokerRequestHandlerTest { QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class); when(queryQuotaManager.acquire(anyString())).thenReturn(true); when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true); + when(queryQuotaManager.acquireApplication(anyString())).thenReturn(true); CountDownLatch latch = new CountDownLatch(1); long[] testRequestId = {-1}; BrokerMetrics.register(mock(BrokerMetrics.class)); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/ApplicationQpsQuotaRefreshMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/ApplicationQpsQuotaRefreshMessage.java new file mode 100644 index 0000000000..11768f7b37 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/ApplicationQpsQuotaRefreshMessage.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.messages; + +import java.util.UUID; +import org.apache.helix.model.Message; +import org.apache.helix.zookeeper.datamodel.ZNRecord; + + +/** + * This (Helix) message is sent from the controller to brokers when a request is received to update the application + * quota. + */ +public class ApplicationQpsQuotaRefreshMessage extends Message { + public static final String REFRESH_APP_QUOTA_MSG_SUB_TYPE = "REFRESH_APPLICATION_QUOTA"; + + private static final String APPLICATION_NAME_KEY = "applicationName"; + + /** + * Constructor for the sender. + */ + public ApplicationQpsQuotaRefreshMessage(String applicationName) { + super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); + setMsgSubType(REFRESH_APP_QUOTA_MSG_SUB_TYPE); + // Give it infinite time to process the message, as long as session is alive + setExecutionTimeout(-1); + // Set the Pinot specific fields + ZNRecord znRecord = getRecord(); + znRecord.setSimpleField(APPLICATION_NAME_KEY, applicationName); + } + + /** + * Constructor for the receiver. 
+ */ + public ApplicationQpsQuotaRefreshMessage(Message message) { + super(message.getRecord()); + if (!message.getMsgSubType().equals(REFRESH_APP_QUOTA_MSG_SUB_TYPE)) { + throw new IllegalArgumentException("Invalid message subtype:" + message.getMsgSubType()); + } + } + + public String getApplicationName() { + return getRecord().getSimpleField(APPLICATION_NAME_KEY); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 588b9df026..7d1143b0cb 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -63,6 +63,7 @@ public class ZKMetadataProvider { private static final Logger LOGGER = LoggerFactory.getLogger(ZKMetadataProvider.class); private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = "tenantIsolationEnabled"; + private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas"; private static final String PROPERTYSTORE_CONTROLLER_JOBS_PREFIX = "/CONTROLLER_JOBS"; private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS"; private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS"; @@ -112,6 +113,15 @@ public class ZKMetadataProvider { propertyStore.remove(constructPropertyStorePathForDatabaseConfig(databaseName), AccessOption.PERSISTENT); } + /** + * Remove the application quotas record from the property store.
+ */ + @VisibleForTesting + public static void removeApplicationQuotas(ZkHelixPropertyStore<ZNRecord> propertyStore) { + propertyStore.remove(constructPropertyStorePathForControllerConfig(CLUSTER_APPLICATION_QUOTAS), + AccessOption.PERSISTENT); + } + private static ZNRecord toZNRecord(DatabaseConfig databaseConfig) { ZNRecord databaseConfigZNRecord = new ZNRecord(databaseConfig.getDatabaseName()); Map<String, String> simpleFields = new HashMap<>(); @@ -758,4 +768,66 @@ public class ZKMetadataProvider { return true; } } + + public static boolean setApplicationQpsQuota(ZkHelixPropertyStore<ZNRecord> propertyStore, String applicationName, + Double value) { + final ZNRecord znRecord; + final String path = constructPropertyStorePathForControllerConfig(CLUSTER_APPLICATION_QUOTAS); + + boolean doCreate; + if (!propertyStore.exists(path, AccessOption.PERSISTENT)) { + znRecord = new ZNRecord(CLUSTER_APPLICATION_QUOTAS); + doCreate = true; + } else { + znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT); + doCreate = false; + } + + Map<String, String> quotas = znRecord.getMapField(CLUSTER_APPLICATION_QUOTAS); + if (quotas == null) { + quotas = new HashMap<>(); + znRecord.setMapField(CLUSTER_APPLICATION_QUOTAS, quotas); + } + quotas.put(applicationName, value != null ? 
value.toString() : null); + + if (doCreate) { + return propertyStore.create(path, znRecord, AccessOption.PERSISTENT); + } else { + return propertyStore.set(path, znRecord, AccessOption.PERSISTENT); + } + } + + @Nullable + public static Map<String, Double> getApplicationQpsQuotas(ZkHelixPropertyStore<ZNRecord> propertyStore) { + String controllerConfigPath = constructPropertyStorePathForControllerConfig(CLUSTER_APPLICATION_QUOTAS); + if (propertyStore.exists(controllerConfigPath, AccessOption.PERSISTENT)) { + ZNRecord znRecord = propertyStore.get(controllerConfigPath, null, AccessOption.PERSISTENT); + if (znRecord.getMapFields().containsKey(CLUSTER_APPLICATION_QUOTAS)) { + return toApplicationQpsQuotas(znRecord.getMapField(CLUSTER_APPLICATION_QUOTAS)); + } else { + return null; + } + } else { + return null; + } + } + + private static Map<String, Double> toApplicationQpsQuotas(Map<String, String> quotas) { + if (quotas == null) { + return new HashMap<>(); + } else { + HashMap<String, Double> result = new HashMap<>(); + for (Map.Entry<String, String> entry : quotas.entrySet()) { + if (entry.getValue() != null) { + try { + double value = Double.parseDouble(entry.getValue()); + result.put(entry.getKey(), value); + } catch (NumberFormatException nfe) { + continue; + } + } + } + return result; + } + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java index 78476f603c..fea05fc8b2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java @@ -33,6 +33,7 @@ public class Constants { private static final Logger LOGGER = LoggerFactory.getLogger(Constants.class); + public static final String APPLICATION_TAG = "Application"; public static final String CLUSTER_TAG = "Cluster"; public static final String 
DATABASE_TAG = "Database"; public static final String TABLE_TAG = "Table"; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java new file mode 100644 index 0000000000..db050168fa --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.util.Collections; +import java.util.Map; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.helix.HelixAdmin; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.core.auth.Actions; +import org.apache.pinot.core.auth.Authorize; +import org.apache.pinot.core.auth.TargetType; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; + + +@Api(tags = Constants.APPLICATION_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)}) +@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { + @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = + SWAGGER_AUTHORIZATION_KEY, description = + "The format of the key is ```\"Basic <token>\" or \"Bearer " + + "<token>\"```"), @ApiKeyAuthDefinition(name = CommonConstants.APPLICATION, in = + ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = CommonConstants.APPLICATION, 
description = + "Application context passed through http header. If no context is provided 'default' application " + + "context will be considered.") +})) +@Path("/") +public class PinotApplicationQuotaRestletResource { + public static final Logger LOGGER = LoggerFactory.getLogger(PinotApplicationQuotaRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + /** + * API to get all application quota configs. Returns an empty map if application quotas are not defined + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/applicationQuotas") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_APPLICATION_QUERY_QUOTA) + @ApiOperation(value = "Get all application qps quotas", notes = "Get all application qps quotas") + public Map<String, Double> getApplicationQuotas(@Context HttpHeaders httpHeaders) { + Map<String, Double> quotas = _pinotHelixResourceManager.getApplicationQuotas(); + if (quotas != null) { + return quotas; + } else { + return Collections.emptyMap(); + } + } + + /** + * API to get application quota configs.
Will return null if application quotas are not defined + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/applicationQuotas/{appName}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_APPLICATION_QUERY_QUOTA) + @ApiOperation(value = "Get application qps quota", notes = "Get application qps quota") + public Double getApplicationQuota(@Context HttpHeaders httpHeaders, @PathParam("appName") String appName) { + + Map<String, Double> quotas = _pinotHelixResourceManager.getApplicationQuotas(); + if (quotas != null && quotas.containsKey(appName)) { + return quotas.get(appName); + } + + HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( + _pinotHelixResourceManager.getHelixClusterName()).build(); + HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin(); + String defaultQuota = + helixAdmin.getConfig(scope, Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND)) + .getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null); + return defaultQuota != null ? Double.parseDouble(defaultQuota) : null; + } + + /** + * API to update the quota configs for application + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @Path("/applicationQuotas/{appName}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_APPLICATION_QUOTA) + @ApiOperation(value = "Update application quota", notes = "Update application quota") + public SuccessResponse setApplicationQuota(@PathParam("appName") String appName, + @QueryParam("maxQueriesPerSecond") String queryQuota, @Context HttpHeaders httpHeaders) { + try { + try { + Double newQuota = queryQuota != null ? 
Double.parseDouble(queryQuota) : null; + _pinotHelixResourceManager.updateApplicationQpsQuota(appName, newQuota); + } catch (NumberFormatException nfe) { + throw new ControllerApplicationException(LOGGER, "Application query quota value is not a number", + Response.Status.INTERNAL_SERVER_ERROR, nfe); + } + + return new SuccessResponse("Query quota for application " + appName + " successfully updated"); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java index eecf2d0778..ad5067f595 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java @@ -137,7 +137,7 @@ public class PinotDatabaseRestletResource { @Path("/databases/{databaseName}/quotas") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_DATABASE_QUOTA) @ApiOperation(value = "Update database quotas", notes = "Update database quotas") - public SuccessResponse addTable( + public SuccessResponse setDatabaseQuota( @PathParam("databaseName") String databaseName, @QueryParam("maxQueriesPerSecond") String queryQuota, @Context HttpHeaders httpHeaders) { if (!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders))) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 1b4b722a7e..7ce94f04c8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -102,6 +102,7 @@ import org.apache.pinot.common.lineage.LineageEntryState; import org.apache.pinot.common.lineage.SegmentLineage; import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; import org.apache.pinot.common.lineage.SegmentLineageUtils; +import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage; import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage; import org.apache.pinot.common.messages.RoutingTableRebuildMessage; import org.apache.pinot.common.messages.RunPeriodicTaskMessage; @@ -197,6 +198,7 @@ public class PinotHelixResourceManager { private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500; private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500; private static final String API_REQUEST_ID_PREFIX = "api-"; + private static final int INFINITE_TIMEOUT = -1; private enum LineageUpdateType { START, END, REVERT @@ -1653,6 +1655,19 @@ public class PinotHelixResourceManager { sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName()); } + /** + * Updates application quota and sends out a refresh message. + * + * @param applicationName name of application to set quota for + * @param value quota value to set + */ + public void updateApplicationQpsQuota(String applicationName, Double value) { + if (!ZKMetadataProvider.setApplicationQpsQuota(_propertyStore, applicationName, value)) { + throw new RuntimeException("Failed to create query quota for application: " + applicationName); + } + sendApplicationQpsQuotaRefreshMessage(applicationName); + } + /** * Updates database config and sends out a database config refresh message. 
* @param databaseConfig database config to be created @@ -2884,6 +2899,25 @@ public class PinotHelixResourceManager { } } + private void sendApplicationQpsQuotaRefreshMessage(String appName) { + ApplicationQpsQuotaRefreshMessage message = new ApplicationQpsQuotaRefreshMessage(appName); + + // Send application qps quota refresh message to brokers + Criteria criteria = new Criteria(); + criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + criteria.setInstanceName("%"); + criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); + criteria.setSessionSpecific(true); + + int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, message, null, INFINITE_TIMEOUT); + if (numMessagesSent > 0) { + LOGGER.info("Sent {} applcation qps quota refresh messages to brokers for application: {}", numMessagesSent, + appName); + } else { + LOGGER.warn("No application qps quota refresh message sent to brokers for application: {}", appName); + } + } + private void sendDatabaseConfigRefreshMessage(String databaseName) { DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new DatabaseConfigRefreshMessage(databaseName); @@ -3162,6 +3196,16 @@ public class PinotHelixResourceManager { return ZKMetadataProvider.getDatabaseConfig(_propertyStore, databaseName); } + /** + * Get the QPS quotas configured for applications. + * + * @return map of application name to quotas + */ + @Nullable + public Map<String, Double> getApplicationQuotas() { + return ZKMetadataProvider.getApplicationQpsQuotas(_propertyStore); + } + /** * Get the table config for the given table name with type suffix.
* diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java index 51512eeeb4..d92ee5f1b4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java @@ -72,6 +72,7 @@ public class Actions { public static final String GET_USER = "GetUser"; public static final String GET_VERSION = "GetVersion"; public static final String GET_ZNODE = "GetZnode"; + public static final String GET_APPLICATION_QUERY_QUOTA = "GetApplicationQueryQuota"; public static final String GET_DATABASE_QUOTA = "GetDatabaseQuota"; public static final String GET_DATABASE_QUERY_QUOTA = "GetDatabaseQueryQuota"; public static final String INGEST_FILE = "IngestFile"; @@ -91,6 +92,7 @@ public class Actions { public static final String UPDATE_TIME_INTERVAL = "UpdateTimeInterval"; public static final String UPDATE_USER = "UpdateUser"; public static final String UPDATE_DATABASE_QUOTA = "UpdateDatabaseQuota"; + public static final String UPDATE_APPLICATION_QUOTA = "UpdateApplicationQuota"; public static final String UPDATE_ZNODE = "UpdateZnode"; public static final String UPLOAD_SEGMENT = "UploadSegment"; public static final String GET_INSTANCE_PARTITIONS = "GetInstancePartitions"; diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index ffe846cf9c..cf1fce6fb0 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -292,15 +292,28 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { * Creates a new OFFLINE table config. 
*/ protected TableConfig createOfflineTableConfig() { - return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setTimeColumnName(getTimeColumnName()) - .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns()) - .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns()) - .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs()) - .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()) - .setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()) - .setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig()) - .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(getSegmentPartitionConfig()) + // @formatter:off + return new TableConfigBuilder(TableType.OFFLINE) + .setTableName(getTableName()) + .setTimeColumnName(getTimeColumnName()) + .setSortedColumn(getSortedColumn()) + .setInvertedIndexColumns(getInvertedIndexColumns()) + .setNoDictionaryColumns(getNoDictionaryColumns()) + .setRangeIndexColumns(getRangeIndexColumns()) + .setBloomFilterColumns(getBloomFilterColumns()) + .setFieldConfigList(getFieldConfigs()) + .setNumReplicas(getNumReplicas()) + .setSegmentVersion(getSegmentVersion()) + .setLoadMode(getLoadMode()) + .setTaskConfig(getTaskConfig()) + .setBrokerTenant(getBrokerTenant()) + .setServerTenant(getServerTenant()) + .setIngestionConfig(getIngestionConfig()) + .setQueryConfig(getQueryConfig()) + .setNullHandlingEnabled(getNullHandlingEnabled()) + .setSegmentPartitionConfig(getSegmentPartitionConfig()) .build(); + // @formatter:on } /** diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java index dfd9d39727..8ac736e507 100644 --- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java @@ -79,14 +79,20 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest .buildTransport(); _pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl() + "/" + getHelixClusterName(), _pinotClientTransport); + + // create default application rate limiter manually, otherwise verifyQuotaUpdate will fail + setQueryQuotaForApplication(null); } @AfterMethod void resetQuotas() throws Exception { addQueryQuotaToClusterConfig(null); + addAppQueryQuotaToClusterConfig(null); + setQueryQuotaForApplication(null); addQueryQuotaToDatabaseConfig(null); addQueryQuotaToTableConfig(null); + _brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0); verifyQuotaUpdate(0); } @@ -98,6 +104,13 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest testQueryRate(40); } + @Test + public void testDefaultApplicationQueryQuota() + throws Exception { + addAppQueryQuotaToClusterConfig(50); + testQueryRate(50); + } + @Test public void testDatabaseConfigQueryQuota() throws Exception { @@ -105,6 +118,13 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest testQueryRate(10); } + @Test + public void testApplicationQueryQuota() + throws Exception { + setQueryQuotaForApplication(15); + testQueryRate(15); + } + @Test public void testDefaultDatabaseQueryQuotaOverride() throws Exception { @@ -117,6 +137,18 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest testQueryRate(40); } + @Test + public void testDefaultApplicationQueryQuotaOverride() + throws Exception { + addAppQueryQuotaToClusterConfig(25); + // override lower than default quota + setQueryQuotaForApplication(10); + testQueryRate(10); + // override higher than default quota + 
setQueryQuotaForApplication(40); + testQueryRate(40); + } + @Test public void testDatabaseQueryQuotaWithTableQueryQuota() throws Exception { @@ -129,6 +161,18 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest testQueryRate(25); } + @Test + public void testApplicationQueryQuotaWithTableQueryQuota() + throws Exception { + setQueryQuotaForApplication(25); + // table quota within application quota. Queries should fail upon table quota (10 qps) breach + addQueryQuotaToTableConfig(10); + testQueryRate(10); + // table quota more than application quota. Queries should fail upon application quota (25 qps) breach + addQueryQuotaToTableConfig(50); + testQueryRate(25); + } + @Test public void testDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker() throws Exception { @@ -152,6 +196,39 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest } } + @Test + public void testApplicationAndDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker() + throws Exception { + BaseBrokerStarter brokerStarter = null; + try { + addAppQueryQuotaToClusterConfig(null); + addQueryQuotaToClusterConfig(null); + setQueryQuotaForApplication(50); + addQueryQuotaToDatabaseConfig(25); + addQueryQuotaToTableConfig(10); + // + // Add one more broker such that quota gets distributed equally among them + brokerStarter = startOneBroker(2); + _brokerHostPort = LOCAL_HOST + ":" + brokerStarter.getPort(); + // query only one broker across the divided quota + testQueryRateOnBroker(5); + + // drop table level quota so that database quota comes into effect + addQueryQuotaToTableConfig(null); + // query only one broker across the divided quota + testQueryRateOnBroker(12.5f); + + // drop database level quota so that app quota comes into effect + addQueryQuotaToDatabaseConfig(null); + // query only one broker across the divided quota + testQueryRateOnBroker(25f); + } finally { + if (brokerStarter != null) { + brokerStarter.stop(); + } + } + } + /** * Runs the query 
load with the max rate that the quota can allow and ensures queries are not failing. * Then runs the query load with double the max rate and expects queries to fail due to quota breach. @@ -181,7 +258,8 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest long sleepMillis = (long) (1000 / qps); Thread.sleep(sleepMillis); for (int i = 0; i < qps * 2; i++) { - ResultSetGroup resultSetGroup = _pinotConnection.execute("SELECT COUNT(*) FROM " + getTableName()); + ResultSetGroup resultSetGroup = + _pinotConnection.execute("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName()); for (PinotClientException exception : resultSetGroup.getExceptions()) { if (exception.getMessage().contains("QuotaExceededError")) { failCount++; @@ -190,24 +268,48 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest } Thread.sleep(sleepMillis); } - assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && shouldFail)); + if (shouldFail) { + assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps); + } else { + assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps); + } } + private static volatile float _quota; + private static volatile String _quotaSource; + private void verifyQuotaUpdate(float quotaQps) { - TestUtils.waitForCondition(aVoid -> { - try { - float tableQuota = Float.parseFloat(sendGetRequest(String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE", - _brokerHostPort, getTableName()))); - tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota; - float dbQuota = Float.parseFloat(sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default", - _brokerHostPort))); - dbQuota = dbQuota == 0 ? 
Long.MAX_VALUE : dbQuota; - return quotaQps == Math.min(tableQuota, dbQuota) - || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == Long.MAX_VALUE); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, 5000, "Failed to reflect query quota on rate limiter in 5s"); + try { + TestUtils.waitForCondition(aVoid -> { + try { + float tableQuota = Float.parseFloat(sendGetRequest( + String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE", _brokerHostPort, getTableName()))); + tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota; + float dbQuota = Float.parseFloat( + sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default", _brokerHostPort))); + float appQuota = Float.parseFloat( + sendGetRequest(String.format("http://%s/debug/applicationQuotas/default", _brokerHostPort))); + dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota; + appQuota = appQuota == 0 ? Long.MAX_VALUE : appQuota; + float actualQuota = Math.min(Math.min(tableQuota, dbQuota), appQuota); + _quota = actualQuota; + if (_quota == dbQuota) { + _quotaSource = "database"; + } else if (_quota == tableQuota) { + _quotaSource = "table"; + } else { + _quotaSource = "application"; + } + return quotaQps == actualQuota || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == Long.MAX_VALUE + && appQuota == Long.MAX_VALUE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, 5000, "Failed to reflect query quota on rate limiter in 5s."); + } catch (AssertionError ae) { + throw new AssertionError( + ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae); + } } private BrokerResponse executeQueryOnBroker(String query) { @@ -220,7 +322,8 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest long sleepMillis = (long) (1000 / qps); Thread.sleep(sleepMillis); for (int i = 0; i < qps * 2; i++) { - BrokerResponse resultSetGroup = executeQueryOnBroker("SELECT COUNT(*) FROM " 
+ getTableName()); + BrokerResponse resultSetGroup = + executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName()); for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements(); it.hasNext(); ) { JsonNode exception = it.next(); if (exception.toPrettyString().contains("QuotaExceededError")) { @@ -230,7 +333,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest } Thread.sleep(sleepMillis); } - assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && shouldFail)); + + if (shouldFail) { + assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps); + } else { + assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps); + } } public void addQueryQuotaToTableConfig(Integer maxQps) @@ -251,6 +359,16 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest // to allow change propagation to QueryQuotaManager } + public void setQueryQuotaForApplication(Integer maxQps) + throws Exception { + String url = _controllerRequestURLBuilder.getBaseUrl() + "/applicationQuotas/default"; + if (maxQps != null) { + url += "?maxQueriesPerSecond=" + maxQps; + } + HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new URI(url), null, null)); + // to allow change propagation to QueryQuotaManager + } + public void addQueryQuotaToClusterConfig(Integer maxQps) throws Exception { if (maxQps == null) { @@ -264,4 +382,18 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest } // to allow change propagation to QueryQuotaManager } + + public void addAppQueryQuotaToClusterConfig(Integer maxQps) + throws Exception { + if (maxQps == null) { + HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(new URI( + _controllerRequestURLBuilder.forClusterConfigs() + "/" + + CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))); + } else { + String payload = "{\"" + CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND + 
"\":\"" + maxQps + "\"}"; + HttpClient.wrapAndThrowHttpException( + _httpClient.sendJsonPostRequest(new URI(_controllerRequestURLBuilder.forClusterConfigs()), payload)); + } + // to allow change propagation to QueryQuotaManager + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index f62efb2062..e889b5e0f7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -57,6 +57,8 @@ public class CommonConstants { public static final String CONFIG_OF_SWAGGER_RESOURCES_PATH = "META-INF/resources/webjars/swagger-ui/"; public static final String CONFIG_OF_TIMEZONE = "pinot.timezone"; + public static final String APPLICATION = "application"; + public static final String DATABASE = "database"; public static final String DEFAULT_DATABASE = "default"; public static final String CONFIG_OF_PINOT_INSECURE_MODE = "pinot.insecure.mode"; @@ -86,6 +88,7 @@ public class CommonConstants { public static final String QUERIES_DISABLED = "queriesDisabled"; public static final String QUERY_RATE_LIMIT_DISABLED = "queryRateLimitDisabled"; public static final String DATABASE_MAX_QUERIES_PER_SECOND = "databaseMaxQueriesPerSecond"; + public static final String APPLICATION_MAX_QUERIES_PER_SECOND = "applicationMaxQueriesPerSecond"; public static final String INSTANCE_CONNECTED_METRIC_NAME = "helix.connected"; @@ -401,6 +404,7 @@ public class CommonConstants { public static final String USE_MULTISTAGE_ENGINE = "useMultistageEngine"; public static final String INFER_PARTITION_HINT = "inferPartitionHint"; public static final String ENABLE_NULL_HANDLING = "enableNullHandling"; + public static final String APPLICATION_NAME = "applicationName"; /** * If set, changes the explain behavior in multi-stage engine. 
* --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org