yashmayya commented on code in PR #16836:
URL: https://github.com/apache/pinot/pull/16836#discussion_r2453079784
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java:
##########
@@ -76,8 +106,8 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- double[][] valuesArray = blockValSet.getDoubleValuesMV();
+ double[][] valuesArray = blockValSet.getDoubleValuesMV();
Review Comment:
nit: unnecessary whitespace change (same with other MV aggregation classes)
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Value aggregator for AVGMV aggregation function.
+ * This aggregator handles multi-value columns by computing the average across all values in all arrays.
+ */
+public class AvgMVValueAggregator implements ValueAggregator<Object, AvgPair> {
Review Comment:
Consider extending `AvgValueAggregator` and overriding only the methods that
differ, since the two classes seem to share a lot of code.
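A minimal sketch of the shape this could take, using simplified stand-in classes rather than the real Pinot types (`SketchAvgPair`, `SketchAvgAggregator`, and `SketchAvgMVAggregator` are hypothetical names, and the string aggregation types are placeholders). It assumes the base class routes its initial-value logic through `applyRawValue`, so the MV subclass only needs to override the aggregation type and the raw-value handling:

```java
// Hypothetical, simplified stand-ins; these are not the real Pinot classes.
final class SketchAvgPair {
  double sum;
  long count;

  void apply(double otherSum, long otherCount) {
    sum += otherSum;
    count += otherCount;
  }
}

// Stand-in for a single-value AVG aggregator that holds the shared logic.
class SketchAvgAggregator {
  String getAggregationType() {
    return "AVG";
  }

  SketchAvgPair getInitialAggregatedValue(Object rawValue) {
    SketchAvgPair pair = new SketchAvgPair();
    if (rawValue != null) {
      // Dispatches to the subclass override when one exists.
      applyRawValue(pair, rawValue);
    }
    return pair;
  }

  SketchAvgPair applyRawValue(SketchAvgPair value, Object rawValue) {
    value.apply(((Number) rawValue).doubleValue(), 1L);
    return value;
  }

  SketchAvgPair applyAggregatedValue(SketchAvgPair value, SketchAvgPair aggregatedValue) {
    value.apply(aggregatedValue.sum, aggregatedValue.count);
    return value;
  }

  int getMaxAggregatedValueByteSize() {
    return Double.BYTES + Long.BYTES;
  }
}

// The MV variant inherits merging and sizing; only raw-value handling differs.
class SketchAvgMVAggregator extends SketchAvgAggregator {
  @Override
  String getAggregationType() {
    return "AVGMV";
  }

  @Override
  SketchAvgPair applyRawValue(SketchAvgPair value, Object rawValue) {
    for (Object element : (Object[]) rawValue) {
      if (element != null) {
        value.apply(((Number) element).doubleValue(), 1L);
      }
    }
    return value;
  }
}
```

Whether this works as-is depends on how the real `AvgValueAggregator` handles serialized `byte[]` input, which the sketch leaves out.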
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountMVValueAggregator.java:
##########
@@ -0,0 +1,107 @@
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Value aggregator for COUNTMV aggregation function.
+ * This aggregator handles multi-value columns by counting all values in all arrays.
+ */
+public class CountMVValueAggregator implements ValueAggregator<Object, Long> {
Review Comment:
Same comments and questions as for `AvgMVValueAggregator`.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java:
##########
@@ -0,0 +1,119 @@
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Value aggregator for AVGMV aggregation function.
+ * This aggregator handles multi-value columns by computing the average across all values in all arrays.
+ */
+public class AvgMVValueAggregator implements ValueAggregator<Object, AvgPair> {
+ public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+ @Override
+ public AggregationFunctionType getAggregationType() {
+ return AggregationFunctionType.AVGMV;
+ }
+
+ @Override
+ public DataType getAggregatedValueType() {
+ return AGGREGATED_VALUE_TYPE;
+ }
+
+ @Override
+ public AvgPair getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return new AvgPair();
+ }
+ if (rawValue instanceof byte[]) {
+ return deserializeAggregatedValue((byte[]) rawValue);
+ } else {
+ return processMultiValueArray(rawValue);
+ }
+ }
+
+ @Override
+ public AvgPair applyRawValue(AvgPair value, Object rawValue) {
+ if (rawValue instanceof byte[]) {
+ value.apply(deserializeAggregatedValue((byte[]) rawValue));
+ } else {
+ AvgPair mvResult = processMultiValueArray(rawValue);
+ value.apply(mvResult);
+ }
+ return value;
+ }
+
+ @Override
+ public AvgPair applyAggregatedValue(AvgPair value, AvgPair aggregatedValue) {
+ value.apply(aggregatedValue);
+ return value;
+ }
+
+ @Override
+ public AvgPair cloneAggregatedValue(AvgPair value) {
+ return new AvgPair(value.getSum(), value.getCount());
+ }
+
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
Review Comment:
`isAggregatedValueFixedSize()` is used to determine whether a value aggregator
can be used for ingestion aggregation. Have you verified that these MV value
aggregators also work for ingestion aggregation, or only for star-tree indexes?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java:
##########
@@ -0,0 +1,119 @@
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Value aggregator for AVGMV aggregation function.
+ * This aggregator handles multi-value columns by computing the average across all values in all arrays.
+ */
+public class AvgMVValueAggregator implements ValueAggregator<Object, AvgPair> {
+ public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+ @Override
+ public AggregationFunctionType getAggregationType() {
+ return AggregationFunctionType.AVGMV;
+ }
+
+ @Override
+ public DataType getAggregatedValueType() {
+ return AGGREGATED_VALUE_TYPE;
+ }
+
+ @Override
+ public AvgPair getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return new AvgPair();
+ }
+ if (rawValue instanceof byte[]) {
+ return deserializeAggregatedValue((byte[]) rawValue);
+ } else {
+ return processMultiValueArray(rawValue);
+ }
+ }
+
+ @Override
+ public AvgPair applyRawValue(AvgPair value, Object rawValue) {
+ if (rawValue instanceof byte[]) {
+ value.apply(deserializeAggregatedValue((byte[]) rawValue));
+ } else {
+ AvgPair mvResult = processMultiValueArray(rawValue);
+ value.apply(mvResult);
+ }
+ return value;
+ }
+
+ @Override
+ public AvgPair applyAggregatedValue(AvgPair value, AvgPair aggregatedValue) {
+ value.apply(aggregatedValue);
+ return value;
+ }
+
+ @Override
+ public AvgPair cloneAggregatedValue(AvgPair value) {
+ return new AvgPair(value.getSum(), value.getCount());
+ }
+
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
+ @Override
+ public int getMaxAggregatedValueByteSize() {
+ return Double.BYTES + Long.BYTES;
+ }
+
+ @Override
+ public byte[] serializeAggregatedValue(AvgPair value) {
+ return CustomSerDeUtils.AVG_PAIR_SER_DE.serialize(value);
+ }
+
+ @Override
+ public AvgPair deserializeAggregatedValue(byte[] bytes) {
+ return CustomSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes);
+ }
+
+ /**
+ * Processes a multi-value array and returns an AvgPair with the sum and count.
+ * The rawValue can be an Object[] array containing numeric values.
+ */
+ private AvgPair processMultiValueArray(Object rawValue) {
+ if (rawValue instanceof Object[]) {
Review Comment:
Can't the raw value also be a primitive array?
cc - @Jackie-Jiang
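If primitive arrays can reach this method, one way to cover them is sketched below. This is a standalone illustration, not Pinot code: `MultiValueSumCountSketch` and its nested `SumCount` holder are hypothetical names, and whether the real ingestion or star-tree paths ever pass `int[]`, `long[]`, `float[]`, or `double[]` is exactly the open question here.

```java
// Hypothetical helper showing sum/count over boxed and primitive arrays.
final class MultiValueSumCountSketch {
  static final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  static SumCount sumAndCount(Object rawValue) {
    double sum = 0.0;
    long count = 0L;
    if (rawValue == null) {
      return new SumCount(sum, count);
    }
    if (rawValue instanceof int[]) {
      for (int v : (int[]) rawValue) {
        sum += v;
        count++;
      }
    } else if (rawValue instanceof long[]) {
      for (long v : (long[]) rawValue) {
        sum += v;
        count++;
      }
    } else if (rawValue instanceof float[]) {
      for (float v : (float[]) rawValue) {
        sum += v;
        count++;
      }
    } else if (rawValue instanceof double[]) {
      for (double v : (double[]) rawValue) {
        sum += v;
        count++;
      }
    } else if (rawValue instanceof Object[]) {
      // Boxed elements may be null; skip them as the PR code does.
      for (Object v : (Object[]) rawValue) {
        if (v != null) {
          sum += ((Number) v).doubleValue();
          count++;
        }
      }
    } else {
      // Fail loudly instead of silently treating unknown input as empty.
      throw new IllegalArgumentException("Unsupported raw value type: " + rawValue.getClass());
    }
    return new SumCount(sum, count);
  }
}
```

Throwing on an unrecognized type is a design choice in this sketch; it surfaces unexpected inputs early rather than skewing the average.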
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgMVValueAggregator.java:
##########
@@ -0,0 +1,119 @@
+package org.apache.pinot.segment.local.aggregator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Value aggregator for AVGMV aggregation function.
+ * This aggregator handles multi-value columns by computing the average across all values in all arrays.
+ */
+public class AvgMVValueAggregator implements ValueAggregator<Object, AvgPair> {
+ public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+ @Override
+ public AggregationFunctionType getAggregationType() {
+ return AggregationFunctionType.AVGMV;
+ }
+
+ @Override
+ public DataType getAggregatedValueType() {
+ return AGGREGATED_VALUE_TYPE;
+ }
+
+ @Override
+ public AvgPair getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return new AvgPair();
+ }
+ if (rawValue instanceof byte[]) {
+ return deserializeAggregatedValue((byte[]) rawValue);
+ } else {
+ return processMultiValueArray(rawValue);
+ }
+ }
+
+ @Override
+ public AvgPair applyRawValue(AvgPair value, Object rawValue) {
+ if (rawValue instanceof byte[]) {
+ value.apply(deserializeAggregatedValue((byte[]) rawValue));
+ } else {
+ AvgPair mvResult = processMultiValueArray(rawValue);
+ value.apply(mvResult);
+ }
+ return value;
+ }
+
+ @Override
+ public AvgPair applyAggregatedValue(AvgPair value, AvgPair aggregatedValue) {
+ value.apply(aggregatedValue);
+ return value;
+ }
+
+ @Override
+ public AvgPair cloneAggregatedValue(AvgPair value) {
+ return new AvgPair(value.getSum(), value.getCount());
+ }
+
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
+ @Override
+ public int getMaxAggregatedValueByteSize() {
+ return Double.BYTES + Long.BYTES;
+ }
+
+ @Override
+ public byte[] serializeAggregatedValue(AvgPair value) {
+ return CustomSerDeUtils.AVG_PAIR_SER_DE.serialize(value);
+ }
+
+ @Override
+ public AvgPair deserializeAggregatedValue(byte[] bytes) {
+ return CustomSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes);
+ }
+
+ /**
+ * Processes a multi-value array and returns an AvgPair with the sum and count.
+ * The rawValue can be an Object[] array containing numeric values.
+ */
+ private AvgPair processMultiValueArray(Object rawValue) {
+ if (rawValue instanceof Object[]) {
+ Object[] values = (Object[]) rawValue;
+ double sum = 0.0;
+ long count = 0;
+ for (Object value : values) {
+ if (value != null) {
+ sum += ValueAggregatorUtils.toDouble(value);
+ count++;
+ }
+ }
+ return new AvgPair(sum, count);
Review Comment:
```suggestion
AvgPair avgPair = new AvgPair();
for (Object value : values) {
  if (value != null) {
    avgPair.apply(ValueAggregatorUtils.toDouble(value));
  }
}
return avgPair;
```
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java:
##########
@@ -43,8 +44,25 @@ public AggregationFunctionType getType() {
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- double[][] valuesArray = blockValSet.getDoubleValuesMV();
+ if (blockValSet.isSingleValue()) {
+ // StarTree pre-aggregated values: During StarTree creation, the multi-value column is pre-aggregated per StarTree
Review Comment:
We usually use `star-tree` for the index name
##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java:
##########
@@ -43,8 +44,25 @@ public AggregationFunctionType getType() {
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- double[][] valuesArray = blockValSet.getDoubleValuesMV();
+ if (blockValSet.isSingleValue()) {
+ // StarTree pre-aggregated values: During StarTree creation, the multi-value column is pre-aggregated per StarTree
+ // node, resulting in a single value per node.
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ AvgPair avgPair = new AvgPair();
+ forEachNotNull(length, blockValSet, (from, to) -> {
Review Comment:
We don't really need `forEachNotNull` here, since the star-tree index's
pre-aggregated values can't be null, right?
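If that assumption holds, the merge could be a plain loop over the serialized values. The sketch below is standalone and illustrative only: `PreAggregatedAvgSketch`, `encode`, and `merge` are hypothetical names, and the byte layout (an 8-byte double sum followed by an 8-byte long count) is an assumption for the example rather than a statement about how Pinot serializes `AvgPair`.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: merge serialized (sum, count) pairs with no null checks.
final class PreAggregatedAvgSketch {
  static final class Merged {
    final double sum;
    final long count;

    Merged(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  // Assumed layout: 8-byte double sum, then 8-byte long count.
  static byte[] encode(double sum, long count) {
    return ByteBuffer.allocate(Double.BYTES + Long.BYTES).putDouble(sum).putLong(count).array();
  }

  // Merges the first `length` entries, assuming none of them is null.
  static Merged merge(byte[][] serializedPairs, int length) {
    double sum = 0.0;
    long count = 0L;
    for (int i = 0; i < length; i++) {
      ByteBuffer buffer = ByteBuffer.wrap(serializedPairs[i]);
      sum += buffer.getDouble();
      count += buffer.getLong();
    }
    return new Merged(sum, count);
  }
}
```

For example, merging `encode(6.0, 3L)` and `encode(4.0, 1L)` yields a sum of 10.0 over 4 values.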