Repository: kylin Updated Branches: refs/heads/master 16ff672b7 -> ec2845af4
KYLIN-1580 Use 1 byte instead of 8 bytes as column indicator in fact distinct MR job Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ec2845af Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ec2845af Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ec2845af Branch: refs/heads/master Commit: ec2845af45bf9c44400bce68ce42dfe21adb850e Parents: 8e10852 Author: shaofengshi <shaofeng...@apache.org> Authored: Wed Apr 13 11:02:46 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Apr 13 11:03:06 2016 +0800 ---------------------------------------------------------------------- .../engine/mr/steps/FactDistinctColumnPartitioner.java | 8 ++++---- .../engine/mr/steps/FactDistinctColumnsReducer.java | 4 ++-- .../engine/mr/steps/FactDistinctHiveColumnsMapper.java | 11 +++++++---- 3 files changed, 13 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ec2845af/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java index b3498ff..a631cf4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java @@ -21,7 +21,7 @@ package org.apache.kylin.engine.mr.steps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; /** */ @@ -31,12 +31,12 @@ public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTasks) { - long colIndex = Bytes.toLong(key.getBytes(), 0, Bytes.SIZEOF_LONG); - if (colIndex < 0) { + if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) { // the last reducer is for merging hll return numReduceTasks - 1; } else { - return (int) (colIndex); + int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1); + return colIndex; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/ec2845af/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 211e947..126eebd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -96,7 +96,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { if (isStatistics == false) { - colValues.add(new ByteArray(Bytes.copy(key.getBytes(), Bytes.SIZEOF_LONG, key.getLength() - Bytes.SIZEOF_LONG))); + colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); if (colValues.size() == 1000000) { //spill every 1 million System.out.println("spill values to disk..."); outputDistinctValues(col, colValues, context); @@ -104,7 +104,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri } } else { // for hll - long cuboidId = 0 - Bytes.toLong(key.getBytes(), 0, Bytes.SIZEOF_LONG); + long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG); for (Text value : values) { HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14); ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength()); http://git-wip-us.apache.org/repos/asf/kylin/blob/ec2845af/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 44de69a..8a130a7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -37,7 +37,6 @@ import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; /** - * @author yangli9 */ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> { @@ -53,6 +52,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap private ByteArray[] row_hashcodes = null; private ByteBuffer keyBuffer; private static final Text EMPTY_TEXT = new Text(); + public static final byte MARK_FOR_HLL = (byte) 0xFF; @Override protected void setup(Context context) throws IOException { @@ -115,7 +115,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap continue; keyBuffer.clear(); - keyBuffer.putLong((long)i); + keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough keyBuffer.put(Bytes.toBytes(fieldValue)); outputKey.set(keyBuffer.array(), 0, keyBuffer.position()); context.write(outputKey, EMPTY_TEXT); @@ -164,7 +164,11 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap HyperLogLogPlusCounter hll; for (int i = 0; i < cuboidIds.length; i++) { hll = allCuboidsHLL[i]; - outputKey.set(Bytes.toBytes(0 - cuboidIds[i])); + + keyBuffer.clear(); + keyBuffer.put(MARK_FOR_HLL); // one byte + keyBuffer.putLong(cuboidIds[i]); + outputKey.set(keyBuffer.array(), 0, keyBuffer.position()); hllBuf.clear(); hll.writeRegisters(hllBuf); outputValue.set(hllBuf.array(), 0, hllBuf.position()); @@ -172,5 +176,4 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap } } } - }