Repository: kylin
Updated Branches:
  refs/heads/master f2f3d6866 -> d56abdea8


KYLIN-1718 Grow ByteBuffer Dynamically in Cube Building and Query


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d56abdea
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d56abdea
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d56abdea

Branch: refs/heads/master
Commit: d56abdea858930be1df9e1772e323fd9cda31289
Parents: f2f3d68
Author: Yang Li <liy...@apache.org>
Authored: Sun May 22 21:23:47 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Sun May 22 21:23:47 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/kv/RowConstants.java  |   1 -
 .../kylin/gridtable/GTAggregateScanner.java     |  32 +++---
 .../org/apache/kylin/gridtable/GTRecord.java    |   9 ++
 .../metadata/measure/MeasureCodecTest.java      |  11 +-
 .../kylin/measure/BufferedMeasureEncoder.java   | 107 +++++++++++++++++++
 .../org/apache/kylin/measure/MeasureCodec.java  |  92 ----------------
 .../apache/kylin/measure/MeasureDecoder.java    |  86 +++++++++++++++
 .../org/apache/kylin/measure/MeasureType.java   |   1 +
 .../kylin/engine/mr/common/CuboidStatsUtil.java |   3 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java   |  15 ++-
 .../kylin/engine/mr/steps/CuboidReducer.java    |  11 +-
 .../mr/steps/FactDistinctHiveColumnsMapper.java |   3 +-
 .../engine/mr/steps/InMemCuboidReducer.java     |  18 +---
 .../kylin/engine/mr/steps/KVGTRecordWriter.java |  26 +++--
 .../engine/mr/steps/MergeCuboidMapper.java      |  11 +-
 .../kylin/engine/mr/steps/CubeReducerTest.java  |  18 ++--
 .../apache/kylin/engine/spark/SparkCubing.java  |  13 +--
 .../spark/cube/DefaultTupleConverter.java       |  28 ++---
 .../cardinality/ColumnCardinalityMapper.java    |   3 +-
 .../cardinality/ColumnCardinalityReducer.java   |   3 +-
 .../ColumnCardinalityReducerTest.java           |   3 +-
 .../observer/ObserverAggregators.java           |  13 +--
 .../coprocessor/endpoint/CubeVisitService.java  |  16 ++-
 .../storage/hbase/steps/CubeHFileMapper.java    |   6 +-
 .../storage/hbase/steps/KeyValueCreator.java    |  11 +-
 .../storage/hbase/steps/RowValueDecoder.java    |   6 +-
 .../observer/AggregateRegionObserverTest.java   |   8 +-
 .../hbase/steps/CubeHFileMapper2Test.java       |   7 +-
 .../hbase/steps/RowValueDecoderTest.java        |  15 ++-
 29 files changed, 332 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java 
b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index 987fb55..809e0a3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -43,7 +43,6 @@ public class RowConstants {
     public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 };
 
     public static final int ROWKEY_BUFFER_SIZE = 65 * 256;// a little more 
than 64 dimensions * 256 bytes each
-    public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB
 
     // marker class
     public static final byte[][] BYTE_ARR_MARKER = new byte[0][];

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java 
b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 7356e77..a2ca40c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -39,9 +39,9 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,7 +153,7 @@ public class GTAggregateScanner implements IGTScanner {
         final List<Dump> dumps;
         final int keyLength;
         final boolean[] compareMask;
-        final MeasureCodec measureCodec;
+        final BufferedMeasureEncoder measureCodec;
 
         final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
             @Override
@@ -187,12 +187,15 @@ public class GTAggregateScanner implements IGTScanner {
             measureCodec = createMeasureCodec();
         }
 
-        private MeasureCodec createMeasureCodec() {
+        private BufferedMeasureEncoder createMeasureCodec() {
             DataType[] types = new DataType[metrics.trueBitCount()];
             for (int i = 0; i < types.length; i++) {
                 types[i] = info.getColumnType(metrics.trueBitAt(i));
             }
-            return new MeasureCodec(types);
+            
+            BufferedMeasureEncoder result =  new BufferedMeasureEncoder(types);
+            result.setBufferSize(info.getMaxColumnLength(metrics));
+            return result;
         }
 
         private boolean[] createCompareMask() {
@@ -358,7 +361,7 @@ public class GTAggregateScanner implements IGTScanner {
 
         class ReturningRecord {
             final GTRecord record = new GTRecord(info);
-            final ByteBuffer metricsBuf = 
ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+            final Object[] tmpValues = new Object[metrics.trueBitCount()];
 
             void load(byte[] key, MeasureAggregator[] value) {
                 int offset = 0;
@@ -368,12 +371,18 @@ public class GTAggregateScanner implements IGTScanner {
                     record.cols[c].set(key, offset, columnLength);
                     offset += columnLength;
                 }
-                metricsBuf.clear();
+                
+                for (int i = 0; i < value.length; i++) {
+                    tmpValues[i] = value[i].getState();
+                }
+
+                byte[] bytes = measureCodec.encode(tmpValues).array();
+                int[] sizes = measureCodec.getMeasureSizes();
+                offset = 0;
                 for (int i = 0; i < value.length; i++) {
                     int col = metrics.trueBitAt(i);
-                    int pos = metricsBuf.position();
-                    info.codeSystem.encodeColumnValue(col, 
value[i].getState(), metricsBuf);
-                    record.cols[col].set(metricsBuf.array(), pos, 
metricsBuf.position() - pos);
+                    record.cols[col].set(bytes, offset, sizes[i]);
+                    offset += sizes[i];
                 }
             }
         }
@@ -430,7 +439,6 @@ public class GTAggregateScanner implements IGTScanner {
                 if (buffMap != null) {
                     ObjectOutputStream oos = null;
                     Object[] aggrResult = null;
-                    final ByteBuffer metricsBuf = 
ByteBuffer.allocate(info.getMaxColumnLength(metrics));
                     try {
                         dumpedFile = File.createTempFile("KYLIN_AGGR_", 
".tmp");
 
@@ -438,12 +446,10 @@ public class GTAggregateScanner implements IGTScanner {
                         oos = new ObjectOutputStream(new 
FileOutputStream(dumpedFile));
                         oos.writeInt(buffMap.size());
                         for (Entry<byte[], MeasureAggregator[]> entry : 
buffMap.entrySet()) {
-                            metricsBuf.clear();
-
                             MeasureAggregators aggs = new 
MeasureAggregators(entry.getValue());
                             aggrResult = new Object[metrics.trueBitCount()];
                             aggs.collectStates(aggrResult);
-                            measureCodec.encode(aggrResult, metricsBuf);
+                            ByteBuffer metricsBuf = 
measureCodec.encode(aggrResult);
                             oos.writeObject(entry.getKey());
                             oos.writeObject(metricsBuf.array());
                         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java 
b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index bccd0c5..f3cfc6a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -136,6 +136,15 @@ public class GTRecord implements Comparable<GTRecord> {
         }
         return result;
     }
+    
+    public int sizeOf(ImmutableBitSet selectedCols) {
+        int size = 0;
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            size += cols[c].length();
+        }
+        return size;
+    }
 
     public GTRecord copy() {
         return copy(info.colAll);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
 
b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
index 840e174..3f728b9 100644
--- 
a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
+++ 
b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
@@ -23,10 +23,9 @@ import static org.junit.Assert.*;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.bitmap.BitmapCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.metadata.datatype.DoubleMutable;
 import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -42,7 +41,7 @@ public class MeasureCodecTest {
     public void basicTest() {
         MeasureDesc descs[] = new MeasureDesc[] { measure("double"), 
measure("long"), measure
             ("decimal"), measure("HLLC16"), measure("bitmap") };
-        MeasureCodec codec = new MeasureCodec(descs);
+        BufferedMeasureEncoder codec = new BufferedMeasureEncoder(descs);
 
         DoubleMutable d = new DoubleMutable(1.0);
         LongMutable l = new LongMutable(2);
@@ -56,9 +55,7 @@ public class MeasureCodecTest {
         bitmap.add(Integer.MAX_VALUE-10);
         Object values[] = new Object[] { d, l, b, hllc, bitmap };
 
-        ByteBuffer buf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-        codec.encode(values, buf);
+        ByteBuffer buf = codec.encode(values);
         buf.flip();
         System.out.println("size: " + buf.limit());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java
new file mode 100644
index 0000000..88c7949
--- /dev/null
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * This class embeds a reusable byte buffer for measure encoding, and is not thread-safe.
+ * The buffer grows (doubling on each BufferOverflowException) until it reaches a size limit.
+ * The problem solved here is that some measure types cannot provide an accurate DataTypeSerializer.maxLength().
+ */
+@SuppressWarnings({ "unchecked" })
+public class BufferedMeasureEncoder {
+    public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1 MB
+    public static final int MAX_BUFFER_SIZE = 64 * DEFAULT_BUFFER_SIZE; // 64 
MB
+
+    final private MeasureDecoder codec;
+
+    private ByteBuffer buf;
+    final private int[] measureSizes;
+
+    public BufferedMeasureEncoder(Collection<MeasureDesc> measureDescs) {
+        this.codec = new MeasureDecoder(measureDescs);
+        this.measureSizes = new int[codec.nMeasures];
+    }
+    
+    public BufferedMeasureEncoder(MeasureDesc... measureDescs) {
+        this.codec = new MeasureDecoder(measureDescs);
+        this.measureSizes = new int[codec.nMeasures];
+    }
+
+    public BufferedMeasureEncoder(DataType... dataTypes) {
+        this.codec = new MeasureDecoder(dataTypes);
+        this.measureSizes = new int[codec.nMeasures];
+    }
+    
+    public BufferedMeasureEncoder(String... dataTypes) {
+        this.codec = new MeasureDecoder(dataTypes);
+        this.measureSizes = new int[codec.nMeasures];
+    }
+
+    /** return the buffer that contains result of last encoding */
+    public ByteBuffer getBuffer() {
+        return buf;
+    }
+
+    /** return the measure sizes of last encoding */
+    public int[] getMeasureSizes() {
+        return measureSizes;
+    }
+
+    public void setBufferSize(int size) {
+        buf = null; // release memory for GC
+        buf = ByteBuffer.allocate(size);
+    }
+
+    public void decode(ByteBuffer buf, Object[] result) {
+        codec.decode(buf, result);
+    }
+
+    public ByteBuffer encode(Object[] values) {
+        if (buf == null) {
+            setBufferSize(DEFAULT_BUFFER_SIZE);
+        }
+
+        assert values.length == codec.nMeasures;
+
+        while (true) {
+            try {
+                buf.clear();
+                for (int i = 0, pos = 0; i < codec.nMeasures; i++) {
+                    codec.serializers[i].serialize(values[i], buf);
+                    measureSizes[i] = buf.position() - pos;
+                    pos = buf.position();
+                }
+                return buf;
+                
+            } catch (BufferOverflowException boe) {
+                if (buf.capacity() >= MAX_BUFFER_SIZE)
+                    throw boe;
+
+                setBufferSize(buf.capacity() * 2);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
deleted file mode 100644
index 2794fa8..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.measure;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.datatype.DataTypeSerializer;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-/**
- * @author yangli9
- * 
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MeasureCodec {
-
-    int nMeasures;
-    DataTypeSerializer[] serializers;
-
-    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
-        this((MeasureDesc[]) measureDescs.toArray(new 
MeasureDesc[measureDescs.size()]));
-    }
-
-    public MeasureCodec(MeasureDesc... measureDescs) {
-        String[] dataTypes = new String[measureDescs.length];
-        for (int i = 0; i < dataTypes.length; i++) {
-            dataTypes[i] = measureDescs[i].getFunction().getReturnType();
-        }
-        init(dataTypes);
-    }
-
-    public MeasureCodec(DataType... dateTypes) {
-        init(dateTypes);
-    }
-
-    public MeasureCodec(String... dataTypes) {
-        init(dataTypes);
-    }
-
-    private void init(String[] dataTypes) {
-        DataType[] typeInstances = new DataType[dataTypes.length];
-        for (int i = 0; i < dataTypes.length; i++) {
-            typeInstances[i] = DataType.getType(dataTypes[i]);
-        }
-        init(typeInstances);
-    }
-
-    private void init(DataType[] dataTypes) {
-        nMeasures = dataTypes.length;
-        serializers = new DataTypeSerializer[nMeasures];
-
-        for (int i = 0; i < nMeasures; i++) {
-            serializers[i] = DataTypeSerializer.create(dataTypes[i]);
-        }
-    }
-
-    public DataTypeSerializer getSerializer(int idx) {
-        return serializers[idx];
-    }
-
-    public void decode(ByteBuffer buf, Object[] result) {
-        assert result.length == nMeasures;
-        for (int i = 0; i < nMeasures; i++) {
-            result[i] = serializers[i].deserialize(buf);
-        }
-    }
-
-    public void encode(Object[] values, ByteBuffer out) {
-        assert values.length == nMeasures;
-        for (int i = 0; i < nMeasures; i++) {
-            serializers[i].serialize(values[i], out);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-metadata/src/main/java/org/apache/kylin/measure/MeasureDecoder.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureDecoder.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureDecoder.java
new file mode 100644
index 0000000..63dd1e7
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureDecoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * @author yangli9
+ * 
+ */
+@SuppressWarnings({ "rawtypes" })
+public class MeasureDecoder {
+
+    int nMeasures;
+    DataTypeSerializer[] serializers;
+
+    public MeasureDecoder(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new 
MeasureDesc[measureDescs.size()]));
+    }
+
+    public MeasureDecoder(MeasureDesc... measureDescs) {
+        String[] dataTypes = new String[measureDescs.length];
+        for (int i = 0; i < dataTypes.length; i++) {
+            dataTypes[i] = measureDescs[i].getFunction().getReturnType();
+        }
+        init(dataTypes);
+    }
+
+    public MeasureDecoder(DataType... dataTypes) {
+        init(dataTypes);
+    }
+
+    public MeasureDecoder(String... dataTypes) {
+        init(dataTypes);
+    }
+
+    private void init(String[] dataTypes) {
+        DataType[] typeInstances = new DataType[dataTypes.length];
+        for (int i = 0; i < dataTypes.length; i++) {
+            typeInstances[i] = DataType.getType(dataTypes[i]);
+        }
+        init(typeInstances);
+    }
+
+    private void init(DataType[] dataTypes) {
+        nMeasures = dataTypes.length;
+        serializers = new DataTypeSerializer[nMeasures];
+
+        for (int i = 0; i < nMeasures; i++) {
+            serializers[i] = DataTypeSerializer.create(dataTypes[i]);
+        }
+    }
+
+    public DataTypeSerializer getSerializer(int idx) {
+        return serializers[idx];
+    }
+
+    public void decode(ByteBuffer buf, Object[] result) {
+        assert result.length == nMeasures;
+        for (int i = 0; i < nMeasures; i++) {
+            result[i] = serializers[i].deserialize(buf);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 4dea46b..740e896 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -107,6 +107,7 @@ abstract public class MeasureType<T> {
     /** Whether or not Calcite rel-tree needs rewrite to do last round of aggregation */
     abstract public boolean needRewrite();
 
+    /** Whether the rewrite involves an extra field for the pre-calculated value */
     public boolean needRewriteField() {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
index cb4b1cb..78b272c 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
@@ -49,7 +50,7 @@ public class CuboidStatsUtil {
         allCuboids.addAll(cuboidHLLMap.keySet());
         Collections.sort(allCuboids);
 
-        ByteBuffer valueBuf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        ByteBuffer valueBuf = 
ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
         SequenceFile.Writer writer = SequenceFile.createWriter(conf, 
SequenceFile.Writer.file(seqFilePath), 
SequenceFile.Writer.keyClass(LongWritable.class), 
SequenceFile.Writer.valueClass(BytesWritable.class));
         try {
             // mapper overlap ratio at key -1

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index a1eeb1b..e404f9a 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -35,14 +35,13 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dimension.Dictionary;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -77,11 +76,10 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends 
KylinMapper<KEYIN, VAL
     protected byte[][] keyBytesBuf;
     protected BytesSplitter bytesSplitter;
     protected AbstractRowKeyEncoder rowKeyEncoder;
-    protected MeasureCodec measureCodec;
+    protected BufferedMeasureEncoder measureCodec;
     private int errorRecordCounter;
     protected Text outputKey = new Text();
     protected Text outputValue = new Text();
-    private ByteBuffer valueBuf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -110,7 +108,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends 
KylinMapper<KEYIN, VAL
         bytesSplitter = new BytesSplitter(200, 16384);
         rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, 
baseCuboid);
 
-        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+        measureCodec = new BufferedMeasureEncoder(cubeDesc.getMeasures());
         measures = new Object[cubeDesc.getMeasures().size()];
 
         int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
@@ -153,14 +151,13 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends 
KylinMapper<KEYIN, VAL
         return rowKeyEncoder.encode(keyBytesBuf);
     }
 
-    private void buildValue(SplittedBytes[] splitBuffers) {
+    private ByteBuffer buildValue(SplittedBytes[] splitBuffers) {
 
         for (int i = 0; i < measures.length; i++) {
             measures[i] = buildValueOf(i, splitBuffers);
         }
 
-        valueBuf.clear();
-        measureCodec.encode(measures, valueBuf);
+        return measureCodec.encode(measures);
     }
 
     private Object buildValueOf(int idxOfMeasure, SplittedBytes[] 
splitBuffers) {
@@ -203,7 +200,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends 
KylinMapper<KEYIN, VAL
         byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
         outputKey.set(rowKey, 0, rowKey.length);
 
-        buildValue(bytesSplitter.getSplitBuffers());
+        ByteBuffer valueBuf = buildValue(bytesSplitter.getSplitBuffers());
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
         context.write(outputKey, outputValue);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index f263d99..6986344 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -25,13 +25,12 @@ import java.util.List;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +47,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, 
Text, Text> {
     private CubeDesc cubeDesc;
     private List<MeasureDesc> measuresDescs;
 
-    private MeasureCodec codec;
+    private BufferedMeasureEncoder codec;
     private MeasureAggregators aggs;
 
     private int counter;
@@ -57,7 +56,6 @@ public class CuboidReducer extends KylinReducer<Text, Text, 
Text, Text> {
     private Object[] input;
     private Object[] result;
 
-    private ByteBuffer valueBuf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
     private Text outputValue = new Text();
 
     @Override
@@ -73,7 +71,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, 
Text, Text> {
         cubeDesc = 
CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
         measuresDescs = cubeDesc.getMeasures();
 
-        codec = new MeasureCodec(measuresDescs);
+        codec = new BufferedMeasureEncoder(measuresDescs);
         aggs = new MeasureAggregators(measuresDescs);
 
         input = new Object[measuresDescs.size()];
@@ -101,8 +99,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, 
Text, Text> {
         }
         aggs.collectStates(result);
 
-        valueBuf.clear();
-        codec.encode(result, valueBuf);
+        ByteBuffer valueBuf = codec.encode(result);
 
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
         context.write(key, outputValue);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 2688919..3be5795 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
@@ -159,7 +160,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends 
FactDistinctColumnsMap
     @Override
     protected void cleanup(Context context) throws IOException, 
InterruptedException {
         if (collectStatistics) {
-            ByteBuffer hllBuf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+            ByteBuffer hllBuf = 
ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
             // output each cuboid's hll to reducer, key is 0 - cuboidId
             HyperLogLogPlusCounter hll;
             for (int i = 0; i < cuboidIds.length; i++) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index ec36242..673cfc0 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -26,17 +26,14 @@ import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +43,7 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
 
     private static final Logger logger = 
LoggerFactory.getLogger(InMemCuboidReducer.class);
 
-    private MeasureCodec codec;
+    private BufferedMeasureEncoder codec;
     private MeasureAggregators aggs;
 
     private int counter;
@@ -55,7 +52,6 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
     
     private Text outputKey;
     private Text outputValue;
-    private ByteBuffer valueBuf;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -63,22 +59,17 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         String cubeName = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        String segmentName = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-        boolean isMerge = 
Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_IS_MERGE));
-
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         CubeDesc cubeDesc = cube.getDescriptor();
-        CubeSegment cubeSeg = cube.getSegment(segmentName, 
SegmentStatusEnum.NEW);
 
         List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
-        codec = new MeasureCodec(measuresDescs);
+        codec = new BufferedMeasureEncoder(measuresDescs);
         aggs = new MeasureAggregators(measuresDescs);
         input = new Object[measuresDescs.size()];
         result = new Object[measuresDescs.size()];
         
         outputKey = new Text();
         outputValue = new Text();
-        valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
     }
 
     @Override
@@ -96,8 +87,7 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
         outputKey.set(key.array(), key.offset(), key.length());
 
         // output value
-        valueBuf.clear();
-        codec.encode(result, valueBuf);
+        ByteBuffer valueBuf = codec.encode(result);
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
 
         context.write(outputKey, outputValue);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
index ee65132..de8b6d4 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -18,19 +18,21 @@
 
 package org.apache.kylin.engine.mr.steps;
 
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.gridtable.GTRecord;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 
 /**
  */
@@ -45,8 +47,8 @@ public abstract class KVGTRecordWriter implements 
ICuboidWriter {
     private int dimensions;
     private int measureCount;
     private byte[] keyBuf;
-    private int[] measureColumnsIndex;
-    private ByteBuffer valueBuf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private ImmutableBitSet measureColumns;
+    private ByteBuffer valueBuf = 
ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
     private ByteArrayWritable outputKey = new ByteArrayWritable();
     private ByteArrayWritable outputValue = new ByteArrayWritable();
     private long cuboidRowCount = 0;
@@ -77,7 +79,12 @@ public abstract class KVGTRecordWriter implements 
ICuboidWriter {
 
         //output measures
         valueBuf.clear();
-        record.exportColumns(measureColumnsIndex, valueBuf);
+        try {
+            record.exportColumns(measureColumns, valueBuf);
+        } catch (BufferOverflowException boe) {
+            valueBuf = ByteBuffer.allocate((int) (record.sizeOf(measureColumns) * 1.5));
+            record.exportColumns(measureColumns, valueBuf);
+        }
 
         outputKey.set(keyBuf, 0, keyBuf.length);
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
@@ -91,9 +98,6 @@ public abstract class KVGTRecordWriter implements 
ICuboidWriter {
         keyBuf = rowKeyEncoder.createBuf();
 
         dimensions = Long.bitCount(cuboidId);
-        measureColumnsIndex = new int[measureCount];
-        for (int i = 0; i < measureCount; i++) {
-            measureColumnsIndex[i] = dimensions + i;
-        }
+        measureColumns = new ImmutableBitSet(dimensions, dimensions + 
measureCount);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 6766b31..72de7dc 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -47,7 +47,7 @@ import org.apache.kylin.dimension.Dictionary;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -85,9 +85,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
     private Map<TblColRef, Dictionary<String>> oldDicts;
     private Map<TblColRef, Dictionary<String>> newDicts;
     private List<MeasureDesc> measureDescs;
-    private MeasureCodec codec;
+    private BufferedMeasureEncoder codec;
     private Object[] measureObjs;
-    private ByteBuffer valueBuf;
     private Text outputValue;
 
     @Override
@@ -116,9 +115,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
 
         measureDescs = cubeDesc.getMeasures();
-        codec = new MeasureCodec(measureDescs);
+        codec = new BufferedMeasureEncoder(measureDescs);
         measureObjs = new Object[measureDescs.size()];
-        valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
         outputValue = new Text();
         
         dictMeasures = Lists.newArrayList();
@@ -233,8 +231,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
                 MeasureIngester ingester = pair.getSecond();
                 measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], 
measureDescs.get(i), oldDicts, newDicts);
             }
-            valueBuf.clear();
-            codec.encode(measureObjs, valueBuf);
+            ByteBuffer valueBuf = codec.encode(measureObjs);
             outputValue.set(valueBuf.array(), 0, valueBuf.position());
             value = outputValue;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
index aa5e77e..b64afd3 100644
--- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.lang.reflect.Field;
@@ -34,11 +33,10 @@ import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.datatype.LongMutable;
@@ -50,12 +48,11 @@ import org.junit.Test;
 
 /**
  */
+@SuppressWarnings("rawtypes")
 public class CubeReducerTest extends LocalFileMetadataTestCase {
 
     ReduceDriver<Text, Text, Text, Text> reduceDriver;
 
-    ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
     @Before
     public void setUp() throws Exception {
         createTestMetadata();
@@ -80,7 +77,7 @@ public class CubeReducerTest extends 
LocalFileMetadataTestCase {
         reduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, 
"test_kylin_cube_with_slr_ready");
 
         CubeDesc cubeDesc = 
CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
-        MeasureCodec codec = new MeasureCodec(cubeDesc.getMeasures());
+        BufferedMeasureEncoder codec = new 
BufferedMeasureEncoder(cubeDesc.getMeasures());
 
         Text key1 = new Text("72010ustech");
         List<Text> values1 = new ArrayList<Text>();
@@ -127,7 +124,7 @@ public class CubeReducerTest extends 
LocalFileMetadataTestCase {
         MeasureType origMeasureType = functionDesc.getMeasureType();
         field.set(functionDesc, new MockUpMeasureType(origMeasureType));
 
-        MeasureCodec codec = new MeasureCodec(cubeDesc.getMeasures());
+        BufferedMeasureEncoder codec = new 
BufferedMeasureEncoder(cubeDesc.getMeasures());
 
         Text key1 = new Text("72010ustech");
         List<Text> values1 = new ArrayList<Text>();
@@ -161,11 +158,10 @@ public class CubeReducerTest extends 
LocalFileMetadataTestCase {
         assertTrue(result.contains(p3));
     }
 
-    private Text newValueText(MeasureCodec codec, String sum, String min, 
String max, int count, int item_count) {
+    private Text newValueText(BufferedMeasureEncoder codec, String sum, String 
min, String max, int count, int item_count) {
         Object[] values = new Object[] { new BigDecimal(sum), new 
BigDecimal(min), new BigDecimal(max), new LongMutable(count), new 
LongMutable(item_count) };
 
-        buf.clear();
-        codec.encode(values, buf);
+        ByteBuffer buf = codec.encode(values);
 
         Text t = new Text();
         t.set(buf.array(), 0, buf.position());

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 2b4e497..796fb9e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -67,7 +67,6 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
 import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
@@ -81,8 +80,8 @@ import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
 import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
 import org.apache.kylin.engine.spark.util.IteratorUtils;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -110,8 +109,6 @@ import org.reflections.Reflections;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Tuple2;
-
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
@@ -123,6 +120,8 @@ import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.primitives.UnsignedBytes;
 
+import scala.Tuple2;
+
 /**
  */
 public class SparkCubing extends AbstractApplication {
@@ -422,8 +421,7 @@ public class SparkCubing extends AbstractApplication {
             @Override
             public Iterable<Tuple2<byte[], byte[]>> call(final 
Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
                 return new Iterable<Tuple2<byte[], byte[]>>() {
-                    final ByteBuffer buffer = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-                    final MeasureCodec codec = new MeasureCodec(dataTypes);
+                    final BufferedMeasureEncoder codec = new 
BufferedMeasureEncoder(dataTypes);
                     final Object[] input = new Object[measureSize];
                     final Object[] result = new Object[measureSize];
 
@@ -442,8 +440,7 @@ public class SparkCubing extends AbstractApplication {
                                     aggs.aggregate(input);
                                 }
                                 aggs.collectStates(result);
-                                buffer.clear();
-                                codec.encode(result, buffer);
+                                ByteBuffer buffer = codec.encode(result);
                                 byte[] bytes = new byte[buffer.position()];
                                 System.arraycopy(buffer.array(), 
buffer.arrayOffset(), bytes, 0, buffer.position());
                                 return bytes;

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
index 8da1519..2532679 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
@@ -17,16 +17,19 @@
 */
 package org.apache.kylin.engine.spark.cube;
 
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import scala.Tuple2;
@@ -36,7 +39,6 @@ import scala.Tuple2;
 public final class DefaultTupleConverter implements TupleConverter {
 
     private final static ThreadLocal<ByteBuffer> valueBuf = new 
ThreadLocal<>();
-    private final static ThreadLocal<int[]> measureColumnsIndex = new 
ThreadLocal<>();
     private final CubeSegment segment;
     private final int measureCount;
     private final Map<TblColRef, Integer> columnLengthMap;
@@ -52,16 +54,13 @@ public final class DefaultTupleConverter implements 
TupleConverter {
 
     private ByteBuffer getValueBuf() {
         if (valueBuf.get() == null) {
-            
valueBuf.set(ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE));
+            
valueBuf.set(ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE));
         }
         return valueBuf.get();
     }
-
-    private int[] getMeasureColumnsIndex() {
-        if (measureColumnsIndex.get() == null) {
-            measureColumnsIndex.set(new int[measureCount]);
-        }
-        return measureColumnsIndex.get();
+    
+    private void setValueBuf(ByteBuffer buf) {
+        valueBuf.set(buf);
     }
 
     @Override
@@ -70,10 +69,7 @@ public final class DefaultTupleConverter implements 
TupleConverter {
         RowKeyEncoder rowkeyEncoder = 
rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
 
         final int dimensions = Long.bitCount(cuboidId);
-        int[] measureColumnsIndex = getMeasureColumnsIndex();
-        for (int i = 0; i < measureCount; i++) {
-            measureColumnsIndex[i] = dimensions + i;
-        }
+        final ImmutableBitSet measureColumns = new ImmutableBitSet(dimensions, 
dimensions + measureCount);
 
         int offSet = 0;
         for (int x = 0; x < dimensions; x++) {
@@ -87,7 +83,13 @@ public final class DefaultTupleConverter implements 
TupleConverter {
 
         ByteBuffer valueBuf = getValueBuf();
         valueBuf.clear();
-        record.exportColumns(measureColumnsIndex, valueBuf);
+        try {
+            record.exportColumns(measureColumns, valueBuf);
+        } catch (BufferOverflowException boe) {
+            valueBuf = ByteBuffer.allocate((int) (record.sizeOf(measureColumns) * 1.5));
+            record.exportColumns(measureColumns, valueBuf);
+            setValueBuf(valueBuf);
+        }
 
         byte[] value = new byte[valueBuf.position()];
         System.arraycopy(valueBuf.array(), 0, value, 0, valueBuf.position());

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index 68b6ae4..e06fb68 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
@@ -97,7 +98,7 @@ public class ColumnCardinalityMapper<T> extends 
KylinMapper<T, Object, IntWritab
     @Override
     protected void cleanup(Context context) throws IOException, 
InterruptedException {
         Iterator<Integer> it = hllcMap.keySet().iterator();
-        ByteBuffer buf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        ByteBuffer buf = 
ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
         while (it.hasNext()) {
             int key = it.next();
             HyperLogLogPlusCounter hllc = hllcMap.get(key);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 19f5759..6596917 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.engine.mr.KylinReducer;
@@ -79,7 +80,7 @@ public class ColumnCardinalityReducer extends 
KylinReducer<IntWritable, BytesWri
         while (it.hasNext()) {
             int key = it.next();
             HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            ByteBuffer buf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+            ByteBuffer buf = 
ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
             buf.clear();
             hllc.writeRegisters(buf);
             buf.flip();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
 
b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
index 57721d6..b9f532d 100644
--- 
a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
+++ 
b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
 import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
@@ -65,7 +66,7 @@ public class ColumnCardinalityReducerTest {
             i++;
             hllc.add(Bytes.toBytes(temp));
         }
-        ByteBuffer buf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        ByteBuffer buf = 
ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
         buf.clear();
         hllc.writeRegisters(buf);
         buf.flip();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
index 9e9dd6d..941c10f 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
@@ -28,10 +28,9 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.datatype.DataType;
@@ -228,9 +227,7 @@ public class ObserverAggregators {
             for (int j = 0; j < col.nMeasures; j++)
                 col.measureValues[j] = aggrs[i++].getState();
 
-            col.measureBuf.clear();
-            col.measureCodec.encode(col.measureValues, col.measureBuf);
-            hColValues[ci] = col.measureBuf;
+            hColValues[ci] = col.measureCodec.encode(col.measureValues);
         }
         return hColValues;
     }
@@ -244,9 +241,8 @@ public class ObserverAggregators {
         final String[] dataTypes;
         final int nMeasures;
 
-        final MeasureCodec measureCodec;
+        final BufferedMeasureEncoder measureCodec;
         final Object[] measureValues;
-        final ByteBuffer measureBuf;
 
         public HCol(byte[] bFamily, byte[] bQualifier, String[] funcNames, 
String[] dataTypes) {
             this.family = bFamily;
@@ -256,9 +252,8 @@ public class ObserverAggregators {
             this.nMeasures = funcNames.length;
             assert funcNames.length == dataTypes.length;
 
-            this.measureCodec = new MeasureCodec(dataTypes);
+            this.measureCodec = new BufferedMeasureEncoder(dataTypes);
             this.measureValues = new Object[nMeasures];
-            this.measureBuf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 510e735..3ccf7cf 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -48,6 +49,7 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
@@ -279,9 +281,9 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                     behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER.ordinal(), //
                     behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
 
-            ByteBuffer buffer = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+            ByteBuffer buffer = 
ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
 
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
             int finalRowCount = 0;
             for (GTRecord oneRecord : finalScanner) {
 
@@ -298,10 +300,14 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 }
 
                 buffer.clear();
-                oneRecord.exportColumns(scanReq.getColumns(), buffer);
-                buffer.flip();
+                try {
+                    oneRecord.exportColumns(scanReq.getColumns(), buffer);
+                } catch (BufferOverflowException boe) {
+                    buffer = ByteBuffer.allocate((int) (oneRecord.sizeOf(scanReq.getColumns()) * 1.5));
+                    oneRecord.exportColumns(scanReq.getColumns(), buffer);
+                }
 
-                outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());
+                outputStream.write(buffer.array(), 0, buffer.position());
                 finalRowCount++;
             }
             finalScanner.close();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
index 8205ff7..7523249 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
@@ -33,7 +33,7 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureDecoder;
 
 import com.google.common.collect.Lists;
 
@@ -48,7 +48,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
     String cubeName;
     CubeDesc cubeDesc;
 
-    MeasureCodec inputCodec;
+    MeasureDecoder inputCodec;
     Object[] inputMeasures;
     List<KeyValueCreator> keyValueCreators;
 
@@ -62,7 +62,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
         CubeManager cubeMgr = CubeManager.getInstance(config);
         cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
 
-        inputCodec = new MeasureCodec(cubeDesc.getMeasures());
+        inputCodec = new MeasureDecoder(cubeDesc.getMeasures());
         inputMeasures = new Object[cubeDesc.getMeasures().size()];
         keyValueCreators = Lists.newArrayList();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
index c6a233b..490031e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
@@ -24,10 +24,9 @@ import java.util.List;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.metadata.model.MeasureDesc;
 
 /**
@@ -41,9 +40,8 @@ public class KeyValueCreator {
     int[] refIndex;
     MeasureDesc[] refMeasures;
 
-    MeasureCodec codec;
+    BufferedMeasureEncoder codec;
     Object[] colValues;
-    ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
 
     public boolean isFullCopy;
 
@@ -56,7 +54,7 @@ public class KeyValueCreator {
         refIndex = colDesc.getMeasureIndex();
         refMeasures = colDesc.getMeasures();
 
-        codec = new MeasureCodec(refMeasures);
+        codec = new BufferedMeasureEncoder(refMeasures);
         colValues = new Object[refMeasures.length];
 
         isFullCopy = true;
@@ -76,8 +74,7 @@ public class KeyValueCreator {
             colValues[i] = measureValues[refIndex[i]];
         }
 
-        valueBuf.clear();
-        codec.encode(colValues, valueBuf);
+        ByteBuffer valueBuf = codec.encode(colValues);
 
         return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 7d05dc9..e1e8f8c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -25,7 +25,7 @@ import java.util.Collection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureDecoder;
 import org.apache.kylin.metadata.datatype.DoubleMutable;
 import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -44,7 +44,7 @@ public class RowValueDecoder implements Cloneable {
     private final byte[] hbaseColumnFamily;
     private final byte[] hbaseColumnQualifier;
 
-    private final MeasureCodec codec;
+    private final MeasureDecoder codec;
     private final BitSet projectionIndex;
     private final MeasureDesc[] measures;
     private final Object[] values;
@@ -55,7 +55,7 @@ public class RowValueDecoder implements Cloneable {
         this.hbaseColumnQualifier = Bytes.toBytes(hbaseColumn.getQualifier());
         this.projectionIndex = new BitSet();
         this.measures = hbaseColumn.getMeasures();
-        this.codec = new MeasureCodec(measures);
+        this.codec = new MeasureDecoder(measures);
         this.values = new Object[measures.length];
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
index cd4e33d..3efcf92 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -55,7 +53,6 @@ import com.google.common.collect.Lists;
  * @author yangli9
  */
 public class AggregateRegionObserverTest {
-    ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
 
     byte[] mask = new byte[] { (byte) 0xff, (byte) 0xff, 0, 0 };
     byte[] k1 = new byte[] { 0x01, 0x01, 0, 0x01 };
@@ -94,8 +91,7 @@ public class AggregateRegionObserverTest {
         Object[] values = number == Integer.MIN_VALUE ? //
         new Object[] { new BigDecimal(decimal) } //
                 : new Object[] { new BigDecimal(decimal), new LongMutable(number) };
-        buf.clear();
-        col.measureCodec.encode(values, buf);
+        ByteBuffer buf = col.measureCodec.encode(values);
 
         Cell keyValue = new KeyValue(key, 0, key.length, //
                 col.family, 0, col.family.length, //

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
index b715498..dbf39e7 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
@@ -35,7 +35,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureDecoder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,8 +49,7 @@ public class CubeHFileMapper2Test extends LocalFileMetadataTestCase {
 
     String cubeName = "test_kylin_cube_with_slr_ready";
 
-    MeasureCodec codec;
-    ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    MeasureDecoder codec;
     Object[] outKV = new Object[2];
 
     @Before
@@ -60,7 +59,7 @@ public class CubeHFileMapper2Test extends LocalFileMetadataTestCase {
         FileUtils.deleteDirectory(new File("../job/meta"));
         FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta"));
         CubeDesc desc = CubeManager.getInstance(getTestConfig()).getCube(cubeName).getDescriptor();
-        codec = new MeasureCodec(desc.getMeasures());
+        codec = new MeasureDecoder(desc.getMeasures());
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
index 488ff59..6475bad 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -27,10 +27,9 @@ import java.util.Arrays;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -57,14 +56,13 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase {
         CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
         HBaseColumnDesc hbaseCol = cubeDesc.getHbaseMapping().getColumnFamily()[0].getColumns()[0];
 
-        MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures());
+        BufferedMeasureEncoder codec = new BufferedMeasureEncoder(hbaseCol.getMeasures());
         BigDecimal sum = new BigDecimal("333.1234567");
         BigDecimal min = new BigDecimal("333.1111111");
         BigDecimal max = new BigDecimal("333.1999999");
         LongMutable count = new LongMutable(2);
         LongMutable item_count = new LongMutable(100);
-        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        codec.encode(new Object[] { sum, min, max, count, item_count }, buf);
+        ByteBuffer buf = codec.encode(new Object[] { sum, min, max, count, item_count });
 
         buf.flip();
         byte[] valueBytes = new byte[buf.limit()];
@@ -88,14 +86,13 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase {
         CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
         HBaseColumnDesc hbaseCol = cubeDesc.getHbaseMapping().getColumnFamily()[0].getColumns()[0];
 
-        MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures());
+        BufferedMeasureEncoder codec = new BufferedMeasureEncoder(hbaseCol.getMeasures());
         BigDecimal sum = new BigDecimal("11111111111111111111333.1234567");
         BigDecimal min = new BigDecimal("333.1111111");
         BigDecimal max = new BigDecimal("333.1999999");
         LongWritable count = new LongWritable(2);
         LongMutable item_count = new LongMutable(100);
-        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        codec.encode(new Object[] { sum, min, max, count, item_count }, buf);
+        codec.encode(new Object[] { sum, min, max, count, item_count });
 
     }
 }

Reply via email to