This is an automated email from the ASF dual-hosted git repository. mayanks pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ded7e8f5ed Integer Tuple Sketch support (#10427) ded7e8f5ed is described below commit ded7e8f5ed63dbf41fb1cdff2e6bc86672e496aa Author: Andi Miller <a...@andimiller.net> AuthorDate: Thu May 25 19:56:58 2023 +0100 Integer Tuple Sketch support (#10427) * Add support for Datasketches Integer Tuple Sketches This adds support for `BYTES` columns containing Tuple Sketches with Integer as the summary type. The added classes currently support `Sum` as the semigroup, but are generic so others can be added. Feature breakdown: 1. Add transform functions that can be used to create Integer Tuple Sketches during ingestion, e.g. `toIntegerSumTupleSketch(colA, colB, 16)` 2. Add Codecs that use the Datasketches serialization 3. Add aggregation functions: * `DISTINCT_COUNT_TUPLE_SKETCH` will just get the estimate for the number of unique keys, same as Theta or HLL * `DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and return the raw sketch * `SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the sum of the value side * `AVG_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the average of the value side 4. Add `ValueAggregator<_, _>`s for use in `StarTree` indexes for all 4 above aggregations 5. 
Add `ValueAggregator`s for use in rollups for all 4 above aggregations * fix style * add test for sketch agg * fix mangled license headers * annotate types for old versions of java * Cache Tuple Union result so it's not recomputed * Improve null handling in Tuple aggregation functions * Cleanup in IntegerTupleSketchAggregationFunction's parameters * Make Theta and Tuple transform functions throw on unexpected key types * Clean up sum/avg implementations for Tuple Sketch values * Fix on Java 8 * Expand todo for tuple sketch aggregation function * add preconditions to tuple aggregation function * empty commit to re-trigger CI * empty commit to re-trigger CI again * fix merge * empty commit to re-trigger CI again * fix merge * fix merge again --- .../apache/pinot/core/common/ObjectSerDeUtils.java | 30 ++- .../core/function/scalar/SketchFunctions.java | 81 ++++++-- .../function/AggregationFunctionFactory.java | 10 + ...ValueIntegerTupleSketchAggregationFunction.java | 70 +++++++ ...CountIntegerTupleSketchAggregationFunction.java | 54 +++++ .../IntegerTupleSketchAggregationFunction.java | 222 +++++++++++++++++++++ ...aluesIntegerTupleSketchAggregationFunction.java | 64 ++++++ .../aggregator/IntegerTupleSketchAggregator.java | 42 ++++ .../aggregator/ValueAggregatorFactory.java | 6 + .../core/function/scalar/SketchFunctionsTest.java | 19 ++ ...ctCountIntegerSumTupleSketchStarTreeV2Test.java | 57 ++++++ .../IntegerTupleSketchValueAggregator.java | 99 +++++++++ .../local/aggregator/ValueAggregatorFactory.java | 11 + .../segment/local/utils/CustomSerDeUtils.java | 24 +++ .../IntegerTupleSketchValueAggregatorTest.java | 70 +++++++ .../pinot/segment/spi/AggregationFunctionType.java | 9 + .../apache/pinot/spi/utils/CommonConstants.java | 3 + 17 files changed, 856 insertions(+), 15 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index 
14af9bac95..d93ea17ee9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -60,6 +60,8 @@ import java.util.Set; import org.apache.datasketches.kll.KllDoublesSketch; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.theta.Sketch; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer; import org.apache.pinot.common.CustomObject; import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject; import org.apache.pinot.core.query.distinct.DistinctTable; @@ -131,7 +133,8 @@ public class ObjectSerDeUtils { VarianceTuple(33), PinotFourthMoment(34), ArgMinMaxObject(35), - KllDataSketch(36); + KllDataSketch(36), + IntegerTupleSketch(37); private final int _value; @@ -219,6 +222,8 @@ public class ObjectSerDeUtils { return ObjectType.VarianceTuple; } else if (value instanceof PinotFourthMoment) { return ObjectType.PinotFourthMoment; + } else if (value instanceof org.apache.datasketches.tuple.Sketch) { + return ObjectType.IntegerTupleSketch; } else if (value instanceof ArgMinMaxObject) { return ObjectType.ArgMinMaxObject; } else { @@ -926,6 +931,28 @@ public class ObjectSerDeUtils { } }; + public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE = + new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() { + @Override + public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) { + return value.compact().toByteArray(); + } + + @Override + public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) { + return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes), + new IntegerSummaryDeserializer()); + } + + @Override + public org.apache.datasketches.tuple.Sketch<IntegerSummary> 
deserialize(ByteBuffer byteBuffer) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes), + new IntegerSummaryDeserializer()); + } + }; + public static final ObjectSerDe<KllDoublesSketch> KLL_SKETCH_SER_DE = new ObjectSerDe<KllDoublesSketch>() { @Override @@ -1298,6 +1325,7 @@ public class ObjectSerDeUtils { PINOT_FOURTH_MOMENT_OBJECT_SER_DE, ARG_MIN_MAX_OBJECT_SER_DE, KLL_SKETCH_SER_DE, + DATA_SKETCH_INT_TUPLE_SER_DE, }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java index 0c55880526..f6245bec6f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java @@ -23,6 +23,8 @@ import java.math.BigDecimal; import javax.annotation.Nullable; import org.apache.datasketches.theta.Sketches; import org.apache.datasketches.theta.UpdateSketch; +import org.apache.datasketches.tuple.aninteger.IntegerSketch; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.spi.annotations.ScalarFunction; import org.apache.pinot.spi.utils.CommonConstants; @@ -87,20 +89,24 @@ public class SketchFunctions { @ScalarFunction(nullableParameters = true) public static byte[] toThetaSketch(@Nullable Object input, int nominalEntries) { UpdateSketch sketch = Sketches.updateSketchBuilder().setNominalEntries(nominalEntries).build(); - if (input instanceof Integer) { - sketch.update((Integer) input); - } else if (input instanceof Long) { - sketch.update((Long) input); - } else if (input instanceof Float) { - sketch.update((Float) input); - } else if (input instanceof Double) { - sketch.update((Double) input); - } else if (input instanceof 
BigDecimal) { - sketch.update(((BigDecimal) input).toString()); - } else if (input instanceof String) { - sketch.update((String) input); - } else if (input instanceof byte[]) { - sketch.update((byte[]) input); + if (input != null) { + if (input instanceof Integer) { + sketch.update((Integer) input); + } else if (input instanceof Long) { + sketch.update((Long) input); + } else if (input instanceof Float) { + sketch.update((Float) input); + } else if (input instanceof Double) { + sketch.update((Double) input); + } else if (input instanceof BigDecimal) { + sketch.update(((BigDecimal) input).toString()); + } else if (input instanceof String) { + sketch.update((String) input); + } else if (input instanceof byte[]) { + sketch.update((byte[]) input); + } else { + throw new IllegalArgumentException("Unrecognised input type for Theta sketch: " + input.getClass().getName()); + } } return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(sketch.compact()); } @@ -131,4 +137,51 @@ public class SketchFunctions { } return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll); } + + /** + * Create a Tuple Sketch containing the key and value supplied + * + * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch + * @param value an Integer we want to associate as the value to go along with the key, may be null to return an + * empty sketch + * @return serialized tuple sketch as bytes + */ + @ScalarFunction(nullableParameters = true) + public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) { + return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK); + } + + /** + * Create a Tuple Sketch containing the key and value supplied + * + * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch + * @param value an Integer we want to associate as the value to go along with the key, may be null to return an + * empty sketch + * 
@param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26 + * @return serialized tuple sketch as bytes + */ + @ScalarFunction(nullableParameters = true) + public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer value, int lgK) { + IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum); + if (value != null && key != null) { + if (key instanceof Integer) { + is.update((Integer) key, value); + } else if (key instanceof Long) { + is.update((Long) key, value); + } else if (key instanceof Float) { + is.update((float) key, value); + } else if (key instanceof Double) { + is.update((double) key, value); + } else if (key instanceof BigDecimal) { + is.update(((BigDecimal) key).toString(), value); + } else if (key instanceof String) { + is.update((String) key, value); + } else if (key instanceof byte[]) { + is.update((byte[]) key, value); + } else { + throw new IllegalArgumentException("Unrecognised key type for Theta sketch: " + key.getClass().getName()); + } + } + return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact()); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index f61375dc06..06fbb1db66 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function; import com.google.common.base.Preconditions; import java.util.List; import org.apache.commons.lang3.StringUtils; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; import org.apache.pinot.common.request.context.ExpressionContext; import 
org.apache.pinot.common.request.context.FunctionContext; import org.apache.pinot.core.query.request.context.QueryContext; @@ -336,6 +337,15 @@ public class AggregationFunctionFactory { return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS); case FOURTHMOMENT: return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT); + case DISTINCTCOUNTTUPLESKETCH: + // mode actually doesn't matter here because we only care about keys, not values + return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum); + case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH: + return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum); + case SUMVALUESINTEGERSUMTUPLESKETCH: + return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum); + case AVGVALUEINTEGERSUMTUPLESKETCH: + return new AvgValueIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum); case PARENTARGMAX: return new ParentArgMinMaxAggregationFunction(arguments, true); case PARENTARGMIN: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java new file mode 100644 index 0000000000..7ef6633619 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.List; +import org.apache.datasketches.tuple.CompactSketch; +import org.apache.datasketches.tuple.SketchIterator; +import org.apache.datasketches.tuple.Union; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class AvgValueIntegerTupleSketchAggregationFunction + extends IntegerTupleSketchAggregationFunction { + + public AvgValueIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) { + super(arguments, mode); + } + + // TODO if extra aggregation modes are supported, make this switch + // ie, if a Mode argument other than SUM is passed in, switch to the matching AggregationFunctionType + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.LONG; + } + + @Override + public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) { + if (integerSummarySketches == null) { + return null; + } + Union<IntegerSummary> union = new Union<>(_entries, _setOps); + 
integerSummarySketches.forEach(union::union); + double retainedTotal = 0L; + CompactSketch<IntegerSummary> result = union.getResult(); + SketchIterator<IntegerSummary> summaries = result.iterator(); + while (summaries.next()) { + retainedTotal += summaries.getSummary().getValue(); + } + if (result.getRetainedEntries() == 0) { + // there is nothing to average, return null + return null; + } + double estimate = retainedTotal / result.getRetainedEntries(); + return Math.round(estimate); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java new file mode 100644 index 0000000000..087337472d --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.List; +import org.apache.datasketches.tuple.CompactSketch; +import org.apache.datasketches.tuple.Union; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class DistinctCountIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction { + + public DistinctCountIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, + IntegerSummary.Mode mode) { + super(arguments, mode); + } + + // TODO if extra aggregation modes are supported, make this switch + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.LONG; + } + + @Override + public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) { + Union<IntegerSummary> union = new Union<>(_entries, _setOps); + integerSummarySketches.forEach(union::union); + return Double.valueOf(union.getResult().getEstimate()).longValue(); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java new file mode 100644 index 0000000000..fde88dc808 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.datasketches.tuple.CompactSketch; +import org.apache.datasketches.tuple.Sketch; +import org.apache.datasketches.tuple.Union; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.utils.CommonConstants; + + +/*** + * This is the base class for all Integer Tuple Sketch aggregations + * + * Note that it only 
supports BYTES columns containing serialized sketches currently, but could be expanded to more + */ +public class IntegerTupleSketchAggregationFunction + extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> { + final ExpressionContext _expressionContext; + final IntegerSummarySetOperations _setOps; + final int _entries; + + public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) { + super(arguments.get(0)); + + Preconditions.checkArgument(arguments.size() <= 2, + "Tuple Sketch Aggregation Function expects at most 2 arguments, got: %s", arguments.size()); + _expressionContext = arguments.get(0); + _setOps = new IntegerSummarySetOperations(mode, mode); + if (arguments.size() == 2) { + FieldSpec.DataType dataType = arguments.get(1).getLiteral().getType(); + Preconditions.checkArgument(dataType == FieldSpec.DataType.LONG || dataType == FieldSpec.DataType.INT, + "Tuple Sketch Aggregation Function expected the second argument to be a number of entries to keep, but it " + + "was of type %s", + dataType.toString()); + _entries = ((Long) arguments.get(1).getLiteral().getValue()).intValue(); + } else { + _entries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK); + } + } + + // TODO if extra aggregation modes are supported, make this switch + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH; + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = 
blockValSetMap.get(_expression); + + // Treat BYTES value as serialized Integer Tuple Sketch + FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType(); + if (storedType == FieldSpec.DataType.BYTES) { + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + try { + List<CompactSketch<IntegerSummary>> integerSketch = aggregationResultHolder.getResult(); + if (integerSketch != null) { + List<CompactSketch<IntegerSummary>> sketches = + Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize) + .map(Sketch::compact).collect(Collectors.toList()); + aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(), sketches)); + } else { + List<CompactSketch<IntegerSummary>> sketches = + Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize) + .map(Sketch::compact).collect(Collectors.toList()); + aggregationResultHolder.setValue(sketches); + } + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging Tuple Sketches", e); + } + } else { + throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType); + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // Treat BYTES value as serialized Integer Tuple Sketch + FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType(); + + if (storedType == FieldSpec.DataType.BYTES) { + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + try { + for (int i = 0; i < length; i++) { + byte[] value = bytesValues[i]; + int groupKey = groupKeyArray[i]; + CompactSketch<IntegerSummary> newSketch = + ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact(); + if (groupByResultHolder.getResult(groupKey) == null) { + 
ArrayList<CompactSketch<IntegerSummary>> newList = new ArrayList<>(); + newList.add(newSketch); + groupByResultHolder.setValueForKey(groupKey, newList); + } else { + groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch); + } + } + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging Tuple Sketches", e); + } + } else { + throw new IllegalStateException( + "Illegal data type for INTEGER_TUPLE_SKETCH_UNION aggregation function: " + storedType); + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV(); + for (int i = 0; i < length; i++) { + byte[] value = valueArray[i]; + CompactSketch<IntegerSummary> newSketch = + ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact(); + for (int groupKey : groupKeysArray[i]) { + if (groupByResultHolder.getResult(groupKey) == null) { + groupByResultHolder.setValueForKey(groupKey, Collections.singletonList(newSketch)); + } else { + groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch); + } + } + } + } + + @Override + public List<CompactSketch<IntegerSummary>> extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + return aggregationResultHolder.getResult(); + } + + @Override + public List<CompactSketch<IntegerSummary>> extractGroupByResult(GroupByResultHolder groupByResultHolder, + int groupKey) { + return groupByResultHolder.getResult(groupKey); + } + + @Override + public List<CompactSketch<IntegerSummary>> merge(List<CompactSketch<IntegerSummary>> intermediateResult1, + List<CompactSketch<IntegerSummary>> intermediateResult2) { + if (intermediateResult1 == null && intermediateResult2 != null) { + return intermediateResult2; + } else if (intermediateResult1 != null && 
intermediateResult2 == null) { + return intermediateResult1; + } else if (intermediateResult1 == null && intermediateResult2 == null) { + return new ArrayList<>(0); + } + ArrayList<CompactSketch<IntegerSummary>> merged = + new ArrayList<>(intermediateResult1.size() + intermediateResult2.size()); + merged.addAll(intermediateResult1); + merged.addAll(intermediateResult2); + return merged; + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return ColumnDataType.OBJECT; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.STRING; + } + + @Override + public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) { + if (integerSummarySketches == null) { + return null; + } + Union<IntegerSummary> union = new Union<>(_entries, _setOps); + integerSummarySketches.forEach(union::union); + return Base64.getEncoder().encodeToString(union.getResult().toByteArray()); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java new file mode 100644 index 0000000000..0167c7a0cf --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.List; +import org.apache.datasketches.tuple.CompactSketch; +import org.apache.datasketches.tuple.SketchIterator; +import org.apache.datasketches.tuple.Union; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction { + + public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) { + super(arguments, mode); + } + + // TODO if extra aggregation modes are supported, make this switch + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.LONG; + } + + @Override + public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) { + if (integerSummarySketches == null) { + return null; + } + Union<IntegerSummary> union = new Union<>(_entries, _setOps); + integerSummarySketches.forEach(union::union); + double retainedTotal = 0L; + CompactSketch<IntegerSummary> result = union.getResult(); + SketchIterator<IntegerSummary> summaries = result.iterator(); + while (summaries.next()) { + retainedTotal += 
summaries.getSummary().getValue(); + } + double estimate = retainedTotal / result.getTheta(); + return Math.round(estimate); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java new file mode 100644 index 0000000000..8bdf7f8a86 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.segment.processing.aggregator; + +import org.apache.datasketches.tuple.Sketch; +import org.apache.datasketches.tuple.Union; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations; +import org.apache.pinot.core.common.ObjectSerDeUtils; + + +public class IntegerTupleSketchAggregator implements ValueAggregator { + IntegerSummary.Mode _mode; + + public IntegerTupleSketchAggregator(IntegerSummary.Mode mode) { + _mode = mode; + } + + @Override + public Object aggregate(Object value1, Object value2) { + Sketch<IntegerSummary> first = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1); + Sketch<IntegerSummary> second = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2); + Sketch<IntegerSummary> result = new Union<>(new IntegerSummarySetOperations(_mode, _mode)).union(first, second); + return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java index 54824255dd..4cd5a1ea6d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -46,6 +47,11 @@ public class ValueAggregatorFactory { case DISTINCTCOUNTTHETASKETCH: case DISTINCTCOUNTRAWTHETASKETCH: return new DistinctCountThetaSketchAggregator(); + case DISTINCTCOUNTTUPLESKETCH: + case 
DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH: + case SUMVALUESINTEGERSUMTUPLESKETCH: + case AVGVALUEINTEGERSUMTUPLESKETCH: + return new IntegerTupleSketchAggregator(IntegerSummary.Mode.Sum); default: throw new IllegalStateException("Unsupported aggregation type: " + aggregationType); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java index 4496bbad15..b62d363c4f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java @@ -44,6 +44,8 @@ public class SketchFunctionsTest { } Assert.assertEquals(thetaEstimate(SketchFunctions.toThetaSketch(null)), 0.0); Assert.assertEquals(thetaEstimate(SketchFunctions.toThetaSketch(null, 1024)), 0.0); + Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toThetaSketch(new Object())); + Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toThetaSketch(new Object(), 1024)); } private long hllEstimate(byte[] bytes) { @@ -59,4 +61,21 @@ public class SketchFunctionsTest { Assert.assertEquals(hllEstimate(SketchFunctions.toHLL(null)), 0); Assert.assertEquals(hllEstimate(SketchFunctions.toHLL(null, 8)), 0); } + + private double intTupleEstimate(byte[] bytes) { + return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(bytes).getEstimate(); + } + + @Test + public void intTupleSumCreation() { + for (Object i : _inputs) { + Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(i, 1)), 1.0d); + Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(i, 1, 16)), 1.0d); + } + Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(null, 1)), 0.0d); + Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(null, 1, 16)), 0.0d); + 
Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toIntegerSumTupleSketch(new Object(), 1)); + Assert.assertThrows(IllegalArgumentException.class, + () -> SketchFunctions.toIntegerSumTupleSketch(new Object(), 1, 1024)); + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountIntegerSumTupleSketchStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountIntegerSumTupleSketchStarTreeV2Test.java new file mode 100644 index 0000000000..b9c52bf958 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountIntegerSumTupleSketchStarTreeV2Test.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.startree.v2; + +import java.util.Random; +import org.apache.datasketches.tuple.Sketch; +import org.apache.datasketches.tuple.aninteger.IntegerSketch; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.segment.local.aggregator.IntegerTupleSketchValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.spi.data.FieldSpec.DataType; + +import static org.testng.Assert.assertEquals; + + +public class DistinctCountIntegerSumTupleSketchStarTreeV2Test + extends BaseStarTreeV2Test<byte[], Sketch<IntegerSummary>> { + + @Override + ValueAggregator<byte[], Sketch<IntegerSummary>> getValueAggregator() { + return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum); + } + + @Override + DataType getRawValueType() { + return DataType.BYTES; + } + + @Override + byte[] getRandomRawValue(Random random) { + IntegerSketch is = new IntegerSketch(4, IntegerSummary.Mode.Sum); + is.update(random.nextInt(100), random.nextInt(100)); + return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact()); + } + + @Override + void assertAggregatedValue(Sketch<IntegerSummary> starTreeResult, Sketch<IntegerSummary> nonStarTreeResult) { + assertEquals(starTreeResult.getEstimate(), nonStarTreeResult.getEstimate()); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java new file mode 100644 index 0000000000..1440e738d1 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.aggregator; + +import org.apache.datasketches.tuple.Sketch; +import org.apache.datasketches.tuple.Union; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations; +import org.apache.pinot.segment.local.utils.CustomSerDeUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> { + public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES; + + // This changes a lot similar to the Bitmap aggregator + private int _maxByteSize; + + private final IntegerSummary.Mode _mode; + + public IntegerTupleSketchValueAggregator(IntegerSummary.Mode mode) { + _mode = mode; + } + + @Override + public AggregationFunctionType getAggregationType() { + return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH; + } + + @Override + public DataType getAggregatedValueType() { + return AGGREGATED_VALUE_TYPE; + } + + // Utility method to merge two sketches + private Sketch<IntegerSummary> union(Sketch<IntegerSummary> a, Sketch<IntegerSummary> b) { + return new Union<>(new 
IntegerSummarySetOperations(_mode, _mode)).union(a, b); + } + + @Override + public Sketch<IntegerSummary> getInitialAggregatedValue(byte[] rawValue) { + Sketch<IntegerSummary> initialValue = deserializeAggregatedValue(rawValue); + _maxByteSize = Math.max(_maxByteSize, rawValue.length); + return initialValue; + } + + @Override + public Sketch<IntegerSummary> applyRawValue(Sketch<IntegerSummary> value, byte[] rawValue) { + Sketch<IntegerSummary> right = deserializeAggregatedValue(rawValue); + Sketch<IntegerSummary> result = union(value, right).compact(); + _maxByteSize = Math.max(_maxByteSize, result.toByteArray().length); + return result; + } + + @Override + public Sketch<IntegerSummary> applyAggregatedValue(Sketch<IntegerSummary> value, + Sketch<IntegerSummary> aggregatedValue) { + Sketch<IntegerSummary> result = union(value, aggregatedValue); + _maxByteSize = Math.max(_maxByteSize, result.toByteArray().length); + return result; + } + + @Override + public Sketch<IntegerSummary> cloneAggregatedValue(Sketch<IntegerSummary> value) { + return deserializeAggregatedValue(serializeAggregatedValue(value)); + } + + @Override + public int getMaxAggregatedValueByteSize() { + return _maxByteSize; + } + + @Override + public byte[] serializeAggregatedValue(Sketch<IntegerSummary> value) { + return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(value); + } + + @Override + public Sketch<IntegerSummary> deserializeAggregatedValue(byte[] bytes) { + return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(bytes); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java index aa4bdb410b..b4f90c4952 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java +++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.aggregator; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -66,6 +67,11 @@ public class ValueAggregatorFactory { case DISTINCTCOUNTTHETASKETCH: case DISTINCTCOUNTRAWTHETASKETCH: return new DistinctCountThetaSketchValueAggregator(); + case DISTINCTCOUNTTUPLESKETCH: + case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH: + case AVGVALUEINTEGERSUMTUPLESKETCH: + case SUMVALUESINTEGERSUMTUPLESKETCH: + return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum); default: throw new IllegalStateException("Unsupported aggregation type: " + aggregationType); } @@ -107,6 +113,11 @@ public class ValueAggregatorFactory { case DISTINCTCOUNTTHETASKETCH: case DISTINCTCOUNTRAWTHETASKETCH: return DistinctCountThetaSketchValueAggregator.AGGREGATED_VALUE_TYPE; + case DISTINCTCOUNTTUPLESKETCH: + case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH: + case AVGVALUEINTEGERSUMTUPLESKETCH: + case SUMVALUESINTEGERSUMTUPLESKETCH: + return IntegerTupleSketchValueAggregator.AGGREGATED_VALUE_TYPE; default: throw new IllegalStateException("Unsupported aggregation type: " + aggregationType); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java index 289715680b..1ed3a3e341 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java @@ -26,6 +26,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.theta.Sketch; +import 
org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer; import org.apache.pinot.segment.local.customobject.AvgPair; import org.apache.pinot.segment.local.customobject.MinMaxRangePair; import org.apache.pinot.segment.local.customobject.QuantileDigest; @@ -228,6 +230,28 @@ public class CustomSerDeUtils { } }; + public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE = + new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() { + @Override + public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) { + return value.compact().toByteArray(); + } + + @Override + public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) { + return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes), + new IntegerSummaryDeserializer()); + } + + @Override + public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes), + new IntegerSummaryDeserializer()); + } + }; + public static final ObjectSerDe<RoaringBitmap> ROARING_BITMAP_SER_DE = new ObjectSerDe<RoaringBitmap>() { @Override diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java new file mode 100644 index 0000000000..d108d799b0 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.aggregator; + +import org.apache.datasketches.tuple.Sketch; +import org.apache.datasketches.tuple.aninteger.IntegerSketch; +import org.apache.datasketches.tuple.aninteger.IntegerSummary; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class IntegerTupleSketchValueAggregatorTest { + + private byte[] sketchContaining(String key, int value) { + IntegerSketch is = new IntegerSketch(16, IntegerSummary.Mode.Sum); + is.update(key, value); + return is.compact().toByteArray(); + }; + + @Test + public void initialShouldParseASketch() { + IntegerTupleSketchValueAggregator agg = new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum); + assertEquals(agg.getInitialAggregatedValue(sketchContaining("hello world", 1)).getEstimate(), 1.0); + } + + @Test + public void applyAggregatedValueShouldUnion() { + IntegerSketch s1 = new IntegerSketch(16, IntegerSummary.Mode.Sum); + IntegerSketch s2 = new IntegerSketch(16, IntegerSummary.Mode.Sum); + s1.update("a", 1); + s2.update("b", 1); + IntegerTupleSketchValueAggregator agg = new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum); + Sketch<IntegerSummary> merged = agg.applyAggregatedValue(s1, s2); + assertEquals(merged.getEstimate(), 2.0); 
+ + // and should update the max size + assertEquals(agg.getMaxAggregatedValueByteSize(), agg.serializeAggregatedValue(merged).length); + } + + @Test + public void applyRawValueShouldUnion() { + IntegerSketch s1 = new IntegerSketch(16, IntegerSummary.Mode.Sum); + IntegerSketch s2 = new IntegerSketch(16, IntegerSummary.Mode.Sum); + s1.update("a", 1); + s2.update("b", 1); + IntegerTupleSketchValueAggregator agg = new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum); + Sketch<IntegerSummary> merged = agg.applyRawValue(s1, agg.serializeAggregatedValue(s2)); + assertEquals(merged.getEstimate(), 2.0); + + // and should update the max size + assertEquals(agg.getMaxAggregatedValueByteSize(), agg.serializeAggregatedValue(merged).length); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index a9d2085c8f..7b2a02d666 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -72,6 +72,15 @@ public enum AggregationFunctionType { KURTOSIS("kurtosis"), FOURTHMOMENT("fourthmoment"), + // DataSketches Tuple Sketch support + DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch"), + + // DataSketches Tuple Sketch support for Integer based Tuple Sketches + DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH("distinctCountRawIntegerSumTupleSketch"), + + SUMVALUESINTEGERSUMTUPLESKETCH("sumValuesIntegerSumTupleSketch"), + AVGVALUEINTEGERSUMTUPLESKETCH("avgValueIntegerSumTupleSketch"), + // Geo aggregation functions STUNION("STUnion"), diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 585c56520c..4dc7496c3a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -96,6 +96,9 @@ public class CommonConstants { // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536; + + public static final int DEFAULT_TUPLE_SKETCH_LGK = 16; + // Whether to rewrite DistinctCount to DistinctCountBitmap public static final String ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY = "enable.distinct.count.bitmap.override"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org