KYLIN-1718 Grow ByteBuffer Dynamically in Cube Building and Query
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1587b293 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1587b293 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1587b293 Branch: refs/heads/KYLIN-1379-1705-1718-1719 Commit: 1587b2930f5e448c160f2e6d5f3e3b8d6005c4ac Parents: 16f94bf Author: sunyerui <sunye...@gmail.com> Authored: Fri May 20 18:42:10 2016 +0800 Committer: sunyerui <sunye...@gmail.com> Committed: Sun May 22 10:30:09 2016 +0800 ---------------------------------------------------------------------- .../kylin/gridtable/GTAggregateScanner.java | 24 +++++++++++++------- .../metadata/measure/MeasureCodecTest.java | 2 +- .../org/apache/kylin/measure/MeasureCodec.java | 16 ++++++++++--- .../engine/mr/steps/BaseCuboidMapperBase.java | 2 +- .../kylin/engine/mr/steps/CuboidReducer.java | 2 +- .../engine/mr/steps/InMemCuboidReducer.java | 2 +- .../engine/mr/steps/MergeCuboidMapper.java | 2 +- .../kylin/engine/mr/steps/CubeReducerTest.java | 2 +- .../apache/kylin/engine/spark/SparkCubing.java | 4 ++-- .../coprocessor/endpoint/CubeVisitService.java | 10 +++++++- .../storage/hbase/steps/KeyValueCreator.java | 2 +- .../observer/AggregateRegionObserverTest.java | 2 +- .../hbase/steps/RowValueDecoderTest.java | 4 ++-- 13 files changed, 50 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 7356e77..4709fe7 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -25,6 +25,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; @@ -358,7 +359,7 @@ public class GTAggregateScanner implements IGTScanner { class ReturningRecord { final GTRecord record = new GTRecord(info); - final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); + ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); void load(byte[] key, MeasureAggregator[] value) { int offset = 0; @@ -369,11 +370,18 @@ public class GTAggregateScanner implements IGTScanner { offset += columnLength; } metricsBuf.clear(); - for (int i = 0; i < value.length; i++) { - int col = metrics.trueBitAt(i); - int pos = metricsBuf.position(); - info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf); - record.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos); + while (true) { + try { + for (int i = 0; i < value.length; i++) { + int col = metrics.trueBitAt(i); + int pos = metricsBuf.position(); + info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf); + record.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos); + } + break; + } catch (BufferOverflowException boe) { + metricsBuf = ByteBuffer.allocate(metricsBuf.capacity() * 2); + } } } } @@ -430,7 +438,7 @@ public class GTAggregateScanner implements IGTScanner { if (buffMap != null) { ObjectOutputStream oos = null; Object[] aggrResult = null; - final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); + ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); try { dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp"); @@ -443,7 +451,7 @@ public class GTAggregateScanner implements IGTScanner { MeasureAggregators aggs = new MeasureAggregators(entry.getValue()); aggrResult = new Object[metrics.trueBitCount()]; aggs.collectStates(aggrResult); - measureCodec.encode(aggrResult, metricsBuf); + metricsBuf = measureCodec.encode(aggrResult, metricsBuf); oos.writeObject(entry.getKey()); oos.writeObject(metricsBuf.array()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java index 840e174..e5ee2a4 100644 --- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java +++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java @@ -58,7 +58,7 @@ public class MeasureCodecTest { ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - codec.encode(values, buf); + buf = codec.encode(values, buf); buf.flip(); System.out.println("size: " + buf.limit()); http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java index 2794fa8..3ae2576 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java @@ -18,6 +18,7 @@ package org.apache.kylin.measure; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Collection; @@ -83,10 +84,19 @@ public class MeasureCodec { } } - public void encode(Object[] values, ByteBuffer out) { + public ByteBuffer encode(Object[] values, ByteBuffer out) { assert values.length == nMeasures; - for (int i = 0; i < nMeasures; i++) { - serializers[i].serialize(values[i], out); + ByteBuffer buffer = out; + while (true) { + try { + for (int i = 0; i < nMeasures; i++) { + serializers[i].serialize(values[i], buffer); + } + break; + } catch (BufferOverflowException boe) { + buffer = ByteBuffer.allocate(buffer.capacity() * 2); + } } + return buffer; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index a1eeb1b..dc8fc23 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -160,7 +160,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL } valueBuf.clear(); - measureCodec.encode(measures, valueBuf); + valueBuf = measureCodec.encode(measures, valueBuf); } private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) { http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index f263d99..6f9d678 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -102,7 +102,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { aggs.collectStates(result); valueBuf.clear(); - codec.encode(result, valueBuf); + valueBuf = codec.encode(result, valueBuf); outputValue.set(valueBuf.array(), 0, valueBuf.position()); context.write(key, outputValue); http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index ec36242..9778010 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -97,7 +97,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra // output value valueBuf.clear(); - codec.encode(result, valueBuf); + valueBuf = codec.encode(result, valueBuf); outputValue.set(valueBuf.array(), 0, valueBuf.position()); context.write(outputKey, outputValue); http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 1209da2..50304a1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -235,7 +235,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts); } valueBuf.clear(); - codec.encode(measureObjs, valueBuf); + valueBuf = codec.encode(measureObjs, valueBuf); outputValue.set(valueBuf.array(), 0, valueBuf.position()); value = outputValue; } http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java index aa5e77e..47a550d 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java @@ -165,7 +165,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new LongMutable(count), new LongMutable(item_count) }; buf.clear(); - codec.encode(values, buf); + buf = codec.encode(values, buf); Text t = new Text(); t.set(buf.array(), 0, buf.position()); http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 2b4e497..e7a38c7 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -422,7 +422,7 @@ public class SparkCubing extends AbstractApplication { @Override public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception { return new Iterable<Tuple2<byte[], byte[]>>() { - final ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); final MeasureCodec codec = new MeasureCodec(dataTypes); final Object[] input = new Object[measureSize]; final Object[] result = new Object[measureSize]; @@ -443,7 +443,7 @@ public class SparkCubing extends AbstractApplication { } aggs.collectStates(result); buffer.clear(); - codec.encode(result, buffer); + buffer = codec.encode(result, buffer); byte[] bytes = new byte[buffer.position()]; System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position()); return bytes; http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 0cd35f1..4c029df 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.InetAddress; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -298,7 +299,14 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } buffer.clear(); - oneRecord.exportColumns(scanReq.getColumns(), buffer); + while (true) { + try { + oneRecord.exportColumns(scanReq.getColumns(), buffer); + break; + } catch (BufferOverflowException boe) { + buffer = ByteBuffer.allocate(buffer.capacity() * 4); + } + } buffer.flip(); outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining()); http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java index c6a233b..cda7546 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java @@ -77,7 +77,7 @@ public class KeyValueCreator { } valueBuf.clear(); - codec.encode(colValues, valueBuf); + valueBuf = codec.encode(colValues, valueBuf); return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java index cd4e33d..69d46ee 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java @@ -95,7 +95,7 @@ public class AggregateRegionObserverTest { new Object[] { new BigDecimal(decimal) } // : new Object[] { new BigDecimal(decimal), new LongMutable(number) }; buf.clear(); - col.measureCodec.encode(values, buf); + buf = col.measureCodec.encode(values, buf); Cell keyValue = new KeyValue(key, 0, key.length, // col.family, 0, col.family.length, // http://git-wip-us.apache.org/repos/asf/kylin/blob/1587b293/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java index 488ff59..334302c 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java @@ -64,7 +64,7 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase { LongMutable count = new LongMutable(2); LongMutable item_count = new LongMutable(100); ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - codec.encode(new Object[] { sum, min, max, count, item_count }, buf); + buf = codec.encode(new Object[] { sum, min, max, count, item_count }, buf); buf.flip(); byte[] valueBytes = new byte[buf.limit()]; @@ -95,7 +95,7 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase { LongWritable count = new LongWritable(2); LongMutable item_count = new LongMutable(100); ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - codec.encode(new Object[] { sum, min, max, count, item_count }, buf); + buf = codec.encode(new Object[] { sum, min, max, count, item_count }, buf); } }