This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6cd4b60 Add 'AggregateMetrics' rule to RuleEngine (#6789) 6cd4b60 is described below commit 6cd4b604d2f6e7513fdf7b4730e6054c9f2dc080 Author: Sajjad Moradi <moradi.saj...@gmail.com> AuthorDate: Wed Apr 21 11:42:21 2021 -0500 Add 'AggregateMetrics' rule to RuleEngine (#6789) * Add 'AggregateMetrics' rule to RuleEngine * Fix comments * Use proper import statement after rebase * Add checking for column names being metric --- .../controller/recommender/io/ConfigManager.java | 10 ++ .../recommender/rules/RulesToExecute.java | 14 +++ .../rules/impl/AggregateMetricsRule.java | 85 ++++++++++++++ .../rules/impl/RealtimeProvisioningRule.java | 9 +- .../rules/io/params/RecommenderConstants.java | 1 + .../controller/recommender/TestConfigEngine.java | 19 ++- .../rules/impl/AggregateMetricsRuleTest.java | 129 +++++++++++++++++++++ .../AggregateMetricsRuleInput.json | 64 ++++++++++ 8 files changed, 324 insertions(+), 7 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/ConfigManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/ConfigManager.java index 1e67abe..1e43893 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/ConfigManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/ConfigManager.java @@ -41,6 +41,7 @@ public class ConfigManager { PartitionConfig _partitionConfig = new PartitionConfig(); FlaggedQueries _flaggedQueries = new FlaggedQueries(); Map<String, Map<String, String>> _realtimeProvisioningRecommendations = new HashMap<>(); + boolean _aggregateMetrics = false; @JsonSetter(nulls = Nulls.SKIP) public void setIndexConfig(IndexConfig indexConfig) { @@ -63,6 +64,11 @@ public class ConfigManager { _realtimeProvisioningRecommendations = realtimeProvisioningRecommendation; } + @JsonSetter(nulls = Nulls.SKIP) + public void setAggregateMetrics(boolean aggregateMetrics) { + _aggregateMetrics = aggregateMetrics; + } + public IndexConfig getIndexConfig() { return _indexConfig; } @@ -78,4 +84,8 @@ public class ConfigManager { public Map<String, Map<String, String>> getRealtimeProvisioningRecommendations() { return _realtimeProvisioningRecommendations; } + + public boolean isAggregateMetrics() { + return _aggregateMetrics; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java index cd2608b..50d65f8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSetter; import com.fasterxml.jackson.annotation.Nulls; import org.apache.pinot.controller.recommender.io.ConfigManager; import org.apache.pinot.controller.recommender.io.InputManager; +import org.apache.pinot.controller.recommender.rules.impl.AggregateMetricsRule; import org.apache.pinot.controller.recommender.rules.impl.BloomFilterRule; import org.apache.pinot.controller.recommender.rules.impl.FlagQueryRule; import org.apache.pinot.controller.recommender.rules.impl.InvertedSortedIndexJointRule; @@ -58,6 +59,8 @@ public class RulesToExecute { return new NoDictionaryOnHeapDictionaryJointRule(inputManager, outputManager); case VariedLengthDictionaryRule: return new VariedLengthDictionaryRule(inputManager, outputManager); + case AggregateMetricsRule: + return new AggregateMetricsRule(inputManager, outputManager); case RealtimeProvisioningRule: return new RealtimeProvisioningRule(inputManager, outputManager); default: @@ -73,6 +76,7 @@ public class RulesToExecute { boolean _recommendNoDictionaryOnHeapDictionaryJoint = DEFAULT_RECOMMEND_NO_DICTIONARY_ONHEAP_DICTIONARY_JOINT; boolean _recommendVariedLengthDictionary = DEFAULT_RECOMMEND_VARIED_LENGTH_DICTIONARY; boolean _recommendFlagQuery = DEFAULT_RECOMMEND_FLAG_QUERY; + boolean _recommendAggregateMetrics = DEFAULT_RECOMMEND_AGGREGATE_METRICS; boolean _recommendRealtimeProvisioning = DEFAULT_RECOMMEND_REALTIME_PROVISIONING; @JsonSetter(nulls = Nulls.SKIP) @@ -111,6 +115,11 @@ public class RulesToExecute { } @JsonSetter(nulls = Nulls.SKIP) + public void setRecommendAggregateMetrics(boolean aggregateMetrics) { + _recommendAggregateMetrics = aggregateMetrics; + } + + @JsonSetter(nulls = Nulls.SKIP) public void setRecommendRealtimeProvisioning(boolean recommendRealtimeProvisioning) { _recommendPinotTablePartition = recommendRealtimeProvisioning; } @@ -143,6 +152,10 @@ public class RulesToExecute { return _recommendBloomFilter; } + public boolean isRecommendAggregateMetrics() { + return _recommendAggregateMetrics; + } + public boolean isRecommendRealtimeProvisioning() { return _recommendRealtimeProvisioning; } @@ -157,6 +170,7 @@ public class RulesToExecute { VariedLengthDictionaryRule, // VariedLengthDictionaryRule must go after NoDictionaryOnHeapDictionaryJointRule since we do not recommend dictionary on NoDictionary cols PinotTablePartitionRule, // PinotTablePartitionRule must go after KafkaPartitionRule to recommend realtime partitions, after NoDictionaryOnHeapDictionaryJointRule to correctly calculate record size BloomFilterRule, + AggregateMetricsRule, RealtimeProvisioningRule // this rule must be the last one because it needs the output of other rules as its input } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/AggregateMetricsRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/AggregateMetricsRule.java new file mode 100644 index 0000000..34b3cf6 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/AggregateMetricsRule.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.controller.recommender.rules.impl; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.controller.recommender.exceptions.InvalidInputException; +import org.apache.pinot.controller.recommender.io.ConfigManager; +import org.apache.pinot.controller.recommender.io.InputManager; +import org.apache.pinot.controller.recommender.rules.AbstractRule; + +import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.HYBRID; +import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.REALTIME; + + +/** + * This rule checks the provided queries and suggests the value for 'AggregateMetrics' flag in table config. + * It looks at selection columns and if all of them are SUM function, the flag should be true, otherwise it's false. + * It also checks if all column names appearing in sum function are in fact metric columns. + */ +public class AggregateMetricsRule extends AbstractRule { + + public AggregateMetricsRule(InputManager input, ConfigManager output) { + super(input, output); + } + + @Override + public void run() + throws InvalidInputException { + String tableType = _input.getTableType(); + if ((tableType.equalsIgnoreCase(REALTIME) || tableType.equalsIgnoreCase(HYBRID))) { + _output.setAggregateMetrics(shouldAggregate(_input)); + } + } + + private boolean shouldAggregate(InputManager inputManager) { + Set<String> metricNames = new HashSet<>(inputManager.getSchema().getMetricNames()); + for (String query : inputManager.getParsedQueries()) { + for (ExpressionContext selectExpr : inputManager.getQueryContext(query).getSelectExpressions()) { + FunctionContext funcCtx = selectExpr.getFunction(); + if (selectExpr.getType() != ExpressionContext.Type.FUNCTION + || !funcCtx.getFunctionName().equalsIgnoreCase("SUM") + || hasNonMetricArguments(funcCtx.getArguments(), metricNames)) { + return false; + } + } + } + return true; + } + + private boolean hasNonMetricArguments(List<ExpressionContext> arguments, Set<String> metricNames) { + for (ExpressionContext arg : arguments) { + if (arg.getType() == ExpressionContext.Type.IDENTIFIER) { + if (!metricNames.contains(arg.getIdentifier())) { + return true; + } + } else if (arg.getType() == ExpressionContext.Type.FUNCTION) { + if (hasNonMetricArguments(arg.getFunction().getArguments(), metricNames)) { + return true; + } + } + } + return false; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java index 0173f43..5f34fda 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java @@ -71,7 +71,8 @@ public class RealtimeProvisioningRule extends AbstractRule { } // prepare input to memory estimator - TableConfig tableConfig = createTableConfig(_output.getIndexConfig(), _input.getSchema()); + TableConfig tableConfig = + createTableConfig(_output.getIndexConfig(), _input.getSchema(), _output.isAggregateMetrics()); long maxUsableHostMemoryByte = DataSizeUtils.toBytes(_params.getMaxUsableHostMemory()); int totalConsumingPartitions = _params.getNumPartitions() * _params.getNumReplicas(); int ingestionRatePerPartition = (int) _input.getNumMessagesPerSecInKafkaTopic() / _params.getNumPartitions(); @@ -96,7 +97,7 @@ public class RealtimeProvisioningRule extends AbstractRule { extractResults(memoryEstimator, numHosts, numHours, _output.getRealtimeProvisioningRecommendations()); } - private TableConfig createTableConfig(IndexConfig indexConfig, Schema schema) { + private TableConfig createTableConfig(IndexConfig indexConfig, Schema schema, boolean aggregateMetrics) { TableConfigBuilder tableConfigBuilder = new TableConfigBuilder(TableType.REALTIME); tableConfigBuilder.setTableName(schema.getSchemaName()); tableConfigBuilder.setLoadMode("MMAP"); @@ -109,7 +110,9 @@ public class RealtimeProvisioningRule extends AbstractRule { setIfNotEmpty(indexConfig.getOnHeapDictionaryColumns(), tableConfigBuilder::setOnHeapDictionaryColumns); setIfNotEmpty(indexConfig.getVariedLengthDictionaryColumns(), tableConfigBuilder::setVarLengthDictionaryColumns); - return tableConfigBuilder.build(); + TableConfig tableConfig = tableConfigBuilder.build(); + tableConfig.getIndexingConfig().setAggregateMetrics(aggregateMetrics); + return tableConfig; } private void setIfNotEmpty(String colName, Consumer<String> func) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java index db4c274..67d7f31 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java @@ -44,6 +44,7 @@ public class RecommenderConstants { public static final boolean DEFAULT_RECOMMEND_INVERTED_SORTED_INDEX_JOINT = true; public static final boolean DEFAULT_RECOMMEND_BLOOM_FILTER = true; public static final boolean DEFAULT_RECOMMEND_NO_DICTIONARY_ONHEAP_DICTIONARY_JOINT = true; + public static final boolean DEFAULT_RECOMMEND_AGGREGATE_METRICS = true; public static final boolean DEFAULT_RECOMMEND_REALTIME_PROVISIONING = true; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java index 257142c..555b0a1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java @@ -370,16 +370,27 @@ public class TestConfigEngine { testRealtimeProvisioningRule("recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json"); } + @Test + void testAggregateMetricsRule() throws Exception { + ConfigManager output = runRecommenderDriver("recommenderInput/AggregateMetricsRuleInput.json"); + assertTrue(output.isAggregateMetrics()); + } + private void testRealtimeProvisioningRule(String fileName) throws Exception { - String input = readInputToStr(fileName); - String output = RecommenderDriver.run(input); - ConfigManager configManager = objectMapper.readValue(output, ConfigManager.class); - Map<String, Map<String, String>> recommendations = configManager.getRealtimeProvisioningRecommendations(); + ConfigManager output = runRecommenderDriver(fileName); + Map<String, Map<String, String>> recommendations = output.getRealtimeProvisioningRecommendations(); assertRealtimeProvisioningRecommendation(recommendations.get(OPTIMAL_SEGMENT_SIZE)); assertRealtimeProvisioningRecommendation(recommendations.get(CONSUMING_MEMORY_PER_HOST)); assertRealtimeProvisioningRecommendation(recommendations.get(TOTAL_MEMORY_USED_PER_HOST)); } + private ConfigManager runRecommenderDriver(String fileName) + throws IOException, InvalidInputException { + String input = readInputToStr(fileName); + String output = RecommenderDriver.run(input); + return objectMapper.readValue(output, ConfigManager.class); + } + private void assertRealtimeProvisioningRecommendation(Map<String, String> matrix) { for (int i = 2; i < 13; i += 2) { assertTrue(matrix.containsKey(String.format("numHours - %2d", i))); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/rules/impl/AggregateMetricsRuleTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/rules/impl/AggregateMetricsRuleTest.java new file mode 100644 index 0000000..ccc06a6 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/rules/impl/AggregateMetricsRuleTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.controller.recommender.rules.impl; + +import com.google.common.collect.ImmutableSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.controller.recommender.io.ConfigManager; +import org.apache.pinot.controller.recommender.io.InputManager; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class AggregateMetricsRuleTest { + + @Test + public void testRun() + throws Exception { + Set<String> metrics = ImmutableSet.of("a", "b", "c"); + InputManager input = createInput(metrics, "select sum(a), sum(b), sum(c) from tableT", "select sum(a) from tableT2"); + ConfigManager output = new ConfigManager(); + AggregateMetricsRule rule = new AggregateMetricsRule(input, output); + rule.run(); + assertTrue(output.isAggregateMetrics()); + } + + @Test + public void testRun_nonAggregate() + throws Exception { + Set<String> metrics = ImmutableSet.of("a", "b", "c"); + InputManager input = createInput(metrics, "select sum(a), sum(b), sum(c) from tableT", "select sum(a), b from tableT2"); + ConfigManager output = new ConfigManager(); + AggregateMetricsRule rule = new AggregateMetricsRule(input, output); + rule.run(); + assertFalse(output.isAggregateMetrics()); + } + + @Test + public void testRun_nonAggregate_withNonSumFunction() + throws Exception { + Set<String> metrics = ImmutableSet.of("a", "b", "c"); + InputManager input = createInput(metrics, "select sum(a), sum(b), max(c) from tableT"); + ConfigManager output = new ConfigManager(); + AggregateMetricsRule rule = new AggregateMetricsRule(input, output); + rule.run(); + assertFalse(output.isAggregateMetrics()); + } + + @Test + public void testRun_nonMetricColumnInSum() + throws Exception { + Set<String> metrics = ImmutableSet.of("a", "b", "c"); + InputManager input = createInput(metrics, "select sum(a), sum(b), sum(X) from tableT"); + ConfigManager output = new ConfigManager(); + AggregateMetricsRule rule = new AggregateMetricsRule(input, output); + rule.run(); + assertFalse(output.isAggregateMetrics()); + } + + @Test + public void testRun_complexExpressionInSum_withMetricColumns() + throws Exception { + Set<String> metrics = ImmutableSet.of("a", "b", "c"); + InputManager input = createInput(metrics, "select sum(a), sum(b), sum(2 * a + 3 * b + c) from tableT"); + ConfigManager output = new ConfigManager(); + AggregateMetricsRule rule = new AggregateMetricsRule(input, output); + rule.run(); + assertTrue(output.isAggregateMetrics()); + } + + @Test + public void testRun_complexExpressionInSum_withSomeNonMetricColumns() + throws Exception { + Set<String> metrics = ImmutableSet.of("a", "b", "c"); + InputManager input = createInput(metrics, "select sum(a), sum(b), sum(2 * a + 3 * b + X) from tableT"); + ConfigManager output = new ConfigManager(); + AggregateMetricsRule rule = new AggregateMetricsRule(input, output); + rule.run(); + assertFalse(output.isAggregateMetrics()); + } + + @Test + public void testRun_offlineTable() + throws Exception { + Set<String> metrics = ImmutableSet.of("a", "b", "c"); + InputManager input = createInput(metrics, "select sum(a), sum(b), sum(c) from tableT"); + input.setTableType("OFFLINE"); + ConfigManager output = new ConfigManager(); + AggregateMetricsRule rule = new AggregateMetricsRule(input, output); + rule.run(); + assertFalse(output.isAggregateMetrics()); + } + + + private InputManager createInput(Set<String> metricNames, String... queries) + throws Exception { + InputManager input = new InputManager(); + Map<String, Double> queryWithWeight = new HashMap<>(); + for (String query : queries) { + queryWithWeight.put(query, 1.0); + } + input.setQueryWeightMap(queryWithWeight); + input.setTableType("Realtime"); + metricNames.forEach(metric -> input.getSchema().addField(new MetricFieldSpec(metric, FieldSpec.DataType.INT))); + input.init(); + return input; + } +} diff --git a/pinot-controller/src/test/resources/recommenderInput/AggregateMetricsRuleInput.json b/pinot-controller/src/test/resources/recommenderInput/AggregateMetricsRuleInput.json new file mode 100644 index 0000000..ac265a5 --- /dev/null +++ b/pinot-controller/src/test/resources/recommenderInput/AggregateMetricsRuleInput.json @@ -0,0 +1,64 @@ +{ + "schema":{ + "schemaName": "tableSchema", + "dimensionFieldSpecs": [ + { + "name": "a", + "dataType": "INT", + "cardinality":20, + "numValuesPerEntry":1 + } + ], + "metricFieldSpecs": [ + { + "name": "k", + "dataType": "DOUBLE", + "cardinality":1000, + "numValuesPerEntry":1, + "averageLength" : 100 + }, + { + "name": "l", + "dataType": "STRING", + "cardinality":1000, + "numValuesPerEntry":1, + "averageLength" : 10 + }, + { + "name": "m", + "dataType": "BYTES", + "cardinality":1000, + "numValuesPerEntry":1, + "averageLength" : 25 + } + ], + "dateTimeFieldSpecs": [ + { + "name": "t", + "dataType": "INT", + "format": "1:DAYS:EPOCH", + "granularity": "1:DAYS", + "cardinality": 1000 + } + ] + }, + "queriesWithWeights":{ + "select sum(k) from tableName where a in (2,4)": 1, + "select sum(l), sum(m) from tableName where a in (1,3)": 1, + "select sum(2 * k + 3 * l - 4 / m) from tableName where a = 5": 1 + }, + "qps": 150, + "numMessagesPerSecInKafkaTopic":1000, + "numRecordsPerPush":10000, + "tableType": "HYBRID", + "latencySLA": 500, + "rulesToExecute": { + "recommendAggregateMetrics": true + }, + "overWrittenConfigs": { + "indexConfig": { + "invertedIndexColumns": ["a","b"], + "rangeIndexColumns": ["f"] + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org