This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
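For context on the clearspring HyperLogLogPlus API this commit builds on — the (p, sp) constructor, offer(), addAll(), cardinality(), and the getBytes()/Builder.build() round trip — here is a minimal, self-contained sketch; the p/sp values are illustrative only, not necessarily Pinot's defaults:

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;

public class HllPlusApiSketch {
  public static void main(String[] args) throws Exception {
    // p sets the dense-representation precision, sp the sparse-representation precision
    HyperLogLogPlus hllPlus = new HyperLogLogPlus(14, 25);
    for (int i = 0; i < 100_000; i++) {
      hllPlus.offer("member-" + i); // offer() accepts arbitrary objects
    }
    System.out.println("estimate: " + hllPlus.cardinality()); // close to 100000, within a few percent

    // Serialize/deserialize round trip, as ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE does below
    byte[] bytes = hllPlus.getBytes();
    HyperLogLogPlus restored = HyperLogLogPlus.Builder.build(bytes);

    // Sketches built with the same p/sp can be merged in place, as the new merge() does
    restored.addAll(hllPlus);
    System.out.println("merged estimate: " + restored.cardinality());
  }
}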
The following commit(s) were added to refs/heads/master by this push: new 3501b869c7 Add hyperLogLogPlus aggregation function for distinct count (#11346) 3501b869c7 is described below commit 3501b869c7ece2ddd088daf34c68a15ad6740a3a Author: deemoliu <qiao...@uber.com> AuthorDate: Thu Sep 21 16:48:17 2023 -0700 Add hyperLogLogPlus aggregation function for distinct count (#11346) * Add hyperLogLogPlus aggregation function for distinct count * address code comments * address code comments --- .../apache/pinot/core/common/ObjectSerDeUtils.java | 36 +- .../query/NonScanBasedAggregationOperator.java | 46 +- .../pinot/core/plan/AggregationPlanNode.java | 3 +- .../function/AggregationFunctionFactory.java | 8 + .../DistinctCountHLLPlusAggregationFunction.java | 471 +++++++++++++++++++++ .../DistinctCountHLLPlusMVAggregationFunction.java | 265 ++++++++++++ ...DistinctCountRawHLLPlusAggregationFunction.java | 115 +++++ ...stinctCountRawHLLPlusMVAggregationFunction.java | 36 ++ .../function/AggregationFunctionFactoryTest.java | 24 ++ ...terSegmentAggregationMultiValueQueriesTest.java | 75 ++++ ...SegmentAggregationMultiValueRawQueriesTest.java | 75 ++++ ...erSegmentAggregationSingleValueQueriesTest.java | 60 +++ .../pinot/queries/SerializedBytesQueriesTest.java | 121 +++++- .../tests/OfflineClusterIntegrationTest.java | 28 +- .../DistinctCountHLLPlusValueAggregator.java | 125 ++++++ .../local/aggregator/ValueAggregatorFactory.java | 6 + .../local/customobject/SerializedHLLPlus.java | 42 ++ .../segment/local/utils/CustomSerDeUtils.java | 29 ++ .../segment/local/utils/HyperLogLogPlusUtils.java | 43 ++ .../pinot/segment/spi/AggregationFunctionType.java | 16 + .../apache/pinot/spi/utils/CommonConstants.java | 2 + 21 files changed, 1613 insertions(+), 13 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index 69c00ea5b8..4f05bad761 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.common; import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.clearspring.analytics.stream.cardinality.RegisterSet; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -139,7 +140,9 @@ public class ObjectSerDeUtils { KllDataSketch(36), IntegerTupleSketch(37), FrequentStringsSketch(38), - FrequentLongsSketch(39); + FrequentLongsSketch(39), + HyperLogLogPlus(40); + private final int _value; @@ -235,6 +238,8 @@ public class ObjectSerDeUtils { return ObjectType.FrequentStringsSketch; } else if (value instanceof LongsSketch) { return ObjectType.FrequentLongsSketch; + } else if (value instanceof HyperLogLogPlus) { + return ObjectType.HyperLogLogPlus; } else { throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName()); } @@ -563,6 +568,34 @@ public class ObjectSerDeUtils { } }; + public static final ObjectSerDe<HyperLogLogPlus> HYPER_LOG_LOG_PLUS_SER_DE = new ObjectSerDe<HyperLogLogPlus>() { + + @Override + public byte[] serialize(HyperLogLogPlus hyperLogLogPlus) { + try { + return hyperLogLogPlus.getBytes(); + } catch (IOException e) { + throw new RuntimeException("Caught exception while serializing HyperLogLogPlus", e); + } + } + + @Override + public 
HyperLogLogPlus deserialize(byte[] bytes) { + try { + return HyperLogLogPlus.Builder.build(bytes); + } catch (IOException e) { + throw new RuntimeException("Caught exception while deserializing HyperLogLogPlus", e); + } + } + + @Override + public HyperLogLogPlus deserialize(ByteBuffer byteBuffer) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return deserialize(bytes); + } + }; + public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new ObjectSerDe<DistinctTable>() { @Override @@ -1377,6 +1410,7 @@ public class ObjectSerDeUtils { DATA_SKETCH_INT_TUPLE_SER_DE, FREQUENT_STRINGS_SKETCH_SER_DE, FREQUENT_LONGS_SKETCH_SER_DE, + HYPER_LOG_LOG_PLUS_SER_DE, }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java index 2060135d38..fb6e8d02f6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.operator.query; import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet; import it.unimi.dsi.fastutil.floats.FloatOpenHashSet; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; @@ -36,7 +37,9 @@ import org.apache.pinot.core.operator.ExecutionStatistics; import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLPlusAggregationFunction; import org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLPlusAggregationFunction; import org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.local.customobject.MinMaxRangePair; @@ -50,8 +53,8 @@ import org.apache.pinot.spi.utils.ByteArray; * Aggregation operator that utilizes dictionary or column metadata for serving aggregation queries to avoid scanning. * The scanless operator is selected in the plan maker, if the query is of aggregation type min, max, minmaxrange, * distinctcount, distinctcounthll, distinctcountrawhll, segmentpartitioneddistinctcount, distinctcountsmarthll, - and the column has a dictionary, or has column metadata with min and max value defined. It also supports count(*) if - the query has no filter. + distinctcounthllplus, distinctcountrawhllplus, and the column has a dictionary, or has column metadata with min and + max value defined. It also supports count(*) if the query has no filter. * We don't use this operator if the segment has star tree, * as the dictionary will have aggregated values for the metrics, and dimensions will have star node value.
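To make the scanless path concrete: for a dictionary-encoded column every distinct value is already in the dictionary, so the sketch can be built without touching row data, which is what the getDistinctValueHLLPlus helper added below does. A minimal sketch of the idea, with ToyDictionary as a hypothetical stand-in for Pinot's Dictionary interface:

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import java.util.Arrays;
import java.util.List;

public class ScanlessHllPlusSketch {
  // Hypothetical stand-in for Pinot's Dictionary: distinct values addressed by dense ids
  static class ToyDictionary {
    private final List<Object> _values;
    ToyDictionary(List<Object> values) { _values = values; }
    int length() { return _values.size(); }
    Object get(int dictId) { return _values.get(dictId); }
  }

  // Same idea as getDistinctValueHLLPlus: offer each distinct value once, no row scan needed
  static HyperLogLogPlus fromDictionary(ToyDictionary dictionary, int p, int sp) {
    HyperLogLogPlus hllPlus = new HyperLogLogPlus(p, sp);
    for (int i = 0; i < dictionary.length(); i++) {
      hllPlus.offer(dictionary.get(i));
    }
    return hllPlus;
  }

  public static void main(String[] args) {
    ToyDictionary dict = new ToyDictionary(Arrays.asList("a", "b", "c"));
    System.out.println(fromDictionary(dict, 14, 25).cardinality()); // ~3
  }
}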
* @@ -118,6 +121,17 @@ public class NonScanBasedAggregationOperator extends BaseOperator<AggregationRes result = getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()), ((DistinctCountRawHLLAggregationFunction) aggregationFunction).getDistinctCountHLLAggregationFunction()); break; + case DISTINCTCOUNTHLLPLUS: + case DISTINCTCOUNTHLLPLUSMV: + result = getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()), + (DistinctCountHLLPlusAggregationFunction) aggregationFunction); + break; + case DISTINCTCOUNTRAWHLLPLUS: + case DISTINCTCOUNTRAWHLLPLUSMV: + result = getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()), + ((DistinctCountRawHLLPlusAggregationFunction) aggregationFunction) + .getDistinctCountHLLPlusAggregationFunction()); + break; case SEGMENTPARTITIONEDDISTINCTCOUNT: result = (long) Objects.requireNonNull(dataSource.getDictionary()).length(); break; @@ -215,6 +229,15 @@ public class NonScanBasedAggregationOperator extends BaseOperator<AggregationRes return hll; } + private static HyperLogLogPlus getDistinctValueHLLPlus(Dictionary dictionary, int p, int sp) { + HyperLogLogPlus hllPlus = new HyperLogLogPlus(p, sp); + int length = dictionary.length(); + for (int i = 0; i < length; i++) { + hllPlus.offer(dictionary.get(i)); + } + return hllPlus; + } + private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary, DistinctCountHLLAggregationFunction function) { if (dictionary.getValueType() == FieldSpec.DataType.BYTES) { @@ -234,6 +257,25 @@ public class NonScanBasedAggregationOperator extends BaseOperator<AggregationRes } } + private static HyperLogLogPlus getDistinctCountHLLPlusResult(Dictionary dictionary, + DistinctCountHLLPlusAggregationFunction function) { + if (dictionary.getValueType() == FieldSpec.DataType.BYTES) { + // Treat BYTES value as serialized HyperLogLogPlus + try { + HyperLogLogPlus hllplus = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(0)); + int length = dictionary.length(); + for (int i = 1; i < length; i++) { + hllplus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(i))); + } + return hllplus; + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging HyperLogLogPluses", e); + } + } else { + return getDistinctValueHLLPlus(dictionary, function.getP(), function.getSp()); + } + } + private static Object getDistinctCountSmartHLLResult(Dictionary dictionary, DistinctCountSmartHLLAggregationFunction function) { if (dictionary.length() > function.getThreshold()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java index 2f53b0d238..95a61c10be 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java @@ -56,7 +56,8 @@ public class AggregationPlanNode implements PlanNode { private static final EnumSet<AggregationFunctionType> DICTIONARY_BASED_FUNCTIONS = EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL, DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, SEGMENTPARTITIONEDDISTINCTCOUNT, - DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, DISTINCTAVGMV); + DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, DISTINCTAVGMV, DISTINCTCOUNTHLLPLUS, + 
DISTINCTCOUNTHLLPLUSMV, DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTRAWHLLPLUSMV); // DISTINCTCOUNT excluded because consuming segment metadata contains unknown cardinality when there is no dictionary private static final EnumSet<AggregationFunctionType> METADATA_BASED_FUNCTIONS = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 2d69c093f2..c426ed472f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -298,6 +298,14 @@ public class AggregationFunctionFactory { return new DistinctCountHLLMVAggregationFunction(arguments); case DISTINCTCOUNTRAWHLLMV: return new DistinctCountRawHLLMVAggregationFunction(arguments); + case DISTINCTCOUNTHLLPLUS: + return new DistinctCountHLLPlusAggregationFunction(arguments); + case DISTINCTCOUNTRAWHLLPLUS: + return new DistinctCountRawHLLPlusAggregationFunction(arguments); + case DISTINCTCOUNTHLLPLUSMV: + return new DistinctCountHLLPlusMVAggregationFunction(arguments); + case DISTINCTCOUNTRAWHLLPLUSMV: + return new DistinctCountRawHLLPlusMVAggregationFunction(arguments); case DISTINCTSUMMV: return new DistinctSumMVAggregationFunction(arguments); case DISTINCTAVGMV: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusAggregationFunction.java new file mode 100644 index 0000000000..2ca7d4eec3 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusAggregationFunction.java @@ -0,0 +1,471 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.aggregation.function; + +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.utils.CommonConstants; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.RoaringBitmap; + + +public class DistinctCountHLLPlusAggregationFunction extends BaseSingleInputAggregationFunction<HyperLogLogPlus, Long> { + // The parameter "p" determines the precision (number of register index bits) of the normal (dense) representation in HyperLogLogPlus. + protected final int _p; + // The parameter "sp" determines the precision of the sparse representation, which is used until the sketch outgrows it. + protected final int _sp; + + public DistinctCountHLLPlusAggregationFunction(List<ExpressionContext> arguments) { + super(arguments.get(0)); + int numExpressions = arguments.size(); + // This function expects 1 to 3 arguments. + Preconditions.checkArgument(numExpressions <= 3, "DistinctCountHLLPlus expects 1 to 3 arguments, got: %s", + numExpressions); + if (arguments.size() == 2) { + _p = arguments.get(1).getLiteral().getIntValue(); + _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP; + } else if (arguments.size() == 3) { + _p = arguments.get(1).getLiteral().getIntValue(); + _sp = arguments.get(2).getLiteral().getIntValue(); + } else { + _p = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P; + _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP; + } + } + + public int getP() { + return _p; + } + + public int getSp() { + return _sp; + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.DISTINCTCOUNTHLLPLUS; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // Treat BYTES value as serialized HyperLogLogPlus + DataType storedType = blockValSet.getValueType().getStoredType(); + if (storedType == DataType.BYTES) { + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + try { + HyperLogLogPlus hyperLogLogPlus = aggregationResultHolder.getResult(); + if (hyperLogLogPlus == null) { + hyperLogLogPlus = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[0]); + aggregationResultHolder.setValue(hyperLogLogPlus); + } else { + hyperLogLogPlus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[0])); + 
} + for (int i = 1; i < length; i++) { + hyperLogLogPlus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i])); + } + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging HyperLogLogPlus", e); + } + return; + } + + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + int[] dictIds = blockValSet.getDictionaryIdsSV(); + getDictIdBitmap(aggregationResultHolder, dictionary).addN(dictIds, 0, length); + return; + } + + // For non-dictionary-encoded expression, store values into the HyperLogLogPlus + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(aggregationResultHolder); + switch (storedType) { + case INT: + int[] intValues = blockValSet.getIntValuesSV(); + for (int i = 0; i < length; i++) { + hyperLogLogPlus.offer(intValues[i]); + } + break; + case LONG: + long[] longValues = blockValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + hyperLogLogPlus.offer(longValues[i]); + } + break; + case FLOAT: + float[] floatValues = blockValSet.getFloatValuesSV(); + for (int i = 0; i < length; i++) { + hyperLogLogPlus.offer(floatValues[i]); + } + break; + case DOUBLE: + double[] doubleValues = blockValSet.getDoubleValuesSV(); + for (int i = 0; i < length; i++) { + hyperLogLogPlus.offer(doubleValues[i]); + } + break; + case STRING: + String[] stringValues = blockValSet.getStringValuesSV(); + for (int i = 0; i < length; i++) { + hyperLogLogPlus.offer(stringValues[i]); + } + break; + default: + throw new IllegalStateException( + "Illegal data type for DISTINCT_COUNT_HLL_PLUS aggregation function: " + storedType); + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // Treat BYTES value as serialized HyperLogLogPlus + DataType storedType = blockValSet.getValueType().getStoredType(); + if (storedType == DataType.BYTES) { + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + try { + for (int i = 0; i < length; i++) { + HyperLogLogPlus value = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]); + int groupKey = groupKeyArray[i]; + HyperLogLogPlus hyperLogLogPlus = groupByResultHolder.getResult(groupKey); + if (hyperLogLogPlus != null) { + hyperLogLogPlus.addAll(value); + } else { + groupByResultHolder.setValueForKey(groupKey, value); + } + } + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging HyperLogLogPlus", e); + } + return; + } + + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + int[] dictIds = blockValSet.getDictionaryIdsSV(); + for (int i = 0; i < length; i++) { + getDictIdBitmap(groupByResultHolder, groupKeyArray[i], dictionary).add(dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store values into the HyperLogLogPlus + switch (storedType) { + case INT: + int[] intValues = blockValSet.getIntValuesSV(); + for (int i = 0; i < length; i++) { + getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]).offer(intValues[i]); + } + break; + case LONG: + long[] longValues = blockValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]).offer(longValues[i]); + } + break; + case FLOAT: 
+ float[] floatValues = blockValSet.getFloatValuesSV(); + for (int i = 0; i < length; i++) { + getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]).offer(floatValues[i]); + } + break; + case DOUBLE: + double[] doubleValues = blockValSet.getDoubleValuesSV(); + for (int i = 0; i < length; i++) { + getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]).offer(doubleValues[i]); + } + break; + case STRING: + String[] stringValues = blockValSet.getStringValuesSV(); + for (int i = 0; i < length; i++) { + getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]).offer(stringValues[i]); + } + break; + default: + throw new IllegalStateException( + "Illegal data type for DISTINCT_COUNT_HLL_PLUS aggregation function: " + storedType); + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // Treat BYTES value as serialized HyperLogLogPlus + DataType storedType = blockValSet.getValueType().getStoredType(); + if (storedType == DataType.BYTES) { + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + try { + for (int i = 0; i < length; i++) { + HyperLogLogPlus value = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i]); + for (int groupKey : groupKeysArray[i]) { + HyperLogLogPlus hyperLogLogPlus = groupByResultHolder.getResult(groupKey); + if (hyperLogLogPlus != null) { + hyperLogLogPlus.addAll(value); + } else { + // Create a new HyperLogLogPlus for the group + groupByResultHolder.setValueForKey(groupKey, + ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytesValues[i])); + } + } + } + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging HyperLogLogPlus", e); + } + return; + } + + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + int[] dictIds = blockValSet.getDictionaryIdsSV(); + for (int i = 0; i < length; i++) { + setDictIdForGroupKeys(groupByResultHolder, groupKeysArray[i], dictionary, dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store values into the HyperLogLogPlus + switch (storedType) { + case INT: + int[] intValues = blockValSet.getIntValuesSV(); + for (int i = 0; i < length; i++) { + setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], intValues[i]); + } + break; + case LONG: + long[] longValues = blockValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], longValues[i]); + } + break; + case FLOAT: + float[] floatValues = blockValSet.getFloatValuesSV(); + for (int i = 0; i < length; i++) { + setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], floatValues[i]); + } + break; + case DOUBLE: + double[] doubleValues = blockValSet.getDoubleValuesSV(); + for (int i = 0; i < length; i++) { + setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], doubleValues[i]); + } + break; + case STRING: + String[] stringValues = blockValSet.getStringValuesSV(); + for (int i = 0; i < length; i++) { + setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], stringValues[i]); + } + break; + default: + throw new IllegalStateException( + "Illegal data type for DISTINCT_COUNT_HLL_PLUS aggregation function: " + storedType); + } + } + + @Override + public HyperLogLogPlus extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) { + Object result = aggregationResultHolder.getResult(); + if (result == null) { + return new HyperLogLogPlus(_p, _sp); + } + + if (result instanceof DictIdsWrapper) { + // For dictionary-encoded expression, convert dictionary ids to HyperLogLogPlus + return convertToHyperLogLogPlus((DictIdsWrapper) result); + } else { + // For non-dictionary-encoded expression, directly return the HyperLogLogPlus + return (HyperLogLogPlus) result; + } + } + + @Override + public HyperLogLogPlus extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + Object result = groupByResultHolder.getResult(groupKey); + if (result == null) { + return new HyperLogLogPlus(_p, _sp); + } + + if (result instanceof DictIdsWrapper) { + // For dictionary-encoded expression, convert dictionary ids to HyperLogLogPlus + return convertToHyperLogLogPlus((DictIdsWrapper) result); + } else { + // For non-dictionary-encoded expression, directly return the HyperLogLogPlus + return (HyperLogLogPlus) result; + } + } + + @Override + public HyperLogLogPlus merge(HyperLogLogPlus intermediateResult1, HyperLogLogPlus intermediateResult2) { + // Can happen when aggregating serialized HyperLogLogPlus with non-default p, sp values + if (intermediateResult1.sizeof() != intermediateResult2.sizeof()) { + if (intermediateResult1.cardinality() == 0) { + return intermediateResult2; + } else { + Preconditions.checkState(intermediateResult2.cardinality() == 0, + "Cannot merge HyperLogLogPlus of different sizes"); + return intermediateResult1; + } + } + try { + intermediateResult1.addAll(intermediateResult2); + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging HyperLogLogPlus", e); + } + return intermediateResult1; + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return ColumnDataType.OBJECT; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.LONG; + } + + @Override + public Long extractFinalResult(HyperLogLogPlus intermediateResult) { + return intermediateResult.cardinality(); + } + + /** + * Returns the dictionary id bitmap from the result holder or creates a new one if it does not exist. + */ + protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder aggregationResultHolder, + Dictionary dictionary) { + DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult(); + if (dictIdsWrapper == null) { + dictIdsWrapper = new DictIdsWrapper(dictionary); + aggregationResultHolder.setValue(dictIdsWrapper); + } + return dictIdsWrapper._dictIdBitmap; + } + + /** + * Returns the HyperLogLogPlus from the result holder or creates a new one if it does not exist. + */ + protected HyperLogLogPlus getHyperLogLogPlus(AggregationResultHolder aggregationResultHolder) { + HyperLogLogPlus hyperLogLogPlus = aggregationResultHolder.getResult(); + if (hyperLogLogPlus == null) { + hyperLogLogPlus = new HyperLogLogPlus(_p, _sp); + aggregationResultHolder.setValue(hyperLogLogPlus); + } + return hyperLogLogPlus; + } + + /** + * Returns the dictionary id bitmap for the given group key or creates a new one if it does not exist. 
+ */ + protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder groupByResultHolder, int groupKey, + Dictionary dictionary) { + DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey); + if (dictIdsWrapper == null) { + dictIdsWrapper = new DictIdsWrapper(dictionary); + groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper); + } + return dictIdsWrapper._dictIdBitmap; + } + + /** + * Returns the HyperLogLogPlus for the given group key or creates a new one if it does not exist. + */ + protected HyperLogLogPlus getHyperLogLogPlus(GroupByResultHolder groupByResultHolder, int groupKey) { + HyperLogLogPlus hyperLogLogPlus = groupByResultHolder.getResult(groupKey); + if (hyperLogLogPlus == null) { + hyperLogLogPlus = new HyperLogLogPlus(_p, _sp); + groupByResultHolder.setValueForKey(groupKey, hyperLogLogPlus); + } + return hyperLogLogPlus; + } + + /** + * Helper method to set dictionary id for the given group keys into the result holder. + */ + private static void setDictIdForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, + Dictionary dictionary, int dictId) { + for (int groupKey : groupKeys) { + getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId); + } + } + + /** + * Helper method to set value for the given group keys into the result holder. + */ + private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, Object value) { + for (int groupKey : groupKeys) { + getHyperLogLogPlus(groupByResultHolder, groupKey).offer(value); + } + } + + /** + * Helper method to read dictionary and convert dictionary ids to HyperLogLogPlus for dictionary-encoded expression. + */ + private HyperLogLogPlus convertToHyperLogLogPlus(DictIdsWrapper dictIdsWrapper) { + HyperLogLogPlus hyperLogLogPlus = new HyperLogLogPlus(_p, _sp); + Dictionary dictionary = dictIdsWrapper._dictionary; + RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap; + PeekableIntIterator iterator = dictIdBitmap.getIntIterator(); + while (iterator.hasNext()) { + hyperLogLogPlus.offer(dictionary.get(iterator.next())); + } + return hyperLogLogPlus; + } + + private static final class DictIdsWrapper { + final Dictionary _dictionary; + final RoaringBitmap _dictIdBitmap; + + private DictIdsWrapper(Dictionary dictionary) { + _dictionary = dictionary; + _dictIdBitmap = new RoaringBitmap(); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusMVAggregationFunction.java new file mode 100644 index 0000000000..00abb1f5d2 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLPlusMVAggregationFunction.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.roaringbitmap.RoaringBitmap; + + +public class DistinctCountHLLPlusMVAggregationFunction extends DistinctCountHLLPlusAggregationFunction { + + public DistinctCountHLLPlusMVAggregationFunction(List<ExpressionContext> arguments) { + super(arguments); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.DISTINCTCOUNTHLLPLUSMV; + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, dictionary); + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + dictIdBitmap.add(dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store values into the HyperLogLogPlus + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(aggregationResultHolder); + DataType storedType = blockValSet.getValueType().getStoredType(); + switch (storedType) { + case INT: + int[][] intValuesArray = blockValSet.getIntValuesMV(); + for (int i = 0; i < length; i++) { + for (int value : intValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + case LONG: + long[][] longValuesArray = blockValSet.getLongValuesMV(); + for (int i = 0; i < length; i++) { + for (long value : longValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + case FLOAT: + float[][] floatValuesArray = blockValSet.getFloatValuesMV(); + for (int i = 0; i < length; i++) { + for (float value : floatValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + case DOUBLE: + double[][] doubleValuesArray = blockValSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + for (double value : doubleValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + case STRING: + String[][] stringValuesArray = blockValSet.getStringValuesMV(); + for (int i = 0; i < length; i++) { + for (String value : stringValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + default: + throw new IllegalStateException( + "Illegal data type for DISTINCT_COUNT_HLL_PLUS_MV aggregation function: " + storedType); + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + getDictIdBitmap(groupByResultHolder, groupKeyArray[i], dictionary).add(dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store values into the HyperLogLogPlus + DataType storedType = blockValSet.getValueType().getStoredType(); + switch (storedType) { + case INT: + int[][] intValuesArray = blockValSet.getIntValuesMV(); + for (int i = 0; i < length; i++) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]); + for (int value : intValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + case LONG: + long[][] longValuesArray = blockValSet.getLongValuesMV(); + for (int i = 0; i < length; i++) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]); + for (long value : longValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + case FLOAT: + float[][] floatValuesArray = blockValSet.getFloatValuesMV(); + for (int i = 0; i < length; i++) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]); + for (float value : floatValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + case DOUBLE: + double[][] doubleValuesArray = blockValSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]); + for (double value : doubleValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + case STRING: + String[][] stringValuesArray = blockValSet.getStringValuesMV(); + for (int i = 0; i < length; i++) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKeyArray[i]); + for (String value : stringValuesArray[i]) { + hyperLogLogPlus.offer(value); + } + } + break; + default: + throw new IllegalStateException( + "Illegal data type for DISTINCT_COUNT_HLL_PLUS_MV aggregation function: " + storedType); + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictIds[i]); + } + } + return; + } + + // For non-dictionary-encoded expression, store values into the HyperLogLogPlus + DataType storedType = blockValSet.getValueType().getStoredType(); + switch (storedType) { + case INT: + int[][] intValuesArray = blockValSet.getIntValuesMV(); + for (int i = 0; i < length; i++) { + int[] intValues = intValuesArray[i]; + for (int groupKey : groupKeysArray[i]) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey); + for (int value : intValues) { + hyperLogLogPlus.offer(value); + } + } + } + break; + case LONG: + long[][] longValuesArray = 
blockValSet.getLongValuesMV(); + for (int i = 0; i < length; i++) { + long[] longValues = longValuesArray[i]; + for (int groupKey : groupKeysArray[i]) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey); + for (long value : longValues) { + hyperLogLogPlus.offer(value); + } + } + } + break; + case FLOAT: + float[][] floatValuesArray = blockValSet.getFloatValuesMV(); + for (int i = 0; i < length; i++) { + float[] floatValues = floatValuesArray[i]; + for (int groupKey : groupKeysArray[i]) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey); + for (float value : floatValues) { + hyperLogLogPlus.offer(value); + } + } + } + break; + case DOUBLE: + double[][] doubleValuesArray = blockValSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + double[] doubleValues = doubleValuesArray[i]; + for (int groupKey : groupKeysArray[i]) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey); + for (double value : doubleValues) { + hyperLogLogPlus.offer(value); + } + } + } + break; + case STRING: + String[][] stringValuesArray = blockValSet.getStringValuesMV(); + for (int i = 0; i < length; i++) { + String[] stringValues = stringValuesArray[i]; + for (int groupKey : groupKeysArray[i]) { + HyperLogLogPlus hyperLogLogPlus = getHyperLogLogPlus(groupByResultHolder, groupKey); + for (String value : stringValues) { + hyperLogLogPlus.offer(value); + } + } + } + break; + default: + throw new IllegalStateException( + "Illegal data type for DISTINCT_COUNT_HLL_PLUS_MV aggregation function: " + storedType); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusAggregationFunction.java new file mode 100644 index 0000000000..facef6a222 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusAggregationFunction.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.aggregation.function; + +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.segment.local.customobject.SerializedHLLPlus; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class DistinctCountRawHLLPlusAggregationFunction + extends BaseSingleInputAggregationFunction<HyperLogLogPlus, SerializedHLLPlus> { + private final DistinctCountHLLPlusAggregationFunction _distinctCountHLLPlusAggregationFunction; + + public DistinctCountRawHLLPlusAggregationFunction(List<ExpressionContext> arguments) { + this(arguments.get(0), new DistinctCountHLLPlusAggregationFunction(arguments)); + } + + DistinctCountRawHLLPlusAggregationFunction(ExpressionContext expression, + DistinctCountHLLPlusAggregationFunction distinctCountHLLPlusAggregationFunction) { + super(expression); + _distinctCountHLLPlusAggregationFunction = distinctCountHLLPlusAggregationFunction; + } + + public DistinctCountHLLPlusAggregationFunction getDistinctCountHLLPlusAggregationFunction() { + return _distinctCountHLLPlusAggregationFunction; + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUS; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return _distinctCountHLLPlusAggregationFunction.createAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return _distinctCountHLLPlusAggregationFunction.createGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + _distinctCountHLLPlusAggregationFunction.aggregate(length, aggregationResultHolder, blockValSetMap); + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + _distinctCountHLLPlusAggregationFunction.aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, + blockValSetMap); + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + _distinctCountHLLPlusAggregationFunction.aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, + blockValSetMap); + } + + @Override + public HyperLogLogPlus extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + return _distinctCountHLLPlusAggregationFunction.extractAggregationResult(aggregationResultHolder); + } + + @Override + public HyperLogLogPlus extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + return _distinctCountHLLPlusAggregationFunction.extractGroupByResult(groupByResultHolder, groupKey); + } + + @Override + public HyperLogLogPlus merge(HyperLogLogPlus intermediateResult1, HyperLogLogPlus intermediateResult2) { + return _distinctCountHLLPlusAggregationFunction.merge(intermediateResult1, intermediateResult2); + } + + 
@Override + public ColumnDataType getIntermediateResultColumnType() { + return _distinctCountHLLPlusAggregationFunction.getIntermediateResultColumnType(); + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.STRING; + } + + @Override + public SerializedHLLPlus extractFinalResult(HyperLogLogPlus intermediateResult) { + return new SerializedHLLPlus(intermediateResult); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusMVAggregationFunction.java new file mode 100644 index 0000000000..6ae2d04996 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLPlusMVAggregationFunction.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.List; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class DistinctCountRawHLLPlusMVAggregationFunction extends DistinctCountRawHLLPlusAggregationFunction { + + public DistinctCountRawHLLPlusMVAggregationFunction(List<ExpressionContext> arguments) { + super(arguments.get(0), new DistinctCountHLLPlusMVAggregationFunction(arguments)); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUSMV; + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java index de8abca960..0caf40536b 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java @@ -172,6 +172,18 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getType(), AggregationFunctionType.DISTINCTCOUNTRAWHLL); assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("DiStInCtCoUnThLlPlUs"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, false); + assertTrue(aggregationFunction instanceof DistinctCountHLLPlusAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.DISTINCTCOUNTHLLPLUS); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("DiStInCtCoUnTrAwHlLpLuS"); + 
aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, false); + assertTrue(aggregationFunction instanceof DistinctCountRawHLLPlusAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUS); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("FaStHlL"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, false); assertTrue(aggregationFunction instanceof FastHLLAggregationFunction); @@ -358,6 +370,18 @@ public class AggregationFunctionFactoryTest { assertEquals(aggregationFunction.getType(), AggregationFunctionType.DISTINCTCOUNTRAWHLLMV); assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("DiStInCt_CoUnT_hLl_PlUs_Mv"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, false); + assertTrue(aggregationFunction instanceof DistinctCountHLLPlusMVAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.DISTINCTCOUNTHLLPLUSMV); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + + function = getFunction("DiStInCtCoUnTrAwHlLpLuS_mV"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, false); + assertTrue(aggregationFunction instanceof DistinctCountRawHLLPlusMVAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUSMV); + assertEquals(aggregationFunction.getResultColumnName(), function.toString()); + function = getFunction("PeRcEnTiLe10Mv"); aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, false); assertTrue(aggregationFunction instanceof PercentileMVAggregationFunction); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java index 33c32db70a..f974aa8542 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java @@ -408,6 +408,39 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable); } + @Test + public void testDistinctCountHLLPlusMV() { + String query = "SELECT DISTINCTCOUNTHLLPLUSMV(column6) AS value FROM testTable"; + + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. 
+ BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.LONG}); + Object[] expectedResults = new Object[]{18651L}; + ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults)); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResults[0] = 1176L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 62480L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + SV_GROUP_BY); + expectedResults[0] = 4796L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY); + expectedResults[0] = 1176L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + MV_GROUP_BY); + expectedResults[0] = 3457L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY); + expectedResults[0] = 579L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable); + } + @Test public void testDistinctCountRawHLLMV() { String query = "SELECT DISTINCTCOUNTRAWHLLMV(column6) AS value FROM testTable"; @@ -449,6 +482,48 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue cardinalityExtractor); } + @Test + public void testDistinctCountRawHLLPLUSMV() { + String query = "SELECT DISTINCTCOUNTRAWHLLPLUSMV(column6) AS value FROM testTable"; + Function<Object, Object> cardinalityExtractor = + value -> ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(BytesUtils.toBytes((String) value)) + .cardinality(); + + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. 
+ BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new ColumnDataType[]{ColumnDataType.LONG}); + Object[] expectedResults = new Object[]{18651L}; + ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults)); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 0L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResults[0] = 1176L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 62480L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + SV_GROUP_BY); + expectedResults[0] = 4796L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY); + expectedResults[0] = 1176L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + MV_GROUP_BY); + expectedResults[0] = 3457L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY); + expectedResults[0] = 579L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 455552L, 124960L, 400000L, expectedResultTable, + cardinalityExtractor); + } + @Test public void testPercentileMV() { List<String> queries = Arrays.asList("SELECT PERCENTILE50MV(column6) AS value FROM testTable", diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java index c9f0c444af..591b5ffffa 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java @@ -350,6 +350,39 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable); } + @Test + public void testDistinctCountHLLPLUSMV() { + String query = "SELECT DISTINCTCOUNTHLLPLUSMV(column6) AS value FROM testTable"; + + // Without filter, query should be answered by DictionaryBasedAggregationOperator (numEntriesScannedPostFilter = 0) + BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new DataSchema.ColumnDataType[] + {DataSchema.ColumnDataType.LONG}); + Object[] expectedResults = new Object[]{18651L}; + ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults)); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 400000L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResults[0] = 1176L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 62480L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + SV_GROUP_BY); + expectedResults[0] = 4796L; + 
QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY); + expectedResults[0] = 1176L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + MV_GROUP_BY); + expectedResults[0] = 3457L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY); + expectedResults[0] = 579L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable); + } + @Test public void testDistinctCountRawHLLMV() { String query = "SELECT DISTINCTCOUNTRAWHLLMV(column6) AS value FROM testTable"; @@ -391,6 +424,48 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa cardinalityExtractor); } + @Test + public void testDistinctCountRawHLLPLUSMV() { + String query = "SELECT DISTINCTCOUNTRAWHLLPLUSMV(column6) AS value FROM testTable"; + Function<Object, Object> cardinalityExtractor = + value -> ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(BytesUtils.toBytes((String) value)) + .cardinality(); + + // Without filter, the raw (non-dictionary) column is scanned to build the sketch (numEntriesScannedPostFilter = 400000) + BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = new DataSchema(new String[]{"value"}, new DataSchema.ColumnDataType[] + {DataSchema.ColumnDataType.LONG}); + Object[] expectedResults = new Object[]{18651L}; + ResultTable expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(expectedResults)); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 400000L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResults[0] = 1176L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 62480L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + SV_GROUP_BY); + expectedResults[0] = 4796L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + FILTER + SV_GROUP_BY); + expectedResults[0] = 1176L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + MV_GROUP_BY); + expectedResults[0] = 3457L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 400000L, 0L, 800000L, 400000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + FILTER + MV_GROUP_BY); + expectedResults[0] = 579L; + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 62480L, 519028L, 124960L, 400000L, expectedResultTable, + cardinalityExtractor); + } + @Test public void testPercentileMV() { List<String> queries = Arrays.asList("SELECT PERCENTILE50MV(column6) AS value FROM testTable", diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java index a6e8320962..d852a8c626 100644 --- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java @@ -283,6 +283,32 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 73548L, 120000L, expectedResultTable); } + @Test + public void testDistinctCountHLLPlus() { + String query = "SELECT DISTINCTCOUNTHLLPLUS(column1) AS v1, DISTINCTCOUNTHLLPLUS(column3) AS v2 FROM testTable"; + + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. + BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = + new DataSchema(new String[]{"v1", "v2"}, new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.LONG}); + ResultTable expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{6595L, 21822L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, 0L, 120000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1885L, 4545L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 49032L, 120000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + GROUP_BY); + expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{3495L, 12022L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, 360000L, 120000L, expectedResultTable); + + brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY); + expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1273L, 3284L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 73548L, 120000L, expectedResultTable); + } + @Test public void testDistinctCountRawHLL() { String query = "SELECT DISTINCTCOUNTRAWHLL(column1) AS v1, DISTINCTCOUNTRAWHLL(column3) AS v2 FROM testTable"; @@ -315,6 +341,40 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal cardinalityExtractor); } + @Test + public void testDistinctCountRawHLLPlus() { + String query = + "SELECT DISTINCTCOUNTRAWHLLPLUS(column1) AS v1, DISTINCTCOUNTRAWHLLPLUS(column3) AS v2 FROM testTable"; + Function<Object, Object> cardinalityExtractor = + value -> ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(BytesUtils.toBytes((String) value)) + .cardinality(); + + // Without filter, query should be answered by NonScanBasedAggregationOperator (numEntriesScannedPostFilter = 0) + // for dictionary based columns. 
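// (Aside, a minimal sketch rather than anything from the patch: the RAW variant returns the
// serialized sketch hex-encoded instead of a LONG, which is what the cardinalityExtractor in
// this test unwraps. The "rawHexString" name below is hypothetical; the SerDe and BytesUtils
// calls are the ones the test already uses.)
//   HyperLogLogPlus hllPlus = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(
//       BytesUtils.toBytes(rawHexString));
//   long estimate = hllPlus.cardinality();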
+ BrokerResponseNative brokerResponse = getBrokerResponse(query); + DataSchema expectedDataSchema = + new DataSchema(new String[]{"v1", "v2"}, new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.LONG}); + ResultTable expectedResultTable = + new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{6595L, 21822L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, 0L, 120000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + FILTER); + expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1885L, 4545L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 49032L, 120000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + GROUP_BY); + expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{3495L, 12022L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L, 360000L, 120000L, expectedResultTable, + cardinalityExtractor); + + brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY); + expectedResultTable = new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1273L, 3284L})); + QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L, 73548L, 120000L, expectedResultTable, + cardinalityExtractor); + } + @Test public void testPercentile() { List<String> queries = diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java index bba665f9e8..17bd6494a7 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.queries; import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.tdunning.math.stats.MergingDigest; import com.tdunning.math.stats.TDigest; import java.io.File; @@ -82,6 +83,8 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { private static final String DISTINCT_COUNT_HLL_COLUMN = "distinctCountHLLColumn"; // Use non-default log2m private static final int DISTINCT_COUNT_HLL_LOG2M = 9; + private static final String DISTINCT_COUNT_HLL_PLUS_COLUMN = "distinctCountHLLPlusColumn"; + private static final int DISTINCT_COUNT_HLL_PLUS_P = 14; private static final String MIN_MAX_RANGE_COLUMN = "minMaxRangeColumn"; private static final String PERCENTILE_EST_COLUMN = "percentileEstColumn"; // Use non-default max error @@ -101,6 +104,7 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { private final int[][] _valuesArray = new int[NUM_ROWS][MAX_NUM_VALUES_TO_PRE_AGGREGATE]; private final AvgPair[] _avgPairs = new AvgPair[NUM_ROWS]; private final HyperLogLog[] _hyperLogLogs = new HyperLogLog[NUM_ROWS]; + private final HyperLogLogPlus[] _hyperLogLogPluses = new HyperLogLogPlus[NUM_ROWS]; private final MinMaxRangePair[] _minMaxRangePairs = new MinMaxRangePair[NUM_ROWS]; private final QuantileDigest[] _quantileDigests = new QuantileDigest[NUM_ROWS]; private final TDigest[] _tDigests = new TDigest[NUM_ROWS]; @@ -193,6 +197,14 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { _tDigests[i] = tDigest; valueMap.put(PERCENTILE_TDIGEST_COLUMN, 
ObjectSerDeUtils.TDIGEST_SER_DE.serialize(tDigest)); + HyperLogLogPlus hyperLogLogPlus = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + for (int value : values) { + hyperLogLogPlus.offer(value); + } + _hyperLogLogPluses[i] = hyperLogLogPlus; + valueMap.put(DISTINCT_COUNT_HLL_PLUS_COLUMN, + ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus)); + GenericRow genericRow = new GenericRow(); genericRow.init(valueMap); rows.add(genericRow); @@ -201,8 +213,9 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) .addSingleValueDimension(GROUP_BY_SV_COLUMN, DataType.STRING) .addMultiValueDimension(GROUP_BY_MV_COLUMN, DataType.STRING).addMetric(AVG_COLUMN, DataType.BYTES) - .addMetric(DISTINCT_COUNT_HLL_COLUMN, DataType.BYTES).addMetric(MIN_MAX_RANGE_COLUMN, DataType.BYTES) - .addMetric(PERCENTILE_EST_COLUMN, DataType.BYTES).addMetric(PERCENTILE_TDIGEST_COLUMN, DataType.BYTES).build(); + .addMetric(DISTINCT_COUNT_HLL_COLUMN, DataType.BYTES).addMetric(DISTINCT_COUNT_HLL_PLUS_COLUMN, DataType.BYTES) + .addMetric(MIN_MAX_RANGE_COLUMN, DataType.BYTES).addMetric(PERCENTILE_EST_COLUMN, DataType.BYTES) + .addMetric(PERCENTILE_TDIGEST_COLUMN, DataType.BYTES).build(); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); @@ -224,7 +237,7 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { AggregationOperator aggregationOperator = getOperator(getAggregationQuery()); List<Object> aggregationResult = aggregationOperator.nextBlock().getResults(); assertNotNull(aggregationResult); - assertEquals(aggregationResult.size(), 5); + assertEquals(aggregationResult.size(), 6); // Avg AvgPair avgPair = (AvgPair) aggregationResult.get(0); @@ -277,13 +290,24 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { expectedTDigest.add(_tDigests[i]); } assertEquals(tDigest.quantile(0.5), expectedTDigest.quantile(0.5), PERCENTILE_TDIGEST_DELTA); + + // DistinctCountHLLPlus + HyperLogLogPlus hyperLogLogPlus = (HyperLogLogPlus) aggregationResult.get(5); + HyperLogLogPlus expectedHyperLogLogPlus = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + for (int value : _valuesArray[0]) { + expectedHyperLogLogPlus.offer(value); + } + for (int i = 1; i < NUM_ROWS; i++) { + expectedHyperLogLogPlus.addAll(_hyperLogLogPluses[i]); + } + assertEquals(hyperLogLogPlus.cardinality(), expectedHyperLogLogPlus.cardinality()); } @Test public void testInterSegmentsAggregation() throws Exception { Object[] aggregationResults = getBrokerResponse(getAggregationQuery()).getResultTable().getRows().get(0); - assertEquals(aggregationResults.length, 5); + assertEquals(aggregationResults.length, 6); // Simulate the process of server side merge and broker side merge @@ -372,11 +396,31 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { tDigest1.add(tDigest2); double expectedPercentileTDigestResult = tDigest1.quantile(0.5); + // DistinctCountHLLPlus + HyperLogLogPlus hyperLogLogPlus1 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + HyperLogLogPlus hyperLogLogPlus2 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + for (int value : _valuesArray[0]) { + hyperLogLogPlus1.offer(value); + hyperLogLogPlus2.offer(value); + } + for (int i = 1; i < NUM_ROWS; i++) { + hyperLogLogPlus1.addAll(_hyperLogLogPluses[i]); + hyperLogLogPlus2.addAll(_hyperLogLogPluses[i]); + } + hyperLogLogPlus1.addAll(hyperLogLogPlus2); + hyperLogLogPlus1 = 
ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize( + ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1)); + hyperLogLogPlus2 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize( + ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1)); + hyperLogLogPlus1.addAll(hyperLogLogPlus2); + long expectedDistinctCountHllPlusResult = hyperLogLogPlus1.cardinality(); + assertEquals((Double) aggregationResults[0], expectedAvgResult, 1e-5); assertEquals((long) aggregationResults[1], expectedDistinctCountHllResult); assertEquals((Double) aggregationResults[2], expectedMinMaxRangeResult, 1e-5); assertEquals((long) aggregationResults[3], expectedPercentileEstResult); assertEquals((Double) aggregationResults[4], expectedPercentileTDigestResult, PERCENTILE_TDIGEST_DELTA); + assertEquals((long) aggregationResults[5], expectedDistinctCountHllPlusResult); } @Test @@ -442,6 +486,17 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { expectedTDigest.add(_tDigests[i]); } assertEquals(tDigest.quantile(0.5), expectedTDigest.quantile(0.5), PERCENTILE_TDIGEST_DELTA); + + // DistinctCountHLLPlus + HyperLogLogPlus hyperLogLogPlus = (HyperLogLogPlus) groupByResult.getResultForGroupId(5, groupKey._groupId); + HyperLogLogPlus expectedHyperLogLogPlus = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + for (int value : _valuesArray[groupId]) { + expectedHyperLogLogPlus.offer(value); + } + for (int i = groupId + NUM_GROUPS; i < NUM_ROWS; i += NUM_GROUPS) { + expectedHyperLogLogPlus.addAll(_hyperLogLogPluses[i]); + } + assertEquals(hyperLogLogPlus.cardinality(), expectedHyperLogLogPlus.cardinality()); } } @@ -539,12 +594,32 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { tDigest1.add(tDigest2); double expectedPercentileTDigestResult = tDigest1.quantile(0.5); + + // DistinctCountHLLPlus + HyperLogLogPlus hyperLogLogPlus1 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + HyperLogLogPlus hyperLogLogPlus2 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + for (int value : _valuesArray[groupId]) { + hyperLogLogPlus1.offer(value); + hyperLogLogPlus2.offer(value); + } + for (int i = groupId + NUM_GROUPS; i < NUM_ROWS; i += NUM_GROUPS) { + hyperLogLogPlus1.addAll(_hyperLogLogPluses[i]); + hyperLogLogPlus2.addAll(_hyperLogLogPluses[i]); + } + hyperLogLogPlus1.addAll(hyperLogLogPlus2); + hyperLogLogPlus1 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize( + ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1)); + hyperLogLogPlus2 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize( + ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1)); + hyperLogLogPlus1.addAll(hyperLogLogPlus2); + long expectedDistinctCountHllPlusResult = hyperLogLogPlus1.cardinality(); + Object[] row = rows.get(groupId); assertEquals((Double) row[0], expectedAvgResult, 1e-5); assertEquals((long) row[1], expectedDistinctCountHllResult); assertEquals((Double) row[2], expectedMinMaxRangeResult, 1e-5); assertEquals((long) row[3], expectedPercentileEstResult); assertEquals((Double) row[4], expectedPercentileTDigestResult, PERCENTILE_TDIGEST_DELTA); + assertEquals((long) row[5], expectedDistinctCountHllPlusResult); } } @@ -595,6 +670,15 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { expectedTDigest.add(_tDigests[i]); } + // DistinctCountHLLPlus + HyperLogLogPlus expectedHyperLogLogPlus = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + for (int value : _valuesArray[0]) { + expectedHyperLogLogPlus.offer(value); + } + for 
(int i = 1; i < NUM_ROWS; i++) { + expectedHyperLogLogPlus.addAll(_hyperLogLogPluses[i]); + } + Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupByResult.getGroupKeyIterator(); while (groupKeyIterator.hasNext()) { GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next(); @@ -620,6 +704,10 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { // PercentileTDigest TDigest tDigest = (TDigest) groupByResult.getResultForGroupId(4, groupKey._groupId); assertEquals(tDigest.quantile(0.5), expectedTDigest.quantile(0.5), PERCENTILE_TDIGEST_DELTA); + + // DistinctCountHLLPlus + HyperLogLogPlus hyperLogLogPlus = (HyperLogLogPlus) groupByResult.getResultForGroupId(5, groupKey._groupId); + assertEquals(hyperLogLogPlus.cardinality(), expectedHyperLogLogPlus.cardinality()); } } @@ -716,20 +804,41 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest { tDigest1.add(tDigest2); double expectedPercentileTDigestResult = tDigest1.quantile(0.5); + // DistinctCountHLLPlus + HyperLogLogPlus hyperLogLogPlus1 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + HyperLogLogPlus hyperLogLogPlus2 = new HyperLogLogPlus(DISTINCT_COUNT_HLL_PLUS_P); + for (int value : _valuesArray[0]) { + hyperLogLogPlus1.offer(value); + hyperLogLogPlus2.offer(value); + } + for (int i = 1; i < NUM_ROWS; i++) { + hyperLogLogPlus1.addAll(_hyperLogLogPluses[i]); + hyperLogLogPlus2.addAll(_hyperLogLogPluses[i]); + } + hyperLogLogPlus1.addAll(hyperLogLogPlus2); + hyperLogLogPlus1 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize( + ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1)); + hyperLogLogPlus2 = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize( + ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(hyperLogLogPlus1)); + hyperLogLogPlus1.addAll(hyperLogLogPlus2); + long expectedDistinctCountHllPlusResult = hyperLogLogPlus1.cardinality(); + for (Object[] row : rows) { assertEquals((Double) row[0], expectedAvgResult, 1e-5); assertEquals((long) row[1], expectedDistinctCountHllResult); assertEquals((Double) row[2], expectedMinMaxRangeResult, 1e-5); assertEquals((long) row[3], expectedPercentileEstResult); assertEquals((Double) row[4], expectedPercentileTDigestResult, PERCENTILE_TDIGEST_DELTA); + assertEquals((long) row[5], expectedDistinctCountHllPlusResult); } } private String getAggregationQuery() { return String.format( - "SELECT AVG(%s), DISTINCTCOUNTHLL(%s), MINMAXRANGE(%s), PERCENTILEEST50(%s), PERCENTILETDIGEST50(%s) FROM %s", + "SELECT AVG(%s), DISTINCTCOUNTHLL(%s), MINMAXRANGE(%s), PERCENTILEEST50(%s), PERCENTILETDIGEST50(%s), " + + "DISTINCTCOUNTHLLPLUS(%s) FROM %s", AVG_COLUMN, DISTINCT_COUNT_HLL_COLUMN, MIN_MAX_RANGE_COLUMN, PERCENTILE_EST_COLUMN, PERCENTILE_TDIGEST_COLUMN, - RAW_TABLE_NAME); + DISTINCT_COUNT_HLL_PLUS_COLUMN, RAW_TABLE_NAME); } private String getGroupBySVQuery() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index df045e1d5b..d9d645c065 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -2632,7 +2632,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // The Accurate value is 6538. 
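// (Aside, a minimal sketch assuming only the clearspring HyperLogLogPlus API this patch
// imports: precision p gives 2^p registers, so the p-sweeps below trade memory for accuracy.
// The "values" input is hypothetical.)
//   HyperLogLogPlus hllPlus = new HyperLogLogPlus(14);  // p = 14 is the default the patch adds
//   for (long v : values) {
//     hllPlus.offer(v);
//   }
//   long estimate = hllPlus.cardinality();  // approximate distinct count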
query = "SELECT distinctCount(FlightNum) FROM mytable "; assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), 6538); - assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), 6538); // Expected distinctCountHll with different log2m value from 2 to 19. The Accurate value is 6538. long[] expectedResults = new long[]{ @@ -2642,7 +2641,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet for (int i = 2; i < 20; i++) { query = String.format("SELECT distinctCountHLL(FlightNum, %d) FROM mytable ", i); assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 2]); - assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 2]); } // Default log2m for HLL is set to 12 in V1 and 8 in V2 @@ -2654,7 +2652,31 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet expectedDefault = expectedResults[10]; } assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedDefault); - assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedDefault); + } + + @Test(dataProvider = "useBothQueryEngines") + public void testDistinctCountHllPlus(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query; + + // The Accurate value is 6538. + query = "SELECT distinctCount(FlightNum) FROM mytable "; + assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), 6538); + + // Expected distinctCountHllPlus with different P value from 4 (minimal value) to 19. The Accurate value is 6538. + long[] expectedResults = new long[]{ + 4901, 5755, 6207, 5651, 6318, 6671, 6559, 6425, 6490, 6486, 6489, 6516, 6532, 6526, 6525, 6534 + }; + + for (int i = 4; i < 20; i++) { + query = String.format("SELECT distinctCountHLLPlus(FlightNum, %d) FROM mytable ", i); + assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 4]); + } + + // Default HLL Plus is set as p=14 + query = "SELECT distinctCountHLLPlus(FlightNum) FROM mytable "; + assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[10]); } @Test diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java new file mode 100644 index 0000000000..66e38cd151 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.aggregator; + +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; +import java.util.List; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.segment.local.utils.CustomSerDeUtils; +import org.apache.pinot.segment.local.utils.HyperLogLogPlusUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.utils.CommonConstants; + + +public class DistinctCountHLLPlusValueAggregator implements ValueAggregator<Object, HyperLogLogPlus> { + public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES; + + private final int _p; + private final int _sp; + + // Byte size won't change once we get the initial aggregated value + private int _maxByteSize; + + public DistinctCountHLLPlusValueAggregator(List<ExpressionContext> arguments) { + // A single argument (just the column) means we use the default _p and _sp + if (arguments.size() == 2) { + _p = arguments.get(1).getLiteral().getIntValue(); + _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP; + } else if (arguments.size() == 3) { + _p = arguments.get(1).getLiteral().getIntValue(); + _sp = arguments.get(2).getLiteral().getIntValue(); + } else { + _p = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P; + _sp = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP; + } + } + + @Override + public AggregationFunctionType getAggregationType() { + return AggregationFunctionType.DISTINCTCOUNTHLLPLUS; + } + + @Override + public DataType getAggregatedValueType() { + return AGGREGATED_VALUE_TYPE; + } + + @Override + public HyperLogLogPlus getInitialAggregatedValue(Object rawValue) { + HyperLogLogPlus initialValue; + if (rawValue instanceof byte[]) { + byte[] bytes = (byte[]) rawValue; + initialValue = deserializeAggregatedValue(bytes); + _maxByteSize = bytes.length; + } else { + initialValue = new HyperLogLogPlus(_p, _sp); + initialValue.offer(rawValue); + _maxByteSize = HyperLogLogPlusUtils.byteSize(_p, _sp); + } + return initialValue; + } + + @Override + public HyperLogLogPlus applyRawValue(HyperLogLogPlus value, Object rawValue) { + if (rawValue instanceof byte[]) { + try { + value.addAll(deserializeAggregatedValue((byte[]) rawValue)); + } catch (CardinalityMergeException e) { + throw new RuntimeException(e); + } + } else { + value.offer(rawValue); + } + return value; + } + + @Override + public HyperLogLogPlus applyAggregatedValue(HyperLogLogPlus value, HyperLogLogPlus aggregatedValue) { + try { + value.addAll(aggregatedValue); + return value; + } catch (CardinalityMergeException e) { + throw new RuntimeException(e); + } + } + + @Override + public HyperLogLogPlus cloneAggregatedValue(HyperLogLogPlus value) { + return deserializeAggregatedValue(serializeAggregatedValue(value)); + } + + @Override + public int getMaxAggregatedValueByteSize() { + // NOTE: For aggregated metrics, the initial aggregated value might not have been generated. Returns the byte size + // based on p and sp. + return _maxByteSize > 0 ? 
_maxByteSize : HyperLogLogPlusUtils.byteSize(_p, _sp); + } + + @Override + public byte[] serializeAggregatedValue(HyperLogLogPlus value) { + return CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(value); + } + + @Override + public HyperLogLogPlus deserializeAggregatedValue(byte[] bytes) { + return CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(bytes); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java index b348b1ff4c..16dc0328f7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java @@ -70,6 +70,9 @@ public class ValueAggregatorFactory { case DISTINCTCOUNTTHETASKETCH: case DISTINCTCOUNTRAWTHETASKETCH: return new DistinctCountThetaSketchValueAggregator(); + case DISTINCTCOUNTHLLPLUS: + case DISTINCTCOUNTRAWHLLPLUS: + return new DistinctCountHLLPlusValueAggregator(arguments); case DISTINCTCOUNTTUPLESKETCH: case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH: case AVGVALUEINTEGERSUMTUPLESKETCH: @@ -116,6 +119,9 @@ public class ValueAggregatorFactory { case DISTINCTCOUNTTHETASKETCH: case DISTINCTCOUNTRAWTHETASKETCH: return DistinctCountThetaSketchValueAggregator.AGGREGATED_VALUE_TYPE; + case DISTINCTCOUNTHLLPLUS: + case DISTINCTCOUNTRAWHLLPLUS: + return DistinctCountHLLPlusValueAggregator.AGGREGATED_VALUE_TYPE; case DISTINCTCOUNTTUPLESKETCH: case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH: case AVGVALUEINTEGERSUMTUPLESKETCH: diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedHLLPlus.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedHLLPlus.java new file mode 100644 index 0000000000..bee3e7884e --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedHLLPlus.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.customobject; + +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; +import org.apache.pinot.segment.local.utils.CustomSerDeUtils; +import org.apache.pinot.spi.utils.BytesUtils; + + +public class SerializedHLLPlus implements Comparable<SerializedHLLPlus> { + private final HyperLogLogPlus _hyperLogLogPlus; + + public SerializedHLLPlus(HyperLogLogPlus hyperLogLogPlus) { + _hyperLogLogPlus = hyperLogLogPlus; + } + + @Override + public int compareTo(SerializedHLLPlus other) { + return Long.compare(_hyperLogLogPlus.cardinality(), other._hyperLogLogPlus.cardinality()); + } + + @Override + public String toString() { + return BytesUtils.toHexString(CustomSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.serialize(_hyperLogLogPlus)); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java index 1ed3a3e341..f5a6275a3b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.utils; import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.google.common.primitives.Longs; import com.tdunning.math.stats.MergingDigest; import com.tdunning.math.stats.TDigest; @@ -188,6 +189,34 @@ public class CustomSerDeUtils { } }; + public static final ObjectSerDe<HyperLogLogPlus> HYPER_LOG_LOG_PLUS_SER_DE = new ObjectSerDe<HyperLogLogPlus>() { + + @Override + public byte[] serialize(HyperLogLogPlus hyperLogLogPlus) { + try { + return hyperLogLogPlus.getBytes(); + } catch (IOException e) { + throw new RuntimeException("Caught exception while serializing HyperLogLogPlus", e); + } + } + + @Override + public HyperLogLogPlus deserialize(byte[] bytes) { + try { + return HyperLogLogPlus.Builder.build(bytes); + } catch (IOException e) { + throw new RuntimeException("Caught exception while de-serializing HyperLogLogPlus", e); + } + } + + @Override + public HyperLogLogPlus deserialize(ByteBuffer byteBuffer) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return deserialize(bytes); + } + }; + public static final ObjectSerDe<TDigest> TDIGEST_SER_DE = new ObjectSerDe<TDigest>() { @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HyperLogLogPlusUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HyperLogLogPlusUtils.java new file mode 100644 index 0000000000..8614961511 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HyperLogLogPlusUtils.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils; + +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; + + +public class HyperLogLogPlusUtils { + private HyperLogLogPlusUtils() { + } + + /** + * Returns the byte size of the given HyperLogLogPlus. + */ + public static int byteSize(HyperLogLogPlus value) { + // 12 bytes header (p, sp & register set size) plus register set data + return value.sizeof() + 3 * Integer.BYTES; + } + + /** + * Returns the byte size of HyperLogLogPlus of a given p and sp. + */ + public static int byteSize(int p, int sp) { + // 12 bytes header (p, sp & register set size) plus register set data + return new HyperLogLogPlus(p, sp).sizeof() + 3 * Integer.BYTES; + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 4ac3b32af9..33313366a6 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -141,6 +141,15 @@ public enum AggregationFunctionType { PERCENTILERAWKLL("percentileRawKLL", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)), + // hyper log log plus plus functions + DISTINCTCOUNTHLLPLUS("distinctCountHLLPlus", ImmutableList.of("DISTINCT_COUNT_HLL_PLUS"), SqlKind.OTHER_FUNCTION, + SqlFunctionCategory.USER_DEFINED_FUNCTION, + OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC), ordinal -> ordinal > 0), + ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)), + DISTINCTCOUNTRAWHLLPLUS("distinctCountRawHLLPlus", ImmutableList.of("DISTINCT_COUNT_RAW_HLL_PLUS"), + SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, + OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER), ordinal -> ordinal > 0), + ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)), // DEPRECATED in v2 @Deprecated @@ -248,6 +257,13 @@ public enum AggregationFunctionType { PERCENTILERAWKLLMV("percentileRawKLLMV", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.family(ImmutableList.of(SqlTypeFamily.ARRAY, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC), ordinal -> ordinal > 1 && ordinal < 4), ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)), + // hyper log log plus plus functions + DISTINCTCOUNTHLLPLUSMV("distinctCountHLLPlusMV", null, SqlKind.OTHER_FUNCTION, + SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.family(SqlTypeFamily.ARRAY), ReturnTypes.BIGINT, + ReturnTypes.explicit(SqlTypeName.OTHER)), + DISTINCTCOUNTRAWHLLPLUSMV("distinctCountRawHLLPlusMV", null, SqlKind.OTHER_FUNCTION, + SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.family(SqlTypeFamily.ARRAY), ReturnTypes.VARCHAR_2000, + ReturnTypes.explicit(SqlTypeName.OTHER)), // boolean aggregate functions 
BOOLAND("boolAnd", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 91755df8fa..713bbb2fe3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -92,6 +92,8 @@ public class CommonConstants { public static final String DEFAULT_HYPERLOGLOG_LOG2M_KEY = "default.hyperloglog.log2m"; public static final int DEFAULT_HYPERLOGLOG_LOG2M = 8; + public static final int DEFAULT_HYPERLOGLOG_PLUS_P = 14; + public static final int DEFAULT_HYPERLOGLOG_PLUS_SP = 0; // 2 to the power of 16, for tradeoffs see datasketches library documentation: // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org