This is an automated email from the ASF dual-hosted git repository. richardstartin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d2ca07e Combine Metadata and Dictionary based plans into single operator (#8408) d2ca07e is described below commit d2ca07ee57edb8833ef83e4c2bf40915f1e4879e Author: Richard Startin <rich...@startree.ai> AuthorDate: Tue Mar 29 22:35:23 2022 +0100 Combine Metadata and Dictionary based plans into single operator (#8408) * satisfy queries using datasource metadata when convenient to * only use metadata if min and max value are non-null * merge Metadata and Dictionary based plans into DataSource based plan * rename operator + review comments * update test comments * fix exception message --- .../query/MetadataBasedAggregationOperator.java | 94 ---------------------- ...r.java => NonScanBasedAggregationOperator.java} | 85 ++++++++++++------- .../pinot/core/plan/AggregationPlanNode.java | 73 ++++++++--------- ...adataAndDictionaryAggregationPlanMakerTest.java | 18 ++--- .../pinot/queries/DistinctCountQueriesTest.java | 22 ++--- .../pinot/queries/ExplainPlanQueriesTest.java | 4 +- ...SegmentPartitionedDistinctCountQueriesTest.java | 6 +- ...onaryAggregationPlanClusterIntegrationTest.java | 29 ++++--- .../org/apache/pinot/perf/BenchmarkQueries.java | 7 +- 9 files changed, 136 insertions(+), 202 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java deleted file mode 100644 index ea40149..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.query; - -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; -import org.apache.pinot.core.operator.ExecutionStatistics; -import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.segment.spi.AggregationFunctionType; -import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.segment.spi.datasource.DataSource; - - -/** - * Aggregation operator that utilizes metadata for serving aggregation queries. 
- */ -@SuppressWarnings("rawtypes") -public class MetadataBasedAggregationOperator extends BaseOperator<IntermediateResultsBlock> { - private static final String OPERATOR_NAME = "MetadataBasedAggregationOperator"; - private static final String EXPLAIN_NAME = "AGGREGATE_METADATA"; - - private final AggregationFunction[] _aggregationFunctions; - private final SegmentMetadata _segmentMetadata; - private final Map<String, DataSource> _dataSourceMap; - - public MetadataBasedAggregationOperator(AggregationFunction[] aggregationFunctions, SegmentMetadata segmentMetadata, - Map<String, DataSource> dataSourceMap) { - _aggregationFunctions = aggregationFunctions; - _segmentMetadata = segmentMetadata; - - // Datasource is currently not used, but will start getting used as we add support for aggregation - // functions other than count(*). - _dataSourceMap = dataSourceMap; - } - - @Override - protected IntermediateResultsBlock getNextBlock() { - int numAggregationFunctions = _aggregationFunctions.length; - List<Object> aggregationResults = new ArrayList<>(numAggregationFunctions); - long numTotalDocs = _segmentMetadata.getTotalDocs(); - for (AggregationFunction aggregationFunction : _aggregationFunctions) { - Preconditions.checkState(aggregationFunction.getType() == AggregationFunctionType.COUNT, - "Metadata based aggregation operator does not support function type: " + aggregationFunction.getType()); - aggregationResults.add(numTotalDocs); - } - - // Build intermediate result block based on aggregation result from the executor. 
- return new IntermediateResultsBlock(_aggregationFunctions, aggregationResults, false); - } - - @Override - public String getOperatorName() { - return OPERATOR_NAME; - } - - @Override - public String toExplainString() { - return EXPLAIN_NAME; - } - - @Override - public List<Operator> getChildOperators() { - return Collections.emptyList(); - } - - @Override - public ExecutionStatistics getExecutionStatistics() { - // NOTE: Set numDocsScanned to numTotalDocs for backward compatibility. - int numTotalDocs = _segmentMetadata.getTotalDocs(); - return new ExecutionStatistics(numTotalDocs, 0, 0, numTotalDocs); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java similarity index 69% rename from pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java rename to pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java index 3f0dd87..6408a01 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java @@ -27,9 +27,8 @@ import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; +import java.util.Objects; import java.util.Set; -import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; @@ -40,80 +39,90 @@ import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregat import org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction; import 
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction; import org.apache.pinot.segment.local.customobject.MinMaxRangePair; +import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.utils.ByteArray; /** - * Aggregation operator that utilizes dictionary for serving aggregation queries. - * The dictionary operator is selected in the plan maker, if the query is of aggregation type min, max, minmaxrange - * and the column has a dictionary. + * Aggregation operator that utilizes dictionary or column metadata for serving aggregation queries to avoid scanning. + * The scanless operator is selected in the plan maker, if the query is of aggregation type min, max, minmaxrange, + * distinctcount, distinctcounthll, distinctcountrawhll, segmentpartitioneddistinctcount, distinctcountsmarthll, + * and the column has a dictionary, or has column metadata with min and max value defined. It also supports count(*) if + * the query has no filter. * We don't use this operator if the segment has star tree, - * as the dictionary will have aggregated values for the metrics, and dimensions will have star node value + * as the dictionary will have aggregated values for the metrics, and dimensions will have star node value. * - * For min value, we use the first value from the dictionary - * For max value we use the last value from dictionary + * For min value, we use the first value from the dictionary, falling back to the column metadata min value if there + * is no dictionary. + * For max value we use the last value from dictionary, falling back to the column metadata max value if there + * is no dictionary. 
 */ @SuppressWarnings("rawtypes") -public class DictionaryBasedAggregationOperator extends BaseOperator<IntermediateResultsBlock> { - private static final String OPERATOR_NAME = "DictionaryBasedAggregationOperator"; - private static final String EXPLAIN_NAME = "AGGREGATE_DICTIONARY"; +public class NonScanBasedAggregationOperator extends BaseOperator<IntermediateResultsBlock> { + private static final String OPERATOR_NAME = NonScanBasedAggregationOperator.class.getSimpleName(); + private static final String EXPLAIN_NAME = "AGGREGATE_NO_SCAN"; private final AggregationFunction[] _aggregationFunctions; - private final Map<String, Dictionary> _dictionaryMap; + private final DataSource[] _dataSources; private final int _numTotalDocs; - public DictionaryBasedAggregationOperator(AggregationFunction[] aggregationFunctions, - Map<String, Dictionary> dictionaryMap, int numTotalDocs) { + public NonScanBasedAggregationOperator(AggregationFunction[] aggregationFunctions, + DataSource[] dataSources, int numTotalDocs) { _aggregationFunctions = aggregationFunctions; - _dictionaryMap = dictionaryMap; + _dataSources = dataSources; _numTotalDocs = numTotalDocs; } @Override protected IntermediateResultsBlock getNextBlock() { List<Object> aggregationResults = new ArrayList<>(_aggregationFunctions.length); - for (AggregationFunction aggregationFunction : _aggregationFunctions) { - String column = ((ExpressionContext) aggregationFunction.getInputExpressions().get(0)).getIdentifier(); - Dictionary dictionary = _dictionaryMap.get(column); + for (int i = 0; i < _aggregationFunctions.length; i++) { + AggregationFunction aggregationFunction = _aggregationFunctions[i]; + // note that dataSource will be null for COUNT, so do not interact with it until it's known this isn't a COUNT + DataSource dataSource = _dataSources[i]; Object result; switch (aggregationFunction.getType()) { + case COUNT: + result = (long) _numTotalDocs; + break; case MIN: case MINMV: - result = 
toDouble(dictionary.getMinVal()); + result = getMinValue(dataSource); break; case MAX: case MAXMV: - result = toDouble(dictionary.getMaxVal()); + result = getMaxValue(dataSource); break; case MINMAXRANGE: case MINMAXRANGEMV: - result = new MinMaxRangePair(toDouble(dictionary.getMinVal()), toDouble(dictionary.getMaxVal())); + result = new MinMaxRangePair(getMinValue(dataSource), getMaxValue(dataSource)); break; case DISTINCTCOUNT: case DISTINCTCOUNTMV: - result = getDistinctValueSet(dictionary); + result = getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary())); break; case DISTINCTCOUNTHLL: case DISTINCTCOUNTHLLMV: - result = getDistinctCountHLLResult(dictionary, (DistinctCountHLLAggregationFunction) aggregationFunction); + result = getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()), + (DistinctCountHLLAggregationFunction) aggregationFunction); break; case DISTINCTCOUNTRAWHLL: case DISTINCTCOUNTRAWHLLMV: - result = getDistinctCountHLLResult(dictionary, + result = getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()), ((DistinctCountRawHLLAggregationFunction) aggregationFunction).getDistinctCountHLLAggregationFunction()); break; case SEGMENTPARTITIONEDDISTINCTCOUNT: - result = (long) dictionary.length(); + result = (long) Objects.requireNonNull(dataSource.getDictionary()).length(); break; case DISTINCTCOUNTSMARTHLL: - result = getDistinctCountSmartHLLResult(dictionary, + result = getDistinctCountSmartHLLResult(Objects.requireNonNull(dataSource.getDictionary()), (DistinctCountSmartHLLAggregationFunction) aggregationFunction); break; default: throw new IllegalStateException( - "Dictionary based aggregation operator does not support function type: " + aggregationFunction.getType()); + "Non-scan based aggregation operator does not support function type: " + aggregationFunction.getType()); } aggregationResults.add(result); } @@ -122,8 +131,26 @@ public class DictionaryBasedAggregationOperator extends 
BaseOperator<Intermediat return new IntermediateResultsBlock(_aggregationFunctions, aggregationResults, false); } - private double toDouble(Comparable value) { - if (value instanceof Number) { + private static Double getMinValue(DataSource dataSource) { + Dictionary dictionary = dataSource.getDictionary(); + if (dictionary != null) { + return toDouble(dictionary.getMinVal()); + } + return toDouble(dataSource.getDataSourceMetadata().getMinValue()); + } + + private static Double getMaxValue(DataSource dataSource) { + Dictionary dictionary = dataSource.getDictionary(); + if (dictionary != null) { + return toDouble(dictionary.getMaxVal()); + } + return toDouble(dataSource.getDataSourceMetadata().getMaxValue()); + } + + private static Double toDouble(Comparable<?> value) { + if (value instanceof Double) { + return (Double) value; + } else if (value instanceof Number) { return ((Number) value).doubleValue(); } else { return Double.parseDouble(value.toString()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java index 8a4deed..4ff6080 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java @@ -19,7 +19,6 @@ package org.apache.pinot.core.plan; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -34,9 +33,8 @@ import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.operator.filter.BaseFilterOperator; import org.apache.pinot.core.operator.filter.CombinedFilterOperator; import org.apache.pinot.core.operator.query.AggregationOperator; -import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator; import org.apache.pinot.core.operator.query.FilteredAggregationOperator; -import 
org.apache.pinot.core.operator.query.MetadataBasedAggregationOperator; +import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator; import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; @@ -46,7 +44,7 @@ import org.apache.pinot.core.startree.StarTreeUtils; import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.IndexSegment; -import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.segment.spi.index.startree.StarTreeV2; @@ -60,9 +58,13 @@ import static org.apache.pinot.segment.spi.AggregationFunctionType.*; @SuppressWarnings("rawtypes") public class AggregationPlanNode implements PlanNode { private static final EnumSet<AggregationFunctionType> DICTIONARY_BASED_FUNCTIONS = - EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL, - DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, SEGMENTPARTITIONEDDISTINCTCOUNT, - DISTINCTCOUNTSMARTHLL); + EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, DISTINCTCOUNT, DISTINCTCOUNTMV, + DISTINCTCOUNTHLL, DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, + SEGMENTPARTITIONEDDISTINCTCOUNT, DISTINCTCOUNTSMARTHLL); + + // DISTINCTCOUNT excluded because consuming segment metadata contains unknown cardinality when there is no dictionary + private static final EnumSet<AggregationFunctionType> METADATA_BASED_FUNCTIONS = + EnumSet.of(COUNT, MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV); private final IndexSegment _indexSegment; private final QueryContext _queryContext; @@ 
-176,18 +178,17 @@ public class AggregationPlanNode implements PlanNode { BaseFilterOperator filterOperator = filterPlanNode.run(); // Use metadata/dictionary to solve the query if possible - // TODO: Use the same operator for both of them so that COUNT(*), MAX(col) can be optimized if (filterOperator.isResultMatchingAll()) { - if (isFitForMetadataBasedPlan(aggregationFunctions)) { - return new MetadataBasedAggregationOperator(aggregationFunctions, _indexSegment.getSegmentMetadata(), - Collections.emptyMap()); - } else if (isFitForDictionaryBasedPlan(aggregationFunctions, _indexSegment)) { - Map<String, Dictionary> dictionaryMap = new HashMap<>(); - for (AggregationFunction aggregationFunction : aggregationFunctions) { - String column = ((ExpressionContext) aggregationFunction.getInputExpressions().get(0)).getIdentifier(); - dictionaryMap.computeIfAbsent(column, k -> _indexSegment.getDataSource(k).getDictionary()); + if (isFitForNonScanBasedPlan(aggregationFunctions, _indexSegment)) { + DataSource[] dataSources = new DataSource[aggregationFunctions.length]; + for (int i = 0; i < aggregationFunctions.length; i++) { + List<?> inputExpressions = aggregationFunctions[i].getInputExpressions(); + if (!inputExpressions.isEmpty()) { + String column = ((ExpressionContext) inputExpressions.get(0)).getIdentifier(); + dataSources[i] = _indexSegment.getDataSource(column); + } } - return new DictionaryBasedAggregationOperator(aggregationFunctions, dictionaryMap, numTotalDocs); + return new NonScanBasedAggregationOperator(aggregationFunctions, dataSources, numTotalDocs); } } @@ -223,36 +224,32 @@ public class AggregationPlanNode implements PlanNode { } /** - * Returns {@code true} if the given aggregations can be solved with segment metadata, {@code false} otherwise. 
- * <p>Aggregations supported: COUNT - */ - private static boolean isFitForMetadataBasedPlan(AggregationFunction[] aggregationFunctions) { - for (AggregationFunction aggregationFunction : aggregationFunctions) { - if (aggregationFunction.getType() != COUNT) { - return false; - } - } - return true; - } - - /** - * Returns {@code true} if the given aggregations can be solved with dictionary, {@code false} otherwise. + * Returns {@code true} if the given aggregations can be solved with dictionary or column metadata, {@code false} + * otherwise. */ - private static boolean isFitForDictionaryBasedPlan(AggregationFunction[] aggregationFunctions, + private static boolean isFitForNonScanBasedPlan(AggregationFunction[] aggregationFunctions, IndexSegment indexSegment) { for (AggregationFunction aggregationFunction : aggregationFunctions) { - if (!DICTIONARY_BASED_FUNCTIONS.contains(aggregationFunction.getType())) { - return false; + if (aggregationFunction.getType() == COUNT) { + continue; } ExpressionContext argument = (ExpressionContext) aggregationFunction.getInputExpressions().get(0); if (argument.getType() != ExpressionContext.Type.IDENTIFIER) { return false; } - String column = argument.getIdentifier(); - Dictionary dictionary = indexSegment.getDataSource(column).getDictionary(); - if (dictionary == null) { - return false; + DataSource dataSource = indexSegment.getDataSource(argument.getIdentifier()); + if (DICTIONARY_BASED_FUNCTIONS.contains(aggregationFunction.getType())) { + if (dataSource.getDictionary() != null) { + continue; + } + } + if (METADATA_BASED_FUNCTIONS.contains(aggregationFunction.getType())) { + if (dataSource.getDataSourceMetadata().getMaxValue() != null + && dataSource.getDataSourceMetadata().getMinValue() != null) { + continue; + } } + return false; } return true; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java index e18f7f3..2bf354a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java @@ -29,8 +29,7 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.query.AggregationGroupByOperator; import org.apache.pinot.core.operator.query.AggregationOperator; -import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator; -import org.apache.pinot.core.operator.query.MetadataBasedAggregationOperator; +import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator; import org.apache.pinot.core.operator.query.SelectionOnlyOperator; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; @@ -165,32 +164,32 @@ public class MetadataAndDictionaryAggregationPlanMakerTest { }); // COUNT from metadata entries.add(new Object[]{ - "select count(*) from testTable", MetadataBasedAggregationOperator.class, AggregationOperator.class + "select count(*) from testTable", NonScanBasedAggregationOperator.class, AggregationOperator.class }); // COUNT from metadata with match all filter entries.add(new Object[]{ - "select count(*) from testTable where column1 > 10", MetadataBasedAggregationOperator.class, + "select count(*) from testTable where column1 > 10", NonScanBasedAggregationOperator.class, AggregationOperator.class }); // MIN/MAX from dictionary entries.add(new Object[]{ - "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable", DictionaryBasedAggregationOperator.class, + "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable", NonScanBasedAggregationOperator.class, 
AggregationOperator.class }); // MIN/MAX from dictionary with match all filter entries.add(new Object[]{ "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable where column1 > 10", - DictionaryBasedAggregationOperator.class, AggregationOperator.class + NonScanBasedAggregationOperator.class, AggregationOperator.class }); // MINMAXRANGE from dictionary entries.add(new Object[]{ - "select minmaxrange(daysSinceEpoch) from testTable", DictionaryBasedAggregationOperator.class, + "select minmaxrange(daysSinceEpoch) from testTable", NonScanBasedAggregationOperator.class, AggregationOperator.class }); // MINMAXRANGE from dictionary with match all filter entries.add(new Object[]{ "select minmaxrange(daysSinceEpoch) from testTable where column1 > 10", - DictionaryBasedAggregationOperator.class, AggregationOperator.class + NonScanBasedAggregationOperator.class, AggregationOperator.class }); // Aggregation entries.add(new Object[]{ @@ -203,7 +202,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest { }); // COUNT from metadata, MIN from dictionary entries.add(new Object[]{ - "select count(*),min(column17) from testTable", AggregationOperator.class, AggregationOperator.class + "select count(*),min(column17) from testTable", NonScanBasedAggregationOperator.class, + AggregationOperator.class }); // Aggregation group-by entries.add(new Object[]{ diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java index 513b806..f86c6a6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java @@ -38,7 +38,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.operator.query.AggregationGroupByOperator; import 
org.apache.pinot.core.operator.query.AggregationOperator; -import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator; +import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; @@ -193,8 +193,8 @@ public class DistinctCountQueriesTest extends BaseQueriesTest { // Inner segment for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), getOperatorForSqlQueryWithFilter(query))) { - assertTrue(operator instanceof DictionaryBasedAggregationOperator); - IntermediateResultsBlock resultsBlock = ((DictionaryBasedAggregationOperator) operator).nextBlock(); + assertTrue(operator instanceof NonScanBasedAggregationOperator); + IntermediateResultsBlock resultsBlock = ((NonScanBasedAggregationOperator) operator).nextBlock(); QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) operator).getExecutionStatistics(), NUM_RECORDS, 0, 0, NUM_RECORDS); List<Object> aggregationResult = resultsBlock.getAggregationResult(); @@ -307,8 +307,8 @@ public class DistinctCountQueriesTest extends BaseQueriesTest { // Inner segment String[] interSegmentsExpectedResults = new String[5]; for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), getOperatorForSqlQueryWithFilter(query))) { - assertTrue(operator instanceof DictionaryBasedAggregationOperator); - IntermediateResultsBlock resultsBlock = ((DictionaryBasedAggregationOperator) operator).nextBlock(); + assertTrue(operator instanceof NonScanBasedAggregationOperator); + IntermediateResultsBlock resultsBlock = ((NonScanBasedAggregationOperator) operator).nextBlock(); QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) operator).getExecutionStatistics(), NUM_RECORDS, 0, 0, NUM_RECORDS); List<Object> aggregationResult = 
resultsBlock.getAggregationResult(); @@ -378,8 +378,8 @@ public class DistinctCountQueriesTest extends BaseQueriesTest { // Change log2m query = "SELECT DISTINCTCOUNTHLL(intColumn, 12) FROM testTable"; operator = getOperatorForSqlQuery(query); - assertTrue(operator instanceof DictionaryBasedAggregationOperator); - aggregationResult = ((DictionaryBasedAggregationOperator) operator).nextBlock().getAggregationResult(); + assertTrue(operator instanceof NonScanBasedAggregationOperator); + aggregationResult = ((NonScanBasedAggregationOperator) operator).nextBlock().getAggregationResult(); assertNotNull(aggregationResult); assertEquals(aggregationResult.size(), 1); assertTrue(aggregationResult.get(0) instanceof HyperLogLog); @@ -401,8 +401,8 @@ public class DistinctCountQueriesTest extends BaseQueriesTest { // Inner segment String[] interSegmentsExpectedResults = new String[6]; for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), getOperatorForSqlQueryWithFilter(query))) { - assertTrue(operator instanceof DictionaryBasedAggregationOperator); - IntermediateResultsBlock resultsBlock = ((DictionaryBasedAggregationOperator) operator).nextBlock(); + assertTrue(operator instanceof NonScanBasedAggregationOperator); + IntermediateResultsBlock resultsBlock = ((NonScanBasedAggregationOperator) operator).nextBlock(); QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) operator).getExecutionStatistics(), NUM_RECORDS, 0, 0, NUM_RECORDS); List<Object> aggregationResult = resultsBlock.getAggregationResult(); @@ -471,8 +471,8 @@ public class DistinctCountQueriesTest extends BaseQueriesTest { // Change log2m query = "SELECT DISTINCTCOUNTSMARTHLL(intColumn, 'hllLog2m=8;hllConversionThreshold=10') FROM testTable"; operator = getOperatorForSqlQuery(query); - assertTrue(operator instanceof DictionaryBasedAggregationOperator); - aggregationResult = ((DictionaryBasedAggregationOperator) operator).nextBlock().getAggregationResult(); + assertTrue(operator instanceof 
NonScanBasedAggregationOperator); + aggregationResult = ((NonScanBasedAggregationOperator) operator).nextBlock().getAggregationResult(); assertNotNull(aggregationResult); assertEquals(aggregationResult.size(), 1); assertTrue(aggregationResult.get(0) instanceof HyperLogLog); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java index 033d17c..617ffd7 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java @@ -502,14 +502,14 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { List<Object[]> result1 = new ArrayList<>(); result1.add(new Object[]{"BROKER_REDUCE(limit:10)", 0, -1}); result1.add(new Object[]{"COMBINE_AGGREGATE", 1, 0}); - result1.add(new Object[]{"AGGREGATE_METADATA", 2, 1}); + result1.add(new Object[]{"AGGREGATE_NO_SCAN", 2, 1}); check(query1, new ResultTable(DATA_SCHEMA, result1)); String query2 = "EXPLAIN PLAN FOR SELECT min(invertedIndexCol1) FROM testTable"; List<Object[]> result2 = new ArrayList<>(); result2.add(new Object[]{"BROKER_REDUCE(limit:10)", 0, -1}); result2.add(new Object[]{"COMBINE_AGGREGATE", 1, 0}); - result2.add(new Object[]{"AGGREGATE_DICTIONARY", 2, 1}); + result2.add(new Object[]{"AGGREGATE_NO_SCAN", 2, 1}); check(query2, new ResultTable(DATA_SCHEMA, result2)); String query3 = diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java index 0520086..06ee41e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java @@ -37,7 +37,7 @@ import org.apache.pinot.core.common.Operator; import 
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.operator.query.AggregationGroupByOperator; import org.apache.pinot.core.operator.query.AggregationOperator; -import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator; +import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; @@ -159,8 +159,8 @@ public class SegmentPartitionedDistinctCountQueriesTest extends BaseQueriesTest // Inner segment for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), getOperatorForSqlQueryWithFilter(query))) { - assertTrue(operator instanceof DictionaryBasedAggregationOperator); - IntermediateResultsBlock resultsBlock = ((DictionaryBasedAggregationOperator) operator).nextBlock(); + assertTrue(operator instanceof NonScanBasedAggregationOperator); + IntermediateResultsBlock resultsBlock = ((NonScanBasedAggregationOperator) operator).nextBlock(); QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) operator).getExecutionStatistics(), NUM_RECORDS, 0, 0, NUM_RECORDS); List<Object> aggregationResult = resultsBlock.getAggregationResult(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java index 5b3d1e1..42e3dfd 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java @@ -36,8 +36,7 @@ 
import static org.testng.Assert.assertTrue; /** - * Integration test to check aggregation functions which use {@code DictionaryBasedAggregationPlanNode} and - * {@code MetadataBasedAggregationPlanNode}. + * Integration test to check aggregation functions which use {@code DataSourceBasedAggregationPlanNode} */ // TODO: remove this integration test and add unit test for metadata and dictionary based aggregation operator public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends BaseClusterIntegrationTest { @@ -235,44 +234,44 @@ public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends // Check execution stats JsonNode response; - // Dictionary column: answered by DictionaryBasedAggregationOperator + // Dictionary column: answered by NonScanBasedAggregationOperator pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName; response = postQuery(pqlQuery); assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0); assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0); assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong()); - // Non dictionary column: not answered by DictionaryBasedAggregationOperator + // Non dictionary column: answered by NonScanBasedAggregationOperator pqlQuery = "SELECT MAX(DepDelay) FROM " + tableName; response = postQuery(pqlQuery); - assertEquals(response.get("numEntriesScannedPostFilter").asLong(), response.get("numDocsScanned").asLong()); + assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0); assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0); assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong()); // multiple dictionary based aggregation functions, dictionary columns: answered by - // DictionaryBasedAggregationOperator + // NonScanBasedAggregationOperator pqlQuery = "SELECT MAX(ArrTime),MIN(ArrTime) FROM " + tableName; response = postQuery(pqlQuery); 
assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0); assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0); assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong()); - // multiple aggregation functions, mix of dictionary based and non dictionary based: not answered by - // DictionaryBasedAggregationOperator + // multiple aggregation functions, mix of dictionary based and non dictionary based: answered by + // NonScanBasedAggregationOperator pqlQuery = "SELECT MAX(ArrTime),COUNT(ArrTime) FROM " + tableName; response = postQuery(pqlQuery); - assertEquals(response.get("numEntriesScannedPostFilter").asLong(), response.get("numDocsScanned").asLong()); + assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0); assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0); assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong()); - // group by in query : not answered by DictionaryBasedAggregationOperator + // group by in query : not answered by NonScanBasedAggregationOperator pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName + " group by DaysSinceEpoch"; response = postQuery(pqlQuery); assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0); assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0); assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong()); - // filter in query: not answered by DictionaryBasedAggregationOperator + // filter in query: not answered by NonScanBasedAggregationOperator pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName + " where DaysSinceEpoch > 16100"; response = postQuery(pqlQuery); assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0); @@ -305,23 +304,23 @@ public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0); assertEquals(response.get("totalDocs").asLong(), 
response.get("numDocsScanned").asLong()); - // group by present in query: not answered by MetadataBasedAggregationOperator + // group by present in query: not answered by NonScanBasedAggregationOperator pqlQuery = "SELECT COUNT(*) FROM " + tableName + " GROUP BY DaysSinceEpoch"; response = postQuery(pqlQuery); assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0); assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0); assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong()); - // filter present in query: not answered by MetadataBasedAggregationOperator + // filter present in query: not answered by NonScanBasedAggregationOperator pqlQuery = "SELECT COUNT(*) FROM " + tableName + " WHERE DaysSinceEpoch > 16100"; response = postQuery(pqlQuery); assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0); assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0); - // mixed aggregation functions in query: not answered by MetadataBasedAggregationOperator + // mixed aggregation functions in query: answered by NonScanBasedAggregationOperator pqlQuery = "SELECT COUNT(*),MAX(ArrTime) FROM " + tableName; response = postQuery(pqlQuery); - assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0); + assertTrue(response.get("numEntriesScannedPostFilter").asLong() == 0); assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0); assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong()); } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java index 21a899a..0d13c6f 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java @@ -130,13 +130,18 @@ public class BenchmarkQueries extends BaseQueriesTest { + "year(INT_COL) as y, month(INT_COL) as m " + "from
MyTable group by y, m"; + public static final String RAW_COLUMN_SUMMARY_STATS = "SELECT " + + "MIN(RAW_INT_COL), MAX(RAW_INT_COL), COUNT(*) " + + "FROM MyTable"; + @Param("1500000") private int _numRows; @Param({"EXP(0.001)", "EXP(0.5)", "EXP(0.999)"}) String _scenario; @Param({ MULTI_GROUP_BY_WITH_RAW_QUERY, MULTI_GROUP_BY_WITH_RAW_QUERY_2, FILTERED_QUERY, NON_FILTERED_QUERY, - SUM_QUERY, NO_INDEX_LIKE_QUERY, MULTI_GROUP_BY_ORDER_BY, MULTI_GROUP_BY_ORDER_BY_LOW_HIGH, TIME_GROUP_BY + SUM_QUERY, NO_INDEX_LIKE_QUERY, MULTI_GROUP_BY_ORDER_BY, MULTI_GROUP_BY_ORDER_BY_LOW_HIGH, TIME_GROUP_BY, + RAW_COLUMN_SUMMARY_STATS }) String _query; private IndexSegment _indexSegment; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org