KYLIN-2387 A new BitmapCounter with better performance
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d19533c4 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d19533c4 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d19533c4 Branch: refs/heads/master-hbase1.x Commit: d19533c45414315b3ec29428edfd7c1df6d33604 Parents: 4b97721 Author: gaodayue <gaoda...@meituan.com> Authored: Wed Jan 11 15:12:31 2017 +0800 Committer: gaodayue <gaoda...@meituan.com> Committed: Fri Jan 13 22:32:05 2017 +0800 ---------------------------------------------------------------------- .../common/util/ByteBufferOutputStream.java | 43 +++++ .../gridtable/AggregationCacheMemSizeTest.java | 10 +- .../metadata/measure/MeasureCodecTest.java | 10 +- .../kylin/measure/bitmap/BitmapAggregator.java | 46 +++-- .../kylin/measure/bitmap/BitmapCounter.java | 181 ++++--------------- .../bitmap/BitmapDistinctCountAggFunc.java | 35 ++-- .../BitmapIntersectDistinctCountAggFunc.java | 37 ++-- .../kylin/measure/bitmap/BitmapMeasureType.java | 52 +++--- .../kylin/measure/bitmap/BitmapSerializer.java | 31 +--- .../measure/bitmap/ImmutableBitmapCounter.java | 112 ++++++++++++ .../measure/bitmap/MutableBitmapCounter.java | 60 ++++++ .../measure/AggregatorMemEstimateTest.java | 4 +- .../measure/bitmap/BitmapAggregatorTest.java | 38 ++-- .../kylin/measure/bitmap/BitmapCounterTest.java | 88 +++++---- .../measure/bitmap/BitmapSerializerTest.java | 60 +++--- 15 files changed, 457 insertions(+), 350 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-common/src/main/java/org/apache/kylin/common/util/ByteBufferOutputStream.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ByteBufferOutputStream.java b/core-common/src/main/java/org/apache/kylin/common/util/ByteBufferOutputStream.java new file mode 100644 index 0000000..2e3ff07 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/ByteBufferOutputStream.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.common.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * An OutputStream whose target is a {@link ByteBuffer}. + */ +public class ByteBufferOutputStream extends OutputStream { + protected final ByteBuffer buffer; + + public ByteBufferOutputStream(ByteBuffer buffer) { + this.buffer = buffer; + } + + @Override + public void write(int b) throws IOException { + buffer.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + buffer.put(b, off, len); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java index b164e54..63c7672 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java @@ -33,7 +33,7 @@ import org.apache.kylin.measure.basic.BigDecimalSumAggregator; import org.apache.kylin.measure.basic.DoubleSumAggregator; import org.apache.kylin.measure.basic.LongSumAggregator; import org.apache.kylin.measure.bitmap.BitmapAggregator; -import org.apache.kylin.measure.bitmap.BitmapCounter; +import org.apache.kylin.measure.bitmap.MutableBitmapCounter; import org.apache.kylin.measure.hllc.HLLCAggregator; import org.apache.kylin.measure.hllc.HLLCounter; import org.github.jamm.MemoryMeter; @@ -43,13 +43,13 @@ import com.google.common.base.Stopwatch; public class AggregationCacheMemSizeTest { private static final MemoryMeter meter = new MemoryMeter(); - private static final BitmapCounter[] bitmaps = new BitmapCounter[5]; + private static final MutableBitmapCounter[] bitmaps = new MutableBitmapCounter[5]; private static final Random random = new Random(); // consider bitmaps with variant cardinality static { for (int i = 0; i < bitmaps.length; i++) { - bitmaps[i] = new BitmapCounter(); + bitmaps[i] = new MutableBitmapCounter(); } final int totalBits = 1_000_000; @@ -116,8 +116,8 @@ public class AggregationCacheMemSizeTest { } private BitmapAggregator createBitmapAggr(boolean lowCardinality) { - BitmapCounter counter = new BitmapCounter(); - counter.merge(lowCardinality ? bitmaps[0] : bitmaps[3]); + MutableBitmapCounter counter = new MutableBitmapCounter(); + counter.orWith(lowCardinality ? bitmaps[0] : bitmaps[3]); BitmapAggregator result = new BitmapAggregator(); result.aggregate(counter); http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java index ad4d90b..97c9751 100644 --- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java +++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,7 +25,7 @@ import java.nio.ByteBuffer; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.measure.BufferedMeasureCodec; -import org.apache.kylin.measure.bitmap.BitmapCounter; +import org.apache.kylin.measure.bitmap.MutableBitmapCounter; import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -34,7 +34,7 @@ import org.junit.BeforeClass; import org.junit.Test; /** - * + * */ public class MeasureCodecTest extends LocalFileMetadataTestCase { @BeforeClass @@ -58,7 +58,7 @@ public class MeasureCodecTest extends LocalFileMetadataTestCase { HLLCounter hllc = new HLLCounter(16); hllc.add("1234567"); hllc.add("abcdefg"); - BitmapCounter bitmap = new BitmapCounter(); + MutableBitmapCounter bitmap = new MutableBitmapCounter(); bitmap.add(123); bitmap.add(45678); bitmap.add(Integer.MAX_VALUE - 10); http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java index cd0b4bb..2c91bfa 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java @@ -20,37 +20,49 @@ package org.apache.kylin.measure.bitmap; import org.apache.kylin.measure.MeasureAggregator; -/** - * Created by sunyerui on 15/12/2. - */ public class BitmapAggregator extends MeasureAggregator<BitmapCounter> { - private BitmapCounter sum = null; + private ImmutableBitmapCounter sum; + private boolean isMutable; @Override public void reset() { sum = null; + isMutable = false; } @Override public void aggregate(BitmapCounter value) { + ImmutableBitmapCounter v = (ImmutableBitmapCounter) value; + + // Here we optimize for case when group only has 1 value. In such situation, no + // aggregation is needed, so we just keep a reference to the first value, saving + // the cost of deserialization and merging. if (sum == null) { - sum = new BitmapCounter(value); - } else { - sum.merge(value); + sum = v; + return; + } + + MutableBitmapCounter mutable; + if (!isMutable) { // when aggregate the second value + mutable = sum.toMutable(); + sum = mutable; + isMutable = true; + } else { // for the third, forth, ... + mutable = (MutableBitmapCounter) sum; } + mutable.orWith(v); } @Override public BitmapCounter aggregate(BitmapCounter value1, BitmapCounter value2) { - if (value1 == null) { - return new BitmapCounter(value2); - } else if (value2 == null) { - return new BitmapCounter(value1); + MutableBitmapCounter merged = new MutableBitmapCounter(); + if (value1 != null) { + merged.orWith((ImmutableBitmapCounter) value1); + } + if (value2 != null) { + merged.orWith((ImmutableBitmapCounter) value2); } - - BitmapCounter merged = new BitmapCounter(value1); - merged.merge(value2); return merged; } @@ -61,10 +73,6 @@ public class BitmapAggregator extends MeasureAggregator<BitmapCounter> { @Override public int getMemBytesEstimate() { - if (sum == null) { - return Integer.MIN_VALUE; - } else { - return sum.getMemBytes(); - } + return sum == null ? 0 : sum.getMemBytes(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java index ac932ce..f07059c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java @@ -18,158 +18,43 @@ package org.apache.kylin.measure.bitmap; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; -import org.apache.kylin.common.util.ByteBufferBackedInputStream; -import org.roaringbitmap.buffer.MutableRoaringBitmap; - /** - * Created by sunyerui on 15/12/1. + * An implementation-agnostic bitmap type. */ -public class BitmapCounter implements Comparable<BitmapCounter>, java.io.Serializable { - - private MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); - - public BitmapCounter() { - } - - public BitmapCounter(BitmapCounter another) { - merge(another); - } - - public void clear() { - bitmap.clear(); - } - - public BitmapCounter clone() { - BitmapCounter newCounter = new BitmapCounter(); - newCounter.bitmap = bitmap.clone(); - return newCounter; - } - - public void add(int value) { - bitmap.add(value); - } - - public void add(String value) { - if (value == null || value.isEmpty()) { - return; - } - add(Integer.parseInt(value)); - } - - public void merge(BitmapCounter another) { - this.bitmap.or(another.bitmap); - } - - public void intersect(BitmapCounter another) { - this.bitmap.and(another.bitmap); - } - - public long getCount() { - return this.bitmap.getCardinality(); - } - - public int getMemBytes() { - return this.bitmap.getSizeInBytes(); - } - - public Iterator<Integer> iterator() { - return bitmap.iterator(); - } - - public void writeRegisters(ByteBuffer out) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - bitmap.runOptimize(); - bitmap.serialize(dos); - dos.close(); - ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray()); - out.put(bb); - } - - public void readRegisters(ByteBuffer in) throws IOException { - try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(in))) { - bitmap.deserialize(is); - } - } - - @Override - public String toString() { - long count = getCount(); - if (count <= 10) { - return "(" + count + ")" + bitmap.toString(); - } else { - StringBuilder sb = new StringBuilder(); - sb.append("(").append(count).append("){"); - int values = 0; - for (Integer v : bitmap) { - if (values++ < 10) { - sb.append(v).append(","); - } else { - sb.append("..."); - break; - } - } - sb.append("}"); - return sb.toString(); - } - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + bitmap.hashCode(); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - BitmapCounter other = (BitmapCounter) obj; - return bitmap.equals(other.bitmap); - } - - @Override - public int compareTo(BitmapCounter o) { - if (o == null) - return 1; - - long e1 = this.getCount(); - long e2 = o.getCount(); - - if (e1 == e2) - return 0; - else if (e1 > e2) - return 1; - else - return -1; - } - - public int peekLength(ByteBuffer in) { - int mark = in.position(); - int len; - - MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); - try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(in))) { - bitmap.deserialize(is); - } catch (IOException e) { - throw new IllegalStateException(e); - } - - len = in.position() - mark; - in.position(mark); - return len; - } +public interface BitmapCounter extends Iterable<Integer> { + /** + * @return cardinality of the bitmap + */ + long getCount(); + + /** + * @return estimated memory footprint of this counter + */ + int getMemBytes(); + + /** + * @return a iterator of the ints stored in this counter. + */ + Iterator<Integer> iterator(); + + /** + * Serialize this counter. The current counter is not modified. + */ + void serialize(ByteBuffer out) throws IOException; + + /** + * Deserialize a counter from its serialized form. + * <p> After deserialize, any changes to `in` should not affect the returned counter. + */ + BitmapCounter deserialize(ByteBuffer in) throws IOException; + + /** + * @return size of the counter stored in the current position of `in`. + * The position field must not be modified. + */ + int peekLength(ByteBuffer in); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java index d039b6d..66215e1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java @@ -18,35 +18,30 @@ package org.apache.kylin.measure.bitmap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** - * Created by sunyerui on 15/12/22. + * Bitmap-based distinct count UDAF, called by calcite runtime. */ public class BitmapDistinctCountAggFunc { - private static final Logger logger = LoggerFactory.getLogger(BitmapDistinctCountAggFunc.class); - - public static BitmapCounter init() { - return null; + public static BitmapAggregator init() { + return new BitmapAggregator(); } - public static BitmapCounter add(BitmapCounter counter, Object v) { - BitmapCounter c = (BitmapCounter) v; - if (counter == null) { - return new BitmapCounter(c); - } else { - counter.merge(c); - return counter; - } + public static BitmapAggregator add(BitmapAggregator agg, Object value) { + agg.aggregate((BitmapCounter) value); + return agg; } - public static BitmapCounter merge(BitmapCounter counter0, Object counter1) { - return add(counter0, counter1); + public static BitmapAggregator merge(BitmapAggregator agg, Object value) { + BitmapAggregator agg2 = (BitmapAggregator) value; + if (agg2.getState() == null) { + return agg; + } + return add(agg, agg2.getState()); } - public static long result(BitmapCounter counter) { - return counter == null ? 0L : counter.getCount(); + public static long result(BitmapAggregator agg) { + BitmapCounter finalState = agg.getState(); + return finalState == null ? 0 : finalState.getCount(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java index cf42d1b..dcdf945 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java @@ -17,9 +17,6 @@ */ package org.apache.kylin.measure.bitmap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -31,10 +28,9 @@ import java.util.Map; * requires an bitmap count distinct measure of uuid, and an dimension of event */ public class BitmapIntersectDistinctCountAggFunc { - private static final Logger logger = LoggerFactory.getLogger(BitmapIntersectDistinctCountAggFunc.class); public static class RetentionPartialResult { - Map<Object, BitmapCounter> map; + Map<Object, MutableBitmapCounter> map; List keyList; public RetentionPartialResult() { @@ -45,29 +41,34 @@ public class BitmapIntersectDistinctCountAggFunc { if (this.keyList == null) { this.keyList = keyList; } - BitmapCounter counter = map.get(key); - if (counter == null) { - counter = new BitmapCounter(); - map.put(key, counter); + if (this.keyList != null && this.keyList.contains(key)) { + MutableBitmapCounter counter = map.get(key); + if (counter == null) { + counter = new MutableBitmapCounter(); + map.put(key, counter); + } + counter.orWith((ImmutableBitmapCounter) value); } - counter.merge((BitmapCounter)value); } public long result() { if (keyList == null || keyList.isEmpty()) { return 0; } - BitmapCounter counter = null; + // if any specified key not in map, the intersection must be 0 for (Object key : keyList) { - BitmapCounter c = map.get(key); - if (c == null) { - // We have a key in filter list but not in map, meaning there's no intersect data + if (!map.containsKey(key)) { return 0; + } + } + MutableBitmapCounter counter = null; + for (Object key : keyList) { + MutableBitmapCounter c = map.get(key); + if (counter == null) { + counter = new MutableBitmapCounter(); + counter.orWith(c); } else { - if (counter == null) { - counter = c.clone(); - } - counter.intersect(c); + counter.andWith(c); } } return counter.getCount(); http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java index 6ad82a1..b6f1975 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java @@ -18,8 +18,8 @@ package org.apache.kylin.measure.bitmap; +import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -38,6 +38,8 @@ import org.apache.kylin.metadata.realization.SQLDigest.SQLCall; import com.google.common.collect.ImmutableMap; +import static com.google.common.base.Preconditions.checkArgument; + /** * Created by sunyerui on 15/12/10. */ @@ -77,11 +79,14 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { @Override public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { - if (FUNC_COUNT_DISTINCT.equals(functionDesc.getExpression()) == false) - throw new IllegalArgumentException("BitmapMeasureType func is not " + FUNC_COUNT_DISTINCT + " but " + functionDesc.getExpression()); - - if (DATATYPE_BITMAP.equals(functionDesc.getReturnDataType().getName()) == false) - throw new IllegalArgumentException("BitmapMeasureType datatype is not " + DATATYPE_BITMAP + " but " + functionDesc.getReturnDataType().getName()); + checkArgument(FUNC_COUNT_DISTINCT.equals(functionDesc.getExpression()), + "BitmapMeasureType only support function %s, got %s", FUNC_COUNT_DISTINCT, functionDesc.getExpression()); + checkArgument(functionDesc.getParameterCount() == 1, + "BitmapMeasureType only support 1 parameter, got %d", functionDesc.getParameterCount()); + + String returnType = functionDesc.getReturnDataType().getName(); + checkArgument(DATATYPE_BITMAP.equals(returnType), + "BitmapMeasureType's return type must be %s, got %s", DATATYPE_BITMAP, returnType); } @Override @@ -92,24 +97,29 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { @Override public MeasureIngester<BitmapCounter> newIngester() { return new MeasureIngester<BitmapCounter>() { - BitmapCounter current = new BitmapCounter(); + MutableBitmapCounter current = new MutableBitmapCounter(); @Override public BitmapCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { - BitmapCounter bitmap = current; + checkArgument(values.length == 1, "expect 1 value, got %s", Arrays.toString(values)); + + MutableBitmapCounter bitmap = current; bitmap.clear(); + + if (values[0] == null) { + return bitmap; + } + + int id; if (needDictionaryColumn(measureDesc.getFunction())) { TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0); Dictionary<String> dictionary = dictionaryMap.get(literalCol); - if (values != null && values.length > 0 && values[0] != null) { - int id = dictionary.getIdFromValue(values[0]); - bitmap.add(id); - } + id = dictionary.getIdFromValue(values[0]); } else { - for (String value : values) { - bitmap.add(value); - } + id = Integer.parseInt(values[0]); } + + bitmap.add(id); return bitmap; } @@ -122,11 +132,9 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { Dictionary<String> sourceDict = oldDicts.get(colRef); Dictionary<String> mergedDict = newDicts.get(colRef); - BitmapCounter retValue = new BitmapCounter(); + MutableBitmapCounter retValue = new MutableBitmapCounter(); byte[] literal = new byte[sourceDict.getSizeOfValue()]; - Iterator<Integer> iterator = value.iterator(); - while (iterator.hasNext()) { - int id = iterator.next(); + for (int id : value) { int newId; int size = sourceDict.getValueBytesFromId(id, literal, 0); if (size < 0) { @@ -141,7 +149,7 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { @Override public void reset() { - current = new BitmapCounter(); + current = new MutableBitmapCounter(); } }; } @@ -174,8 +182,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { return true; } - static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(// - FUNC_COUNT_DISTINCT, BitmapDistinctCountAggFunc.class, // + static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.of( + FUNC_COUNT_DISTINCT, BitmapDistinctCountAggFunc.class, FUNC_INTERSECT_COUNT_DISTINCT, BitmapIntersectDistinctCountAggFunc.class); @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java index 089d18c..8b15d1c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java @@ -18,54 +18,41 @@ package org.apache.kylin.measure.bitmap; -import java.io.IOException; -import java.nio.ByteBuffer; - import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; -/** - * Created by sunyerui on 15/12/1. - */ -public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> { +import java.io.IOException; +import java.nio.ByteBuffer; - private ThreadLocal<BitmapCounter> current = new ThreadLocal<>(); +public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> { + private static final BitmapCounter DELEGATE = new ImmutableBitmapCounter(); + // called by reflection public BitmapSerializer(DataType type) { } @Override public void serialize(BitmapCounter value, ByteBuffer out) { try { - value.writeRegisters(out); + value.serialize(out); } catch (IOException e) { throw new RuntimeException(e); } } - private BitmapCounter current() { - BitmapCounter counter = current.get(); - if (counter == null) { - counter = new BitmapCounter(); - current.set(counter); - } - return counter; - } - @Override public BitmapCounter deserialize(ByteBuffer in) { - BitmapCounter counter = current(); + try { - counter.readRegisters(in); + return DELEGATE.deserialize(in); } catch (IOException e) { throw new RuntimeException(e); } - return counter; } @Override public int peekLength(ByteBuffer in) { - return current().peekLength(in); + return DELEGATE.peekLength(in); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java new file mode 100644 index 0000000..5c39a4a --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.bitmap; + +import org.apache.kylin.common.util.ByteBufferOutputStream; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.Iterator; + +/** + * A thin wrapper around {@link ImmutableRoaringBitmap}. + */ +public class ImmutableBitmapCounter implements BitmapCounter { + + protected ImmutableRoaringBitmap bitmap; + + public ImmutableBitmapCounter() { + this(ImmutableRoaringBitmap.bitmapOf()); + } + + public ImmutableBitmapCounter(ImmutableRoaringBitmap bitmap) { + this.bitmap = bitmap; + } + + @Override + public long getCount() { + return bitmap.getCardinality(); + } + + @Override + public int getMemBytes() { + return bitmap.getSizeInBytes(); + } + + @Override + public Iterator<Integer> iterator() { + return bitmap.iterator(); + } + + @Override + public void serialize(ByteBuffer out) throws IOException { + if (out.remaining() < bitmap.serializedSizeInBytes()) { + throw new BufferOverflowException(); + } + bitmap.serialize(new DataOutputStream(new ByteBufferOutputStream(out))); + } + + @Override + public BitmapCounter deserialize(ByteBuffer in) throws IOException { + int size = peekLength(in); + // make a copy of the content to be safe + byte[] dst = new byte[size]; + in.get(dst); + + // just map the buffer, faster than deserialize + ImmutableRoaringBitmap bitmap = new ImmutableRoaringBitmap(ByteBuffer.wrap(dst)); + return new ImmutableBitmapCounter(bitmap); + } + + @Override + public int peekLength(ByteBuffer in) { + // only look at the metadata of the bitmap, no deserialization happens + ImmutableRoaringBitmap bitmap = new ImmutableRoaringBitmap(in); + return bitmap.serializedSizeInBytes(); + } + + /** + * Copies the content of this counter to a counter that can be modified. + * @return a mutable counter + */ + public MutableBitmapCounter toMutable() { + MutableBitmapCounter mutable = new MutableBitmapCounter(); + mutable.orWith(this); + return mutable; + } + + @Override + public boolean equals(Object obj) { + return (obj instanceof ImmutableBitmapCounter) && + bitmap.equals(((ImmutableBitmapCounter) obj).bitmap); + } + + @Override + public int hashCode() { + return bitmap.hashCode(); + } + + @Override + public String toString() { + return "BitmapCounter[" + getCount() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java new file mode 100644 index 0000000..af01790 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.bitmap; + +import org.roaringbitmap.buffer.MutableRoaringBitmap; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A thin wrapper around {@link MutableRoaringBitmap}. + */ +public class MutableBitmapCounter extends ImmutableBitmapCounter { + + public MutableBitmapCounter() { + super(new MutableRoaringBitmap()); + } + + private MutableRoaringBitmap getBitmap() { + return (MutableRoaringBitmap) bitmap; + } + + public void clear() { + getBitmap().clear(); + } + + public void add(int value) { + getBitmap().add(value); + } + + public void orWith(ImmutableBitmapCounter another) { + getBitmap().or(another.bitmap); + } + + public void andWith(ImmutableBitmapCounter another) { + getBitmap().and(another.bitmap); + } + + @Override + public void serialize(ByteBuffer out) throws IOException { + getBitmap().runOptimize(); + super.serialize(out); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java index 3d48ac2..39921c2 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java @@ -33,7 +33,7 @@ import org.apache.kylin.measure.basic.LongMaxAggregator; import org.apache.kylin.measure.basic.LongMinAggregator; import org.apache.kylin.measure.basic.LongSumAggregator; import org.apache.kylin.measure.bitmap.BitmapAggregator; -import org.apache.kylin.measure.bitmap.BitmapCounter; +import org.apache.kylin.measure.bitmap.MutableBitmapCounter; import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType; import org.apache.kylin.measure.hllc.HLLCAggregator; import org.apache.kylin.measure.hllc.HLLCounter; @@ -104,7 +104,7 @@ public class AggregatorMemEstimateTest extends LocalFileMetadataTestCase { hllcAggregator.aggregate(new HLLCounter(14)); BitmapAggregator bitmapAggregator = new BitmapAggregator(); - BitmapCounter bitmapCounter = new BitmapCounter(); + MutableBitmapCounter bitmapCounter = new MutableBitmapCounter(); for (int i = 4000; i <= 100000; i += 2) { bitmapCounter.add(i); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java index e216d0b..03eb53a 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java @@ -18,39 +18,29 @@ package org.apache.kylin.measure.bitmap; +import org.junit.Test; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import org.junit.Test; - -/** - * Created by sunyerui on 15/12/31. - */ public class BitmapAggregatorTest { @Test public void testAggregator() { - BitmapCounter counter = new BitmapCounter(); - counter.add(1); - counter.add(3333); - counter.add("123"); - counter.add(123); - assertEquals(3, counter.getCount()); - - BitmapCounter counter2 = new BitmapCounter(); - counter2.add("23456"); - counter2.add(12273456); - counter2.add("4258"); - counter2.add(123); - assertEquals(4, counter2.getCount()); - BitmapAggregator aggregator = new BitmapAggregator(); - assertNull(aggregator.getState()); - assertEquals(Integer.MIN_VALUE, aggregator.getMemBytesEstimate()); + assertNull(null, aggregator.getState()); + + aggregator.aggregate(new ImmutableBitmapCounter( + ImmutableRoaringBitmap.bitmapOf(10, 20, 30, 40) + )); + assertEquals(4, aggregator.getState().getCount()); + + aggregator.aggregate(new ImmutableBitmapCounter( + ImmutableRoaringBitmap.bitmapOf(25, 30, 35, 40, 45) + )); + assertEquals(7, aggregator.getState().getCount()); - aggregator.aggregate(counter); - aggregator.aggregate(counter2); - assertEquals(6, aggregator.getState().getCount()); aggregator.reset(); assertNull(aggregator.getState()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java index c9c1b51..70e4ecc 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java @@ -18,58 +18,66 @@ package org.apache.kylin.measure.bitmap; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.nio.ByteBuffer; - import org.junit.Test; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; -/** - * Created by sunyerui on 15/12/31. - */ public class BitmapCounterTest { @Test - public void testAddAndMergeValues() { - BitmapCounter counter = new BitmapCounter(); - counter.add(1); - counter.add(3333); - counter.add("123"); - counter.add(123); - assertEquals(3, counter.getCount()); + public void testBitmapCounter() { + ImmutableRoaringBitmap bitmap = ImmutableRoaringBitmap.bitmapOf(10, 20, 30, 1000); + ImmutableBitmapCounter counter = new ImmutableBitmapCounter(bitmap); + assertEquals(4, counter.getCount()); + assertTrue(counter.getMemBytes() > 0); - BitmapCounter counter2 = new BitmapCounter(); - counter2.add("23456"); - counter2.add(12273456); - counter2.add("4258"); - counter2.add(123); - counter2.add(-2147483648); - counter2.add(-2); - assertEquals(6, counter2.getCount()); + MutableBitmapCounter counter2 = new MutableBitmapCounter(); + assertEquals(0, counter2.getCount()); + counter2.add(10); + counter2.add(30); + counter2.add(40); + counter2.add(2000); + assertEquals(4, counter2.getCount()); - counter.merge(counter2); - assertEquals(8, counter.getCount()); - System.out.print("counter size: " + counter.getMemBytes() + ", counter2 size: " + counter2.getMemBytes()); + counter2.orWith(counter); + assertEquals(4, counter.getCount()); + assertEquals(6, counter2.getCount()); // in-place change + + int i = 0; + int[] values = new int[(int) counter2.getCount()]; + for (int value : counter2) { + values[i++] = value; + } + assertArrayEquals(new int[]{10, 20, 30, 40, 1000, 2000}, values); + + counter2.clear(); + assertEquals(0, counter2.getCount()); } @Test - public void testSerDeCounter() throws IOException { - BitmapCounter counter = new BitmapCounter(); - for (int i = 1; i < 1000; i++) { - counter.add(i); - } - ByteBuffer buffer = ByteBuffer.allocate(10 * 1024 * 1024); - counter.writeRegisters(buffer); - int len = buffer.position(); + public void testToMutableBitmapCounter() { + ImmutableRoaringBitmap bitmap = ImmutableRoaringBitmap.bitmapOf(10, 20, 30, 1000); + ImmutableBitmapCounter immutable = new ImmutableBitmapCounter(bitmap); + MutableBitmapCounter mutable = new MutableBitmapCounter(); + mutable.orWith(immutable); + + assertEquals(4, immutable.getCount()); + assertEquals(4, mutable.getCount()); + assertTrue(immutable.equals(mutable)); + assertTrue(mutable.equals(immutable)); - buffer.position(0); - assertEquals(len, counter.peekLength(buffer)); - assertEquals(0, buffer.position()); + MutableBitmapCounter newCounter = immutable.toMutable(); + newCounter.add(40); + assertEquals(4, immutable.getCount()); + assertEquals(5, newCounter.getCount()); - BitmapCounter counter2 = new BitmapCounter(); - counter2.readRegisters(buffer); - assertEquals(999, counter2.getCount()); + newCounter = mutable.toMutable(); + newCounter.add(40); + assertEquals(4, mutable.getCount()); + assertEquals(5, newCounter.getCount()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java index 41efb2c..71fcae6 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java @@ -18,19 +18,20 @@ package org.apache.kylin.measure.bitmap; -import static org.junit.Assert.assertEquals; - -import java.nio.ByteBuffer; - import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.metadata.datatype.DataType; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; -/** - * Created by sunyerui on 15/12/31. - */ public class BitmapSerializerTest extends LocalFileMetadataTestCase { @BeforeClass public static void setUp() throws Exception { @@ -43,27 +44,36 @@ public class BitmapSerializerTest extends LocalFileMetadataTestCase { } @Test - public void testSerDeCounter() { - BitmapCounter counter = new BitmapCounter(); - counter.add(1); - counter.add(3333); - counter.add("123"); - counter.add(123); - assertEquals(3, counter.getCount()); - - ByteBuffer buffer = ByteBuffer.allocate(10 * 1024 * 1024); + public void testBitmapSerDe() { BitmapSerializer serializer = new BitmapSerializer(DataType.ANY); + + ImmutableRoaringBitmap bitmap = ImmutableRoaringBitmap.bitmapOf(1, 1234, 5678, 100000); + ImmutableBitmapCounter counter = new ImmutableBitmapCounter(bitmap); + + ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); serializer.serialize(counter, buffer); - int len = buffer.position(); + int size = buffer.position(); + buffer.flip(); + + assertEquals(size, serializer.peekLength(buffer)); + assertEquals(0, buffer.position()); // peek doesn't change buffer + + BitmapCounter counter2 = serializer.deserialize(buffer); + assertEquals(size, buffer.position()); // deserialize advance positions to next record + assertEquals(4, counter2.getCount()); - buffer.position(0); - BitmapSerializer deSerializer = new BitmapSerializer(DataType.ANY); - BitmapCounter counter2 = deSerializer.deserialize(buffer); - assertEquals(3, counter2.getCount()); + buffer.flip(); + for (int i = 0; i < size; i++) { + buffer.put((byte) 0); // clear buffer content + } + assertEquals(4, counter2.getCount()); - buffer.position(0); - assertEquals(len, deSerializer.peekLength(buffer)); - assertEquals(8 * 1024 * 1024, deSerializer.maxLength()); - System.out.println("counter size " + deSerializer.getStorageBytesEstimate()); + buffer = ByteBuffer.allocate(size - 1); + try { + serializer.serialize(counter, buffer); + Assert.fail(); + } catch (Exception e) { + assertTrue(e instanceof BufferOverflowException); + } } } \ No newline at end of file