bziobrowski commented on code in PR #14226: URL: https://github.com/apache/pinot/pull/14226#discussion_r1807477477
########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -308,10 +372,72 @@ private synchronized void createOrUpdateDatabaseRateLimiter(List<String> databas } LOGGER.info("Updating existing query rate limiter for database {} from rate {} to {}", databaseName, oldQuota, perBrokerQpsQuota); - oldQueryQuotaEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota)); + oldQueryQuotaEntity.setRateLimiter(createRateLimiter(perBrokerQpsQuota)); + } + } + + public synchronized void createOrUpdateApplicationRateLimiter(String applicationName) { + createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName)); + } + + // Caller method need not worry about getting lock on _applicationRateLimiterMap + // as this method will do idempotent updates to the application rate limiters + private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames) { + ExternalView brokerResource = getBrokerResource(); + for (String appName : applicationNames) { + double appQpsQuota = getEffectiveQueryQuotaOnApplication(appName); + if (appQpsQuota < 0) { + buildEmptyOrResetApplicationRateLimiter(appName); + continue; + } + int numOnlineBrokers = getNumOnlineBrokers(brokerResource); + double perBrokerQpsQuota = appQpsQuota / numOnlineBrokers; + QueryQuotaEntity oldEntity = _applicationRateLimiterMap.get(appName); + if (oldEntity == null) { + LOGGER.info("Adding new query rate limiter for application {} with rate {}.", appName, perBrokerQpsQuota); + QueryQuotaEntity queryQuotaEntity = + new QueryQuotaEntity(createRateLimiter(perBrokerQpsQuota), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), numOnlineBrokers, appQpsQuota, -1); + _applicationRateLimiterMap.put(appName, queryQuotaEntity); + continue; + } + boolean isChange = false; + double oldQuota = oldEntity.getRateLimiter() != null ? oldEntity.getRateLimiter().getRate() : -1; + if (oldEntity.getOverallRate() != appQpsQuota) { + isChange = true; + LOGGER.info("Overall quota changed for the application {} from {} to {}", appName, oldEntity.getOverallRate(), + appQpsQuota); + oldEntity.setOverallRate(appQpsQuota); + } + if (oldEntity.getNumOnlineBrokers() != numOnlineBrokers) { + isChange = true; + LOGGER.info("Number of online brokers changed for the application from {} to {}", + oldEntity.getNumOnlineBrokers(), numOnlineBrokers); + oldEntity.setNumOnlineBrokers(numOnlineBrokers); + } + if (!isChange) { + LOGGER.info("No change detected with the query rate limiter for application {}", appName); + continue; + } + LOGGER.info("Updating existing query rate limiter for application {} from rate {} to {}", appName, oldQuota, + perBrokerQpsQuota); + oldEntity.setRateLimiter(createRateLimiter(perBrokerQpsQuota)); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org