Repository: kylin
Updated Branches:
  refs/heads/master 16ff672b7 -> ec2845af4


KYLIN-1580 Use 1 byte instead of 8 bytes as the column indicator in the fact distinct MR job

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ec2845af
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ec2845af
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ec2845af

Branch: refs/heads/master
Commit: ec2845af45bf9c44400bce68ce42dfe21adb850e
Parents: 8e10852
Author: shaofengshi <shaofeng...@apache.org>
Authored: Wed Apr 13 11:02:46 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Wed Apr 13 11:03:06 2016 +0800

----------------------------------------------------------------------
 .../engine/mr/steps/FactDistinctColumnPartitioner.java   |  8 ++++----
 .../engine/mr/steps/FactDistinctColumnsReducer.java      |  4 ++--
 .../engine/mr/steps/FactDistinctHiveColumnsMapper.java   | 11 +++++++----
 3 files changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ec2845af/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index b3498ff..a631cf4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -21,7 +21,7 @@ package org.apache.kylin.engine.mr.steps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 
 /**
  */
@@ -31,12 +31,12 @@ public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
     @Override
     public int getPartition(Text key, Text value, int numReduceTasks) {
 
-        long colIndex = Bytes.toLong(key.getBytes(), 0, Bytes.SIZEOF_LONG);
-        if (colIndex < 0) {
+        if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
             // the last reducer is for merging hll
             return numReduceTasks - 1;
         } else {
-            return (int) (colIndex);
+            int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1);
+            return colIndex;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec2845af/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 211e947..126eebd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -96,7 +96,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     public void reduce(Text key, Iterable<Text> values, Context context) 
throws IOException, InterruptedException {
 
         if (isStatistics == false) {
-            colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 
Bytes.SIZEOF_LONG, key.getLength() - Bytes.SIZEOF_LONG)));
+            colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, 
key.getLength() - 1)));
             if (colValues.size() == 1000000) { //spill every 1 million
                 System.out.println("spill values to disk...");
                 outputDistinctValues(col, colValues, context);
@@ -104,7 +104,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
             }
         } else {
             // for hll
-            long cuboidId = 0 - Bytes.toLong(key.getBytes(), 0, 
Bytes.SIZEOF_LONG);
+            long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
             for (Text value : values) {
                 HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
                 ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, 
value.getLength());

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec2845af/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 44de69a..8a130a7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -37,7 +37,6 @@ import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 
 /**
- * @author yangli9
  */
 public class FactDistinctHiveColumnsMapper<KEYIN> extends 
FactDistinctColumnsMapperBase<KEYIN, Object> {
 
@@ -53,6 +52,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     private ByteArray[] row_hashcodes = null;
     private ByteBuffer keyBuffer;
     private static final Text EMPTY_TEXT = new Text();
+    public static final byte MARK_FOR_HLL = (byte) 0xFF;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -115,7 +115,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
                     continue;
 
                 keyBuffer.clear();
-                keyBuffer.putLong((long)i);
+                keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
                 keyBuffer.put(Bytes.toBytes(fieldValue));
                 outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
                 context.write(outputKey, EMPTY_TEXT);
@@ -164,7 +164,11 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
             HyperLogLogPlusCounter hll;
             for (int i = 0; i < cuboidIds.length; i++) {
                 hll = allCuboidsHLL[i];
-                outputKey.set(Bytes.toBytes(0 - cuboidIds[i]));
+
+                keyBuffer.clear();
+                keyBuffer.put(MARK_FOR_HLL); // one byte
+                keyBuffer.putLong(cuboidIds[i]);
+                outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
                 hllBuf.clear();
                 hll.writeRegisters(hllBuf);
                 outputValue.set(hllBuf.array(), 0, hllBuf.position());
@@ -172,5 +176,4 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
             }
         }
     }
-
 }

Reply via email to