This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch distinct-count-bitmap in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit cbcc6a0d0641ebc7ea99d7bda488599b56f5c845 Author: kishoreg <g.kish...@gmail.com> AuthorDate: Tue Jul 28 17:16:40 2020 -0700 Adding distinct count support based on bitmap --- .../common/function/AggregationFunctionType.java | 1 + .../apache/pinot/core/common/ObjectSerDeUtils.java | 62 +++- .../DistinctCountBitmapValueAggregator.java | 95 ++++++ .../function/AggregationFunctionFactory.java | 2 + .../function/AggregationFunctionVisitorBase.java | 3 + .../function/DistinctCountAggregationFunction.java | 71 +++++ .../DistinctCountBitmapAggregationFunction.java | 354 +++++++++++++++++++++ 7 files changed, 572 insertions(+), 16 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java index ff3fb50..6c7ebe5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java @@ -28,6 +28,7 @@ public enum AggregationFunctionType { MINMAXRANGE("minMaxRange"), DISTINCTCOUNT("distinctCount"), DISTINCTCOUNTHLL("distinctCountHLL"), + DISTINCTCOUNTBITMAP("distinctCountBitmap"), DISTINCTCOUNTRAWHLL("distinctCountRawHLL"), FASTHLL("fastHLL"), DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch"), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index f471e37..5cbe20f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -40,6 +40,8 @@ import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair; import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable; import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair; import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; /** @@ -63,7 +65,8 @@ public class ObjectSerDeUtils { IntSet(9), TDigest(10), DistinctTable(11), - DataSketch(12); + DataSketch(12), + Bitmap(13); private int _value; @@ -102,6 +105,8 @@ public class ObjectSerDeUtils { return ObjectType.DistinctTable; } else if (value instanceof Sketch) { return ObjectType.DataSketch; + } else if (value instanceof MutableRoaringBitmap) { + return ObjectType.Bitmap; } else { throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName()); } @@ -286,6 +291,44 @@ public class ObjectSerDeUtils { } }; + public static final ObjectSerDe<MutableRoaringBitmap> ROARING_BITMAP_SERDE = new ObjectSerDe<MutableRoaringBitmap>() { + + @Override + public byte[] serialize(MutableRoaringBitmap bitmap) { + try { + byte[] bytes = new byte[bitmap.serializedSizeInBytes()]; + bitmap.serialize(ByteBuffer.wrap(bytes)); + return bytes; + } catch (Exception e) { + throw new RuntimeException("Caught exception while serializing RoaringBitmap", e); + } + } + + @Override + public MutableRoaringBitmap deserialize(byte[] bytes) { + try { + MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); + bitmap.deserialize(ByteBuffer.wrap(bytes)); + return bitmap; + } catch (IOException e) { + throw new RuntimeException("Caught exception while de-serializing MutableRoaringBitmap", e); + } + } + + @Override + public MutableRoaringBitmap deserialize(ByteBuffer byteBuffer) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + try { + MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); + bitmap.deserialize(ByteBuffer.wrap(bytes)); + return bitmap; + } catch (IOException e) { + throw new RuntimeException("Caught exception while de-serializing MutableRoaringBitmap", e); + } + } + }; + public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new ObjectSerDe<DistinctTable>() { @Override @@ -484,21 +527,8 @@ public class ObjectSerDeUtils { // NOTE: DO NOT change the order, it has to be the same order as the ObjectType //@formatter:off - private static final ObjectSerDe[] SER_DES = { - STRING_SER_DE, - LONG_SER_DE, - DOUBLE_SER_DE, - DOUBLE_ARRAY_LIST_SER_DE, - AVG_PAIR_SER_DE, - MIN_MAX_RANGE_PAIR_SER_DE, - HYPER_LOG_LOG_SER_DE, - QUANTILE_DIGEST_SER_DE, - MAP_SER_DE, - INT_SET_SER_DE, - TDIGEST_SER_DE, - DISTINCT_TABLE_SER_DE, - DATA_SKETCH_SER_DE - }; + private static final ObjectSerDe[] SER_DES = + {STRING_SER_DE, LONG_SER_DE, DOUBLE_SER_DE, DOUBLE_ARRAY_LIST_SER_DE, AVG_PAIR_SER_DE, MIN_MAX_RANGE_PAIR_SER_DE, HYPER_LOG_LOG_SER_DE, QUANTILE_DIGEST_SER_DE, MAP_SER_DE, INT_SET_SER_DE, TDIGEST_SER_DE, DISTINCT_TABLE_SER_DE, DATA_SKETCH_SER_DE, ROARING_BITMAP_SERDE}; //@formatter:on public static byte[] serialize(Object value) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java new file mode 100644 index 0000000..5623a17 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.aggregator; + +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import java.util.Objects; +import org.apache.pinot.common.function.AggregationFunctionType; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +public class DistinctCountBitmapValueAggregator implements ValueAggregator<Object, MutableRoaringBitmap> { + public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES; + private int _maxByteSize; + + @Override + public AggregationFunctionType getAggregationType() { + return AggregationFunctionType.DISTINCTCOUNTBITMAP; + } + + @Override + public DataType getAggregatedValueType() { + return AGGREGATED_VALUE_TYPE; + } + + @Override + public MutableRoaringBitmap getInitialAggregatedValue(Object rawValue) { + MutableRoaringBitmap initialValue; + if (rawValue instanceof byte[]) { + byte[] bytes = (byte[]) rawValue; + initialValue = deserializeAggregatedValue(bytes); + _maxByteSize = Math.max(_maxByteSize, bytes.length); + } else { + initialValue = new MutableRoaringBitmap(); + initialValue.add(Objects.hashCode(rawValue) & Integer.MAX_VALUE); + _maxByteSize = Math.max(_maxByteSize, initialValue.serializedSizeInBytes()); + } + return initialValue; + } + + @Override + public MutableRoaringBitmap applyRawValue(MutableRoaringBitmap value, Object rawValue) { + if (rawValue instanceof byte[]) { + value.or(deserializeAggregatedValue((byte[]) rawValue)); + } else { + value.add(Objects.hashCode(rawValue) & Integer.MAX_VALUE); + } + _maxByteSize = Math.max(_maxByteSize, value.serializedSizeInBytes()); + return value; + } + + @Override + public MutableRoaringBitmap applyAggregatedValue(MutableRoaringBitmap value, MutableRoaringBitmap aggregatedValue) { + value.or(aggregatedValue); + _maxByteSize = Math.max(_maxByteSize, value.serializedSizeInBytes()); + return value; + } + + @Override + public MutableRoaringBitmap cloneAggregatedValue(MutableRoaringBitmap value) { + return deserializeAggregatedValue(serializeAggregatedValue(value)); + } + + @Override + public int getMaxAggregatedValueByteSize() { + return _maxByteSize; + } + + @Override + public byte[] serializeAggregatedValue(MutableRoaringBitmap value) { + return ObjectSerDeUtils.ROARING_BITMAP_SERDE.serialize(value); + } + + @Override + public MutableRoaringBitmap deserializeAggregatedValue(byte[] bytes) { + return ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytes); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 021c188..37cebaf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -123,6 +123,8 @@ public class AggregationFunctionFactory { return new DistinctCountAggregationFunction(firstArgument); case DISTINCTCOUNTHLL: return new DistinctCountHLLAggregationFunction(arguments); + case DISTINCTCOUNTBITMAP: + return new DistinctCountBitmapAggregationFunction(arguments); case DISTINCTCOUNTRAWHLL: return new DistinctCountRawHLLAggregationFunction(arguments); case FASTHLL: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java index 2b5b615..aadb583 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java @@ -98,5 +98,8 @@ public class AggregationFunctionVisitorBase { public void visit(DistinctCountThetaSketchAggregationFunction function) { } + + public void visit(DistinctCountBitmapAggregationFunction distinctCountBitmapAggregationFunction) { + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java index 7e7ba4b..6077ba7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java @@ -18,18 +18,23 @@ */ package org.apache.pinot.core.query.aggregation.function; +import com.clearspring.analytics.stream.cardinality.HyperLogLog; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import java.util.Arrays; +import java.util.HashSet; import java.util.Map; import org.apache.pinot.common.function.AggregationFunctionType; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.spi.data.FieldSpec; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<IntOpenHashSet, Integer> { @@ -298,4 +303,70 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation } return valueSet; } + + public static void main(String[] args) { + int length = 30_000_000; + + int cardinality = 10_000_000; + + double[] metric = new double[length]; + String[] strings = new String[length]; + + for (int i = 0; i < length; i++) { + metric[i] = (i * 1001 )% cardinality; + strings[i] = "asdasdasdadsad" + i % cardinality; + } + System.out.println(" ================= "); + //String TEST + long start = System.currentTimeMillis(); +// IntOpenHashSet set = new IntOpenHashSet(3100000); + IntOpenHashSet stringSet = new IntOpenHashSet(); +// HashSet<Integer> set = new HashSet<Integer>(3100000); + for (int i = 0; i < length; i++) { + stringSet.add(strings[i].hashCode()); + } + System.out.println(System.currentTimeMillis() - start); + System.out.println(stringSet.size()); + System.out.println(" ================= "); + + //DOUBLE TEST + start = System.currentTimeMillis(); +// IntOpenHashSet set = new IntOpenHashSet(3100000); + IntOpenHashSet doubleSet = new IntOpenHashSet(1000); +// HashSet<Integer> set = new HashSet<Integer>(3100000); + for (int i = 0; i < length; i++) { + doubleSet.add(Double.hashCode(metric[i])); + } + System.out.println(System.currentTimeMillis() - start); + System.out.println(doubleSet.size()); + start = System.currentTimeMillis(); + byte[] serialize = ObjectSerDeUtils.serialize(doubleSet); + System.out.println("took = " + (System.currentTimeMillis() - start)); + System.out.println(serialize.length); + System.out.println(" ================= "); + + + //BITMAP TEST + start = System.currentTimeMillis(); + MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); + for (int i = 0; i < length; i++) { + bitmap.add(Double.hashCode(metric[i])); + } + System.out.println(System.currentTimeMillis() - start); + System.out.println(bitmap.getCardinality()); + System.out.println(bitmap.serializedSizeInBytes()); + System.out.println(" ================= "); + + //HLL TEST + start = System.currentTimeMillis(); + HyperLogLog hll = new HyperLogLog(12); + for (int i = 0; i < length; i++) { + hll.offer(metric[i]); + } + System.out.println(System.currentTimeMillis() - start); + System.out.println(hll.cardinality()); + System.out.println(hll.sizeof()); + System.out.println(" ================= "); + + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java new file mode 100644 index 0000000..1a10f07 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.function.AggregationFunctionType; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +public class DistinctCountBitmapAggregationFunction extends BaseSingleInputAggregationFunction<ImmutableRoaringBitmap, Long> { + + public DistinctCountBitmapAggregationFunction(List<ExpressionContext> arguments) { + super(arguments.get(0)); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.DISTINCTCOUNTBITMAP; + } + + @Override + public void accept(AggregationFunctionVisitorBase visitor) { + visitor.visit(this); + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + DataType valueType = blockValSet.getValueType(); + + if (valueType != DataType.BYTES) { + MutableRoaringBitmap bitmap = getDefaultBitmap(aggregationResultHolder); + + switch (valueType) { + case INT: + int[] intValues = blockValSet.getIntValuesSV(); + for (int i = 0; i < length; i++) { + bitmap.add(Integer.hashCode(intValues[i]) & Integer.MAX_VALUE); + } + break; + case LONG: + long[] longValues = blockValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + bitmap.add(Long.hashCode(longValues[i]) & Integer.MAX_VALUE); + } + break; + case FLOAT: + float[] floatValues = blockValSet.getFloatValuesSV(); + for (int i = 0; i < length; i++) { + bitmap.add(Float.hashCode(floatValues[i]) & Integer.MAX_VALUE); + } + break; + case DOUBLE: + double[] doubleValues = blockValSet.getDoubleValuesSV(); + for (int i = 0; i < length; i++) { + bitmap.add(Double.hashCode(doubleValues[i]) & Integer.MAX_VALUE); + } + break; + case STRING: + String[] stringValues = blockValSet.getStringValuesSV(); + for (int i = 0; i < length; i++) { + bitmap.add(stringValues[i].hashCode() & Integer.MAX_VALUE); + } + break; + default: + throw new IllegalStateException( + "Illegal data type for DISTINCT_COUNT_HLL aggregation function: " + valueType); + } + } else { + // Serialized Bitmap + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + try { + MutableRoaringBitmap bitmap = aggregationResultHolder.getResult(); + if (bitmap != null) { + for (int i = 0; i < length; i++) { + bitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i])); + } + } else { + bitmap = ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[0]).toMutableRoaringBitmap(); + aggregationResultHolder.setValue(bitmap); + for (int i = 1; i < length; i++) { + bitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i])); + } + } + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging HyperLogLogs", e); + } + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + DataType valueType = blockValSet.getValueType(); + + switch (valueType) { + case INT: + int[] intValues = blockValSet.getIntValuesSV(); + for (int i = 0; i < length; i++) { + getDefaultBitmap(groupByResultHolder, groupKeyArray[i]) + .add(Integer.hashCode(intValues[i]) & Integer.MAX_VALUE); + } + break; + case LONG: + long[] longValues = blockValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i]) & Integer.MAX_VALUE); + } + break; + case FLOAT: + float[] floatValues = blockValSet.getFloatValuesSV(); + for (int i = 0; i < length; i++) { + getDefaultBitmap(groupByResultHolder, groupKeyArray[i]) + .add(Float.hashCode(floatValues[i]) & Integer.MAX_VALUE); + } + break; + case DOUBLE: + double[] doubleValues = blockValSet.getDoubleValuesSV(); + for (int i = 0; i < length; i++) { + getDefaultBitmap(groupByResultHolder, groupKeyArray[i]) + .add(Double.hashCode(doubleValues[i]) & Integer.MAX_VALUE); + } + break; + case STRING: + String[] stringValues = blockValSet.getStringValuesSV(); + for (int i = 0; i < length; i++) { + getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode() & Integer.MAX_VALUE); + } + break; + case BYTES: + // Serialized HyperLogLog + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + try { + for (int i = 0; i < length; i++) { + ImmutableRoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]); + int groupKey = groupKeyArray[i]; + MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey); + if (bitmap != null) { + bitmap.or(value); + } else { + groupByResultHolder.setValueForKey(groupKey, value.toMutableRoaringBitmap()); + } + } + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging Bitmaps", e); + } + break; + default: + throw new IllegalStateException( + "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType); + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + DataType valueType = blockValSet.getValueType(); + + switch (valueType) { + case INT: + int[] intValues = blockValSet.getIntValuesSV(); + for (int i = 0; i < length; i++) { + int value = intValues[i]; + for (int groupKey : groupKeysArray[i]) { + getDefaultBitmap(groupByResultHolder, groupKey).add(Integer.hashCode(value) & Integer.MAX_VALUE); + } + } + break; + case LONG: + long[] longValues = blockValSet.getLongValuesSV(); + for (int i = 0; i < length; i++) { + long value = longValues[i]; + for (int groupKey : groupKeysArray[i]) { + getDefaultBitmap(groupByResultHolder, groupKey).add(Long.hashCode(value) & Integer.MAX_VALUE); + } + } + break; + case FLOAT: + float[] floatValues = blockValSet.getFloatValuesSV(); + for (int i = 0; i < length; i++) { + float value = floatValues[i]; + for (int groupKey : groupKeysArray[i]) { + getDefaultBitmap(groupByResultHolder, groupKey).add(Float.hashCode(value) & Integer.MAX_VALUE); + } + } + break; + case DOUBLE: + double[] doubleValues = blockValSet.getDoubleValuesSV(); + for (int i = 0; i < length; i++) { + double value = doubleValues[i]; + for (int groupKey : groupKeysArray[i]) { + getDefaultBitmap(groupByResultHolder, groupKey).add(Double.hashCode(value) & Integer.MAX_VALUE); + } + } + break; + case STRING: + String[] stringValues = blockValSet.getStringValuesSV(); + for (int i = 0; i < length; i++) { + String value = stringValues[i]; + for (int groupKey : groupKeysArray[i]) { + getDefaultBitmap(groupByResultHolder, groupKey).add(value.hashCode() & Integer.MAX_VALUE); + } + } + break; + case BYTES: + // Serialized HyperLogLog + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + try { + for (int i = 0; i < length; i++) { + ImmutableRoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]); + for (int groupKey : groupKeysArray[i]) { + MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey); + if (bitmap != null) { + bitmap.or(value); + } else { + // Create a new HyperLogLog for the group + groupByResultHolder + .setValueForKey(groupKey, value.toMutableRoaringBitmap()); + } + } + } + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging HyperLogLogs", e); + } + break; + default: + throw new IllegalStateException("Illegal data type for DISTINCT_COUNT_HLL aggregation function: " + valueType); + } + } + + @Override + public ImmutableRoaringBitmap extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + MutableRoaringBitmap hyperLogLog = aggregationResultHolder.getResult(); + if (hyperLogLog == null) { + return new MutableRoaringBitmap(); + } else { + return hyperLogLog; + } + } + + @Override + public MutableRoaringBitmap extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + MutableRoaringBitmap hyperLogLog = groupByResultHolder.getResult(groupKey); + if (hyperLogLog == null) { + return new MutableRoaringBitmap(); + } else { + return hyperLogLog; + } + } + + @Override + public ImmutableRoaringBitmap merge(ImmutableRoaringBitmap intermediateResult1, + ImmutableRoaringBitmap intermediateResult2) { + try { + intermediateResult1.or(intermediateResult2); + } catch (Exception e) { + throw new RuntimeException("Caught exception while merging HyperLogLogs", e); + } + return intermediateResult1; + } + + @Override + public boolean isIntermediateResultComparable() { + return false; + } + + @Override + public ColumnDataType getIntermediateResultColumnType() { + return ColumnDataType.OBJECT; + } + + @Override + public ColumnDataType getFinalResultColumnType() { + return ColumnDataType.LONG; + } + + @Override + public Long extractFinalResult(ImmutableRoaringBitmap intermediateResult) { + return Long.valueOf(intermediateResult.getCardinality()); + } + + /** + * Returns the HyperLogLog from the result holder or creates a new one with default log2m if it does not exist. + * + * @param aggregationResultHolder Result holder + * @return HyperLogLog from the result holder + */ + protected MutableRoaringBitmap getDefaultBitmap(AggregationResultHolder aggregationResultHolder) { + MutableRoaringBitmap bitmap = aggregationResultHolder.getResult(); + if (bitmap == null) { + bitmap = new MutableRoaringBitmap(); + aggregationResultHolder.setValue(bitmap); + } + return bitmap; + } + + /** + * Returns the HyperLogLog for the given group key if exists, or creates a new one with default log2m. + * + * @param groupByResultHolder Result holder + * @param groupKey Group key for which to return the HyperLogLog + * @return HyperLogLog for the group key + */ + protected MutableRoaringBitmap getDefaultBitmap(GroupByResultHolder groupByResultHolder, int groupKey) { + MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey); + if (bitmap == null) { + bitmap = new MutableRoaringBitmap(); + groupByResultHolder.setValueForKey(groupKey, bitmap); + } + return bitmap; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org