This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch fix-max-burst-qps in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit d3f89b6aa14976ce01a46912a78d15393628d85f Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Mon May 10 10:20:33 2021 -0700 Move maxBurstQps broker metric in createRateLimiter method --- ...okerResourceOnlineOfflineStateModelFactory.java | 2 +- .../HelixExternalViewBasedQueryQuotaManager.java | 47 ++++++++++++++-------- ...elixExternalViewBasedQueryQuotaManagerTest.java | 28 ++++++------- .../pinot/common/metrics/AbstractMetrics.java | 21 +++++----- 4 files changed, 56 insertions(+), 42 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java index b6639e2..6250d1b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java @@ -79,7 +79,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact try { _routingManager.buildRouting(tableNameWithType); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); - _queryQuotaManager.initTableQueryQuota(tableConfig, + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE))); } catch (Exception e) { LOGGER.error("Caught exception while processing transition from OFFLINE to ONLINE for table: {}", diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 21b08cc..a389245 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -100,15 +100,15 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan ExternalView brokerResourceEV = HelixHelper .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); - initTableQueryQuota(tableConfig, brokerResourceEV); + initOrUpdateTableQueryQuota(tableConfig, brokerResourceEV); } /** - * Initialize dynamic rate limiter with table query quota. + * Initialize or update dynamic rate limiter with table query quota. * @param tableConfig table config. * @param brokerResource broker resource which stores all the broker states of each table. */ - public void initTableQueryQuota(TableConfig tableConfig, ExternalView brokerResource) { + public void initOrUpdateTableQueryQuota(TableConfig tableConfig, ExternalView brokerResource) { String tableNameWithType = tableConfig.getTableName(); LOGGER.info("Initializing rate limiter for table {}", tableNameWithType); @@ -118,7 +118,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan LOGGER.info("No qps config specified for table: {}", tableNameWithType); removeRateLimiter(tableNameWithType); } else { - createRateLimiter(tableNameWithType, brokerResource, quotaConfig); + createOrUpdateRateLimiter(tableNameWithType, brokerResource, quotaConfig); } } @@ -156,12 +156,13 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } /** - * Create a rate limiter for a table. + * Create or update a rate limiter for a table. * @param tableNameWithType table name with table type. * @param brokerResource broker resource which stores all the broker states of each table. * @param quotaConfig quota config of the table. */ - private void createRateLimiter(String tableNameWithType, ExternalView brokerResource, QuotaConfig quotaConfig) { + private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView brokerResource, + QuotaConfig quotaConfig) { if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) { LOGGER.info("No qps config specified for table: {}", tableNameWithType); return; @@ -194,15 +195,30 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan // Get stat from property store String tableConfigPath = constructTableConfigPath(tableNameWithType); Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT); - double perBrokerRate = overallRate / onlineCount; - QueryQuotaEntity queryQuotaEntity = - new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), - new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, stat.getVersion()); - _rateLimiterMap.put(tableNameWithType, queryQuotaEntity); - LOGGER.info( - "Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}", - tableNameWithType, overallRate, perBrokerRate, onlineCount, stat.getVersion()); + + QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType); + if (queryQuotaEntity == null) { + queryQuotaEntity = + new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, stat.getVersion()); + _rateLimiterMap.put(tableNameWithType, queryQuotaEntity); + LOGGER.info( + "Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}", + tableNameWithType, overallRate, perBrokerRate, onlineCount, stat.getVersion()); + } else { + queryQuotaEntity.getRateLimiter().setRate(perBrokerRate); + queryQuotaEntity.setNumOnlineBrokers(onlineCount); + queryQuotaEntity.setOverallRate(overallRate); + queryQuotaEntity.setTableConfigStatVersion(stat.getVersion()); + LOGGER.info( + "Rate limiter for table: {} has been updated. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}", + tableNameWithType, overallRate, perBrokerRate, onlineCount, stat.getVersion()); + } + + final QueryQuotaEntity finalQueryQuotaEntity = queryQuotaEntity; + _brokerMetrics.addCallbackTableGauge(tableNameWithType, BrokerGauge.MAX_BURST_QPS, + () -> (long) finalQueryQuotaEntity.getMaxQpsTracker().getMaxCountPerBucket()); if (isQueryRateLimitDisabled()) { LOGGER.info("Query rate limiting is currently disabled for this broker. So it won't take effect immediately."); } @@ -269,9 +285,6 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan LOGGER.debug("The percentage of rate limit capacity utilization is {}", percentageOfCapacityUtilization); _brokerMetrics.setValueOfTableGauge(tableNameWithType, BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE, percentageOfCapacityUtilization); - - _brokerMetrics.addCallbackTableGaugeIfNeeded(tableNameWithType, BrokerGauge.MAX_BURST_QPS, - () -> (long) queryQuotaEntity.getMaxQpsTracker().getMaxCountPerBucket()); } if (!rateLimiter.tryAcquire()) { diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java index a302eed..e7ca822 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java @@ -149,7 +149,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ZKMetadataProvider .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, TableConfigUtils.toZNRecord(tableConfig)); setQps(tableConfig); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); // All the request should be passed. @@ -164,7 +164,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { throws Exception { ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); } @@ -181,7 +181,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); // Nothing happened since it doesn't have qps quota. @@ -202,7 +202,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); // Drop the offline table won't have any affect since it is table type specific. @@ -233,9 +233,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, TableConfigUtils.toZNRecord(offlineTableConfig)); // Since each table has 2 online brokers, per broker rate becomes 100.0 / 2 = 50.0 - _queryQuotaManager.initTableQueryQuota(offlineTableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(offlineTableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); - _queryQuotaManager.initTableQueryQuota(realtimeTableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(realtimeTableConfig, brokerResource); // The hash map now contains 2 entries for both of the tables. Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 2); @@ -259,7 +259,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ZKMetadataProvider .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, TableConfigUtils.toZNRecord(tableConfig)); setQps(tableConfig); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); runQueries(70, 10L); @@ -276,7 +276,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ZKMetadataProvider .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, TableConfigUtils.toZNRecord(tableConfig)); setQps(tableConfig); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); runQueries(70, 10L); @@ -291,7 +291,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { throws Exception { ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); } @@ -308,7 +308,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); } @@ -325,7 +325,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); } @@ -334,7 +334,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { throws Exception { TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); setQps(tableConfig); - _queryQuotaManager.initTableQueryQuota(tableConfig, null); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, null); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); } @@ -346,7 +346,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ZKMetadataProvider .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, TableConfigUtils.toZNRecord(tableConfig)); setQps(tableConfig); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); } @@ -359,7 +359,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ZKMetadataProvider .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, TableConfigUtils.toZNRecord(tableConfig)); setQps(tableConfig); - _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); // For the 1st version we don't check the number of online brokers. // Thus the expected size now is 1. It'll be 0 when we bring dynamic rate back. diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java index 44a53c0..ad78912 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java @@ -154,7 +154,7 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e PinotTimer timer = PinotMetricUtils.makePinotTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS); if (timer != null) { timer.update(duration, timeUnit); - } + } } /** @@ -430,12 +430,12 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e } } - public void addCallbackTableGaugeIfNeeded(final String tableName, final G gauge, final Callable<Long> valueCallback) { + public void addCallbackTableGauge(final String tableName, final G gauge, final Callable<Long> valueCallback) { final String fullGaugeName; String gaugeName = gauge.getGaugeName(); fullGaugeName = gaugeName + "." + getTableName(tableName); - addCallbackGaugeIfNeeded(fullGaugeName, valueCallback); + addOrReplaceCallbackGauge(fullGaugeName, valueCallback); } /** @@ -444,13 +444,14 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e * @param metricName The name of the metric * @param valueCallback The callback function used to retrieve the value of the gauge */ - public void addCallbackGaugeIfNeeded(final String metricName, final Callable<Long> valueCallback) { - if (!_gaugeValues.containsKey(metricName)) { - synchronized (_gaugeValues) { - if (!_gaugeValues.containsKey(metricName)) { - _gaugeValues.put(metricName, new AtomicLong(0L)); - addCallbackGauge(metricName, valueCallback); - } + public void addOrReplaceCallbackGauge(final String metricName, final Callable<Long> valueCallback) { + if (_gaugeValues.containsKey(metricName)) { + // TODO: remove callbac + } + synchronized (_gaugeValues) { + if (!_gaugeValues.containsKey(metricName)) { + _gaugeValues.put(metricName, new AtomicLong(0L)); + addCallbackGauge(metricName, valueCallback); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org