This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d12ce887a9 Minmaxrange null (#12252) d12ce887a9 is described below commit d12ce887a9adf5e4d6253ada533e9f0a5df71298 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Tue Mar 19 02:06:54 2024 -0700 Minmaxrange null (#12252) * new test framework candidate * Improved test system * Improve framework to be able to specify segments as strings * fix headers * Improve assertions when there are nulls * Improve error text * Improvements in the framework * Add a base class single input aggregation operations can extend to support null handling * Fix issue in NullableSingleInputAggregationFunction.forEachNotNullInt * Improve error message in NullEnabledQueriesTest * Add new schema family * Rename test schemas and table config * Split AllNullQueriesTest into on test per query * Revert change in AllNullQueriesTest that belongs to mode-null-support branch * Add null support in minmaxrange * Adapted to new framework * Applied suggestions during PR --- .../function/AggregationFunctionFactory.java | 2 +- .../function/MinMaxRangeAggregationFunction.java | 103 +++++------ .../function/MinMaxRangeMVAggregationFunction.java | 2 +- .../MinMaxRangeAggregationFunctionTest.java | 195 +++++++++++++++++++++ .../aggregator/MinMaxRangeValueAggregator.java | 2 +- .../local/customobject/MinMaxRangePair.java | 8 + 6 files changed, 259 insertions(+), 53 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 8db0d730d7..3c449f1578 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -316,7 +316,7 @@ public class AggregationFunctionFactory { } } case MINMAXRANGE: - return new MinMaxRangeAggregationFunction(arguments); + return new MinMaxRangeAggregationFunction(arguments, nullHandlingEnabled); case DISTINCTCOUNT: return new DistinctCountAggregationFunction(arguments, nullHandlingEnabled); case DISTINCTCOUNTBITMAP: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java index 1c039b9d14..28299429c6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java @@ -33,14 +33,14 @@ import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.data.FieldSpec.DataType; -public class MinMaxRangeAggregationFunction extends BaseSingleInputAggregationFunction<MinMaxRangePair, Double> { +public class MinMaxRangeAggregationFunction extends NullableSingleInputAggregationFunction<MinMaxRangePair, Double> { - public MinMaxRangeAggregationFunction(List<ExpressionContext> arguments) { - super(verifySingleArgument(arguments, "MIN_MAX_RANGE")); + public MinMaxRangeAggregationFunction(List<ExpressionContext> arguments, boolean nullHandlingEnabled) { + super(verifySingleArgument(arguments, "MIN_MAX_RANGE"), nullHandlingEnabled); } - protected MinMaxRangeAggregationFunction(ExpressionContext expression) { - super(expression); + protected MinMaxRangeAggregationFunction(ExpressionContext expression, boolean nullHandlingEnabled) { + super(expression, nullHandlingEnabled); } @Override @@ -61,37 +61,29 @@ public class MinMaxRangeAggregationFunction extends BaseSingleInputAggregationFu @Override public void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; BlockValSet blockValSet = blockValSetMap.get(_expression); + MinMaxRangePair minMax = new MinMaxRangePair(); + if (blockValSet.getValueType() != DataType.BYTES) { double[] doubleValues = blockValSet.getDoubleValuesSV(); - for (int i = 0; i < length; i++) { - double value = doubleValues[i]; - if (value < min) { - min = value; - } - if (value > max) { - max = value; + forEachNotNull(length, blockValSet, (from, to) -> { + for (int i = from; i < to; i++) { + double value = doubleValues[i]; + minMax.apply(value); } - } + }); } else { // Serialized MinMaxRangePair byte[][] bytesValues = blockValSet.getBytesValuesSV(); - for (int i = 0; i < length; i++) { - MinMaxRangePair minMaxRangePair = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]); - double minValue = minMaxRangePair.getMin(); - double maxValue = minMaxRangePair.getMax(); - if (minValue < min) { - min = minValue; + forEachNotNull(length, blockValSet, (from, to) -> { + for (int i = from; i < to; i++) { + MinMaxRangePair minMaxRangePair = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]); + minMax.apply(minMaxRangePair); } - if (maxValue > max) { - max = maxValue; - } - } + }); } - setAggregationResult(aggregationResultHolder, min, max); + setAggregationResult(aggregationResultHolder, minMax.getMin(), minMax.getMax()); } protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, double min, double max) { @@ -109,17 +101,21 @@ public class MinMaxRangeAggregationFunction extends BaseSingleInputAggregationFu BlockValSet blockValSet = blockValSetMap.get(_expression); if (blockValSet.getValueType() != DataType.BYTES) { double[] doubleValues = blockValSet.getDoubleValuesSV(); - for (int i = 0; i < length; i++) { - double value = doubleValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, value, value); - } + forEachNotNull(length, blockValSet, (from, to) -> { + for (int i = from; i < to; i++) { + double value = doubleValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, value, value); + } + }); } else { // Serialized MinMaxRangePair byte[][] bytesValues = blockValSet.getBytesValuesSV(); - for (int i = 0; i < length; i++) { - MinMaxRangePair minMaxRangePair = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]); - setGroupByResult(groupKeyArray[i], groupByResultHolder, minMaxRangePair.getMin(), minMaxRangePair.getMax()); - } + forEachNotNull(length, blockValSet, (from, to) -> { + for (int i = from; i < to; i++) { + MinMaxRangePair minMaxRangePair = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]); + setGroupByResult(groupKeyArray[i], groupByResultHolder, minMaxRangePair.getMin(), minMaxRangePair.getMax()); + } + }); } } @@ -129,23 +125,27 @@ public class MinMaxRangeAggregationFunction extends BaseSingleInputAggregationFu BlockValSet blockValSet = blockValSetMap.get(_expression); if (blockValSet.getValueType() != DataType.BYTES) { double[] doubleValues = blockValSet.getDoubleValuesSV(); - for (int i = 0; i < length; i++) { - double value = doubleValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, value); + forEachNotNull(length, blockValSet, (from, to) -> { + for (int i = from; i < to; i++) { + double value = doubleValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, value); + } } - } + }); } else { // Serialized MinMaxRangePair byte[][] bytesValues = blockValSet.getBytesValuesSV(); - for (int i = 0; i < length; i++) { - MinMaxRangePair minMaxRangePair = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]); - double min = minMaxRangePair.getMin(); - double max = minMaxRangePair.getMax(); - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, min, max); + forEachNotNull(length, blockValSet, (from, to) -> { + for (int i = from; i < to; i++) { + MinMaxRangePair minMaxRangePair = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]); + double min = minMaxRangePair.getMin(); + double max = minMaxRangePair.getMax(); + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, min, max); + } } - } + }); } } @@ -161,8 +161,8 @@ public class MinMaxRangeAggregationFunction extends BaseSingleInputAggregationFu @Override public MinMaxRangePair extractAggregationResult(AggregationResultHolder aggregationResultHolder) { MinMaxRangePair minMaxRangePair = aggregationResultHolder.getResult(); - if (minMaxRangePair == null) { - return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY); + if (minMaxRangePair == null && !_nullHandlingEnabled) { + return new MinMaxRangePair(); } else { return minMaxRangePair; } @@ -171,8 +171,8 @@ public class MinMaxRangeAggregationFunction extends BaseSingleInputAggregationFu @Override public MinMaxRangePair extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { MinMaxRangePair minMaxRangePair = groupByResultHolder.getResult(groupKey); - if (minMaxRangePair == null) { - return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY); + if (minMaxRangePair == null && !_nullHandlingEnabled) { + return new MinMaxRangePair(); } else { return minMaxRangePair; } @@ -196,6 +196,9 @@ public class MinMaxRangeAggregationFunction extends BaseSingleInputAggregationFu @Override public Double extractFinalResult(MinMaxRangePair intermediateResult) { + if (intermediateResult == null) { + return null; + } return intermediateResult.getMax() - intermediateResult.getMin(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java index 466a6b044f..534bdb41f2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java @@ -30,7 +30,7 @@ import org.apache.pinot.segment.spi.AggregationFunctionType; public class MinMaxRangeMVAggregationFunction extends MinMaxRangeAggregationFunction { public MinMaxRangeMVAggregationFunction(List<ExpressionContext> arguments) { - super(verifySingleArgument(arguments, "MIN_MAX_RANGE_MV")); + super(verifySingleArgument(arguments, "MIN_MAX_RANGE_MV"), false); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java new file mode 100644 index 0000000000..822399d66f --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.utils.PinotDataType; +import org.apache.pinot.queries.FluentQueryTest; +import org.apache.pinot.spi.data.FieldSpec; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + + +public class MinMaxRangeAggregationFunctionTest extends AbstractAggregationFunctionTest { + + @DataProvider(name = "scenarios") + Object[] scenarios() { + return new Object[] { + new Scenario(FieldSpec.DataType.INT), + new Scenario(FieldSpec.DataType.LONG), + new Scenario(FieldSpec.DataType.FLOAT), + new Scenario(FieldSpec.DataType.DOUBLE), + }; + } + + public class Scenario { + private final FieldSpec.DataType _dataType; + + public Scenario(FieldSpec.DataType dataType) { + _dataType = dataType; + } + + public FluentQueryTest.DeclaringTable getDeclaringTable(boolean nullHandlingEnabled) { + return givenSingleNullableFieldTable(_dataType, nullHandlingEnabled); + } + + @Override + public String toString() { + return "Scenario{" + "dt=" + _dataType + '}'; + } + } + + String diffBetweenMinAnd9(FieldSpec.DataType dt) { + switch (dt) { + case INT: return "2.147483657E9"; + case LONG: return "9.223372036854776E18"; + case FLOAT: return "Infinity"; + case DOUBLE: return "Infinity"; + default: throw new IllegalArgumentException(dt.toString()); + } + } + + @Test(dataProvider = "scenarios") + void aggrWithoutNull(Scenario scenario) { + scenario.getDeclaringTable(false) + .onFirstInstance("myField", + "null", + "1", + "null" + ).andOnSecondInstance("myField", + "null", + "9", + "null" + ) + .whenQuery("select minmaxrange(myField) from testTable") + .thenResultIs("DOUBLE", diffBetweenMinAnd9(scenario._dataType)); + } + + @Test(dataProvider = "scenarios") + void aggrWithNull(Scenario scenario) { + scenario.getDeclaringTable(true) + .onFirstInstance("myField", + "null", + "1", + "null" + ).andOnSecondInstance("myField", + "null", + "9", + "null" + ).whenQuery("select minmaxrange(myField) from testTable") + .thenResultIs("DOUBLE", "8"); + } + + @Test(dataProvider = "scenarios") + void aggrSvWithoutNull(Scenario scenario) { + scenario.getDeclaringTable(false) + .onFirstInstance("myField", + "null", + "1", + "null" + ).andOnSecondInstance("myField", + "null", + "9", + "null" + ).whenQuery("select 'cte', minmaxrange(myField) from testTable group by 'cte'") + .thenResultIs("STRING | DOUBLE", "cte | " + diffBetweenMinAnd9(scenario._dataType)); + } + + @Test(dataProvider = "scenarios") + void aggrSvWithNull(Scenario scenario) { + scenario.getDeclaringTable(true) + .onFirstInstance("myField", + "null", + "1", + "null" + ).andOnSecondInstance("myField", + "null", + "9", + "null" + ).whenQuery("select 'cte', minmaxrange(myField) from testTable group by 'cte'") + .thenResultIs("STRING | DOUBLE", "cte | 8"); + } + + String aggrSvSelfWithoutNullResult(FieldSpec.DataType dt) { + switch (dt) { + case INT: return "0"; + case LONG: return "0"; + case FLOAT: return "NaN"; + case DOUBLE: return "NaN"; + default: throw new IllegalArgumentException(dt.toString()); + } + } + + @Test(dataProvider = "scenarios") + void aggrSvSelfWithoutNull(Scenario scenario) { + PinotDataType pinotDataType = scenario._dataType == FieldSpec.DataType.INT + ? PinotDataType.INTEGER : PinotDataType.valueOf(scenario._dataType.name()); + + Object defaultNullValue; + switch (scenario._dataType) { + case INT: + defaultNullValue = Integer.MIN_VALUE; + break; + case LONG: + defaultNullValue = Long.MIN_VALUE; + break; + case FLOAT: + defaultNullValue = Float.NEGATIVE_INFINITY; + break; + case DOUBLE: + defaultNullValue = Double.NEGATIVE_INFINITY; + break; + default: + throw new IllegalArgumentException("Unexpected scenario data type " + scenario._dataType); + } + + scenario.getDeclaringTable(false) + .onFirstInstance("myField", + "null", + "1", + "2" + ).andOnSecondInstance("myField", + "null", + "1", + "2" + ).whenQuery("select myField, minmaxrange(myField) from testTable group by myField order by myField") + .thenResultIs(pinotDataType + " | DOUBLE", + defaultNullValue + " | " + aggrSvSelfWithoutNullResult(scenario._dataType), + "1 | 0", + "2 | 0"); + } + + @Test(dataProvider = "scenarios") + void aggrSvSelfWithNull(Scenario scenario) { + PinotDataType pinotDataType = scenario._dataType == FieldSpec.DataType.INT + ? PinotDataType.INTEGER : PinotDataType.valueOf(scenario._dataType.name()); + + scenario.getDeclaringTable(true) + .onFirstInstance("myField", + "null", + "1", + "2" + ).andOnSecondInstance("myField", + "null", + "1", + "2" + ).whenQuery("select myField, minmaxrange(myField) from testTable group by myField order by myField") + .thenResultIs(pinotDataType + " | DOUBLE", "1 | 0", "2 | 0", "null | null"); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java index d7de9ef01f..484e22b4f9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java @@ -53,7 +53,7 @@ public class MinMaxRangeValueAggregator implements ValueAggregator<Object, MinMa value.apply(deserializeAggregatedValue((byte[]) rawValue)); } else { double doubleValue = ((Number) rawValue).doubleValue(); - value.apply(doubleValue, doubleValue); + value.apply(doubleValue); } return value; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java index 09e926378d..940e980821 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java @@ -26,11 +26,19 @@ public class MinMaxRangePair implements Comparable<MinMaxRangePair> { private double _min; private double _max; + public MinMaxRangePair() { + this(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY); + } + public MinMaxRangePair(double min, double max) { _min = min; _max = max; } + public void apply(double value) { + apply(value, value); + } + public void apply(double min, double max) { if (min < _min) { _min = min; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org