This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b3172464ce add FIRSTWITHTIME aggregate function support #7647 (#8181) b3172464ce is described below commit b3172464cedff45a1af2be59db0822a6aeb6f403 Author: Yash Agarwal <yash.0...@gmail.com> AuthorDate: Wed May 4 23:38:15 2022 +0530 add FIRSTWITHTIME aggregate function support #7647 (#8181) --- .../function/AggregationFunctionTypeTest.java | 2 + .../function/AggregationFunctionFactory.java | 30 ++ ...irstDoubleValueWithTimeAggregationFunction.java | 126 ++++++ ...FirstFloatValueWithTimeAggregationFunction.java | 127 ++++++ .../FirstIntValueWithTimeAggregationFunction.java | 142 +++++++ .../FirstLongValueWithTimeAggregationFunction.java | 126 ++++++ ...irstStringValueWithTimeAggregationFunction.java | 124 ++++++ .../function/FirstWithTimeAggregationFunction.java | 222 +++++++++++ .../function/AggregationFunctionFactoryTest.java | 42 ++ .../pinot/queries/FirstWithTimeQueriesTest.java | 434 +++++++++++++++++++++ .../pinot/segment/spi/AggregationFunctionType.java | 1 + 11 files changed, 1376 insertions(+) diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java index ea9f69a815..f7a9ca16b0 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java @@ -33,6 +33,8 @@ public class AggregationFunctionTypeTest { Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("SuM"), AggregationFunctionType.SUM); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("AvG"), AggregationFunctionType.AVG); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MoDe"), AggregationFunctionType.MODE); + Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("FiRsTwItHtImE"), + AggregationFunctionType.FIRSTWITHTIME); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("LaStWiThTiMe"), AggregationFunctionType.LASTWITHTIME); Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmAxRaNgE"), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 74c815edf5..fc684b434d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -160,6 +160,36 @@ public class AggregationFunctionFactory { return new AvgAggregationFunction(firstArgument); case MODE: return new ModeAggregationFunction(arguments); + case FIRSTWITHTIME: + if (arguments.size() == 3) { + ExpressionContext timeCol = arguments.get(1); + ExpressionContext dataType = arguments.get(2); + if (dataType.getType() != ExpressionContext.Type.LITERAL) { + throw new IllegalArgumentException("Third argument of firstWithTime Function should be literal." + + " The function can be used as firstWithTime(dataColumn, timeColumn, 'dataType')"); + } + FieldSpec.DataType fieldDataType + = FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase()); + switch (fieldDataType) { + case BOOLEAN: + case INT: + return new FirstIntValueWithTimeAggregationFunction( + firstArgument, timeCol, fieldDataType == FieldSpec.DataType.BOOLEAN); + case LONG: + return new FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol); + case FLOAT: + return new FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol); + case DOUBLE: + return new FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol); + case STRING: + return new FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol); + default: + throw new IllegalArgumentException("Unsupported Value Type for firstWithTime Function:" + dataType); + } + } else { + throw new IllegalArgumentException("Three arguments are required for firstWithTime Function." + + " The function can be used as firstWithTime(dataColumn, timeColumn, 'dataType')"); + } case LASTWITHTIME: if (arguments.size() == 3) { ExpressionContext timeCol = arguments.get(1); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java new file mode 100644 index 0000000000..56aff397df --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.DoubleLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for FirstWithTime calculations for data column with double type. + * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'double') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the double data column to be calculated first on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is first, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class FirstDoubleValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Double> { + + private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR + = new DoubleLongPair(Double.NaN, Long.MAX_VALUE); + + public FirstDoubleValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol) { + super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE); + } + + @Override + public ValueLongPair<Double> constructValueLongPair(Double value, long time) { + return new DoubleLongPair(value, time); + } + + @Override + public ValueLongPair<Double> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair(); + Double firstData = defaultValueLongPair.getValue(); + long firstTime = defaultValueLongPair.getTime(); + double[] doubleValues = blockValSet.getDoubleValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + double data = doubleValues[i]; + long time = timeValues[i]; + if (time <= firstTime) { + firstTime = time; + firstData = data; + } + } + setAggregationResult(aggregationResultHolder, firstData, firstTime); + } + + @Override + public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + double[] doubleValues = blockValSet.getDoubleValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + double data = doubleValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + double[] doubleValues = blockValSet.getDoubleValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + double value = doubleValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'DOUBLE')"; + } + + @Override + public String getColumnName() { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_DOUBLE"; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.DOUBLE; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java new file mode 100644 index 0000000000..3801ff17a3 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.FloatLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for FirstWithTime calculations for data column with float type. + * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'float') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the float data column to be calculated first on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is first, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class FirstFloatValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Float> { + + private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new FloatLongPair(Float.NaN, Long.MAX_VALUE); + + public FirstFloatValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol) { + super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE); + } + + @Override + public ValueLongPair<Float> constructValueLongPair(Float value, long time) { + return new FloatLongPair(value, time); + } + + @Override + public ValueLongPair<Float> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair(); + Float firstData = defaultValueLongPair.getValue(); + long firstTime = defaultValueLongPair.getTime(); + float[] floatValues = blockValSet.getFloatValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + float data = floatValues[i]; + long time = timeValues[i]; + if (time <= firstTime) { + firstTime = time; + firstData = data; + } + } + setAggregationResult(aggregationResultHolder, firstData, firstTime); + } + + @Override + public void aggregateGroupResultWithRawDataSv(int length, + int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + float[] floatValues = blockValSet.getFloatValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + float data = floatValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + float[] floatValues = blockValSet.getFloatValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + float value = floatValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'FLOAT')"; + } + + @Override + public String getColumnName() { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_FLOAT"; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.FLOAT; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java new file mode 100644 index 0000000000..be151b9994 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.IntLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for FirstWithTime calculations for data column with int/boolean type. + * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'int') + * or FirstWithTime(dataExpression, timeExpression, 'boolean') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the int/boolean data column to be calculated first on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is first, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class FirstIntValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Integer> { + + private final static ValueLongPair<Integer> DEFAULT_VALUE_TIME_PAIR + = new IntLongPair(Integer.MIN_VALUE, Long.MAX_VALUE); + private final boolean _isBoolean; + + public FirstIntValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol, + boolean isBoolean) { + super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE); + _isBoolean = isBoolean; + } + + @Override + public ValueLongPair<Integer> constructValueLongPair(Integer value, long time) { + return new IntLongPair(value, time); + } + + @Override + public ValueLongPair<Integer> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair(); + Integer firstData = defaultValueLongPair.getValue(); + long firstTime = defaultValueLongPair.getTime(); + int[] intValues = blockValSet.getIntValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + int data = intValues[i]; + long time = timeValues[i]; + if (time <= firstTime) { + firstTime = time; + firstData = data; + } + } + setAggregationResult(aggregationResultHolder, firstData, firstTime); + } + + @Override + public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + int[] intValues = blockValSet.getIntValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + int data = intValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + int[] intValues = blockValSet.getIntValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + int value = intValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + if (_isBoolean) { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'BOOLEAN')"; + } else { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'INT')"; + } + } + + @Override + public String getColumnName() { + if (_isBoolean) { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_BOOLEAN"; + } else { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_INT"; + } + } + + @Override + public ColumnDataType getFinalResultColumnType() { + if (_isBoolean) { + return ColumnDataType.BOOLEAN; + } else { + return ColumnDataType.INT; + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java new file mode 100644 index 0000000000..960cda8820 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.LongLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for FirstWithTime calculations for data column with long type. + * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'long') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the long data column to be calculated first on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is first, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class FirstLongValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<Long> { + + private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR + = new LongLongPair(Long.MIN_VALUE, Long.MAX_VALUE); + + public FirstLongValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol) { + super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE); + } + + @Override + public ValueLongPair<Long> constructValueLongPair(Long value, long time) { + return new LongLongPair(value, time); + } + + @Override + public ValueLongPair<Long> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair(); + Long firstData = defaultValueLongPair.getValue(); + long firstTime = defaultValueLongPair.getTime(); + long[] longValues = blockValSet.getLongValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + long data = longValues[i]; + long time = timeValues[i]; + if (time <= firstTime) { + firstTime = time; + firstData = data; + } + } + setAggregationResult(aggregationResultHolder, firstData, firstTime); + } + + @Override + public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + long[] longValues = blockValSet.getLongValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + long data = longValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + long[] longValues = blockValSet.getLongValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + long value = longValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'LONG')"; + } + + @Override + public String getColumnName() { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_LONG"; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.LONG; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java new file mode 100644 index 0000000000..b0ace7487e --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.StringLongPair; +import org.apache.pinot.segment.local.customobject.ValueLongPair; + + +/** + * This function is used for FirstWithTime calculations for data column with string type. + * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'string') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the string data column to be calculated first on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is first, can be any + * Numeric column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class FirstStringValueWithTimeAggregationFunction extends FirstWithTimeAggregationFunction<String> { + private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new StringLongPair("", Long.MAX_VALUE); + + public FirstStringValueWithTimeAggregationFunction( + ExpressionContext dataCol, + ExpressionContext timeCol) { + super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE); + } + + @Override + public ValueLongPair<String> constructValueLongPair(String value, long time) { + return new StringLongPair(value, time); + } + + @Override + public ValueLongPair<String> getDefaultValueTimePair() { + return DEFAULT_VALUE_TIME_PAIR; + } + + @Override + public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair(); + String firstData = defaultValueLongPair.getValue(); + long firstTime = defaultValueLongPair.getTime(); + String[] stringValues = blockValSet.getStringValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + String data = stringValues[i]; + long time = timeValues[i]; + if (time <= firstTime) { + firstTime = time; + firstData = data; + } + } + setAggregationResult(aggregationResultHolder, firstData, firstTime); + } + + @Override + public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet) { + String[] stringValues = blockValSet.getStringValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + String data = stringValues[i]; + long time = timeValues[i]; + setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time); + } + } + + @Override + public void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet) { + String[] stringValues = blockValSet.getStringValuesSV(); + long[] timeValues = timeValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + String value = stringValues[i]; + long time = timeValues[i]; + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, value, time); + } + } + } + + @Override + public String getResultColumnName() { + return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'STRING')"; + } + + @Override + public String getColumnName() { + return getType().getName() + "_" + _expression + "_" + _timeCol + "_STRING"; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.STRING; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java new file mode 100644 index 0000000000..7049d0f1db --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.local.customobject.ValueLongPair; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * This function is used for FirstWithTime calculations. + * <p>The function can be used as FirstWithTime(dataExpression, timeExpression, 'dataType') + * <p>Following arguments are supported: + * <ul> + * <li>dataExpression: expression that contains the column to be calculated first on</li> + * <li>timeExpression: expression that contains the column to be used to decide which data is first, can be any + * Numeric column</li> + * <li>dataType: the data type of data column</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public abstract class FirstWithTimeAggregationFunction<V extends Comparable<V>> + extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> { + protected final ExpressionContext _timeCol; + private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> _objectSerDe; + + public FirstWithTimeAggregationFunction(ExpressionContext dataCol, + ExpressionContext timeCol, + ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) { + super(dataCol); + _timeCol = timeCol; + _objectSerDe = objectSerDe; + } + + public abstract ValueLongPair<V> constructValueLongPair(V value, long time); + + public abstract ValueLongPair<V> getDefaultValueTimePair(); + + public abstract void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder, + BlockValSet blockValSet, BlockValSet timeValSet); + + public abstract void aggregateGroupResultWithRawDataSv(int length, + int[] groupKeyArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet); + + public abstract void aggregateGroupResultWithRawDataMv(int length, + int[][] groupKeysArray, + GroupByResultHolder groupByResultHolder, + BlockValSet blockValSet, + BlockValSet timeValSet); + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.FIRSTWITHTIME; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + + BlockValSet blockValSet = blockValSetMap.get(_expression); + BlockValSet blockTimeSet = blockValSetMap.get(_timeCol); + if (blockValSet.getValueType() != DataType.BYTES) { + aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, blockTimeSet); + } else { + ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair(); + V firstData = defaultValueLongPair.getValue(); + long firstTime = defaultValueLongPair.getTime(); + // Serialized FirstPair + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + for (int i = 0; i < length; i++) { + ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]); + V data = firstWithTimePair.getValue(); + long time = firstWithTimePair.getTime(); + if (time <= firstTime) { + firstTime = time; + firstData = data; + } + } + setAggregationResult(aggregationResultHolder, firstData, firstTime); + } + } + + protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, V data, long time) { + ValueLongPair firstWithTimePair = aggregationResultHolder.getResult(); + if (firstWithTimePair == null || time <= firstWithTimePair.getTime()) { + aggregationResultHolder.setValue(constructValueLongPair(data, time)); + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + BlockValSet timeValSet = blockValSetMap.get(_timeCol); + if (blockValSet.getValueType() != DataType.BYTES) { + aggregateGroupResultWithRawDataSv(length, groupKeyArray, groupByResultHolder, + blockValSet, timeValSet); + } else { + // Serialized FirstPair + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + for (int i = 0; i < length; i++) { + ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]); + setGroupByResult(groupKeyArray[i], + groupByResultHolder, + firstWithTimePair.getValue(), + firstWithTimePair.getTime()); + } + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + BlockValSet timeValSet = blockValSetMap.get(_timeCol); + if (blockValSet.getValueType() != DataType.BYTES) { + aggregateGroupResultWithRawDataMv(length, groupKeysArray, groupByResultHolder, blockValSet, timeValSet); + } else { + // Serialized ValueTimePair + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + for (int i = 0; i < length; i++) { + ValueLongPair<V> firstWithTimePair = _objectSerDe.deserialize(bytesValues[i]); + V data = firstWithTimePair.getValue(); + long time = firstWithTimePair.getTime(); + for (int groupKey : groupKeysArray[i]) { + setGroupByResult(groupKey, groupByResultHolder, data, time); + } + } + } + } + + protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, V data, long time) { + ValueLongPair firstWithTimePair = groupByResultHolder.getResult(groupKey); + if (firstWithTimePair == null || time <= firstWithTimePair.getTime()) { + groupByResultHolder.setValueForKey(groupKey, constructValueLongPair(data, time)); + } + } + + @Override + public ValueLongPair<V> extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + ValueLongPair firstWithTimePair = aggregationResultHolder.getResult(); + if (firstWithTimePair == null) { + return getDefaultValueTimePair(); + } else { + return firstWithTimePair; + } + } + + @Override + public ValueLongPair<V> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + ValueLongPair<V> firstWithTimePair = groupByResultHolder.getResult(groupKey); + if (firstWithTimePair == null) { + return getDefaultValueTimePair(); + } else { + return firstWithTimePair; + } + } + + @Override + public ValueLongPair<V> merge(ValueLongPair<V> intermediateResult1, ValueLongPair<V> intermediateResult2) { + if (intermediateResult1.getTime() <= intermediateResult2.getTime()) { + return intermediateResult1; + } else { + return intermediateResult2; + } + } + + @Override + public List<ExpressionContext> getInputExpressions() { + return Arrays.asList(_expression, _timeCol); + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return ColumnDataType.OBJECT; + } + + @Override + public V extractFinalResult(ValueLongPair<V> intermediateResult) { + return intermediateResult.getValue(); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java index d51d4704f6..58deac76ee 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java @@ -87,6 +87,48 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getColumnName(), "mode_column"); assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'BOOLEAN')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof FirstIntValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.FIRSTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "firstWithTime_column_timeColumn_BOOLEAN"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'INT')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof FirstIntValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.FIRSTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "firstWithTime_column_timeColumn_INT"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'LONG')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof FirstLongValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.FIRSTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "firstWithTime_column_timeColumn_LONG"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'FLOAT')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof FirstFloatValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.FIRSTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "firstWithTime_column_timeColumn_FLOAT"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'DOUBLE')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof FirstDoubleValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.FIRSTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "firstWithTime_column_timeColumn_DOUBLE"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("FiRsTwItHtImE", "(column,timeColumn,'STRING')"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof FirstStringValueWithTimeAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.FIRSTWITHTIME); + assertEquals(aggregationFunction.getColumnName(), "firstWithTime_column_timeColumn_STRING"); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("LaStWiThTiMe", "(column,timeColumn,'BOOLEAN')"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); assertTrue(aggregationFunction instanceof LastIntValueWithTimeAggregationFunction); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FirstWithTimeQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FirstWithTimeQueriesTest.java new file mode 100644 index 0000000000..e7be59a2c8 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/queries/FirstWithTimeQueriesTest.java @@ -0,0 +1,434 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator; +import org.apache.pinot.core.operator.query.AggregationOperator; +import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; +import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.segment.local.customobject.DoubleLongPair; +import org.apache.pinot.segment.local.customobject.FloatLongPair; +import org.apache.pinot.segment.local.customobject.IntLongPair; +import org.apache.pinot.segment.local.customobject.LongLongPair; +import org.apache.pinot.segment.local.customobject.StringLongPair; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Queries test for FIRSTWITHTIME queries. + */ +public class FirstWithTimeQueriesTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FirstQueriesTest"); + private static final String RAW_TABLE_NAME = "testTable"; + private static final String SEGMENT_NAME = "testSegment"; + private static final Random RANDOM = new Random(); + + private static final int NUM_RECORDS = 2000; + private static final int MAX_VALUE = 1000; + + private static final String BOOL_COLUMN = "boolColumn"; + private static final String BOOL_NO_DICT_COLUMN = "boolNoDictColumn"; + private static final String INT_COLUMN = "intColumn"; + private static final String INT_MV_COLUMN = "intMVColumn"; + private static final String INT_NO_DICT_COLUMN = "intNoDictColumn"; + private static final String LONG_COLUMN = "longColumn"; + private static final String LONG_NO_DICT_COLUMN = "longNoDictColumn"; + private static final String FLOAT_COLUMN = "floatColumn"; + private static final String FLOAT_NO_DICT_COLUMN = "floatNoDictColumn"; + private static final String DOUBLE_COLUMN = "doubleColumn"; + private static final String DOUBLE_NO_DICT_COLUMN = "doubleNoDictColumn"; + private static final String STRING_COLUMN = "stringColumn"; + private static final String STRING_NO_DICT_COLUMN = "stringNoDictColumn"; + private static final String TIME_COLUMN = "timestampColumn"; + private static final Schema SCHEMA = new Schema.SchemaBuilder() + .addSingleValueDimension(BOOL_COLUMN, DataType.BOOLEAN) + .addSingleValueDimension(BOOL_NO_DICT_COLUMN, DataType.BOOLEAN) + .addSingleValueDimension(INT_COLUMN, DataType.INT) + .addMultiValueDimension(INT_MV_COLUMN, DataType.INT) + .addSingleValueDimension(INT_NO_DICT_COLUMN, DataType.INT) + .addSingleValueDimension(LONG_COLUMN, DataType.LONG) + .addSingleValueDimension(LONG_NO_DICT_COLUMN, DataType.LONG) + .addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT) + .addSingleValueDimension(FLOAT_NO_DICT_COLUMN, DataType.FLOAT) + .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE) + .addSingleValueDimension(DOUBLE_NO_DICT_COLUMN, DataType.DOUBLE) + .addSingleValueDimension(STRING_COLUMN, DataType.STRING) + .addSingleValueDimension(STRING_NO_DICT_COLUMN, DataType.STRING) + .addSingleValueDimension(TIME_COLUMN, DataType.LONG).build(); + private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setNoDictionaryColumns( + Lists.newArrayList(INT_NO_DICT_COLUMN, LONG_NO_DICT_COLUMN, FLOAT_NO_DICT_COLUMN, DOUBLE_NO_DICT_COLUMN)) + .build(); + + private Boolean _expectedResultFirstBoolean; + private Integer _expectedResultFirstInt; + private Long _expectedResultFirstLong; + private Float _expectedResultFirstFloat; + private Double _expectedResultFirstDouble; + private String _expectedResultFirstString; + private Map<Integer, Boolean> _boolGroupValues; + private Map<Integer, Integer> _intGroupValues; + private Map<Integer, Long> _longGroupValues; + private Map<Integer, Float> _floatGroupValues; + private Map<Integer, Double> _doubleGroupValues; + private Map<Integer, String> _stringGroupValues; + private IndexSegment _indexSegment; + private List<IndexSegment> _indexSegments; + + @Override + protected String getFilter() { + return ""; + } + + @Override + protected IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return _indexSegments; + } + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteDirectory(INDEX_DIR); + + List<GenericRow> records = new ArrayList<>(NUM_RECORDS); + _boolGroupValues = new HashMap<>(); + _intGroupValues = new HashMap<>(); + _longGroupValues = new HashMap<>(); + _floatGroupValues = new HashMap<>(); + _doubleGroupValues = new HashMap<>(); + _stringGroupValues = new HashMap<>(); + for (int i = 0; i < NUM_RECORDS; i++) { + boolean boolValue = RANDOM.nextBoolean(); + int intValue = RANDOM.nextInt(MAX_VALUE); + long longValue = RANDOM.nextLong(); + float floatValue = RANDOM.nextFloat(); + double doubleValue = RANDOM.nextDouble(); + String strValue = String.valueOf(RANDOM.nextDouble()); + GenericRow record = new GenericRow(); + record.putValue(BOOL_COLUMN, boolValue); + record.putValue(BOOL_NO_DICT_COLUMN, boolValue); + record.putValue(INT_COLUMN, intValue); + record.putValue(INT_MV_COLUMN, new Integer[]{intValue, intValue}); + record.putValue(INT_NO_DICT_COLUMN, intValue); + record.putValue(LONG_COLUMN, longValue); + record.putValue(LONG_NO_DICT_COLUMN, longValue); + record.putValue(FLOAT_COLUMN, floatValue); + record.putValue(FLOAT_NO_DICT_COLUMN, floatValue); + record.putValue(DOUBLE_COLUMN, doubleValue); + record.putValue(DOUBLE_NO_DICT_COLUMN, doubleValue); + record.putValue(STRING_COLUMN, strValue); + record.putValue(STRING_NO_DICT_COLUMN, strValue); + record.putValue(TIME_COLUMN, (long) i); + if (i == 0) { + _expectedResultFirstBoolean = boolValue; + _expectedResultFirstInt = intValue; + _expectedResultFirstLong = longValue; + _expectedResultFirstFloat = floatValue; + _expectedResultFirstDouble = doubleValue; + _expectedResultFirstString = strValue; + } + _boolGroupValues.putIfAbsent(intValue, boolValue); + _intGroupValues.putIfAbsent(intValue, intValue); + _longGroupValues.putIfAbsent(intValue, longValue); + _floatGroupValues.putIfAbsent(intValue, floatValue); + _doubleGroupValues.putIfAbsent(intValue, doubleValue); + _stringGroupValues.putIfAbsent(intValue, strValue); + records.add(record); + } + + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + segmentGeneratorConfig.setTableName(RAW_TABLE_NAME); + segmentGeneratorConfig.setSegmentName(SEGMENT_NAME); + segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath()); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records)); + driver.build(); + + ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + } + + @Test + public void testAggregationOnly() { + String query = "SELECT " + + "FIRSTWITHTIME(boolColumn, timestampColumn, 'BOOLEAN'), " + + "FIRSTWITHTIME(intColumn, timestampColumn, 'INT'), " + + "FIRSTWITHTIME(longColumn, timestampColumn, 'LONG'), " + + "FIRSTWITHTIME(floatColumn, timestampColumn, 'FLOAT'), " + + "FIRSTWITHTIME(doubleColumn, timestampColumn, 'DOUBLE'), " + + "FIRSTWITHTIME(stringColumn, timestampColumn, 'STRING') " + + "FROM testTable"; + + // Inner segment + AggregationOperator aggregationOperator = getOperator(query); + IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0, + 7 * NUM_RECORDS, NUM_RECORDS); + List<Object> aggregationResult = resultsBlock.getAggregationResult(); + assertNotNull(aggregationResult); + assertEquals(((IntLongPair) aggregationResult.get(0)).getValue() != 0, _expectedResultFirstBoolean.booleanValue()); + assertEquals(((IntLongPair) aggregationResult.get(1)).getValue(), _expectedResultFirstInt); + assertEquals(((LongLongPair) aggregationResult.get(2)).getValue(), _expectedResultFirstLong); + assertEquals(((FloatLongPair) aggregationResult.get(3)).getValue(), _expectedResultFirstFloat); + assertEquals(((DoubleLongPair) aggregationResult.get(4)).getValue(), _expectedResultFirstDouble); + assertEquals(((StringLongPair) aggregationResult.get(5)).getValue(), _expectedResultFirstString); + + // Inter segments (expect 4 * inner segment result) + BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = new DataSchema(new String[]{ + "firstwithtime(boolColumn,timestampColumn,'BOOLEAN')", + "firstwithtime(intColumn,timestampColumn,'INT')", + "firstwithtime(longColumn,timestampColumn,'LONG')", + "firstwithtime(floatColumn,timestampColumn,'FLOAT')", + "firstwithtime(doubleColumn,timestampColumn,'DOUBLE')", + "firstwithtime(stringColumn,timestampColumn,'STRING')" + }, new ColumnDataType[]{ + ColumnDataType.BOOLEAN, + ColumnDataType.INT, + ColumnDataType.LONG, + ColumnDataType.FLOAT, + ColumnDataType.DOUBLE, + ColumnDataType.STRING + }); + Object[] expectedResults = new Object[]{ + _expectedResultFirstBoolean, + _expectedResultFirstInt, + _expectedResultFirstLong, + _expectedResultFirstFloat, + _expectedResultFirstDouble, + _expectedResultFirstString + }; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 * NUM_RECORDS, 0L, 4 * 7 * NUM_RECORDS, + 4 * NUM_RECORDS, new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults))); + } + + @Test + public void testAggregationOnlyNoDictionary() { + String query = "SELECT " + + "FIRSTWITHTIME(boolNoDictColumn,timestampColumn,'boolean'), " + + "FIRSTWITHTIME(intNoDictColumn,timestampColumn,'int'), " + + "FIRSTWITHTIME(longNoDictColumn,timestampColumn,'long'), " + + "FIRSTWITHTIME(floatNoDictColumn,timestampColumn,'float'), " + + "FIRSTWITHTIME(doubleNoDictColumn,timestampColumn,'double'), " + + "FIRSTWITHTIME(stringNoDictColumn,timestampColumn,'string') " + + "FROM testTable"; + + // Inner segment + AggregationOperator aggregationOperator = getOperator(query); + IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0, + 7 * NUM_RECORDS, NUM_RECORDS); + List<Object> aggregationResult = resultsBlock.getAggregationResult(); + assertNotNull(aggregationResult); + assertEquals(((IntLongPair) aggregationResult.get(0)).getValue() != 0, _expectedResultFirstBoolean.booleanValue()); + assertEquals(((IntLongPair) aggregationResult.get(1)).getValue(), _expectedResultFirstInt); + assertEquals(((LongLongPair) aggregationResult.get(2)).getValue(), _expectedResultFirstLong); + assertEquals(((FloatLongPair) aggregationResult.get(3)).getValue(), _expectedResultFirstFloat); + assertEquals(((DoubleLongPair) aggregationResult.get(4)).getValue(), _expectedResultFirstDouble); + assertEquals(((StringLongPair) aggregationResult.get(5)).getValue(), _expectedResultFirstString); + + // Inter segments (expect 4 * inner segment result) + BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = new DataSchema(new String[]{ + "firstwithtime(boolNoDictColumn,timestampColumn,'BOOLEAN')", + "firstwithtime(intNoDictColumn,timestampColumn,'INT')", + "firstwithtime(longNoDictColumn,timestampColumn,'LONG')", + "firstwithtime(floatNoDictColumn,timestampColumn,'FLOAT')", + "firstwithtime(doubleNoDictColumn,timestampColumn,'DOUBLE')", + "firstwithtime(stringNoDictColumn,timestampColumn,'STRING')" + }, new ColumnDataType[]{ + ColumnDataType.BOOLEAN, + ColumnDataType.INT, + ColumnDataType.LONG, + ColumnDataType.FLOAT, + ColumnDataType.DOUBLE, + ColumnDataType.STRING + }); + Object[] expectedResults = new Object[]{ + _expectedResultFirstBoolean, + _expectedResultFirstInt, + _expectedResultFirstLong, + _expectedResultFirstFloat, + _expectedResultFirstDouble, + _expectedResultFirstString + }; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 4 * NUM_RECORDS, 0L, 4 * 7 * NUM_RECORDS, + 4 * NUM_RECORDS, new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults))); + } + + @Test + public void testAggregationGroupBySV() { + String query = "SELECT intColumn AS key, " + + "FIRSTWITHTIME(boolColumn,timestampColumn,'boolean') AS v1, " + + "FIRSTWITHTIME(intColumn,timestampColumn,'int') AS v2, " + + "FIRSTWITHTIME(longColumn,timestampColumn,'long') AS v3, " + + "FIRSTWITHTIME(floatColumn,timestampColumn,'float') AS v4, " + + "FIRSTWITHTIME(doubleColumn,timestampColumn,'double') AS v5, " + + "FIRSTWITHTIME(stringColumn,timestampColumn,'string') AS v6 " + + "FROM testTable GROUP BY key"; + verifyAggregationGroupBy(query, 7); + } + + @Test + public void testAggregationGroupBySVNoDictionary() { + String query = "SELECT intNoDictColumn AS key, " + + "FIRSTWITHTIME(boolNoDictColumn,timestampColumn,'boolean') AS v1, " + + "FIRSTWITHTIME(intNoDictColumn,timestampColumn,'int') AS v2, " + + "FIRSTWITHTIME(longNoDictColumn,timestampColumn,'long') AS v3, " + + "FIRSTWITHTIME(floatNoDictColumn,timestampColumn,'float') AS v4, " + + "FIRSTWITHTIME(doubleNoDictColumn,timestampColumn,'double') AS v5, " + + "FIRSTWITHTIME(stringNoDictColumn,timestampColumn,'string') AS v6 " + + "FROM testTable GROUP BY key"; + verifyAggregationGroupBy(query, 7); + } + + @Test + public void testAggregationGroupByMV() { + String query = "SELECT intMVColumn AS key, " + + "FIRSTWITHTIME(boolColumn,timestampColumn,'boolean') AS v1, " + + "FIRSTWITHTIME(intColumn,timestampColumn,'int') AS v2, " + + "FIRSTWITHTIME(longColumn,timestampColumn,'long') AS v3, " + + "FIRSTWITHTIME(floatColumn,timestampColumn,'float') AS v4, " + + "FIRSTWITHTIME(doubleColumn,timestampColumn,'double') AS v5, " + + "FIRSTWITHTIME(stringColumn,timestampColumn,'string') AS v6 " + + "FROM testTable GROUP BY key"; + verifyAggregationGroupBy(query, 8); + } + + @Test + public void testAggregationGroupByMVNoDictionary() { + String query = "SELECT intMVColumn AS key, " + + "FIRSTWITHTIME(boolNoDictColumn,timestampColumn,'boolean') AS v1, " + + "FIRSTWITHTIME(intNoDictColumn,timestampColumn,'int') AS v2, " + + "FIRSTWITHTIME(longNoDictColumn,timestampColumn,'long') AS v3, " + + "FIRSTWITHTIME(floatNoDictColumn,timestampColumn,'float') AS v4, " + + "FIRSTWITHTIME(doubleNoDictColumn,timestampColumn,'double') AS v5, " + + "FIRSTWITHTIME(stringNoDictColumn,timestampColumn,'string') AS v6 " + + "FROM testTable GROUP BY key"; + verifyAggregationGroupBy(query, 8); + } + + private void verifyAggregationGroupBy(String query, int numProjectedColumns) { + // Inner segment + AggregationGroupByOrderByOperator groupByOperator = getOperator(query); + IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, + numProjectedColumns * (long) NUM_RECORDS, NUM_RECORDS); + AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); + assertNotNull(aggregationGroupByResult); + int numGroups = 0; + Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator(); + while (groupKeyIterator.hasNext()) { + numGroups++; + GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next(); + Integer key = (Integer) groupKey._keys[0]; + assertTrue(_intGroupValues.containsKey(key)); + assertEquals(((IntLongPair) aggregationGroupByResult.getResultForGroupId(0, groupKey._groupId)).getValue() != 0, + (boolean) _boolGroupValues.get(key)); + assertEquals(((IntLongPair) aggregationGroupByResult.getResultForGroupId(1, groupKey._groupId)).getValue(), + _intGroupValues.get(key)); + assertEquals(((LongLongPair) aggregationGroupByResult.getResultForGroupId(2, groupKey._groupId)).getValue(), + _longGroupValues.get(key)); + assertEquals(((FloatLongPair) aggregationGroupByResult.getResultForGroupId(3, groupKey._groupId)).getValue(), + _floatGroupValues.get(key)); + assertEquals(((DoubleLongPair) aggregationGroupByResult.getResultForGroupId(4, groupKey._groupId)).getValue(), + _doubleGroupValues.get(key)); + assertEquals(((StringLongPair) aggregationGroupByResult.getResultForGroupId(5, groupKey._groupId)).getValue(), + _stringGroupValues.get(key)); + } + assertEquals(numGroups, _intGroupValues.size()); + + // Inter segments (expect 4 * inner segment result) + BrokerResponseNative brokerResponse = getBrokerResponse(query); + assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS); + assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * numProjectedColumns * (long) NUM_RECORDS); + assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); + + ResultTable resultTable = brokerResponse.getResultTable(); + DataSchema expectedDataSchema = + new DataSchema(new String[]{"key", "v1", "v2", "v3", "v4", "v5", "v6"}, new ColumnDataType[]{ + ColumnDataType.INT, ColumnDataType.BOOLEAN, ColumnDataType.INT, ColumnDataType.LONG, + ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING + }); + assertEquals(resultTable.getDataSchema(), expectedDataSchema); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), 10); + for (Object[] row : rows) { + assertEquals(row.length, 7); + int key = (Integer) row[0]; + assertEquals(row[1], _boolGroupValues.get(key)); + assertEquals(row[2], _intGroupValues.get(key)); + assertEquals(row[3], _longGroupValues.get(key)); + assertEquals(row[4], _floatGroupValues.get(key)); + assertEquals(row[5], _doubleGroupValues.get(key)); + assertEquals(row[6], _stringGroupValues.get(key)); + } + } + + @AfterClass + public void tearDown() + throws IOException { + _indexSegment.destroy(); + FileUtils.deleteDirectory(INDEX_DIR); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 889ade6e42..a1a435600a 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -37,6 +37,7 @@ public enum AggregationFunctionType { SUMPRECISION("sumPrecision"), AVG("avg"), MODE("mode"), + FIRSTWITHTIME("firstWithTime"), LASTWITHTIME("lastWithTime"), MINMAXRANGE("minMaxRange"), DISTINCTCOUNT("distinctCount"), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org