yashmayya commented on code in PR #13544: URL: https://github.com/apache/pinot/pull/13544#discussion_r1677642007
########## pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java: ########## @@ -214,6 +216,15 @@ public void addInstanceConfigChangeHandler(ClusterChangeHandler instanceConfigCh _instanceConfigChangeHandlers.add(instanceConfigChangeHandler); } + /** + * Adds a cluster config change handler to handle Helix cluster config change callbacks. + * <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change + * handlers from running. For slow change handler, make it asynchronous. + */ + public void addClusterConfigChangeHandler(ClusterChangeHandler clusterConfigChangeHandler) { Review Comment: Where is this method called? ########## pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java: ########## @@ -129,6 +135,32 @@ public void onError(Exception e, ErrorCode code, ErrorType type) { } } + private class RefreshDatabaseConfigMessageHandler extends MessageHandler { + final String _databaseName; + + RefreshDatabaseConfigMessageHandler(DatabaseConfigRefreshMessage databaseConfigRefreshMessage, + NotificationContext context) { + super(databaseConfigRefreshMessage, context); + _databaseName = databaseConfigRefreshMessage.getDatabaseName(); + } + + @Override + public HelixTaskResult handleMessage() { + // only update the existing rate limiter. + // Database rate limiter creation should only be done through table based change triggers + _queryQuotaManager.updateDatabaseRateLimiter(_databaseName); Review Comment: Can this cause race conditions between table creation and database quota config updates resulting in a stale quota config being used? ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java: ########## @@ -331,6 +331,12 @@ private TableAuthorizationResult hasTableAccess(RequesterIdentity requesterIdent * Returns true if the QPS quota of the tables has exceeded. */ private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext requestContext) { + String database = DatabaseUtils.extractDatabaseFromTableName(tableNames.iterator().next()); Review Comment: The `database` has already been extracted from the request when this method is called - probably better to pass that in rather than re-extracting it from the table name? ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java: ########## @@ -448,6 +448,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S } // Validate QPS quota + String database = DatabaseUtils.extractDatabaseFromTableName(tableName); + if (!_queryQuotaManager.acquireDatabase(database)) { + String errorMessage = + String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database); + LOGGER.info(errorMessage); Review Comment: nit: can use parameterized logging instead ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -230,6 +248,72 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } } + /** + * Updates the database rate limiter if it already exists. Will not create a new database rate limiter. + * @param databaseName database name for which rate limiter needs to be updated + */ + public void updateDatabaseRateLimiter(String databaseName) { + if (!_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + private void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { + double databaseQpsQuota = _defaultQpsQuotaForDatabase; + ExternalView brokerResource = HelixHelper + .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + // Tables in database can span across broker tags as we don't maintain a broker tag to database mapping as of now. + // Hence, we consider all online brokers for the rate distribution. + int onlineBrokers = HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size(); + for (String databaseName : databaseNames) { + DatabaseConfig databaseConfig = + ZKMetadataProvider.getDatabaseConfig(_helixManager.getHelixPropertyStore(), databaseName); + if (databaseConfig != null && databaseConfig.getQuotaConfig() != null + && databaseConfig.getQuotaConfig().getMaxQPS() != -1) { + databaseQpsQuota = databaseConfig.getQuotaConfig().getMaxQPS(); + } + if (databaseQpsQuota < 0) { + buildEmptyOrResetRateLimiterInDatabaseQueryQuotaEntity(databaseName); + continue; + } + double perBrokerQpsQuota = databaseQpsQuota / onlineBrokers; Review Comment: I think this could lead to extremely skewed scenarios right? Consider a large cluster with let's say 10 brokers and there's a database that has a single table with a broker tenant tag that only applies to a single broker. In this case, the single broker will get the entire table's query quota but only 10% of the database query quota which might very well be less than the individual table level query quota? ########## pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java: ########## @@ -155,4 +157,14 @@ public static String extractDatabaseFromQueryRequest( String database = databaseFromHeaders != null ? databaseFromHeaders : databaseFromOptions; return Objects.requireNonNullElse(database, CommonConstants.DEFAULT_DATABASE); } + + public static String extractDatabaseFromTableName(String tableName) { Review Comment: ```suggestion public static String extractDatabaseFromFullyQualifiedTableName(String tableName) { ``` nit: to make it more clear. Also could you please add a small Javadoc here? ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -340,7 +435,7 @@ private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity query // Emit the qps capacity utilization rate. int numHits = queryQuotaEntity.getQpsTracker().getHitCount(); if (!rateLimiter.tryAcquire()) { - LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. Current qps: {}", tableNameWithType, + LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: {}. Current qps: {}", resourceName, Review Comment: I think it probably makes more sense to separate the log lines for table and database query quota limits to make it more clear to users whether the table or the database query quota has been exceeded? ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -458,6 +566,27 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke numRebuilt, _rateLimiterMap.size()); } + /** + * Process query quota state change when cluster config gets changed + */ + public void processQueryRateLimitingClusterConfigChange() { + double oldDatabaseQpsQuota = _defaultQpsQuotaForDatabase; + getDefaultQueryQuotaForDatabase(); + if (oldDatabaseQpsQuota == _defaultQpsQuotaForDatabase) { + return; + } + createOrUpdateDatabaseRateLimiter(new ArrayList<>(_databaseRateLimiterMap.keySet())); + } + + private void getDefaultQueryQuotaForDatabase() { Review Comment: IMO this part is not as explicit / unambiguous to read as it could be - WDYT about making `getDefaultQueryQuotaForDatabase` return the default value and setting `_defaultQpsQuotaForDatabase` at the method's call-sites rather than within the method itself? ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -340,7 +435,7 @@ private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity query // Emit the qps capacity utilization rate. int numHits = queryQuotaEntity.getQpsTracker().getHitCount(); if (!rateLimiter.tryAcquire()) { - LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. Current qps: {}", tableNameWithType, + LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: {}. Current qps: {}", resourceName, Review Comment: Ah never mind, just noticed that we do that in the broker request handler anyway, so this isn't too important here. ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -46,6 +55,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.spi.utils.CommonConstants.Helix.DATABASE_QUERY_RATE_LIMIT; + /** * This class is to support the qps quota feature. Review Comment: nit: this Javadoc should probably be updated - currently it states that `it only gets updated when a new table added or a broker restarted`. ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java: ########## @@ -107,6 +120,61 @@ public DeleteDatabaseResponse deleteTablesInDatabase( } return new DeleteDatabaseResponse(deletedTables, failedTables, dryRun); } + + /** + * API to update the quota configs for database + * If database config is not present it will be created implicitly + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @Path("/databases/{databaseName}/quotas") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_DATABASE_QUOTA) + @ApiOperation(value = "Update database quotas", notes = "Update database quotas") + public SuccessResponse addTable( + @PathParam("databaseName") String databaseName, @QueryParam("maxQueriesPerSecond") String queryQuota, + @Context HttpHeaders httpHeaders) { + if (!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders))) { + throw new ControllerApplicationException(LOGGER, "Database config name and request context does not match", + Response.Status.BAD_REQUEST); + } + try { + DatabaseConfig databaseConfig = _pinotHelixResourceManager.getDatabaseConfig(databaseName); + QuotaConfig quotaConfig = new QuotaConfig(null, queryQuota); + if (databaseConfig == null) { + databaseConfig = new DatabaseConfig(databaseName, quotaConfig); + _pinotHelixResourceManager.addDatabaseConfig(databaseConfig); + } else { + databaseConfig.setQuotaConfig(quotaConfig); + _pinotHelixResourceManager.updateDatabaseConfig(databaseConfig); + } + return new SuccessResponse("Database quotas for database config " + databaseName + " successfully updated"); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * API to get database quota configs. + * Will return null if database config is not defined or database quotas are not defined + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/databases/{databaseName}/quotas") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_DATABASE_QUOTA) + @ApiOperation(value = "Get database quota configs", notes = "Get database quota configs") + public QuotaConfig getDatabaseQuota( + @PathParam("databaseName") String databaseName, @Context HttpHeaders httpHeaders) { + if (!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders))) { + throw new ControllerApplicationException(LOGGER, "Database config name and request context does not match", + Response.Status.BAD_REQUEST); + } + DatabaseConfig databaseConfig = _pinotHelixResourceManager.getDatabaseConfig(databaseName); + if (databaseConfig != null) { + return databaseConfig.getQuotaConfig(); + } + return new QuotaConfig(null, null); Review Comment: Should we return the default database query quota here instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org