klsince commented on code in PR #13544:
URL: https://github.com/apache/pinot/pull/13544#discussion_r1700630267


##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  public synchronized void createOrUpdateDatabaseRateLimiter(List<String> 
databaseNames) {
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    for (String databaseName : databaseNames) {
+      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetDatabaseRateLimiter(databaseName);
+        continue;
+      }
+      int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, 
brokerResource);
+      double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+      QueryQuotaEntity oldRateLimiter = 
_databaseRateLimiterMap.get(databaseName);
+      String message;
+      if (oldRateLimiter == null) {
+        message = String.format("New query rate limiter added for database %s 
with rate %s.", databaseName,
+            perBrokerQpsQuota);
+      } else {
+        boolean changeDetected = false;
+        double oldRate = oldRateLimiter.getRateLimiter() != null ? 
oldRateLimiter.getRateLimiter().getRate() : -1;
+        message = String.format("Updated existing query rate limiter for 
database %s from rate %s to %s", databaseName,

Review Comment:
   I would suggest just logging this message at the end.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  public synchronized void createOrUpdateDatabaseRateLimiter(List<String> 
databaseNames) {
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    for (String databaseName : databaseNames) {
+      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetDatabaseRateLimiter(databaseName);
+        continue;
+      }
+      int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, 
brokerResource);
+      double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+      QueryQuotaEntity oldRateLimiter = 
_databaseRateLimiterMap.get(databaseName);
+      String message;
+      if (oldRateLimiter == null) {
+        message = String.format("New query rate limiter added for database %s 
with rate %s.", databaseName,
+            perBrokerQpsQuota);
+      } else {
+        boolean changeDetected = false;
+        double oldRate = oldRateLimiter.getRateLimiter() != null ? 
oldRateLimiter.getRateLimiter().getRate() : -1;
+        message = String.format("Updated existing query rate limiter for 
database %s from rate %s to %s", databaseName,
+            oldRate, perBrokerQpsQuota);
+        if (oldRateLimiter.getOverallRate() != databaseQpsQuota) {
+          changeDetected = true;
+          message += ". Overall quota changed for the database from " + 
oldRateLimiter.getOverallRate() + " to "

Review Comment:
   I'd just log this reason, instead of composing a log msg.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  public synchronized void createOrUpdateDatabaseRateLimiter(List<String> 
databaseNames) {
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    for (String databaseName : databaseNames) {
+      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetDatabaseRateLimiter(databaseName);
+        continue;
+      }
+      int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, 
brokerResource);
+      double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+      QueryQuotaEntity oldRateLimiter = 
_databaseRateLimiterMap.get(databaseName);
+      String message;
+      if (oldRateLimiter == null) {
+        message = String.format("New query rate limiter added for database %s 
with rate %s.", databaseName,
+            perBrokerQpsQuota);
+      } else {
+        boolean changeDetected = false;
+        double oldRate = oldRateLimiter.getRateLimiter() != null ? 
oldRateLimiter.getRateLimiter().getRate() : -1;
+        message = String.format("Updated existing query rate limiter for 
database %s from rate %s to %s", databaseName,
+            oldRate, perBrokerQpsQuota);
+        if (oldRateLimiter.getOverallRate() != databaseQpsQuota) {
+          changeDetected = true;
+          message += ". Overall quota changed for the database from " + 
oldRateLimiter.getOverallRate() + " to "
+              + databaseQpsQuota;
+        }
+        if (oldRateLimiter.getNumOnlineBrokers() != quotaSplitFactor) {
+          changeDetected = true;
+          message += ". Quota split factor changed for the database from " + 
oldRateLimiter.getOverallRate() + " to "
+              + quotaSplitFactor;
+        }
+        if (!changeDetected) {
+          LOGGER.info("No change detected with the query rate limiter for 
database {}", databaseName);
+          return;

Review Comment:
   `continue` to the next database? Perhaps add a UT for processing multiple 
dbs if there isn't one yet.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  public synchronized void createOrUpdateDatabaseRateLimiter(List<String> 
databaseNames) {
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    for (String databaseName : databaseNames) {
+      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetDatabaseRateLimiter(databaseName);
+        continue;
+      }
+      int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, 
brokerResource);
+      double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+      QueryQuotaEntity oldRateLimiter = 
_databaseRateLimiterMap.get(databaseName);
+      String message;
+      if (oldRateLimiter == null) {
+        message = String.format("New query rate limiter added for database %s 
with rate %s.", databaseName,
+            perBrokerQpsQuota);

Review Comment:
   nit: log this out, and continue.
   ```
   if () {
   continue;
   }
   // no need for else {, avoiding those indents
   boolean changeDetected = false;
   ...
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  public synchronized void createOrUpdateDatabaseRateLimiter(List<String> 
databaseNames) {

Review Comment:
   you would need to add synchronized to updateDatabaseRateLimiter and 
createDatabaseRateLimiter methods too, otherwise they check the map membership 
outside the lock
   
   But looking at the updating logic for the table rate limiter, which does not 
synchronize at all, is synchronization really needed for updating the db rate?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -327,7 +328,12 @@ private TableAuthorizationResult 
hasTableAccess(RequesterIdentity requesterIdent
   /**
    * Returns true if the QPS quota of the tables has exceeded.
    */
-  private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext 
requestContext) {
+  private boolean hasExceededQPSQuota(Set<String> tableNames, String database, 
RequestContext requestContext) {

Review Comment:
   nit: `hasExceededQPSQuota(@Nullable String database, ... tableNames, ...)`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  public synchronized void createOrUpdateDatabaseRateLimiter(List<String> 
databaseNames) {
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    for (String databaseName : databaseNames) {
+      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetDatabaseRateLimiter(databaseName);
+        continue;
+      }
+      int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, 
brokerResource);
+      double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+      QueryQuotaEntity oldRateLimiter = 
_databaseRateLimiterMap.get(databaseName);
+      String message;
+      if (oldRateLimiter == null) {
+        message = String.format("New query rate limiter added for database %s 
with rate %s.", databaseName,
+            perBrokerQpsQuota);
+      } else {
+        boolean changeDetected = false;
+        double oldRate = oldRateLimiter.getRateLimiter() != null ? 
oldRateLimiter.getRateLimiter().getRate() : -1;
+        message = String.format("Updated existing query rate limiter for 
database %s from rate %s to %s", databaseName,
+            oldRate, perBrokerQpsQuota);
+        if (oldRateLimiter.getOverallRate() != databaseQpsQuota) {
+          changeDetected = true;
+          message += ". Overall quota changed for the database from " + 
oldRateLimiter.getOverallRate() + " to "
+              + databaseQpsQuota;
+        }
+        if (oldRateLimiter.getNumOnlineBrokers() != quotaSplitFactor) {

Review Comment:
   would suggest to keep the names consistent, e.g. 
   1) quota (preferred) vs. rate
   2) numOnlineBroker (preferred) vs. splitFactor



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java:
##########
@@ -129,6 +136,32 @@ public void onError(Exception e, ErrorCode code, ErrorType 
type) {
     }
   }
 
+  private class RefreshDatabaseConfigMessageHandler extends MessageHandler {

Review Comment:
   This helix msg only updates the db quota on the broker, but it is used by 
the controller-side addDatabaseConfig RESTful API. So when addDatabaseConfig is 
called, the broker may not create the db quota. Is this expected behavior?
   
   ```
    public void addDatabaseConfig(DatabaseConfig databaseConfig) {
       if (!ZKMetadataProvider.createDatabaseConfig(_propertyStore, 
databaseConfig)) {
         throw new RuntimeException("Failed to create database config for 
database: " + databaseConfig.getDatabaseName());
       }
       sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName());
     }
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String 
tableNameWithType, ExternalView br
     }
   }
 
+  /**
+   * Updates the database rate limiter if it already exists. Will not create a 
new database rate limiter.
+   * @param databaseName database name for which rate limiter needs to be 
updated
+   */
+  public void updateDatabaseRateLimiter(String databaseName) {
+    if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+      return;
+    }
+    createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+  }
+
+  public synchronized void createOrUpdateDatabaseRateLimiter(List<String> 
databaseNames) {
+    ExternalView brokerResource = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    for (String databaseName : databaseNames) {
+      double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+      if (databaseQpsQuota < 0) {
+        buildEmptyOrResetDatabaseRateLimiter(databaseName);
+        continue;
+      }
+      int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName, 
brokerResource);
+      double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+      QueryQuotaEntity oldRateLimiter = 
_databaseRateLimiterMap.get(databaseName);
+      String message;
+      if (oldRateLimiter == null) {
+        message = String.format("New query rate limiter added for database %s 
with rate %s.", databaseName,
+            perBrokerQpsQuota);
+      } else {
+        boolean changeDetected = false;
+        double oldRate = oldRateLimiter.getRateLimiter() != null ? 
oldRateLimiter.getRateLimiter().getRate() : -1;
+        message = String.format("Updated existing query rate limiter for 
database %s from rate %s to %s", databaseName,
+            oldRate, perBrokerQpsQuota);
+        if (oldRateLimiter.getOverallRate() != databaseQpsQuota) {
+          changeDetected = true;
+          message += ". Overall quota changed for the database from " + 
oldRateLimiter.getOverallRate() + " to "
+              + databaseQpsQuota;
+        }
+        if (oldRateLimiter.getNumOnlineBrokers() != quotaSplitFactor) {
+          changeDetected = true;
+          message += ". Quota split factor changed for the database from " + 
oldRateLimiter.getOverallRate() + " to "
+              + quotaSplitFactor;
+        }
+        if (!changeDetected) {
+          LOGGER.info("No change detected with the query rate limiter for 
database {}", databaseName);
+          return;
+        }
+      }
+      QueryQuotaEntity queryQuotaEntity = new 
QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
+          new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new 
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
+          quotaSplitFactor, databaseQpsQuota, -1);
+      _databaseRateLimiterMap.put(databaseName, queryQuotaEntity);
+      LOGGER.info(message);
+    }
+  }
+
+  // Pulling this logic to a separate placeholder method so that the quota 
split logic
+  // can be enhanced further in isolation.
+  private int getPerBrokerQpsQuotaSplit(String databaseName, ExternalView 
brokerResource) {

Review Comment:
   I would just call it getNumOnlineBrokers() and add a TODO comment that this 
method should return the online brokers used by the db, instead of all brokers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to