KYLIN-2518 Optimize put row key to hll

Signed-off-by: Hongbin Ma <mahong...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c218214
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c218214
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c218214

Branch: refs/heads/master-hbase0.98
Commit: 4c21821471cb261cfecdf8289c5f8284af817b3e
Parents: b1cc0dd
Author: xiefan46 <958034...@qq.com>
Authored: Mon Mar 27 18:13:03 2017 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Wed Mar 29 11:01:24 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/measure/hllc/HLLCounter.java   |  54 ++--
 .../mr/steps/FactDistinctColumnsMapper.java     |  31 +-
 .../mr/steps/NewCubeSamplingMethodTest.java     | 299 +++++++++++++++++++
 3 files changed, 341 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index 82c881b..b793465 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -60,7 +60,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         merge(another);
     }
 
-    HLLCounter(int p, RegisterType type) {
+    public HLLCounter(int p, RegisterType type) {
         this(p, type, Hashing.murmur3_128());
     }
 
@@ -99,6 +99,10 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         add(hashFunc.hashBytes(value, offset, length).asLong());
     }
 
+    public void addHashDirectly(long hash){
+        add(hash);
+    }
+
     protected void add(long hash) {
         int bucketMask = m - 1;
         int bucket = (int) (hash & bucketMask);
@@ -126,7 +130,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
     }
 
     private void toDenseIfNeeded() {
-        if (register instanceof SparseRegister) {
+        if (register.getRegisterType() == RegisterType.SPARSE) {
             if (isDense(register.getSize())) {
                 register = ((SparseRegister) register).toDense(p);
             }
@@ -137,36 +141,36 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         assert this.p == another.p;
         assert this.hashFunc == another.hashFunc;
         switch (register.getRegisterType()) {
-        case SINGLE_VALUE:
-            switch (another.getRegisterType()) {
         case SINGLE_VALUE:
-            if (register.getSize() > 0 && another.register.getSize() > 0) {
-                register = ((SingleValueRegister) register).toSparse();
-            } else {
-                SingleValueRegister sr = (SingleValueRegister) another.register;
-                if (sr.getSize() > 0)
-                    register.set(sr.getSingleValuePos(), sr.getValue());
-                return;
+            switch (another.getRegisterType()) {
+            case SINGLE_VALUE:
+                if (register.getSize() > 0 && another.register.getSize() > 0) {
+                    register = ((SingleValueRegister) register).toSparse();
+                } else {
+                    SingleValueRegister sr = (SingleValueRegister) another.register;
+                    if (sr.getSize() > 0)
+                        register.set(sr.getSingleValuePos(), sr.getValue());
+                    return;
+                }
+                break;
+            case SPARSE:
+                register = ((SingleValueRegister) register).toSparse();
+                break;
+            case DENSE:
+                register = ((SingleValueRegister) register).toDense(this.p);
+                break;
+            default:
+                break;
             }
+            break;
         case SPARSE:
-            register = ((SingleValueRegister) register).toSparse();
-            break;
-        case DENSE:
-            register = ((SingleValueRegister) register).toDense(this.p);
+            if (another.getRegisterType() == RegisterType.DENSE) {
+                register = ((SparseRegister) register).toDense(p);
+            }
             break;
         default:
             break;
-        }
-
-        break;
-        case SPARSE:
-            if (another.getRegisterType() == RegisterType.DENSE) {
-                register = ((SparseRegister) register).toDense(p);
-            }
-            break;
-        default:
-            break;
         }
         register.merge(another.register);
         toDenseIfNeeded();
     }
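For context on how the new HLLCounter API is meant to be called: the (int p, RegisterType) constructor becomes public so callers can force a DENSE register up front, and addHashDirectly(long) feeds an already-computed 64-bit hash into the counter without hashing the input again. A minimal usage sketch (not part of the patch; the hash value is illustrative):

    import org.apache.kylin.measure.hllc.HLLCounter;
    import org.apache.kylin.measure.hllc.RegisterType;

    public class AddHashDirectlySketch {
        public static void main(String[] args) {
            // p = 14 gives 2^14 = 16384 registers, the precision the new test uses
            HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
            long precomputedHash = 0x9E3779B97F4A7C15L; // any 64-bit hash
            counter.addHashDirectly(precomputedHash);   // skips internal re-hashing
            System.out.println(counter.getCountEstimate());
        }
    }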
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 9f65163..e6cea2b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -24,13 +24,13 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -62,7 +62,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     private HashFunction hf = null;
     private int rowCount = 0;
     private int samplingPercentage;
-    private ByteArray[] row_hashcodes = null;
+    //private ByteArray[] row_hashcodes = null;
+    private long[] rowHashCodesLong = null;
     private ByteBuffer tmpbuf;
     private static final Text EMPTY_TEXT = new Text();
     public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
@@ -92,14 +93,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
             allCuboidsHLL = new HLLCounter[cuboidIds.length];
             for (int i = 0; i < cuboidIds.length; i++) {
-                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
             }
 
-            hf = Hashing.murmur3_32();
-            row_hashcodes = new ByteArray[nRowKey];
-            for (int i = 0; i < nRowKey; i++) {
-                row_hashcodes[i] = new ByteArray();
-            }
+            hf = Hashing.murmur3_128();
+            rowHashCodesLong = new long[nRowKey];
 
             TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
             if (partitionColRef != null) {
@@ -211,26 +209,23 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     }
 
     private void putRowKeyToHLL(String[] row) {
-        //generate hash for each row key column
        for (int i = 0; i < nRowKey; i++) {
             Hasher hc = hf.newHasher();
             String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue != null) {
-                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
-            } else {
-                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
-            }
+            if (colValue == null)
+                colValue = "0";
+            byte[] bytes = hc.putString(colValue).hash().asBytes();
+            rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
         }
 
         // use the row key column hash to get a consolidated hash for each cuboid
         for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            Hasher hc = hf.newHasher();
+            long value = 0;
             for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
+                value += rowHashCodesLong[allCuboidsBitSet[i][position]];
             }
-
-            allCuboidsHLL[i].add(hc.hash().asBytes());
+            allCuboidsHLL[i].addHashDirectly(value);
         }
     }
 
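The mapper change is the heart of KYLIN-2518. The old putRowKeyToHLL hashed each row key column with murmur3_32 into a ByteArray and then, for every cuboid, ran a second Murmur pass over the concatenated column hashes. The new version hashes each column once with murmur3_128, keeps the leading 8 bytes as a long, and adds the column ordinal i so that the same values in different columns, e.g. (a,b) versus (b,a), produce different hashes; each cuboid's hash is then just the sum of its columns' longs, handed to the counter via addHashDirectly. A self-contained sketch of the idea (simplified; the row values and cuboid column sets are illustrative, and putString(v, UTF_8) stands in for the older Guava putString(v) the patch uses):

    import java.nio.charset.StandardCharsets;
    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;

    public class RowKeyHashSketch {
        private static final HashFunction HF = Hashing.murmur3_128();

        public static void main(String[] args) {
            String[] row = {"a", "b"};                  // hypothetical row key values
            int[][] cuboidColumns = {{0}, {1}, {0, 1}}; // hypothetical cuboid column sets

            // hash every column exactly once; the ordinal offset keeps (a,b) != (b,a)
            long[] colHash = new long[row.length];
            for (int i = 0; i < row.length; i++) {
                String v = (row[i] == null) ? "0" : row[i];
                long h = HF.newHasher().putString(v, StandardCharsets.UTF_8).hash().asLong();
                colHash[i] = h + i;
            }

            // per cuboid, plain addition replaces a second hashing pass
            for (int[] cols : cuboidColumns) {
                long value = 0;
                for (int c : cols) {
                    value += colHash[c];
                }
                System.out.println(value); // the mapper feeds this to allCuboidsHLL[i].addHashDirectly(value)
            }
        }
    }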
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
new file mode 100644
index 0000000..f018f28
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@Ignore
+public class NewCubeSamplingMethodTest {
+
+    private static final int ROW_LENGTH = 10;
+
+    private Integer[][] allCuboidsBitSet;
+
+    private long baseCuboidId;
+
+    private final int rowCount = 500000;
+
+    @Before
+    public void setup() {
+        baseCuboidId = (1L << ROW_LENGTH) - 1;
+        createAllCuboidBitSet();
+        System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
+    }
+
+    @Ignore
+    @Test
+    public void testRandomData() throws Exception {
+        List<List<String>> dataSet = getRandomDataset(rowCount);
+        comparePerformanceBasic(dataSet);
+        compareAccuracyBasic(dataSet);
+    }
+
+
+    @Ignore
+    @Test
+    public void testSmallCardData() throws Exception {
+        List<List<String>> dataSet = getSmallCardDataset(rowCount);
+        comparePerformanceBasic(dataSet);
+        compareAccuracyBasic(dataSet);
+    }
+
+
+    public void comparePerformanceBasic(final List<List<String>> rows) throws Exception {
+        //old hash method
+        ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+        HLLCounter[] cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+        long start = System.currentTimeMillis();
+        for (List<String> row : rows) {
+            putRowKeyToHLL(row, colHashValues, cuboidCounters, Hashing.murmur3_32());
+        }
+        long totalTime = System.currentTimeMillis() - start;
+        System.out.println("old method cost time : " + totalTime);
+        //new hash method
+        colHashValues = getNewColHashValues(ROW_LENGTH);
+        cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+        start = System.currentTimeMillis();
+        long[] valueHashLong = new long[allCuboidsBitSet.length];
+        for (List<String> row : rows) {
+            putRowKeyToHLLNew(row, valueHashLong, cuboidCounters, Hashing.murmur3_128());
+        }
+        totalTime = System.currentTimeMillis() - start;
+        System.out.println("new method cost time : " + totalTime);
+    }
+
+    //test accuracy
+    public void compareAccuracyBasic(final List<List<String>> rows) throws Exception {
+        final long realCardinality = countCardinality(rows);
+        System.out.println("real cardinality : " + realCardinality);
+        //test1
+        long t1 = runAndGetTime(new TestCase() {
+            @Override
+            public void run() throws Exception {
+                HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+                final ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+                HashFunction hf = Hashing.murmur3_32();
+                for (List<String> row : rows) {
+
+                    int x = 0;
+                    for (String field : row) {
+                        Hasher hc = hf.newHasher();
+                        colHashValues[x++].set(hc.putString(field).hash().asBytes());
+                    }
+
+                    Hasher hc = hf.newHasher();
+                    for (int position = 0; position < colHashValues.length; position++) {
+                        hc.putBytes(colHashValues[position].array());
+                    }
+                    counter.add(hc.hash().asBytes());
+                }
+                long estimate = counter.getCountEstimate();
+                System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+            }
+        });
+
+
+        long t2 = runAndGetTime(new TestCase() {
+            @Override
+            public void run() throws Exception {
+                HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+                HashFunction hf2 = Hashing.murmur3_128();
+                long[] valueHashLong = new long[allCuboidsBitSet.length];
+                for (List<String> row : rows) {
+
+                    int x = 0;
+                    for (String field : row) {
+                        Hasher hc = hf2.newHasher();
+                        byte[] bytes = hc.putString(x + field).hash().asBytes();
+                        valueHashLong[x++] = Bytes.toLong(bytes);
+                    }
+
+                    long value = 0;
+                    for (int position = 0; position < row.size(); position++) {
+                        value += valueHashLong[position];
+                    }
+                    counter.addHashDirectly(value);
+                }
+                long estimate = counter.getCountEstimate();
+                System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+            }
+        });
+    }
+
+    public void createAllCuboidBitSet() {
+        List<Long> allCuboids = Lists.newArrayList();
+        List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+        for (long i = 1; i < baseCuboidId; i++) {
+            allCuboids.add(i);
+            addCuboidBitSet(i, allCuboidsBitSetList);
+        }
+        allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
+    }
+
+    private ByteArray[] getNewColHashValues(int rowLength) {
+        ByteArray[] colHashValues = new ByteArray[rowLength];
+        for (int i = 0; i < rowLength; i++) {
+            colHashValues[i] = new ByteArray();
+        }
+        return colHashValues;
+    }
+
+    private HLLCounter[] getNewCuboidCounters(int cuboidNum) {
+        HLLCounter[] counters = new HLLCounter[cuboidNum];
+        for (int i = 0; i < counters.length; i++)
+            counters[i] = new HLLCounter(14, RegisterType.DENSE);
+        return counters;
+    }
+
+
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
+        Integer[] indice = new Integer[Long.bitCount(cuboidId)];
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        int position = 0;
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            if ((mask & cuboidId) > 0) {
+                indice[position] = i;
+                position++;
+            }
+            mask = mask >> 1;
+        }
+
+        allCuboidsBitSet.add(indice);
+
+    }
+
+    private long runAndGetTime(TestCase testCase) throws Exception {
+        long start = System.currentTimeMillis();
+        testCase.run();
+        long totalTime = System.currentTimeMillis() - start;
+        return totalTime;
+    }
+
+    interface TestCase {
+        void run() throws Exception;
+    }
+
+    private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
+        int x = 0;
+        for (String field : row) {
+            Hasher hc = hashFunction.newHasher();
+            colHashValues[x++].set(hc.putString(field).hash().asBytes());
+        }
+
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            Hasher hc = hashFunction.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(colHashValues[allCuboidsBitSet[i][position]].array());
+                //hc.putBytes(seperator);
+            }
+            cuboidCounters[i].add(hc.hash().asBytes());
+        }
+    }
+
+    private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
+        int x = 0;
+        for (String field : row) {
+            Hasher hc = hashFunction.newHasher();
+            byte[] bytes = hc.putString(x + field).hash().asBytes();
+            hashValuesLong[x++] = Bytes.toLong(bytes);
+        }
+
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            long value = 0;
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                value += hashValuesLong[allCuboidsBitSet[i][position]];
+            }
+            cuboidCounters[i].addHashDirectly(value);
+        }
+    }
+
+    private List<List<String>> getRandomDataset(int size) {
+        List<List<String>> rows = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            rows.add(getRandomRow());
+        }
+        return rows;
+    }
+
+    private List<List<String>> getSmallCardDataset(int size) {
+        List<List<String>> rows = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            rows.add(getSmallCardRow());
+        }
+        return rows;
+    }
+
+    private List<String> getRandomRow() {
+        List<String> row = new ArrayList<>();
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            row.add(RandomStringUtils.random(10));
+        }
+        return row;
+    }
+
+    private String[] smallCardRow = {"abc", "bcd", "jifea", "feaifj"};
+
+    private Random rand = new Random(System.currentTimeMillis());
+
+    private List<String> getSmallCardRow() {
+        List<String> row = new ArrayList<>();
+        row.add(smallCardRow[rand.nextInt(smallCardRow.length)]);
+        for (int i = 1; i < ROW_LENGTH; i++) {
+            row.add("abc");
+        }
+        return row;
+    }
+
+
+    private int countCardinality(List<List<String>> rows) {
+        Set<String> diffCols = new HashSet<String>();
+        for (List<String> row : rows) {
+            StringBuilder sb = new StringBuilder();
+            for (String str : row) {
+                sb.append(str);
+            }
+            diffCols.add(sb.toString());
+        }
+        return diffCols.size();
+    }
+
+    private double countErrorRate(long estimate, long real) {
+        double rate = Math.abs((estimate - real) * 1.0) / real;
+        return rate;
+    }
+}
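As a yardstick for the error rates the test prints: both code paths build their counters with precision p = 14, i.e. 2^14 = 16384 registers, for which the theoretical HyperLogLog standard error is 1.04 / sqrt(2^p), about 0.81%. A one-line check (illustrative, not part of the patch):

    public class HllErrorBound {
        public static void main(String[] args) {
            int p = 14;                                 // precision used by the test
            double stdError = 1.04 / Math.sqrt(1 << p); // ≈ 0.0081, i.e. about 0.81%
            System.out.printf("expected HLL standard error: %.4f%n", stdError);
        }
    }

Note that the benchmark class and its tests are annotated @Ignore, so they only run once those annotations are removed.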