This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2d1f7dc Add functions to return raw results for Percentile TDigest and Est (#7226) 2d1f7dc is described below commit 2d1f7dc9060d45ac9f4ff0621da62790ea9b8e64 Author: lakshmanan-v <85200301+lakshmana...@users.noreply.github.com> AuthorDate: Tue Aug 24 13:54:41 2021 -0700 Add functions to return raw results for Percentile TDigest and Est (#7226) Adding aggregate functions to return serialized / raw values of PercentileEst (QuantileDigest) and PercentileTDigest (TDigest) data structures. --- .../function/AggregationFunctionTypeTest.java | 8 ++ .../function/AggregationFunctionFactory.java | 32 +++++ .../PercentileRawEstAggregationFunction.java | 141 ++++++++++++++++++++ .../PercentileRawEstMVAggregationFunction.java | 43 +++++++ .../PercentileRawTDigestAggregationFunction.java | 142 +++++++++++++++++++++ .../PercentileRawTDigestMVAggregationFunction.java | 43 +++++++ .../function/AggregationFunctionFactoryTest.java | 42 ++++++ .../apache/pinot/queries/ExpectedQueryResult.java | 56 ++++++++ ...terSegmentAggregationMultiValueQueriesTest.java | 113 ++++++++++++++++ ...erSegmentAggregationSingleValueQueriesTest.java | 118 +++++++++++++++++ .../org/apache/pinot/queries/QueriesTestUtils.java | 105 ++++++++++++--- .../local/aggregator/ValueAggregatorFactory.java | 4 + .../customobject/SerializedQuantileDigest.java | 50 ++++++++ .../local/customobject/SerializedTDigest.java | 49 +++++++ .../pinot/segment/spi/AggregationFunctionType.java | 12 ++ .../misc/AggregationFunctionColumnPairTest.java | 18 +++ 16 files changed, 956 insertions(+), 20 deletions(-) diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java index 5b1d640..042381b 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java +++ 
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java @@ -48,6 +48,10 @@ public class AggregationFunctionTypeTest { AggregationFunctionType.PERCENTILEEST); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeTdIgEsT99"), AggregationFunctionType.PERCENTILETDIGEST); + Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWeSt90mV"), + AggregationFunctionType.PERCENTILERAWESTMV); + Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWtDiGeSt95mV"), + AggregationFunctionType.PERCENTILERAWTDIGESTMV); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("CoUnTMv"), AggregationFunctionType.COUNTMV); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmV"), AggregationFunctionType.MINMV); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MaXmV"), AggregationFunctionType.MAXMV); @@ -67,6 +71,10 @@ public class AggregationFunctionTypeTest { AggregationFunctionType.PERCENTILEESTMV); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeTdIgEsT95mV"), AggregationFunctionType.PERCENTILETDIGESTMV); + Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWeSt50"), + AggregationFunctionType.PERCENTILERAWEST); + Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWtDiGeSt99"), + AggregationFunctionType.PERCENTILERAWTDIGEST); } @Test(expectedExceptions = IllegalArgumentException.class) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index ccd45fc..3285607 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -61,10 +61,18 @@ public class AggregationFunctionFactory { // PercentileEst String percentileString = remainingFunctionName.substring(3); return new PercentileEstAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); + } else if (remainingFunctionName.matches("RAWEST\\d+")) { + // PercentileRawEst + String percentileString = remainingFunctionName.substring(6); + return new PercentileRawEstAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); } else if (remainingFunctionName.matches("TDIGEST\\d+")) { // PercentileTDigest String percentileString = remainingFunctionName.substring(7); return new PercentileTDigestAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); + } else if (remainingFunctionName.matches("RAWTDIGEST\\d+")) { + // PercentileRawTDigest + String percentileString = remainingFunctionName.substring(10); + return new PercentileRawTDigestAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); } else if (remainingFunctionName.matches("\\d+MV")) { // PercentileMV String percentileString = remainingFunctionName.substring(0, remainingFunctionName.length() - 2); @@ -73,10 +81,18 @@ public class AggregationFunctionFactory { // PercentileEstMV String percentileString = remainingFunctionName.substring(3, remainingFunctionName.length() - 2); return new PercentileEstMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); + } else if (remainingFunctionName.matches("RAWEST\\d+MV")) { + // PercentileRawEstMV + String percentileString = remainingFunctionName.substring(6, remainingFunctionName.length() - 2); + return new PercentileRawEstMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); } else if (remainingFunctionName.matches("TDIGEST\\d+MV")) { // PercentileTDigestMV String percentileString = remainingFunctionName.substring(7, 
remainingFunctionName.length() - 2); return new PercentileTDigestMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); + } else if (remainingFunctionName.matches("RAWTDIGEST\\d+MV")) { + // PercentileRawTDigestMV + String percentileString = remainingFunctionName.substring(10, remainingFunctionName.length() - 2); + return new PercentileRawTDigestMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString)); } } else if (numArguments == 2) { // Double arguments percentile (e.g. percentile(foo, 99), percentileTDigest(bar, 95), etc.) where the @@ -90,10 +106,18 @@ public class AggregationFunctionFactory { // PercentileEst return new PercentileEstAggregationFunction(firstArgument, percentile); } + if (remainingFunctionName.equals("RAWEST")) { + // PercentileRawEst + return new PercentileRawEstAggregationFunction(firstArgument, percentile); + } if (remainingFunctionName.equals("TDIGEST")) { // PercentileTDigest return new PercentileTDigestAggregationFunction(firstArgument, percentile); } + if (remainingFunctionName.equals("RAWTDIGEST")) { + // PercentileRawTDigest + return new PercentileRawTDigestAggregationFunction(firstArgument, percentile); + } if (remainingFunctionName.equals("MV")) { // PercentileMV return new PercentileMVAggregationFunction(firstArgument, percentile); @@ -102,10 +126,18 @@ public class AggregationFunctionFactory { // PercentileEstMV return new PercentileEstMVAggregationFunction(firstArgument, percentile); } + if (remainingFunctionName.equals("RAWESTMV")) { + // PercentileRawEstMV + return new PercentileRawEstMVAggregationFunction(firstArgument, percentile); + } if (remainingFunctionName.equals("TDIGESTMV")) { // PercentileTDigestMV return new PercentileTDigestMVAggregationFunction(firstArgument, percentile); } + if (remainingFunctionName.equals("RAWTDIGESTMV")) { + // PercentileRawTDigestMV + return new PercentileRawTDigestMVAggregationFunction(firstArgument, percentile); + } } throw new 
IllegalArgumentException("Invalid percentile function: " + function); } else { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstAggregationFunction.java new file mode 100644 index 0000000..dc5d435 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstAggregationFunction.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.QuantileDigest; +import org.apache.pinot.segment.local.customobject.SerializedQuantileDigest; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +/** + * The {@code PercentileRawEstAggregationFunction} returns the serialized {@code QuantileDigest} data structure of the + * {@code PercentileEstAggregationFunction}. + */ +public class PercentileRawEstAggregationFunction + extends BaseSingleInputAggregationFunction<QuantileDigest, SerializedQuantileDigest> { + private final PercentileEstAggregationFunction _percentileEstAggregationFunction; + + public PercentileRawEstAggregationFunction(ExpressionContext expressionContext, double percentile) { + this(expressionContext, new PercentileEstAggregationFunction(expressionContext, percentile)); + } + + public PercentileRawEstAggregationFunction(ExpressionContext expressionContext, int percentile) { + this(expressionContext, new PercentileEstAggregationFunction(expressionContext, percentile)); + } + + protected PercentileRawEstAggregationFunction(ExpressionContext expression, + PercentileEstAggregationFunction percentileEstAggregationFunction) { + super(expression); + _percentileEstAggregationFunction = percentileEstAggregationFunction; + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.PERCENTILERAWEST; + } + + @Override + public String getColumnName() { + final double percentile = _percentileEstAggregationFunction._percentile; + final int version = _percentileEstAggregationFunction._version; + final 
String type = getType().getName(); + + return version == 0 ? type + (int) percentile + "_" + _expression : type + percentile + "_" + _expression; + } + + @Override + public String getResultColumnName() { + final double percentile = _percentileEstAggregationFunction._percentile; + final int version = _percentileEstAggregationFunction._version; + final String type = getType().getName().toLowerCase(); + + return version == 0 ? type + (int) percentile + "(" + _expression + ")" + : type + "(" + _expression + ", " + percentile + ")"; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return _percentileEstAggregationFunction.createAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return _percentileEstAggregationFunction.createGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + _percentileEstAggregationFunction.aggregate(length, aggregationResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + _percentileEstAggregationFunction.aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + _percentileEstAggregationFunction + .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap); + } + + @Override + public QuantileDigest extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + return _percentileEstAggregationFunction.extractAggregationResult(aggregationResultHolder); + } + + @Override + 
public QuantileDigest extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + return _percentileEstAggregationFunction.extractGroupByResult(groupByResultHolder, groupKey); + } + + @Override + public QuantileDigest merge(QuantileDigest intermediateResult1, QuantileDigest intermediateResult2) { + return _percentileEstAggregationFunction.merge(intermediateResult1, intermediateResult2); + } + + @Override + public boolean isIntermediateResultComparable() { + return _percentileEstAggregationFunction.isIntermediateResultComparable(); + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return _percentileEstAggregationFunction.getIntermediateResultColumnType(); + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.STRING; + } + + @Override + public SerializedQuantileDigest extractFinalResult(QuantileDigest intermediateResult) { + return new SerializedQuantileDigest(intermediateResult, _percentileEstAggregationFunction._percentile); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstMVAggregationFunction.java new file mode 100644 index 0000000..2f2bcfd --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstMVAggregationFunction.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +/** + * The {@code PercentileRawEstMVAggregationFunction} returns the serialized {@code QuantileDigest} data structure of the + * {@code PercentileEstMVAggregationFunction}. + */ +public class PercentileRawEstMVAggregationFunction extends PercentileRawEstAggregationFunction { + + public PercentileRawEstMVAggregationFunction(ExpressionContext expressionContext, int percentile) { + super(expressionContext, new PercentileEstMVAggregationFunction(expressionContext, percentile)); + } + + public PercentileRawEstMVAggregationFunction(ExpressionContext expressionContext, double percentile) { + super(expressionContext, new PercentileEstMVAggregationFunction(expressionContext, percentile)); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.PERCENTILERAWESTMV; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestAggregationFunction.java new file mode 100644 index 0000000..2724b21 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestAggregationFunction.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import com.tdunning.math.stats.TDigest; +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.SerializedTDigest; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +/** + * The {@code PercentileRawTDigestAggregationFunction} returns the serialized {@code TDigest} data structure of the + * {@code PercentileTDigestAggregationFunction}.
+ */ +public class PercentileRawTDigestAggregationFunction + extends BaseSingleInputAggregationFunction<TDigest, SerializedTDigest> { + private final PercentileTDigestAggregationFunction _percentileTDigestAggregationFunction; + + public PercentileRawTDigestAggregationFunction(ExpressionContext expressionContext, int percentile) { + this(expressionContext, new PercentileTDigestAggregationFunction(expressionContext, percentile)); + } + + public PercentileRawTDigestAggregationFunction(ExpressionContext expressionContext, double percentile) { + this(expressionContext, new PercentileTDigestAggregationFunction(expressionContext, percentile)); + } + + protected PercentileRawTDigestAggregationFunction(ExpressionContext expression, + PercentileTDigestAggregationFunction percentileTDigestAggregationFunction) { + super(expression); + _percentileTDigestAggregationFunction = percentileTDigestAggregationFunction; + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.PERCENTILERAWTDIGEST; + } + + @Override + public String getColumnName() { + final double percentile = _percentileTDigestAggregationFunction._percentile; + final int version = _percentileTDigestAggregationFunction._version; + final String type = getType().getName(); + + return version == 0 ? type + (int) percentile + "_" + _expression : type + percentile + "_" + _expression; + } + + @Override + public String getResultColumnName() { + final double percentile = _percentileTDigestAggregationFunction._percentile; + final int version = _percentileTDigestAggregationFunction._version; + final String type = getType().getName().toLowerCase(); + + return version == 0 ? 
type + (int) percentile + "(" + _expression + ")" + : type + "(" + _expression + ", " + percentile + ")"; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return _percentileTDigestAggregationFunction.createAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return _percentileTDigestAggregationFunction.createGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + _percentileTDigestAggregationFunction.aggregate(length, aggregationResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + _percentileTDigestAggregationFunction + .aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + _percentileTDigestAggregationFunction + .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap); + } + + @Override + public TDigest extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + return _percentileTDigestAggregationFunction.extractAggregationResult(aggregationResultHolder); + } + + @Override + public TDigest extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + return _percentileTDigestAggregationFunction.extractGroupByResult(groupByResultHolder, groupKey); + } + + @Override + public TDigest merge(TDigest intermediateResult1, TDigest intermediateResult2) { + return _percentileTDigestAggregationFunction.merge(intermediateResult1, intermediateResult2); + } + + @Override + 
public boolean isIntermediateResultComparable() { + return _percentileTDigestAggregationFunction.isIntermediateResultComparable(); + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return _percentileTDigestAggregationFunction.getIntermediateResultColumnType(); + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.STRING; + } + + @Override + public SerializedTDigest extractFinalResult(TDigest intermediateResult) { + return new SerializedTDigest(intermediateResult, _percentileTDigestAggregationFunction._percentile); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestMVAggregationFunction.java new file mode 100644 index 0000000..4cf5cbb --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestMVAggregationFunction.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +/** + * The {@code PercentileRawTDigestMVAggregationFunction} returns the serialized {@code TDigest} data structure of the + * {@code PercentileTDigestMVAggregationFunction}. + */ +public class PercentileRawTDigestMVAggregationFunction extends PercentileRawTDigestAggregationFunction { + + public PercentileRawTDigestMVAggregationFunction(ExpressionContext expressionContext, int percentile) { + super(expressionContext, new PercentileTDigestMVAggregationFunction(expressionContext, percentile)); + } + + public PercentileRawTDigestMVAggregationFunction(ExpressionContext expressionContext, double percentile) { + super(expressionContext, new PercentileTDigestMVAggregationFunction(expressionContext, percentile)); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.PERCENTILERAWTDIGESTMV; + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java index b855806..e5e0e22 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java @@ -136,6 +136,13 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getColumnName(), "percentileEst50_column"); assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("PeRcEnTiLeRaWEsT50"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof PercentileRawEstAggregationFunction); + 
assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILERAWEST); + assertEquals(aggregationFunction.getColumnName(), "percentileRawEst50_column"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("PeRcEnTiLeTdIgEsT99"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); assertTrue(aggregationFunction instanceof PercentileTDigestAggregationFunction); @@ -143,6 +150,13 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getColumnName(), "percentileTDigest99_column"); assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("PeRcEnTiLeRaWTdIgEsT99"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof PercentileRawTDigestAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILERAWTDIGEST); + assertEquals(aggregationFunction.getColumnName(), "percentileRawTDigest99_column"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("PeRcEnTiLe", "(column, 5)"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); assertTrue(aggregationFunction instanceof PercentileAggregationFunction); @@ -164,6 +178,13 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getColumnName(), "percentileEst50.0_column"); assertEquals(aggregationFunction.getResultColumnName(), "percentileest(column, 50.0)"); + function = getFunction("PeRcEnTiLeRaWeSt", "(column, 50)"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof PercentileRawEstAggregationFunction); + assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.PERCENTILERAWEST); + assertEquals(aggregationFunction.getColumnName(), "percentileRawEst50.0_column"); + assertEquals(aggregationFunction.getResultColumnName(), "percentilerawest(column, 50.0)"); + function = getFunction("PeRcEnTiLeEsT", "(column, 55.555)"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); assertTrue(aggregationFunction instanceof PercentileEstAggregationFunction); @@ -171,6 +192,13 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getColumnName(), "percentileEst55.555_column"); assertEquals(aggregationFunction.getResultColumnName(), "percentileest(column, 55.555)"); + function = getFunction("PeRcEnTiLeRaWeSt", "(column, 55.555)"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof PercentileRawEstAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILERAWEST); + assertEquals(aggregationFunction.getColumnName(), "percentileRawEst55.555_column"); + assertEquals(aggregationFunction.getResultColumnName(), "percentilerawest(column, 55.555)"); + function = getFunction("PeRcEnTiLeTdIgEsT", "(column, 99)"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); assertTrue(aggregationFunction instanceof PercentileTDigestAggregationFunction); @@ -185,6 +213,20 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getColumnName(), "percentileTDigest99.9999_column"); assertEquals(aggregationFunction.getResultColumnName(), "percentiletdigest(column, 99.9999)"); + function = getFunction("PeRcEnTiLeRaWtDiGeSt", "(column, 99)"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof PercentileRawTDigestAggregationFunction); + 
assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILERAWTDIGEST); + assertEquals(aggregationFunction.getColumnName(), "percentileRawTDigest99.0_column"); + assertEquals(aggregationFunction.getResultColumnName(), "percentilerawtdigest(column, 99.0)"); + + function = getFunction("PeRcEnTiLeRaWtDiGeSt", "(column, 99.9999)"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof PercentileRawTDigestAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILERAWTDIGEST); + assertEquals(aggregationFunction.getColumnName(), "percentileRawTDigest99.9999_column"); + assertEquals(aggregationFunction.getResultColumnName(), "percentilerawtdigest(column, 99.9999)"); + function = getFunction("CoUnTmV"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); assertTrue(aggregationFunction instanceof CountMVAggregationFunction); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExpectedQueryResult.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExpectedQueryResult.java new file mode 100644 index 0000000..520f2ee --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExpectedQueryResult.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +public class ExpectedQueryResult<T> { + private long _numDocsScanned; + private long _numEntriesScannedInFilter; + private long _numEntriesScannedPostFilter; + private long _numTotalDocs; + private T[] _results; + + public ExpectedQueryResult(long numDocsScanned, long numEntriesScannedInFilter, long numEntriesScannedPostFilter, + long numTotalDocs, T[] results) { + _numDocsScanned = numDocsScanned; + _numEntriesScannedInFilter = numEntriesScannedInFilter; + _numEntriesScannedPostFilter = numEntriesScannedPostFilter; + _numTotalDocs = numTotalDocs; + _results = results; + } + + public long getNumDocsScanned() { + return _numDocsScanned; + } + + public long getNumEntriesScannedInFilter() { + return _numEntriesScannedInFilter; + } + + public long getNumEntriesScannedPostFilter() { + return _numEntriesScannedPostFilter; + } + + public long getNumTotalDocs() { + return _numTotalDocs; + } + + public T[] getResults() { + return _results; + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java index 4856eab..6c2b730 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java @@ -42,6 +42,9 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue private static final 
String MV_GROUP_BY = " group by column7"; private static final String ORDER_BY_ALIAS = " order by cnt_column6 DESC"; + // Allow 5% quantile error due to the randomness of TDigest merge + private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * Integer.MAX_VALUE; + @Test public void testCountMV() { String query = "SELECT COUNTMV(column6) FROM testTable"; @@ -525,6 +528,116 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue } @Test + public void testPercentileRawEst50MV() { + testPercentileRawEstAggregationFunction(50); + } + + @Test + public void testPercentileRawEst90MV() { + testPercentileRawEstAggregationFunction(90); + } + + @Test + public void testPercentileRawEst95MV() { + testPercentileRawEstAggregationFunction(95); + } + + @Test + public void testPercentileRawEst99MV() { + testPercentileRawEstAggregationFunction(99); + } + + private void testPercentileRawEstAggregationFunction(int percentile) { + Function<Serializable, String> quantileExtractor = value -> String.valueOf( + ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) value)) + .getQuantile(percentile / 100.0)); + + String rawQuery = + String.format("SELECT PERCENTILERAWEST%dMV(column6) FROM testTable", percentile); + + String query = + String.format("SELECT PERCENTILEEST%dMV(column6) FROM testTable", percentile); + + queryAndTestAggregationResult(rawQuery, getExpectedQueryResults(query), quantileExtractor); + + queryAndTestAggregationResult(rawQuery + getFilter(), getExpectedQueryResults(query + getFilter()), + quantileExtractor); + + queryAndTestAggregationResult(rawQuery + SV_GROUP_BY, getExpectedQueryResults(query + SV_GROUP_BY), + quantileExtractor); + + queryAndTestAggregationResult(rawQuery + MV_GROUP_BY, getExpectedQueryResults(query + MV_GROUP_BY), + quantileExtractor); + } + + private void testPercentileRawTDigestAggregationFunction(int percentile) { + Function<Serializable, String> quantileExtractor = value -> String.valueOf( 
+ ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) value)) + .quantile(percentile / 100.0)); + + String rawQuery = + String.format("SELECT PERCENTILERAWTDIGEST%dMV(column6) FROM testTable", percentile); + + String query = + String.format("SELECT PERCENTILETDIGEST%dMV(column6) FROM testTable", percentile); + + queryAndTestAggregationResultWithDelta(rawQuery, getExpectedQueryResults(query), quantileExtractor); + + queryAndTestAggregationResultWithDelta(rawQuery + getFilter(), getExpectedQueryResults(query + getFilter()), + quantileExtractor); + + queryAndTestAggregationResultWithDelta(rawQuery + SV_GROUP_BY, getExpectedQueryResults(query + SV_GROUP_BY), + quantileExtractor); + + queryAndTestAggregationResultWithDelta(rawQuery + MV_GROUP_BY, getExpectedQueryResults(query + MV_GROUP_BY), + quantileExtractor); + } + + private ExpectedQueryResult<String> getExpectedQueryResults(String query) { + return QueriesTestUtils.buildExpectedResponse(getBrokerResponseForPqlQuery(query)); + } + + private void queryAndTestAggregationResultWithDelta(String query, ExpectedQueryResult<String> expectedQueryResults, + Function<Serializable, String> responseMapper) { + BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); + + QueriesTestUtils + .testInterSegmentApproximateAggregationResult(brokerResponse, expectedQueryResults.getNumDocsScanned(), + expectedQueryResults.getNumEntriesScannedInFilter(), expectedQueryResults.getNumEntriesScannedPostFilter(), + expectedQueryResults.getNumTotalDocs(), responseMapper, expectedQueryResults.getResults(), + PERCENTILE_TDIGEST_DELTA); + } + + private void queryAndTestAggregationResult(String query, ExpectedQueryResult<String> expectedQueryResults, + Function<Serializable, String> responseMapper) { + BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); + QueriesTestUtils + .testInterSegmentAggregationResult(brokerResponse, expectedQueryResults.getNumDocsScanned(), + 
expectedQueryResults.getNumEntriesScannedInFilter(), expectedQueryResults.getNumEntriesScannedPostFilter(), + expectedQueryResults.getNumTotalDocs(), responseMapper, expectedQueryResults.getResults()); + } + + @Test + public void testPercentileRawTDigest50MV() { + testPercentileRawTDigestAggregationFunction(50); + } + + @Test + public void testPercentileRawTDigest90MV() { + testPercentileRawTDigestAggregationFunction(90); + } + + @Test + public void testPercentileRawTDigest95MV() { + testPercentileRawTDigestAggregationFunction(95); + } + + @Test + public void testPercentileRawTDigest99MV() { + testPercentileRawTDigestAggregationFunction(99); + } + + @Test public void testNumGroupsLimit() { String query = "SELECT COUNT(*) FROM testTable GROUP BY column6"; diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java index 54654ed..b88d38a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java @@ -37,6 +37,9 @@ import static org.testng.Assert.assertTrue; public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleValueQueriesTest { private static final String GROUP_BY = " group by column9"; + // Allow 5% quantile error due to the randomness of TDigest merge + private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * Integer.MAX_VALUE; + @Test public void testCount() { String query = "SELECT COUNT(*) FROM testTable"; @@ -413,6 +416,121 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal } @Test + public void testPercentileRawEst50() { + testPercentileRawEstAggregationFunction(50); + } + + @Test + public void testPercentileRawEst90() { + testPercentileRawEstAggregationFunction(90); + } + + @Test + 
public void testPercentileRawEst95() { + testPercentileRawEstAggregationFunction(95); + } + + @Test + public void testPercentileRawEst99() { + testPercentileRawEstAggregationFunction(99); + } + + private void queryAndTestAggregationResult(String query, ExpectedQueryResult<String> expectedQueryResults, + Function<Serializable, String> responseMapper) { + BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); + QueriesTestUtils + .testInterSegmentAggregationResult(brokerResponse, expectedQueryResults.getNumDocsScanned(), + expectedQueryResults.getNumEntriesScannedInFilter(), expectedQueryResults.getNumEntriesScannedPostFilter(), + expectedQueryResults.getNumTotalDocs(), responseMapper, expectedQueryResults.getResults()); + } + + private void testPercentileRawEstAggregationFunction(int percentile) { + Function<Serializable, String> quantileExtractor = value -> String.valueOf( + ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) value)) + .getQuantile(percentile / 100.0)); + + String rawQuery = + String.format("SELECT PERCENTILERAWEST%d(column1), PERCENTILERAWEST%d(column3) FROM testTable", percentile, + percentile); + + String query = + String + .format("SELECT PERCENTILEEST%d(column1), PERCENTILEEST%d(column3) FROM testTable", percentile, percentile); + + queryAndTestAggregationResult(rawQuery, getExpectedQueryResults(query), quantileExtractor); + + queryAndTestAggregationResult(rawQuery + getFilter(), getExpectedQueryResults(query + getFilter()), + quantileExtractor); + + queryAndTestAggregationResult(rawQuery + GROUP_BY, getExpectedQueryResults(query + GROUP_BY), quantileExtractor); + + queryAndTestAggregationResult(rawQuery + getFilter() + GROUP_BY, + getExpectedQueryResults(query + getFilter() + GROUP_BY), + quantileExtractor); + } + + @Test + public void testPercentileRawTDigest50() { + testPercentileRawTDigestAggregationFunction(50); + } + + @Test + public void testPercentileRawTDigest90() { + 
testPercentileRawTDigestAggregationFunction(90); + } + + @Test + public void testPercentileRawTDigest95() { + testPercentileRawTDigestAggregationFunction(95); + } + + @Test + public void testPercentileRawTDigest99() { + testPercentileRawTDigestAggregationFunction(99); + } + + private void testPercentileRawTDigestAggregationFunction(int percentile) { + Function<Serializable, String> quantileExtractor = value -> String.valueOf( + ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) value)) + .quantile(percentile / 100.0)); + + String rawQuery = + String.format("SELECT PERCENTILERAWTDIGEST%d(column1), PERCENTILERAWTDIGEST%d(column3) FROM testTable", + percentile, percentile); + + String query = + String.format("SELECT PERCENTILETDIGEST%d(column1), PERCENTILETDIGEST%d(column3) FROM testTable", percentile, + percentile); + + queryAndTestAggregationResultWithDelta(rawQuery, getExpectedQueryResults(query), quantileExtractor); + + queryAndTestAggregationResultWithDelta(rawQuery + getFilter(), getExpectedQueryResults(query + getFilter()), + quantileExtractor); + + queryAndTestAggregationResultWithDelta(rawQuery + GROUP_BY, getExpectedQueryResults(query + GROUP_BY), + quantileExtractor); + + queryAndTestAggregationResultWithDelta(rawQuery + getFilter() + GROUP_BY, + getExpectedQueryResults(query + getFilter() + GROUP_BY), + quantileExtractor); + } + + private ExpectedQueryResult<String> getExpectedQueryResults(String query) { + return QueriesTestUtils.buildExpectedResponse(getBrokerResponseForPqlQuery(query)); + } + + private void queryAndTestAggregationResultWithDelta(String query, ExpectedQueryResult<String> expectedQueryResults, + Function<Serializable, String> responseMapper) { + BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); + + QueriesTestUtils + .testInterSegmentApproximateAggregationResult(brokerResponse, expectedQueryResults.getNumDocsScanned(), + expectedQueryResults.getNumEntriesScannedInFilter(), 
expectedQueryResults.getNumEntriesScannedPostFilter(), + expectedQueryResults.getNumTotalDocs(), responseMapper, expectedQueryResults.getResults(), + PERCENTILE_TDIGEST_DELTA); + } + + @Test public void testNumGroupsLimit() { String query = "SELECT COUNT(*) FROM testTable GROUP BY column1"; diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java index 41a1129..2d1a5d1 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.queries; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -95,11 +96,9 @@ public class QueriesTestUtils { public static void testInterSegmentAggregationResult(BrokerResponseNative brokerResponse, long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs, Function<Serializable, String> responseMapper, String[] expectedAggregationResults) { - Assert.assertEquals(brokerResponse.getNumDocsScanned(), expectedNumDocsScanned); - Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), expectedNumEntriesScannedInFilter); - Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), expectedNumEntriesScannedPostFilter); - Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs); - List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults(); + List<AggregationResult> aggregationResults = + validateAggregationStats(brokerResponse, expectedNumDocsScanned, expectedNumEntriesScannedInFilter, + expectedNumEntriesScannedPostFilter, expectedNumTotalDocs); int length = expectedAggregationResults.length; Assert.assertEquals(aggregationResults.size(), length); for (int i = 0; i < length; i++) { @@ 
-117,9 +116,82 @@ public class QueriesTestUtils { } } - public static void testInterSegmentAggregationGroupByResult(BrokerResponseNative brokerResponse, + /** + * Builds expected response object from the given broker response. + * @param brokerResponse + * @return + */ + public static ExpectedQueryResult<String> buildExpectedResponse(BrokerResponseNative brokerResponse) { + Function<Serializable, String> responseMapper = Serializable::toString; + + List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults(); + List<String> results = new ArrayList<>(); + for (int i = 0; i < aggregationResults.size(); i++) { + AggregationResult aggregationResult = aggregationResults.get(i); + Serializable value = aggregationResult.getValue(); + if (value != null) { + results.add(responseMapper.apply(value)); + // Aggregation. + } else { + // Group-by. + results.add(responseMapper.apply(aggregationResult.getGroupByResult().get(0).getValue())); + } + } + return new ExpectedQueryResult<>(brokerResponse.getNumDocsScanned(), brokerResponse.getNumEntriesScannedInFilter(), + brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getTotalDocs(), results.toArray(new String[0])); + } + + /** + * Verifies the given results of an approximate aggregation function. + * @param brokerResponse Broker response + * @param expectedNumDocsScanned Number of documents scanned. + * @param expectedNumEntriesScannedInFilter Number of entries scanned in filter + * @param expectedNumEntriesScannedPostFilter Number of entries scanned post filter. + * @param expectedNumTotalDocs Total documents. + * @param responseMapper Mapper to process response. + * @param expectedAggregationResults Expected aggregation results. + * @param resultComparisionDelta Validate results are within +/- delta range (0 - 100)%. 
+ */ + public static void testInterSegmentApproximateAggregationResult(BrokerResponseNative brokerResponse, long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, - long expectedNumTotalDocs, List<String[]> expectedGroupKeys, List<String[]> expectedAggregationResults) { + long expectedNumTotalDocs, Function<Serializable, String> responseMapper, String[] expectedAggregationResults, + double resultComparisionDelta) { + List<AggregationResult> aggregationResults = + validateAggregationStats(brokerResponse, expectedNumDocsScanned, expectedNumEntriesScannedInFilter, + expectedNumEntriesScannedPostFilter, expectedNumTotalDocs); + int length = expectedAggregationResults.length; + Assert.assertEquals(aggregationResults.size(), length); + + for (int i = 0; i < length; i++) { + AggregationResult aggregationResult = aggregationResults.get(i); + double expectedResult = Double.parseDouble(expectedAggregationResults[i]); + Serializable value = aggregationResult.getValue(); + double actualResult = 0L; + if (value != null) { + // Aggregation. + actualResult = Double.parseDouble(responseMapper.apply(value)); + } else { + // Group-by. 
+ actualResult = Double.parseDouble(responseMapper.apply(aggregationResult.getGroupByResult().get(0).getValue())); + } + Assert.assertEquals(actualResult, expectedResult, resultComparisionDelta); + } + } + + private static List<AggregationResult> validateAggregationStats(BrokerResponseNative brokerResponse, + long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, + long expectedNumTotalDocs) { + Assert.assertEquals(brokerResponse.getNumDocsScanned(), expectedNumDocsScanned); + Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), expectedNumEntriesScannedInFilter); + Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), expectedNumEntriesScannedPostFilter); + Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs); + return brokerResponse.getAggregationResults(); + } + + public static void testInterSegmentAggregationGroupByResult(BrokerResponseNative brokerResponse, + long expectedNumDocsScanned, + long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs, + List<String[]> expectedGroupKeys, List<String[]> expectedAggregationResults) { testInterSegmentAggregationGroupByResult(brokerResponse, expectedNumDocsScanned, expectedNumEntriesScannedInFilter, expectedNumEntriesScannedPostFilter, expectedNumTotalDocs, Serializable::toString, expectedGroupKeys, expectedAggregationResults); @@ -129,13 +201,9 @@ public class QueriesTestUtils { long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs, Function<Serializable, String> responseMapper, List<String[]> expectedGroupKeys, List<String[]> expectedAggregationResults) { - Assert.assertEquals(brokerResponse.getNumDocsScanned(), expectedNumDocsScanned); - Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), expectedNumEntriesScannedInFilter); - 
Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), expectedNumEntriesScannedPostFilter); - Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs); - // size of this array will be equal to number of aggregation functions since - // we return each aggregation function separately - List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults(); + List<AggregationResult> aggregationResults = + validateAggregationStats(brokerResponse, expectedNumDocsScanned, expectedNumEntriesScannedInFilter, + expectedNumEntriesScannedPostFilter, expectedNumTotalDocs); int numAggregationColumns = aggregationResults.size(); Assert.assertEquals(numAggregationColumns, expectedAggregationResults.get(0).length); int numKeyColumns = expectedGroupKeys.get(0).length; @@ -181,12 +249,9 @@ public class QueriesTestUtils { static void testInterSegmentGroupByOrderByResultPQL(BrokerResponseNative brokerResponse, long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs, List<String[]> expectedGroups, List<List<Serializable>> expectedValues, boolean preserveType) { - Assert.assertEquals(brokerResponse.getNumDocsScanned(), expectedNumDocsScanned); - Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), expectedNumEntriesScannedInFilter); - Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), expectedNumEntriesScannedPostFilter); - Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs); - - List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults(); + List<AggregationResult> aggregationResults = + validateAggregationStats(brokerResponse, expectedNumDocsScanned, expectedNumEntriesScannedInFilter, + expectedNumEntriesScannedPostFilter, expectedNumTotalDocs); if (aggregationResults == null) { Assert.assertEquals(expectedGroups.size(), 0); Assert.assertEquals(expectedValues.size(), 0); diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java index 86f85f4..092a285 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java @@ -58,8 +58,10 @@ public class ValueAggregatorFactory { case DISTINCTCOUNTRAWHLL: return new DistinctCountHLLValueAggregator(); case PERCENTILEEST: + case PERCENTILERAWEST: return new PercentileEstValueAggregator(); case PERCENTILETDIGEST: + case PERCENTILERAWTDIGEST: return new PercentileTDigestValueAggregator(); default: throw new IllegalStateException("Unsupported aggregation type: " + aggregationType); @@ -94,8 +96,10 @@ public class ValueAggregatorFactory { case DISTINCTCOUNTRAWHLL: return DistinctCountHLLValueAggregator.AGGREGATED_VALUE_TYPE; case PERCENTILEEST: + case PERCENTILERAWEST: return PercentileEstValueAggregator.AGGREGATED_VALUE_TYPE; case PERCENTILETDIGEST: + case PERCENTILERAWTDIGEST: return PercentileTDigestValueAggregator.AGGREGATED_VALUE_TYPE; default: throw new IllegalStateException("Unsupported aggregation type: " + aggregationType); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedQuantileDigest.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedQuantileDigest.java new file mode 100644 index 0000000..edde4be --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedQuantileDigest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.customobject; + +import org.apache.pinot.segment.local.utils.CustomSerDeUtils; +import org.apache.pinot.spi.utils.BytesUtils; + +import static com.google.common.base.Preconditions.checkArgument; + + +/** + * Serialized and comparable version of QuantileDigest. Compares QuantileDigest for a specific percentile value. + */ +public class SerializedQuantileDigest implements Comparable<SerializedQuantileDigest> { + private final double _percentile; + private final QuantileDigest _quantileDigest; + + public SerializedQuantileDigest(QuantileDigest quantileDigest, double percentile) { + _quantileDigest = quantileDigest; + _percentile = percentile / 100.0; + } + + @Override + public int compareTo(SerializedQuantileDigest other) { + checkArgument(other._percentile == _percentile, "Percentile number doesn't match!"); + return Long.compare(_quantileDigest.getQuantile(_percentile), + other._quantileDigest.getQuantile(_percentile)); + } + + @Override + public String toString() { + return BytesUtils.toHexString(CustomSerDeUtils.QUANTILE_DIGEST_SER_DE.serialize(_quantileDigest)); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedTDigest.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedTDigest.java new file mode 100644 index 0000000..24b2281 --- /dev/null 
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedTDigest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.customobject; + +import com.tdunning.math.stats.TDigest; +import org.apache.pinot.segment.local.utils.CustomSerDeUtils; +import org.apache.pinot.spi.utils.BytesUtils; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Serialized and comparable version of TDigest. Compares TDigest for a specific percentile value. 
+ */ +public class SerializedTDigest implements Comparable<SerializedTDigest> { + private final double _percentile; + private final TDigest _tDigest; + + public SerializedTDigest(TDigest tDigest, double percentile) { + _tDigest = tDigest; + _percentile = percentile / 100.0; + } + + @Override + public int compareTo(SerializedTDigest other) { + checkArgument(other._percentile == _percentile, "Percentile number doesn't match!"); + return Double.compare(_tDigest.quantile(_percentile), other._tDigest.quantile(_percentile)); + } + + @Override + public String toString() { + return BytesUtils.toHexString(CustomSerDeUtils.TDIGEST_SER_DE.serialize(_tDigest)); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 9d10e8a..197239b 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -44,7 +44,9 @@ public enum AggregationFunctionType { DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch"), PERCENTILE("percentile"), PERCENTILEEST("percentileEst"), + PERCENTILERAWEST("percentileRawEst"), PERCENTILETDIGEST("percentileTDigest"), + PERCENTILERAWTDIGEST("percentileRawTDigest"), IDSET("idSet"), // Geo aggregation functions @@ -63,7 +65,9 @@ public enum AggregationFunctionType { DISTINCTCOUNTRAWHLLMV("distinctCountRawHLLMV"), PERCENTILEMV("percentileMV"), PERCENTILEESTMV("percentileEstMV"), + PERCENTILERAWESTMV("percentileRawEstMV"), PERCENTILETDIGESTMV("percentileTDigestMV"), + PERCENTILERAWTDIGESTMV("percentileRawTDigestMV"), DISTINCT("distinct"); private final String _name; @@ -88,14 +92,22 @@ public enum AggregationFunctionType { return PERCENTILE; } else if (remainingFunctionName.equals("EST") || remainingFunctionName.matches("EST\\d+")) { return PERCENTILEEST; + } else if 
(remainingFunctionName.equals("RAWEST") || remainingFunctionName.matches("RAWEST\\d+")) { + return PERCENTILERAWEST; } else if (remainingFunctionName.equals("TDIGEST") || remainingFunctionName.matches("TDIGEST\\d+")) { return PERCENTILETDIGEST; + } else if (remainingFunctionName.equals("RAWTDIGEST") || remainingFunctionName.matches("RAWTDIGEST\\d+")) { + return PERCENTILERAWTDIGEST; } else if (remainingFunctionName.equals("MV") || remainingFunctionName.matches("\\d+MV")) { return PERCENTILEMV; } else if (remainingFunctionName.equals("ESTMV") || remainingFunctionName.matches("EST\\d+MV")) { return PERCENTILEESTMV; + } else if (remainingFunctionName.equals("RAWESTMV") || remainingFunctionName.matches("RAWEST\\d+MV")) { + return PERCENTILERAWESTMV; } else if (remainingFunctionName.equals("TDIGESTMV") || remainingFunctionName.matches("TDIGEST\\d+MV")) { return PERCENTILETDIGESTMV; + } else if (remainingFunctionName.equals("RAWTDIGESTMV") || remainingFunctionName.matches("RAWTDIGEST\\d+MV")) { + return PERCENTILERAWTDIGESTMV; } else { throw new IllegalArgumentException("Invalid aggregation function name: " + functionName); } diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java index 659fc4c..b09499a 100644 --- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java @@ -92,6 +92,15 @@ public class AggregationFunctionColumnPairTest { Assert.assertEquals(fromColumnName, pair); Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode()); + pair = new AggregationFunctionColumnPair(AggregationFunctionType.PERCENTILERAWEST, COLUMN); + Assert.assertEquals(pair.getFunctionType(), AggregationFunctionType.PERCENTILERAWEST); + 
Assert.assertEquals(pair.getColumn(), COLUMN); + columnName = pair.toColumnName(); + Assert.assertEquals(columnName, "percentileRawEst__column"); + fromColumnName = AggregationFunctionColumnPair.fromColumnName("PERCENTILE_RAW_EST__column"); + Assert.assertEquals(fromColumnName, pair); + Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode()); + pair = new AggregationFunctionColumnPair(AggregationFunctionType.PERCENTILETDIGEST, COLUMN); Assert.assertEquals(pair.getFunctionType(), AggregationFunctionType.PERCENTILETDIGEST); Assert.assertEquals(pair.getColumn(), COLUMN); @@ -100,5 +109,14 @@ public class AggregationFunctionColumnPairTest { fromColumnName = AggregationFunctionColumnPair.fromColumnName("percentiletdigest__column"); Assert.assertEquals(fromColumnName, pair); Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode()); + + pair = new AggregationFunctionColumnPair(AggregationFunctionType.PERCENTILERAWTDIGEST, COLUMN); + Assert.assertEquals(pair.getFunctionType(), AggregationFunctionType.PERCENTILERAWTDIGEST); + Assert.assertEquals(pair.getColumn(), COLUMN); + columnName = pair.toColumnName(); + Assert.assertEquals(columnName, "percentileRawTDigest__column"); + fromColumnName = AggregationFunctionColumnPair.fromColumnName("percentilerawtdigest__column"); + Assert.assertEquals(fromColumnName, pair); + Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org