This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-max-burst-qps
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1f49d149800931efd2b7b27764ad60be0754933a
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Mon May 10 10:20:33 2021 -0700

    Move maxBurstQps broker metric into createRateLimiter method
---
 ...okerResourceOnlineOfflineStateModelFactory.java |  2 +-
 .../HelixExternalViewBasedQueryQuotaManager.java   | 47 ++++++++++++++--------
 ...elixExternalViewBasedQueryQuotaManagerTest.java | 28 ++++++-------
 .../pinot/common/metrics/AbstractMetrics.java      |  2 +-
 4 files changed, 46 insertions(+), 33 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index b6639e2..6250d1b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -79,7 +79,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory 
extends StateModelFact
       try {
         _routingManager.buildRouting(tableNameWithType);
         TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-        _queryQuotaManager.initTableQueryQuota(tableConfig,
+        _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
             
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
       } catch (Exception e) {
         LOGGER.error("Caught exception while processing transition from 
OFFLINE to ONLINE for table: {}",
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 21b08cc..630ac18 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -100,15 +100,15 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     ExternalView brokerResourceEV = HelixHelper
         .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
             CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
-    initTableQueryQuota(tableConfig, brokerResourceEV);
+    initOrUpdateTableQueryQuota(tableConfig, brokerResourceEV);
   }
 
   /**
-   * Initialize dynamic rate limiter with table query quota.
+   * Initialize or update dynamic rate limiter with table query quota.
    * @param tableConfig table config.
    * @param brokerResource broker resource which stores all the broker states 
of each table.
    */
-  public void initTableQueryQuota(TableConfig tableConfig, ExternalView 
brokerResource) {
+  public void initOrUpdateTableQueryQuota(TableConfig tableConfig, 
ExternalView brokerResource) {
     String tableNameWithType = tableConfig.getTableName();
     LOGGER.info("Initializing rate limiter for table {}", tableNameWithType);
 
@@ -118,7 +118,7 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       LOGGER.info("No qps config specified for table: {}", tableNameWithType);
       removeRateLimiter(tableNameWithType);
     } else {
-      createRateLimiter(tableNameWithType, brokerResource, quotaConfig);
+      createOrUpdateRateLimiter(tableNameWithType, brokerResource, 
quotaConfig);
     }
   }
 
@@ -156,12 +156,13 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
   }
 
   /**
-   * Create a rate limiter for a table.
+   * Create or update a rate limiter for a table.
    * @param tableNameWithType table name with table type.
    * @param brokerResource broker resource which stores all the broker states 
of each table.
    * @param quotaConfig quota config of the table.
    */
-  private void createRateLimiter(String tableNameWithType, ExternalView 
brokerResource, QuotaConfig quotaConfig) {
+  private void createOrUpdateRateLimiter(String tableNameWithType, 
ExternalView brokerResource,
+      QuotaConfig quotaConfig) {
     if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
       LOGGER.info("No qps config specified for table: {}", tableNameWithType);
       return;
@@ -194,15 +195,30 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     // Get stat from property store
     String tableConfigPath = constructTableConfigPath(tableNameWithType);
     Stat stat = _propertyStore.getStat(tableConfigPath, 
AccessOption.PERSISTENT);
-
     double perBrokerRate = overallRate / onlineCount;
-    QueryQuotaEntity queryQuotaEntity =
-        new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new 
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
-            new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 
onlineCount, overallRate, stat.getVersion());
-    _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
-    LOGGER.info(
-        "Rate limiter for table: {} has been initialized. Overall rate: {}. 
Per-broker rate: {}. Number of online broker instances: {}. Table config stat 
version: {}",
-        tableNameWithType, overallRate, perBrokerRate, onlineCount, 
stat.getVersion());
+
+    QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType);
+    if (queryQuotaEntity == null) {
+      queryQuotaEntity =
+          new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new 
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+              new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 
onlineCount, overallRate, stat.getVersion());
+      _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
+      LOGGER.info(
+          "Rate limiter for table: {} has been initialized. Overall rate: {}. 
Per-broker rate: {}. Number of online broker instances: {}. Table config stat 
version: {}",
+          tableNameWithType, overallRate, perBrokerRate, onlineCount, 
stat.getVersion());
+    } else {
+      queryQuotaEntity.getRateLimiter().setRate(perBrokerRate);
+      queryQuotaEntity.setNumOnlineBrokers(onlineCount);
+      queryQuotaEntity.setOverallRate(overallRate);
+      queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
+      LOGGER.info(
+          "Rate limiter for table: {} has been updated. Overall rate: {}. 
Per-broker rate: {}. Number of online broker instances: {}. Table config stat 
version: {}",
+          tableNameWithType, overallRate, perBrokerRate, onlineCount, 
stat.getVersion());
+    }
+
+    final QueryQuotaEntity finalQueryQuotaEntity = queryQuotaEntity;
+    _brokerMetrics.addCallbackTableGaugeIfNeeded(tableNameWithType, 
BrokerGauge.MAX_BURST_QPS,
+        () -> (long) 
finalQueryQuotaEntity.getMaxQpsTracker().getMaxCountPerBucket());
     if (isQueryRateLimitDisabled()) {
       LOGGER.info("Query rate limiting is currently disabled for this broker. 
So it won't take effect immediately.");
     }
@@ -269,9 +285,6 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       LOGGER.debug("The percentage of rate limit capacity utilization is {}", 
percentageOfCapacityUtilization);
       _brokerMetrics.setValueOfTableGauge(tableNameWithType, 
BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE,
           percentageOfCapacityUtilization);
-
-      _brokerMetrics.addCallbackTableGaugeIfNeeded(tableNameWithType, 
BrokerGauge.MAX_BURST_QPS,
-          () -> (long) 
queryQuotaEntity.getMaxQpsTracker().getMaxCountPerBucket());
     }
 
     if (!rateLimiter.tryAcquire()) {
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index a302eed..e7ca822 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -149,7 +149,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ZKMetadataProvider
         .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, 
TableConfigUtils.toZNRecord(tableConfig));
     setQps(tableConfig);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
     // All the request should be passed.
@@ -164,7 +164,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
@@ -181,7 +181,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
 
     // Nothing happened since it doesn't have qps quota.
@@ -202,7 +202,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
 
     // Drop the offline table won't have any affect since it is table type 
specific.
@@ -233,9 +233,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
         .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, 
TableConfigUtils.toZNRecord(offlineTableConfig));
 
     // Since each table has 2 online brokers, per broker rate becomes 100.0 / 
2 = 50.0
-    _queryQuotaManager.initTableQueryQuota(offlineTableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(offlineTableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
-    _queryQuotaManager.initTableQueryQuota(realtimeTableConfig, 
brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(realtimeTableConfig, 
brokerResource);
     // The hash map now contains 2 entries for both of the tables.
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 2);
 
@@ -259,7 +259,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ZKMetadataProvider
         .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, 
TableConfigUtils.toZNRecord(tableConfig));
     setQps(tableConfig);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
     runQueries(70, 10L);
@@ -276,7 +276,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ZKMetadataProvider
         .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, 
TableConfigUtils.toZNRecord(tableConfig));
     setQps(tableConfig);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
     runQueries(70, 10L);
@@ -291,7 +291,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
@@ -308,7 +308,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
@@ -325,7 +325,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
@@ -334,7 +334,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       throws Exception {
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, null);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, null);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
@@ -346,7 +346,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ZKMetadataProvider
         .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, 
TableConfigUtils.toZNRecord(tableConfig));
     setQps(tableConfig);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
   }
 
@@ -359,7 +359,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ZKMetadataProvider
         .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, 
TableConfigUtils.toZNRecord(tableConfig));
     setQps(tableConfig);
-    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, 
brokerResource);
 
     // For the 1st version we don't check the number of online brokers.
     // Thus the expected size now is 1. It'll be 0 when we bring dynamic rate 
back.
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 44a53c0..8839b41 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -154,7 +154,7 @@ public abstract class AbstractMetrics<QP extends 
AbstractMetrics.QueryPhase, M e
     PinotTimer timer = PinotMetricUtils.makePinotTimer(_metricsRegistry, 
metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
     if (timer != null) {
       timer.update(duration, timeUnit);
-    }  
+    }
    }
 
   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to