This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a7e11682a Database query quota (#13544)
9a7e11682a is described below

commit 9a7e11682a56761684edb1aae4a7ec5b9984c6b8
Author: Shounak kulkarni <shounakmk...@gmail.com>
AuthorDate: Mon Aug 5 08:42:26 2024 +0500

    Database query quota (#13544)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |  17 ++
 ...okerResourceOnlineOfflineStateModelFactory.java |   3 +
 .../BrokerUserDefinedMessageHandlerFactory.java    |  33 +++
 .../broker/broker/helix/ClusterChangeMediator.java |  12 +-
 .../HelixExternalViewBasedQueryQuotaManager.java   | 219 ++++++++++++++++++--
 .../pinot/broker/queryquota/QueryQuotaManager.java |   7 +
 .../BaseSingleStageBrokerRequestHandler.java       |  10 +-
 .../MultiStageBrokerRequestHandler.java            |  13 +-
 ...elixExternalViewBasedQueryQuotaManagerTest.java | 222 +++++++++++++++++----
 .../BaseSingleStageBrokerRequestHandlerTest.java   |   1 +
 .../messages/DatabaseConfigRefreshMessage.java     |  60 ++++++
 .../pinot/common/metadata/ZKMetadataProvider.java  |  85 ++++++++
 .../apache/pinot/common/utils/DatabaseUtils.java   |  14 ++
 .../resources/PinotDatabaseRestletResource.java    |  85 +++++++-
 .../helix/core/PinotHelixResourceManager.java      |  56 ++++++
 .../java/org/apache/pinot/core/auth/Actions.java   |   2 +
 .../tests/QueryQuotaClusterIntegrationTest.java    | 207 +++++++++++++++++++
 .../apache/pinot/spi/config/DatabaseConfig.java    |  56 ++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   1 +
 19 files changed, 1046 insertions(+), 57 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 04bf6ce921..553228d89c 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -107,6 +107,8 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
   protected String _instanceId;
   private volatile boolean _isStarting = false;
   private volatile boolean _isShuttingDown = false;
+
+  protected final List<ClusterChangeHandler> _clusterConfigChangeHandlers = 
new ArrayList<>();
   protected final List<ClusterChangeHandler> _idealStateChangeHandlers = new 
ArrayList<>();
   protected final List<ClusterChangeHandler> _externalViewChangeHandlers = new 
ArrayList<>();
   protected final List<ClusterChangeHandler> _instanceConfigChangeHandlers = 
new ArrayList<>();
@@ -214,6 +216,15 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     _instanceConfigChangeHandlers.add(instanceConfigChangeHandler);
   }
 
+  /**
+   * Adds a cluster config change handler to handle Helix cluster config 
change callbacks.
+   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
+   * handlers from running. For slow change handler, make it asynchronous.
+   */
+  public void addClusterConfigChangeHandler(ClusterChangeHandler 
clusterConfigChangeHandler) {
+    _clusterConfigChangeHandlers.add(clusterConfigChangeHandler);
+  }
+
   /**
    * Adds a live instance change handler to handle Helix live instance change 
callbacks.
    * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
@@ -350,6 +361,10 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     _brokerAdminApplication.start(_listenerConfigs);
 
     LOGGER.info("Initializing cluster change mediator");
+    for (ClusterChangeHandler clusterConfigChangeHandler : 
_clusterConfigChangeHandlers) {
+      clusterConfigChangeHandler.init(_spectatorHelixManager);
+    }
+    _clusterConfigChangeHandlers.add(queryQuotaManager);
     for (ClusterChangeHandler idealStateChangeHandler : 
_idealStateChangeHandlers) {
       idealStateChangeHandler.init(_spectatorHelixManager);
     }
@@ -368,6 +383,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       liveInstanceChangeHandler.init(_spectatorHelixManager);
     }
     Map<ChangeType, List<ClusterChangeHandler>> clusterChangeHandlersMap = new 
HashMap<>();
+    clusterChangeHandlersMap.put(ChangeType.CLUSTER_CONFIG, 
_clusterConfigChangeHandlers);
     clusterChangeHandlersMap.put(ChangeType.IDEAL_STATE, 
_idealStateChangeHandlers);
     clusterChangeHandlersMap.put(ChangeType.EXTERNAL_VIEW, 
_externalViewChangeHandlers);
     clusterChangeHandlersMap.put(ChangeType.INSTANCE_CONFIG, 
_instanceConfigChangeHandlers);
@@ -379,6 +395,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     _spectatorHelixManager.addIdealStateChangeListener(_clusterChangeMediator);
     
_spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
     
_spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
+    _spectatorHelixManager.addClusterfigChangeListener(_clusterChangeMediator);
     if (!_liveInstanceChangeHandlers.isEmpty()) {
       
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
     }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index 648a2a43ff..41f9e9ef87 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -30,6 +30,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,6 +82,8 @@ public class BrokerResourceOnlineOfflineStateModelFactory 
extends StateModelFact
         TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
         _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
             
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
+        _queryQuotaManager.createDatabaseRateLimiter(
+            
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableNameWithType));
       } catch (Exception e) {
         LOGGER.error("Caught exception while processing transition from 
OFFLINE to ONLINE for table: {}",
             tableNameWithType, e);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 4283b10cc9..2c2cc33532 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -25,9 +25,11 @@ import 
org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import 
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
 import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
+import org.apache.pinot.common.utils.DatabaseUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +63,8 @@ public class BrokerUserDefinedMessageHandlerFactory 
implements MessageHandlerFac
         return new RefreshTableConfigMessageHandler(new 
TableConfigRefreshMessage(message), context);
       case RoutingTableRebuildMessage.REBUILD_ROUTING_TABLE_MSG_SUB_TYPE:
         return new RebuildRoutingTableMessageHandler(new 
RoutingTableRebuildMessage(message), context);
+      case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE:
+        return new RefreshDatabaseConfigMessageHandler(new 
DatabaseConfigRefreshMessage(message), context);
       default:
         // NOTE: Log a warning and return no-op message handler for 
unsupported message sub-types. This can happen when
         //       a new message sub-type is added, and the sender gets deployed 
first while receiver is still running the
@@ -117,6 +121,9 @@ public class BrokerUserDefinedMessageHandlerFactory 
implements MessageHandlerFac
       // TODO: Fetch the table config here and pass it into the managers, or 
consider merging these 2 managers
       _routingManager.buildRouting(_tableNameWithType);
       _queryQuotaManager.initOrUpdateTableQueryQuota(_tableNameWithType);
+      // only create the rate limiter if not present. This message has no 
reason to update the database rate limiter
+      _queryQuotaManager.createDatabaseRateLimiter(
+          
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(_tableNameWithType));
       HelixTaskResult result = new HelixTaskResult();
       result.setSuccess(true);
       return result;
@@ -129,6 +136,32 @@ public class BrokerUserDefinedMessageHandlerFactory 
implements MessageHandlerFac
     }
   }
 
+  private class RefreshDatabaseConfigMessageHandler extends MessageHandler {
+    final String _databaseName;
+
+    RefreshDatabaseConfigMessageHandler(DatabaseConfigRefreshMessage 
databaseConfigRefreshMessage,
+        NotificationContext context) {
+      super(databaseConfigRefreshMessage, context);
+      _databaseName = databaseConfigRefreshMessage.getDatabaseName();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      // only update the existing rate limiter.
+      // Database rate limiter creation should only be done through table 
based change triggers
+      _queryQuotaManager.updateDatabaseRateLimiter(_databaseName);
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      LOGGER.error("Got error while refreshing database config for database: 
{} (error code: {}, error type: {})",
+          _databaseName, code, type, e);
+    }
+  }
+
   private class RebuildRoutingTableMessageHandler extends MessageHandler {
     final String _tableNameWithType;
 
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 202f1a3f8e..f1e25804bf 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -26,11 +26,13 @@ import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.api.listeners.BatchMode;
+import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.PreFetch;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -55,7 +57,7 @@ import org.slf4j.LoggerFactory;
 @PreFetch(enabled = false)
 public class ClusterChangeMediator
     implements IdealStateChangeListener, ExternalViewChangeListener, 
InstanceConfigChangeListener,
-               LiveInstanceChangeListener {
+               ClusterConfigChangeListener, LiveInstanceChangeListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterChangeMediator.class);
 
   // If no change got for 1 hour, proactively check changes
@@ -192,6 +194,14 @@ public class ClusterChangeMediator
     enqueueChange(ChangeType.INSTANCE_CONFIG);
   }
 
+  @Override
+  public void onClusterConfigChange(ClusterConfig clusterConfig, 
NotificationContext context) {
+    // Cluster config should be null because Helix pre-fetch is disabled
+    assert clusterConfig == null;
+
+    enqueueChange(ChangeType.CLUSTER_CONFIG);
+  }
+
   @Override
   public void onLiveInstanceChange(List<LiveInstance> liveInstances, 
NotificationContext changeContext) {
     // Live instance list should be empty because Helix pre-fetch is disabled
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index dabb95867b..d05de53f3e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -21,14 +21,22 @@ package org.apache.pinot.broker.queryquota;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.RateLimiter;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.collections4.SetUtils;
 import org.apache.helix.AccessOption;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
@@ -37,6 +45,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -49,18 +58,30 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This class is to support the qps quota feature.
- * It depends on the broker source change to update the dynamic rate limit,
- *  which means it only gets updated when a new table added or a broker 
restarted.
+ * It allows performing qps quota check at table level and database level
+ * For table level check it depends on the broker source change to update the 
dynamic rate limit,
+ *  which means it gets updated when a new table added or a broker restarted.
+ * For database level check it depends on the broker as well as cluster config 
and database config change
+ * to update the dynamic rate limit, which means it gets updated when
+ * - the default query quota at cluster config is updated
+ * - the database config is updated
+ * - new table is assigned to the broker (rate limiter is created if not 
present)
+ * - broker added or removed from cluster
  */
 public class HelixExternalViewBasedQueryQuotaManager implements 
ClusterChangeHandler, QueryQuotaManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
   private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
   private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
+  private static final Set<HelixConstants.ChangeType> CHANGE_TYPES_TO_PROCESS 
= SetUtils.hashSet(
+      HelixConstants.ChangeType.EXTERNAL_VIEW, 
HelixConstants.ChangeType.INSTANCE_CONFIG,
+      HelixConstants.ChangeType.CLUSTER_CONFIG);
 
   private final BrokerMetrics _brokerMetrics;
   private final String _instanceId;
   private final AtomicInteger _lastKnownBrokerResourceVersion = new 
AtomicInteger(-1);
   private final Map<String, QueryQuotaEntity> _rateLimiterMap = new 
ConcurrentHashMap<>();
+  private final Map<String, QueryQuotaEntity> _databaseRateLimiterMap = new 
ConcurrentHashMap<>();
+  private double _defaultQpsQuotaForDatabase;
 
   private HelixManager _helixManager;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -76,20 +97,22 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     Preconditions.checkState(_helixManager == null, 
"HelixExternalViewBasedQueryQuotaManager is already initialized");
     _helixManager = helixManager;
     _propertyStore = _helixManager.getHelixPropertyStore();
+    _defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase();
     getQueryQuotaEnabledFlagFromInstanceConfig();
   }
 
   @Override
   public void processClusterChange(HelixConstants.ChangeType changeType) {
-    Preconditions.checkState(changeType == 
HelixConstants.ChangeType.EXTERNAL_VIEW
-        || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal 
change type: " + changeType);
+    Preconditions.checkState(CHANGE_TYPES_TO_PROCESS.contains(changeType), 
"Illegal change type: " + changeType);
     if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
       ExternalView brokerResourceEV = HelixHelper
           .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
               CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
       processQueryRateLimitingExternalViewChange(brokerResourceEV);
-    } else {
+    } else if (changeType == HelixConstants.ChangeType.INSTANCE_CONFIG) {
       processQueryRateLimitingInstanceConfigChange();
+    } else {
+      processQueryRateLimitingClusterConfigChange();
     }
   }
 
@@ -230,6 +253,118 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  // Caller method need not worry about getting lock on _databaseRateLimiterMap
+  // as this method will do idempotent updates to the database rate limiters
+  private synchronized void createOrUpdateDatabaseRateLimiter(List<String> 
databaseNames) {
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    for (String databaseName : databaseNames) {
+      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetDatabaseRateLimiter(databaseName);
+        continue;
+      }
+      int numOnlineBrokers = getNumOnlineBrokers(databaseName, brokerResource);
+      double perBrokerQpsQuota = databaseQpsQuota / numOnlineBrokers;
+      QueryQuotaEntity oldQueryQuotaEntity = 
_databaseRateLimiterMap.get(databaseName);
+      if (oldQueryQuotaEntity == null) {
+        LOGGER.info("Adding new query rate limiter for database {} with rate 
{}.", databaseName, perBrokerQpsQuota);
+        QueryQuotaEntity queryQuotaEntity = new 
QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
+            new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new 
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
+            numOnlineBrokers, databaseQpsQuota, -1);
+        _databaseRateLimiterMap.put(databaseName, queryQuotaEntity);
+        continue;
+      }
+      boolean changeDetected = false;
+      double oldQuota = oldQueryQuotaEntity.getRateLimiter() != null ? 
oldQueryQuotaEntity.getRateLimiter().getRate()
+          : -1;
+      if (oldQueryQuotaEntity.getOverallRate() != databaseQpsQuota) {
+        changeDetected = true;
+        LOGGER.info("Overall quota changed for the database from {} to {}", 
oldQueryQuotaEntity.getOverallRate(),
+            databaseQpsQuota);
+        oldQueryQuotaEntity.setOverallRate(databaseQpsQuota);
+      }
+      if (oldQueryQuotaEntity.getNumOnlineBrokers() != numOnlineBrokers) {
+        changeDetected = true;
+        LOGGER.info("Number of online brokers changed for the database from {} 
to {}",
+            oldQueryQuotaEntity.getNumOnlineBrokers(), numOnlineBrokers);
+        oldQueryQuotaEntity.setNumOnlineBrokers(numOnlineBrokers);
+      }
+      if (!changeDetected) {
+        LOGGER.info("No change detected with the query rate limiter for 
database {}", databaseName);
+        continue;
+      }
+      LOGGER.info("Updating existing query rate limiter for database {} from 
rate {} to {}", databaseName, oldQuota,
+          perBrokerQpsQuota);
+      
oldQueryQuotaEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota));
+    }
+  }
+
+  // Pulling this logic to a separate placeholder method so that the quota 
split logic
+  // can be enhanced further in isolation.
+  private int getNumOnlineBrokers(String databaseName, ExternalView 
brokerResource) {
+    // Tables in database can span across broker tags as we don't maintain a 
broker tag to database mapping as of now.
+    // Hence, we consider all online brokers for the rate distribution.
+    // TODO consider computing only the online brokers which serve the tables 
under the database
+    return 
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
+  }
+
+  /**
+   * Utility to get the effective query quota being imposed on a database.
+   * It is computed based on the default quota set at cluster config and 
override set at database config
+   * @param databaseName database name to get the query quota on.
+   * @return effective query quota limit being applied
+   */
+  private double getEffectiveQueryQuotaOnDatabase(String databaseName) {
+    DatabaseConfig databaseConfig =
+        
ZKMetadataProvider.getDatabaseConfig(_helixManager.getHelixPropertyStore(), 
databaseName);
+    if (databaseConfig != null && databaseConfig.getQuotaConfig() != null
+        && databaseConfig.getQuotaConfig().getMaxQPS() != -1) {
+      return databaseConfig.getQuotaConfig().getMaxQPS();
+    }
+    return _defaultQpsQuotaForDatabase;
+  }
+
+  /**
+   * Creates a new database rate limiter. Will not update the database rate 
limiter if it already exists.
+   * @param databaseName database name for which rate limiter needs to be 
created
+   */
+  public void createDatabaseRateLimiter(String databaseName) {
+    if (_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  /**
+   * Build an empty rate limiter in the new query quota entity, or set the 
rate limiter to null in an existing query
+   * quota entity.
+   */
+  private void buildEmptyOrResetDatabaseRateLimiter(String databaseName) {
+    QueryQuotaEntity queryQuotaEntity = 
_databaseRateLimiterMap.get(databaseName);
+    if (queryQuotaEntity == null) {
+      // Create an QueryQuotaEntity object without setting a rate limiter.
+      queryQuotaEntity = new QueryQuotaEntity(null, new 
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+          new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0);
+      _databaseRateLimiterMap.put(databaseName, queryQuotaEntity);
+    } else {
+      // Set rate limiter to null for an existing QueryQuotaEntity object.
+      queryQuotaEntity.setRateLimiter(null);
+    }
+  }
+
   /**
    * Build an empty rate limiter in the new query quota entity, or set the 
rate limiter to null in an existing query
    * quota entity.
@@ -279,6 +414,20 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     }
   }
 
+  @Override
+  public boolean acquireDatabase(String databaseName) {
+    // Return true if query quota is disabled in the current broker.
+    if (isQueryRateLimitDisabled()) {
+      return true;
+    }
+    QueryQuotaEntity queryQuota = _databaseRateLimiterMap.get(databaseName);
+    if (queryQuota == null) {
+      return true;
+    }
+    LOGGER.debug("Trying to acquire token for database: {}", databaseName);
+    return tryAcquireToken(databaseName, queryQuota);
+  }
+
   /**
    * {@inheritDoc}
    * <p>Acquires a token from rate limiter based on the table name.
@@ -291,7 +440,6 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     if (isQueryRateLimitDisabled()) {
       return true;
     }
-    LOGGER.debug("Trying to acquire token for table: {}", tableName);
     String offlineTableName = null;
     String realtimeTableName = null;
     QueryQuotaEntity offlineTableQueryQuotaEntity = null;
@@ -311,21 +459,27 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       realtimeTableQueryQuotaEntity = _rateLimiterMap.get(realtimeTableName);
     }
 
-    boolean offlineQuotaOk =
-        offlineTableQueryQuotaEntity == null || 
tryAcquireToken(offlineTableName, offlineTableQueryQuotaEntity);
-    boolean realtimeQuotaOk =
-        realtimeTableQueryQuotaEntity == null || 
tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaEntity);
+    boolean offlineQuotaOk = true;
+    if (offlineTableQueryQuotaEntity != null) {
+      LOGGER.debug("Trying to acquire token for table: {}", offlineTableName);
+      offlineQuotaOk = tryAcquireToken(offlineTableName, 
offlineTableQueryQuotaEntity);
+    }
+    boolean realtimeQuotaOk = true;
+    if (realtimeTableQueryQuotaEntity != null) {
+      LOGGER.debug("Trying to acquire token for table: {}", realtimeTableName);
+      realtimeQuotaOk = tryAcquireToken(realtimeTableName, 
realtimeTableQueryQuotaEntity);
+    }
 
     return offlineQuotaOk && realtimeQuotaOk;
   }
 
   /**
    * Try to acquire token from rate limiter. Emit the utilization of the qps 
quota if broker metric isn't null.
-   * @param tableNameWithType table name with type.
+   * @param resourceName resource name to acquire.
    * @param queryQuotaEntity query quota entity for type-specific table.
    * @return true if there's no qps quota for that table, or a token is 
acquired successfully.
    */
-  private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity 
queryQuotaEntity) {
+  private boolean tryAcquireToken(String resourceName, QueryQuotaEntity 
queryQuotaEntity) {
     // Use hit counter to count the number of hits.
     queryQuotaEntity.getQpsTracker().hit();
     queryQuotaEntity.getMaxQpsTracker().hit();
@@ -340,7 +494,7 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     // Emit the qps capacity utilization rate.
     int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
     if (!rateLimiter.tryAcquire()) {
-      LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. 
Current qps: {}", tableNameWithType,
+      LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: 
{}. Current qps: {}", resourceName,
           perBrokerRate, numHits);
       return false;
     }
@@ -353,6 +507,11 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     return _rateLimiterMap.size();
   }
 
+  @VisibleForTesting
+  public Map<String, QueryQuotaEntity> getDatabaseRateLimiterMap() {
+    return _databaseRateLimiterMap;
+  }
+
   @VisibleForTesting
   public QueryQuotaEntity getRateLimiterForTable(String tableNameWithType) {
     return _rateLimiterMap.get(tableNameWithType);
@@ -448,6 +607,19 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
         numRebuilt++;
       }
     }
+
+    // handle EV change for database query quotas
+    int onlineBrokerCount = 
HelixHelper.getOnlineInstanceFromExternalView(currentBrokerResourceEV).size();
+    for (Map.Entry<String, QueryQuotaEntity> it : 
_databaseRateLimiterMap.entrySet()) {
+      QueryQuotaEntity quota = it.getValue();
+      if (quota.getNumOnlineBrokers() != onlineBrokerCount) {
+        quota.setNumOnlineBrokers(onlineBrokerCount);
+      }
+      if (quota.getOverallRate() > 0) {
+        quota.setRateLimiter(RateLimiter.create(quota.getOverallRate() / 
onlineBrokerCount));
+      }
+    }
+
     if (isQueryRateLimitDisabled()) {
       LOGGER.info("Query rate limiting is currently disabled for this broker. 
So it won't take effect immediately.");
     }
@@ -458,6 +630,27 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
             numRebuilt, _rateLimiterMap.size());
   }
 
+  /**
+   * Process query quota state change when cluster config gets changed
+   */
+  public void processQueryRateLimitingClusterConfigChange() {
+    double oldDatabaseQpsQuota = _defaultQpsQuotaForDatabase;
+    _defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase();
+    if (oldDatabaseQpsQuota == _defaultQpsQuotaForDatabase) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(new 
ArrayList<>(_databaseRateLimiterMap.keySet()));
+  }
+
+  private double getDefaultQueryQuotaForDatabase() {
+    HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
+    HelixConfigScope configScope = new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+        .forCluster(_helixManager.getClusterName()).build();
+    return Double.parseDouble(helixAdmin.getConfig(configScope,
+            
Collections.singletonList(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND))
+            
.getOrDefault(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, "-1"));
+  }
+
   /**
    * Process query quota state change when instance config gets changed
    */
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
index 6fd335e4d3..57faef8778 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
@@ -26,4 +26,11 @@ public interface QueryQuotaManager {
    * @return {@code true} if the table quota has not been reached, {@code 
false} otherwise
    */
   boolean acquire(String tableName);
+
+  /**
+   * Try to acquire a quota for the given database.
+   * @param databaseName database name
+   * @return {@code true} if the database quota has not been reached, {@code 
false} otherwise
+   */
+  boolean acquireDatabase(String databaseName);
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 6a839d5e36..dfac99bb82 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -506,6 +506,14 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       }
 
       // Validate QPS quota
+      String database = 
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
+      if (!_queryQuotaManager.acquireDatabase(database)) {
+        String errorMessage =
+            String.format("Request %d: %s exceeds query quota for database: 
%s", requestId, query, database);
+        LOGGER.info(errorMessage);
+        
requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+        return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));
+      }
       if (!_queryQuotaManager.acquire(tableName)) {
         String errorMessage =
             String.format("Request %d: %s exceeds query quota for table: %s", 
requestId, query, tableName);
@@ -531,7 +539,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
 
       if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) {
         // Check if the query is a v2 supported query
-        String database = 
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), 
httpHeaders);
+        database = 
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), 
httpHeaders);
         // Attempt to add the query to the compile queue; drop if queue is full
         if (!_multistageCompileQueryQueue.offer(Pair.of(query, database))) {
           LOGGER.trace("Not compiling query `{}` using the multi-stage query 
engine because the query queue is full",
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index c8f7c4c2f6..a1e82dbd53 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -129,10 +129,11 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     long compilationStartTimeNs = System.nanoTime();
     long queryTimeoutMs;
     QueryEnvironment.QueryPlannerResult queryPlanResult;
+    String database;
     try {
       Long timeoutMsFromQueryOption = 
QueryOptionsUtils.getTimeoutMs(queryOptions);
       queryTimeoutMs = timeoutMsFromQueryOption != null ? 
timeoutMsFromQueryOption : _brokerTimeoutMs;
-      String database = 
DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
+      database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, 
httpHeaders);
       QueryEnvironment queryEnvironment = new QueryEnvironment(database, 
_tableCache, _workerManager);
       switch (sqlNodeAndOptions.getSqlNode().getKind()) {
         case EXPLAIN:
@@ -204,7 +205,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     }
 
     // Validate QPS quota
-    if (hasExceededQPSQuota(tableNames, requestContext)) {
+    if (hasExceededQPSQuota(database, tableNames, requestContext)) {
       String errorMessage = String.format("Request %d: %s exceeds query 
quota.", requestId, query);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));
     }
@@ -327,7 +328,13 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   /**
    * Returns true if the QPS quota of the tables has exceeded.
    */
-  private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext 
requestContext) {
+  private boolean hasExceededQPSQuota(@Nullable String database, Set<String> 
tableNames,
+      RequestContext requestContext) {
+    if (database != null && !_queryQuotaManager.acquireDatabase(database)) {
+      LOGGER.warn("Request {}: query exceeds quota for database: {}", 
requestContext.getRequestId(), database);
+      requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+      return true;
+    }
     for (String tableName : tableNames) {
       if (!_queryQuotaManager.acquire(tableName)) {
         LOGGER.warn("Request {}: query exceeds quota for table: {}", 
requestContext.getRequestId(), tableName);
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index a22f8bdb57..760f3170f9 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.broker.queryquota;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,7 +28,6 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -38,6 +38,7 @@ import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -58,10 +59,17 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   private HelixManager _helixManager;
   private HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
+  private static final Map<String, String> CLUSTER_CONFIG_MAP = new 
HashMap<>();
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String OFFLINE_TABLE_NAME = RAW_TABLE_NAME + "_OFFLINE";
   private static final String REALTIME_TABLE_NAME = RAW_TABLE_NAME + 
"_REALTIME";
   private static final String BROKER_INSTANCE_ID = "broker_instance_1";
+  private static final long TABLE_MAX_QPS = 25;
+  private static final String TABLE_MAX_QPS_STR = 
String.valueOf(TABLE_MAX_QPS);
+  private static final long DATABASE_HIGH_QPS = 40;
+  private static final String DATABASE_HIGH_QPS_STR = 
String.valueOf(DATABASE_HIGH_QPS);
+  private static final long DATABASE_LOW_QPS = 10;
+  private static final String DATABASE_LOW_QPS_STR = 
String.valueOf(DATABASE_LOW_QPS);
 
   @BeforeTest
   public void beforeTest() {
@@ -82,7 +90,6 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   }
 
   public class FakeHelixManager extends ZKHelixManager {
-    private ZkHelixPropertyStore<ZNRecord> _propertyStore;
 
     FakeHelixManager(String clusterName, String instanceName, InstanceType 
instanceType, String zkAddress) {
       super(clusterName, instanceName, instanceType, zkAddress);
@@ -90,13 +97,6 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
           ZkClient.DEFAULT_SESSION_TIMEOUT, 
ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
       _zkclient.deleteRecursively("/" + clusterName + "/PROPERTYSTORE");
       _zkclient.createPersistent("/" + clusterName + "/PROPERTYSTORE", true);
-      setPropertyStore(clusterName);
-    }
-
-    void setPropertyStore(String clusterName) {
-      _propertyStore =
-          new ZkHelixPropertyStore<>(new 
ZkBaseDataAccessor<ZNRecord>(_zkclient), "/" + clusterName + "/PROPERTYSTORE",
-              null);
     }
 
     void closeZkClient() {
@@ -119,8 +119,16 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
 
     @Override
     public Map<String, String> getConfig(HelixConfigScope scope, List<String> 
keys) {
+      if 
(scope.getType().equals(HelixConfigScope.ConfigScopeProperty.CLUSTER)) {
+        return CLUSTER_CONFIG_MAP;
+      }
       return _instanceConfigMap;
     }
+
+    @Override
+    public ExternalView getResourceExternalView(String clusterName, String 
resourceName) {
+      return generateBrokerResource(OFFLINE_TABLE_NAME);
+    }
   }
 
   @AfterMethod
@@ -129,8 +137,11 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       _testPropertyStore.reset();
       
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, 
OFFLINE_TABLE_NAME);
       
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, 
REALTIME_TABLE_NAME);
+      ZKMetadataProvider.removeDatabaseConfig(_testPropertyStore, 
CommonConstants.DEFAULT_DATABASE);
+      CLUSTER_CONFIG_MAP.clear();
     }
     _queryQuotaManager.cleanUpRateLimiterMap();
+    _queryQuotaManager.getDatabaseRateLimiterMap().clear();
   }
 
   @AfterTest
@@ -152,17 +163,129 @@ public class HelixExternalViewBasedQueryQuotaManagerTest 
{
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
     // All the request should be passed.
-    runQueries(70, 10);
+    runQueries();
+
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+  }
+
+  @Test
+  public void testOfflineTableNotnullQuotaWithHigherDefaultDatabaseQuota()
+      throws Exception {
+    ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+    TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+    setQps(tableConfig);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+    Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 
1);
+
+    setDefaultDatabaseQps("40");
+    // qps withing table and default database qps quota
+    runQueries(25, false);
+    // qps exceeding table qps quota but withing default database quota
+    runQueries(40, true);
 
     _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
-  public void testOfflineTableWithNullQuotaAndNoRealtimeTableConfig()
+  public void testOfflineTableNotnullQuotaWithLowerDefaultDatabaseQuota()
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+    setQps(tableConfig);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+    Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(), 
1);
+
+    setDefaultDatabaseQps(DATABASE_LOW_QPS_STR);
+    // qps withing table and default database qps quota
+    runQueries(DATABASE_LOW_QPS, false);
+    // qps withing table qps quota but exceeding default database quota
+    runQueries(TABLE_MAX_QPS, true);
+
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+  }
+
+  @Test
+  public void testOfflineTableNotnullQuotaWithHigherDatabaseQuota()
+      throws Exception {
+    ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+    TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+    setQps(tableConfig);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+
+    DatabaseConfig databaseConfig = generateDefaultDatabaseConfig();
+    setHigherDatabaseQps(databaseConfig);
+    // qps withing table and database qps quota
+    runQueries(TABLE_MAX_QPS, false);
+    // qps exceeding table qps quota but within database quota
+    runQueries(DATABASE_HIGH_QPS, true);
+
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+  }
+
+  @Test
+  public void testOfflineTableNotnullQuotaWithLowerDatabaseQuota()
+      throws Exception {
+    ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+    TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+    setQps(tableConfig);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+
+    DatabaseConfig databaseConfig = generateDefaultDatabaseConfig();
+    setLowerDatabaseQps(databaseConfig);
+    // qps withing table and database qps quota
+    runQueries(DATABASE_LOW_QPS, false);
+    // qps within table qps quota but exceeding database quota
+    runQueries(TABLE_MAX_QPS, true);
+
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+  }
+
+  @Test
+  public void testCreateOrUpdateDatabaseRateLimiter() {
+    List<String> dbList = new ArrayList<>(2);
+    dbList.add("db1");
+    dbList.add("db2");
+    dbList.add("db3");
+    DatabaseConfig db1 = new DatabaseConfig(dbList.get(0), new 
QuotaConfig(null, null));
+    DatabaseConfig db2 = new DatabaseConfig(dbList.get(1), new 
QuotaConfig(null, "1"));
+    DatabaseConfig db3 = new DatabaseConfig(dbList.get(2), new 
QuotaConfig(null, "2"));
+    ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1);
+    ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2);
+    ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db3);
+    dbList.forEach(db -> _queryQuotaManager.createDatabaseRateLimiter(db));
+    Map<String, QueryQuotaEntity> dbQuotaMap = 
_queryQuotaManager.getDatabaseRateLimiterMap();
+    Assert.assertNull(dbQuotaMap.get(dbList.get(0)).getRateLimiter());
+    
Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(), 
1);
+    
Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(), 
2);
+    db1.setQuotaConfig(new QuotaConfig(null, "1"));
+    db2.setQuotaConfig(new QuotaConfig(null, "2"));
+    ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1);
+    ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2);
+    dbList.forEach(db -> _queryQuotaManager.updateDatabaseRateLimiter(db));
+    
Assert.assertEquals(dbQuotaMap.get(dbList.get(0)).getRateLimiter().getRate(), 
1);
+    
Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(), 
2);
+    
Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(), 
2);
+  }
+
+  @Test
+  public void testOfflineTableWithNullQuotaAndNoRealtimeTableConfig() {
+    ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+    TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
     QueryQuotaEntity queryQuotaEntity = 
_queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
@@ -194,7 +317,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   @Test
   public void 
testOfflineTableWithNullQuotaButWithRealtimeTableConfigNotNullQpsConfig()
       throws Exception {
-    QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
+    QuotaConfig quotaConfig = new QuotaConfig("6G", TABLE_MAX_QPS_STR);
     TableConfig realtimeTableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
             
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
@@ -220,7 +343,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     brokerResource.setState(REALTIME_TABLE_NAME, BROKER_INSTANCE_ID, "ONLINE");
     brokerResource.setState(REALTIME_TABLE_NAME, "broker_instance_2", 
"OFFLINE");
 
-    QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
+    QuotaConfig quotaConfig = new QuotaConfig("6G", TABLE_MAX_QPS_STR);
     TableConfig realtimeTableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
             
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
@@ -241,7 +364,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 2);
 
     // Rate limiter generates 1 token every 10 milliseconds, have to make it 
sleep for a while.
-    runQueries(70, 10L);
+    runQueries();
 
     _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
     // Since real-time table still has the qps quota, the size of the hash map 
becomes 1.
@@ -262,7 +385,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
-    runQueries(70, 10L);
+    runQueries();
 
     _queryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
@@ -278,7 +401,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
-    runQueries(70, 10L);
+    runQueries();
 
     
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, 
REALTIME_TABLE_NAME);
     
_queryQuotaManager.processQueryRateLimitingExternalViewChange(brokerResource);
@@ -315,9 +438,8 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   }
 
   @Test
-  public void 
testRealtimeTableWithNullQuotaButWithOfflineTableConfigNotNullQpsConfig()
-      throws Exception {
-    QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
+  public void 
testRealtimeTableWithNullQuotaButWithOfflineTableConfigNotNullQpsConfig() {
+    QuotaConfig quotaConfig = new QuotaConfig("6G", TABLE_MAX_QPS_STR);
     TableConfig offlineTableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
             
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
@@ -376,37 +498,65 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     return builder.build();
   }
 
+  private DatabaseConfig generateDefaultDatabaseConfig() {
+    return new DatabaseConfig(CommonConstants.DEFAULT_DATABASE, null);
+  }
+
+  private void setLowerDatabaseQps(DatabaseConfig databaseConfig) {
+    setDatabaseQps(databaseConfig, DATABASE_LOW_QPS_STR);
+  }
+
+  private void setHigherDatabaseQps(DatabaseConfig databaseConfig) {
+    setDatabaseQps(databaseConfig, DATABASE_HIGH_QPS_STR);
+  }
+
+  private void setDefaultDatabaseQps(String maxQps) {
+    ZKMetadataProvider.removeDatabaseConfig(_testPropertyStore, 
CommonConstants.DEFAULT_DATABASE);
+    
CLUSTER_CONFIG_MAP.put(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, 
maxQps);
+    _queryQuotaManager.processQueryRateLimitingClusterConfigChange();
+  }
+
+  private void setDatabaseQps(DatabaseConfig databaseConfig, String maxQps) {
+    QuotaConfig quotaConfig = new QuotaConfig(null, maxQps);
+    databaseConfig.setQuotaConfig(quotaConfig);
+    ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, databaseConfig);
+    
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+  }
+
   private void setQps(TableConfig tableConfig) {
-    QuotaConfig quotaConfig = new QuotaConfig(null, "100.00");
+    QuotaConfig quotaConfig = new QuotaConfig(null, TABLE_MAX_QPS_STR);
     tableConfig.setQuotaConfig(quotaConfig);
   }
 
-  private ExternalView generateBrokerResource(String tableName) {
+  private static ExternalView generateBrokerResource(String tableName) {
     ExternalView brokerResource = new 
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     brokerResource.setState(tableName, BROKER_INSTANCE_ID, "ONLINE");
     brokerResource.setState(tableName, "broker_instance_2", "OFFLINE");
     return brokerResource;
   }
 
-  private void runQueries(int numOfTimesToRun, long millis)
+  private void runQueries()
       throws InterruptedException {
-    int count = 0;
-    for (int i = 0; i < numOfTimesToRun; i++) {
-      Assert.assertTrue(_queryQuotaManager.acquire(RAW_TABLE_NAME));
-      count++;
-      Thread.sleep(millis);
-    }
-    Assert.assertEquals(count, numOfTimesToRun);
+    runQueries(TABLE_MAX_QPS, false);
+    //increase the qps and some of the queries should be throttled.
+    runQueries(TABLE_MAX_QPS * 2, true);
+  }
 
-    //Reduce the time of sleeping and some of the queries should be throttled.
-    count = 0;
-    millis /= 2;
-    for (int i = 0; i < numOfTimesToRun; i++) {
+  // try to keep the qps below 50 to ensure that the time lost between 2 query 
runs on top of the sleepMillis
+  // is not comparable to sleepMillis, else the actual qps would end being lot 
lower than required qps
+  private void runQueries(double qps, boolean shouldFail)
+      throws InterruptedException {
+    int failCount = 0;
+    long sleepMillis = (long) (1000 / qps);
+    for (int i = 0; i < qps; i++) {
+      if 
(!_queryQuotaManager.acquireDatabase(CommonConstants.DEFAULT_DATABASE)) {
+        failCount++;
+      }
       if (!_queryQuotaManager.acquire(RAW_TABLE_NAME)) {
-        count++;
+        failCount++;
       }
-      Thread.sleep(millis);
+      Thread.sleep(sleepMillis);
     }
-    Assert.assertTrue(count > 0 && count < numOfTimesToRun);
+    Assert.assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && 
shouldFail));
   }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
index f1a6dfe33f..7dfbdd75b9 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
@@ -172,6 +172,7 @@ public class BaseSingleStageBrokerRequestHandlerTest {
     when(routingManager.getRoutingTable(any(), 
Mockito.anyLong())).thenReturn(rt);
     QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
     when(queryQuotaManager.acquire(anyString())).thenReturn(true);
+    when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true);
     CountDownLatch latch = new CountDownLatch(1);
     long[] testRequestId = {-1};
     BrokerMetrics.register(mock(BrokerMetrics.class));
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/messages/DatabaseConfigRefreshMessage.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/DatabaseConfigRefreshMessage.java
new file mode 100644
index 0000000000..5511f26b27
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/DatabaseConfigRefreshMessage.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
+
+/**
+ * This (Helix) message is sent from the controller to brokers when a request 
is received to update the database config.
+ */
+public class DatabaseConfigRefreshMessage extends Message {
+  public static final String REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE = 
"REFRESH_DATABASE_CONFIG";
+
+  private static final String DATABASE_NAME_KEY = "databaseName";
+
+  /**
+   * Constructor for the sender.
+   */
+  public DatabaseConfigRefreshMessage(String databaseName) {
+    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setMsgSubType(REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE);
+    // Give it infinite time to process the message, as long as session is 
alive
+    setExecutionTimeout(-1);
+    // Set the Pinot specific fields
+    ZNRecord znRecord = getRecord();
+    znRecord.setSimpleField(DATABASE_NAME_KEY, databaseName);
+  }
+
+  /**
+   * Constructor for the receiver.
+   */
+  public DatabaseConfigRefreshMessage(Message message) {
+    super(message.getRecord());
+    if (!message.getMsgSubType().equals(REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE)) 
{
+      throw new IllegalArgumentException("Invalid message subtype:" + 
message.getMsgSubType());
+    }
+  }
+
+  public String getDatabaseName() {
+    return getRecord().getSimpleField(DATABASE_NAME_KEY);
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 0fdf94388a..faf9b6799c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.common.metadata;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -39,11 +41,14 @@ import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.spi.config.ConfigUtils;
+import org.apache.pinot.spi.config.DatabaseConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.user.UserConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.StringUtil;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.zookeeper.data.Stat;
@@ -62,6 +67,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
   private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
   private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX = 
"/INSTANCE_PARTITIONS";
+  private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX = 
"/CONFIGS/DATABASE";
   private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = 
"/CONFIGS/TABLE";
   private static final String PROPERTYSTORE_USER_CONFIGS_PREFIX = 
"/CONFIGS/USER";
   private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = 
"/CONFIGS/INSTANCE";
@@ -73,6 +79,75 @@ public class ZKMetadataProvider {
     propertyStore.set(constructPropertyStorePathForUserConfig(username), 
znRecord, AccessOption.PERSISTENT);
   }
 
+  /**
+   * Create database config, fail if exists.
+   *
+   * @return true if creation is successful.
+   */
+  public static boolean createDatabaseConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore,
+      DatabaseConfig databaseConfig) {
+    String databaseName = databaseConfig.getDatabaseName();
+    String databaseConfigPath = 
constructPropertyStorePathForDatabaseConfig(databaseName);
+    ZNRecord databaseConfigZNRecord = toZNRecord(databaseConfig);
+    return propertyStore.create(databaseConfigPath, databaseConfigZNRecord, 
AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Update database config.
+   *
+   * @return true if update is successful.
+   */
+  public static boolean setDatabaseConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, DatabaseConfig databaseConfig) {
+    String databaseName = databaseConfig.getDatabaseName();
+    ZNRecord databaseConfigZNRecord = toZNRecord(databaseConfig);
+    return 
propertyStore.set(constructPropertyStorePathForDatabaseConfig(databaseName), 
databaseConfigZNRecord,
+        -1, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Remove database config.
+   */
+  @VisibleForTesting
+  public static void removeDatabaseConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String databaseName) {
+    
propertyStore.remove(constructPropertyStorePathForDatabaseConfig(databaseName), 
AccessOption.PERSISTENT);
+  }
+
+  private static ZNRecord toZNRecord(DatabaseConfig databaseConfig) {
+    ZNRecord databaseConfigZNRecord = new 
ZNRecord(databaseConfig.getDatabaseName());
+    Map<String, String> simpleFields = new HashMap<>();
+    simpleFields.put(DatabaseConfig.DATABASE_NAME_KEY, 
databaseConfig.getDatabaseName());
+    QuotaConfig quotaConfig = databaseConfig.getQuotaConfig();
+    if (quotaConfig != null) {
+      simpleFields.put(DatabaseConfig.QUOTA_CONFIG_KEY, 
quotaConfig.toJsonString());
+    }
+    databaseConfigZNRecord.setSimpleFields(simpleFields);
+    return databaseConfigZNRecord;
+  }
+
+  @Nullable
+  private static DatabaseConfig toDatabaseConfig(@Nullable ZNRecord znRecord) {
+    if (znRecord == null) {
+      return null;
+    }
+    try {
+      Map<String, String> simpleFields = znRecord.getSimpleFields();
+
+      // Mandatory fields
+      String databaseName = simpleFields.get(DatabaseConfig.DATABASE_NAME_KEY);
+
+      // Optional fields
+      QuotaConfig quotaConfig = null;
+      String quotaConfigString = 
simpleFields.get(DatabaseConfig.QUOTA_CONFIG_KEY);
+      if (quotaConfigString != null) {
+        quotaConfig = JsonUtils.stringToObject(quotaConfigString, 
QuotaConfig.class);
+      }
+      return new DatabaseConfig(databaseName, quotaConfig);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while creating database config from 
ZNRecord: {}", znRecord.getId(), e);
+      return null;
+    }
+  }
+
   @Deprecated
   public static void setTableConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String tableNameWithType,
       ZNRecord znRecord) {
@@ -177,6 +252,10 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType);
   }
 
+  public static String constructPropertyStorePathForDatabaseConfig(String 
resourceName) {
+    return StringUtil.join("/", PROPERTYSTORE_DATABASE_CONFIGS_PREFIX, 
resourceName);
+  }
+
   public static String constructPropertyStorePathForResourceConfig(String 
resourceName) {
     return StringUtil.join("/", PROPERTYSTORE_TABLE_CONFIGS_PREFIX, 
resourceName);
   }
@@ -360,6 +439,12 @@ public class ZKMetadataProvider {
         .collect(Collectors.toMap(UserConfig::getUsernameWithComponent, u -> 
u));
   }
 
+  @Nullable
+  public static DatabaseConfig 
getDatabaseConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String 
databaseName) {
+    return 
toDatabaseConfig(propertyStore.get(constructPropertyStorePathForDatabaseConfig(databaseName),
 null,
+        AccessOption.PERSISTENT));
+  }
+
   @Nullable
   public static TableConfig getTableConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String tableNameWithType) {
     return 
toTableConfig(propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType),
 null,
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
index 809592bbc7..3691e0063c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
@@ -155,4 +155,18 @@ public class DatabaseUtils {
     String database = databaseFromHeaders != null ? databaseFromHeaders : 
databaseFromOptions;
     return Objects.requireNonNullElse(database, 
CommonConstants.DEFAULT_DATABASE);
   }
+
+  /**
+   * Extract the database name from the prefix of fully qualified table name.
+   * If no prefix is present "default" database is returned
+   */
+  public static String extractDatabaseFromFullyQualifiedTableName(String 
fullyQualifiedTableName) {
+    String[] split = StringUtils.split(fullyQualifiedTableName, '.');
+    return split.length == 1 ? CommonConstants.DEFAULT_DATABASE : split[0];
+  }
+
+  public static String extractDatabaseFromHttpHeaders(HttpHeaders headers) {
+    String databaseName = headers.getHeaderString(CommonConstants.DATABASE);
+    return databaseName == null ? CommonConstants.DEFAULT_DATABASE : 
databaseName;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
index 9fd4584207..eecf2d0778 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
@@ -27,21 +27,34 @@ import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import javax.inject.Inject;
+import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.config.DatabaseConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,9 +63,14 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
 
 
 @Api(tags = Constants.DATABASE_TAG, authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
-@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
-    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
-    description = "The format of the key is  ```\"Basic <token>\" or \"Bearer 
<token>\"```")))
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = {
+    @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+        key = SWAGGER_AUTHORIZATION_KEY,
+        description = "The format of the key is  ```\"Basic <token>\" or 
\"Bearer <token>\"```"),
+    @ApiKeyAuthDefinition(name = CommonConstants.DATABASE, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+        key = CommonConstants.DATABASE,
+        description = "Database context passed through http header. If no 
context is provided 'default' database "
+            + "context will be considered.")}))
 @Path("/")
 public class PinotDatabaseRestletResource {
   public static final Logger LOGGER = 
LoggerFactory.getLogger(PinotDatabaseRestletResource.class);
@@ -108,6 +126,67 @@ public class PinotDatabaseRestletResource {
     }
     return new DeleteDatabaseResponse(deletedTables, failedTables, dryRun);
   }
+
+  /**
+   * API to update the quota configs for database
+   * If database config is not present it will be created implicitly
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Path("/databases/{databaseName}/quotas")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.UPDATE_DATABASE_QUOTA)
+  @ApiOperation(value = "Update database quotas", notes = "Update database 
quotas")
+  public SuccessResponse addTable(
+      @PathParam("databaseName") String databaseName, 
@QueryParam("maxQueriesPerSecond") String queryQuota,
+      @Context HttpHeaders httpHeaders) {
+    if 
(!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders)))
 {
+      throw new ControllerApplicationException(LOGGER, "Database config name 
and request context does not match",
+          Response.Status.BAD_REQUEST);
+    }
+    try {
+      DatabaseConfig databaseConfig = 
_pinotHelixResourceManager.getDatabaseConfig(databaseName);
+      QuotaConfig quotaConfig = new QuotaConfig(null, queryQuota);
+      if (databaseConfig == null) {
+         databaseConfig = new DatabaseConfig(databaseName, quotaConfig);
+        _pinotHelixResourceManager.addDatabaseConfig(databaseConfig);
+      } else {
+        databaseConfig.setQuotaConfig(quotaConfig);
+        _pinotHelixResourceManager.updateDatabaseConfig(databaseConfig);
+      }
+      return new SuccessResponse("Database quotas for database config " + 
databaseName + " successfully updated");
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * API to get database quota configs.
+   * Will return null if database config is not defined or database quotas are 
not defined
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/databases/{databaseName}/quotas")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_DATABASE_QUOTA)
+  @ApiOperation(value = "Get database quota configs", notes = "Get database 
quota configs")
+  public QuotaConfig getDatabaseQuota(
+      @PathParam("databaseName") String databaseName, @Context HttpHeaders 
httpHeaders) {
+    if 
(!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders)))
 {
+      throw new ControllerApplicationException(LOGGER, "Database config name 
and request context does not match",
+          Response.Status.BAD_REQUEST);
+    }
+    DatabaseConfig databaseConfig = 
_pinotHelixResourceManager.getDatabaseConfig(databaseName);
+    if (databaseConfig != null) {
+      return databaseConfig.getQuotaConfig();
+    }
+    HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
+    HelixConfigScope configScope = new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+        .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
+    String defaultQueryQuota = helixAdmin.getConfig(configScope,
+            
Collections.singletonList(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND))
+            
.getOrDefault(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, null);
+    return new QuotaConfig(null, defaultQueryQuota);
+  }
 }
 
 class DeleteDatabaseResponse {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 2b835faaae..546f4c0105 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -102,6 +102,7 @@ import org.apache.pinot.common.lineage.LineageEntryState;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.lineage.SegmentLineageUtils;
+import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
 import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
 import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
@@ -154,6 +155,7 @@ import 
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObs
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.instance.Instance;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableStats;
@@ -1639,6 +1641,29 @@ public class PinotHelixResourceManager {
     LOGGER.info("Successfully add user:{}", usernamePrefix);
   }
 
+  /**
+   * Creates database config and sends out a database config refresh message.
+   * @param databaseConfig database config to be created
+   */
+  public void addDatabaseConfig(DatabaseConfig databaseConfig) {
+    if (!ZKMetadataProvider.createDatabaseConfig(_propertyStore, 
databaseConfig)) {
+      throw new RuntimeException("Failed to create database config for 
database: " + databaseConfig.getDatabaseName());
+    }
+    sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName());
+  }
+
+  /**
+   * Updates database config and sends out a database config refresh message.
+   * @param databaseConfig database config to be created
+   */
+  public void updateDatabaseConfig(DatabaseConfig databaseConfig) {
+    if (!ZKMetadataProvider.setDatabaseConfig(_propertyStore, databaseConfig)) 
{
+      throw new RuntimeException(
+          "Failed to update database config in Zookeeper for database: " + 
databaseConfig.getDatabaseName());
+    }
+    sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName());
+  }
+
   /**
    * Performs validations of table config and adds the table to zookeeper
    * @throws InvalidTableConfigException if validations fail
@@ -2755,6 +2780,26 @@ public class PinotHelixResourceManager {
     }
   }
 
+  private void sendDatabaseConfigRefreshMessage(String databaseName) {
+    DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new 
DatabaseConfigRefreshMessage(databaseName);
+
+    // Send database config refresh message to brokers
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
+    recipientCriteria.setSessionSpecific(true);
+    // Send message with no callback and infinite timeout on the recipient
+    int numMessagesSent =
+        _helixZkManager.getMessagingService().send(recipientCriteria, 
databaseConfigRefreshMessage, null, -1);
+    if (numMessagesSent > 0) {
+      LOGGER.info("Sent {} database config refresh messages to brokers for 
database: {}", numMessagesSent,
+          databaseName);
+    } else {
+      LOGGER.warn("No database config refresh message sent to brokers for 
database: {}", databaseName);
+    }
+  }
+
   private void sendRoutingTableRebuildMessage(String tableNameWithType) {
     RoutingTableRebuildMessage routingTableRebuildMessage = new 
RoutingTableRebuildMessage(tableNameWithType);
 
@@ -2982,6 +3027,17 @@ public class PinotHelixResourceManager {
     return _helixAdmin.getResourceExternalView(_helixClusterName, 
tableNameWithType);
   }
 
+  /**
+   * Get the database config for the given database name.
+   *
+   * @param databaseName database name
+   * @return Database config
+   */
+  @Nullable
+  public DatabaseConfig getDatabaseConfig(String databaseName) {
+    return ZKMetadataProvider.getDatabaseConfig(_propertyStore, databaseName);
+  }
+
   /**
    * Get the table config for the given table name with type suffix.
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 51cab16711..62f1b2e4d6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -70,6 +70,7 @@ public class Actions {
     public static final String GET_USER = "GetUser";
     public static final String GET_VERSION = "GetVersion";
     public static final String GET_ZNODE = "GetZnode";
+    public static final String GET_DATABASE_QUOTA = "GetDatabaseQuota";
     public static final String INGEST_FILE = "IngestFile";
     public static final String RECOMMEND_CONFIG = "RecommendConfig";
     public static final String RESET_SEGMENT = "ResetSegment";
@@ -86,6 +87,7 @@ public class Actions {
     public static final String REBALANCE_TENANT_TABLES = 
"RebalanceTenantTables";
     public static final String UPDATE_TIME_INTERVAL = "UpdateTimeInterval";
     public static final String UPDATE_USER = "UpdateUser";
+    public static final String UPDATE_DATABASE_QUOTA = "UpdateDatabaseQuota";
     public static final String UPDATE_ZNODE = "UpdateZnode";
     public static final String UPLOAD_SEGMENT = "UploadSegment";
     public static final String GET_INSTANCE_PARTITIONS = 
"GetInstancePartitions";
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
new file mode 100644
index 0000000000..d1fb956f2c
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.net.URI;
+import java.util.Properties;
+import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
+import 
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest;
+import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
+import org.apache.pinot.client.PinotClientException;
+import org.apache.pinot.client.ResultSetGroup;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.client.Connection.FAIL_ON_EXCEPTIONS;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * This test suite is focused only on validating that the config changes are 
propagated properly as expected.
+ * Validations around different cases arising from cluster config, database 
config and table config are extensively
+ * tested as part of {@link HelixExternalViewBasedQueryQuotaManagerTest}
+ */
+public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest {
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBrokers(1);
+    startServers(1);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    Properties properties = new Properties();
+    properties.put(FAIL_ON_EXCEPTIONS, "FALSE");
+    _pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl() 
+ "/" + getHelixClusterName(),
+        new 
JsonAsyncHttpPinotClientTransportFactory().withConnectionProperties(getPinotConnectionProperties())
+            .buildTransport());
+  }
+
+  @AfterMethod
+  void resetQuotas()
+      throws Exception {
+    addQueryQuotaToClusterConfig(null);
+    addQueryQuotaToDatabaseConfig(null);
+    addQueryQuotaToTableConfig(null);
+  }
+
+  @Test
+  public void testDefaultDatabaseQueryQuota()
+      throws Exception {
+    addQueryQuotaToClusterConfig(40);
+    testQueryRate(40);
+  }
+
+  @Test
+  public void testDatabaseConfigQueryQuota()
+      throws Exception {
+    addQueryQuotaToDatabaseConfig(10);
+    testQueryRate(10);
+  }
+
+  @Test
+  public void testDefaultDatabaseQueryQuotaOverride()
+      throws Exception {
+    addQueryQuotaToClusterConfig(25);
+    // override lower than default quota
+    addQueryQuotaToDatabaseConfig(10);
+    testQueryRate(10);
+    // override higher than default quota
+    addQueryQuotaToDatabaseConfig(40);
+    testQueryRate(40);
+  }
+
+  @Test
+  public void testDatabaseQueryQuotaWithTableQueryQuota()
+      throws Exception {
+    addQueryQuotaToDatabaseConfig(25);
+    // table quota within database quota. Queries should fail upon table quota 
(10 qps) breach
+    addQueryQuotaToTableConfig(10);
+    testQueryRate(10);
+    // table quota more than database quota. Queries should fail upon database 
quota (25 qps) breach
+    addQueryQuotaToTableConfig(50);
+    testQueryRate(25);
+  }
+
+  @Test
+  public void testDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker()
+      throws Exception {
+    BaseBrokerStarter brokerStarter = null;
+    try {
+      addQueryQuotaToDatabaseConfig(25);
+      addQueryQuotaToTableConfig(10);
+      // Add one more broker such that quota gets distributed equally among 
them
+      brokerStarter = startOneBroker(2);
+      // to allow change propagation to QueryQuotaManager
+      Thread.sleep(1000);
+      testQueryRate(10);
+      // drop table level quota so that database quota comes into effect
+      addQueryQuotaToTableConfig(null);
+      testQueryRate(25);
+    } finally {
+      if (brokerStarter != null) {
+        brokerStarter.stop();
+      }
+    }
+  }
+
+  /**
+   * Runs the query load with the max rate that the quota can allow and 
ensures queries are not failing.
+   * Then runs the query load with double the max rate and expects queries to 
fail due to quota breach.
+   * @param maxRate max rate allowed by the quota
+   */
+  void testQueryRate(int maxRate)
+      throws Exception {
+    runQueries(maxRate, false);
+    //increase the qps and some of the queries should be throttled.
+    runQueries(maxRate * 2, true);
+  }
+
+  // try to keep the qps below 50 to ensure that the time lost between 2 query 
runs on top of the sleepMillis
+  // is not comparable to sleepMillis, else the actual qps would end up being 
much lower than required qps
+  private void runQueries(double qps, boolean shouldFail)
+      throws Exception {
+    int failCount = 0;
+    long sleepMillis = (long) (1000 / qps);
+    for (int i = 0; i < qps * 2; i++) {
+      ResultSetGroup resultSetGroup = _pinotConnection.execute("SELECT 
COUNT(*) FROM " + getTableName());
+      for (PinotClientException exception : resultSetGroup.getExceptions()) {
+        if (exception.getMessage().contains("QuotaExceededError")) {
+          failCount++;
+          break;
+        }
+      }
+      Thread.sleep(sleepMillis);
+    }
+    assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 && 
shouldFail));
+  }
+
+
+  public void addQueryQuotaToTableConfig(Integer maxQps)
+      throws Exception {
+    TableConfig tableConfig = getOfflineTableConfig();
+    tableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ? null : 
maxQps.toString()));
+    updateTableConfig(tableConfig);
+    // to allow change propagation to QueryQuotaManager
+    Thread.sleep(1000);
+  }
+
+  public void addQueryQuotaToDatabaseConfig(Integer maxQps)
+      throws Exception {
+    String url = _controllerRequestURLBuilder.getBaseUrl() + 
"/databases/default/quotas";
+    if (maxQps != null) {
+        url += "?maxQueriesPerSecond=" + maxQps;
+    }
+    HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new 
URI(url), null, null));
+    // to allow change propagation to QueryQuotaManager
+    Thread.sleep(1000);
+  }
+
+  public void addQueryQuotaToClusterConfig(Integer maxQps)
+      throws Exception {
+    if (maxQps == null) {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(new 
URI(
+        _controllerRequestURLBuilder.forClusterConfigs() + "/"
+            + CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND)));
+    } else {
+      String payload = "{\"" + 
CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND + "\":\"" + maxQps + 
"\"}";
+      HttpClient.wrapAndThrowHttpException(
+          _httpClient.sendJsonPostRequest(new 
URI(_controllerRequestURLBuilder.forClusterConfigs()), payload));
+    }
+    // to allow change propagation to QueryQuotaManager
+    Thread.sleep(1000);
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/DatabaseConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/DatabaseConfig.java
new file mode 100644
index 0000000000..c615fa5488
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/DatabaseConfig.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+
+
+public class DatabaseConfig extends BaseJsonConfig {
+  public static final String QUOTA_CONFIG_KEY = "quota";
+  public static final String DATABASE_NAME_KEY = "databaseName";
+  private String _databaseName;
+  @JsonPropertyDescription("Resource quota associated with this database")
+  private QuotaConfig _quotaConfig;
+
+  public DatabaseConfig(String databaseName, QuotaConfig quotaConfig) {
+    _databaseName = databaseName;
+    _quotaConfig = quotaConfig;
+  }
+
+  public String getDatabaseName() {
+    return _databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    _databaseName = databaseName;
+  }
+
+  @JsonProperty(QUOTA_CONFIG_KEY)
+  @Nullable
+  public QuotaConfig getQuotaConfig() {
+    return _quotaConfig;
+  }
+
+  public void setQuotaConfig(QuotaConfig quotaConfig) {
+    _quotaConfig = quotaConfig;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index a069031b69..6769ae1894 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -78,6 +78,7 @@ public class CommonConstants {
     public static final String IS_SHUTDOWN_IN_PROGRESS = "shutdownInProgress";
     public static final String QUERIES_DISABLED = "queriesDisabled";
     public static final String QUERY_RATE_LIMIT_DISABLED = 
"queryRateLimitDisabled";
+    public static final String DATABASE_MAX_QUERIES_PER_SECOND = 
"databaseMaxQueriesPerSecond";
 
     public static final String INSTANCE_CONNECTED_METRIC_NAME = 
"helix.connected";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to