bziobrowski commented on code in PR #14226:
URL: https://github.com/apache/pinot/pull/14226#discussion_r1807477477
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -308,10 +372,72 @@ private synchronized void
createOrUpdateDatabaseRateLimiter(List<String> databas
}
LOGGER.info("Updating existing query rate limiter for database {} from
rate {} to {}", databaseName, oldQuota,
perBrokerQpsQuota);
-
oldQueryQuotaEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota));
+ oldQueryQuotaEntity.setRateLimiter(createRateLimiter(perBrokerQpsQuota));
+ }
+ }
+
+ public synchronized void createOrUpdateApplicationRateLimiter(String
applicationName) {
+
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
+ }
+
+ // Caller method need not worry about getting lock on _applicationRateLimiterMap
+ // as this method will do idempotent updates to the application rate limiters
+ private synchronized void createOrUpdateApplicationRateLimiter(List<String>
applicationNames) {
+ ExternalView brokerResource = getBrokerResource();
+ for (String appName : applicationNames) {
+ double appQpsQuota = getEffectiveQueryQuotaOnApplication(appName);
+ if (appQpsQuota < 0) {
+ buildEmptyOrResetApplicationRateLimiter(appName);
+ continue;
+ }
+ int numOnlineBrokers = getNumOnlineBrokers(brokerResource);
+ double perBrokerQpsQuota = appQpsQuota / numOnlineBrokers;
+ QueryQuotaEntity oldEntity = _applicationRateLimiterMap.get(appName);
+ if (oldEntity == null) {
+ LOGGER.info("Adding new query rate limiter for application {} with
rate {}.", appName, perBrokerQpsQuota);
+ QueryQuotaEntity queryQuotaEntity =
+ new QueryQuotaEntity(createRateLimiter(perBrokerQpsQuota), new
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+ new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
numOnlineBrokers, appQpsQuota, -1);
+ _applicationRateLimiterMap.put(appName, queryQuotaEntity);
+ continue;
+ }
+ boolean isChange = false;
+ double oldQuota = oldEntity.getRateLimiter() != null ?
oldEntity.getRateLimiter().getRate() : -1;
+ if (oldEntity.getOverallRate() != appQpsQuota) {
+ isChange = true;
+ LOGGER.info("Overall quota changed for the application {} from {} to
{}", appName, oldEntity.getOverallRate(),
+ appQpsQuota);
+ oldEntity.setOverallRate(appQpsQuota);
+ }
+ if (oldEntity.getNumOnlineBrokers() != numOnlineBrokers) {
+ isChange = true;
+ LOGGER.info("Number of online brokers changed for the application from
{} to {}",
+ oldEntity.getNumOnlineBrokers(), numOnlineBrokers);
+ oldEntity.setNumOnlineBrokers(numOnlineBrokers);
+ }
+ if (!isChange) {
+ LOGGER.info("No change detected with the query rate limiter for
application {}", appName);
+ continue;
+ }
+ LOGGER.info("Updating existing query rate limiter for application {}
from rate {} to {}", appName, oldQuota,
+ perBrokerQpsQuota);
+ oldEntity.setRateLimiter(createRateLimiter(perBrokerQpsQuota));
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]