KYLIN-1718 Grow ByteBuffer Dynamically in Cube Building and Query

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1587b293
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1587b293
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1587b293

Branch: refs/heads/KYLIN-1379-1705-1718-1719
Commit: 1587b2930f5e448c160f2e6d5f3e3b8d6005c4ac
Parents: 16f94bf
Author: sunyerui <sunye...@gmail.com>
Authored: Fri May 20 18:42:10 2016 +0800
Committer: sunyerui <sunye...@gmail.com>
Committed: Sun May 22 10:30:09 2016 +0800

----------------------------------------------------------------------
 .../kylin/gridtable/GTAggregateScanner.java     | 24 +++++++++++++-------
 .../metadata/measure/MeasureCodecTest.java      |  2 +-
 .../org/apache/kylin/measure/MeasureCodec.java  | 16 ++++++++++---
 .../engine/mr/steps/BaseCuboidMapperBase.java   |  2 +-
 .../kylin/engine/mr/steps/CuboidReducer.java    |  2 +-
 .../engine/mr/steps/InMemCuboidReducer.java     |  2 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |  2 +-
 .../kylin/engine/mr/steps/CubeReducerTest.java  |  2 +-
 .../apache/kylin/engine/spark/SparkCubing.java  |  4 ++--
 .../coprocessor/endpoint/CubeVisitService.java  | 10 +++++++-
 .../storage/hbase/steps/KeyValueCreator.java    |  2 +-
 .../observer/AggregateRegionObserverTest.java   |  2 +-
 .../hbase/steps/RowValueDecoderTest.java        |  4 ++--
 13 files changed, 50 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 7356e77..4709fe7 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -25,6 +25,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -358,7 +359,7 @@ public class GTAggregateScanner implements IGTScanner {
 
         class ReturningRecord {
             final GTRecord record = new GTRecord(info);
-            final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+            ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
 
             void load(byte[] key, MeasureAggregator[] value) {
                 int offset = 0;
@@ -369,11 +370,18 @@ public class GTAggregateScanner implements IGTScanner {
                     offset += columnLength;
                 }
                 metricsBuf.clear();
-                for (int i = 0; i < value.length; i++) {
-                    int col = metrics.trueBitAt(i);
-                    int pos = metricsBuf.position();
-                    info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
-                    record.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+                while (true) {
+                    try {
+                        for (int i = 0; i < value.length; i++) {
+                            int col = metrics.trueBitAt(i);
+                            int pos = metricsBuf.position();
+                            info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
+                            record.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+                        }
+                        break;
+                    } catch (BufferOverflowException boe) {
+                        metricsBuf = ByteBuffer.allocate(metricsBuf.capacity() * 2);
+                    }
                 }
             }
         }
@@ -430,7 +438,7 @@ public class GTAggregateScanner implements IGTScanner {
                 if (buffMap != null) {
                     ObjectOutputStream oos = null;
                     Object[] aggrResult = null;
-                    final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+                    ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
                     try {
                         dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp");
 
@@ -443,7 +451,7 @@ public class GTAggregateScanner implements IGTScanner {
                             MeasureAggregators aggs = new MeasureAggregators(entry.getValue());
                             aggrResult = new Object[metrics.trueBitCount()];
                             aggs.collectStates(aggrResult);
-                            measureCodec.encode(aggrResult, metricsBuf);
+                            metricsBuf = measureCodec.encode(aggrResult, metricsBuf);
                             oos.writeObject(entry.getKey());
                             oos.writeObject(metricsBuf.array());
                         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
index 840e174..e5ee2a4 100644
--- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
@@ -58,7 +58,7 @@ public class MeasureCodecTest {
 
         ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
 
-        codec.encode(values, buf);
+        buf = codec.encode(values, buf);
         buf.flip();
         System.out.println("size: " + buf.limit());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
index 2794fa8..3ae2576 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.measure;
 
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
@@ -83,10 +84,19 @@ public class MeasureCodec {
         }
     }
 
-    public void encode(Object[] values, ByteBuffer out) {
+    public ByteBuffer encode(Object[] values, ByteBuffer out) {
         assert values.length == nMeasures;
-        for (int i = 0; i < nMeasures; i++) {
-            serializers[i].serialize(values[i], out);
+        ByteBuffer buffer = out;
+        while (true) {
+            try {
+                for (int i = 0; i < nMeasures; i++) {
+                    serializers[i].serialize(values[i], buffer);
+                }
+                break;
+            } catch (BufferOverflowException boe) {
+                buffer = ByteBuffer.allocate(buffer.capacity() * 2);
+            }
         }
+        return buffer;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index a1eeb1b..dc8fc23 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -160,7 +160,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         }
 
         valueBuf.clear();
-        measureCodec.encode(measures, valueBuf);
+        valueBuf = measureCodec.encode(measures, valueBuf);
     }
 
     private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index f263d99..6f9d678 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -102,7 +102,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
         aggs.collectStates(result);
 
         valueBuf.clear();
-        codec.encode(result, valueBuf);
+        valueBuf = codec.encode(result, valueBuf);
 
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
         context.write(key, outputValue);

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index ec36242..9778010 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -97,7 +97,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
 
         // output value
         valueBuf.clear();
-        codec.encode(result, valueBuf);
+        valueBuf = codec.encode(result, valueBuf);
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
 
         context.write(outputKey, outputValue);

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 1209da2..50304a1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -235,7 +235,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
                 measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
             }
             valueBuf.clear();
-            codec.encode(measureObjs, valueBuf);
+            valueBuf = codec.encode(measureObjs, valueBuf);
             outputValue.set(valueBuf.array(), 0, valueBuf.position());
             value = outputValue;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
index aa5e77e..47a550d 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
@@ -165,7 +165,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase {
         Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new LongMutable(count), new LongMutable(item_count) };
 
         buf.clear();
-        codec.encode(values, buf);
+        buf = codec.encode(values, buf);
 
         Text t = new Text();
         t.set(buf.array(), 0, buf.position());

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 2b4e497..e7a38c7 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -422,7 +422,7 @@ public class SparkCubing extends AbstractApplication {
             @Override
             public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
                 return new Iterable<Tuple2<byte[], byte[]>>() {
-                    final ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+                    ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
                     final MeasureCodec codec = new MeasureCodec(dataTypes);
                     final Object[] input = new Object[measureSize];
                     final Object[] result = new Object[measureSize];
@@ -443,7 +443,7 @@ public class SparkCubing extends AbstractApplication {
                                 }
                                 aggs.collectStates(result);
                                 buffer.clear();
-                                codec.encode(result, buffer);
+                                buffer = codec.encode(result, buffer);
                                 byte[] bytes = new byte[buffer.position()];
                                 System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position());
                                 return bytes;

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 0cd35f1..4c029df 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -298,7 +299,14 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 }
 
                 buffer.clear();
-                oneRecord.exportColumns(scanReq.getColumns(), buffer);
+                while (true) {
+                    try {
+                        oneRecord.exportColumns(scanReq.getColumns(), buffer);
+                        break;
+                    } catch (BufferOverflowException boe) {
+                        buffer = ByteBuffer.allocate(buffer.capacity() * 4);
+                    }
+                }
                 buffer.flip();
 
                 outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
index c6a233b..cda7546 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
@@ -77,7 +77,7 @@ public class KeyValueCreator {
         }
 
         valueBuf.clear();
-        codec.encode(colValues, valueBuf);
+        valueBuf = codec.encode(colValues, valueBuf);
 
         return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
index cd4e33d..69d46ee 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
@@ -95,7 +95,7 @@ public class AggregateRegionObserverTest {
         new Object[] { new BigDecimal(decimal) } //
                 : new Object[] { new BigDecimal(decimal), new LongMutable(number) };
         buf.clear();
-        col.measureCodec.encode(values, buf);
+        buf = col.measureCodec.encode(values, buf);
 
         Cell keyValue = new KeyValue(key, 0, key.length, //
                 col.family, 0, col.family.length, //

http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
index 488ff59..334302c 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
@@ -64,7 +64,7 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase {
         LongMutable count = new LongMutable(2);
         LongMutable item_count = new LongMutable(100);
         ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        codec.encode(new Object[] { sum, min, max, count, item_count }, buf);
+        buf = codec.encode(new Object[] { sum, min, max, count, item_count }, buf);
 
         buf.flip();
         byte[] valueBytes = new byte[buf.limit()];
@@ -95,7 +95,7 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase {
         LongWritable count = new LongWritable(2);
         LongMutable item_count = new LongMutable(100);
         ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        codec.encode(new Object[] { sum, min, max, count, item_count }, buf);
+        buf = codec.encode(new Object[] { sum, min, max, count, item_count }, buf);
 
     }
 }

Reply via email to