klsince commented on code in PR #13544:
URL: https://github.com/apache/pinot/pull/13544#discussion_r1697688129


##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +251,82 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  public void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) {
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    // Tables in database can span across broker tags as we don't maintain a 
broker tag to database mapping as of now.
+    // Hence, we consider all online brokers for the rate distribution.
+    int onlineBrokers = 
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
+    for (String databaseName : databaseNames) {
+      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetRateLimiterInDatabaseQueryQuotaEntity(databaseName);
+        continue;
+      }
+      double perBrokerQpsQuota = databaseQpsQuota / onlineBrokers;
+      QueryQuotaEntity queryQuotaEntity = new 
QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
+          new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new 
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
+          onlineBrokers, databaseQpsQuota, -1);
+      _databaseRateLimiterMap.put(databaseName, queryQuotaEntity);
+    }
+  }
+
+  /**
+   * Utility to get the effective query quota being imposed on a database.
+   * It is computed based on the default quota set at cluster config and 
override set at database config
+   * @param databaseName database name to get the query quota on.
+   * @return effective query quota limit being applied
+   */
+  private double getEffectiveQueryQuotaOnDatabase(String databaseName) {
+    DatabaseConfig databaseConfig =
+        
ZKMetadataProvider.getDatabaseConfig(_helixManager.getHelixPropertyStore(), 
databaseName);
+    if (databaseConfig != null && databaseConfig.getQuotaConfig() != null
+        && databaseConfig.getQuotaConfig().getMaxQPS() != -1) {
+      return databaseConfig.getQuotaConfig().getMaxQPS();
+    }
+    return _defaultQpsQuotaForDatabase;
+  }
+
+  /**
+   * Creates a new database rate limiter. Will not update the database rate 
limiter if it already exists.
+   * @param databaseName database name for which rate limiter needs to be 
created
+   */
+  public void createDatabaseRateLimiter(String databaseName) {
+    if (_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  /**
+   * Build an empty rate limiter in the new query quota entity, or set the 
rate limiter to null in an existing query
+   * quota entity.
+   */
+  private void buildEmptyOrResetRateLimiterInDatabaseQueryQuotaEntity(String 
databaseName) {

Review Comment:
   nit: the method name seems too long, just createOrResetRateLimiter()?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -46,21 +55,31 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.DATABASE_QUERY_RATE_LIMIT;
+
 
 /**
  * This class is to support the qps quota feature.
- * It depends on the broker source change to update the dynamic rate limit,
- *  which means it only gets updated when a new table added or a broker 
restarted.
+ * It allows performing qps quota check at table level and database level
+ * For table level check it depends on the broker source change to update the 
dynamic rate limit,
+ *  which means it gets updated when a new table added or a broker restarted.
+ * For database level check it depends on the broker as well as cluster config 
change to update the dynamic rate limit,
+ *  which means it gets updated when the default query quota at cluster config 
is updated or a broker restarted.

Review Comment:
   IIUC, the db quota is also updated when a new table is added? as the 
updating method is called inside 
   ```
   @Transition(from = "OFFLINE", to = "ONLINE")
       public void onBecomeOnlineFromOffline(Message message, 
NotificationContext context) {
         String tableNameWithType = message.getPartitionName();
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -279,6 +376,17 @@ private void 
addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(String table
     }
   }
 
+  @Override
+  public boolean acquireDatabase(String databaseName) {
+    // Return true if query quota is disabled in the current broker.
+    if (isQueryRateLimitDisabled()) {
+      return true;
+    }
+    LOGGER.info("Trying to acquire token for database: {}", databaseName);

Review Comment:
   this can be very verbose, as it's triggered for each query?



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -73,6 +79,87 @@ public static void 
setUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, S
     propertyStore.set(constructPropertyStorePathForUserConfig(username), 
znRecord, AccessOption.PERSISTENT);
   }
 
+  /**
+   * Create database config, fail if exists.
+   *
+   * @return true if creation is successful.
+   */
+  public static boolean createDatabaseConfig(
+      ZkHelixPropertyStore<ZNRecord> propertyStore, DatabaseConfig 
databaseConfig) {
+    String databaseName = databaseConfig.getDatabaseName();
+    String databaseConfigPath = 
constructPropertyStorePathForDatabaseConfig(databaseName);
+    ZNRecord databaseConfigZNRecord;
+    try {
+      databaseConfigZNRecord = toZNRecord(databaseConfig);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception constructing ZNRecord from database 
config for database: {}", databaseName, e);
+      return false;
+    }
+    return propertyStore.create(databaseConfigPath, databaseConfigZNRecord, 
AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Update database config.
+   *
+   * @return true if update is successful.
+   */
+  public static boolean setDatabaseConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, DatabaseConfig databaseConfig) {
+    String databaseName = databaseConfig.getDatabaseName();
+    ZNRecord databaseConfigZNRecord;
+    try {
+      databaseConfigZNRecord = toZNRecord(databaseConfig);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception constructing ZNRecord from database 
config for database : {}", databaseName, e);
+      return false;
+    }
+    return 
propertyStore.set(constructPropertyStorePathForDatabaseConfig(databaseName), 
databaseConfigZNRecord,
+        -1, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Remove database config.
+   */
+  @VisibleForTesting
+  public static void removeDatabaseConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String databaseName) {
+    
propertyStore.remove(constructPropertyStorePathForDatabaseConfig(databaseName), 
AccessOption.PERSISTENT);
+  }
+
+  private static ZNRecord toZNRecord(DatabaseConfig databaseConfig) {

Review Comment:
   does this method toZNRecord throw Exception? as it's just put() calls 
inside. If not, then can remove the try-catch blocks in the methods above



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -73,6 +79,87 @@ public static void 
setUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, S
     propertyStore.set(constructPropertyStorePathForUserConfig(username), 
znRecord, AccessOption.PERSISTENT);
   }
 
+  /**
+   * Create database config, fail if exists.
+   *
+   * @return true if creation is successful.
+   */
+  public static boolean createDatabaseConfig(
+      ZkHelixPropertyStore<ZNRecord> propertyStore, DatabaseConfig 
databaseConfig) {

Review Comment:
   format?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +251,82 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  public void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) {
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    // Tables in database can span across broker tags as we don't maintain a 
broker tag to database mapping as of now.
+    // Hence, we consider all online brokers for the rate distribution.
+    int onlineBrokers = 
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
+    for (String databaseName : databaseNames) {
+      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetRateLimiterInDatabaseQueryQuotaEntity(databaseName);
+        continue;
+      }
+      double perBrokerQpsQuota = databaseQpsQuota / onlineBrokers;
+      QueryQuotaEntity queryQuotaEntity = new 
QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
+          new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new 
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
+          onlineBrokers, databaseQpsQuota, -1);
+      _databaseRateLimiterMap.put(databaseName, queryQuotaEntity);

Review Comment:
   log some info when updating database rate limiter, and better if can log the 
reason of update



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java:
##########
@@ -81,6 +82,8 @@ public void onBecomeOnlineFromOffline(Message message, 
NotificationContext conte
         TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
         _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
             
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
+        _queryQuotaManager.createDatabaseRateLimiter(

Review Comment:
   why not use similar method name initOrUpdateDatabaseQueryQuota?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -458,6 +458,14 @@ protected BrokerResponse handleRequest(long requestId, 
String query, @Nullable S
       }
 
       // Validate QPS quota
+      String database = 
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
+      if (!_queryQuotaManager.acquireDatabase(database)) {
+        String errorMessage =
+            String.format("Request %d: %s exceeds query quota for database: 
%s", requestId, query, database);
+        LOGGER.info(errorMessage);
+        
requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+        return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));

Review Comment:
   add a new error type/code DATABASE_QUOTA_EXCEEDED_ERROR?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +248,72 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  private void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) {
+    double databaseQpsQuota = _defaultQpsQuotaForDatabase;
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    // Tables in database can span across broker tags as we don't maintain a 
broker tag to database mapping as of now.
+    // Hence, we consider all online brokers for the rate distribution.
+    int onlineBrokers = 
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
+    for (String databaseName : databaseNames) {
+      DatabaseConfig databaseConfig =
+          
ZKMetadataProvider.getDatabaseConfig(_helixManager.getHelixPropertyStore(), 
databaseName);
+      if (databaseConfig != null && databaseConfig.getQuotaConfig() != null
+          && databaseConfig.getQuotaConfig().getMaxQPS() != -1) {
+        databaseQpsQuota = databaseConfig.getQuotaConfig().getMaxQPS();
+      }
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetRateLimiterInDatabaseQueryQuotaEntity(databaseName);
+        continue;
+      }
+      double perBrokerQpsQuota = databaseQpsQuota / onlineBrokers;

Review Comment:
   would suggest to extract a method to get the brokers for a database, so that 
we can extend it later to divide db quota among brokers actually used by the 
tables from the DB
   
   



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -353,6 +461,11 @@ public int getRateLimiterMapSize() {
     return _rateLimiterMap.size();
   }
 
+  @VisibleForTesting

Review Comment:
   nit: for the two methods marked VisibleForTesting, perhaps just add a method 
(VisibleForTesting) to return _databaseRateLimiterMap



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java:
##########
@@ -27,6 +27,8 @@
 import org.apache.pinot.spi.exception.DatabaseConflictException;
 import org.apache.pinot.spi.utils.CommonConstants;
 
+import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;

Review Comment:
   I think `import static` is less recommended since sometime ago, but they are 
not cleaned up from the repo yet. So better avoid it when possible.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java:
##########
@@ -155,4 +157,18 @@ public static String extractDatabaseFromQueryRequest(
     String database = databaseFromHeaders != null ? databaseFromHeaders : 
databaseFromOptions;
     return Objects.requireNonNullElse(database, 
CommonConstants.DEFAULT_DATABASE);
   }
+
+  /**
+   * Extract the database name from the prefix of fully qualified table name.
+   * If no prefix is present "default" database is returned
+   */
+  public static String extractDatabaseFromFullyQualifiedTableName(String 
fullyQualifiedTableName) {

Review Comment:
   looks like `fullyQualifiedTableName` is not necessarily a fully qualified 
table name, as there is check on split.length
   
   if so, perhaps rename the method as extractDatabaseFromTableName()



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java:
##########
@@ -129,6 +135,32 @@ public void onError(Exception e, ErrorCode code, ErrorType 
type) {
     }
   }
 
+  private class RefreshDatabaseConfigMessageHandler extends MessageHandler {
+    final String _databaseName;
+
+    RefreshDatabaseConfigMessageHandler(DatabaseConfigRefreshMessage 
databaseConfigRefreshMessage,
+        NotificationContext context) {
+      super(databaseConfigRefreshMessage, context);
+      _databaseName = databaseConfigRefreshMessage.getDatabaseName();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      // only update the existing rate limiter.
+      // Database rate limiter creation should only be done through table 
based change triggers
+      _queryQuotaManager.updateDatabaseRateLimiter(_databaseName);

Review Comment:
   I'd assume custom HelixMsg would be processed by a separate thread pool, 
while the ClusterChangeMediator may have its own internal thread pool to invoke 
listeners upon Helix changes. can confirm by printing the thread names



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to