This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 272cddd8dd2 Star-tree index aggregate on MV columns (#16836)
272cddd8dd2 is described below
commit 272cddd8dd2ff69a98907676691336787c4874e1
Author: Mohemmad Zaid Khan <[email protected]>
AuthorDate: Fri Oct 24 21:02:13 2025 +0530
Star-tree index aggregate on MV columns (#16836)
---
.../function/AvgAggregationFunction.java | 32 ++++-----
.../function/AvgMVAggregationFunction.java | 32 ++++++++-
.../function/CountMVAggregationFunction.java | 25 +++++++
.../function/SumMVAggregationFunction.java | 33 ++++++++++
.../core/startree/v2/AvgMVStarTreeV2Test.java | 57 ++++++++++++++++
.../pinot/core/startree/v2/BaseStarTreeV2Test.java | 31 ++++++---
.../core/startree/v2/CountMVStarTreeV2Test.java | 55 ++++++++++++++++
.../core/startree/v2/SumMVStarTreeV2Test.java | 55 ++++++++++++++++
.../src/test/resources/TableIndexingTest.csv | 16 ++---
.../tests/StarTreeClusterIntegrationTest.java | 46 ++++++++++++-
...rmance_2014_100k_subset_nonulls_columns.schema} | 3 +-
.../local/aggregator/AvgMVValueAggregator.java | 77 ++++++++++++++++++++++
.../local/aggregator/CountMVValueAggregator.java | 69 +++++++++++++++++++
.../local/aggregator/SumMVValueAggregator.java | 69 +++++++++++++++++++
.../local/aggregator/ValueAggregatorFactory.java | 12 ++++
.../segment/local/utils/TableConfigUtils.java | 21 +++---
.../local/aggregator/ValueAggregatorTest.java | 3 +
.../airlineStats_offline_table_config.json | 11 ++++
.../batch/airlineStats/airlineStats_schema.json | 2 +-
19 files changed, 599 insertions(+), 50 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
index 0e92c70b3bd..39dc8146709 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
@@ -81,12 +81,10 @@ public class AvgAggregationFunction extends
NullableSingleInputAggregationFuncti
// Serialized AvgPair
byte[][] bytesValues = blockValSet.getBytesValuesSV();
AvgPair avgPair = new AvgPair();
- forEachNotNull(length, blockValSet, (from, to) -> {
- for (int i = from; i < to; i++) {
- AvgPair value =
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
- avgPair.apply(value);
- }
- });
+ for (int i = 0; i < length; i++) {
+ AvgPair value =
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+ avgPair.apply(value);
+ }
// Only set the aggregation result when there is at least one non-null
input value
if (avgPair.getCount() != 0) {
updateAggregationResult(aggregationResultHolder, avgPair.getSum(),
avgPair.getCount());
@@ -118,12 +116,10 @@ public class AvgAggregationFunction extends
NullableSingleInputAggregationFuncti
} else {
// Serialized AvgPair
byte[][] bytesValues = blockValSet.getBytesValuesSV();
- forEachNotNull(length, blockValSet, (from, to) -> {
- for (int i = from; i < to; i++) {
- AvgPair avgPair =
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
- updateGroupByResult(groupKeyArray[i], groupByResultHolder,
avgPair.getSum(), avgPair.getCount());
- }
- });
+ for (int i = 0; i < length; i++) {
+ AvgPair avgPair =
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+ updateGroupByResult(groupKeyArray[i], groupByResultHolder,
avgPair.getSum(), avgPair.getCount());
+ }
}
}
@@ -144,14 +140,12 @@ public class AvgAggregationFunction extends
NullableSingleInputAggregationFuncti
} else {
// Serialized AvgPair
byte[][] bytesValues = blockValSet.getBytesValuesSV();
- forEachNotNull(length, blockValSet, (from, to) -> {
- for (int i = from; i < to; i++) {
- AvgPair avgPair =
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
- for (int groupKey : groupKeysArray[i]) {
- updateGroupByResult(groupKey, groupByResultHolder,
avgPair.getSum(), avgPair.getCount());
- }
+ for (int i = 0; i < length; i++) {
+ AvgPair avgPair =
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+ for (int groupKey : groupKeysArray[i]) {
+ updateGroupByResult(groupKey, groupByResultHolder, avgPair.getSum(),
avgPair.getCount());
}
- });
+ }
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
index a81af44622e..d9aadc1b9c2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.AvgPair;
@@ -43,8 +44,23 @@ public class AvgMVAggregationFunction extends
AvgAggregationFunction {
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- double[][] valuesArray = blockValSet.getDoubleValuesMV();
+ if (blockValSet.isSingleValue()) {
+ // star-tree pre-aggregated values: During star-tree creation, the
multi-value column is pre-aggregated
+ // per star-tree node, resulting in a single value per node.
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ AvgPair avgPair = new AvgPair();
+ for (int i = 0; i < length; i++) {
+ AvgPair value =
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+ avgPair.apply(value);
+ }
+ if (avgPair.getCount() != 0) {
+ updateAggregationResult(aggregationResultHolder, avgPair.getSum(),
avgPair.getCount());
+ }
+ return;
+ }
+
+ double[][] valuesArray = blockValSet.getDoubleValuesMV();
AvgPair avgPair = new AvgPair();
forEachNotNull(length, blockValSet, (from, to) -> {
for (int i = from; i < to; i++) {
@@ -53,7 +69,6 @@ public class AvgMVAggregationFunction extends
AvgAggregationFunction {
}
}
});
-
if (avgPair.getCount() != 0) {
updateAggregationResult(aggregationResultHolder, avgPair.getSum(),
avgPair.getCount());
}
@@ -63,8 +78,19 @@ public class AvgMVAggregationFunction extends
AvgAggregationFunction {
public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- double[][] valuesArray = blockValSet.getDoubleValuesMV();
+ if (blockValSet.isSingleValue()) {
+ // star-tree pre-aggregated values: During star-tree creation, the
multi-value column is pre-aggregated
+ // per star-tree node, resulting in a single value per node.
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ AvgPair avgPair =
ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+ updateGroupByResult(groupKeyArray[i], groupByResultHolder,
avgPair.getSum(), avgPair.getCount());
+ }
+ return;
+ }
+
+ double[][] valuesArray = blockValSet.getDoubleValuesMV();
forEachNotNull(length, blockValSet, (from, to) -> {
for (int i = from; i < to; i++) {
aggregateOnGroupKey(groupKeyArray[i], groupByResultHolder,
valuesArray[i]);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
index 153aa1c37fc..f6d213584ec 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
@@ -53,6 +53,19 @@ public class CountMVAggregationFunction extends
CountAggregationFunction {
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ if (blockValSet.isSingleValue()) {
+ // star-tree pre-aggregated values: During star-tree creation, the
multi-value column is pre-aggregated
+ // per star-tree node, resulting in a single value per node.
+ long[] valueArray = blockValSet.getLongValuesSV();
+ long count = 0;
+ for (int i = 0; i < length; i++) {
+ count += valueArray[i];
+ }
+
aggregationResultHolder.setValue(aggregationResultHolder.getDoubleResult() +
count);
+ return;
+ }
+
int[] valueArray = blockValSet.getNumMVEntries();
// Hack to make count effectively final for use in the lambda (we know
that there aren't concurrent access issues
// with forEachNotNull)
@@ -69,6 +82,18 @@ public class CountMVAggregationFunction extends
CountAggregationFunction {
public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ if (blockValSet.isSingleValue()) {
+ // star-tree pre-aggregated values: During star-tree creation, the
multi-value column is pre-aggregated
+ // per star-tree node, resulting in a single value per node.
+ long[] valueArray = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ int groupKey = groupKeyArray[i];
+ groupByResultHolder.setValueForKey(groupKey,
groupByResultHolder.getDoubleResult(groupKey) + valueArray[i]);
+ }
+ return;
+ }
+
int[] valueArray = blockValSet.getNumMVEntries();
forEachNotNull(length, blockValSet, (from, to) -> {
for (int i = from; i < to; i++) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
index f7d914d2dbc..b0cae2534d3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
@@ -42,6 +42,19 @@ public class SumMVAggregationFunction extends
SumAggregationFunction {
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ if (blockValSet.isSingleValue()) {
+ // star-tree pre-aggregated values: During star-tree creation, the
multi-value column is pre-aggregated
+ // per star-tree node, resulting in a single value per node.
+ double[] valueArray = blockValSet.getDoubleValuesSV();
+ double sum = 0.0;
+ for (int i = 0; i < length; i++) {
+ sum += valueArray[i];
+ }
+ updateAggregationResultHolder(aggregationResultHolder, sum);
+ return;
+ }
+
double[][] valuesArray = blockValSet.getDoubleValuesMV();
Double sum;
@@ -62,6 +75,26 @@ public class SumMVAggregationFunction extends
SumAggregationFunction {
public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ if (blockValSet.isSingleValue()) {
+ // star-tree pre-aggregated values: During star-tree creation, the
multi-value column is pre-aggregated
+ // per star-tree node, resulting in a single value per node.
+ double[] valueArray = blockValSet.getDoubleValuesSV();
+ if (_nullHandlingEnabled) {
+ for (int i = 0; i < length; i++) {
+ int groupKey = groupKeyArray[i];
+ Double result = groupByResultHolder.getResult(groupKey);
+ groupByResultHolder.setValueForKey(groupKey, result == null ?
valueArray[i] : result + valueArray[i]);
+ }
+ } else {
+ for (int i = 0; i < length; i++) {
+ int groupKey = groupKeyArray[i];
+ groupByResultHolder.setValueForKey(groupKey,
groupByResultHolder.getDoubleResult(groupKey) + valueArray[i]);
+ }
+ }
+ return;
+ }
+
double[][] valuesArray = blockValSet.getDoubleValuesMV();
if (_nullHandlingEnabled) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AvgMVStarTreeV2Test.java
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AvgMVStarTreeV2Test.java
new file mode 100644
index 00000000000..91b56716128
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AvgMVStarTreeV2Test.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.pinot.segment.local.aggregator.AvgMVValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class AvgMVStarTreeV2Test extends BaseStarTreeV2Test<Object, AvgPair> {
+
+ @Override
+ ValueAggregator<Object, AvgPair> getValueAggregator() {
+ return new AvgMVValueAggregator();
+ }
+
+ @Override
+ DataType getRawValueType() {
+ return DataType.INT;
+ }
+
+ @Override
+ Object getRandomRawValue(Random random) {
+ return random.nextInt();
+ }
+
+ @Override
+ protected void assertAggregatedValue(AvgPair starTreeResult, AvgPair
nonStarTreeResult) {
+ assertEquals(starTreeResult.getSum(), nonStarTreeResult.getSum(), 1e-5);
+ assertEquals(starTreeResult.getCount(), nonStarTreeResult.getCount());
+ }
+
+ @Override
+ protected boolean isAggColSingleValueField() {
+ return false;
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index f94f4d58e05..306daf40bb2 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -98,7 +98,7 @@ abstract class BaseStarTreeV2Test<R, A> {
private static final String DIMENSION1 = "d1__COLUMN_NAME";
private static final String DIMENSION2 = "DISTINCTCOUNTRAWHLL__d2";
private static final int DIMENSION_CARDINALITY = 100;
- private static final String METRIC = "m";
+ private static final String AGG_COL = "m";
// Supported filters
private static final String QUERY_FILTER_AND = String.format(" WHERE %1$s =
0 AND %2$s < 10", DIMENSION1, DIMENSION2);
@@ -151,7 +151,8 @@ abstract class BaseStarTreeV2Test<R, A> {
DataType rawValueType = getRawValueType();
// Raw value type will be null for COUNT aggregation function
if (rawValueType != null) {
- schemaBuilder.addMetric(METRIC, rawValueType);
+ schemaBuilder.addDimensionField(AGG_COL, rawValueType,
+ dimensionFieldSpec ->
dimensionFieldSpec.setSingleValueField(isAggColSingleValueField()));
}
Schema schema = schemaBuilder.build();
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
@@ -162,7 +163,7 @@ abstract class BaseStarTreeV2Test<R, A> {
segmentRecord.putValue(DIMENSION1,
RANDOM.nextInt(DIMENSION_CARDINALITY));
segmentRecord.putValue(DIMENSION2,
RANDOM.nextInt(DIMENSION_CARDINALITY));
if (rawValueType != null) {
- segmentRecord.putValue(METRIC, getRandomRawValue(RANDOM));
+ segmentRecord.putValue(AGG_COL, getRandomRawValue(RANDOM));
}
segmentRecords.add(segmentRecord);
}
@@ -175,8 +176,9 @@ abstract class BaseStarTreeV2Test<R, A> {
driver.build();
StarTreeIndexConfig starTreeIndexConfig = new
StarTreeIndexConfig(Arrays.asList(DIMENSION1, DIMENSION2), null, null,
- Collections.singletonList(new StarTreeAggregationConfig(METRIC,
_valueAggregator.getAggregationType().getName(),
- null, getCompressionCodec(), true, getIndexVersion(), null,
null)), MAX_LEAF_RECORDS);
+ Collections.singletonList(new StarTreeAggregationConfig(AGG_COL,
+ _valueAggregator.getAggregationType().getName(), null,
getCompressionCodec(),
+ true, getIndexVersion(), null, null)), MAX_LEAF_RECORDS);
File indexDir = new File(TEMP_DIR, SEGMENT_NAME);
// Randomly build star-tree using on-heap or off-heap mode
MultipleTreesBuilder.BuildMode buildMode =
@@ -196,9 +198,9 @@ abstract class BaseStarTreeV2Test<R, A> {
} else if (aggregationType == AggregationFunctionType.PERCENTILEEST
|| aggregationType == AggregationFunctionType.PERCENTILETDIGEST) {
// Append a percentile number for percentile functions
- return String.format("%s(%s, 50)", aggregationType.getName(), METRIC);
+ return String.format("%s(%s, 50)", aggregationType.getName(), AGG_COL);
} else {
- return String.format("%s(%s)", aggregationType.getName(), METRIC);
+ return String.format("%s(%s)", aggregationType.getName(), AGG_COL);
}
}
@@ -476,7 +478,16 @@ abstract class BaseStarTreeV2Test<R, A> {
private Object getNextRawValue(int docId, ForwardIndexReader reader,
ForwardIndexReaderContext readerContext,
Dictionary dictionary) {
- return dictionary.get(reader.getDictId(docId, readerContext));
+ if (isAggColSingleValueField()) {
+ return dictionary.get(reader.getDictId(docId, readerContext));
+ } else {
+ int[] dictIds = reader.getDictIdMV(docId, readerContext);
+ Object[] rawValue = new Object[dictIds.length];
+ for (int i = 0; i < dictIds.length; i++) {
+ rawValue[i] = dictionary.get(dictIds[i]);
+ }
+ return rawValue;
+ }
}
/**
@@ -499,6 +510,10 @@ abstract class BaseStarTreeV2Test<R, A> {
return version > 1 ? version : null;
}
+ /**
+ * Returns whether the aggregation column is single-value. Defaults to {@code true}; subclasses that aggregate on a
+ * multi-value column override this to return {@code false}. The flag controls how the column is declared in the
+ * schema and whether raw values are read with getDictId or getDictIdMV.
+ */
+ protected boolean isAggColSingleValueField() {
+ return true;
+ }
+
abstract ValueAggregator<R, A> getValueAggregator();
abstract DataType getRawValueType();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/CountMVStarTreeV2Test.java
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/CountMVStarTreeV2Test.java
new file mode 100644
index 00000000000..5c2e23a162d
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/CountMVStarTreeV2Test.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.pinot.segment.local.aggregator.CountMVValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class CountMVStarTreeV2Test extends BaseStarTreeV2Test<Object, Long> {
+
+ @Override
+ ValueAggregator<Object, Long> getValueAggregator() {
+ return new CountMVValueAggregator();
+ }
+
+ @Override
+ DataType getRawValueType() {
+ return DataType.INT;
+ }
+
+ @Override
+ Object getRandomRawValue(Random random) {
+ return random.nextInt();
+ }
+
+ @Override
+ protected void assertAggregatedValue(Long starTreeResult, Long
nonStarTreeResult) {
+ assertEquals(starTreeResult, nonStarTreeResult);
+ }
+
+ @Override
+ protected boolean isAggColSingleValueField() {
+ return false;
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/SumMVStarTreeV2Test.java
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/SumMVStarTreeV2Test.java
new file mode 100644
index 00000000000..009a7a9863b
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/SumMVStarTreeV2Test.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.pinot.segment.local.aggregator.SumMVValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class SumMVStarTreeV2Test extends BaseStarTreeV2Test<Object, Double> {
+
+ @Override
+ ValueAggregator<Object, Double> getValueAggregator() {
+ return new SumMVValueAggregator();
+ }
+
+ @Override
+ DataType getRawValueType() {
+ return DataType.INT;
+ }
+
+ @Override
+ Object getRandomRawValue(Random random) {
+ return random.nextInt();
+ }
+
+ @Override
+ protected void assertAggregatedValue(Double starTreeResult, Double
nonStarTreeResult) {
+ assertEquals(starTreeResult, nonStarTreeResult, 1e-5);
+ }
+
+ @Override
+ protected boolean isAggColSingleValueField() {
+ return false;
+ }
+}
diff --git a/pinot-core/src/test/resources/TableIndexingTest.csv
b/pinot-core/src/test/resources/TableIndexingTest.csv
index 420f2c72fe7..96042989c34 100644
--- a/pinot-core/src/test/resources/TableIndexingTest.csv
+++ b/pinot-core/src/test/resources/TableIndexingTest.csv
@@ -44,7 +44,7 @@ INT;mv;dict;json_index;false;Cannot create JSON index on
multi-value column: col
INT;mv;dict;native_text_index;false;Cannot create TEXT index on column: col of
stored type other than STRING
INT;mv;dict;text_index;false;Cannot create TEXT index on column: col of stored
type other than STRING
INT;mv;dict;range_index;true;
-INT;mv;dict;startree_index;false;Star-tree index can only be created on
single-value columns, but found multi-value column: col
+INT;mv;dict;startree_index;false;Star-tree dimension columns must be
single-value, but found multi-value column: col
INT;mv;dict;vector_index;false;Cannot create vector index on column: col of
stored type other than FLOAT
INT;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column: col
of stored type other than STRING
LONG;sv;raw;timestamp_index;true;
@@ -92,7 +92,7 @@ LONG;mv;dict;json_index;false;Cannot create JSON index on
multi-value column: co
LONG;mv;dict;native_text_index;false;Cannot create TEXT index on column: col
of stored type other than STRING
LONG;mv;dict;text_index;false;Cannot create TEXT index on column: col of
stored type other than STRING
LONG;mv;dict;range_index;true;
-LONG;mv;dict;startree_index;false;Star-tree index can only be created on
single-value columns, but found multi-value column: col
+LONG;mv;dict;startree_index;false;Star-tree dimension columns must be
single-value, but found multi-value column: col
LONG;mv;dict;vector_index;false;Cannot create vector index on column: col of
stored type other than FLOAT
LONG;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column:
col of stored type other than STRING
FLOAT;sv;raw;timestamp_index;false;Cannot create TIMESTAMP index on column:
col of stored type other than LONG
@@ -140,7 +140,7 @@ FLOAT;mv;dict;json_index;false;Cannot create JSON index on
multi-value column: c
FLOAT;mv;dict;native_text_index;false;Cannot create TEXT index on column: col
of stored type other than STRING
FLOAT;mv;dict;text_index;false;Cannot create TEXT index on column: col of
stored type other than STRING
FLOAT;mv;dict;range_index;true;
-FLOAT;mv;dict;startree_index;false;Star-tree index can only be created on
single-value columns, but found multi-value column: col
+FLOAT;mv;dict;startree_index;false;Star-tree dimension columns must be
single-value, but found multi-value column: col
FLOAT;mv;dict;vector_index;true;
FLOAT;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column:
col of stored type other than STRING
DOUBLE;sv;raw;timestamp_index;false;Cannot create TIMESTAMP index on column:
col of stored type other than LONG
@@ -188,7 +188,7 @@ DOUBLE;mv;dict;json_index;false;Cannot create JSON index on
multi-value column:
DOUBLE;mv;dict;native_text_index;false;Cannot create TEXT index on column: col
of stored type other than STRING
DOUBLE;mv;dict;text_index;false;Cannot create TEXT index on column: col of
stored type other than STRING
DOUBLE;mv;dict;range_index;true;
-DOUBLE;mv;dict;startree_index;false;Star-tree index can only be created on
single-value columns, but found multi-value column: col
+DOUBLE;mv;dict;startree_index;false;Star-tree dimension columns must be
single-value, but found multi-value column: col
DOUBLE;mv;dict;vector_index;false;Cannot create vector index on column: col of
stored type other than FLOAT
DOUBLE;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column:
col of stored type other than STRING
DECIMAL;sv_BIG;raw;timestamp_index;false;Cannot create TIMESTAMP index on
column: col of stored type other than LONG
@@ -260,7 +260,7 @@ BOOLEAN;mv;dict;json_index;false;Cannot create JSON index
on multi-value column:
BOOLEAN;mv;dict;native_text_index;false;Cannot create TEXT index on column:
col of stored type other than STRING
BOOLEAN;mv;dict;text_index;false;Cannot create TEXT index on column: col of
stored type other than STRING
BOOLEAN;mv;dict;range_index;true;
-BOOLEAN;mv;dict;startree_index;false;Star-tree index can only be created on
single-value columns, but found multi-value column: col
+BOOLEAN;mv;dict;startree_index;false;Star-tree dimension columns must be
single-value, but found multi-value column: col
BOOLEAN;mv;dict;vector_index;false;Cannot create vector index on column: col
of stored type other than FLOAT
BOOLEAN;mv;dict;multi_col_text_index;false;Cannot create TEXT index on column:
col of stored type other than STRING
TIMESTAMP;sv;raw;timestamp_index;true;
@@ -308,7 +308,7 @@ TIMESTAMP;mv;dict;json_index;false;Cannot create JSON index
on multi-value colum
TIMESTAMP;mv;dict;native_text_index;false;Cannot create TEXT index on column:
col of stored type other than STRING
TIMESTAMP;mv;dict;text_index;false;Cannot create TEXT index on column: col of
stored type other than STRING
TIMESTAMP;mv;dict;range_index;true;
-TIMESTAMP;mv;dict;startree_index;false;Star-tree index can only be created on
single-value columns, but found multi-value column: col
+TIMESTAMP;mv;dict;startree_index;false;Star-tree dimension columns must be
single-value, but found multi-value column: col
TIMESTAMP;mv;dict;vector_index;false;Cannot create vector index on column: col
of stored type other than FLOAT
TIMESTAMP;mv;dict;multi_col_text_index;false;Cannot create TEXT index on
column: col of stored type other than STRING
STRING;sv;raw;timestamp_index;false;Cannot create TIMESTAMP index on column:
col of stored type other than LONG
@@ -356,7 +356,7 @@ STRING;mv;dict;json_index;false;Cannot create JSON index on
multi-value column:
STRING;mv;dict;native_text_index;true;
STRING;mv;dict;text_index;true;
STRING;mv;dict;range_index;true;
-STRING;mv;dict;startree_index;false;Star-tree index can only be created on
single-value columns, but found multi-value column: col
+STRING;mv;dict;startree_index;false;Star-tree dimension columns must be
single-value, but found multi-value column: col
STRING;mv;dict;vector_index;false;Cannot create vector index on column: col of
stored type other than FLOAT
STRING;mv;dict;multi_col_text_index;true;
JSON;sv;raw;timestamp_index;false;Cannot create TIMESTAMP index on column: col
of stored type other than LONG
@@ -428,7 +428,7 @@ BYTES;mv;dict;json_index;false;Cannot create JSON index on
multi-value column: c
BYTES;mv;dict;native_text_index;false;Cannot create TEXT index on column: col
of stored type other than STRING
BYTES;mv;dict;text_index;false;Cannot create TEXT index on column: col of
stored type other than STRING
BYTES;mv;dict;range_index;false;Caught exception while reading data
-BYTES;mv;dict;startree_index;false;Star-tree index can only be created on
single-value columns, but found multi-value column: col
+BYTES;mv;dict;startree_index;false;Star-tree dimension columns must be
single-value, but found multi-value column: col
BYTES;mv;dict;vector_index;false;Cannot create vector index on column: col of
stored type other than FLOAT
BYTES;mv;dict;multi_col_text_index;false;Caught exception while reading data
STRING;map;raw;timestamp_index;false;Cannot create TIMESTAMP index on column:
col of stored type other than LONG
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
index 276ffe53b38..6480f933070 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
@@ -63,7 +63,7 @@ import static org.testng.Assert.assertTrue;
public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest
{
public static final String FILTER_STARTREE_INDEX = "FILTER_STARTREE_INDEX";
private static final String SCHEMA_FILE_NAME =
-
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema";
+
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema";
private static final int NUM_STAR_TREE_DIMENSIONS = 5;
private static final int NUM_STAR_TREE_METRICS = 5;
private static final List<AggregationFunctionType>
AGGREGATION_FUNCTION_TYPES =
@@ -112,7 +112,9 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
int starTree1MaxLeafRecords = 10;
// Randomly pick some dimensions and metrics for the second star-tree
+ // Exclude TotalAddGTime: it is a multi-value column, and star-tree dimensions (split order) must be single-value.
List<String> allDimensions = new ArrayList<>(schema.getDimensionNames());
+ allDimensions.remove("TotalAddGTime");
Collections.shuffle(allDimensions, _random);
List<String> starTree2Dimensions = allDimensions.subList(0,
NUM_STAR_TREE_DIMENSIONS);
List<String> allMetrics = new ArrayList<>(schema.getMetricNames());
@@ -120,10 +122,16 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
List<String> starTree2Metrics = allMetrics.subList(0,
NUM_STAR_TREE_METRICS);
int starTree2MaxLeafRecords = 100;
+ // Tests StarTree aggregate for multi-value column
+ List<String> starTree3Dimensions =
+ Arrays.asList("OriginCityName", "DepTimeBlk", "LongestAddGTime",
"CRSDepTime", "DivArrDelay");
+ int starTree3MaxLeafRecords = 10;
+
TableConfig tableConfig = createOfflineTableConfig();
tableConfig.getIndexingConfig().setStarTreeIndexConfigs(
Arrays.asList(getStarTreeIndexConfig(starTree1Dimensions,
starTree1Metrics, starTree1MaxLeafRecords),
- getStarTreeIndexConfig(starTree2Dimensions, starTree2Metrics,
starTree2MaxLeafRecords)));
+ getStarTreeIndexConfig(starTree2Dimensions, starTree2Metrics,
starTree2MaxLeafRecords),
+ getStarTreeIndexConfigForMVColAgg(starTree3Dimensions,
starTree3MaxLeafRecords)));
addTableConfig(tableConfig);
// Unpack the Avro files
@@ -166,6 +174,14 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
return new StarTreeIndexConfig(dimensions, null, null, aggregationConfigs,
maxLeafRecords);
}
+ private static StarTreeIndexConfig
getStarTreeIndexConfigForMVColAgg(List<String> dimensions, int maxLeafRecords) {
+ List<StarTreeAggregationConfig> aggregationConfigs = new ArrayList<>();
+ aggregationConfigs.add(new StarTreeAggregationConfig("TotalAddGTime",
"COUNTMV"));
+ aggregationConfigs.add(new StarTreeAggregationConfig("TotalAddGTime",
"SUMMV"));
+ aggregationConfigs.add(new StarTreeAggregationConfig("TotalAddGTime",
"AVGMV"));
+ return new StarTreeIndexConfig(dimensions, null, null, aggregationConfigs,
maxLeafRecords);
+ }
+
@Test(dataProvider = "useBothQueryEngines")
public void testGeneratedQueries(boolean useMultiStageQueryEngine)
throws Exception {
@@ -209,6 +225,32 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
testStarQuery(starQuery, !useMultiStageQueryEngine);
}
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testMultiValueColumnAggregations(boolean
useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+ String starQuery = "SELECT COUNTMV(TotalAddGTime), SUMMV(TotalAddGTime),
AVGMV(TotalAddGTime) FROM mytable";
+ testStarQuery(starQuery, !useMultiStageQueryEngine);
+
+ starQuery = "SELECT OriginCityName, COUNTMV(TotalAddGTime),
AVGMV(TotalAddGTime), SUMMV(TotalAddGTime) "
+ + "FROM mytable GROUP BY OriginCityName ORDER BY OriginCityName";
+ testStarQuery(starQuery, !useMultiStageQueryEngine);
+
+ starQuery = "SELECT DepTimeBlk, SUMMV(TotalAddGTime), AVGMV(TotalAddGTime)
FROM mytable "
+ + "WHERE CRSDepTime > 1000 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
+ testStarQuery(starQuery, !useMultiStageQueryEngine);
+
+ starQuery = "SELECT OriginCityName, DepTimeBlk, SUMMV(TotalAddGTime) FROM
mytable "
+ + "GROUP BY OriginCityName, DepTimeBlk ORDER BY OriginCityName,
DepTimeBlk LIMIT 100";
+ testStarQuery(starQuery, !useMultiStageQueryEngine);
+
+ starQuery = "SELECT CRSDepTime, AVGMV(TotalAddGTime) FROM mytable "
+ + "WHERE CRSDepTime BETWEEN 800 AND 1200 AND DivArrDelay < 100 "
+ + "GROUP BY CRSDepTime ORDER BY CRSDepTime";
+ testStarQuery(starQuery, !useMultiStageQueryEngine);
+ }
+
private void testStarQuery(String starQuery, boolean verifyPlan)
throws Exception {
String explain = "EXPLAIN PLAN FOR ";
diff --git
a/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema
b/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema
similarity index 98%
rename from
pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema
rename to
pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema
index b0b2e7c450b..dfdfa576328 100644
---
a/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema
+++
b/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema
@@ -211,7 +211,8 @@
},
{
"name": "TotalAddGTime",
- "dataType": "INT"
+ "dataType": "INT",
+ "singleValueField": false
}
],
"metricFieldSpecs": [
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java
new file mode 100644
index 00000000000..1ad4dbfaddd
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+/**
+ * Value aggregator for AVGMV aggregation function.
+ * This aggregator handles multi-value columns by computing the average across
all values in all arrays.
+ */
+public class AvgMVValueAggregator extends AvgValueAggregator {
+
+ @Override
+ public AggregationFunctionType getAggregationType() {
+ return AggregationFunctionType.AVGMV;
+ }
+
+ @Override
+ public AvgPair getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return new AvgPair();
+ }
+ if (rawValue instanceof byte[]) {
+ return deserializeAggregatedValue((byte[]) rawValue);
+ } else {
+ return processMultiValueArray(rawValue);
+ }
+ }
+
+ @Override
+ public AvgPair applyRawValue(AvgPair value, Object rawValue) {
+ if (rawValue instanceof byte[]) {
+ value.apply(deserializeAggregatedValue((byte[]) rawValue));
+ } else {
+ AvgPair mvResult = processMultiValueArray(rawValue);
+ value.apply(mvResult);
+ }
+ return value;
+ }
+
+ /**
+ * Processes a multi-value array and returns an AvgPair with the sum and
count.
+ * The rawValue can be an Object[] array containing numeric values.
+ */
+ private AvgPair processMultiValueArray(Object rawValue) {
+ if (rawValue instanceof Object[]) {
+ Object[] values = (Object[]) rawValue;
+ AvgPair avgPair = new AvgPair();
+ for (Object value : values) {
+ if (value != null) {
+ avgPair.apply(ValueAggregatorUtils.toDouble(value));
+ }
+ }
+ return avgPair;
+ } else {
+ return new AvgPair(ValueAggregatorUtils.toDouble(rawValue), 1L);
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountMVValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountMVValueAggregator.java
new file mode 100644
index 00000000000..7b26515fb18
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountMVValueAggregator.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+/**
+ * Value aggregator for COUNTMV aggregation function.
+ * This aggregator handles multi-value columns by counting all values in all
arrays.
+ */
+public class CountMVValueAggregator extends CountValueAggregator {
+
+ @Override
+ public AggregationFunctionType getAggregationType() {
+ return AggregationFunctionType.COUNTMV;
+ }
+
+ @Override
+ public Long getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return 0L;
+ }
+ return processMultiValueArray(rawValue);
+ }
+
+ @Override
+ public Long applyRawValue(Long value, Object rawValue) {
+ if (rawValue == null) {
+ return value;
+ }
+ return value + processMultiValueArray(rawValue);
+ }
+
+ /**
+ * Processes a multi-value array and returns the count of non-null values.
+ * The rawValue can be an Object[] array containing values.
+ */
+ private Long processMultiValueArray(Object rawValue) {
+ if (rawValue instanceof Object[]) {
+ Object[] values = (Object[]) rawValue;
+ long count = 0;
+ for (Object value : values) {
+ if (value != null) {
+ count++;
+ }
+ }
+ return count;
+ } else {
+ return 1L;
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumMVValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumMVValueAggregator.java
new file mode 100644
index 00000000000..19d3a208c2a
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumMVValueAggregator.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+/**
+ * Value aggregator for SUMMV aggregation function.
+ * This aggregator handles multi-value columns by summing all values in all
arrays.
+ */
+public class SumMVValueAggregator extends SumValueAggregator {
+
+ @Override
+ public AggregationFunctionType getAggregationType() {
+ return AggregationFunctionType.SUMMV;
+ }
+
+ @Override
+ public Double getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return 0.0;
+ }
+ return processMultiValueArray(rawValue);
+ }
+
+ @Override
+ public Double applyRawValue(Double value, Object rawValue) {
+ if (rawValue == null) {
+ return value;
+ }
+ return value + processMultiValueArray(rawValue);
+ }
+
+ /**
+ * Processes a multi-value array and returns the sum of all values.
+ * The rawValue can be an Object[] array containing numeric values.
+ */
+ private Double processMultiValueArray(Object rawValue) {
+ if (rawValue instanceof Object[]) {
+ Object[] values = (Object[]) rawValue;
+ double sum = 0.0;
+ for (Object value : values) {
+ if (value != null) {
+ sum += ValueAggregatorUtils.toDouble(value);
+ }
+ }
+ return sum;
+ } else {
+ return ValueAggregatorUtils.toDouble(rawValue);
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index 4ada0134954..6e0d9fac708 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -44,16 +44,22 @@ public class ValueAggregatorFactory {
switch (aggregationType) {
case COUNT:
return new CountValueAggregator();
+ case COUNTMV:
+ return new CountMVValueAggregator();
case MIN:
return new MinValueAggregator();
case MAX:
return new MaxValueAggregator();
case SUM:
return new SumValueAggregator();
+ case SUMMV:
+ return new SumMVValueAggregator();
case SUMPRECISION:
return new SumPrecisionValueAggregator(arguments);
case AVG:
return new AvgValueAggregator();
+ case AVGMV:
+ return new AvgMVValueAggregator();
case MINMAXRANGE:
return new MinMaxRangeValueAggregator();
case DISTINCTCOUNTBITMAP:
@@ -99,16 +105,22 @@ public class ValueAggregatorFactory {
switch (aggregationType) {
case COUNT:
return CountValueAggregator.AGGREGATED_VALUE_TYPE;
+ case COUNTMV:
+ return CountMVValueAggregator.AGGREGATED_VALUE_TYPE;
case MIN:
return MinValueAggregator.AGGREGATED_VALUE_TYPE;
case MAX:
return MaxValueAggregator.AGGREGATED_VALUE_TYPE;
case SUM:
return SumValueAggregator.AGGREGATED_VALUE_TYPE;
+ case SUMMV:
+ return SumMVValueAggregator.AGGREGATED_VALUE_TYPE;
case SUMPRECISION:
return SumPrecisionValueAggregator.AGGREGATED_VALUE_TYPE;
case AVG:
return AvgValueAggregator.AGGREGATED_VALUE_TYPE;
+ case AVGMV:
+ return AvgMVValueAggregator.AGGREGATED_VALUE_TYPE;
case MINMAXRANGE:
return MinMaxRangeValueAggregator.AGGREGATED_VALUE_TYPE;
case DISTINCTCOUNTBITMAP:
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index fea18b4c9b4..1656009ddc2 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1281,7 +1282,7 @@ public final class TableConfigUtils {
/// - All referenced columns exist in the schema and are single-valued
private static void validateStarTreeIndexConfigs(List<StarTreeIndexConfig>
starTreeIndexConfigs,
Map<String, FieldIndexConfigs> indexConfigsMap, Schema schema) {
- Set<String> referencedColumns = new HashSet<>();
+ Set<String> dimensionColumns = new HashSet<>();
for (StarTreeIndexConfig starTreeIndexConfig : starTreeIndexConfigs) {
// Validate dimension columns are dictionary encoded
List<String> dimensionsSplitOrder =
starTreeIndexConfig.getDimensionsSplitOrder();
@@ -1292,7 +1293,7 @@ public final class TableConfigUtils {
"Failed to find dimension column: %s specified in star-tree index
config in schema", dimension);
Preconditions.checkState(indexConfigs.getConfig(StandardIndexes.dictionary()).isEnabled(),
"Cannot create star-tree index on dimension column: %s without
dictionary", dimension);
- referencedColumns.add(dimension);
+ dimensionColumns.add(dimension);
}
// Validate 'dimensionsSplitOrder' contains all dimensions in
'skipStarNodeCreationForDimensions'
@@ -1312,6 +1313,7 @@ public final class TableConfigUtils {
"Either 'functionColumnPairs' or 'aggregationConfigs' must be
specified, but not both");
Set<AggregationFunctionColumnPair> functionColumnPairsSet = new
HashSet<>();
Set<AggregationFunctionColumnPair> storedTypes = new HashSet<>();
+ Set<String> aggregatedColumns = new HashSet<>();
if (functionColumnPairs != null) {
for (String functionColumnPair : functionColumnPairs) {
AggregationFunctionColumnPair columnPair;
@@ -1333,7 +1335,7 @@ public final class TableConfigUtils {
}
String column = columnPair.getColumn();
if (!column.equals(AggregationFunctionColumnPair.STAR)) {
- referencedColumns.add(column);
+ aggregatedColumns.add(column);
} else if (columnPair.getFunctionType() !=
AggregationFunctionType.COUNT) {
throw new IllegalStateException("Non-COUNT function set the column
as '*' in the functionColumnPair: "
+ functionColumnPair + ". Please configure an actual column
for the function");
@@ -1362,7 +1364,7 @@ public final class TableConfigUtils {
}
String column = columnPair.getColumn();
if (!column.equals(AggregationFunctionColumnPair.STAR)) {
- referencedColumns.add(column);
+ aggregatedColumns.add(column);
} else if (columnPair.getFunctionType() !=
AggregationFunctionType.COUNT) {
throw new IllegalStateException("Non-COUNT function set the column
as '*' in the aggregationConfig for "
+ "function: " + aggregationConfig.getAggregationFunction()
@@ -1371,16 +1373,19 @@ public final class TableConfigUtils {
}
}
- // Validate all referenced columns exist in the schema and are
single-valued
- for (String column : referencedColumns) {
+ for (String column : Iterables.concat(dimensionColumns,
aggregatedColumns)) {
FieldSpec fieldSpec = schema.getFieldSpecFor(column);
Preconditions.checkState(fieldSpec != null,
"Failed to find column: %s specified in star-tree index config in
schema", column);
- Preconditions.checkState(fieldSpec.isSingleValueField(),
- "Star-tree index can only be created on single-value columns, but
found multi-value column: %s", column);
Preconditions.checkState(fieldSpec.getDataType() != DataType.MAP,
"Star-tree index cannot be created on MAP column: %s", column);
}
+
+ for (String column : dimensionColumns) {
+ FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+ Preconditions.checkState(fieldSpec.isSingleValueField(),
+ "Star-tree dimension columns must be single-value, but found
multi-value column: %s", column);
+ }
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
index d983ce259c7..f98fac81a8c 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
@@ -41,12 +41,15 @@ public class ValueAggregatorTest {
public static Object[][] fixedSizeAggregatedValue() {
return new Object[][]{
{AggregationFunctionType.COUNT, List.of(), true},
+ {AggregationFunctionType.COUNTMV, List.of(), true},
{AggregationFunctionType.MIN, List.of(), true},
{AggregationFunctionType.MAX, List.of(), true},
{AggregationFunctionType.SUM, List.of(), true},
+ {AggregationFunctionType.SUMMV, List.of(), true},
{AggregationFunctionType.SUMPRECISION, List.of(), false},
{AggregationFunctionType.SUMPRECISION,
List.of(ExpressionContext.forLiteral(Literal.intValue(20))), true},
{AggregationFunctionType.AVG, List.of(), true},
+ {AggregationFunctionType.AVGMV, List.of(), true},
{AggregationFunctionType.MINMAXRANGE, List.of(), true},
{AggregationFunctionType.DISTINCTCOUNTBITMAP, List.of(), false},
{AggregationFunctionType.DISTINCTCOUNTHLL, List.of(), true},
diff --git
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
index 856719e0582..5558a95609a 100644
---
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
+++
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
@@ -66,6 +66,17 @@
"MAX__ArrDelay"
],
"maxLeafRecords": 10
+ },
+ {
+ "dimensionsSplitOrder": [
+ "AirlineID"
+ ],
+ "functionColumnPairs": [
+ "AVGMV__DivLongestGTimes",
+ "COUNTMV__DivLongestGTimes",
+ "SUMMV__DivLongestGTimes"
+ ],
+ "maxLeafRecords": 10
}
],
"enableDynamicStarTreeCreation": true,
diff --git
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
index 563e5caa196..703e3e27160 100644
---
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
+++
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
@@ -188,7 +188,7 @@
"singleValueField": false
},
{
- "dataType": "INT",
+ "dataType": "LONG",
"name": "DivTotalGTimes",
"singleValueField": false
},
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]