Repository: kylin Updated Branches: refs/heads/2.0.x bcfab74a9 -> 8f59aa89b
KYLIN-2552 Fix backward compatibility (Fan Xie) Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8f59aa89 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8f59aa89 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8f59aa89 Branch: refs/heads/2.0.x Commit: 8f59aa89bbaa07a4ca5aec5406d60f5d70318af3 Parents: bcfab74 Author: Hongbin Ma <mahong...@apache.org> Authored: Tue Apr 18 16:20:24 2017 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Tue Apr 18 16:20:31 2017 +0800 ---------------------------------------------------------------------- .../mr/steps/FactDistinctColumnsMapper.java | 55 ++++++++++++++++++-- 1 file changed, 51 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8f59aa89/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index e6cea2b..d36ae18 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.List; import org.apache.hadoop.io.Text; +import org.apache.kylin.common.KylinVersion; +import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.cuboid.CuboidScheduler; @@ -64,6 +66,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB private int samplingPercentage; //private ByteArray[] row_hashcodes = null; private long[] 
rowHashCodesLong = null; + private ByteArray[] row_hashcodes = null; private ByteBuffer tmpbuf; private static final Text EMPTY_TEXT = new Text(); public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE; @@ -74,6 +77,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(); + //about details of the new algorithm, please see KYLIN-2518 + private boolean isUsePutRowKeyToHllNewAlgorithm; + @Override protected void setup(Context context) throws IOException { super.setup(context); @@ -96,8 +102,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE); } - hf = Hashing.murmur3_128(); - rowHashCodesLong = new long[nRowKey]; TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); if (partitionColRef != null) { @@ -111,7 +115,23 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB } else { needFetchPartitionCol = true; } + //for KYLIN-2518 backward compatibility + if (KylinVersion.isBefore200(cubeDesc.getVersion())) { + isUsePutRowKeyToHllNewAlgorithm = false; + row_hashcodes = new ByteArray[nRowKey]; + for (int i = 0; i < nRowKey; i++) { + row_hashcodes[i] = new ByteArray(); + } + hf = Hashing.murmur3_32(); + logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion()); + } else { + isUsePutRowKeyToHllNewAlgorithm = true; + rowHashCodesLong = new long[nRowKey]; + hf = Hashing.murmur3_128(); + logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. 
About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion()); + } } + } private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) { @@ -176,7 +196,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB if (collectStatistics) { if (rowCount % 100 < samplingPercentage) { - putRowKeyToHLL(row); + if (isUsePutRowKeyToHllNewAlgorithm) { + putRowKeyToHLLNew(row); + } else { + putRowKeyToHLLOld(row); + } } if (needFetchPartitionCol == true) { @@ -208,7 +232,30 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB return size; } - private void putRowKeyToHLL(String[] row) { + private void putRowKeyToHLLOld(String[] row) { + //generate hash for each row key column + for (int i = 0; i < nRowKey; i++) { + Hasher hc = hf.newHasher(); + String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]; + if (colValue != null) { + row_hashcodes[i].set(hc.putString(colValue).hash().asBytes()); + } else { + row_hashcodes[i].set(hc.putInt(0).hash().asBytes()); + } + } + + // use the row key column hash to get a consolidated hash for each cuboid + for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) { + Hasher hc = hf.newHasher(); + for (int position = 0; position < allCuboidsBitSet[i].length; position++) { + hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array()); + } + + allCuboidsHLL[i].add(hc.hash().asBytes()); + } + } + + private void putRowKeyToHLLNew(String[] row) { //generate hash for each row key column for (int i = 0; i < nRowKey; i++) { Hasher hc = hf.newHasher();