This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5e76b100b9 Support for DistinctSumMV and DistinctAvgMV aggregation functions (#10128) 5e76b100b9 is described below commit 5e76b100b924340b1f401994f3cd84c876377291 Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com> AuthorDate: Wed Jan 18 17:09:25 2023 -0800 Support for DistinctSumMV and DistinctAvgMV aggregation functions (#10128) * Add support for DistinctSumMV and DistinctAverageMV aggregation functions * Empty commit to retrigger tests * Address review comments --- .../query/NonScanBasedAggregationOperator.java | 10 +- .../pinot/core/plan/AggregationPlanNode.java | 2 +- .../function/AggregationFunctionFactory.java | 4 + .../BaseDistinctAggregateAggregationFunction.java | 370 +++++++++++++++++---- .../function/DistinctAvgAggregationFunction.java | 25 +- ....java => DistinctAvgMVAggregationFunction.java} | 31 +- .../function/DistinctCountAggregationFunction.java | 25 +- .../DistinctCountMVAggregationFunction.java | 244 +------------- .../function/DistinctSumAggregationFunction.java | 25 +- ....java => DistinctSumMVAggregationFunction.java} | 31 +- .../pinot/queries/ExplainPlanQueriesTest.java | 126 ++++--- ...terSegmentAggregationMultiValueQueriesTest.java | 84 ++++- .../pinot/segment/spi/AggregationFunctionType.java | 2 + 13 files changed, 625 insertions(+), 354 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java index e036e17a8d..bf7e4e63c2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java @@ -100,7 +100,11 @@ public class NonScanBasedAggregationOperator extends BaseOperator<AggregationRes result = new MinMaxRangePair(getMinValue(dataSource), 
getMaxValue(dataSource)); break; case DISTINCTCOUNT: + case DISTINCTSUM: + case DISTINCTAVG: case DISTINCTCOUNTMV: + case DISTINCTSUMMV: + case DISTINCTAVGMV: result = getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary())); break; case DISTINCTCOUNTHLL: @@ -113,12 +117,6 @@ public class NonScanBasedAggregationOperator extends BaseOperator<AggregationRes result = getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()), ((DistinctCountRawHLLAggregationFunction) aggregationFunction).getDistinctCountHLLAggregationFunction()); break; - case DISTINCTSUM: - result = getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary())); - break; - case DISTINCTAVG: - result = getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary())); - break; case SEGMENTPARTITIONEDDISTINCTCOUNT: result = (long) Objects.requireNonNull(dataSource.getDictionary()).length(); break; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java index 8ab98970f4..74e5951412 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java @@ -56,7 +56,7 @@ public class AggregationPlanNode implements PlanNode { private static final EnumSet<AggregationFunctionType> DICTIONARY_BASED_FUNCTIONS = EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL, DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, SEGMENTPARTITIONEDDISTINCTCOUNT, - DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG); + DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, DISTINCTAVGMV); // DISTINCTCOUNT excluded because consuming segment metadata contains unknown cardinality when there is no dictionary private static final EnumSet<AggregationFunctionType> METADATA_BASED_FUNCTIONS = diff 
--git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 157d1b64da..9b571ff3c4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -266,6 +266,10 @@ public class AggregationFunctionFactory { return new DistinctCountHLLMVAggregationFunction(arguments); case DISTINCTCOUNTRAWHLLMV: return new DistinctCountRawHLLMVAggregationFunction(arguments); + case DISTINCTSUMMV: + return new DistinctSumMVAggregationFunction(firstArgument); + case DISTINCTAVGMV: + return new DistinctAvgMVAggregationFunction(firstArgument); case DISTINCT: return new DistinctAggregationFunction(arguments, queryContext.getOrderByExpressions(), queryContext.getLimit()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java index 0f9f09a392..0dc2ed35e5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java @@ -71,7 +71,73 @@ public abstract class BaseDistinctAggregateAggregationFunction<T extends Compara } @Override - public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + public Set extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + Object result = aggregationResultHolder.getResult(); + if (result == null) { + // Use empty IntOpenHashSet as a place holder for empty result + return new IntOpenHashSet(); 
+ } + + if (result instanceof DictIdsWrapper) { + // For dictionary-encoded expression, convert dictionary ids to values + return convertToValueSet((DictIdsWrapper) result); + } else { + // For non-dictionary-encoded expression, directly return the value set + return (Set) result; + } + } + + @Override + public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + Object result = groupByResultHolder.getResult(groupKey); + if (result == null) { + // NOTE: Return an empty IntOpenHashSet for empty result. + return new IntOpenHashSet(); + } + + if (result instanceof DictIdsWrapper) { + // For dictionary-encoded expression, convert dictionary ids to values + return convertToValueSet((DictIdsWrapper) result); + } else { + // For non-dictionary-encoded expression, directly return the value set + return (Set) result; + } + } + + @Override + public Set merge(Set intermediateResult1, Set intermediateResult2) { + if (intermediateResult1.isEmpty()) { + return intermediateResult2; + } + if (intermediateResult2.isEmpty()) { + return intermediateResult1; + } + intermediateResult1.addAll(intermediateResult2); + return intermediateResult1; + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return ColumnDataType.OBJECT; + } + + /** + * Returns the dictionary id bitmap from the result holder or creates a new one if it does not exist. 
+ */ + protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder aggregationResultHolder, + Dictionary dictionary) { + DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult(); + if (dictIdsWrapper == null) { + dictIdsWrapper = new DictIdsWrapper(dictionary); + aggregationResultHolder.setValue(dictIdsWrapper); + } + return dictIdsWrapper._dictIdBitmap; + } + + /** + * Performs aggregation for a SV column + */ + protected void svAggregate(int length, AggregationResultHolder aggregationResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { BlockValSet blockValSet = blockValSetMap.get(_expression); @@ -136,8 +202,85 @@ public abstract class BaseDistinctAggregateAggregationFunction<T extends Compara } } - @Override - public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + /** + * Performs aggregation for a MV column + */ + protected void mvAggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, dictionary); + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + dictIdBitmap.add(dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store values into the value set + DataType storedType = blockValSet.getValueType().getStoredType(); + Set valueSet = getValueSet(aggregationResultHolder, storedType); + switch (storedType) { + case INT: + IntOpenHashSet intSet = (IntOpenHashSet) valueSet; + int[][] intValues = blockValSet.getIntValuesMV(); + for (int i = 0; i < length; i++) { + for (int value : intValues[i]) { + intSet.add(value); + } + } + break; + case LONG: + 
LongOpenHashSet longSet = (LongOpenHashSet) valueSet; + long[][] longValues = blockValSet.getLongValuesMV(); + for (int i = 0; i < length; i++) { + for (long value : longValues[i]) { + longSet.add(value); + } + } + break; + case FLOAT: + FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet; + float[][] floatValues = blockValSet.getFloatValuesMV(); + for (int i = 0; i < length; i++) { + for (float value : floatValues[i]) { + floatSet.add(value); + } + } + break; + case DOUBLE: + DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet; + double[][] doubleValues = blockValSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + for (double value : doubleValues[i]) { + doubleSet.add(value); + } + } + break; + case STRING: + ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>) valueSet; + String[][] stringValues = blockValSet.getStringValuesMV(); + for (int i = 0; i < length; i++) { + //noinspection ManualArrayToCollectionCopy + for (String value : stringValues[i]) { + //noinspection UseBulkOperation + stringSet.add(value); + } + } + break; + default: + throw new IllegalStateException( + "Illegal data type for " + _functionType.getName() + " aggregation function: " + storedType); + } + } + + /** + * Performs aggregation for a SV column with group by on a SV column. + */ + protected void svAggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { BlockValSet blockValSet = blockValSetMap.get(_expression); @@ -199,8 +342,86 @@ public abstract class BaseDistinctAggregateAggregationFunction<T extends Compara } } - @Override - public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + /** + * Performs aggregation for a MV column with group by on a SV column. 
+ */ + protected void mvAggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + getDictIdBitmap(groupByResultHolder, groupKeyArray[i], dictionary).add(dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store values into the value set + DataType storedType = blockValSet.getValueType().getStoredType(); + switch (storedType) { + case INT: + int[][] intValues = blockValSet.getIntValuesMV(); + for (int i = 0; i < length; i++) { + IntOpenHashSet intSet = (IntOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT); + for (int value : intValues[i]) { + intSet.add(value); + } + } + break; + case LONG: + long[][] longValues = blockValSet.getLongValuesMV(); + for (int i = 0; i < length; i++) { + LongOpenHashSet longSet = (LongOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.LONG); + for (long value : longValues[i]) { + longSet.add(value); + } + } + break; + case FLOAT: + float[][] floatValues = blockValSet.getFloatValuesMV(); + for (int i = 0; i < length; i++) { + FloatOpenHashSet floatSet = + (FloatOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.FLOAT); + for (float value : floatValues[i]) { + floatSet.add(value); + } + } + break; + case DOUBLE: + double[][] doubleValues = blockValSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + DoubleOpenHashSet doubleSet = + (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.DOUBLE); + for (double value : doubleValues[i]) { + doubleSet.add(value); + } + } + break; + case STRING: + String[][] 
stringValues = blockValSet.getStringValuesMV(); + for (int i = 0; i < length; i++) { + ObjectOpenHashSet<String> stringSet = + (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.STRING); + //noinspection ManualArrayToCollectionCopy + for (String value : stringValues[i]) { + //noinspection UseBulkOperation + stringSet.add(value); + } + } + break; + default: + throw new IllegalStateException( + "Illegal data type for " + _functionType.getName() + " aggregation function: " + storedType); + } + } + + /** + * Performs aggregation for a SV column with group by on a MV column. + */ + protected void svAggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { BlockValSet blockValSet = blockValSetMap.get(_expression); @@ -259,68 +480,91 @@ public abstract class BaseDistinctAggregateAggregationFunction<T extends Compara } } - @Override - public Set extractAggregationResult(AggregationResultHolder aggregationResultHolder) { - Object result = aggregationResultHolder.getResult(); - if (result == null) { - // Use empty IntOpenHashSet as a place holder for empty result - return new IntOpenHashSet(); - } - - if (result instanceof DictIdsWrapper) { - // For dictionary-encoded expression, convert dictionary ids to values - return convertToValueSet((DictIdsWrapper) result); - } else { - // For non-dictionary-encoded expression, directly return the value set - return (Set) result; - } - } - - @Override - public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { - Object result = groupByResultHolder.getResult(groupKey); - if (result == null) { - // NOTE: Return an empty IntOpenHashSet for empty result. 
- return new IntOpenHashSet(); - } - - if (result instanceof DictIdsWrapper) { - // For dictionary-encoded expression, convert dictionary ids to values - return convertToValueSet((DictIdsWrapper) result); - } else { - // For non-dictionary-encoded expression, directly return the value set - return (Set) result; - } - } + /** + * Performs aggregation for a MV column with group by on a MV column. + */ + protected void mvAggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); - @Override - public Set merge(Set intermediateResult1, Set intermediateResult2) { - if (intermediateResult1.isEmpty()) { - return intermediateResult2; - } - if (intermediateResult2.isEmpty()) { - return intermediateResult1; + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictIds[i]); + } + } + return; } - intermediateResult1.addAll(intermediateResult2); - return intermediateResult1; - } - @Override - public ColumnDataType getIntermediateResultColumnType() { - return ColumnDataType.OBJECT; - } - - /** - * Returns the dictionary id bitmap from the result holder or creates a new one if it does not exist. 
- */ - protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder aggregationResultHolder, - Dictionary dictionary) { - DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult(); - if (dictIdsWrapper == null) { - dictIdsWrapper = new DictIdsWrapper(dictionary); - aggregationResultHolder.setValue(dictIdsWrapper); + // For non-dictionary-encoded expression, store values into the value set + DataType storedType = blockValSet.getValueType().getStoredType(); + switch (storedType) { + case INT: + int[][] intValues = blockValSet.getIntValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + IntOpenHashSet intSet = (IntOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.INT); + for (int value : intValues[i]) { + intSet.add(value); + } + } + } + break; + case LONG: + long[][] longValues = blockValSet.getLongValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + LongOpenHashSet longSet = (LongOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.LONG); + for (long value : longValues[i]) { + longSet.add(value); + } + } + } + break; + case FLOAT: + float[][] floatValues = blockValSet.getFloatValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + FloatOpenHashSet floatSet = (FloatOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.FLOAT); + for (float value : floatValues[i]) { + floatSet.add(value); + } + } + } + break; + case DOUBLE: + double[][] doubleValues = blockValSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + DoubleOpenHashSet doubleSet = + (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.DOUBLE); + for (double value : doubleValues[i]) { + doubleSet.add(value); + } + } + } + break; + case STRING: + String[][] stringValues = blockValSet.getStringValuesMV(); + for (int i = 0; i < length; i++) { + for 
(int groupKey : groupKeysArray[i]) { + ObjectOpenHashSet<String> stringSet = + (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKey, DataType.STRING); + //noinspection ManualArrayToCollectionCopy + for (String value : stringValues[i]) { + //noinspection UseBulkOperation + stringSet.add(value); + } + } + } + break; + default: + throw new IllegalStateException( + "Illegal data type for " + _functionType.getName() + " aggregation function: " + storedType); } - return dictIdsWrapper._dictIdBitmap; } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java index 41b8550963..bca958957f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java @@ -18,13 +18,18 @@ */ package org.apache.pinot.core.query.aggregation.function; +import java.util.Map; import java.util.Set; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; + /** - * Aggregation function to compute the average of distinct values. + * Aggregation function to compute the average of distinct values for an SV column. 
*/ public class DistinctAvgAggregationFunction extends BaseDistinctAggregateAggregationFunction<Double> { @@ -32,6 +37,24 @@ public class DistinctAvgAggregationFunction extends BaseDistinctAggregateAggrega super(expression, AggregationFunctionType.DISTINCTAVG); } + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + svAggregate(length, aggregationResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + svAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + svAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap); + } + @Override public DataSchema.ColumnDataType getFinalResultColumnType() { return DataSchema.ColumnDataType.DOUBLE; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgMVAggregationFunction.java similarity index 55% copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgMVAggregationFunction.java index 41b8550963..30a8e00492 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgMVAggregationFunction.java @@ -18,18 +18,41 @@ */ package org.apache.pinot.core.query.aggregation.function; +import 
java.util.Map; import java.util.Set; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; + /** - * Aggregation function to compute the average of distinct values. + * Aggregation function to compute the average of distinct values for an MV column. */ -public class DistinctAvgAggregationFunction extends BaseDistinctAggregateAggregationFunction<Double> { +public class DistinctAvgMVAggregationFunction extends BaseDistinctAggregateAggregationFunction<Double> { + + public DistinctAvgMVAggregationFunction(ExpressionContext expression) { + super(expression, AggregationFunctionType.DISTINCTAVGMV); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + mvAggregate(length, aggregationResultHolder, blockValSetMap); + } - public DistinctAvgAggregationFunction(ExpressionContext expression) { - super(expression, AggregationFunctionType.DISTINCTAVG); + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + mvAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + mvAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java index 899a2d6ab5..aec983b8df 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java @@ -18,13 +18,18 @@ */ package org.apache.pinot.core.query.aggregation.function; +import java.util.Map; import java.util.Set; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; + /** - * Aggregation function to compute the average of distinct values. + * Aggregation function to compute the count of distinct values for an SV column */ public class DistinctCountAggregationFunction extends BaseDistinctAggregateAggregationFunction<Integer> { @@ -32,6 +37,24 @@ public class DistinctCountAggregationFunction extends BaseDistinctAggregateAggre super(expression, AggregationFunctionType.DISTINCTCOUNT); } + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + svAggregate(length, aggregationResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + svAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + 
svAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap); + } + @Override public ColumnDataType getFinalResultColumnType() { return ColumnDataType.INT; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java index 6dc65461a9..d8d257b400 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java @@ -18,261 +18,51 @@ */ package org.apache.pinot.core.query.aggregation.function; -import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet; -import it.unimi.dsi.fastutil.floats.FloatOpenHashSet; -import it.unimi.dsi.fastutil.ints.IntOpenHashSet; -import it.unimi.dsi.fastutil.longs.LongOpenHashSet; -import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; import java.util.Map; import java.util.Set; import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; -import org.apache.pinot.segment.spi.index.reader.Dictionary; -import org.apache.pinot.spi.data.FieldSpec.DataType; -import org.roaringbitmap.RoaringBitmap; -@SuppressWarnings({"rawtypes", "unchecked"}) -public class DistinctCountMVAggregationFunction extends DistinctCountAggregationFunction { +/** + * Aggregation function to compute the count of distinct values for an MV column + */ +public class DistinctCountMVAggregationFunction extends BaseDistinctAggregateAggregationFunction<Integer> { public 
DistinctCountMVAggregationFunction(ExpressionContext expression) { - super(expression); + super(expression, AggregationFunctionType.DISTINCTCOUNTMV); } - @Override - public AggregationFunctionType getType() { - return AggregationFunctionType.DISTINCTCOUNTMV; - } @Override public void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { - BlockValSet blockValSet = blockValSetMap.get(_expression); - - // For dictionary-encoded expression, store dictionary ids into the bitmap - Dictionary dictionary = blockValSet.getDictionary(); - if (dictionary != null) { - RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, dictionary); - int[][] dictIds = blockValSet.getDictionaryIdsMV(); - for (int i = 0; i < length; i++) { - dictIdBitmap.add(dictIds[i]); - } - return; - } - - // For non-dictionary-encoded expression, store values into the value set - DataType storedType = blockValSet.getValueType().getStoredType(); - Set valueSet = getValueSet(aggregationResultHolder, storedType); - switch (storedType) { - case INT: - IntOpenHashSet intSet = (IntOpenHashSet) valueSet; - int[][] intValues = blockValSet.getIntValuesMV(); - for (int i = 0; i < length; i++) { - for (int value : intValues[i]) { - intSet.add(value); - } - } - break; - case LONG: - LongOpenHashSet longSet = (LongOpenHashSet) valueSet; - long[][] longValues = blockValSet.getLongValuesMV(); - for (int i = 0; i < length; i++) { - for (long value : longValues[i]) { - longSet.add(value); - } - } - break; - case FLOAT: - FloatOpenHashSet floatSet = (FloatOpenHashSet) valueSet; - float[][] floatValues = blockValSet.getFloatValuesMV(); - for (int i = 0; i < length; i++) { - for (float value : floatValues[i]) { - floatSet.add(value); - } - } - break; - case DOUBLE: - DoubleOpenHashSet doubleSet = (DoubleOpenHashSet) valueSet; - double[][] doubleValues = blockValSet.getDoubleValuesMV(); - for (int i = 0; i < length; i++) { - for (double 
value : doubleValues[i]) { - doubleSet.add(value); - } - } - break; - case STRING: - ObjectOpenHashSet<String> stringSet = (ObjectOpenHashSet<String>) valueSet; - String[][] stringValues = blockValSet.getStringValuesMV(); - for (int i = 0; i < length; i++) { - //noinspection ManualArrayToCollectionCopy - for (String value : stringValues[i]) { - //noinspection UseBulkOperation - stringSet.add(value); - } - } - break; - default: - throw new IllegalStateException("Illegal data type for DISTINCT_COUNT_MV aggregation function: " + storedType); - } + mvAggregate(length, aggregationResultHolder, blockValSetMap); } @Override public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { - BlockValSet blockValSet = blockValSetMap.get(_expression); - - // For dictionary-encoded expression, store dictionary ids into the bitmap - Dictionary dictionary = blockValSet.getDictionary(); - if (dictionary != null) { - int[][] dictIds = blockValSet.getDictionaryIdsMV(); - for (int i = 0; i < length; i++) { - getDictIdBitmap(groupByResultHolder, groupKeyArray[i], dictionary).add(dictIds[i]); - } - return; - } - - // For non-dictionary-encoded expression, store values into the value set - DataType storedType = blockValSet.getValueType().getStoredType(); - switch (storedType) { - case INT: - int[][] intValues = blockValSet.getIntValuesMV(); - for (int i = 0; i < length; i++) { - IntOpenHashSet intSet = (IntOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT); - for (int value : intValues[i]) { - intSet.add(value); - } - } - break; - case LONG: - long[][] longValues = blockValSet.getLongValuesMV(); - for (int i = 0; i < length; i++) { - LongOpenHashSet longSet = (LongOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.LONG); - for (long value : longValues[i]) { - longSet.add(value); - } - } - break; - case FLOAT: - float[][] floatValues = 
blockValSet.getFloatValuesMV(); - for (int i = 0; i < length; i++) { - FloatOpenHashSet floatSet = - (FloatOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.FLOAT); - for (float value : floatValues[i]) { - floatSet.add(value); - } - } - break; - case DOUBLE: - double[][] doubleValues = blockValSet.getDoubleValuesMV(); - for (int i = 0; i < length; i++) { - DoubleOpenHashSet doubleSet = - (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.DOUBLE); - for (double value : doubleValues[i]) { - doubleSet.add(value); - } - } - break; - case STRING: - String[][] stringValues = blockValSet.getStringValuesMV(); - for (int i = 0; i < length; i++) { - ObjectOpenHashSet<String> stringSet = - (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.STRING); - //noinspection ManualArrayToCollectionCopy - for (String value : stringValues[i]) { - //noinspection UseBulkOperation - stringSet.add(value); - } - } - break; - default: - throw new IllegalStateException("Illegal data type for DISTINCT_COUNT_MV aggregation function: " + storedType); - } + mvAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap); } @Override public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { - BlockValSet blockValSet = blockValSetMap.get(_expression); + mvAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap); + } - // For dictionary-encoded expression, store dictionary ids into the bitmap - Dictionary dictionary = blockValSet.getDictionary(); - if (dictionary != null) { - int[][] dictIds = blockValSet.getDictionaryIdsMV(); - for (int i = 0; i < length; i++) { - for (int groupKey : groupKeysArray[i]) { - getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictIds[i]); - } - } - return; - } + @Override + public DataSchema.ColumnDataType 
getFinalResultColumnType() { + return DataSchema.ColumnDataType.INT; + } - // For non-dictionary-encoded expression, store hash code of the values into the value set - DataType storedType = blockValSet.getValueType().getStoredType(); - switch (storedType) { - case INT: - int[][] intValues = blockValSet.getIntValuesMV(); - for (int i = 0; i < length; i++) { - for (int groupKey : groupKeysArray[i]) { - IntOpenHashSet intSet = (IntOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.INT); - for (int value : intValues[i]) { - intSet.add(value); - } - } - } - break; - case LONG: - long[][] longValues = blockValSet.getLongValuesMV(); - for (int i = 0; i < length; i++) { - for (int groupKey : groupKeysArray[i]) { - LongOpenHashSet longSet = (LongOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.LONG); - for (long value : longValues[i]) { - longSet.add(value); - } - } - } - break; - case FLOAT: - float[][] floatValues = blockValSet.getFloatValuesMV(); - for (int i = 0; i < length; i++) { - for (int groupKey : groupKeysArray[i]) { - FloatOpenHashSet floatSet = (FloatOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.FLOAT); - for (float value : floatValues[i]) { - floatSet.add(value); - } - } - } - break; - case DOUBLE: - double[][] doubleValues = blockValSet.getDoubleValuesMV(); - for (int i = 0; i < length; i++) { - for (int groupKey : groupKeysArray[i]) { - DoubleOpenHashSet doubleSet = - (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.DOUBLE); - for (double value : doubleValues[i]) { - doubleSet.add(value); - } - } - } - break; - case STRING: - String[][] stringValues = blockValSet.getStringValuesMV(); - for (int i = 0; i < length; i++) { - for (int groupKey : groupKeysArray[i]) { - ObjectOpenHashSet<String> stringSet = - (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKey, DataType.STRING); - //noinspection ManualArrayToCollectionCopy - for (String value : stringValues[i]) { - 
//noinspection UseBulkOperation - stringSet.add(value); - } - } - } - break; - default: - throw new IllegalStateException("Illegal data type for DISTINCT_COUNT_MV aggregation function: " + storedType); - } + @Override + public Integer extractFinalResult(Set intermediateResult) { + return intermediateResult.size(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java index b42bc13b08..f3a6c55805 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java @@ -18,13 +18,18 @@ */ package org.apache.pinot.core.query.aggregation.function; +import java.util.Map; import java.util.Set; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; + /** - * Aggregation function to compute the sum of distinct values. + * Aggregation function to compute the sum of distinct values for an SV column. 
*/ public class DistinctSumAggregationFunction extends BaseDistinctAggregateAggregationFunction<Double> { @@ -32,6 +37,24 @@ public class DistinctSumAggregationFunction extends BaseDistinctAggregateAggrega super(expression, AggregationFunctionType.DISTINCTSUM); } + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + svAggregate(length, aggregationResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + svAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + svAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap); + } + @Override public DataSchema.ColumnDataType getFinalResultColumnType() { return DataSchema.ColumnDataType.DOUBLE; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumMVAggregationFunction.java similarity index 52% copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumMVAggregationFunction.java index b42bc13b08..3ffd0acd16 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctSumMVAggregationFunction.java @@ -18,18 +18,41 @@ */ package org.apache.pinot.core.query.aggregation.function; +import 
java.util.Map; import java.util.Set; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; + /** - * Aggregation function to compute the sum of distinct values. + * Aggregation function to compute the sum of distinct values for an MV column. */ -public class DistinctSumAggregationFunction extends BaseDistinctAggregateAggregationFunction<Double> { +public class DistinctSumMVAggregationFunction extends BaseDistinctAggregateAggregationFunction<Double> { + + public DistinctSumMVAggregationFunction(ExpressionContext expression) { + super(expression, AggregationFunctionType.DISTINCTSUMMV); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + mvAggregate(length, aggregationResultHolder, blockValSetMap); + } - public DistinctSumAggregationFunction(ExpressionContext expression) { - super(expression, AggregationFunctionType.DISTINCTSUM); + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + mvAggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + mvAggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java index 
b04cefb875..d74a81fa87 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java @@ -108,6 +108,8 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { private final static String COL1_SORTED_INDEX = "sortedIndexCol1"; private final static String COL1_JSON_INDEX = "jsonIndexCol1"; private final static String COL1_TEXT_INDEX = "textIndexCol1"; + private final static String MV_COL1_RAW = "mvRawCol1"; + private final static String MV_COL1_NO_INDEX = "mvNoIndexCol1"; private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) .addSingleValueDimension(COL1_RAW, FieldSpec.DataType.INT) @@ -123,14 +125,16 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { .addSingleValueDimension(COL3_RANGE_INDEX, FieldSpec.DataType.INT) .addSingleValueDimension(COL1_SORTED_INDEX, FieldSpec.DataType.DOUBLE) .addSingleValueDimension(COL1_JSON_INDEX, FieldSpec.DataType.JSON) - .addSingleValueDimension(COL1_TEXT_INDEX, FieldSpec.DataType.STRING).build(); + .addSingleValueDimension(COL1_TEXT_INDEX, FieldSpec.DataType.STRING) + .addMultiValueDimension(MV_COL1_RAW, FieldSpec.DataType.INT) + .addMultiValueDimension(MV_COL1_NO_INDEX, FieldSpec.DataType.INT).build(); private static final DataSchema DATA_SCHEMA = new DataSchema(new String[]{"Operator", "Operator_Id", "Parent_Id"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT}); private static final TableConfig TABLE_CONFIG = - new TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList(COL1_RAW)) + new TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList(COL1_RAW, MV_COL1_RAW)) .setTableName(RAW_TABLE_NAME).build(); private IndexSegment _indexSegment; @@ -159,7 +163,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { GenericRow 
createMockRecord(int noIndexCol1, int noIndexCol2, int noIndexCol3, boolean noIndexCol4, double invertedIndexCol1, int invertedIndexCol2, String intervedIndexCol3, double rangeIndexCol1, int rangeIndexCol2, int rangeIndexCol3, double sortedIndexCol1, String jsonIndexCol1, - String textIndexCol1, int rawCol1) { + String textIndexCol1, int rawCol1, Object[] mvRawCol1, Object[] mvNoIndexCol1) { GenericRow record = new GenericRow(); record.putValue(COL1_RAW, rawCol1); @@ -182,6 +186,9 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { record.putValue(COL1_JSON_INDEX, jsonIndexCol1); record.putValue(COL1_TEXT_INDEX, textIndexCol1); + record.putValue(MV_COL1_RAW, mvRawCol1); + record.putValue(MV_COL1_NO_INDEX, mvNoIndexCol1); + return record; } @@ -232,38 +239,49 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { List<GenericRow> records = new ArrayList<>(NUM_RECORDS); records.add(createMockRecord(1, 2, 3, true, 1.1, 2, "daffy", 10.1, 20, 30, 100.1, - "{\"first\": \"daffy\", \"last\": " + "\"duck\"}", "daffy", 1)); + "{\"first\": \"daffy\", \"last\": " + "\"duck\"}", "daffy", 1, new Object[]{1, 2, 3}, new Object[]{1, 2, 3})); records.add(createMockRecord(0, 1, 2, false, 0.1, 1, "mickey", 0.1, 10, 20, 100.2, - "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 0)); + "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 0, new Object[]{2, 3, 4}, + new Object[]{2, 3, 4})); records.add(createMockRecord(3, 4, 5, true, 2.1, 3, "mickey", 20.1, 30, 40, 100.3, - "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 3)); + "{\"first\": \"mickey\", \"last\": " + "\"mouse\"}", "mickey", 3, new Object[]{3, 4, 5}, + new Object[]{3, 4, 5})); ImmutableSegment immutableSegment1 = createImmutableSegment(records, SEGMENT_NAME_1); List<GenericRow> records2 = new ArrayList<>(NUM_RECORDS); records2.add(createMockRecord(5, 2, 3, true, 1.1, 2, "pluto", 10.1, 20, 30, 100.1, - "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 
5)); + "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 5, new Object[]{100, 200, 300}, + new Object[]{100, 200, 300})); records2.add(createMockRecord(6, 1, 2, false, 0.1, 1, "pluto", 0.1, 10, 20, 100.2, - "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 6)); + "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 6, new Object[]{200, 300, 400}, + new Object[]{200, 300, 400})); records2.add(createMockRecord(8, 4, 5, true, 2.1, 3, "pluto", 20.1, 30, 40, 100.3, - "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 8)); + "{\"first\": \"pluto\", \"last\": " + "\"dog\"}", "pluto", 8, new Object[]{300, 400, 500}, + new Object[]{300, 400, 500})); ImmutableSegment immutableSegment2 = createImmutableSegment(records2, SEGMENT_NAME_2); List<GenericRow> records3 = new ArrayList<>(NUM_RECORDS); records3.add(createMockRecord(5, 2, 3, true, 1.5, 2, "donald", 10.1, 20, 30, 100.1, - "{\"first\": \"donald\", \"last\": " + "\"duck\"}", "donald", 1)); + "{\"first\": \"donald\", \"last\": " + "\"duck\"}", "donald", 1, new Object[]{100, 200, 300}, + new Object[]{100, 200, 300})); records3.add(createMockRecord(6, 1, 2, false, 0.1, 1, "goofy", 0.1, 10, 20, 100.2, - "{\"first\": \"goofy\", \"last\": " + "\"dog\"}", "goofy", 1)); + "{\"first\": \"goofy\", \"last\": " + "\"dog\"}", "goofy", 1, new Object[]{100, 200, 300}, + new Object[]{100, 200, 300})); records3.add(createMockRecord(7, 4, 5, true, 2.1, 3, "minnie", 20.1, 30, 40, 100.3, - "{\"first\": \"minnie\", \"last\": " + "\"mouse\"}", "minnie", 1)); + "{\"first\": \"minnie\", \"last\": " + "\"mouse\"}", "minnie", 1, new Object[]{1000, 2000, 3000}, + new Object[]{1000, 2000, 3000})); ImmutableSegment immutableSegment3 = createImmutableSegment(records3, SEGMENT_NAME_3); List<GenericRow> records4 = new ArrayList<>(NUM_RECORDS); records4.add(createMockRecord(5, 2, 3, true, 1.1, 2, "tweety", 10.1, 20, 30, 100.1, - "{\"first\": \"tweety\", \"last\": " + "\"bird\"}", "tweety", 5)); + "{\"first\": \"tweety\", 
\"last\": " + "\"bird\"}", "tweety", 5, new Object[]{100, 200, 300}, + new Object[]{100, 200, 300})); records4.add(createMockRecord(6, 1, 2, false, 0.1, 1, "bugs", 0.1, 10, 20, 100.2, - "{\"first\": \"bugs\", \"last\": " + "\"bunny\"}", "bugs", 6)); + "{\"first\": \"bugs\", \"last\": " + "\"bunny\"}", "bugs", 6, new Object[]{100, 200, 300}, + new Object[]{100, 200, 300})); records4.add(createMockRecord(7, 4, 5, true, 2.1, 3, "sylvester", 20.1, 30, 40, 100.3, - "{\"first\": \"sylvester\", \"last\": " + "\"cat\"}", "sylvester", 7)); + "{\"first\": \"sylvester\", \"last\": " + "\"cat\"}", "sylvester", 7, new Object[]{1000, 2000, 3000}, + new Object[]{1000, 2000, 3000})); ImmutableSegment immutableSegment4 = createImmutableSegment(records4, SEGMENT_NAME_4); _indexSegment = immutableSegment1; @@ -371,15 +389,16 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { result1.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS}); result1.add(new Object[]{ - "SELECT(selectList:invertedIndexCol1, invertedIndexCol2, invertedIndexCol3, jsonIndexCol1, " - + "noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, " - + "rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2}); + "SELECT(selectList:invertedIndexCol1, invertedIndexCol2, invertedIndexCol3, jsonIndexCol1, mvNoIndexCol1, " + + "mvRawCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, rangeIndexCol1, rangeIndexCol2, " + + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2}); result1.add(new Object[]{"TRANSFORM_PASSTHROUGH(invertedIndexCol1, invertedIndexCol2, invertedIndexCol3, " - + "jsonIndexCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, rangeIndexCol1, rangeIndexCol2, " - + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 4, 3}); - result1.add(new Object[]{"PROJECT(noIndexCol4, rawCol1, sortedIndexCol1, noIndexCol3, rangeIndexCol1, " - + "rangeIndexCol2, 
invertedIndexCol1, noIndexCol2, invertedIndexCol2, noIndexCol1, rangeIndexCol3, " - + "textIndexCol1, jsonIndexCol1, invertedIndexCol3)", 5, 4}); + + "jsonIndexCol1, mvNoIndexCol1, mvRawCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, " + + "rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 4, 3}); + result1.add(new Object[]{"PROJECT(noIndexCol4, rawCol1, sortedIndexCol1, noIndexCol3, mvNoIndexCol1" + + ", rangeIndexCol1, rangeIndexCol2, invertedIndexCol1, noIndexCol2, invertedIndexCol2, noIndexCol1, " + + "rangeIndexCol3, textIndexCol1, mvRawCol1, jsonIndexCol1, invertedIndexCol3)", 5, 4 + }); result1.add(new Object[]{"DOC_ID_SET", 6, 5}); result1.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6}); check(query1, new ResultTable(DATA_SCHEMA, result1)); @@ -435,15 +454,19 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { result1.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS}); result1.add(new Object[]{ - "SELECT(selectList:invertedIndexCol1, invertedIndexCol2, invertedIndexCol3, jsonIndexCol1, " - + "noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, " - + "rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2}); + "SELECT(selectList:invertedIndexCol1, invertedIndexCol2, invertedIndexCol3, jsonIndexCol1, mvNoIndexCol1, " + + "mvRawCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, rangeIndexCol1, rangeIndexCol2, " + + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 3, 2 + }); result1.add(new Object[]{"TRANSFORM_PASSTHROUGH(invertedIndexCol1, invertedIndexCol2, invertedIndexCol3, " - + "jsonIndexCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, rangeIndexCol1, rangeIndexCol2, " - + "rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 4, 3}); - result1.add(new Object[]{"PROJECT(noIndexCol4, rawCol1, sortedIndexCol1, noIndexCol3, rangeIndexCol1, 
" - + "rangeIndexCol2, invertedIndexCol1, noIndexCol2, invertedIndexCol2, noIndexCol1, rangeIndexCol3, " - + "textIndexCol1, jsonIndexCol1, invertedIndexCol3)", 5, 4}); + + "jsonIndexCol1, mvNoIndexCol1, mvRawCol1, noIndexCol1, noIndexCol2, noIndexCol3, noIndexCol4, " + + "rangeIndexCol1, rangeIndexCol2, rangeIndexCol3, rawCol1, sortedIndexCol1, textIndexCol1)", 4, 3 + }); + result1.add(new Object[]{ + "PROJECT(noIndexCol4, rawCol1, sortedIndexCol1, noIndexCol3, mvNoIndexCol1, " + + "rangeIndexCol1, rangeIndexCol2, invertedIndexCol1, noIndexCol2, invertedIndexCol2, noIndexCol1, " + + "rangeIndexCol3, textIndexCol1, mvRawCol1, jsonIndexCol1, invertedIndexCol3)", 5, 4 + }); result1.add(new Object[]{"DOC_ID_SET", 6, 5}); result1.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6}); check(query1, new ResultTable(DATA_SCHEMA, result1)); @@ -1703,19 +1726,42 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { result5.add(new Object[]{"AGGREGATE_NO_SCAN", 3, 2}); check(query5, new ResultTable(DATA_SCHEMA, result5)); - // Full scan required for distinctavg as the column does not have a dictionary. 
- String query6 = "EXPLAIN PLAN FOR SELECT DISTINCTAVG(rawCol1) FROM testTable"; + String query6 = "EXPLAIN PLAN FOR SELECT DISTINCTSUMMV(mvNoIndexCol1) FROM testTable"; List<Object[]> result6 = new ArrayList<>(); result6.add(new Object[]{"BROKER_REDUCE(limit:10)", 1, 0}); result6.add(new Object[]{"COMBINE_AGGREGATE", 2, 1}); - result6.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", ExplainPlanRows.PLAN_START_IDS, - ExplainPlanRows.PLAN_START_IDS}); - result6.add(new Object[]{"AGGREGATE(aggregations:distinctAvg(rawCol1))", 3, 2}); - result6.add(new Object[]{"TRANSFORM_PASSTHROUGH(rawCol1)", 4, 3}); - result6.add(new Object[]{"PROJECT(rawCol1)", 5, 4}); - result6.add(new Object[]{"DOC_ID_SET", 6, 5}); - result6.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6}); + result6.add(new Object[]{ + "PLAN_START(numSegmentsForThisPlan:4)", ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS + }); + result6.add(new Object[]{"AGGREGATE_NO_SCAN", 3, 2}); check(query6, new ResultTable(DATA_SCHEMA, result6)); + + // Full scan required for distinctavg as the column does not have a dictionary. 
+ String query7 = "EXPLAIN PLAN FOR SELECT DISTINCTAVG(rawCol1) FROM testTable"; + List<Object[]> result7 = new ArrayList<>(); + result7.add(new Object[]{"BROKER_REDUCE(limit:10)", 1, 0}); + result7.add(new Object[]{"COMBINE_AGGREGATE", 2, 1}); + result7.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", ExplainPlanRows.PLAN_START_IDS, + ExplainPlanRows.PLAN_START_IDS}); + result7.add(new Object[]{"AGGREGATE(aggregations:distinctAvg(rawCol1))", 3, 2}); + result7.add(new Object[]{"TRANSFORM_PASSTHROUGH(rawCol1)", 4, 3}); + result7.add(new Object[]{"PROJECT(rawCol1)", 5, 4}); + result7.add(new Object[]{"DOC_ID_SET", 6, 5}); + result7.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6}); + check(query7, new ResultTable(DATA_SCHEMA, result7)); + + String query8 = "EXPLAIN PLAN FOR SELECT DISTINCTAVGMV(mvRawCol1) FROM testTable"; + List<Object[]> result8 = new ArrayList<>(); + result8.add(new Object[]{"BROKER_REDUCE(limit:10)", 1, 0}); + result8.add(new Object[]{"COMBINE_AGGREGATE", 2, 1}); + result8.add(new Object[]{"PLAN_START(numSegmentsForThisPlan:4)", ExplainPlanRows.PLAN_START_IDS, + ExplainPlanRows.PLAN_START_IDS}); + result8.add(new Object[]{"AGGREGATE(aggregations:distinctAvgMV(mvRawCol1))", 3, 2}); + result8.add(new Object[]{"TRANSFORM_PASSTHROUGH(mvRawCol1)", 4, 3}); + result8.add(new Object[]{"PROJECT(mvRawCol1)", 5, 4}); + result8.add(new Object[]{"DOC_ID_SET", 6, 5}); + result8.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 7, 6}); + check(query8, new ResultTable(DATA_SCHEMA, result8)); } diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java index 760c1c78c1..4d32358814 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java +++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java @@ -121,7 +121,8 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue public void testMaxMV() { String query = "SELECT MAXMV(column6) AS value FROM testTable"; - // Without filter, query should be answered by DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. BrokerResponseNative brokerResponse = getBrokerResponse(query); DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.DOUBLE}); ResultTable expectedResultTable = @@ -148,7 +149,8 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue public void testMinMV() { String query = "SELECT MINMV(column6) AS value FROM testTable"; - // Without filter, query should be answered by DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. BrokerResponseNative brokerResponse = getBrokerResponse(query); DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.DOUBLE}); Object[] expectedResults = new Object[]{1001.0}; @@ -245,7 +247,8 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue public void testMinMaxRangeMV() { String query = "SELECT MINMAXRANGEMV(column6) AS value FROM testTable"; - // Without filter, query should be answered by DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. 
BrokerResponseNative brokerResponse = getBrokerResponse(query); DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.DOUBLE}); Object[] expectedResults = new Object[]{2147482646.0}; @@ -277,7 +280,8 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue public void testDistinctCountMV() { String query = "SELECT DISTINCTCOUNTMV(column6) AS value FROM testTable"; - // Without filter, query should be answered by DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. BrokerResponseNative brokerResponse = getBrokerResponse(query); DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.INT}); Object[] expectedResults = new Object[]{18499}; @@ -305,11 +309,78 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable); } + @Test + public void testDistinctSumMV() { + String query = "SELECT DISTINCTSUMMV(column6) AS value FROM testTable"; + + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. 
+ BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.DOUBLE}); + Object[] expectedResults = new Object[]{24592775810.0}; + ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults)); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResults[0] = 2578123532.0; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 62480L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + SV_GROUP_BY); + expectedResults[0] = 6304833321.0; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY); + expectedResults[0] = 2578123532.0; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + MV_GROUP_BY); + expectedResults[0] = 8999975927.0; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY); + expectedResults[0] = 2478539095.0; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable); + } + + @Test + public void testDistinctAvgMV() { + String query = "SELECT DISTINCTAVGMV(column6) AS value FROM testTable"; + + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. 
+ BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.DOUBLE}); + Object[] expectedResults = new Object[]{1329411.0930320558}; + ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults)); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResults[0] = 2173797.244519393; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 62480L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + SV_GROUP_BY); + expectedResults[0] = 2147483647.0; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY); + expectedResults[0] = 2147483647.0; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + MV_GROUP_BY); + expectedResults[0] = 2147483647.0; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY); + expectedResults[0] = 2147483647.0; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable); + } + @Test public void testDistinctCountHLLMV() { String query = "SELECT DISTINCTCOUNTHLLMV(column6) AS value FROM testTable"; - // Without filter, query should be answered by DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. 
BrokerResponseNative brokerResponse = getBrokerResponse(query); DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.LONG}); Object[] expectedResults = new Object[]{20039L}; @@ -343,7 +414,8 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue Function<Object, Object> cardinalityExtractor = value -> ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(BytesUtils.toBytes((String) value)).cardinality(); - // Without filter, query should be answered by DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. BrokerResponseNative brokerResponse = getBrokerResponse(query); DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.LONG}); Object[] expectedResults = new Object[]{20039L}; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index fc2aec9bc9..8f53122bd4 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -82,6 +82,8 @@ public enum AggregationFunctionType { DISTINCTCOUNTBITMAPMV("distinctCountBitmapMV"), DISTINCTCOUNTHLLMV("distinctCountHLLMV"), DISTINCTCOUNTRAWHLLMV("distinctCountRawHLLMV"), + DISTINCTSUMMV("distinctSumMV"), + DISTINCTAVGMV("distinctAvgMV"), PERCENTILEMV("percentileMV"), PERCENTILEESTMV("percentileEstMV"), PERCENTILERAWESTMV("percentileRawEstMV"), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org