klsince commented on code in PR #13544: URL: https://github.com/apache/pinot/pull/13544#discussion_r1700630267
########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } } + /** + * Updates the database rate limiter if it already exists. Will not create a new database rate limiter. + * @param databaseName database name for which rate limiter needs to be updated + */ + public void updateDatabaseRateLimiter(String databaseName) { + if (!_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + public synchronized void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { + ExternalView brokerResource = HelixHelper + .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + for (String databaseName : databaseNames) { + double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName); + if (databaseQpsQuota < 0) { + buildEmptyOrResetDatabaseRateLimiter(databaseName); + continue; + } + int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, brokerResource); + double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor; + QueryQuotaEntity oldRateLimiter = _databaseRateLimiterMap.get(databaseName); + String message; + if (oldRateLimiter == null) { + message = String.format("New query rate limiter added for database %s with rate %s.", databaseName, + perBrokerQpsQuota); + } else { + boolean changeDetected = false; + double oldRate = oldRateLimiter.getRateLimiter() != null ? 
oldRateLimiter.getRateLimiter().getRate() : -1; + message = String.format("Updated existing query rate limiter for database %s from rate %s to %s", databaseName, Review Comment: would suggest just to log this message in the end ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } } + /** + * Updates the database rate limiter if it already exists. Will not create a new database rate limiter. + * @param databaseName database name for which rate limiter needs to be updated + */ + public void updateDatabaseRateLimiter(String databaseName) { + if (!_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + public synchronized void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { + ExternalView brokerResource = HelixHelper + .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + for (String databaseName : databaseNames) { + double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName); + if (databaseQpsQuota < 0) { + buildEmptyOrResetDatabaseRateLimiter(databaseName); + continue; + } + int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, brokerResource); + double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor; + QueryQuotaEntity oldRateLimiter = _databaseRateLimiterMap.get(databaseName); + String message; + if (oldRateLimiter == null) { + message = String.format("New query rate limiter added for database %s with rate %s.", databaseName, + perBrokerQpsQuota); + } else { + boolean changeDetected = false; + double oldRate = oldRateLimiter.getRateLimiter() != null ? 
oldRateLimiter.getRateLimiter().getRate() : -1; + message = String.format("Updated existing query rate limiter for database %s from rate %s to %s", databaseName, + oldRate, perBrokerQpsQuota); + if (oldRateLimiter.getOverallRate() != databaseQpsQuota) { + changeDetected = true; + message += ". Overall quota changed for the database from " + oldRateLimiter.getOverallRate() + " to " Review Comment: I'd just log this reason, instead of composing a log msg. ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } } + /** + * Updates the database rate limiter if it already exists. Will not create a new database rate limiter. + * @param databaseName database name for which rate limiter needs to be updated + */ + public void updateDatabaseRateLimiter(String databaseName) { + if (!_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + public synchronized void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { + ExternalView brokerResource = HelixHelper + .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + for (String databaseName : databaseNames) { + double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName); + if (databaseQpsQuota < 0) { + buildEmptyOrResetDatabaseRateLimiter(databaseName); + continue; + } + int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, brokerResource); + double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor; + QueryQuotaEntity oldRateLimiter = _databaseRateLimiterMap.get(databaseName); + String message; + if (oldRateLimiter == null) { + message = String.format("New query rate limiter added for database %s with rate %s.", 
databaseName, + perBrokerQpsQuota); + } else { + boolean changeDetected = false; + double oldRate = oldRateLimiter.getRateLimiter() != null ? oldRateLimiter.getRateLimiter().getRate() : -1; + message = String.format("Updated existing query rate limiter for database %s from rate %s to %s", databaseName, + oldRate, perBrokerQpsQuota); + if (oldRateLimiter.getOverallRate() != databaseQpsQuota) { + changeDetected = true; + message += ". Overall quota changed for the database from " + oldRateLimiter.getOverallRate() + " to " + + databaseQpsQuota; + } + if (oldRateLimiter.getNumOnlineBrokers() != quotaSplitFactor) { + changeDetected = true; + message += ". Quota split factor changed for the database from " + oldRateLimiter.getOverallRate() + " to " + + quotaSplitFactor; + } + if (!changeDetected) { + LOGGER.info("No change detected with the query rate limiter for database {}", databaseName); + return; Review Comment: `continue` to next database? perhaps add a UT for processing multiple dbs if not having one yet ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } } + /** + * Updates the database rate limiter if it already exists. Will not create a new database rate limiter. 
+ * @param databaseName database name for which rate limiter needs to be updated + */ + public void updateDatabaseRateLimiter(String databaseName) { + if (!_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + public synchronized void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { + ExternalView brokerResource = HelixHelper + .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + for (String databaseName : databaseNames) { + double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName); + if (databaseQpsQuota < 0) { + buildEmptyOrResetDatabaseRateLimiter(databaseName); + continue; + } + int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, brokerResource); + double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor; + QueryQuotaEntity oldRateLimiter = _databaseRateLimiterMap.get(databaseName); + String message; + if (oldRateLimiter == null) { + message = String.format("New query rate limiter added for database %s with rate %s.", databaseName, + perBrokerQpsQuota); Review Comment: nit: log this out, and continue. ``` if () { continue; } // no need for else {, avoiding those indents boolean changeDetected = false; ... ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } } + /** + * Updates the database rate limiter if it already exists. Will not create a new database rate limiter. 
+ * @param databaseName database name for which rate limiter needs to be updated + */ + public void updateDatabaseRateLimiter(String databaseName) { + if (!_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + public synchronized void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { Review Comment: you would need to add synchronized to updateDatabaseRateLimiter and createDatabaseRateLimiter methods too, otherwise they check the map membership outside the lock. But looking at the updating logic for the table rate limiter, which does not synchronize at all, is synchronization really needed for updating the db rate? ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java: ########## @@ -327,7 +328,12 @@ private TableAuthorizationResult hasTableAccess(RequesterIdentity requesterIdent /** * Returns true if the QPS quota of the tables has exceeded. */ - private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext requestContext) { + private boolean hasExceededQPSQuota(Set<String> tableNames, String database, RequestContext requestContext) { Review Comment: nit: `hasExceededQPSQuota(@Nullable String database, ... tableNames, ...)` ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } } + /** + * Updates the database rate limiter if it already exists. Will not create a new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be updated + */ + public void updateDatabaseRateLimiter(String databaseName) { + if (!_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + public synchronized void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { + ExternalView brokerResource = HelixHelper + .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + for (String databaseName : databaseNames) { + double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName); + if (databaseQpsQuota < 0) { + buildEmptyOrResetDatabaseRateLimiter(databaseName); + continue; + } + int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, brokerResource); + double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor; + QueryQuotaEntity oldRateLimiter = _databaseRateLimiterMap.get(databaseName); + String message; + if (oldRateLimiter == null) { + message = String.format("New query rate limiter added for database %s with rate %s.", databaseName, + perBrokerQpsQuota); + } else { + boolean changeDetected = false; + double oldRate = oldRateLimiter.getRateLimiter() != null ? oldRateLimiter.getRateLimiter().getRate() : -1; + message = String.format("Updated existing query rate limiter for database %s from rate %s to %s", databaseName, + oldRate, perBrokerQpsQuota); + if (oldRateLimiter.getOverallRate() != databaseQpsQuota) { + changeDetected = true; + message += ". Overall quota changed for the database from " + oldRateLimiter.getOverallRate() + " to " + + databaseQpsQuota; + } + if (oldRateLimiter.getNumOnlineBrokers() != quotaSplitFactor) { Review Comment: would suggest to keep the names consistent, e.g. 1) quota (preferred) vs. rate 2) numOnlineBroker (preferred) vs. 
splitFactor ########## pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java: ########## @@ -129,6 +136,32 @@ public void onError(Exception e, ErrorCode code, ErrorType type) { } } + private class RefreshDatabaseConfigMessageHandler extends MessageHandler { Review Comment: This helix msg only updates the db quota on the broker, but it is also used by the controller-side addDatabaseConfig RESTful API. So when addDatabaseConfig is called, the broker may not create the db quota. Is this the expected behavior? ``` public void addDatabaseConfig(DatabaseConfig databaseConfig) { if (!ZKMetadataProvider.createDatabaseConfig(_propertyStore, databaseConfig)) { throw new RuntimeException("Failed to create database config for database: " + databaseConfig.getDatabaseName()); } sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName()); } ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java: ########## @@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } } + /** + * Updates the database rate limiter if it already exists. Will not create a new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be updated + */ + public void updateDatabaseRateLimiter(String databaseName) { + if (!_databaseRateLimiterMap.containsKey(databaseName)) { + return; + } + createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName)); + } + + public synchronized void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) { + ExternalView brokerResource = HelixHelper + .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + for (String databaseName : databaseNames) { + double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName); + if (databaseQpsQuota < 0) { + buildEmptyOrResetDatabaseRateLimiter(databaseName); + continue; + } + int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, brokerResource); + double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor; + QueryQuotaEntity oldRateLimiter = _databaseRateLimiterMap.get(databaseName); + String message; + if (oldRateLimiter == null) { + message = String.format("New query rate limiter added for database %s with rate %s.", databaseName, + perBrokerQpsQuota); + } else { + boolean changeDetected = false; + double oldRate = oldRateLimiter.getRateLimiter() != null ? oldRateLimiter.getRateLimiter().getRate() : -1; + message = String.format("Updated existing query rate limiter for database %s from rate %s to %s", databaseName, + oldRate, perBrokerQpsQuota); + if (oldRateLimiter.getOverallRate() != databaseQpsQuota) { + changeDetected = true; + message += ". Overall quota changed for the database from " + oldRateLimiter.getOverallRate() + " to " + + databaseQpsQuota; + } + if (oldRateLimiter.getNumOnlineBrokers() != quotaSplitFactor) { + changeDetected = true; + message += ". 
Quota split factor changed for the database from " + oldRateLimiter.getOverallRate() + " to " + + quotaSplitFactor; + } + if (!changeDetected) { + LOGGER.info("No change detected with the query rate limiter for database {}", databaseName); + return; + } + } + QueryQuotaEntity queryQuotaEntity = new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), + new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), + quotaSplitFactor, databaseQpsQuota, -1); + _databaseRateLimiterMap.put(databaseName, queryQuotaEntity); + LOGGER.info(message); + } + } + + // Pulling this logic to a separate placeholder method so that the quota split logic + // can be enhanced further in isolation. + private int getPerBrokerQpsQuotaSplit(String databaseName, ExternalView brokerResource) { Review Comment: I would just call it getNumOnlineBrokers(), can comment a TODO that this method should return the online brokers used by the db, instead of all brokers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org