This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new abb38c3f26 Add support for application-level query quota. (#14226)
abb38c3f26 is described below

commit abb38c3f26ac27f13302417dd298db4b7eb3d90b
Author: Bolek Ziobrowski <26925920+bziobrow...@users.noreply.github.com>
AuthorDate: Tue Oct 22 15:55:53 2024 +0200

    Add support for application-level query quota. (#14226)
    
    Adds a way to throttle queries (executed with both v1 or v2 engine) based 
on applicationName query option.
    Queries such as :
    
    ```
    set applicationName='test';
    select * from tables
    ```
---
 .../broker/api/resources/PinotBrokerDebug.java     |  11 +
 .../BrokerUserDefinedMessageHandlerFactory.java    |  27 ++
 .../HelixExternalViewBasedQueryQuotaManager.java   | 284 ++++++++++++++++++---
 .../pinot/broker/queryquota/QueryQuotaManager.java |  14 +
 .../requesthandler/BaseBrokerRequestHandler.java   |  13 +
 .../MultiStageBrokerRequestHandler.java            |   2 +-
 ...elixExternalViewBasedQueryQuotaManagerTest.java | 153 ++++++++++-
 .../BaseSingleStageBrokerRequestHandlerTest.java   |   1 +
 .../ApplicationQpsQuotaRefreshMessage.java         |  61 +++++
 .../pinot/common/metadata/ZKMetadataProvider.java  |  72 ++++++
 .../pinot/controller/api/resources/Constants.java  |   1 +
 .../PinotApplicationQuotaRestletResource.java      | 139 ++++++++++
 .../resources/PinotDatabaseRestletResource.java    |   2 +-
 .../helix/core/PinotHelixResourceManager.java      |  44 ++++
 .../java/org/apache/pinot/core/auth/Actions.java   |   2 +
 .../tests/BaseClusterIntegrationTest.java          |  29 ++-
 .../tests/QueryQuotaClusterIntegrationTest.java    | 168 ++++++++++--
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +
 18 files changed, 955 insertions(+), 72 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index 78a6dd324f..a220bc53a5 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -323,4 +323,15 @@ public class PinotBrokerDebug {
       @ApiParam(value = "Name of the database") @PathParam("databaseName") 
String databaseName) {
     return 
String.valueOf(_queryQuotaManager.getDatabaseQueryQuota(databaseName));
   }
+
+  @GET
+  @Path("debug/applicationQuotas/{applicationName}")
+  @Produces(MediaType.TEXT_PLAIN)
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_APPLICATION_QUERY_QUOTA)
+  @ApiOperation(value = "Get the active query quota being imposed on the 
application", notes = "This is a debug "
+      + "endpoint, and won't maintain backward compatibility")
+  public String getApplicationQueryQuota(
+      @ApiParam(value = "Name of the application") 
@PathParam("applicationName") String applicationName) {
+    return 
String.valueOf(_queryQuotaManager.getApplicationQueryQuota(applicationName));
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 2c2cc33532..f4da13621e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -25,6 +25,7 @@ import 
org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import 
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
 import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
 import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
@@ -65,6 +66,8 @@ public class BrokerUserDefinedMessageHandlerFactory 
implements MessageHandlerFac
         return new RebuildRoutingTableMessageHandler(new 
RoutingTableRebuildMessage(message), context);
       case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE:
         return new RefreshDatabaseConfigMessageHandler(new 
DatabaseConfigRefreshMessage(message), context);
+      case ApplicationQpsQuotaRefreshMessage.REFRESH_APP_QUOTA_MSG_SUB_TYPE:
+        return new RefreshApplicationQpsQuotaMessageHandler(new 
ApplicationQpsQuotaRefreshMessage(message), context);
       default:
         // NOTE: Log a warning and return no-op message handler for 
unsupported message sub-types. This can happen when
         //       a new message sub-type is added, and the sender gets deployed 
first while receiver is still running the
@@ -162,6 +165,30 @@ public class BrokerUserDefinedMessageHandlerFactory 
implements MessageHandlerFac
     }
   }
 
+  private class RefreshApplicationQpsQuotaMessageHandler extends 
MessageHandler {
+    final String _applicationName;
+
+    RefreshApplicationQpsQuotaMessageHandler(ApplicationQpsQuotaRefreshMessage 
applicationQpsAuotaRefreshMessage,
+        NotificationContext context) {
+      super(applicationQpsAuotaRefreshMessage, context);
+      _applicationName = 
applicationQpsAuotaRefreshMessage.getApplicationName();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      
_queryQuotaManager.createOrUpdateApplicationRateLimiter(_applicationName);
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      LOGGER.error("Got error while refreshing query quota for application: {} 
(error code: {}, error type: {})",
+          _applicationName, code, type, e);
+    }
+  }
+
   private class RebuildRoutingTableMessageHandler extends MessageHandler {
     final String _tableNameWithType;
 
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index b0684d0bc8..48c5c33d0a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This class is to support the qps quota feature.
- * It allows performing qps quota check at table level and database level
+ * It allows performing qps quota check at table level, database and 
application level.
  * For table level check it depends on the broker source change to update the 
dynamic rate limit,
  *  which means it gets updated when a new table added or a broker restarted.
  * For database level check it depends on the broker as well as cluster config 
and database config change
@@ -67,6 +67,11 @@ import org.slf4j.LoggerFactory;
  * - the database config is updated
  * - new table is assigned to the broker (rate limiter is created if not 
present)
  * - broker added or removed from cluster
+ * For application level check it depends on the broker as well as cluster 
config and application quota change
+ * to update the dynamic rate limit, which means it gets updated when
+ * - the default query quota at cluster config is updated
+ * - the application quota is updated (e.g. via rest api)
+ * - broker added or removed from cluster
  */
 public class HelixExternalViewBasedQueryQuotaManager implements 
ClusterChangeHandler, QueryQuotaManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
@@ -81,7 +86,9 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
   private final AtomicInteger _lastKnownBrokerResourceVersion = new 
AtomicInteger(-1);
   private final Map<String, QueryQuotaEntity> _rateLimiterMap = new 
ConcurrentHashMap<>();
   private final Map<String, QueryQuotaEntity> _databaseRateLimiterMap = new 
ConcurrentHashMap<>();
+  private final Map<String, QueryQuotaEntity> _applicationRateLimiterMap = new 
ConcurrentHashMap<>();
   private double _defaultQpsQuotaForDatabase;
+  private double _defaultQpsQuotaForApplication;
 
   private HelixManager _helixManager;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -98,29 +105,66 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     _helixManager = helixManager;
     _propertyStore = _helixManager.getHelixPropertyStore();
     _defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase();
+    _defaultQpsQuotaForApplication = getDefaultQueryQuotaForApplication();
     getQueryQuotaEnabledFlagFromInstanceConfig();
+
+    initializeApplicationQpsQuotas();
+  }
+
+  // read all app quotas from ZK and create rate limiters
+  private void initializeApplicationQpsQuotas() {
+    Map<String, Double> quotas =
+        
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
+
+    if (quotas == null || quotas.isEmpty()) {
+      return;
+    }
+
+    ExternalView brokerResource = getBrokerResource();
+    int numOnlineBrokers = getNumOnlineBrokers(brokerResource);
+
+    for (Map.Entry<String, Double> entry : quotas.entrySet()) {
+      if (entry.getKey() == null) {
+        continue;
+      }
+
+      String appName = entry.getKey();
+      double appQpsQuota =
+          entry.getValue() != null && entry.getValue() != -1.0d ? 
entry.getValue() : _defaultQpsQuotaForApplication;
+
+      if (appQpsQuota < 0) {
+        buildEmptyOrResetApplicationRateLimiter(appName);
+        continue;
+      }
+
+      double perBrokerQpsQuota = appQpsQuota / numOnlineBrokers;
+      LOGGER.info("Adding new query rate limiter for application {} with rate 
{}.", appName, perBrokerQpsQuota);
+      QueryQuotaEntity queryQuotaEntity =
+          new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), new 
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+              new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 
numOnlineBrokers, appQpsQuota, -1);
+      _applicationRateLimiterMap.put(appName, queryQuotaEntity);
+    }
+
+    return;
   }
 
   @Override
   public void processClusterChange(HelixConstants.ChangeType changeType) {
     Preconditions.checkState(CHANGE_TYPES_TO_PROCESS.contains(changeType), 
"Illegal change type: " + changeType);
     if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
-      ExternalView brokerResourceEV = HelixHelper
-          .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
-              CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+      ExternalView brokerResourceEV = getBrokerResource();
       processQueryRateLimitingExternalViewChange(brokerResourceEV);
     } else if (changeType == HelixConstants.ChangeType.INSTANCE_CONFIG) {
       processQueryRateLimitingInstanceConfigChange();
     } else {
       processQueryRateLimitingClusterConfigChange();
+      processApplicationQueryRateLimitingClusterConfigChange();
     }
   }
 
   public void initOrUpdateTableQueryQuota(String tableNameWithType) {
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-    ExternalView brokerResourceEV = HelixHelper
-        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
-            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    ExternalView brokerResourceEV = getBrokerResource();
     initOrUpdateTableQueryQuota(tableConfig, brokerResourceEV);
   }
 
@@ -264,52 +308,103 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
   }
 
+  /**
+   * Updates the application rate limiter if it already exists. It won't  
create a new rate limiter.
+   *
+   * @param applicationName application name for which rate limiter needs to 
be updated
+   */
+  public void updateApplicationRateLimiter(String applicationName) {
+    if (!_applicationRateLimiterMap.containsKey(applicationName)) {
+      return;
+    }
+    createOrUpdateApplicationRateLimiter(applicationName);
+  }
+
   // Caller method need not worry about getting lock on _databaseRateLimiterMap
   // as this method will do idempotent updates to the database rate limiters
   private synchronized void createOrUpdateDatabaseRateLimiter(List<String> 
databaseNames) {
-    ExternalView brokerResource = HelixHelper
-        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
-            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    ExternalView brokerResource = getBrokerResource();
     for (String databaseName : databaseNames) {
-      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
-      if (databaseQpsQuota < 0) {
+      double qpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (qpsQuota < 0) {
         buildEmptyOrResetDatabaseRateLimiter(databaseName);
         continue;
       }
       int numOnlineBrokers = getNumOnlineBrokers(databaseName, brokerResource);
-      double perBrokerQpsQuota = databaseQpsQuota / numOnlineBrokers;
-      QueryQuotaEntity oldQueryQuotaEntity = 
_databaseRateLimiterMap.get(databaseName);
-      if (oldQueryQuotaEntity == null) {
+      double perBrokerQpsQuota = qpsQuota / numOnlineBrokers;
+      QueryQuotaEntity oldEntity = _databaseRateLimiterMap.get(databaseName);
+      if (oldEntity == null) {
         LOGGER.info("Adding new query rate limiter for database {} with rate 
{}.", databaseName, perBrokerQpsQuota);
-        QueryQuotaEntity queryQuotaEntity = new 
QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
-            new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new 
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
-            numOnlineBrokers, databaseQpsQuota, -1);
+        QueryQuotaEntity queryQuotaEntity =
+            new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
+                new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+                new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
+                numOnlineBrokers, qpsQuota, -1);
         _databaseRateLimiterMap.put(databaseName, queryQuotaEntity);
         continue;
       }
-      boolean changeDetected = false;
-      double oldQuota = oldQueryQuotaEntity.getRateLimiter() != null ? 
oldQueryQuotaEntity.getRateLimiter().getRate()
-          : -1;
-      if (oldQueryQuotaEntity.getOverallRate() != databaseQpsQuota) {
-        changeDetected = true;
-        LOGGER.info("Overall quota changed for the database from {} to {}", 
oldQueryQuotaEntity.getOverallRate(),
-            databaseQpsQuota);
-        oldQueryQuotaEntity.setOverallRate(databaseQpsQuota);
-      }
-      if (oldQueryQuotaEntity.getNumOnlineBrokers() != numOnlineBrokers) {
-        changeDetected = true;
-        LOGGER.info("Number of online brokers changed for the database from {} 
to {}",
-            oldQueryQuotaEntity.getNumOnlineBrokers(), numOnlineBrokers);
-        oldQueryQuotaEntity.setNumOnlineBrokers(numOnlineBrokers);
+      checkQueryQuotaChanged(databaseName, oldEntity, qpsQuota, "database", 
numOnlineBrokers, perBrokerQpsQuota);
+    }
+  }
+
+  public synchronized void createOrUpdateApplicationRateLimiter(String 
applicationName) {
+    
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
+  }
+
+  // Caller method need not worry about getting lock on 
_applicationRateLimiterMap
+  // as this method will do idempotent updates to the application rate limiters
+  private synchronized void createOrUpdateApplicationRateLimiter(List<String> 
applicationNames) {
+    ExternalView brokerResource = getBrokerResource();
+    for (String appName : applicationNames) {
+      double qpsQuota = getEffectiveQueryQuotaOnApplication(appName);
+      if (qpsQuota < 0) {
+        buildEmptyOrResetApplicationRateLimiter(appName);
+        continue;
       }
-      if (!changeDetected) {
-        LOGGER.info("No change detected with the query rate limiter for 
database {}", databaseName);
+      int numOnlineBrokers = getNumOnlineBrokers(brokerResource);
+      double perBrokerQpsQuota = qpsQuota / numOnlineBrokers;
+      QueryQuotaEntity oldEntity = _applicationRateLimiterMap.get(appName);
+      if (oldEntity == null) {
+        LOGGER.info("Adding new query rate limiter for application {} with 
rate {}.", appName, perBrokerQpsQuota);
+        QueryQuotaEntity queryQuotaEntity =
+            new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), new 
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+                                 new 
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), numOnlineBrokers, qpsQuota,
+                                 -1);
+        _applicationRateLimiterMap.put(appName, queryQuotaEntity);
         continue;
       }
-      LOGGER.info("Updating existing query rate limiter for database {} from 
rate {} to {}", databaseName, oldQuota,
-          perBrokerQpsQuota);
-      
oldQueryQuotaEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota));
+      checkQueryQuotaChanged(appName, oldEntity, qpsQuota, "application", 
numOnlineBrokers, perBrokerQpsQuota);
+    }
+  }
+
+  private void checkQueryQuotaChanged(String appName, QueryQuotaEntity 
oldEntity, double qpsQuota, String quotaType,
+                                      int numOnlineBrokers, double 
perBrokerQpsQuota) {
+    boolean isChange = false;
+    double oldQuota = oldEntity.getRateLimiter() != null ? 
oldEntity.getRateLimiter().getRate() : -1;
+    if (oldEntity.getOverallRate() != qpsQuota) {
+      isChange = true;
+      LOGGER.info("Overall quota changed for the {} {} from {} to {}", 
quotaType, appName, oldEntity.getOverallRate(),
+                  qpsQuota);
+      oldEntity.setOverallRate(qpsQuota);
+    }
+    if (oldEntity.getNumOnlineBrokers() != numOnlineBrokers) {
+      isChange = true;
+      LOGGER.info("Number of online brokers changed for the {} {} from {} to 
{}",
+                  quotaType, appName, oldEntity.getNumOnlineBrokers(), 
numOnlineBrokers);
+      oldEntity.setNumOnlineBrokers(numOnlineBrokers);
+    }
+    if (!isChange) {
+      LOGGER.info("No change detected with the query rate limiter for {} {}", 
quotaType, appName);
+      return;
     }
+    LOGGER.info("Updating existing query rate limiter for {} {} from rate {} 
to {}", quotaType, appName, oldQuota,
+                perBrokerQpsQuota);
+    oldEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota));
+  }
+
+  private ExternalView getBrokerResource() {
+    return 
HelixHelper.getExternalViewForResource(_helixManager.getClusterManagmentTool(),
+        _helixManager.getClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
   }
 
   // Pulling this logic to a separate placeholder method so that the quota 
split logic
@@ -321,6 +416,10 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     return 
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
   }
 
+  private int getNumOnlineBrokers(ExternalView brokerResource) {
+    return 
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
+  }
+
   /**
    * Utility to get the effective query quota being imposed on a database.
    * It is computed based on the default quota set at cluster config and 
override set at database config
@@ -337,6 +436,22 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     return _defaultQpsQuotaForDatabase;
   }
 
+  /**
+   * Utility to get the effective query quota being imposed on an application. 
It is computed based on the default quota
+   * set at cluster config.
+   *
+   * @param applicationName application name to get the query quota on.
+   * @return effective query quota limit being applied
+   */
+  private double getEffectiveQueryQuotaOnApplication(String applicationName) {
+    Map<String, Double> quotas =
+        
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
+    if (quotas != null && quotas.get(applicationName) != null && 
quotas.get(applicationName) != -1.0d) {
+      return quotas.get(applicationName);
+    }
+    return _defaultQpsQuotaForApplication;
+  }
+
   /**
    * Creates a new database rate limiter. Will not update the database rate 
limiter if it already exists.
    * @param databaseName database name for which rate limiter needs to be 
created
@@ -348,6 +463,18 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
   }
 
+  /**
+   * Creates a new database rate limiter. Will not update the database rate 
limiter if it already exists.
+   *
+   * @param applicationName database name for which rate limiter needs to be 
created
+   */
+  public void createApplicationRateLimiter(String applicationName) {
+    if (_applicationRateLimiterMap.containsKey(applicationName)) {
+      return;
+    }
+    
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
+  }
+
   /**
    * Build an empty rate limiter in the new query quota entity, or set the 
rate limiter to null in an existing query
    * quota entity.
@@ -365,6 +492,23 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     }
   }
 
+  /**
+   * Build an empty rate limiter in the new query quota entity, or set the 
rate limiter to null in an existing query
+   * quota entity.
+   */
+  private void buildEmptyOrResetApplicationRateLimiter(String applicationName) 
{
+    QueryQuotaEntity quotaEntity = 
_applicationRateLimiterMap.get(applicationName);
+    if (quotaEntity == null) {
+      // Create an QueryQuotaEntity object without setting a rate limiter.
+      quotaEntity = new QueryQuotaEntity(null, new 
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+          new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0);
+      _applicationRateLimiterMap.put(applicationName, quotaEntity);
+    } else {
+      // Set rate limiter to null for an existing QueryQuotaEntity object.
+      quotaEntity.setRateLimiter(null);
+    }
+  }
+
   /**
    * Build an empty rate limiter in the new query quota entity, or set the 
rate limiter to null in an existing query
    * quota entity.
@@ -428,6 +572,25 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     return tryAcquireToken(databaseName, queryQuota);
   }
 
+  @Override
+  public boolean acquireApplication(String applicationName) {
+    if (isQueryRateLimitDisabled()) {
+      return true;
+    }
+    QueryQuotaEntity queryQuota = 
_applicationRateLimiterMap.get(applicationName);
+    if (queryQuota == null) {
+      if (getDefaultQueryQuotaForApplication() < 0) {
+        return true;
+      } else {
+        createOrUpdateApplicationRateLimiter(applicationName);
+        queryQuota = _applicationRateLimiterMap.get(applicationName);
+      }
+    }
+
+    LOGGER.debug("Trying to acquire token for application: {}", 
applicationName);
+    return tryAcquireToken(applicationName, queryQuota);
+  }
+
   @Override
   public double getTableQueryQuota(String tableNameWithType) {
     return getQueryQuota(_rateLimiterMap.get(tableNameWithType));
@@ -438,6 +601,11 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     return getQueryQuota(_databaseRateLimiterMap.get(databaseName));
   }
 
+  @Override
+  public double getApplicationQueryQuota(String applicationName) {
+    return getQueryQuota(_applicationRateLimiterMap.get(applicationName));
+  }
+
   private double getQueryQuota(QueryQuotaEntity quotaEntity) {
     return quotaEntity == null || quotaEntity.getRateLimiter() == null ? 0 : 
quotaEntity.getRateLimiter().getRate();
   }
@@ -503,11 +671,11 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     if (rateLimiter == null) {
       return true;
     }
-    double perBrokerRate = rateLimiter.getRate();
 
     // Emit the qps capacity utilization rate.
-    int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
     if (!rateLimiter.tryAcquire()) {
+      int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
+      double perBrokerRate = rateLimiter.getRate();
       LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: 
{}. Current qps: {}", resourceName,
           perBrokerRate, numHits);
       return false;
@@ -526,6 +694,11 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     return _databaseRateLimiterMap;
   }
 
+  @VisibleForTesting
+  public Map<String, QueryQuotaEntity> getApplicationRateLimiterMap() {
+    return _applicationRateLimiterMap;
+  }
+
   @VisibleForTesting
   public void cleanUpRateLimiterMap() {
     _rateLimiterMap.clear();
@@ -625,7 +798,20 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
         quota.setNumOnlineBrokers(onlineBrokerCount);
       }
       if (quota.getOverallRate() > 0) {
-        quota.setRateLimiter(RateLimiter.create(quota.getOverallRate() / 
onlineBrokerCount));
+        double qpsQuota = quota.getOverallRate() / onlineBrokerCount;
+        quota.setRateLimiter(RateLimiter.create(qpsQuota));
+      }
+    }
+
+    // handle EV change for application query quotas
+    for (Map.Entry<String, QueryQuotaEntity> it : 
_applicationRateLimiterMap.entrySet()) {
+      QueryQuotaEntity quota = it.getValue();
+      if (quota.getNumOnlineBrokers() != onlineBrokerCount) {
+        quota.setNumOnlineBrokers(onlineBrokerCount);
+      }
+      if (quota.getOverallRate() > 0) {
+        double qpsQuota = quota.getOverallRate() / onlineBrokerCount;
+        quota.setRateLimiter(RateLimiter.create(qpsQuota));
       }
     }
 
@@ -651,6 +837,15 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     createOrUpdateDatabaseRateLimiter(new 
ArrayList<>(_databaseRateLimiterMap.keySet()));
   }
 
+  public void processApplicationQueryRateLimitingClusterConfigChange() {
+    double oldQpsQuota = _defaultQpsQuotaForApplication;
+    _defaultQpsQuotaForApplication = getDefaultQueryQuotaForApplication();
+    if (oldQpsQuota == _defaultQpsQuotaForApplication) {
+      return;
+    }
+    createOrUpdateApplicationRateLimiter(new 
ArrayList<>(_applicationRateLimiterMap.keySet()));
+  }
+
   private double getDefaultQueryQuotaForDatabase() {
     HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
     HelixConfigScope configScope = new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
@@ -660,6 +855,15 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
             
.getOrDefault(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, "-1"));
   }
 
+  private double getDefaultQueryQuotaForApplication() {
+    HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
+    HelixConfigScope configScope = new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+        _helixManager.getClusterName()).build();
+    return Double.parseDouble(helixAdmin.getConfig(configScope,
+            
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
+        
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1"));
+  }
+
   /**
    * Process query quota state change when instance config gets changed
    */
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
index 50d2a8c7ae..70c3ef7588 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
@@ -34,6 +34,13 @@ public interface QueryQuotaManager {
    */
   boolean acquireDatabase(String databaseName);
 
+  /**
+   * Try to acquire a quota for the given application.
+   * @param applicationName application name
+   * @return {@code true} if the application quota has not been reached, 
{@code false} otherwise
+   */
+  boolean acquireApplication(String applicationName);
+
   /**
    * Get the QPS quota in effect for the table
    * @param tableNameWithType table name with type
@@ -47,4 +54,11 @@ public interface QueryQuotaManager {
    * @return effective quota qps. 0 if no qps quota is set.
    */
   double getDatabaseQueryQuota(String databaseName);
+
+  /**
+   * Get the QPS quota in effect for the application
+   * @param applicationName table name with type
+   * @return effective quota qps. 0 if no qps quota is set.
+   */
+  double getApplicationQueryQuota(String applicationName);
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 406d3d032a..9a5e0e94a4 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -53,10 +53,13 @@ import 
org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 @ThreadSafe
 public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler 
{
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseBrokerRequestHandler.class);
   protected final PinotConfiguration _config;
   protected final String _brokerId;
   protected final BrokerRoutingManager _routingManager;
@@ -145,6 +148,16 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       }
     }
 
+    // check app qps before doing anything
+    String application = 
sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.APPLICATION_NAME);
+    if (application != null && 
!_queryQuotaManager.acquireApplication(application)) {
+      String errorMessage =
+          "Request " + requestId + ": " + query + " exceeds query quota for 
application: " + application;
+      LOGGER.info(errorMessage);
+      requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));
+    }
+
     // Add null handling option from broker config only if there is no 
override in the query
     if (_enableNullHandling != null) {
       sqlNodeAndOptions.getOptions()
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 70dadd2f24..8aef51ebd1 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -349,7 +349,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   }
 
   /**
-   * Returns true if the QPS quota of the tables has exceeded.
+   * Returns true if the QPS quota of query tables, database or application 
has been exceeded.
    */
   private boolean hasExceededQPSQuota(@Nullable String database, Set<String> 
tableNames,
       RequestContext requestContext) {
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index a9ac37f544..131faee022 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -60,6 +60,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   private HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private static final Map<String, String> CLUSTER_CONFIG_MAP = new 
HashMap<>();
+  private static final String APP_NAME = "app";
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String OFFLINE_TABLE_NAME = RAW_TABLE_NAME + "_OFFLINE";
   private static final String REALTIME_TABLE_NAME = RAW_TABLE_NAME + 
"_REALTIME";
@@ -138,10 +139,12 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, 
OFFLINE_TABLE_NAME);
       
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, 
REALTIME_TABLE_NAME);
       ZKMetadataProvider.removeDatabaseConfig(_testPropertyStore, 
CommonConstants.DEFAULT_DATABASE);
+      ZKMetadataProvider.removeApplicationQuotas(_testPropertyStore);
       CLUSTER_CONFIG_MAP.clear();
     }
     _queryQuotaManager.cleanUpRateLimiterMap();
     _queryQuotaManager.getDatabaseRateLimiterMap().clear();
+    _queryQuotaManager.getApplicationRateLimiterMap().clear();
   }
 
   @AfterTest
@@ -255,6 +258,112 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
+  @Test
+  public void testWhenNoTableOrDatabaseOrApplicationQuotasSetQueriesRunWild()
+      throws InterruptedException {
+    ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+    TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
+    
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+    _queryQuotaManager.createApplicationRateLimiter(APP_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 
1);
+    
Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(), 
1);
+
+    setDefaultDatabaseQps("-1");
+    setDefaultApplicationQps("-1");
+
+    runQueries(25, false);
+    runQueries(40, false);
+    runQueries(100, false);
+
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+  }
+
+  @Test
+  public void testWhenOnlySpecificAppQuotaIsSetItAffectsQueriesWithAppOption()
+      throws InterruptedException {
+    ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+    TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
+    
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+
+    ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, APP_NAME, 
50d);
+    _queryQuotaManager.createApplicationRateLimiter(APP_NAME);
+
+    setDefaultDatabaseQps("-1");
+    setDefaultApplicationQps("-1");
+
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 
1);
+    
Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(), 
1);
+
+    runQueries(50, false);
+    runQueries(100, true);
+
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+  }
+
+  @Test
+  public void testWhenOnlyDefaultAppQuotaIsSetItAffectsAllApplications()
+      throws InterruptedException {
+    ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+    TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
+    
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+
+    setDefaultDatabaseQps("-1");
+    setDefaultApplicationQps("50");
+
+    ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, "someApp", 
100d);
+    _queryQuotaManager.createApplicationRateLimiter("someApp");
+
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 
1);
+    
Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(), 
1);
+
+    runQueries(100, true, APP_NAME);
+    runQueries(100, true, "otherApp");
+    runQueries(100, false, "someApp");
+    runQueries(201, true, "someApp");
+
+    
Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(), 
3);
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+  }
+
+  @Test
+  public void tesCreateAndUpdateAppRateLimiterChangesRateLimiterMap() {
+    Map<String, Double> apps = new HashMap<>();
+    apps.put("app1", null);
+    apps.put("app2", 1d);
+    apps.put("app3", 2d);
+
+    apps.entrySet().stream().forEach(e -> {
+      ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, 
e.getKey(), e.getValue());
+    });
+    apps.entrySet().forEach(app -> 
_queryQuotaManager.createApplicationRateLimiter(app.getKey()));
+    Map<String, QueryQuotaEntity> appQuotaMap = 
_queryQuotaManager.getApplicationRateLimiterMap();
+
+    Assert.assertNull(appQuotaMap.get("app1").getRateLimiter());
+    Assert.assertEquals(appQuotaMap.get("app2").getRateLimiter().getRate(), 1);
+    Assert.assertEquals(appQuotaMap.get("app3").getRateLimiter().getRate(), 2);
+
+    ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, "app1", 1d);
+    ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, "app2", 2d);
+
+    apps.entrySet().forEach(e -> 
_queryQuotaManager.updateApplicationRateLimiter(e.getKey()));
+
+    Assert.assertEquals(appQuotaMap.get("app1").getRateLimiter().getRate(), 1);
+    Assert.assertEquals(appQuotaMap.get("app2").getRateLimiter().getRate(), 2);
+    Assert.assertEquals(appQuotaMap.get("app3").getRateLimiter().getRate(), 2);
+  }
+
   @Test
   public void testCreateOrUpdateDatabaseRateLimiter() {
     List<String> dbList = new ArrayList<>(2);
@@ -264,19 +373,23 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     DatabaseConfig db1 = new DatabaseConfig(dbList.get(0), new 
QuotaConfig(null, null));
     DatabaseConfig db2 = new DatabaseConfig(dbList.get(1), new 
QuotaConfig(null, "1"));
     DatabaseConfig db3 = new DatabaseConfig(dbList.get(2), new 
QuotaConfig(null, "2"));
+
     ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1);
     ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2);
     ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db3);
+
     dbList.forEach(db -> _queryQuotaManager.createDatabaseRateLimiter(db));
     Map<String, QueryQuotaEntity> dbQuotaMap = 
_queryQuotaManager.getDatabaseRateLimiterMap();
     Assert.assertNull(dbQuotaMap.get(dbList.get(0)).getRateLimiter());
     
Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(), 
1);
     
Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(), 
2);
+
     db1.setQuotaConfig(new QuotaConfig(null, "1"));
     db2.setQuotaConfig(new QuotaConfig(null, "2"));
     ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1);
     ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2);
     dbList.forEach(db -> _queryQuotaManager.updateDatabaseRateLimiter(db));
+
     
Assert.assertEquals(dbQuotaMap.get(dbList.get(0)).getRateLimiter().getRate(), 
1);
     
Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(), 
2);
     
Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(), 
2);
@@ -509,6 +622,11 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     _queryQuotaManager.processQueryRateLimitingClusterConfigChange();
   }
 
+  private void setDefaultApplicationQps(String maxQps) {
+    
CLUSTER_CONFIG_MAP.put(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND,
 maxQps);
+    
_queryQuotaManager.processApplicationQueryRateLimitingClusterConfigChange();
+  }
+
   private void setDatabaseQps(DatabaseConfig databaseConfig, String maxQps) {
     QuotaConfig quotaConfig = new QuotaConfig(null, maxQps);
     databaseConfig.setQuotaConfig(quotaConfig);
@@ -516,11 +634,21 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
   }
 
+  private void setApplicationQps(String appName, Double maxQps) {
+    ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, appName, 
maxQps);
+    _queryQuotaManager.createApplicationRateLimiter(appName);
+  }
+
   private void setQps(TableConfig tableConfig) {
     QuotaConfig quotaConfig = new QuotaConfig(null, TABLE_MAX_QPS_STR);
     tableConfig.setQuotaConfig(quotaConfig);
   }
 
+  private void setQps(TableConfig tableConfig, String value) {
+    QuotaConfig quotaConfig = new QuotaConfig(null, value);
+    tableConfig.setQuotaConfig(quotaConfig);
+  }
+
   private static ExternalView generateBrokerResource(String tableName) {
     ExternalView brokerResource = new 
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     brokerResource.setState(tableName, BROKER_INSTANCE_ID, "ONLINE");
@@ -531,17 +659,29 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   private void runQueries()
       throws InterruptedException {
     runQueries(TABLE_MAX_QPS, false);
-    //increase the qps and some of the queries should be throttled.
-    runQueries(TABLE_MAX_QPS * 2, true);
+    // increase the qps and some of the queries should be throttled.
+    // keep in mind that permits are 'regenerated' on every call based on how 
much time elapsed since last one
+    // that means for 25 QPS we get new permit every 40 ms or 0.5 every 20 ms
+    // if we start with 25 permits at time t1 then if we want to exceed the 
qps in the next second  we've to do more
+    // double requests, because 25 will regenerate
+    runQueries(TABLE_MAX_QPS * 2 + 1, true);
+  }
+
+  private void runQueries(double qps, boolean shouldFail)
+      throws InterruptedException {
+    runQueries(qps, shouldFail, APP_NAME);
   }
 
   // try to keep the qps below 50 to ensure that the time lost between 2 query 
runs on top of the sleepMillis
   // is not comparable to sleepMillis, else the actual qps would end being lot 
lower than required qps
-  private void runQueries(double qps, boolean shouldFail)
+  private void runQueries(double qps, boolean shouldFail, String appName)
       throws InterruptedException {
     int failCount = 0;
     long sleepMillis = (long) (1000 / qps);
     for (int i = 0; i < qps; i++) {
+      if (!_queryQuotaManager.acquireApplication(appName)) {
+        failCount++;
+      }
       if 
(!_queryQuotaManager.acquireDatabase(CommonConstants.DEFAULT_DATABASE)) {
         failCount++;
       }
@@ -550,6 +690,11 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       }
       Thread.sleep(sleepMillis);
     }
-    Assert.assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && 
shouldFail));
+
+    if (shouldFail) {
+      Assert.assertTrue(failCount != 0, "Expected failure with qps: " + qps + 
" and app :" + appName);
+    } else {
+      Assert.assertTrue(failCount == 0, "Expected no failure with qps: " + qps 
+ " and app :" + appName);
+    }
   }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
index 0677d9dc5d..df4b1b6bf8 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
@@ -173,6 +173,7 @@ public class BaseSingleStageBrokerRequestHandlerTest {
     QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
     when(queryQuotaManager.acquire(anyString())).thenReturn(true);
     when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true);
+    when(queryQuotaManager.acquireApplication(anyString())).thenReturn(true);
     CountDownLatch latch = new CountDownLatch(1);
     long[] testRequestId = {-1};
     BrokerMetrics.register(mock(BrokerMetrics.class));
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/messages/ApplicationQpsQuotaRefreshMessage.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/ApplicationQpsQuotaRefreshMessage.java
new file mode 100644
index 0000000000..11768f7b37
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/ApplicationQpsQuotaRefreshMessage.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
+
+/**
+ * This (Helix) message is sent from the controller to brokers when a request 
is received to update the application
+ * quota.
+ */
+public class ApplicationQpsQuotaRefreshMessage extends Message {
+  public static final String REFRESH_APP_QUOTA_MSG_SUB_TYPE = 
"REFRESH_APPLICATION_QUOTA";
+
+  private static final String APPLICATION_NAME_KEY = "applicationName";
+
+  /**
+   * Constructor for the sender.
+   */
+  public ApplicationQpsQuotaRefreshMessage(String applicationName) {
+    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setMsgSubType(REFRESH_APP_QUOTA_MSG_SUB_TYPE);
+    // Give it infinite time to process the message, as long as session is 
alive
+    setExecutionTimeout(-1);
+    // Set the Pinot specific fields
+    ZNRecord znRecord = getRecord();
+    znRecord.setSimpleField(APPLICATION_NAME_KEY, applicationName);
+  }
+
+  /**
+   * Constructor for the receiver.
+   */
+  public ApplicationQpsQuotaRefreshMessage(Message message) {
+    super(message.getRecord());
+    if (!message.getMsgSubType().equals(REFRESH_APP_QUOTA_MSG_SUB_TYPE)) {
+      throw new IllegalArgumentException("Invalid message subtype:" + 
message.getMsgSubType());
+    }
+  }
+
+  public String getApplicationName() {
+    return getRecord().getSimpleField(APPLICATION_NAME_KEY);
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 588b9df026..7d1143b0cb 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -63,6 +63,7 @@ public class ZKMetadataProvider {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ZKMetadataProvider.class);
   private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = 
"tenantIsolationEnabled";
+  private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas";
   private static final String PROPERTYSTORE_CONTROLLER_JOBS_PREFIX = 
"/CONTROLLER_JOBS";
   private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
   private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
@@ -112,6 +113,15 @@ public class ZKMetadataProvider {
     
propertyStore.remove(constructPropertyStorePathForDatabaseConfig(databaseName), 
AccessOption.PERSISTENT);
   }
 
+  /**
+   * Remove database config.
+   */
+  @VisibleForTesting
+  public static void removeApplicationQuotas(ZkHelixPropertyStore<ZNRecord> 
propertyStore) {
+    
propertyStore.remove(constructPropertyStorePathForControllerConfig(CLUSTER_APPLICATION_QUOTAS),
+        AccessOption.PERSISTENT);
+  }
+
   private static ZNRecord toZNRecord(DatabaseConfig databaseConfig) {
     ZNRecord databaseConfigZNRecord = new 
ZNRecord(databaseConfig.getDatabaseName());
     Map<String, String> simpleFields = new HashMap<>();
@@ -758,4 +768,66 @@ public class ZKMetadataProvider {
       return true;
     }
   }
+
+  public static boolean setApplicationQpsQuota(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String applicationName,
+      Double value) {
+    final ZNRecord znRecord;
+    final String path = 
constructPropertyStorePathForControllerConfig(CLUSTER_APPLICATION_QUOTAS);
+
+    boolean doCreate;
+    if (!propertyStore.exists(path, AccessOption.PERSISTENT)) {
+      znRecord = new ZNRecord(CLUSTER_APPLICATION_QUOTAS);
+      doCreate = true;
+    } else {
+      znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
+      doCreate = false;
+    }
+
+    Map<String, String> quotas = 
znRecord.getMapField(CLUSTER_APPLICATION_QUOTAS);
+    if (quotas == null) {
+      quotas = new HashMap<>();
+      znRecord.setMapField(CLUSTER_APPLICATION_QUOTAS, quotas);
+    }
+    quotas.put(applicationName, value != null ? value.toString() : null);
+
+    if (doCreate) {
+      return propertyStore.create(path, znRecord, AccessOption.PERSISTENT);
+    } else {
+      return propertyStore.set(path, znRecord, AccessOption.PERSISTENT);
+    }
+  }
+
+  @Nullable
+  public static Map<String, Double> 
getApplicationQpsQuotas(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    String controllerConfigPath = 
constructPropertyStorePathForControllerConfig(CLUSTER_APPLICATION_QUOTAS);
+    if (propertyStore.exists(controllerConfigPath, AccessOption.PERSISTENT)) {
+      ZNRecord znRecord = propertyStore.get(controllerConfigPath, null, 
AccessOption.PERSISTENT);
+      if (znRecord.getMapFields().containsKey(CLUSTER_APPLICATION_QUOTAS)) {
+        return 
toApplicationQpsQuotas(znRecord.getMapField(CLUSTER_APPLICATION_QUOTAS));
+      } else {
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
+
+  private static Map<String, Double> toApplicationQpsQuotas(Map<String, 
String> quotas) {
+    if (quotas == null) {
+      return new HashMap<>();
+    } else {
+      HashMap<String, Double> result = new HashMap<>();
+      for (Map.Entry<String, String> entry : quotas.entrySet()) {
+        if (entry.getValue() != null) {
+          try {
+            double value = Double.parseDouble(entry.getValue());
+            result.put(entry.getKey(), value);
+          } catch (NumberFormatException nfe) {
+            continue;
+          }
+        }
+      }
+      return result;
+    }
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 78476f603c..fea05fc8b2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -33,6 +33,7 @@ public class Constants {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(Constants.class);
 
+  public static final String APPLICATION_TAG = "Application";
   public static final String CLUSTER_TAG = "Cluster";
   public static final String DATABASE_TAG = "Database";
   public static final String TABLE_TAG = "Table";
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
new file mode 100644
index 0000000000..db050168fa
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collections;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+@Api(tags = Constants.APPLICATION_TAG, authorizations = {@Authorization(value 
= SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = {
+    @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key =
+        SWAGGER_AUTHORIZATION_KEY, description =
+        "The format of the key is  ```\"Basic <token>\" or \"Bearer "
+            + "<token>\"```"), @ApiKeyAuthDefinition(name = 
CommonConstants.APPLICATION, in =
+    ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = 
CommonConstants.APPLICATION, description =
+    "Application context passed through http header. If no context is provided 
'default' application "
+        + "context will be considered.")
+}))
+@Path("/")
+public class PinotApplicationQuotaRestletResource {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(PinotApplicationQuotaRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  /**
+   * API to get application quota configs. Will return null if application 
quotas are not defined
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/applicationQuotas")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_APPLICATION_QUERY_QUOTA)
+  @ApiOperation(value = "Get all application qps quotas", notes = "Get all 
application qps quotas")
+  public Map<String, Double> getApplicationQuotas(@Context HttpHeaders 
httpHeaders) {
+    Map<String, Double> quotas = 
_pinotHelixResourceManager.getApplicationQuotas();
+    if (quotas != null) {
+      return quotas;
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  /**
+   * API to get application quota configs. Will return null if application 
quotas are not defined
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/applicationQuotas/{appName}")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_APPLICATION_QUERY_QUOTA)
+  @ApiOperation(value = "Get application qps quota", notes = "Get application 
qps quota")
+  public Double getApplicationQuota(@Context HttpHeaders httpHeaders, 
@PathParam("appName") String appName) {
+
+    Map<String, Double> quotas = 
_pinotHelixResourceManager.getApplicationQuotas();
+    if (quotas != null && quotas.containsKey(appName)) {
+      return quotas.get(appName);
+    }
+
+    HelixConfigScope scope = new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+        _pinotHelixResourceManager.getHelixClusterName()).build();
+    HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
+    String defaultQuota =
+        helixAdmin.getConfig(scope, 
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
+            
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null);
+    return defaultQuota != null ? Double.parseDouble(defaultQuota) : null;
+  }
+
+  /**
+   * API to update the quota configs for application
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Path("/applicationQuotas/{appName}")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.UPDATE_APPLICATION_QUOTA)
+  @ApiOperation(value = "Update application quota", notes = "Update 
application quota")
+  public SuccessResponse setApplicationQuota(@PathParam("appName") String 
appName,
+      @QueryParam("maxQueriesPerSecond") String queryQuota, @Context 
HttpHeaders httpHeaders) {
+    try {
+      try {
+        Double newQuota = queryQuota != null ? Double.parseDouble(queryQuota) 
: null;
+        _pinotHelixResourceManager.updateApplicationQpsQuota(appName, 
newQuota);
+      } catch (NumberFormatException nfe) {
+        throw new ControllerApplicationException(LOGGER, "Application query 
quota value is not a number",
+            Response.Status.INTERNAL_SERVER_ERROR, nfe);
+      }
+
+      return new SuccessResponse("Query quota for application " + appName + " 
successfully updated");
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
index eecf2d0778..ad5067f595 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
@@ -137,7 +137,7 @@ public class PinotDatabaseRestletResource {
   @Path("/databases/{databaseName}/quotas")
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.UPDATE_DATABASE_QUOTA)
   @ApiOperation(value = "Update database quotas", notes = "Update database 
quotas")
-  public SuccessResponse addTable(
+  public SuccessResponse setDatabaseQuota(
       @PathParam("databaseName") String databaseName, 
@QueryParam("maxQueriesPerSecond") String queryQuota,
       @Context HttpHeaders httpHeaders) {
     if 
(!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders)))
 {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1b4b722a7e..7ce94f04c8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -102,6 +102,7 @@ import org.apache.pinot.common.lineage.LineageEntryState;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.lineage.SegmentLineageUtils;
+import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
 import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
 import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
 import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
@@ -197,6 +198,7 @@ public class PinotHelixResourceManager {
   private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500;
   private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500;
   private static final String API_REQUEST_ID_PREFIX = "api-";
+  private static final int INFINITE_TIMEOUT = -1;
 
   private enum LineageUpdateType {
     START, END, REVERT
@@ -1653,6 +1655,19 @@ public class PinotHelixResourceManager {
     sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName());
   }
 
+  /**
+   * Updates application quota and sends out a refresh message.
+   *
+   * @param applicationName name of application to set quota for
+   * @param value           quota value to set
+   */
+  public void updateApplicationQpsQuota(String applicationName, Double value) {
+    if (!ZKMetadataProvider.setApplicationQpsQuota(_propertyStore, 
applicationName, value)) {
+      throw new RuntimeException("Failed to create query quota for 
application: " + applicationName);
+    }
+    sendApplicationQpsQuotaRefreshMessage(applicationName);
+  }
+
   /**
    * Updates database config and sends out a database config refresh message.
    * @param databaseConfig database config to be created
@@ -2884,6 +2899,25 @@ public class PinotHelixResourceManager {
     }
   }
 
+  private void sendApplicationQpsQuotaRefreshMessage(String appName) {
+    ApplicationQpsQuotaRefreshMessage message = new 
ApplicationQpsQuotaRefreshMessage(appName);
+
+    // Send database config refresh message to brokers
+    Criteria criteria = new Criteria();
+    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    criteria.setInstanceName("%");
+    criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
+    criteria.setSessionSpecific(true);
+
+    int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, 
message, null, INFINITE_TIMEOUT);
+    if (numMessagesSent > 0) {
+      LOGGER.info("Sent {} applcation qps quota refresh messages to brokers 
for application: {}", numMessagesSent,
+          appName);
+    } else {
+      LOGGER.warn("No application qps quota refresh message sent to brokers 
for application: {}", appName);
+    }
+  }
+
   private void sendDatabaseConfigRefreshMessage(String databaseName) {
     DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new 
DatabaseConfigRefreshMessage(databaseName);
 
@@ -3162,6 +3196,16 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.getDatabaseConfig(_propertyStore, databaseName);
   }
 
+  /**
+   * Get the database config for the given database name.
+   *
+   * @return map of application name to quotas
+   */
+  @Nullable
+  public Map<String, Double> getApplicationQuotas() {
+    return ZKMetadataProvider.getApplicationQpsQuotas(_propertyStore);
+  }
+
   /**
    * Get the table config for the given table name with type suffix.
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 51512eeeb4..d92ee5f1b4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -72,6 +72,7 @@ public class Actions {
     public static final String GET_USER = "GetUser";
     public static final String GET_VERSION = "GetVersion";
     public static final String GET_ZNODE = "GetZnode";
+    public static final String GET_APPLICATION_QUERY_QUOTA = 
"GetApplicationQueryQuota";
     public static final String GET_DATABASE_QUOTA = "GetDatabaseQuota";
     public static final String GET_DATABASE_QUERY_QUOTA = 
"GetDatabaseQueryQuota";
     public static final String INGEST_FILE = "IngestFile";
@@ -91,6 +92,7 @@ public class Actions {
     public static final String UPDATE_TIME_INTERVAL = "UpdateTimeInterval";
     public static final String UPDATE_USER = "UpdateUser";
     public static final String UPDATE_DATABASE_QUOTA = "UpdateDatabaseQuota";
+    public static final String UPDATE_APPLICATION_QUOTA = 
"UpdateApplicationQuota";
     public static final String UPDATE_ZNODE = "UpdateZnode";
     public static final String UPLOAD_SEGMENT = "UploadSegment";
     public static final String GET_INSTANCE_PARTITIONS = 
"GetInstancePartitions";
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ffe846cf9c..cf1fce6fb0 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -292,15 +292,28 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
    * Creates a new OFFLINE table config.
    */
   protected TableConfig createOfflineTableConfig() {
-    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setTimeColumnName(getTimeColumnName())
-        
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
-        
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
-        
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
-        
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
-        
.setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
-        
.setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
-        
.setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(getSegmentPartitionConfig())
+    // @formatter:off
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(getTableName())
+        .setTimeColumnName(getTimeColumnName())
+        .setSortedColumn(getSortedColumn())
+        .setInvertedIndexColumns(getInvertedIndexColumns())
+        .setNoDictionaryColumns(getNoDictionaryColumns())
+        .setRangeIndexColumns(getRangeIndexColumns())
+        .setBloomFilterColumns(getBloomFilterColumns())
+        .setFieldConfigList(getFieldConfigs())
+        .setNumReplicas(getNumReplicas())
+        .setSegmentVersion(getSegmentVersion())
+        .setLoadMode(getLoadMode())
+        .setTaskConfig(getTaskConfig())
+        .setBrokerTenant(getBrokerTenant())
+        .setServerTenant(getServerTenant())
+        .setIngestionConfig(getIngestionConfig())
+        .setQueryConfig(getQueryConfig())
+        .setNullHandlingEnabled(getNullHandlingEnabled())
+        .setSegmentPartitionConfig(getSegmentPartitionConfig())
         .build();
+    // @formatter:on
   }
 
   /**
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index dfd9d39727..8ac736e507 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -79,14 +79,20 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
         .buildTransport();
     _pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl() 
+ "/" + getHelixClusterName(),
         _pinotClientTransport);
+
+    // create default application rate limiter manually, otherwise 
verifyQuotaUpdate will fail
+    setQueryQuotaForApplication(null);
   }
 
   @AfterMethod
   void resetQuotas()
       throws Exception {
     addQueryQuotaToClusterConfig(null);
+    addAppQueryQuotaToClusterConfig(null);
+    setQueryQuotaForApplication(null);
     addQueryQuotaToDatabaseConfig(null);
     addQueryQuotaToTableConfig(null);
+
     _brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);
     verifyQuotaUpdate(0);
   }
@@ -98,6 +104,13 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     testQueryRate(40);
   }
 
+  @Test
+  public void testDefaultApplicationQueryQuota()
+      throws Exception {
+    addAppQueryQuotaToClusterConfig(50);
+    testQueryRate(50);
+  }
+
   @Test
   public void testDatabaseConfigQueryQuota()
       throws Exception {
@@ -105,6 +118,13 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     testQueryRate(10);
   }
 
+  @Test
+  public void testApplicationQueryQuota()
+      throws Exception {
+    setQueryQuotaForApplication(15);
+    testQueryRate(15);
+  }
+
   @Test
   public void testDefaultDatabaseQueryQuotaOverride()
       throws Exception {
@@ -117,6 +137,18 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     testQueryRate(40);
   }
 
+  @Test
+  public void testDefaultApplicationQueryQuotaOverride()
+      throws Exception {
+    addAppQueryQuotaToClusterConfig(25);
+    // override lower than default quota
+    setQueryQuotaForApplication(10);
+    testQueryRate(10);
+    // override higher than default quota
+    setQueryQuotaForApplication(40);
+    testQueryRate(40);
+  }
+
   @Test
   public void testDatabaseQueryQuotaWithTableQueryQuota()
       throws Exception {
@@ -129,6 +161,18 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     testQueryRate(25);
   }
 
+  @Test
+  public void testApplicationQueryQuotaWithTableQueryQuota()
+      throws Exception {
+    setQueryQuotaForApplication(25);
+    // table quota within database quota. Queries should fail upon table quota 
(10 qps) breach
+    addQueryQuotaToTableConfig(10);
+    testQueryRate(10);
+    // table quota more than database quota. Queries should fail upon database 
quota (25 qps) breach
+    addQueryQuotaToTableConfig(50);
+    testQueryRate(25);
+  }
+
   @Test
   public void testDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker()
       throws Exception {
@@ -152,6 +196,39 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     }
   }
 
+  @Test
+  public void 
testApplicationAndDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker()
+      throws Exception {
+    BaseBrokerStarter brokerStarter = null;
+    try {
+      addAppQueryQuotaToClusterConfig(null);
+      addQueryQuotaToClusterConfig(null);
+      setQueryQuotaForApplication(50);
+      addQueryQuotaToDatabaseConfig(25);
+      addQueryQuotaToTableConfig(10);
+      //
+      // Add one more broker such that quota gets distributed equally among 
them
+      brokerStarter = startOneBroker(2);
+      _brokerHostPort = LOCAL_HOST + ":" + brokerStarter.getPort();
+      // query only one broker across the divided quota
+      testQueryRateOnBroker(5);
+
+      // drop table level quota so that database quota comes into effect
+      addQueryQuotaToTableConfig(null);
+      // query only one broker across the divided quota
+      testQueryRateOnBroker(12.5f);
+
+      // drop database level quota so that app quota comes into effect
+      addQueryQuotaToDatabaseConfig(null);
+      // query only one broker across the divided quota
+      testQueryRateOnBroker(25f);
+    } finally {
+      if (brokerStarter != null) {
+        brokerStarter.stop();
+      }
+    }
+  }
+
   /**
    * Runs the query load with the max rate that the quota can allow and 
ensures queries are not failing.
    * Then runs the query load with double the max rate and expects queries to 
fail due to quota breach.
@@ -181,7 +258,8 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     long sleepMillis = (long) (1000 / qps);
     Thread.sleep(sleepMillis);
     for (int i = 0; i < qps * 2; i++) {
-      ResultSetGroup resultSetGroup = _pinotConnection.execute("SELECT 
COUNT(*) FROM " + getTableName());
+      ResultSetGroup resultSetGroup =
+          _pinotConnection.execute("SET applicationName='default'; SELECT 
COUNT(*) FROM " + getTableName());
       for (PinotClientException exception : resultSetGroup.getExceptions()) {
         if (exception.getMessage().contains("QuotaExceededError")) {
           failCount++;
@@ -190,24 +268,48 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
       }
       Thread.sleep(sleepMillis);
     }
-    assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && 
shouldFail));
+    if (shouldFail) {
+      assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps);
+    } else {
+      assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps);
+    }
   }
 
+  private static volatile float _quota;
+  private static volatile String _quotaSource;
+
   private void verifyQuotaUpdate(float quotaQps) {
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        float tableQuota = 
Float.parseFloat(sendGetRequest(String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE";,
-            _brokerHostPort, getTableName())));
-        tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
-        float dbQuota = 
Float.parseFloat(sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default";,
-            _brokerHostPort)));
-        dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
-        return quotaQps == Math.min(tableQuota, dbQuota)
-            || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == 
Long.MAX_VALUE);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }, 5000, "Failed to reflect query quota on rate limiter in 5s");
+    try {
+      TestUtils.waitForCondition(aVoid -> {
+        try {
+          float tableQuota = Float.parseFloat(sendGetRequest(
+              String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE";, 
_brokerHostPort, getTableName())));
+          tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
+          float dbQuota = Float.parseFloat(
+              
sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default";, 
_brokerHostPort)));
+          float appQuota = Float.parseFloat(
+              
sendGetRequest(String.format("http://%s/debug/applicationQuotas/default";, 
_brokerHostPort)));
+          dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
+          appQuota = appQuota == 0 ? Long.MAX_VALUE : appQuota;
+          float actualQuota = Math.min(Math.min(tableQuota, dbQuota), 
appQuota);
+          _quota = actualQuota;
+          if (_quota == dbQuota) {
+            _quotaSource = "database";
+          } else if (_quota == tableQuota) {
+            _quotaSource = "table";
+          } else {
+            _quotaSource = "application";
+          }
+          return quotaQps == actualQuota || (quotaQps == 0 && tableQuota == 
Long.MAX_VALUE && dbQuota == Long.MAX_VALUE
+              && appQuota == Long.MAX_VALUE);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }, 5000, "Failed to reflect query quota on rate limiter in 5s.");
+    } catch (AssertionError ae) {
+      throw new AssertionError(
+          ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + 
_quota + " set on: " + _quotaSource, ae);
+    }
   }
 
   private BrokerResponse executeQueryOnBroker(String query) {
@@ -220,7 +322,8 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     long sleepMillis = (long) (1000 / qps);
     Thread.sleep(sleepMillis);
     for (int i = 0; i < qps * 2; i++) {
-      BrokerResponse resultSetGroup = executeQueryOnBroker("SELECT COUNT(*) 
FROM " + getTableName());
+      BrokerResponse resultSetGroup =
+          executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) 
FROM " + getTableName());
       for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements(); 
it.hasNext(); ) {
         JsonNode exception = it.next();
         if (exception.toPrettyString().contains("QuotaExceededError")) {
@@ -230,7 +333,12 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
       }
       Thread.sleep(sleepMillis);
     }
-    assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && 
shouldFail));
+
+    if (shouldFail) {
+      assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps);
+    } else {
+      assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps);
+    }
   }
 
   public void addQueryQuotaToTableConfig(Integer maxQps)
@@ -251,6 +359,16 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     // to allow change propagation to QueryQuotaManager
   }
 
+  public void setQueryQuotaForApplication(Integer maxQps)
+      throws Exception {
+    String url = _controllerRequestURLBuilder.getBaseUrl() + 
"/applicationQuotas/default";
+    if (maxQps != null) {
+      url += "?maxQueriesPerSecond=" + maxQps;
+    }
+    HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new 
URI(url), null, null));
+    // to allow change propagation to QueryQuotaManager
+  }
+
   public void addQueryQuotaToClusterConfig(Integer maxQps)
       throws Exception {
     if (maxQps == null) {
@@ -264,4 +382,18 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     }
     // to allow change propagation to QueryQuotaManager
   }
+
+  public void addAppQueryQuotaToClusterConfig(Integer maxQps)
+      throws Exception {
+    if (maxQps == null) {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(new 
URI(
+          _controllerRequestURLBuilder.forClusterConfigs() + "/"
+              + CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND)));
+    } else {
+      String payload = "{\"" + 
CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND + "\":\"" + maxQps + 
"\"}";
+      HttpClient.wrapAndThrowHttpException(
+          _httpClient.sendJsonPostRequest(new 
URI(_controllerRequestURLBuilder.forClusterConfigs()), payload));
+    }
+    // to allow change propagation to QueryQuotaManager
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index f62efb2062..e889b5e0f7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -57,6 +57,8 @@ public class CommonConstants {
   public static final String CONFIG_OF_SWAGGER_RESOURCES_PATH = 
"META-INF/resources/webjars/swagger-ui/";
   public static final String CONFIG_OF_TIMEZONE = "pinot.timezone";
 
+  public static final String APPLICATION = "application";
+
   public static final String DATABASE = "database";
   public static final String DEFAULT_DATABASE = "default";
   public static final String CONFIG_OF_PINOT_INSECURE_MODE = 
"pinot.insecure.mode";
@@ -86,6 +88,7 @@ public class CommonConstants {
     public static final String QUERIES_DISABLED = "queriesDisabled";
     public static final String QUERY_RATE_LIMIT_DISABLED = 
"queryRateLimitDisabled";
     public static final String DATABASE_MAX_QUERIES_PER_SECOND = 
"databaseMaxQueriesPerSecond";
+    public static final String APPLICATION_MAX_QUERIES_PER_SECOND = 
"applicationMaxQueriesPerSecond";
 
     public static final String INSTANCE_CONNECTED_METRIC_NAME = 
"helix.connected";
 
@@ -401,6 +404,7 @@ public class CommonConstants {
         public static final String USE_MULTISTAGE_ENGINE = 
"useMultistageEngine";
         public static final String INFER_PARTITION_HINT = "inferPartitionHint";
         public static final String ENABLE_NULL_HANDLING = "enableNullHandling";
+        public static final String APPLICATION_NAME = "applicationName";
         /**
          * If set, changes the explain behavior in multi-stage engine.
          *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to