This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new cb3ccdcaee First with time (#12235) cb3ccdcaee is described below commit cb3ccdcaeec4387adced4e7210435ec6260807c7 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Wed Mar 20 00:35:42 2024 -0700 First with time (#12235) * Support nullHandling in FirstWithTime * Add last_with_time as well --- .../function/AggregationFunctionFactory.java | 26 +-- ...irstDoubleValueWithTimeAggregationFunction.java | 55 +++--- ...FirstFloatValueWithTimeAggregationFunction.java | 55 +++--- .../FirstIntValueWithTimeAggregationFunction.java | 54 +++--- .../FirstLongValueWithTimeAggregationFunction.java | 55 +++--- ...irstStringValueWithTimeAggregationFunction.java | 55 +++--- .../function/FirstWithTimeAggregationFunction.java | 94 +++++++---- ...LastDoubleValueWithTimeAggregationFunction.java | 55 +++--- .../LastFloatValueWithTimeAggregationFunction.java | 55 +++--- .../LastIntValueWithTimeAggregationFunction.java | 54 +++--- .../LastLongValueWithTimeAggregationFunction.java | 55 +++--- ...LastStringValueWithTimeAggregationFunction.java | 55 +++--- .../function/LastWithTimeAggregationFunction.java | 34 +++- .../NullableSingleInputAggregationFunction.java | 113 +++++++++++++ .../FirstWithTimeAggregationFunctionTest.java | 186 +++++++++++++++++++++ .../LastWithTimeAggregationFunctionTest.java | 174 +++++++++++++++++++ .../segment/local/customobject/ValueLongPair.java | 8 + 17 files changed, 831 insertions(+), 352 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 3c449f1578..eeed8608a4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -221,17 +221,19 @@ public class AggregationFunctionFactory { DataType dataType = DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase()); switch (dataType) { case BOOLEAN: - return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, true); + return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled, + true); case INT: - return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, false); + return new FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled, + false); case LONG: - return new FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol); + return new FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled); case FLOAT: - return new FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol); + return new FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled); case DOUBLE: - return new FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol); + return new FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled); case STRING: - return new FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol); + return new FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled); default: throw new IllegalArgumentException("Unsupported data type for FIRST_WITH_TIME: " + dataType); } @@ -300,17 +302,17 @@ public class AggregationFunctionFactory { DataType dataType = DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase()); switch (dataType) { case BOOLEAN: - return new LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, true); + return new LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled, true); case INT: - return new 
LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, false); + return new LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled, false); case LONG: - return new LastLongValueWithTimeAggregationFunction(firstArgument, timeCol); + return new LastLongValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled); case FLOAT: - return new LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol); + return new LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled); case DOUBLE: - return new LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol); + return new LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled); case STRING: - return new LastStringValueWithTimeAggregationFunction(firstArgument, timeCol); + return new LastStringValueWithTimeAggregationFunction(firstArgument, timeCol, nullHandlingEnabled); default: throw new IllegalArgumentException("Unsupported data type for LAST_WITH_TIME: " + dataType); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java index 270a86d671..46d61f225d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import 
org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.DoubleLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -41,8 +41,9 @@ import org.apache.pinot.segment.local.customobject.ValueLongPair; public class FirstDoubleValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Double> { private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR = new DoubleLongPair(Double.NaN, Long.MAX_VALUE); - public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) { - super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE); + public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, + boolean nullHandlingEnabled) { + super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE, nullHandlingEnabled); } @Override @@ -56,22 +57,8 @@ public class FirstDoubleValueWithTimeAggregationFunction extends FirstWithTimeAg } @Override - public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair(); - Double firstData = defaultValueLongPair.getValue(); - long firstTime = defaultValueLongPair.getTime(); - double[] doubleValues = blockValSet.getDoubleValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - double data = doubleValues[i]; - long time = timeValues[i]; - if (time <= firstTime) { - firstTime = time; - firstData = data; - } - } - setAggregationResult(aggregationResultHolder, firstData, firstTime); + public Double readCell(BlockValSet block, int docId) { + return block.getDoubleValuesSV()[docId]; } @Override @@ -79,11 +66,15 @@ public class FirstDoubleValueWithTimeAggregationFunction extends FirstWithTimeAg 
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { double[] doubleValues = blockValSet.getDoubleValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - double data = doubleValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + double data = doubleValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -91,13 +82,17 @@ public class FirstDoubleValueWithTimeAggregationFunction extends FirstWithTimeAg GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { double[] doubleValues = blockValSet.getDoubleValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - double value = doubleValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + double value = doubleValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java index e4c4da2400..d34fe755ec 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.FloatLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -41,8 +41,9 @@ import org.apache.pinot.segment.local.customobject.ValueLongPair; public class FirstFloatValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Float> { private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new FloatLongPair(Float.NaN, Long.MAX_VALUE); - public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) { - super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE); + public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, + boolean nullHandlingEnabled) { + super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE, nullHandlingEnabled); } @Override @@ -56,22 +57,8 @@ public class FirstFloatValueWithTimeAggregationFunction extends FirstWithTimeAgg } @Override - public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair(); - Float firstData = defaultValueLongPair.getValue(); - long firstTime = defaultValueLongPair.getTime(); - float[] floatValues = blockValSet.getFloatValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < 
length; i++) { - float data = floatValues[i]; - long time = timeValues[i]; - if (time <= firstTime) { - firstTime = time; - firstData = data; - } - } - setAggregationResult(aggregationResultHolder, firstData, firstTime); + public Float readCell(BlockValSet block, int docId) { + return block.getFloatValuesSV()[docId]; } @Override @@ -79,11 +66,15 @@ public class FirstFloatValueWithTimeAggregationFunction extends FirstWithTimeAgg GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { float[] floatValues = blockValSet.getFloatValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - float data = floatValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + float data = floatValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -91,13 +82,17 @@ public class FirstFloatValueWithTimeAggregationFunction extends FirstWithTimeAgg GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { float[] floatValues = blockValSet.getFloatValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - float value = floatValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + float value = floatValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } @Override diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java index 8623f0bdc5..97b5829968 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.IntLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -46,8 +46,8 @@ public class FirstIntValueWithTimeAggregationFunction extends FirstWithTimeAggre private final boolean _isBoolean; public FirstIntValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, - boolean isBoolean) { - super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE); + boolean nullHandlingEnabled, boolean isBoolean) { + super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE, nullHandlingEnabled); _isBoolean = isBoolean; } @@ -62,22 +62,8 @@ public class FirstIntValueWithTimeAggregationFunction extends FirstWithTimeAggre } @Override - public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair(); - Integer firstData = defaultValueLongPair.getValue(); - long 
firstTime = defaultValueLongPair.getTime(); - int[] intValues = blockValSet.getIntValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - int data = intValues[i]; - long time = timeValues[i]; - if (time <= firstTime) { - firstTime = time; - firstData = data; - } - } - setAggregationResult(aggregationResultHolder, firstData, firstTime); + public Integer readCell(BlockValSet block, int docId) { + return block.getIntValuesSV()[docId]; } @Override @@ -85,11 +71,15 @@ public class FirstIntValueWithTimeAggregationFunction extends FirstWithTimeAggre GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { int[] intValues = blockValSet.getIntValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - int data = intValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + int data = intValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -97,13 +87,17 @@ public class FirstIntValueWithTimeAggregationFunction extends FirstWithTimeAggre GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { int[] intValues = blockValSet.getIntValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - int value = intValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + int value = intValues[i]; + long time = timeValues[i]; + for 
(int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java index 0f5afeaa97..9b1fd34d2a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.LongLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -41,8 +41,9 @@ import org.apache.pinot.segment.local.customobject.ValueLongPair; public class FirstLongValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Long> { private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR = new LongLongPair(Long.MIN_VALUE, Long.MAX_VALUE); - public FirstLongValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) { - super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE); + public FirstLongValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, + boolean nullHandlingEnabled) { + super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE, nullHandlingEnabled); } @Override @@ -56,22 +57,8 @@ public 
class FirstLongValueWithTimeAggregationFunction extends FirstWithTimeAggr } @Override - public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair(); - Long firstData = defaultValueLongPair.getValue(); - long firstTime = defaultValueLongPair.getTime(); - long[] longValues = blockValSet.getLongValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - long data = longValues[i]; - long time = timeValues[i]; - if (time <= firstTime) { - firstTime = time; - firstData = data; - } - } - setAggregationResult(aggregationResultHolder, firstData, firstTime); + public Long readCell(BlockValSet block, int docId) { + return block.getLongValuesSV()[docId]; } @Override @@ -79,11 +66,15 @@ public class FirstLongValueWithTimeAggregationFunction extends FirstWithTimeAggr GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { long[] longValues = blockValSet.getLongValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - long data = longValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + long data = longValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -91,13 +82,17 @@ public class FirstLongValueWithTimeAggregationFunction extends FirstWithTimeAggr GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { long[] longValues = blockValSet.getLongValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - long value = 
longValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + long value = longValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java index c30f4bfacb..4fe34117d2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.StringLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -41,8 +41,9 @@ import org.apache.pinot.segment.local.customobject.ValueLongPair; public class FirstStringValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<String> { private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new StringLongPair("", Long.MAX_VALUE); - public 
FirstStringValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) { - super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE); + public FirstStringValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, + boolean nullHandlingEnabled) { + super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE, nullHandlingEnabled); } @Override @@ -56,22 +57,8 @@ public class FirstStringValueWithTimeAggregationFunction extends FirstWithTimeAg } @Override - public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair(); - String firstData = defaultValueLongPair.getValue(); - long firstTime = defaultValueLongPair.getTime(); - String[] stringValues = blockValSet.getStringValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - String data = stringValues[i]; - long time = timeValues[i]; - if (time <= firstTime) { - firstTime = time; - firstData = data; - } - } - setAggregationResult(aggregationResultHolder, firstData, firstTime); + public String readCell(BlockValSet block, int docId) { + return block.getStringValuesSV()[docId]; } @Override @@ -79,11 +66,15 @@ public class FirstStringValueWithTimeAggregationFunction extends FirstWithTimeAg GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { String[] stringValues = blockValSet.getStringValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - String data = stringValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + String data = 
stringValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -91,13 +82,17 @@ public class FirstStringValueWithTimeAggregationFunction extends FirstWithTimeAg GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { String[] stringValues = blockValSet.getStringValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - String value = stringValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + String value = stringValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java index 7049d0f1db..3d04803fde 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java @@ -29,9 +29,11 @@ import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.local.customobject.IntLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; import 
org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.roaringbitmap.IntIterator; /** @@ -47,14 +49,13 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; */ @SuppressWarnings({"rawtypes", "unchecked"}) public abstract class FirstWithTimeAggregationFunction<V extends Comparable<V>> - extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> { + extends NullableSingleInputAggregationFunction<ValueLongPair<V>, V> { protected final ExpressionContext _timeCol; private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> _objectSerDe; - public FirstWithTimeAggregationFunction(ExpressionContext dataCol, - ExpressionContext timeCol, - ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) { - super(dataCol); + public FirstWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, + ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe, boolean nullHandlingEnabled) { + super(dataCol, nullHandlingEnabled); _timeCol = timeCol; _objectSerDe = objectSerDe; } @@ -63,8 +64,7 @@ public abstract class FirstWithTimeAggregationFunction<V extends Comparable<V>> public abstract ValueLongPair<V> getDefaultValueTimePair(); - public abstract void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet); + public abstract V readCell(BlockValSet block, int docId); public abstract void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, @@ -100,23 +100,45 @@ public abstract class FirstWithTimeAggregationFunction<V extends Comparable<V>> BlockValSet blockValSet = blockValSetMap.get(_expression); BlockValSet blockTimeSet = blockValSetMap.get(_timeCol); if (blockValSet.getValueType() != DataType.BYTES) { - aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, blockTimeSet); + IntLongPair defaultPair = new IntLongPair(Integer.MIN_VALUE, 
Long.MAX_VALUE); + long[] timeValues = blockTimeSet.getLongValuesSV(); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, blockTimeSet); + IntLongPair bestPair = foldNotNull(length, nullIdxIterator, defaultPair, (pair, from, to) -> { + IntLongPair actualPair = pair; + for (int i = from; i < to; i++) { + long time = timeValues[i]; + if (time <= actualPair.getTime()) { + actualPair = new IntLongPair(i, time); + } + } + return actualPair; + }); + V bestValue; + if (bestPair.getValue() < 0) { + bestValue = getDefaultValueTimePair().getValue(); + } else { + bestValue = readCell(blockValSet, bestPair.getValue()); + } + setAggregationResult(aggregationResultHolder, bestValue, bestPair.getTime()); } else { + // We assume bytes contain the binary serialization of FirstPair ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair(); - V firstData = defaultValueLongPair.getValue(); - long firstTime = defaultValueLongPair.getTime(); - // Serialized FirstPair + + ValueLongPair<V> result = constructValueLongPair(defaultValueLongPair.getValue(), defaultValueLongPair.getTime()); byte[][] bytesValues = blockValSet.getBytesValuesSV(); - for (int i = 0; i < length; i++) { - ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]); - V data = firstWithTimePair.getValue(); - long time = firstWithTimePair.getTime(); - if (time <= firstTime) { - firstTime = time; - firstData = data; + forEachNotNull(length, blockValSet, (from, to) -> { + for (int i = from; i < to; i++) { + ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]); + long time = firstWithTimePair.getTime(); + if (time < result.getTime()) { + result.setTime(time); + result.setValue(firstWithTimePair.getValue()); + } } - } - setAggregationResult(aggregationResultHolder, firstData, firstTime); + }); + + setAggregationResult(aggregationResultHolder, result.getValue(), result.getTime()); } } @@ -138,13 +160,15 @@ public abstract class FirstWithTimeAggregationFunction<V 
extends Comparable<V>> } else { // Serialized FirstPair byte[][] bytesValues = blockValSet.getBytesValuesSV(); - for (int i = 0; i < length; i++) { - ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]); - setGroupByResult(groupKeyArray[i], - groupByResultHolder, - firstWithTimePair.getValue(), - firstWithTimePair.getTime()); - } + forEachNotNull(length, blockValSet, (from, to) -> { + for (int i = from; i < to; i++) { + ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]); + setGroupByResult(groupKeyArray[i], + groupByResultHolder, + firstWithTimePair.getValue(), + firstWithTimePair.getTime()); + } + }); } } @@ -158,14 +182,16 @@ public abstract class FirstWithTimeAggregationFunction<V extends Comparable<V>> } else { // Serialized ValueTimePair byte[][] bytesValues = blockValSet.getBytesValuesSV(); - for (int i = 0; i < length; i++) { - ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]); - V data = firstWithTimePair.getValue(); - long time = firstWithTimePair.getTime(); - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, data, time); + forEachNotNull(length, blockValSet, (from, to) -> { + for (int i = from; i < to; i++) { + ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]); + V data = firstWithTimePair.getValue(); + long time = firstWithTimePair.getTime(); + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, data, time); + } } - } + }); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java index 93aec2b0c1..398455c799 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.DoubleLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -41,8 +41,9 @@ import org.apache.pinot.segment.local.customobject.ValueLongPair; public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> { private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR = new DoubleLongPair(Double.NaN, Long.MIN_VALUE); - public LastDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) { - super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE); + public LastDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, + boolean nullHandlingEnabled) { + super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE, nullHandlingEnabled); } @Override @@ -56,22 +57,8 @@ public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggr } @Override - public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair(); - Double lastData = defaultValueLongPair.getValue(); - long lastTime = defaultValueLongPair.getTime(); - double[] doubleValues = blockValSet.getDoubleValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; 
i < length; i++) { - double data = doubleValues[i]; - long time = timeValues[i]; - if (time >= lastTime) { - lastTime = time; - lastData = data; - } - } - setAggregationResult(aggregationResultHolder, lastData, lastTime); + public Double readCell(BlockValSet block, int docId) { + return block.getDoubleValuesSV()[docId]; } @Override @@ -79,11 +66,15 @@ public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggr GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { double[] doubleValues = blockValSet.getDoubleValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - double data = doubleValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + double data = doubleValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -91,13 +82,17 @@ public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggr GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { double[] doubleValues = blockValSet.getDoubleValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - double value = doubleValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + double value = doubleValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } 
@Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java index 50635874c9..6d7eaaa172 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.FloatLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -41,8 +41,9 @@ import org.apache.pinot.segment.local.customobject.ValueLongPair; public class LastFloatValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Float> { private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new FloatLongPair(Float.NaN, Long.MIN_VALUE); - public LastFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) { - super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE); + public LastFloatValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, + boolean nullHandlingEnabled) { + super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE, nullHandlingEnabled); } @Override @@ -56,22 +57,8 @@ public class LastFloatValueWithTimeAggregationFunction extends LastWithTimeAggre } @Override - public void 
aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair(); - Float lastData = defaultValueLongPair.getValue(); - long lastTime = defaultValueLongPair.getTime(); - float[] floatValues = blockValSet.getFloatValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - float data = floatValues[i]; - long time = timeValues[i]; - if (time >= lastTime) { - lastTime = time; - lastData = data; - } - } - setAggregationResult(aggregationResultHolder, lastData, lastTime); + public Float readCell(BlockValSet block, int docId) { + return block.getFloatValuesSV()[docId]; } @Override @@ -79,11 +66,15 @@ public class LastFloatValueWithTimeAggregationFunction extends LastWithTimeAggre GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { float[] floatValues = blockValSet.getFloatValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - float data = floatValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + float data = floatValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -91,13 +82,17 @@ public class LastFloatValueWithTimeAggregationFunction extends LastWithTimeAggre GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { float[] floatValues = blockValSet.getFloatValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - float value = floatValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) 
{ - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + float value = floatValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java index 87d33ce0df..e124f58d0e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.IntLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -46,8 +46,8 @@ public class LastIntValueWithTimeAggregationFunction extends LastWithTimeAggrega private final boolean _isBoolean; public LastIntValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, - boolean isBoolean) { - super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE); + boolean nullHandlingEnabled, boolean isBoolean) { + super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE, 
nullHandlingEnabled); _isBoolean = isBoolean; } @@ -62,22 +62,8 @@ public class LastIntValueWithTimeAggregationFunction extends LastWithTimeAggrega } @Override - public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair(); - Integer lastData = defaultValueLongPair.getValue(); - long lastTime = defaultValueLongPair.getTime(); - int[] intValues = blockValSet.getIntValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - int data = intValues[i]; - long time = timeValues[i]; - if (time >= lastTime) { - lastTime = time; - lastData = data; - } - } - setAggregationResult(aggregationResultHolder, lastData, lastTime); + public Integer readCell(BlockValSet block, int docId) { + return block.getIntValuesSV()[docId]; } @Override @@ -85,11 +71,15 @@ public class LastIntValueWithTimeAggregationFunction extends LastWithTimeAggrega GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { int[] intValues = blockValSet.getIntValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - int data = intValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + int data = intValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -97,13 +87,17 @@ public class LastIntValueWithTimeAggregationFunction extends LastWithTimeAggrega GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { int[] intValues = blockValSet.getIntValuesSV(); long[] timeValues = 
timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - int value = intValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + int value = intValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java index 248487d635..6c0e08e761 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.LongLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -41,8 +41,9 @@ import org.apache.pinot.segment.local.customobject.ValueLongPair; public class LastLongValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Long> { private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR = new LongLongPair(Long.MIN_VALUE, 
Long.MIN_VALUE); - public LastLongValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) { - super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE); + public LastLongValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, + boolean nullHandlingEnabled) { + super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE, nullHandlingEnabled); } @Override @@ -56,22 +57,8 @@ public class LastLongValueWithTimeAggregationFunction extends LastWithTimeAggreg } @Override - public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair(); - Long lastData = defaultValueLongPair.getValue(); - long lastTime = defaultValueLongPair.getTime(); - long[] longValues = blockValSet.getLongValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - long data = longValues[i]; - long time = timeValues[i]; - if (time >= lastTime) { - lastTime = time; - lastData = data; - } - } - setAggregationResult(aggregationResultHolder, lastData, lastTime); + public Long readCell(BlockValSet block, int docId) { + return block.getLongValuesSV()[docId]; } @Override @@ -79,11 +66,15 @@ public class LastLongValueWithTimeAggregationFunction extends LastWithTimeAggreg GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { long[] longValues = blockValSet.getLongValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - long data = longValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + long data = longValues[i]; + long time = 
timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -91,13 +82,17 @@ public class LastLongValueWithTimeAggregationFunction extends LastWithTimeAggreg GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { long[] longValues = blockValSet.getLongValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - long value = longValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + long value = longValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java index 0f10e9399d..2b200b157a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java @@ -22,10 +22,10 @@ import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.segment.local.customobject.StringLongPair; import 
org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.roaringbitmap.IntIterator; /** @@ -41,8 +41,9 @@ import org.apache.pinot.segment.local.customobject.ValueLongPair; public class LastStringValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<String> { private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new StringLongPair("", Long.MIN_VALUE); - public LastStringValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) { - super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE); + public LastStringValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, + boolean nullHandlingEnabled) { + super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE, nullHandlingEnabled); } @Override @@ -56,22 +57,8 @@ public class LastStringValueWithTimeAggregationFunction extends LastWithTimeAggr } @Override - public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet) { - ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair(); - String lastData = defaultValueLongPair.getValue(); - long lastTime = defaultValueLongPair.getTime(); - String[] stringValues = blockValSet.getStringValuesSV(); - long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - String data = stringValues[i]; - long time = timeValues[i]; - if (time >= lastTime) { - lastTime = time; - lastData = data; - } - } - setAggregationResult(aggregationResultHolder, lastData, lastTime); + public String readCell(BlockValSet block, int docId) { + return block.getStringValuesSV()[docId]; } @Override @@ -79,11 +66,15 @@ public class LastStringValueWithTimeAggregationFunction extends LastWithTimeAggr GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { String[] stringValues = blockValSet.getStringValuesSV(); long[] timeValues = 
timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - String data = stringValues[i]; - long time = timeValues[i]; - setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); - } + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + String data = stringValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + }); } @Override @@ -91,13 +82,17 @@ public class LastStringValueWithTimeAggregationFunction extends LastWithTimeAggr GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) { String[] stringValues = blockValSet.getStringValuesSV(); long[] timeValues = timeValSet.getLongValuesSV(); - for (int i = 0; i < length; i++) { - String value = stringValues[i]; - long time = timeValues[i]; - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, value, time); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet); + forEachNotNull(length, nullIdxIterator, (from, to) -> { + for (int i = from; i < to; i++) { + String value = stringValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } } - } + }); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java index cd4b0576a6..565445ae6f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java @@ -29,9 +29,11 @@ import org.apache.pinot.core.query.aggregation.AggregationResultHolder; 
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.local.customobject.IntLongPair; import org.apache.pinot.segment.local.customobject.ValueLongPair; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.roaringbitmap.IntIterator; /** @@ -47,14 +49,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; */ @SuppressWarnings({"rawtypes", "unchecked"}) public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>> - extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> { + extends NullableSingleInputAggregationFunction<ValueLongPair<V>, V> { protected final ExpressionContext _timeCol; private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> _objectSerDe; public LastWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol, - ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) { - super(dataCol); + ObjectSerDeUtils.ObjectSerDe<? 
extends ValueLongPair<V>> objectSerDe, boolean nullHandlingEnabled) { + super(dataCol, nullHandlingEnabled); _timeCol = timeCol; _objectSerDe = objectSerDe; } @@ -63,8 +65,7 @@ public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>> public abstract ValueLongPair<V> getDefaultValueTimePair(); - public abstract void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, - BlockValSet blockValSet, BlockValSet timeValSet); + public abstract V readCell(BlockValSet block, int docId); public abstract void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, @@ -100,8 +101,29 @@ public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>> BlockValSet blockValSet = blockValSetMap.get(_expression); BlockValSet blockTimeSet = blockValSetMap.get(_timeCol); if (blockValSet.getValueType() != DataType.BYTES) { - aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, blockTimeSet); + IntLongPair defaultPair = new IntLongPair(Integer.MIN_VALUE, Long.MIN_VALUE); + long[] timeValues = blockTimeSet.getLongValuesSV(); + + IntIterator nullIdxIterator = orNullIterator(blockValSet, blockTimeSet); + IntLongPair bestPair = foldNotNull(length, nullIdxIterator, defaultPair, (pair, from, to) -> { + IntLongPair actualPair = pair; + for (int i = from; i < to; i++) { + long time = timeValues[i]; + if (time >= actualPair.getTime()) { + actualPair = new IntLongPair(i, time); + } + } + return actualPair; + }); + V bestValue; + if (bestPair.getValue() < 0) { + bestValue = getDefaultValueTimePair().getValue(); + } else { + bestValue = readCell(blockValSet, bestPair.getValue()); + } + setAggregationResult(aggregationResultHolder, bestValue, bestPair.getTime()); } else { + // We assume bytes contain the binary serialization of FirstPair ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair(); V lastData = defaultValueLongPair.getValue(); long lastTime = defaultValueLongPair.getTime(); diff 
--git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java index 0a42db7442..78f1ae1269 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.query.aggregation.function; +import java.util.NoSuchElementException; import javax.annotation.Nullable; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.core.common.BlockValSet; @@ -138,4 +139,116 @@ public abstract class NullableSingleInputAggregationFunction<I, F extends Compar } return acum; } + + public IntIterator orNullIterator(BlockValSet valSet1, BlockValSet valSet2) { + if (!_nullHandlingEnabled) { + return EmptyIntIterator.INSTANCE; + } else { + RoaringBitmap nullBlock1 = valSet1.getNullBitmap(); + RoaringBitmap nullBlock2 = valSet2.getNullBitmap(); + if (nullBlock1 == null) { + return nullBlock2 == null ? 
EmptyIntIterator.INSTANCE : nullBlock2.getIntIterator(); + } else if (nullBlock2 == null) { + return nullBlock1.getIntIterator(); + } else { + return new MinIntIterator(nullBlock1.getIntIterator(), nullBlock2.getIntIterator()); + } + } + } + + public static class EmptyIntIterator implements IntIterator { + + public static final EmptyIntIterator INSTANCE = new EmptyIntIterator(); + + private EmptyIntIterator() { + } + + @Override + public IntIterator clone() { + return this; + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public int next() { + throw new NoSuchElementException(); + } + } + + public static class MinIntIterator implements IntIterator { + private final IntIterator _it1; + private final IntIterator _it2; + private int _next1 = -1; + private int _next2 = -1; + + /** + * @param it1 it has to iterate in ascending order and the min value is 0 + * @param it2 it has to iterate in ascending order and the min value is 0 + */ + public MinIntIterator(IntIterator it1, IntIterator it2) { + _it1 = it1; + _it2 = it2; + } + + @Override + public IntIterator clone() { + return new MinIntIterator(_it1.clone(), _it2.clone()); + } + + @Override + public boolean hasNext() { + return _next1 > 0 || _next2 > 0 || _it1.hasNext() || _it2.hasNext(); + } + + @Override + public int next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + if (_next1 < 0) { + if (_it1.hasNext()) { + _next1 = _it1.next(); + } else { //it1 is completely consumed + if (_next2 >= 0) { // consume the last cached value + return consume2(); + } else { // after that, return all values from it2 + return _it2.next(); + } + } + } + if (_next2 < 0) { + if (_it2.hasNext()) { + _next2 = _it2.next(); + } else { //it2 is completely consumed + if (_next1 >= 0) { // consume the last cached value + return consume1(); + } else { // after that, return all values from it1 + return _it1.next(); + } + } + } + assert _next1 >= 0 && _next2 >= 0; + if (_next1 <= _next2) { + 
return consume1(); + } else { + return consume2(); + } + } + + private int consume1() { + int nextVal = _next1; + _next1 = -1; + return nextVal; + } + + private int consume2() { + int nextVal = _next2; + _next2 = -1; + return nextVal; + } + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java new file mode 100644 index 0000000000..541b007d47 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/** Tests FIRST_WITH_TIME with null handling on and off, without group-by and with one or two group-by keys. */
+public class FirstWithTimeAggregationFunctionTest extends AbstractAggregationFunctionTest {
+  // Every test stores non-null values only at time 2 (val1) and time 5 (val2); times 1, 3, 4 and 6 are null.
+  @DataProvider(name = "scenarios")
+  Object[] scenarios() {
+    return new Object[] {
+        new Scenario(FieldSpec.DataType.INT, "1", "2", "-2147483648"),
+        new Scenario(FieldSpec.DataType.LONG, "1", "2", "-9223372036854775808"),
+        new Scenario(FieldSpec.DataType.FLOAT, "1", "2", "-Infinity"),
+        new Scenario(FieldSpec.DataType.DOUBLE, "1", "2", "-Infinity"),
+        new Scenario(FieldSpec.DataType.STRING, "a", "b", "\"null\""),
+    };
+  }
+
+  public class Scenario {
+    private final PinotDataType _pinotDataType;
+    private final FieldSpec.DataType _dataType;
+    private final String _valAsStr1; // non-null value stored on the first instance (time 2)
+    private final String _valAsStr2; // non-null value stored on the second instance (time 5)
+    private final String _defaultNullValue; // expected result when null handling is disabled
+
+    public Scenario(FieldSpec.DataType dataType, String valAsStr1, String valAsStr2, String defaultNullValue) {
+      _dataType = dataType;
+      _valAsStr1 = valAsStr1;
+      _valAsStr2 = valAsStr2;
+      _defaultNullValue = defaultNullValue;
+      _pinotDataType =
+          _dataType == FieldSpec.DataType.INT ? PinotDataType.INTEGER : PinotDataType.valueOf(_dataType.name());
+    }
+
+    public FluentQueryTest.DeclaringTable getDeclaringTable(boolean nullHandlingEnabled) {
+      Schema schema = new Schema.SchemaBuilder()
+          .setSchemaName("testTable")
+          .setEnableColumnBasedNullHandling(true)
+          .addDimensionField("myField", _dataType, f -> f.setNullable(true))
+          .addDimensionField("timeField", FieldSpec.DataType.TIMESTAMP)
+          .build();
+      TableConfigBuilder tableConfigBuilder = new TableConfigBuilder(TableType.OFFLINE)
+          .setTableName("testTable");
+
+      return FluentQueryTest.withBaseDir(_baseDir)
+          .withNullHandling(nullHandlingEnabled)
+          .givenTable(schema, tableConfigBuilder.build());
+    }
+
+    @Override
+    public String toString() {
+      return "Scenario{" + "dt=" + _dataType + ", val1='" + _valAsStr1 + "', val2='" + _valAsStr2
+          + "', defaultNull='" + _defaultNullValue + "'}";
+    }
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            "null | 1",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null | 4",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        )
+        .whenQuery("select FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') from testTable")
+        .thenResultIs(scenario._pinotDataType.name(), scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            "null | 1",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null | 4",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        )
+        .whenQuery("select FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') from testTable")
+        .thenResultIs(scenario._pinotDataType.name(), scenario._valAsStr1);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            "null | 1",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null | 4",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        ).whenQuery("select 'cte', FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " + scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            "null | 1",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null | 4",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        ).whenQuery("select 'cte', FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " + scenario._valAsStr1);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrMvWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            "null | 1",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null | 4",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+            + "FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by cte1, cte2")
+        .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+            "cte1 | cte2 | " + scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrMvWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            "null | 1",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            "null | 4",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+            + "FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by cte1, cte2")
+        .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+            "cte1 | cte2 | " + scenario._valAsStr1);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java
new file mode 100644
index 0000000000..9c612ae538
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/** Tests LAST_WITH_TIME with null handling on and off, without group-by and with one or two group-by keys. */
+public class LastWithTimeAggregationFunctionTest extends AbstractAggregationFunctionTest {
+  // Every test stores non-null values only at time 2 (val1) and time 5 (val2); times 3 and 6 are null.
+  @DataProvider(name = "scenarios")
+  Object[] scenarios() {
+    return new Object[] {
+        new Scenario(FieldSpec.DataType.INT, "1", "2", "-2147483648"),
+        new Scenario(FieldSpec.DataType.LONG, "1", "2", "-9223372036854775808"),
+        new Scenario(FieldSpec.DataType.FLOAT, "1", "2", "-Infinity"),
+        new Scenario(FieldSpec.DataType.DOUBLE, "1", "2", "-Infinity"),
+        new Scenario(FieldSpec.DataType.STRING, "a", "b", "\"null\""),
+    };
+  }
+
+  public class Scenario {
+    private final PinotDataType _pinotDataType;
+    private final FieldSpec.DataType _dataType;
+    private final String _valAsStr1; // non-null value stored on the first instance (time 2)
+    private final String _valAsStr2; // non-null value stored on the second instance (time 5)
+    private final String _defaultNullValue; // expected result when null handling is disabled
+
+    public Scenario(FieldSpec.DataType dataType, String valAsStr1, String valAsStr2, String defaultNullValue) {
+      _dataType = dataType;
+      _valAsStr1 = valAsStr1;
+      _valAsStr2 = valAsStr2;
+      _defaultNullValue = defaultNullValue;
+      _pinotDataType =
+          _dataType == FieldSpec.DataType.INT ? PinotDataType.INTEGER : PinotDataType.valueOf(_dataType.name());
+    }
+
+    public FluentQueryTest.DeclaringTable getDeclaringTable(boolean nullHandlingEnabled) {
+      Schema schema = new Schema.SchemaBuilder()
+          .setSchemaName("testTable")
+          .setEnableColumnBasedNullHandling(true)
+          .addDimensionField("myField", _dataType, f -> f.setNullable(true))
+          .addDimensionField("timeField", FieldSpec.DataType.TIMESTAMP)
+          .build();
+      TableConfigBuilder tableConfigBuilder = new TableConfigBuilder(TableType.OFFLINE)
+          .setTableName("testTable");
+
+      return FluentQueryTest.withBaseDir(_baseDir)
+          .withNullHandling(nullHandlingEnabled)
+          .givenTable(schema, tableConfigBuilder.build());
+    }
+
+    @Override
+    public String toString() {
+      return "Scenario{" + "dt=" + _dataType + ", val1='" + _valAsStr1 + "', val2='" + _valAsStr2
+          + "', defaultNull='" + _defaultNullValue + "'}";
+    }
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        )
+        .whenQuery("select LAST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') from testTable")
+        .thenResultIs(scenario._pinotDataType.name(), scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        )
+        .whenQuery("select LAST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') from testTable")
+        .thenResultIs(scenario._pinotDataType.name(), scenario._valAsStr2);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        ).whenQuery("select 'cte', LAST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " + scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        ).whenQuery("select 'cte', LAST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by 'cte'")
+        .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " + scenario._valAsStr2);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrMvWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+            + "LAST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by cte1, cte2")
+        .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+            "cte1 | cte2 | " + scenario._defaultNullValue);
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrMvWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField | timeField",
+            scenario._valAsStr1 + " | 2",
+            "null | 3"
+        ).andOnSecondInstance("myField | timeField",
+            scenario._valAsStr2 + " | 5",
+            "null | 6"
+        ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+            + "LAST_WITH_TIME(myField, timeField, '" + scenario._dataType + "') as mode "
+            + "from testTable "
+            + "group by cte1, cte2")
+        .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+            "cte1 | cte2 | " + scenario._valAsStr2);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
index 81e2ded523..d6c3cf82fb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
@@ -35,6 +35,14 @@ public abstract class ValueLongPair<V extends Comparable<V>> implements Comparab
     return _time;
   }
 
+  public void setValue(V value) {
+    _value = value;
+  }
+
+  public void setTime(long time) {
+    _time = time;
+  }
+
   abstract public byte[] toBytes();
 
   @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org