yashmayya commented on code in PR #13544:
URL: https://github.com/apache/pinot/pull/13544#discussion_r1677642007


##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -214,6 +216,15 @@ public void 
addInstanceConfigChangeHandler(ClusterChangeHandler instanceConfigCh
     _instanceConfigChangeHandlers.add(instanceConfigChangeHandler);
   }
 
+  /**
+   * Adds a cluster config change handler to handle Helix cluster config 
change callbacks.
+   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
+   * handlers from running. For slow change handler, make it asynchronous.
+   */
+  public void addClusterConfigChangeHandler(ClusterChangeHandler 
clusterConfigChangeHandler) {

Review Comment:
   Where is this method called?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java:
##########
@@ -129,6 +135,32 @@ public void onError(Exception e, ErrorCode code, ErrorType 
type) {
     }
   }
 
+  private class RefreshDatabaseConfigMessageHandler extends MessageHandler {
+    final String _databaseName;
+
+    RefreshDatabaseConfigMessageHandler(DatabaseConfigRefreshMessage 
databaseConfigRefreshMessage,
+        NotificationContext context) {
+      super(databaseConfigRefreshMessage, context);
+      _databaseName = databaseConfigRefreshMessage.getDatabaseName();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      // only update the existing rate limiter.
+      // Database rate limiter creation should only be done through table 
based change triggers
+      _queryQuotaManager.updateDatabaseRateLimiter(_databaseName);

Review Comment:
   Can this cause race conditions between table creation and database quota 
config updates resulting in a stale quota config being used?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -331,6 +331,12 @@ private TableAuthorizationResult 
hasTableAccess(RequesterIdentity requesterIdent
    * Returns true if the QPS quota of the tables has exceeded.
    */
   private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext 
requestContext) {
+    String database = 
DatabaseUtils.extractDatabaseFromTableName(tableNames.iterator().next());

Review Comment:
   The `database` has already been extracted from the request when this method 
is called - probably better to pass that in rather than re-extracting it from 
the table name?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -448,6 +448,14 @@ protected BrokerResponse handleRequest(long requestId, 
String query, @Nullable S
       }
 
       // Validate QPS quota
+      String database = DatabaseUtils.extractDatabaseFromTableName(tableName);
+      if (!_queryQuotaManager.acquireDatabase(database)) {
+        String errorMessage =
+            String.format("Request %d: %s exceeds query quota for database: 
%s", requestId, query, database);
+        LOGGER.info(errorMessage);

Review Comment:
   nit: can use parameterized logging instead



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +248,72 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  private void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) {
+    double databaseQpsQuota = _defaultQpsQuotaForDatabase;
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    // Tables in database can span across broker tags as we don't maintain a 
broker tag to database mapping as of now.
+    // Hence, we consider all online brokers for the rate distribution.
+    int onlineBrokers = 
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
+    for (String databaseName : databaseNames) {
+      DatabaseConfig databaseConfig =
+          
ZKMetadataProvider.getDatabaseConfig(_helixManager.getHelixPropertyStore(), 
databaseName);
+      if (databaseConfig != null && databaseConfig.getQuotaConfig() != null
+          && databaseConfig.getQuotaConfig().getMaxQPS() != -1) {
+        databaseQpsQuota = databaseConfig.getQuotaConfig().getMaxQPS();
+      }
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetRateLimiterInDatabaseQueryQuotaEntity(databaseName);
+        continue;
+      }
+      double perBrokerQpsQuota = databaseQpsQuota / onlineBrokers;

Review Comment:
   I think this could lead to extremely skewed scenarios right? Consider a 
large cluster with let's say 10 brokers and there's a database that has a 
single table with a broker tenant tag that only applies to a single broker. In 
this case, the single broker will get the entire table's query quota but only 
10% of the database query quota which might very well be less than the 
individual table level query quota?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java:
##########
@@ -155,4 +157,14 @@ public static String extractDatabaseFromQueryRequest(
     String database = databaseFromHeaders != null ? databaseFromHeaders : 
databaseFromOptions;
     return Objects.requireNonNullElse(database, 
CommonConstants.DEFAULT_DATABASE);
   }
+
+  public static String extractDatabaseFromTableName(String tableName) {

Review Comment:
   ```suggestion
     public static String extractDatabaseFromFullyQualifiedTableName(String 
tableName) {
   ```
   nit: to make it more clear. Also could you please add a small Javadoc here?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -340,7 +435,7 @@ private boolean tryAcquireToken(String tableNameWithType, 
QueryQuotaEntity query
     // Emit the qps capacity utilization rate.
     int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
     if (!rateLimiter.tryAcquire()) {
-      LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. 
Current qps: {}", tableNameWithType,
+      LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: 
{}. Current qps: {}", resourceName,

Review Comment:
   I think it probably makes more sense to separate the log lines for table and 
database query quota limits to make it more clear to users whether the table or 
the database query quota has been exceeded?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -458,6 +566,27 @@ public void 
processQueryRateLimitingExternalViewChange(ExternalView currentBroke
             numRebuilt, _rateLimiterMap.size());
   }
 
+  /**
+   * Process query quota state change when cluster config gets changed
+   */
+  public void processQueryRateLimitingClusterConfigChange() {
+    double oldDatabaseQpsQuota = _defaultQpsQuotaForDatabase;
+    getDefaultQueryQuotaForDatabase();
+    if (oldDatabaseQpsQuota == _defaultQpsQuotaForDatabase) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(new 
ArrayList<>(_databaseRateLimiterMap.keySet()));
+  }
+
+  private void getDefaultQueryQuotaForDatabase() {

Review Comment:
   IMO this part is not as explicit / unambiguous to read as it could be - WDYT 
about making `getDefaultQueryQuotaForDatabase` return the default value and 
setting `_defaultQpsQuotaForDatabase` at the method's call-sites rather than 
within the method itself?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -340,7 +435,7 @@ private boolean tryAcquireToken(String tableNameWithType, 
QueryQuotaEntity query
     // Emit the qps capacity utilization rate.
     int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
     if (!rateLimiter.tryAcquire()) {
-      LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. 
Current qps: {}", tableNameWithType,
+      LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: 
{}. Current qps: {}", resourceName,

Review Comment:
   Ah never mind, just noticed that we do that in the broker request handler 
anyway, so this isn't too important here.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -46,6 +55,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.DATABASE_QUERY_RATE_LIMIT;
+
 
 /**
  * This class is to support the qps quota feature.

Review Comment:
   nit: this Javadoc should probably be updated - currently it states that `it 
only gets updated when a new table added or a broker restarted`.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java:
##########
@@ -107,6 +120,61 @@ public DeleteDatabaseResponse deleteTablesInDatabase(
     }
     return new DeleteDatabaseResponse(deletedTables, failedTables, dryRun);
   }
+
+  /**
+   * API to update the quota configs for database
+   * If database config is not present it will be created implicitly
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Path("/databases/{databaseName}/quotas")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.UPDATE_DATABASE_QUOTA)
+  @ApiOperation(value = "Update database quotas", notes = "Update database 
quotas")
+  public SuccessResponse addTable(
+      @PathParam("databaseName") String databaseName, 
@QueryParam("maxQueriesPerSecond") String queryQuota,
+      @Context HttpHeaders httpHeaders) {
+    if 
(!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders)))
 {
+      throw new ControllerApplicationException(LOGGER, "Database config name 
and request context does not match",
+          Response.Status.BAD_REQUEST);
+    }
+    try {
+      DatabaseConfig databaseConfig = 
_pinotHelixResourceManager.getDatabaseConfig(databaseName);
+      QuotaConfig quotaConfig = new QuotaConfig(null, queryQuota);
+      if (databaseConfig == null) {
+         databaseConfig = new DatabaseConfig(databaseName, quotaConfig);
+        _pinotHelixResourceManager.addDatabaseConfig(databaseConfig);
+      } else {
+        databaseConfig.setQuotaConfig(quotaConfig);
+        _pinotHelixResourceManager.updateDatabaseConfig(databaseConfig);
+      }
+      return new SuccessResponse("Database quotas for database config " + 
databaseName + " successfully updated");
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * API to get database quota configs.
+   * Will return null if database config is not defined or database quotas are 
not defined
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/databases/{databaseName}/quotas")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_DATABASE_QUOTA)
+  @ApiOperation(value = "Get database quota configs", notes = "Get database 
quota configs")
+  public QuotaConfig getDatabaseQuota(
+      @PathParam("databaseName") String databaseName, @Context HttpHeaders 
httpHeaders) {
+    if 
(!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders)))
 {
+      throw new ControllerApplicationException(LOGGER, "Database config name 
and request context does not match",
+          Response.Status.BAD_REQUEST);
+    }
+    DatabaseConfig databaseConfig = 
_pinotHelixResourceManager.getDatabaseConfig(databaseName);
+    if (databaseConfig != null) {
+      return databaseConfig.getQuotaConfig();
+    }
+    return new QuotaConfig(null, null);

Review Comment:
   Should we return the default database query quota here instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to