http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 76212c8..6e894dd 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -83,7 +83,7 @@ import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
 import org.apache.kylin.engine.spark.util.IteratorUtils;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -241,15 +241,15 @@ public class SparkCubing extends AbstractApplication {
         }
     }

-    private Map<Long, HyperLogLogPlusCounterNew> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
+    private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
         CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
         List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
-        final HashMap<Long, HyperLogLogPlusCounterNew> zeroValue = Maps.newHashMap();
+        final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap();
         for (Long id : allCuboidIds) {
-            zeroValue.put(id, new HyperLogLogPlusCounterNew(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
+            zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
         }

         CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
@@ -278,12 +278,12 @@ public class SparkCubing extends AbstractApplication {
             row_hashcodes[i] = new ByteArray();
         }

-        final HashMap<Long, HyperLogLogPlusCounterNew> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HyperLogLogPlusCounterNew>, List<String>, HashMap<Long, HyperLogLogPlusCounterNew>>() {
+        final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {

            final HashFunction hashFunction = Hashing.murmur3_128();

            @Override
-            public HashMap<Long, HyperLogLogPlusCounterNew> call(HashMap<Long, HyperLogLogPlusCounterNew> v1, List<String> v2) throws Exception {
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
                 for (int i = 0; i < nRowKey; i++) {
                     Hasher hc = hashFunction.newHasher();
                     String colValue = v2.get(rowKeyColumnIndexes[i]);
@@ -296,7 +296,7 @@ public class SparkCubing extends AbstractApplication {

                 for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
                     Hasher hc = hashFunction.newHasher();
-                    HyperLogLogPlusCounterNew counter = v1.get(entry.getKey());
+                    HLLCounter counter = v1.get(entry.getKey());
                     final Integer[] cuboidBitSet = entry.getValue();
                     for (int position = 0; position < cuboidBitSet.length; position++) {
                         hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
@@ -305,14 +305,14 @@ public class SparkCubing extends AbstractApplication {
                 }
                 return v1;
             }
-        }, new Function2<HashMap<Long, HyperLogLogPlusCounterNew>, HashMap<Long, HyperLogLogPlusCounterNew>, HashMap<Long, HyperLogLogPlusCounterNew>>() {
+        }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
             @Override
-            public HashMap<Long, HyperLogLogPlusCounterNew> call(HashMap<Long, HyperLogLogPlusCounterNew> v1, HashMap<Long, HyperLogLogPlusCounterNew> v2) throws Exception {
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
                 Preconditions.checkArgument(v1.size() == v2.size());
                 Preconditions.checkArgument(v1.size() > 0);
-                for (Map.Entry<Long, HyperLogLogPlusCounterNew> entry : v1.entrySet()) {
-                    final HyperLogLogPlusCounterNew counter1 = entry.getValue();
-                    final HyperLogLogPlusCounterNew counter2 = v2.get(entry.getKey());
+                for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
+                    final HLLCounter counter1 = entry.getValue();
+                    final HLLCounter counter2 = v2.get(entry.getKey());
                     counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
                 }
                 return v1;
@@ -470,7 +470,7 @@ public class SparkCubing extends AbstractApplication {
         ClassUtil.addClasspath(confPath);
     }

-    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HyperLogLogPlusCounterNew> samplingResult) throws Exception {
+    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -614,7 +614,7 @@ public class SparkCubing extends AbstractApplication {
             }
         });

-        final Map<Long, HyperLogLogPlusCounterNew> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
+        final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
         final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
         final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);
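Note: the rename above is mechanical, but the hunks also show the two-function contract that rowJavaRDD.aggregate() relies on: a per-row function that feeds murmur3 hashes of the row-key columns into one HLLCounter per cuboid, and a combiner that merges partial maps key by key. Below is a minimal sketch of that pattern outside Spark; HLLCounter#add(byte[]) and the precision value 14 are assumptions based on the counter's typical API, since the diff itself only shows the precision-taking constructor and merge().

import java.util.HashMap;
import java.util.Map;

import org.apache.kylin.measure.hllc.HLLCounter;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

// Sketch only: add(byte[]) and precision 14 are assumed, not taken from the diff.
public class HllSamplingSketch {

    private static final HashFunction HASH = Hashing.murmur3_128();

    public static void main(String[] args) {
        // One counter per cuboid id, like the zeroValue map in sampling().
        Map<Long, HLLCounter> acc = new HashMap<Long, HLLCounter>();
        acc.put(0L, new HLLCounter(14));

        // Per row: hash the row-key column values and feed the digest to the
        // cuboid's counter, as the first Function2 does.
        for (String rowKey : new String[] { "a", "b", "a" }) {
            byte[] digest = HASH.newHasher().putBytes(rowKey.getBytes()).hash().asBytes();
            acc.get(0L).add(digest);
        }

        // Combining two partial results is a key-wise merge, as in the
        // second Function2.
        Map<Long, HLLCounter> other = new HashMap<Long, HLLCounter>();
        other.put(0L, new HLLCounter(14));
        for (Map.Entry<Long, HLLCounter> e : acc.entrySet()) {
            e.getValue().merge(other.get(e.getKey()));
        }
    }
}

Any two counters built with the same precision can be combined this way, which is what makes the pattern safe as a Spark aggregate: the zero value, the per-partition updates, and the final combination all operate on the same Map<Long, HLLCounter> shape.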
http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index 230249f..f046f78 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -35,7 +35,7 @@ import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -46,7 +46,7 @@ import org.apache.kylin.metadata.model.TableDesc;
  */
 public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritable, BytesWritable> {

-    private Map<Integer, HyperLogLogPlusCounterNew> hllcMap = new HashMap<Integer, HyperLogLogPlusCounterNew>();
+    private Map<Integer, HLLCounter> hllcMap = new HashMap<Integer, HLLCounter>();
     public static final String DEFAULT_DELIM = ",";

     private int counter = 0;
@@ -87,9 +87,9 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
         counter++;
     }

-    private HyperLogLogPlusCounterNew getHllc(Integer key) {
+    private HLLCounter getHllc(Integer key) {
         if (!hllcMap.containsKey(key)) {
-            hllcMap.put(key, new HyperLogLogPlusCounterNew());
+            hllcMap.put(key, new HLLCounter());
         }
         return hllcMap.get(key);
     }
@@ -100,7 +100,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
         ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
         while (it.hasNext()) {
             int key = it.next();
-            HyperLogLogPlusCounterNew hllc = hllcMap.get(key);
+            HLLCounter hllc = hllcMap.get(key);
             buf.clear();
             hllc.writeRegisters(buf);
             buf.flip();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 32cc6d9..0648960 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;

 /**
  * @author Jack
@@ -41,7 +41,7 @@ import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
 public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWritable, IntWritable, LongWritable> {

     public static final int ONE = 1;

-    private Map<Integer, HyperLogLogPlusCounterNew> hllcMap = new HashMap<Integer, HyperLogLogPlusCounterNew>();
+    private Map<Integer, HLLCounter> hllcMap = new HashMap<Integer, HLLCounter>();

     @Override
     protected void setup(Context context) throws IOException {
@@ -53,16 +53,16 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
         int skey = key.get();
         for (BytesWritable v : values) {
             ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
-            HyperLogLogPlusCounterNew hll = new HyperLogLogPlusCounterNew();
+            HLLCounter hll = new HLLCounter();
             hll.readRegisters(buffer);
             getHllc(skey).merge(hll);
             hll.clear();
         }
     }

-    private HyperLogLogPlusCounterNew getHllc(Integer key) {
+    private HLLCounter getHllc(Integer key) {
         if (!hllcMap.containsKey(key)) {
-            hllcMap.put(key, new HyperLogLogPlusCounterNew());
+            hllcMap.put(key, new HLLCounter());
         }
         return hllcMap.get(key);
     }
@@ -78,7 +78,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
         it = keys.iterator();
         while (it.hasNext()) {
             int key = it.next();
-            HyperLogLogPlusCounterNew hllc = hllcMap.get(key);
+            HLLCounter hllc = hllcMap.get(key);
             ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
             buf.clear();
             hllc.writeRegisters(buf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
index 410543a..c32e76d 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.junit.Before;
 import org.junit.Test;

@@ -57,7 +57,7 @@ public class ColumnCardinalityReducerTest {
     }

     private byte[] getBytes(String str) throws IOException {
-        HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew();
+        HLLCounter hllc = new HLLCounter();
         StringTokenizer tokenizer = new StringTokenizer(str, ColumnCardinalityMapper.DEFAULT_DELIM);
         int i = 0;
         while (tokenizer.hasMoreTokens()) {
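Note: across the mapper, reducer, and test, the same serialization handshake repeats: the map side writes a counter's registers into a ByteBuffer sized by BufferedMeasureCodec.DEFAULT_BUFFER_SIZE, and the reduce side wraps the received bytes, reads them into a fresh HLLCounter, and merges. Below is a round-trip sketch of that handshake; HLLCounter#add(String) and #getCountEstimate() are assumed from the counter's typical API, while writeRegisters/readRegisters/merge/clear mirror the hunks above.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;

// Sketch only: add(String) and getCountEstimate() are assumptions; the rest
// follows the mapper/reducer code in the diffs.
public class HllRegisterRoundTrip {

    public static void main(String[] args) throws IOException {
        // Map side: accumulate values, then emit the raw registers
        // (per the mapper's @@ -100 hunk).
        HLLCounter mapSide = new HLLCounter();
        mapSide.add("alice");
        mapSide.add("bob");

        ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
        mapSide.writeRegisters(buf);
        buf.flip();

        // Reduce side: rebuild a counter from the bytes and merge it into the
        // per-column accumulator (per the reducer's @@ -53 hunk).
        HLLCounter reduceSide = new HLLCounter();
        HLLCounter incoming = new HLLCounter();
        incoming.readRegisters(buf);
        reduceSide.merge(incoming);
        incoming.clear();

        System.out.println(reduceSide.getCountEstimate()); // expect ~2
    }
}

Only the registers travel between tasks, never the counter object itself, so the rename from HyperLogLogPlusCounterNew to HLLCounter leaves the wire format of the cardinality job unchanged.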