KYLIN-1832 code review
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e6e330a8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e6e330a8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e6e330a8 Branch: refs/heads/master Commit: e6e330a8bd47f1d2dd5fd6f68b510c3cf0be0287 Parents: f05404d Author: Li Yang <liy...@apache.org> Authored: Wed Dec 14 15:29:56 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Wed Dec 14 15:29:56 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/util/CubingUtils.java | 12 +- .../apache/kylin/gridtable/UnitTestSupport.java | 22 +- .../benchmark/GTScannerBenchmark2.java | 4 +- .../gridtable/AggregationCacheMemSizeTest.java | 4 +- .../metadata/measure/MeasureCodecTest.java | 4 +- .../org/apache/kylin/measure/MeasureType.java | 2 +- .../kylin/measure/MeasureTypeFactory.java | 2 +- .../kylin/measure/hllc/DenseRegister.java | 26 +- .../kylin/measure/hllc/HLLCAggregator.java | 10 +- .../kylin/measure/hllc/HLLCMeasureType.java | 20 +- .../kylin/measure/hllc/HLLCSerializer.java | 16 +- .../apache/kylin/measure/hllc/HLLCounter.java | 377 ++++++++++++++++++ .../kylin/measure/hllc/HLLCounterOld.java | 393 +++++++++++++++++++ .../measure/hllc/HLLDistinctCountAggFunc.java | 22 +- .../measure/hllc/HyperLogLogPlusCounterNew.java | 388 ------------------ .../measure/hllc/HyperLogLogPlusCounterOld.java | 392 ------------------ .../org/apache/kylin/measure/hllc/Register.java | 4 +- .../kylin/measure/hllc/SparseRegister.java | 38 +- .../measure/AggregatorMemEstimateTest.java | 4 +- .../measure/hll/HyperLogLogCounterOldTest.java | 265 ------------- .../measure/hll2/HyperLogLogCounterNewTest.java | 301 -------------- .../hll2/NewHyperLogLogBenchmarkTest.java | 288 -------------- .../kylin/measure/hllc/HLLCounterOldTest.java | 266 +++++++++++++ .../kylin/measure/hllc/HLLCounterTest.java | 316 +++++++++++++++ 
.../hllc/NewHyperLogLogBenchmarkTest.java | 291 ++++++++++++++ .../kylin/engine/mr/common/CubeStatsReader.java | 12 +- .../kylin/engine/mr/common/CubeStatsWriter.java | 6 +- .../mr/steps/FactDistinctColumnsReducer.java | 8 +- .../mr/steps/FactDistinctHiveColumnsMapper.java | 10 +- .../engine/mr/steps/MergeStatisticsStep.java | 6 +- .../kylin/engine/mr/steps/CubeSamplingTest.java | 8 +- .../steps/FactDistinctColumnsReducerTest.java | 4 +- .../apache/kylin/engine/spark/SparkCubing.java | 28 +- .../cardinality/ColumnCardinalityMapper.java | 10 +- .../cardinality/ColumnCardinalityReducer.java | 12 +- .../ColumnCardinalityReducerTest.java | 4 +- 36 files changed, 1802 insertions(+), 1773 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java index 35139a4..5e63f94 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java @@ -38,7 +38,7 @@ import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.dict.IterableDictionaryValueEnumerator; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; +import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable; @@ -59,7 +59,7 @@ public class CubingUtils { private static Logger logger = LoggerFactory.getLogger(CubingUtils.class); - public static Map<Long, HyperLogLogPlusCounterNew> sampling(CubeDesc 
cubeDesc, IJoinedFlatTableDesc flatDescIn, Iterable<List<String>> streams) { + public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn, Iterable<List<String>> streams) { final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc); final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length; final List<Long> allCuboidIds = new CuboidScheduler(cubeDesc).getAllCuboidIds(); @@ -84,9 +84,9 @@ public class CubingUtils { return result; } }); - final Map<Long, HyperLogLogPlusCounterNew> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size()); + final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size()); for (Long cuboidId : allCuboidIds) { - result.put(cuboidId, new HyperLogLogPlusCounterNew(cubeDesc.getConfig().getCubeStatsHLLPrecision())); + result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision())); Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)]; long mask = Long.highestOneBit(baseCuboidId); @@ -118,9 +118,9 @@ public class CubingUtils { } } - for (Map.Entry<Long, HyperLogLogPlusCounterNew> longHyperLogLogPlusCounterNewEntry : result.entrySet()) { + for (Map.Entry<Long, HLLCounter> longHyperLogLogPlusCounterNewEntry : result.entrySet()) { Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey(); - HyperLogLogPlusCounterNew counter = longHyperLogLogPlusCounterNewEntry.getValue(); + HLLCounter counter = longHyperLogLogPlusCounterNewEntry.getValue(); Hasher hc = hf.newHasher(); final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId); for (int position = 0; position < cuboidBitSet.length; position++) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java 
b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java index 6cbf237..b8d116c 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java @@ -26,7 +26,7 @@ import java.util.List; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.gridtable.GTInfo.Builder; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; +import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.LongMutable; @@ -106,16 +106,16 @@ public class UnitTestSupport { String d_01_15 = datePlus("2015-01-15", i * 4); String d_01_16 = datePlus("2015-01-16", i * 4); String d_01_17 = datePlus("2015-01-17", i * 4); - result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); - result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); - result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); - result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); - result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); - result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); - result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); - result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); - result.add(newRec(info, d_01_16, "George", "Food", new 
LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); - result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); + result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); + result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); + result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); + result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); + result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); + result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); + result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); + result.add(newRec(info, d_01_16, "George", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); + result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5"), new HLLCounter(14))); } return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java index f80bd24..85d8c37 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java +++ 
b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java @@ -34,7 +34,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanRequestBuilder; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator.Randomizer; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; +import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; @@ -80,7 +80,7 @@ public class GTScannerBenchmark2 { gen.addDimension(100, 4, null); gen.addMeasure(8); gen.addMeasure(8, new Randomizer() { - HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew(12); + HLLCounter hllc = new HLLCounter(12); @Override public int fillRandom(Random rand, byte[] array, int offset) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java index 66a6b51..8ffe055 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java @@ -26,7 +26,7 @@ import org.apache.kylin.measure.basic.LongSumAggregator; import org.apache.kylin.measure.bitmap.BitmapAggregator; import org.apache.kylin.measure.bitmap.BitmapCounter; import org.apache.kylin.measure.hllc.HLLCAggregator; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; +import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.datatype.DoubleMutable; import 
org.apache.kylin.metadata.datatype.LongMutable; import org.github.jamm.MemoryMeter; @@ -105,7 +105,7 @@ public class AggregationCacheMemSizeTest { private HLLCAggregator createHLLCAggr() { HLLCAggregator hllcAggregator = new HLLCAggregator(14); - hllcAggregator.aggregate(new HyperLogLogPlusCounterNew(14)); + hllcAggregator.aggregate(new HLLCounter(14)); return hllcAggregator; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java index cd1aa96..0f3f3a9 100644 --- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java +++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java @@ -26,7 +26,7 @@ import java.nio.ByteBuffer; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.bitmap.BitmapCounter; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; +import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; @@ -57,7 +57,7 @@ public class MeasureCodecTest extends LocalFileMetadataTestCase { DoubleMutable d = new DoubleMutable(1.0); LongMutable l = new LongMutable(2); BigDecimal b = new BigDecimal("333.1234"); - HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew(16); + HLLCounter hllc = new HLLCounter(16); hllc.add("1234567"); hllc.add("abcdefg"); BitmapCounter bitmap = new BitmapCounter(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java 
---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index 031636e..89ff382 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -36,7 +36,7 @@ import org.apache.kylin.metadata.tuple.TupleInfo; * MeasureType captures how a kind of aggregation is defined, how it is calculated * during cube build, and how it is involved in query and storage scan. * - * @param <T> the Java type of aggregation data object, e.g. HyperLogLogPlusCounterOld + * @param <T> the Java type of aggregation data object, e.g. HLLCounter */ abstract public class MeasureType<T> { http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java index d94dec9..694459b 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java @@ -62,7 +62,7 @@ import com.google.common.collect.Maps; } </pre> * - * @param <T> the Java type of aggregation data object, e.g. HyperLogLogPlusCounterOld + * @param <T> the Java type of aggregation data object, e.g. 
HLLCounter */ abstract public class MeasureTypeFactory<T> { http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java index 26ee6ab..c5814aa 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java @@ -25,7 +25,6 @@ import java.util.Map; * Created by xiefan on 16-12-9. */ public class DenseRegister implements Register { - private int p; private int m; @@ -41,7 +40,7 @@ public class DenseRegister implements Register { } @Override - public Byte get(int pos) { + public byte get(int pos) { return register[pos]; } @@ -80,11 +79,28 @@ public class DenseRegister implements Register { } @Override - public int getHashCode() { - return Arrays.hashCode(register); + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(register); + return result; } - public byte[] getRawRegister() { + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + DenseRegister other = (DenseRegister) obj; + if (!Arrays.equals(register, other.register)) + return false; + return true; + } + + byte[] getRawRegister() { return this.register; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java index ca73285..5966c04 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java @@ -23,10 +23,10 @@ import org.apache.kylin.measure.MeasureAggregator; /** */ @SuppressWarnings("serial") -public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounterNew> { +public class HLLCAggregator extends MeasureAggregator<HLLCounter> { final int precision; - HyperLogLogPlusCounterNew sum = null; + HLLCounter sum = null; public HLLCAggregator(int precision) { this.precision = precision; @@ -38,15 +38,15 @@ public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounterNew> } @Override - public void aggregate(HyperLogLogPlusCounterNew value) { + public void aggregate(HLLCounter value) { if (sum == null) - sum = new HyperLogLogPlusCounterNew(value); + sum = new HLLCounter(value); else sum.merge(value); } @Override - public HyperLogLogPlusCounterNew getState() { + public HLLCounter getState() { return sum; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java index 481fa4e..9601653 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java @@ -33,15 +33,15 @@ import org.apache.kylin.metadata.model.TblColRef; import com.google.common.collect.ImmutableMap; -public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounterNew> { +public class HLLCMeasureType extends MeasureType<HLLCounter> { public static final String 
FUNC_COUNT_DISTINCT = FunctionDesc.FUNC_COUNT_DISTINCT; public static final String DATATYPE_HLLC = "hllc"; - public static class Factory extends MeasureTypeFactory<HyperLogLogPlusCounterNew> { + public static class Factory extends MeasureTypeFactory<HLLCounter> { @Override - public MeasureType<HyperLogLogPlusCounterNew> createMeasureType(String funcName, DataType dataType) { + public MeasureType<HLLCounter> createMeasureType(String funcName, DataType dataType) { return new HLLCMeasureType(funcName, dataType); } @@ -56,7 +56,7 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounterNew> { } @Override - public Class<? extends DataTypeSerializer<HyperLogLogPlusCounterNew>> getAggrDataTypeSerializer() { + public Class<? extends DataTypeSerializer<HLLCounter>> getAggrDataTypeSerializer() { return HLLCSerializer.class; } } @@ -91,13 +91,13 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounterNew> { } @Override - public MeasureIngester<HyperLogLogPlusCounterNew> newIngester() { - return new MeasureIngester<HyperLogLogPlusCounterNew>() { - HyperLogLogPlusCounterNew current = new HyperLogLogPlusCounterNew(dataType.getPrecision()); + public MeasureIngester<HLLCounter> newIngester() { + return new MeasureIngester<HLLCounter>() { + HLLCounter current = new HLLCounter(dataType.getPrecision()); @Override - public HyperLogLogPlusCounterNew valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { - HyperLogLogPlusCounterNew hllc = current; + public HLLCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + HLLCounter hllc = current; hllc.clear(); for (String v : values) { if (v != null) @@ -109,7 +109,7 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounterNew> { } @Override - public MeasureAggregator<HyperLogLogPlusCounterNew> newAggregator() { + public MeasureAggregator<HLLCounter> newAggregator() { return new 
HLLCAggregator(dataType.getPrecision()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java index 1d01abc..e0992c7 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java @@ -28,10 +28,10 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer; * @author yangli9 * */ -public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounterNew> { +public class HLLCSerializer extends DataTypeSerializer<HLLCounter> { // be thread-safe and avoid repeated obj creation - private ThreadLocal<HyperLogLogPlusCounterNew> current = new ThreadLocal<HyperLogLogPlusCounterNew>(); + private ThreadLocal<HLLCounter> current = new ThreadLocal<HLLCounter>(); private int precision; @@ -40,7 +40,7 @@ public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounterNew } @Override - public void serialize(HyperLogLogPlusCounterNew value, ByteBuffer out) { + public void serialize(HLLCounter value, ByteBuffer out) { try { value.writeRegisters(out); } catch (IOException e) { @@ -48,18 +48,18 @@ public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounterNew } } - private HyperLogLogPlusCounterNew current() { - HyperLogLogPlusCounterNew hllc = current.get(); + private HLLCounter current() { + HLLCounter hllc = current.get(); if (hllc == null) { - hllc = new HyperLogLogPlusCounterNew(precision); + hllc = new HLLCounter(precision); current.set(hllc); } return hllc; } @Override - public HyperLogLogPlusCounterNew deserialize(ByteBuffer in) { - HyperLogLogPlusCounterNew hllc = current(); + public HLLCounter 
deserialize(ByteBuffer in) { + HLLCounter hllc = current(); try { hllc.readRegisters(in); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java new file mode 100644 index 0000000..22b5e55 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.measure.hllc; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import org.apache.kylin.common.util.BytesUtil; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Collection; +import java.util.Map; + +@SuppressWarnings("serial") +public class HLLCounter implements Serializable, Comparable<HLLCounter> { + + // not final for test purpose + static double OVERFLOW_FACTOR = 0.01; + + private int p; + + private int m; + + private HashFunction hashFunc = Hashing.murmur3_128(); + + private Register register; + + public HLLCounter() { + this(10, RegisterType.SPARSE, Hashing.murmur3_128()); + } + + public HLLCounter(int p) { + this(p, RegisterType.SPARSE, Hashing.murmur3_128()); + } + + public HLLCounter(int p, HashFunction hashFunc) { + this(p, RegisterType.SPARSE, hashFunc); + } + + public HLLCounter(HLLCounter another) { + this(another.p, another.hashFunc); + merge(another); + } + + HLLCounter(int p, RegisterType type) { + this(p, type, Hashing.murmur3_128()); + } + + HLLCounter(int p, RegisterType type, HashFunction hashFunc) { + this.p = p; + this.m = 1 << p;//(int) Math.pow(2, p); + this.hashFunc = hashFunc; + if (type == RegisterType.SPARSE) { + this.register = new SparseRegister(); + } else { + this.register = new DenseRegister(p); + } + } + + private boolean isDense(int size) { + double over = OVERFLOW_FACTOR * m; + return size > (int) over; + } + + public void add(int value) { + add(hashFunc.hashInt(value).asLong()); + } + + public void add(String value) { + add(hashFunc.hashString(value, Charset.defaultCharset()).asLong()); + } + + public void add(byte[] value) { + add(hashFunc.hashBytes(value).asLong()); + } + + public void add(byte[] value, int offset, int length) { + add(hashFunc.hashBytes(value, offset, length).asLong()); + } + + protected void add(long hash) { + int bucketMask = m - 1; + int bucket = 
(int) (hash & bucketMask); + int firstOnePos = Long.numberOfLeadingZeros(hash | bucketMask) + 1; + Byte b = register.get(bucket); + if (b == null || (byte) firstOnePos > b) { + register.set(bucket, (byte) firstOnePos); + } + toDenseIfNeeded(); + } + + private void toDenseIfNeeded() { + if (register instanceof SparseRegister) { + if (isDense(register.getSize())) { + register = ((SparseRegister) register).toDense(p); + } + } + } + + public void merge(HLLCounter another) { + assert this.p == another.p; + assert this.hashFunc == another.hashFunc; + if (register instanceof SparseRegister && another.register instanceof SparseRegister) { + register.merge(another.register); + toDenseIfNeeded(); + } else if (register instanceof SparseRegister && another.register instanceof DenseRegister) { + register = ((SparseRegister) register).toDense(p); + register.merge(another.register); + } else { + register.merge(another.register); + } + } + + public long getCountEstimate() { + return new HLLCSnapshot(this).getCountEstimate(); + } + + public int getPrecision() { + return this.p; + } + + public double getErrorRate() { + return 1.04 / Math.sqrt(m); + } + + @Override + public String toString() { + return "" + getCountEstimate(); + } + + // ============================================================================ + + // a memory efficient snapshot of HLL registers which can yield count estimate later + public static class HLLCSnapshot { + byte p; + double registerSum; + int zeroBuckets; + + public HLLCSnapshot(HLLCounter hllc) { + p = (byte) hllc.p; + registerSum = 0; + zeroBuckets = 0; + Register register = hllc.getRegister(); + DenseRegister dr; + if (register instanceof SparseRegister) { + dr = ((SparseRegister) register).toDense(p); + } else { + dr = (DenseRegister) register; + } + byte[] registers = dr.getRawRegister(); + for (int i = 0; i < hllc.m; i++) { + if (registers[i] == 0) { + registerSum++; + zeroBuckets++; + } else { + registerSum += 1.0 / (1L << registers[i]); + } + } 
+ } + + public long getCountEstimate() { + int m = 1 << p; + double alpha = 0.7213 / (1 + 1.079 / m); + double estimate = alpha * m * m / registerSum; + + // small cardinality adjustment + if (zeroBuckets >= m * 0.07) { // (reference presto's HLL impl) + estimate = m * Math.log(m * 1.0 / zeroBuckets); + } else if (HyperLogLogPlusTable.isBiasCorrection(m, estimate)) { + estimate = HyperLogLogPlusTable.biasCorrection(p, estimate); + } + + return Math.round(estimate); + } + } + + public static void main(String[] args) throws IOException { + dumpErrorRates(); + } + + static void dumpErrorRates() { + for (int p = 10; p <= 18; p++) { + double rate = new HLLCounter(p, RegisterType.SPARSE).getErrorRate(); + double er = Math.round(rate * 10000) / 100D; + double er2 = Math.round(rate * 2 * 10000) / 100D; + double er3 = Math.round(rate * 3 * 10000) / 100D; + long size = Math.round(Math.pow(2, p)); + System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%"); + } + } + + public Register getRegister() { + return register; + } + + public void clear() { + register.clear(); + } + + // ============================================================================ + + public void writeRegisters(final ByteBuffer out) throws IOException { + + final int indexLen = getRegisterIndexSize(); + int size = register.getSize(); + + // decide output scheme -- map (3*size bytes) or array (2^p bytes) + byte scheme; + if (register instanceof SparseRegister || 5 + (indexLen + 1) * size < m) { + scheme = 0; // map + } else { + scheme = 1; // array + } + out.put(scheme); + if (scheme == 0) { // map scheme + BytesUtil.writeVInt(size, out); + if (register instanceof SparseRegister) { //sparse register + Collection<Map.Entry<Integer, Byte>> allValue = ((SparseRegister) register).getAllValue(); + for (Map.Entry<Integer, Byte> entry : allValue) { + writeUnsigned(entry.getKey(), indexLen, out); + out.put(entry.getValue()); + } + } 
else { //dense register + byte[] registers = ((DenseRegister) register).getRawRegister(); + for (int i = 0; i < m; i++) { + if (registers[i] > 0) { + writeUnsigned(i, indexLen, out); + out.put(registers[i]); + } + } + } + } else if (scheme == 1) { // array scheme + out.put(((DenseRegister) register).getRawRegister()); + } else + throw new IllegalStateException(); + } + + public void readRegisters(ByteBuffer in) throws IOException { + byte scheme = in.get(); + if (scheme == 0) { // map scheme + clear(); + int size = BytesUtil.readVInt(in); + if (size > m) + throw new IllegalArgumentException("register size (" + size + ") cannot be larger than m (" + m + ")"); + if (isDense(size)) { + register = new DenseRegister(p); + } else { + register = new SparseRegister();//default is sparse + } + int indexLen = getRegisterIndexSize(); + int key = 0; + for (int i = 0; i < size; i++) { + key = readUnsigned(in, indexLen); + register.set(key, in.get()); + } + } else if (scheme == 1) { // array scheme + if (register instanceof SparseRegister) { + register = new DenseRegister(p); + } + in.get(((DenseRegister) register).getRawRegister()); + } else + throw new IllegalStateException(); + } + + public int peekLength(ByteBuffer in) { + int mark = in.position(); + int len; + byte scheme = in.get(); + if (scheme == 0) { // map scheme + int size = BytesUtil.readVInt(in); + int indexLen = getRegisterIndexSize(); + len = in.position() - mark + (indexLen + 1) * size; + } else { + len = in.position() - mark + m; + } + + in.position(mark); + return len; + } + + public int maxLength() { + return 1 + m; + } + + private int getRegisterIndexSize() { + return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17 + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((hashFunc == null) ? 
0 : hashFunc.hashCode()); + result = prime * result + p; + result = prime * result + register.hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HLLCounter other = (HLLCounter) obj; + if (!hashFunc.equals(other.hashFunc)) + return false; + if (p != other.p) + return false; + if (!register.equals(other.register)) + return false; + return true; + } + + @Override + public int compareTo(HLLCounter o) { + if (o == null) + return 1; + + long e1 = this.getCountEstimate(); + long e2 = o.getCountEstimate(); + + if (e1 == e2) + return 0; + else if (e1 > e2) + return 1; + else + return -1; + } + + public static void writeUnsigned(int num, int size, ByteBuffer out) { + for (int i = 0; i < size; i++) { + out.put((byte) num); + num >>>= 8; + } + } + + public static int readUnsigned(ByteBuffer in, int size) { + int integer = 0; + int mask = 0xff; + int shift = 0; + for (int i = 0; i < size; i++) { + integer |= (in.get() << shift) & mask; + mask = mask << 8; + shift += 8; + } + return integer; + } + + public RegisterType getRegisterType() { + if (register instanceof SparseRegister) + return RegisterType.SPARSE; + else + return RegisterType.DENSE; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java new file mode 100644 index 0000000..5cbdd43 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.hllc; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.apache.kylin.common.util.BytesUtil; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; + +/** + * Deprecated, use HLLCounter instead. 
+ * + * About compression, test on HLLC data shows + * + * - LZF compression ratio is around 65%-80%, fast + * - GZIP compression ratio is around 41%-46%, very slow + */ +@Deprecated +@SuppressWarnings("serial") +public class HLLCounterOld implements Serializable, Comparable<HLLCounterOld> { + + private final int p; + private final int m; + private final HashFunction hashFunc; + byte[] registers; + int singleBucket; + + public HLLCounterOld() { + this(10); + } + + public HLLCounterOld(int p) { + this(p, Hashing.murmur3_128()); + } + + public HLLCounterOld(HLLCounterOld another) { + this(another.p, another.hashFunc); + merge(another); + } + + /** The larger p is, the more storage (2^p bytes), the better accuracy */ + private HLLCounterOld(int p, HashFunction hashFunc) { + this.p = p; + this.m = 1 << p;//(int) Math.pow(2, p); + this.hashFunc = hashFunc; + this.registers = new byte[m]; + this.singleBucket = -1; + } + + public void clear() { + byte zero = (byte) 0; + if (singleBucket == -1) { + //nothing + } else if (singleBucket >= 0) { + registers[singleBucket] = 0; + } else { + Arrays.fill(registers, zero); + } + singleBucket = -1; + } + + public void add(int value) { + add(hashFunc.hashInt(value).asLong()); + } + + public void add(String value) { + add(hashFunc.hashString(value, Charset.defaultCharset()).asLong()); + } + + public void add(byte[] value) { + add(hashFunc.hashBytes(value).asLong()); + } + + public void add(byte[] value, int offset, int length) { + add(hashFunc.hashBytes(value, offset, length).asLong()); + } + + protected void add(long hash) { + int bucketMask = m - 1; + int bucket = (int) (hash & bucketMask); + int firstOnePos = Long.numberOfLeadingZeros(hash | bucketMask) + 1; + + if (firstOnePos > registers[bucket]) + registers[bucket] = (byte) firstOnePos; + + if (singleBucket == -1) + singleBucket = bucket; + else + singleBucket = Integer.MIN_VALUE; + } + + public void merge(HLLCounterOld another) { + assert this.p == another.p; + assert 
this.hashFunc == another.hashFunc; + + // quick path for single value HLLC + if (another.singleBucket == -1) { + return; + } else if (another.singleBucket >= 0) { + int b = another.singleBucket; + if (registers[b] < another.registers[b]) + registers[b] = another.registers[b]; + } else { + // normal path + for (int i = 0; i < m; i++) { + if (registers[i] < another.registers[i]) + registers[i] = another.registers[i]; + } + } + singleBucket = Integer.MIN_VALUE; + } + + public long getCountEstimate() { + return new HLLCSnapshot(this).getCountEstimate(); + } + + public int getPrecision() { + return this.p; + } + + public double getErrorRate() { + return 1.04 / Math.sqrt(m); + } + + private int size() { + if (singleBucket == -1) { + return 0; + } else if (singleBucket >= 0) { + return 1; + } else { + int size = 0; + for (int i = 0; i < m; i++) { + if (registers[i] > 0) + size++; + } + return size; + } + } + + @Override + public String toString() { + return "" + getCountEstimate(); + } + + // ============================================================================ + + // a memory efficient snapshot of HLL registers which can yield count + // estimate later + public static class HLLCSnapshot { + byte p; + double registerSum; + int zeroBuckets; + + public HLLCSnapshot(HLLCounterOld hllc) { + p = (byte) hllc.p; + registerSum = 0; + zeroBuckets = 0; + + byte[] registers = hllc.registers; + for (int i = 0; i < hllc.m; i++) { + if (registers[i] == 0) { + registerSum++; + zeroBuckets++; + } else { + registerSum += 1.0 / (1L << registers[i]); + } + } + } + + public long getCountEstimate() { + int m = 1 << p; + double alpha = 0.7213 / (1 + 1.079 / m); + double estimate = alpha * m * m / registerSum; + + // small cardinality adjustment + if (zeroBuckets >= m * 0.07) { // (reference presto's HLL impl) + estimate = m * Math.log(m * 1.0 / zeroBuckets); + } else if (HyperLogLogPlusTable.isBiasCorrection(m, estimate)) { + estimate = HyperLogLogPlusTable.biasCorrection(p, estimate); 
+ } + + return Math.round(estimate); + } + } + + // ============================================================================ + + public void writeRegisters(final ByteBuffer out) throws IOException { + + final int indexLen = getRegisterIndexSize(); + int size = size(); + + // decide output scheme -- map (3*size bytes) or array (2^p bytes) + byte scheme; + if (5 + (indexLen + 1) * size < m) // 5 is max len of vint + scheme = 0; // map + else + scheme = 1; // array + out.put(scheme); + + if (scheme == 0) { // map scheme + BytesUtil.writeVInt(size, out); + if (singleBucket == -1) { + // no non-zero register + } else if (singleBucket >= 0) { + writeUnsigned(singleBucket, indexLen, out); + out.put(registers[singleBucket]); + } else { + for (int i = 0; i < m; i++) { + if (registers[i] > 0) { + writeUnsigned(i, indexLen, out); + out.put(registers[i]); + } + } + } + } else if (scheme == 1) { // array scheme + out.put(registers); + } else + throw new IllegalStateException(); + } + + public void readRegisters(ByteBuffer in) throws IOException { + byte scheme = in.get(); + + if (scheme == 0) { // map scheme + clear(); + int size = BytesUtil.readVInt(in); + if (size > m) + throw new IllegalArgumentException("register size (" + size + ") cannot be larger than m (" + m + ")"); + int indexLen = getRegisterIndexSize(); + int key = 0; + for (int i = 0; i < size; i++) { + key = readUnsigned(in, indexLen); + registers[key] = in.get(); + } + + if (size == 0) + singleBucket = -1; + else if (size == 1) + singleBucket = key; + else + singleBucket = Integer.MIN_VALUE; + + } else if (scheme == 1) { // array scheme + in.get(registers); + singleBucket = Integer.MIN_VALUE; + } else + throw new IllegalStateException(); + } + + public int peekLength(ByteBuffer in) { + int mark = in.position(); + int len; + + byte scheme = in.get(); + if (scheme == 0) { // map scheme + int size = BytesUtil.readVInt(in); + int indexLen = getRegisterIndexSize(); + len = in.position() - mark + (indexLen + 1) * 
size; + } else { + len = in.position() - mark + m; + } + + in.position(mark); + return len; + } + + public int maxLength() { + return 1 + m; + } + + /*public void writeRegistersArray(final ByteBuffer out) { + out.put(this.registers); + } + + public void readRegistersArray(ByteBuffer in) { + in.get(registers, 0, m); + singleBucket = Integer.MIN_VALUE; + }*/ + + private int getRegisterIndexSize() { + return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17 + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((hashFunc == null) ? 0 : hashFunc.hashCode()); + result = prime * result + p; + result = prime * result + Arrays.hashCode(registers); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HLLCounterOld other = (HLLCounterOld) obj; + if (hashFunc == null) { + if (other.hashFunc != null) + return false; + } else if (!hashFunc.equals(other.hashFunc)) + return false; + if (p != other.p) + return false; + if (!Arrays.equals(registers, other.registers)) + return false; + return true; + } + + @Override + public int compareTo(HLLCounterOld o) { + if (o == null) + return 1; + + long e1 = this.getCountEstimate(); + long e2 = o.getCountEstimate(); + + if (e1 == e2) + return 0; + else if (e1 > e2) + return 1; + else + return -1; + } + + public static void main(String[] args) throws IOException { + dumpErrorRates(); + } + + static void dumpErrorRates() { + for (int p = 10; p <= 18; p++) { + double rate = new HLLCounterOld(p).getErrorRate(); + double er = Math.round(rate * 10000) / 100D; + double er2 = Math.round(rate * 2 * 10000) / 100D; + double er3 = Math.round(rate * 3 * 10000) / 100D; + long size = Math.round(Math.pow(2, p)); + System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%"); + } + } + + 
/** + * + * @param num + * @param size + * @param out + */ + public static void writeUnsigned(int num, int size, ByteBuffer out) { + for (int i = 0; i < size; i++) { + out.put((byte) num); + num >>>= 8; + } + } + + public static int readUnsigned(ByteBuffer in, int size) { + int integer = 0; + int mask = 0xff; + int shift = 0; + for (int i = 0; i < size; i++) { + integer |= (in.get() << shift) & mask; + mask = mask << 8; + shift += 8; + } + return integer; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java index a72ad09..438a33f 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java @@ -31,21 +31,21 @@ public class HLLDistinctCountAggFunc { private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class); - public static HyperLogLogPlusCounterNew init() { + public static HLLCounter init() { return null; } - public static HyperLogLogPlusCounterNew initAdd(Object v) { + public static HLLCounter initAdd(Object v) { if (v instanceof Long) { // holistic case long l = (Long) v; return new FixedValueHLLCMockup(l); } else { - HyperLogLogPlusCounterNew c = (HyperLogLogPlusCounterNew) v; - return new HyperLogLogPlusCounterNew(c); + HLLCounter c = (HLLCounter) v; + return new HLLCounter(c); } } - public static HyperLogLogPlusCounterNew add(HyperLogLogPlusCounterNew counter, Object v) { + public static HLLCounter add(HLLCounter counter, Object v) { if (v instanceof Long) { // holistic case long l = (Long) v; if (counter == null) { @@ -58,9 +58,9 @@ public class 
HLLDistinctCountAggFunc { return counter; } } else { - HyperLogLogPlusCounterNew c = (HyperLogLogPlusCounterNew) v; + HLLCounter c = (HLLCounter) v; if (counter == null) { - return new HyperLogLogPlusCounterNew(c); + return new HLLCounter(c); } else { counter.merge(c); return counter; @@ -68,16 +68,16 @@ public class HLLDistinctCountAggFunc { } } - public static HyperLogLogPlusCounterNew merge(HyperLogLogPlusCounterNew counter0, Object counter1) { + public static HLLCounter merge(HLLCounter counter0, Object counter1) { return add(counter0, counter1); } - public static long result(HyperLogLogPlusCounterNew counter) { + public static long result(HLLCounter counter) { return counter == null ? 0L : counter.getCountEstimate(); } @SuppressWarnings("serial") - private static class FixedValueHLLCMockup extends HyperLogLogPlusCounterNew { + private static class FixedValueHLLCMockup extends HLLCounter { private Long value = null; @@ -107,7 +107,7 @@ public class HLLDistinctCountAggFunc { } @Override - public void merge(HyperLogLogPlusCounterNew another) { + public void merge(HLLCounter another) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterNew.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterNew.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterNew.java deleted file mode 100644 index d7329f6..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterNew.java +++ /dev/null @@ -1,388 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.measure.hllc; - -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; -import org.apache.kylin.common.util.BytesUtil; - -import java.io.IOException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Collection; -import java.util.Map; - -@SuppressWarnings("serial") -public class HyperLogLogPlusCounterNew implements Serializable, Comparable<HyperLogLogPlusCounterNew> { - - private int p; - - private int m; - - private HashFunction hashFunc = Hashing.murmur3_128(); - - private Register register; - - public static double overflowFactor = 0.01; - - public HyperLogLogPlusCounterNew(int p, RegisterType type, HashFunction hashFunc) { - this.p = p; - this.m = 1 << p;//(int) Math.pow(2, p); - this.hashFunc = hashFunc; - if (type == RegisterType.SPARSE) { - double over = overflowFactor * m; - this.register = new SparseRegister((int) over); - } else { - this.register = new DenseRegister(p); - } - } - - public HyperLogLogPlusCounterNew() { - this(10, RegisterType.SPARSE, Hashing.murmur3_128()); - } - - public HyperLogLogPlusCounterNew(int p) { - this(p, RegisterType.SPARSE, Hashing.murmur3_128()); - } - - public HyperLogLogPlusCounterNew(int p, RegisterType type) { - this(p, type, Hashing.murmur3_128()); - } - - public HyperLogLogPlusCounterNew(int p, HashFunction hashFunc) { - 
this(p, RegisterType.SPARSE, hashFunc); - } - - public HyperLogLogPlusCounterNew(HyperLogLogPlusCounterNew another) { - this(another.p, another.hashFunc); - merge(another); - } - - public void add(int value) { - add(hashFunc.hashInt(value).asLong()); - } - - public void add(String value) { - add(hashFunc.hashString(value, Charset.defaultCharset()).asLong()); - } - - public void add(byte[] value) { - add(hashFunc.hashBytes(value).asLong()); - } - - public void add(byte[] value, int offset, int length) { - add(hashFunc.hashBytes(value, offset, length).asLong()); - } - - protected void add(long hash) { - int bucketMask = m - 1; - int bucket = (int) (hash & bucketMask); - int firstOnePos = Long.numberOfLeadingZeros(hash | bucketMask) + 1; - Byte b = register.get(bucket); - if (b == null || (byte) firstOnePos > b) { - register.set(bucket, (byte) firstOnePos); - } - if (register instanceof SparseRegister) { - if (((SparseRegister) register).isOverThreshold()) { - register = ((SparseRegister) register).toDense(p); - } - } - } - - public void merge(HyperLogLogPlusCounterNew another) { - assert this.p == another.p; - assert this.hashFunc == another.hashFunc; - if (register instanceof SparseRegister && another.register instanceof SparseRegister) { - register.merge(another.register); - if (((SparseRegister) register).isOverThreshold()) { - register = ((SparseRegister) register).toDense(p); - } - } else if (register instanceof SparseRegister && another.register instanceof DenseRegister) { - register = ((SparseRegister) register).toDense(p); - register.merge(another.register); - } else { - register.merge(another.register); - } - } - - public long getCountEstimate() { - return new HLLCSnapshot(this).getCountEstimate(); - } - - public int getPrecision() { - return this.p; - } - - public double getErrorRate() { - return 1.04 / Math.sqrt(m); - } - - @Override - public String toString() { - return "" + getCountEstimate(); - } - - // 
============================================================================ - - // a memory efficient snapshot of HLL registers which can yield count - // estimate later - public static class HLLCSnapshot { - byte p; - double registerSum; - int zeroBuckets; - - public HLLCSnapshot(HyperLogLogPlusCounterNew hllc) { - p = (byte) hllc.p; - registerSum = 0; - zeroBuckets = 0; - Register register = hllc.getRegister(); - DenseRegister dr; - if (register instanceof SparseRegister) { - dr = ((SparseRegister) register).toDense(p); - } else { - dr = (DenseRegister) register; - } - byte[] registers = dr.getRawRegister(); - for (int i = 0; i < hllc.m; i++) { - if (registers[i] == 0) { - registerSum++; - zeroBuckets++; - } else { - registerSum += 1.0 / (1L << registers[i]); - } - } - } - - public long getCountEstimate() { - int m = 1 << p; - double alpha = 0.7213 / (1 + 1.079 / m); - double estimate = alpha * m * m / registerSum; - - // small cardinality adjustment - if (zeroBuckets >= m * 0.07) { // (reference presto's HLL impl) - estimate = m * Math.log(m * 1.0 / zeroBuckets); - } else if (HyperLogLogPlusTable.isBiasCorrection(m, estimate)) { - estimate = HyperLogLogPlusTable.biasCorrection(p, estimate); - } - - return Math.round(estimate); - } - } - - public static void main(String[] args) throws IOException { - dumpErrorRates(); - } - - static void dumpErrorRates() { - for (int p = 10; p <= 18; p++) { - double rate = new HyperLogLogPlusCounterNew(p, RegisterType.SPARSE).getErrorRate(); - double er = Math.round(rate * 10000) / 100D; - double er2 = Math.round(rate * 2 * 10000) / 100D; - double er3 = Math.round(rate * 3 * 10000) / 100D; - long size = Math.round(Math.pow(2, p)); - System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%"); - } - } - - public Register getRegister() { - return register; - } - - public void clear() { - register.clear(); - } - - public RegisterType getRegisterType() { - 
if (register instanceof SparseRegister) - return RegisterType.SPARSE; - else - return RegisterType.DENSE; - } - - // ============================================================================ - - public void writeRegisters(final ByteBuffer out) throws IOException { - - final int indexLen = getRegisterIndexSize(); - int size = size(); - - // decide output scheme -- map (3*size bytes) or array (2^p bytes) - byte scheme; - //byte type; - if (register instanceof SparseRegister || 5 + (indexLen + 1) * size < m) { - scheme = 0; //map - } else { - scheme = 1; // array - } - out.put(scheme); - if (scheme == 0) { // map scheme - BytesUtil.writeVInt(size, out); - if (register instanceof SparseRegister) { //sparse register - Collection<Map.Entry<Integer, Byte>> allValue = ((SparseRegister) register).getAllValue(); - for (Map.Entry<Integer, Byte> entry : allValue) { - writeUnsigned(entry.getKey(), indexLen, out); - out.put(entry.getValue()); - } - } else { //dense register - byte[] registers = ((DenseRegister) register).getRawRegister(); - for (int i = 0; i < m; i++) { - if (registers[i] > 0) { - writeUnsigned(i, indexLen, out); - out.put(registers[i]); - } - } - } - } else if (scheme == 1) { // array scheme - out.put(((DenseRegister) register).getRawRegister()); - } else - throw new IllegalStateException(); - } - - public void readRegisters(ByteBuffer in) throws IOException { - byte scheme = in.get(); - if (scheme == 0) { // map scheme - clear(); - int size = BytesUtil.readVInt(in); - if (size > m) - throw new IllegalArgumentException("register size (" + size + ") cannot be larger than m (" + m + ")"); - double over = overflowFactor * m; - if (size > (int) over) { - this.register = new DenseRegister(p); - } else { - this.register = new SparseRegister((int) over);//default is sparse - } - int indexLen = getRegisterIndexSize(); - int key = 0; - for (int i = 0; i < size; i++) { - key = readUnsigned(in, indexLen); - register.set(key, in.get()); - } - } else if (scheme == 1) { 
// array scheme - this.register = new DenseRegister(p); - for (int i = 0; i < m; i++) { - register.set(i, in.get()); - } - } else - throw new IllegalStateException(); - } - - public int peekLength(ByteBuffer in) { - int mark = in.position(); - int len; - byte scheme = in.get(); - if (scheme == 0) { // map scheme - int size = BytesUtil.readVInt(in); - int indexLen = getRegisterIndexSize(); - len = in.position() - mark + (indexLen + 1) * size; - } else { - len = in.position() - mark + m; - } - - in.position(mark); - return len; - } - - public int maxLength() { - return 1 + m; - } - - private int getRegisterIndexSize() { - return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17 - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((hashFunc == null) ? 0 : hashFunc.hashCode()); - result = prime * result + p; - result = prime * result + register.getHashCode(); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HyperLogLogPlusCounterNew other = (HyperLogLogPlusCounterNew) obj; - if (hashFunc == null) { - if (other.hashFunc != null) - return false; - } else if (!hashFunc.equals(other.hashFunc)) - return false; - if (p != other.p) - return false; - if (this.getRegisterType() != other.getRegisterType()) - return false; - if (register.getHashCode() != other.register.getHashCode()) - return false; - return true; - } - - @Override - public int compareTo(HyperLogLogPlusCounterNew o) { - if (o == null) - return 1; - - long e1 = this.getCountEstimate(); - long e2 = o.getCountEstimate(); - - if (e1 == e2) - return 0; - else if (e1 > e2) - return 1; - else - return -1; - } - - /** - * - * @param num - * @param size - * @param out - */ - public static void writeUnsigned(int num, int size, ByteBuffer out) { - for (int i = 0; i < size; i++) { - out.put((byte) num); - num >>>= 8; - } - } 
- - public static int readUnsigned(ByteBuffer in, int size) { - int integer = 0; - int mask = 0xff; - int shift = 0; - for (int i = 0; i < size; i++) { - integer |= (in.get() << shift) & mask; - mask = mask << 8; - shift += 8; - } - return integer; - } - - private int size() { - return register.getSize(); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterOld.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterOld.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterOld.java deleted file mode 100644 index cb5533e..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterOld.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.measure.hllc; - -import java.io.IOException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Arrays; - -import org.apache.kylin.common.util.BytesUtil; - -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; - -/** - * About compression, test on HLLC data shows - * - * - LZF compression ratio is around 65%-80%, fast - * - GZIP compression ratio is around 41%-46%, very slow - * - * @author yangli9 - */ -@SuppressWarnings("serial") -public class HyperLogLogPlusCounterOld implements Serializable, Comparable<HyperLogLogPlusCounterOld> { - - private final int p; - private final int m; - private final HashFunction hashFunc; - byte[] registers; - int singleBucket; - - public HyperLogLogPlusCounterOld() { - this(10); - } - - public HyperLogLogPlusCounterOld(int p) { - this(p, Hashing.murmur3_128()); - } - - public HyperLogLogPlusCounterOld(HyperLogLogPlusCounterOld another) { - this(another.p, another.hashFunc); - merge(another); - } - - /** The larger p is, the more storage (2^p bytes), the better accuracy */ - private HyperLogLogPlusCounterOld(int p, HashFunction hashFunc) { - this.p = p; - this.m = 1 << p;//(int) Math.pow(2, p); - this.hashFunc = hashFunc; - this.registers = new byte[m]; - this.singleBucket = -1; - } - - public void clear() { - byte zero = (byte) 0; - if (singleBucket == -1) { - //nothing - } else if (singleBucket >= 0) { - registers[singleBucket] = 0; - } else { - Arrays.fill(registers, zero); - } - singleBucket = -1; - } - - public void add(int value) { - add(hashFunc.hashInt(value).asLong()); - } - - public void add(String value) { - add(hashFunc.hashString(value, Charset.defaultCharset()).asLong()); - } - - public void add(byte[] value) { - add(hashFunc.hashBytes(value).asLong()); - } - - public void add(byte[] value, int offset, int length) { - add(hashFunc.hashBytes(value, offset, length).asLong()); - } - - protected void 
add(long hash) { - int bucketMask = m - 1; - int bucket = (int) (hash & bucketMask); - int firstOnePos = Long.numberOfLeadingZeros(hash | bucketMask) + 1; - - if (firstOnePos > registers[bucket]) - registers[bucket] = (byte) firstOnePos; - - if (singleBucket == -1) - singleBucket = bucket; - else - singleBucket = Integer.MIN_VALUE; - } - - public void merge(HyperLogLogPlusCounterOld another) { - assert this.p == another.p; - assert this.hashFunc == another.hashFunc; - - // quick path for single value HLLC - if (another.singleBucket == -1) { - return; - } else if (another.singleBucket >= 0) { - int b = another.singleBucket; - if (registers[b] < another.registers[b]) - registers[b] = another.registers[b]; - } else { - // normal path - for (int i = 0; i < m; i++) { - if (registers[i] < another.registers[i]) - registers[i] = another.registers[i]; - } - } - singleBucket = Integer.MIN_VALUE; - } - - public long getCountEstimate() { - return new HLLCSnapshot(this).getCountEstimate(); - } - - public int getPrecision() { - return this.p; - } - - public double getErrorRate() { - return 1.04 / Math.sqrt(m); - } - - private int size() { - if (singleBucket == -1) { - return 0; - } else if (singleBucket >= 0) { - return 1; - } else { - int size = 0; - for (int i = 0; i < m; i++) { - if (registers[i] > 0) - size++; - } - return size; - } - } - - @Override - public String toString() { - return "" + getCountEstimate(); - } - - // ============================================================================ - - // a memory efficient snapshot of HLL registers which can yield count - // estimate later - public static class HLLCSnapshot { - byte p; - double registerSum; - int zeroBuckets; - - public HLLCSnapshot(HyperLogLogPlusCounterOld hllc) { - p = (byte) hllc.p; - registerSum = 0; - zeroBuckets = 0; - - byte[] registers = hllc.registers; - for (int i = 0; i < hllc.m; i++) { - if (registers[i] == 0) { - registerSum++; - zeroBuckets++; - } else { - registerSum += 1.0 / (1L << 
registers[i]); - } - } - } - - public long getCountEstimate() { - int m = 1 << p; - double alpha = 0.7213 / (1 + 1.079 / m); - double estimate = alpha * m * m / registerSum; - - // small cardinality adjustment - if (zeroBuckets >= m * 0.07) { // (reference presto's HLL impl) - estimate = m * Math.log(m * 1.0 / zeroBuckets); - } else if (HyperLogLogPlusTable.isBiasCorrection(m, estimate)) { - estimate = HyperLogLogPlusTable.biasCorrection(p, estimate); - } - - return Math.round(estimate); - } - } - - // ============================================================================ - - public void writeRegisters(final ByteBuffer out) throws IOException { - - final int indexLen = getRegisterIndexSize(); - int size = size(); - - // decide output scheme -- map (3*size bytes) or array (2^p bytes) - byte scheme; - if (5 + (indexLen + 1) * size < m) // 5 is max len of vint - scheme = 0; // map - else - scheme = 1; // array - out.put(scheme); - - if (scheme == 0) { // map scheme - BytesUtil.writeVInt(size, out); - if (singleBucket == -1) { - // no non-zero register - } else if (singleBucket >= 0) { - writeUnsigned(singleBucket, indexLen, out); - out.put(registers[singleBucket]); - } else { - for (int i = 0; i < m; i++) { - if (registers[i] > 0) { - writeUnsigned(i, indexLen, out); - out.put(registers[i]); - } - } - } - } else if (scheme == 1) { // array scheme - out.put(registers); - } else - throw new IllegalStateException(); - } - - public void readRegisters(ByteBuffer in) throws IOException { - byte scheme = in.get(); - - if (scheme == 0) { // map scheme - clear(); - int size = BytesUtil.readVInt(in); - if (size > m) - throw new IllegalArgumentException("register size (" + size + ") cannot be larger than m (" + m + ")"); - int indexLen = getRegisterIndexSize(); - int key = 0; - for (int i = 0; i < size; i++) { - key = readUnsigned(in, indexLen); - registers[key] = in.get(); - } - - if (size == 0) - singleBucket = -1; - else if (size == 1) - singleBucket = key; - else - 
singleBucket = Integer.MIN_VALUE; - - } else if (scheme == 1) { // array scheme - in.get(registers); - singleBucket = Integer.MIN_VALUE; - } else - throw new IllegalStateException(); - } - - public int peekLength(ByteBuffer in) { - int mark = in.position(); - int len; - - byte scheme = in.get(); - if (scheme == 0) { // map scheme - int size = BytesUtil.readVInt(in); - int indexLen = getRegisterIndexSize(); - len = in.position() - mark + (indexLen + 1) * size; - } else { - len = in.position() - mark + m; - } - - in.position(mark); - return len; - } - - public int maxLength() { - return 1 + m; - } - - /*public void writeRegistersArray(final ByteBuffer out) { - out.put(this.registers); - } - - public void readRegistersArray(ByteBuffer in) { - in.get(registers, 0, m); - singleBucket = Integer.MIN_VALUE; - }*/ - - private int getRegisterIndexSize() { - return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17 - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((hashFunc == null) ? 
0 : hashFunc.hashCode()); - result = prime * result + p; - result = prime * result + Arrays.hashCode(registers); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HyperLogLogPlusCounterOld other = (HyperLogLogPlusCounterOld) obj; - if (hashFunc == null) { - if (other.hashFunc != null) - return false; - } else if (!hashFunc.equals(other.hashFunc)) - return false; - if (p != other.p) - return false; - if (!Arrays.equals(registers, other.registers)) - return false; - return true; - } - - @Override - public int compareTo(HyperLogLogPlusCounterOld o) { - if (o == null) - return 1; - - long e1 = this.getCountEstimate(); - long e2 = o.getCountEstimate(); - - if (e1 == e2) - return 0; - else if (e1 > e2) - return 1; - else - return -1; - } - - public static void main(String[] args) throws IOException { - dumpErrorRates(); - } - - static void dumpErrorRates() { - for (int p = 10; p <= 18; p++) { - double rate = new HyperLogLogPlusCounterOld(p).getErrorRate(); - double er = Math.round(rate * 10000) / 100D; - double er2 = Math.round(rate * 2 * 10000) / 100D; - double er3 = Math.round(rate * 3 * 10000) / 100D; - long size = Math.round(Math.pow(2, p)); - System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%"); - } - } - - /** - * - * @param num - * @param size - * @param out - */ - public static void writeUnsigned(int num, int size, ByteBuffer out) { - for (int i = 0; i < size; i++) { - out.put((byte) num); - num >>>= 8; - } - } - - public static int readUnsigned(ByteBuffer in, int size) { - int integer = 0; - int mask = 0xff; - int shift = 0; - for (int i = 0; i < size; i++) { - integer |= (in.get() << shift) & mask; - mask = mask << 8; - shift += 8; - } - return integer; - } -} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/Register.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/Register.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/Register.java index 79c4bba..a6ef94f 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/Register.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/Register.java @@ -24,7 +24,7 @@ public interface Register { void set(int pos, byte value); - Byte get(int pos); + byte get(int pos); void merge(Register another); @@ -32,6 +32,4 @@ public interface Register { int getSize(); - int getHashCode(); - } http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java index d241e81..d6bb024 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java @@ -27,12 +27,9 @@ import java.util.TreeMap; */ public class SparseRegister implements Register { - private int overThreshold; - private Map<Integer, Byte> sparseRegister = new TreeMap<>(); - public SparseRegister(int overThreshold) { - this.overThreshold = overThreshold; + public SparseRegister() { } public DenseRegister toDense(int p) { @@ -49,8 +46,9 @@ public class SparseRegister implements Register { } @Override - public Byte get(int pos) { - return sparseRegister.get(pos); + public byte get(int pos) { + Byte b = sparseRegister.get(pos); + return b == null ? 
0 : b; } @Override @@ -58,8 +56,8 @@ public class SparseRegister implements Register { assert another instanceof SparseRegister; SparseRegister sr = (SparseRegister) another; for (Map.Entry<Integer, Byte> entry : sr.sparseRegister.entrySet()) { - Byte v = sparseRegister.get(entry.getKey()); - if (v == null || entry.getValue() > v) + byte v = get(entry.getKey()); + if (entry.getValue() > v) sparseRegister.put(entry.getKey(), entry.getValue()); } } @@ -75,20 +73,28 @@ public class SparseRegister implements Register { } @Override - public int getHashCode() { + public int hashCode() { final int prime = 31; int result = 1; - for (Map.Entry<Integer, Byte> entry : sparseRegister.entrySet()) { - result = prime * result + entry.getKey(); - result = prime * result + entry.getValue(); - } + result = prime * result + ((sparseRegister == null) ? 0 : sparseRegister.hashCode()); return result; } - public boolean isOverThreshold() { - if (this.sparseRegister.size() > overThreshold) + @Override + public boolean equals(Object obj) { + if (this == obj) return true; - return false; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SparseRegister other = (SparseRegister) obj; + if (sparseRegister == null) { + if (other.sparseRegister != null) + return false; + } else if (!sparseRegister.equals(other.sparseRegister)) + return false; + return true; } public Collection<Map.Entry<Integer, Byte>> getAllValue() { http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java index 103e721..0f22610 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java +++ 
b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java @@ -26,7 +26,7 @@ import org.apache.kylin.measure.bitmap.BitmapAggregator; import org.apache.kylin.measure.bitmap.BitmapCounter; import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType; import org.apache.kylin.measure.hllc.HLLCAggregator; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; +import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.datatype.LongMutable; @@ -94,7 +94,7 @@ public class AggregatorMemEstimateTest extends LocalFileMetadataTestCase { @Test public void testAggregatorEstimate() { HLLCAggregator hllcAggregator = new HLLCAggregator(14); - hllcAggregator.aggregate(new HyperLogLogPlusCounterNew(14)); + hllcAggregator.aggregate(new HLLCounter(14)); BitmapAggregator bitmapAggregator = new BitmapAggregator(); BitmapCounter bitmapCounter = new BitmapCounter();