This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch enable-hit-counter-and-max-hit-rate-tracker in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 5d3c2950fe172b0ddee9f64ed44262ab004ecce0 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Fri Jul 30 13:44:00 2021 -0700 Enable hit counter and max hit rate tracker --- .../HelixExternalViewBasedQueryQuotaManager.java | 73 +++++++++++++++------- .../pinot/broker/queryquota/QueryQuotaEntity.java | 4 ++ ...elixExternalViewBasedQueryQuotaManagerTest.java | 28 ++++++--- 3 files changed, 77 insertions(+), 28 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 630ac18..49fa9ea 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory; * This class is to support the qps quota feature. * It depends on the broker source change to update the dynamic rate limit, * which means it only gets updated when a new table added or a broker restarted. - * TODO: support adding new rate limiter for existing tables without restarting the broker. */ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager { private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class); @@ -82,9 +81,8 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan @Override public void processClusterChange(HelixConstants.ChangeType changeType) { - Preconditions - .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW - || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType); + Preconditions.checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW + || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType); if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) { ExternalView brokerResourceEV = HelixHelper .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), @@ -109,17 +107,15 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan * @param brokerResource broker resource which stores all the broker states of each table. */ public void initOrUpdateTableQueryQuota(TableConfig tableConfig, ExternalView brokerResource) { + if (tableConfig == null) { + LOGGER.info("No query quota to update since table config is null"); + return; + } String tableNameWithType = tableConfig.getTableName(); LOGGER.info("Initializing rate limiter for table {}", tableNameWithType); // Create rate limiter if query quota config is specified. - QuotaConfig quotaConfig = tableConfig.getQuotaConfig(); - if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) { - LOGGER.info("No qps config specified for table: {}", tableNameWithType); - removeRateLimiter(tableNameWithType); - } else { - createOrUpdateRateLimiter(tableNameWithType, brokerResource, quotaConfig); - } + createOrUpdateRateLimiter(tableNameWithType, brokerResource, tableConfig.getQuotaConfig()); } /** @@ -138,10 +134,6 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan _rateLimiterMap.remove(tableNameWithType); } - public boolean containsRateLimiterForTable(String tableNameWithType) { - return _rateLimiterMap.containsKey(tableNameWithType); - } - /** * Get QuotaConfig from property store. * @param tableNameWithType table name with table type. @@ -165,11 +157,13 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan QuotaConfig quotaConfig) { if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) { LOGGER.info("No qps config specified for table: {}", tableNameWithType); + buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType); return; } if (brokerResource == null) { LOGGER.warn("Failed to init qps quota for table {}. No broker resource connected!", tableNameWithType); + buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType); return; } @@ -207,7 +201,13 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan "Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}", tableNameWithType, overallRate, perBrokerRate, onlineCount, stat.getVersion()); } else { - queryQuotaEntity.getRateLimiter().setRate(perBrokerRate); + RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter(); + if (rateLimiter == null) { + rateLimiter = RateLimiter.create(perBrokerRate); + queryQuotaEntity.setRateLimiter(rateLimiter); + } else { + rateLimiter.setRate(perBrokerRate); + } queryQuotaEntity.setNumOnlineBrokers(onlineCount); queryQuotaEntity.setOverallRate(overallRate); queryQuotaEntity.setTableConfigStatVersion(stat.getVersion()); @@ -225,6 +225,22 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } /** + * Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query quota entity. + */ + private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String tableNameWithType) { + QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType); + if (queryQuotaEntity == null) { + // Create an QueryQuotaEntity object without setting a rate limiter. + queryQuotaEntity = new QueryQuotaEntity(null, new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0); + _rateLimiterMap.put(tableNameWithType, queryQuotaEntity); + } else { + // Set rate limiter to null for an existing QueryQuotaEntity object. + queryQuotaEntity.setRateLimiter(null); + } + } + + /** * {@inheritDoc} * <p>Acquires a token from rate limiter based on the table name. * @@ -276,6 +292,10 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan queryQuotaEntity.getMaxQpsTracker().hit(); RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter(); + // Return true if no rate limiter is initialized. + if (rateLimiter == null) { + return true; + } double perBrokerRate = rateLimiter.getRate(); // Emit the qps capacity utilization rate. @@ -302,6 +322,11 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } @VisibleForTesting + public QueryQuotaEntity getRateLimiterForTable(String tableNameWithType) { + return _rateLimiterMap.get(tableNameWithType); + } + + @VisibleForTesting public void cleanUpRateLimiterMap() { _rateLimiterMap.clear(); } @@ -328,6 +353,10 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan Map.Entry<String, QueryQuotaEntity> entry = it.next(); String tableNameWithType = entry.getKey(); QueryQuotaEntity queryQuotaEntity = entry.getValue(); + if (queryQuotaEntity.getRateLimiter() == null) { + // No rate limiter set, skip this table. + continue; + } // Get number of online brokers. Map<String, String> stateMap = currentBrokerResource.getStateMap(tableNameWithType); @@ -406,12 +435,14 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan private void getQueryQuotaEnabledFlagFromInstanceConfig() { try { - Map<String, String> instanceConfigsMap = - HelixHelper.getInstanceConfigsMapFor(_instanceId, _helixManager.getClusterName(), _helixManager.getClusterManagmentTool()); - String queryRateLimitDisabled = instanceConfigsMap.getOrDefault(CommonConstants.Helix.QUERY_RATE_LIMIT_DISABLED, "false"); + Map<String, String> instanceConfigsMap = HelixHelper + .getInstanceConfigsMapFor(_instanceId, _helixManager.getClusterName(), + _helixManager.getClusterManagmentTool()); + String queryRateLimitDisabled = + instanceConfigsMap.getOrDefault(CommonConstants.Helix.QUERY_RATE_LIMIT_DISABLED, "false"); _queryRateLimitDisabled = Boolean.parseBoolean(queryRateLimitDisabled); - LOGGER.info("Set query rate limiting to: {} for all {} tables in this broker.", _queryRateLimitDisabled ? "DISABLED" : "ENABLED", - _rateLimiterMap.size()); + LOGGER.info("Set query rate limiting to: {} for all {} tables in this broker.", + _queryRateLimitDisabled ? "DISABLED" : "ENABLED", _rateLimiterMap.size()); } catch (ZkNoNodeException e) { // It's a brand new broker. Skip checking instance config. _queryRateLimitDisabled = false; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java index 9c243ae..82d8691 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java @@ -40,6 +40,10 @@ public class QueryQuotaEntity { _tableConfigStatVersion = tableConfigStatVersion; } + public void setRateLimiter(RateLimiter rateLimiter) { + _rateLimiter = rateLimiter; + } + public RateLimiter getRateLimiter() { return _rateLimiter; } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java index c24e407..2a307f2 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java @@ -165,7 +165,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); - Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME); + Assert.assertNull(queryQuotaEntity.getRateLimiter()); } @Test @@ -182,7 +184,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); - Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME); + Assert.assertNull(queryQuotaEntity.getRateLimiter()); // Nothing happened since it doesn't have qps quota. _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); @@ -203,7 +207,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); - Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME); + Assert.assertNull(queryQuotaEntity.getRateLimiter()); // Drop the offline table won't have any affect since it is table type specific. _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME); @@ -292,7 +298,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME); _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); - Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME); + Assert.assertNull(queryQuotaEntity.getRateLimiter()); } @Test @@ -309,7 +317,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME); _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); - Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME); + Assert.assertNull(queryQuotaEntity.getRateLimiter()); } @Test @@ -326,7 +336,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME); TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME); _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource); - Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME); + Assert.assertNull(queryQuotaEntity.getRateLimiter()); } @Test @@ -335,7 +347,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest { TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME); setQps(tableConfig); _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, null); - Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0); + Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1); + QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME); + Assert.assertNull(queryQuotaEntity.getRateLimiter()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org