Repository: kylin Updated Branches: refs/heads/master f2f3d6866 -> d56abdea8
KYLIN-1718 Grow ByteBuffer Dynamically in Cube Building and Query Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d56abdea Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d56abdea Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d56abdea Branch: refs/heads/master Commit: d56abdea858930be1df9e1772e323fd9cda31289 Parents: f2f3d68 Author: Yang Li <liy...@apache.org> Authored: Sun May 22 21:23:47 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Sun May 22 21:23:47 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/kv/RowConstants.java | 1 - .../kylin/gridtable/GTAggregateScanner.java | 32 +++--- .../org/apache/kylin/gridtable/GTRecord.java | 9 ++ .../metadata/measure/MeasureCodecTest.java | 11 +- .../kylin/measure/BufferedMeasureEncoder.java | 107 +++++++++++++++++++ .../org/apache/kylin/measure/MeasureCodec.java | 92 ---------------- .../apache/kylin/measure/MeasureDecoder.java | 86 +++++++++++++++ .../org/apache/kylin/measure/MeasureType.java | 1 + .../kylin/engine/mr/common/CuboidStatsUtil.java | 3 +- .../engine/mr/steps/BaseCuboidMapperBase.java | 15 ++- .../kylin/engine/mr/steps/CuboidReducer.java | 11 +- .../mr/steps/FactDistinctHiveColumnsMapper.java | 3 +- .../engine/mr/steps/InMemCuboidReducer.java | 18 +--- .../kylin/engine/mr/steps/KVGTRecordWriter.java | 26 +++-- .../engine/mr/steps/MergeCuboidMapper.java | 11 +- .../kylin/engine/mr/steps/CubeReducerTest.java | 18 ++-- .../apache/kylin/engine/spark/SparkCubing.java | 13 +-- .../spark/cube/DefaultTupleConverter.java | 28 ++--- .../cardinality/ColumnCardinalityMapper.java | 3 +- .../cardinality/ColumnCardinalityReducer.java | 3 +- .../ColumnCardinalityReducerTest.java | 3 +- .../observer/ObserverAggregators.java | 13 +-- .../coprocessor/endpoint/CubeVisitService.java | 16 ++- .../storage/hbase/steps/CubeHFileMapper.java | 6 +- .../storage/hbase/steps/KeyValueCreator.java | 11 +- .../storage/hbase/steps/RowValueDecoder.java | 6 +- .../observer/AggregateRegionObserverTest.java | 8 +- .../hbase/steps/CubeHFileMapper2Test.java | 7 +- .../hbase/steps/RowValueDecoderTest.java | 15 ++- 29 files changed, 332 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java index 987fb55..809e0a3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java @@ -43,7 +43,6 @@ public class RowConstants { public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 }; public static final int ROWKEY_BUFFER_SIZE = 65 * 256;// a little more than 64 dimensions * 256 bytes each - public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB // marker class public static final byte[][] BYTE_ARR_MARKER = new byte[0][]; http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 7356e77..a2ca40c 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -39,9 +39,9 @@ import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureAggregators; -import org.apache.kylin.measure.MeasureCodec; import org.apache.kylin.metadata.datatype.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +153,7 @@ public class GTAggregateScanner implements IGTScanner { final List<Dump> dumps; final int keyLength; final boolean[] compareMask; - final MeasureCodec measureCodec; + final BufferedMeasureEncoder measureCodec; final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() { @Override @@ -187,12 +187,15 @@ public class GTAggregateScanner implements IGTScanner { measureCodec = createMeasureCodec(); } - private MeasureCodec createMeasureCodec() { + private BufferedMeasureEncoder createMeasureCodec() { DataType[] types = new DataType[metrics.trueBitCount()]; for (int i = 0; i < types.length; i++) { types[i] = info.getColumnType(metrics.trueBitAt(i)); } - return new MeasureCodec(types); + + BufferedMeasureEncoder result = new BufferedMeasureEncoder(types); + result.setBufferSize(info.getMaxColumnLength(metrics)); + return result; } private boolean[] createCompareMask() { @@ -358,7 +361,7 @@ public class GTAggregateScanner implements IGTScanner { class ReturningRecord { final GTRecord record = new GTRecord(info); - final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); + final Object[] tmpValues = new Object[metrics.trueBitCount()]; void load(byte[] key, MeasureAggregator[] value) { int offset = 0; @@ -368,12 +371,18 @@ public class GTAggregateScanner implements IGTScanner { record.cols[c].set(key, offset, columnLength); offset += columnLength; } - metricsBuf.clear(); + + for (int i = 0; i < value.length; i++) { + tmpValues[i] = value[i].getState(); + } + + byte[] bytes = measureCodec.encode(tmpValues).array(); + int[] sizes = measureCodec.getMeasureSizes(); + offset = 0; for (int i = 0; i < value.length; i++) { int col = metrics.trueBitAt(i); - int pos = metricsBuf.position(); - info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf); - record.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos); + record.cols[col].set(bytes, offset, sizes[i]); + offset += sizes[i]; } } } @@ -430,7 +439,6 @@ public class GTAggregateScanner implements IGTScanner { if (buffMap != null) { ObjectOutputStream oos = null; Object[] aggrResult = null; - final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); try { dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp"); @@ -438,12 +446,10 @@ public class GTAggregateScanner implements IGTScanner { oos = new ObjectOutputStream(new FileOutputStream(dumpedFile)); oos.writeInt(buffMap.size()); for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) { - metricsBuf.clear(); - MeasureAggregators aggs = new MeasureAggregators(entry.getValue()); aggrResult = new Object[metrics.trueBitCount()]; aggs.collectStates(aggrResult); - measureCodec.encode(aggrResult, metricsBuf); + ByteBuffer metricsBuf = measureCodec.encode(aggrResult); oos.writeObject(entry.getKey()); oos.writeObject(metricsBuf.array()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index bccd0c5..f3cfc6a 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@ -136,6 +136,15 @@ public class GTRecord implements Comparable<GTRecord> { } return result; } + + public int sizeOf(ImmutableBitSet selectedCols) { + int size = 0; + for (int i = 0; i < selectedCols.trueBitCount(); i++) { + int c = selectedCols.trueBitAt(i); + size += cols[c].length(); + } + return size; + } public GTRecord copy() { return copy(info.colAll); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java index 840e174..3f728b9 100644 --- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java +++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java @@ -23,10 +23,9 @@ import static org.junit.Assert.*; import java.math.BigDecimal; import java.nio.ByteBuffer; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.bitmap.BitmapCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; @@ -42,7 +41,7 @@ public class MeasureCodecTest { public void basicTest() { MeasureDesc descs[] = new MeasureDesc[] { measure("double"), measure("long"), measure ("decimal"), measure("HLLC16"), measure("bitmap") }; - MeasureCodec codec = new MeasureCodec(descs); + BufferedMeasureEncoder codec = new BufferedMeasureEncoder(descs); DoubleMutable d = new DoubleMutable(1.0); LongMutable l = new LongMutable(2); @@ -56,9 +55,7 @@ public class MeasureCodecTest { bitmap.add(Integer.MAX_VALUE-10); Object values[] = new Object[] { d, l, b, hllc, bitmap }; - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - - codec.encode(values, buf); + ByteBuffer buf = codec.encode(values); buf.flip(); System.out.println("size: " + buf.limit()); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java new file mode 100644 index 0000000..88c7949 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureEncoder.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.Collection; + +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.MeasureDesc; + +/** + * This class embeds a reusable byte buffer for measure encoding, and is not thread-safe. + * The buffer will grow to accommodate BufferOverflowException until a limit. + * The problem here to solve is some measure type cannot provide accurate DataTypeSerializer.maxLength() + */ +@SuppressWarnings({ "unchecked" }) +public class BufferedMeasureEncoder { + public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1 MB + public static final int MAX_BUFFER_SIZE = 64 * DEFAULT_BUFFER_SIZE; // 64 MB + + final private MeasureDecoder codec; + + private ByteBuffer buf; + final private int[] measureSizes; + + public BufferedMeasureEncoder(Collection<MeasureDesc> measureDescs) { + this.codec = new MeasureDecoder(measureDescs); + this.measureSizes = new int[codec.nMeasures]; + } + + public BufferedMeasureEncoder(MeasureDesc... measureDescs) { + this.codec = new MeasureDecoder(measureDescs); + this.measureSizes = new int[codec.nMeasures]; + } + + public BufferedMeasureEncoder(DataType... dataTypes) { + this.codec = new MeasureDecoder(dataTypes); + this.measureSizes = new int[codec.nMeasures]; + } + + public BufferedMeasureEncoder(String... dataTypes) { + this.codec = new MeasureDecoder(dataTypes); + this.measureSizes = new int[codec.nMeasures]; + } + + /** return the buffer that contains result of last encoding */ + public ByteBuffer getBuffer() { + return buf; + } + + /** return the measure sizes of last encoding */ + public int[] getMeasureSizes() { + return measureSizes; + } + + public void setBufferSize(int size) { + buf = null; // release memory for GC + buf = ByteBuffer.allocate(size); + } + + public void decode(ByteBuffer buf, Object[] result) { + codec.decode(buf, result); + } + + public ByteBuffer encode(Object[] values) { + if (buf == null) { + setBufferSize(DEFAULT_BUFFER_SIZE); + } + + assert values.length == codec.nMeasures; + + while (true) { + try { + buf.clear(); + for (int i = 0, pos = 0; i < codec.nMeasures; i++) { + codec.serializers[i].serialize(values[i], buf); + measureSizes[i] = buf.position() - pos; + pos = buf.position(); + } + return buf; + + } catch (BufferOverflowException boe) { + if (buf.capacity() >= MAX_BUFFER_SIZE) + throw boe; + + setBufferSize(buf.capacity() * 2); + } + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java deleted file mode 100644 index 2794fa8..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.measure; - -import java.nio.ByteBuffer; -import java.util.Collection; - -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.datatype.DataTypeSerializer; -import org.apache.kylin.metadata.model.MeasureDesc; - -/** - * @author yangli9 - * - */ -@SuppressWarnings({ "rawtypes", "unchecked" }) -public class MeasureCodec { - - int nMeasures; - DataTypeSerializer[] serializers; - - public MeasureCodec(Collection<MeasureDesc> measureDescs) { - this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()])); - } - - public MeasureCodec(MeasureDesc... measureDescs) { - String[] dataTypes = new String[measureDescs.length]; - for (int i = 0; i < dataTypes.length; i++) { - dataTypes[i] = measureDescs[i].getFunction().getReturnType(); - } - init(dataTypes); - } - - public MeasureCodec(DataType... dateTypes) { - init(dateTypes); - } - - public MeasureCodec(String... dataTypes) { - init(dataTypes); - } - - private void init(String[] dataTypes) { - DataType[] typeInstances = new DataType[dataTypes.length]; - for (int i = 0; i < dataTypes.length; i++) { - typeInstances[i] = DataType.getType(dataTypes[i]); - } - init(typeInstances); - } - - private void init(DataType[] dataTypes) { - nMeasures = dataTypes.length; - serializers = new DataTypeSerializer[nMeasures]; - - for (int i = 0; i < nMeasures; i++) { - serializers[i] = DataTypeSerializer.create(dataTypes[i]); - } - } - - public DataTypeSerializer getSerializer(int idx) { - return serializers[idx]; - } - - public void decode(ByteBuffer buf, Object[] result) { - assert result.length == nMeasures; - for (int i = 0; i < nMeasures; i++) { - result[i] = serializers[i].deserialize(buf); - } - } - - public void encode(Object[] values, ByteBuffer out) { - assert values.length == nMeasures; - for (int i = 0; i < nMeasures; i++) { - serializers[i].serialize(values[i], out); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-metadata/src/main/java/org/apache/kylin/measure/MeasureDecoder.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureDecoder.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureDecoder.java new file mode 100644 index 0000000..63dd1e7 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureDecoder.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure; + +import java.nio.ByteBuffer; +import java.util.Collection; + +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.apache.kylin.metadata.model.MeasureDesc; + +/** + * @author yangli9 + * + */ +@SuppressWarnings({ "rawtypes" }) +public class MeasureDecoder { + + int nMeasures; + DataTypeSerializer[] serializers; + + public MeasureDecoder(Collection<MeasureDesc> measureDescs) { + this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()])); + } + + public MeasureDecoder(MeasureDesc... measureDescs) { + String[] dataTypes = new String[measureDescs.length]; + for (int i = 0; i < dataTypes.length; i++) { + dataTypes[i] = measureDescs[i].getFunction().getReturnType(); + } + init(dataTypes); + } + + public MeasureDecoder(DataType... dataTypes) { + init(dataTypes); + } + + public MeasureDecoder(String... dataTypes) { + init(dataTypes); + } + + private void init(String[] dataTypes) { + DataType[] typeInstances = new DataType[dataTypes.length]; + for (int i = 0; i < dataTypes.length; i++) { + typeInstances[i] = DataType.getType(dataTypes[i]); + } + init(typeInstances); + } + + private void init(DataType[] dataTypes) { + nMeasures = dataTypes.length; + serializers = new DataTypeSerializer[nMeasures]; + + for (int i = 0; i < nMeasures; i++) { + serializers[i] = DataTypeSerializer.create(dataTypes[i]); + } + } + + public DataTypeSerializer getSerializer(int idx) { + return serializers[idx]; + } + + public void decode(ByteBuffer buf, Object[] result) { + assert result.length == nMeasures; + for (int i = 0; i < nMeasures; i++) { + result[i] = serializers[i].deserialize(buf); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index 4dea46b..740e896 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -107,6 +107,7 @@ abstract public class MeasureType<T> { /** Whether or not Calcite rel-tree needs rewrite to do last around of aggregation */ abstract public boolean needRewrite(); + /** Does the rewrite involves an extra field for the pre-calculated */ public boolean needRewriteField() { return true; } http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java index cb4b1cb..78b272c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.kv.RowConstants; @@ -49,7 +50,7 @@ public class CuboidStatsUtil { allCuboids.addAll(cuboidHLLMap.keySet()); Collections.sort(allCuboids); - ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE); SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); try { // mapper overlap ratio at key -1 http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index a1eeb1b..e404f9a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -35,14 +35,13 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.dimension.Dictionary; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -77,11 +76,10 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL protected byte[][] keyBytesBuf; protected BytesSplitter bytesSplitter; protected AbstractRowKeyEncoder rowKeyEncoder; - protected MeasureCodec measureCodec; + protected BufferedMeasureEncoder measureCodec; private int errorRecordCounter; protected Text outputKey = new Text(); protected Text outputValue = new Text(); - private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); @Override protected void setup(Context context) throws IOException { @@ -110,7 +108,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL bytesSplitter = new BytesSplitter(200, 16384); rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid); - measureCodec = new MeasureCodec(cubeDesc.getMeasures()); + measureCodec = new BufferedMeasureEncoder(cubeDesc.getMeasures()); measures = new Object[cubeDesc.getMeasures().size()]; int colCount = cubeDesc.getRowkey().getRowKeyColumns().length; @@ -153,14 +151,13 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL return rowKeyEncoder.encode(keyBytesBuf); } - private void buildValue(SplittedBytes[] splitBuffers) { + private ByteBuffer buildValue(SplittedBytes[] splitBuffers) { for (int i = 0; i < measures.length; i++) { measures[i] = buildValueOf(i, splitBuffers); } - valueBuf.clear(); - measureCodec.encode(measures, valueBuf); + return measureCodec.encode(measures); } private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) { @@ -203,7 +200,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers()); outputKey.set(rowKey, 0, rowKey.length); - buildValue(bytesSplitter.getSplitBuffers()); + ByteBuffer valueBuf = buildValue(bytesSplitter.getSplitBuffers()); outputValue.set(valueBuf.array(), 0, valueBuf.position()); context.write(outputKey, outputValue); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index f263d99..6986344 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -25,13 +25,12 @@ import java.util.List; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.MeasureAggregators; -import org.apache.kylin.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +47,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { private CubeDesc cubeDesc; private List<MeasureDesc> measuresDescs; - private MeasureCodec codec; + private BufferedMeasureEncoder codec; private MeasureAggregators aggs; private int counter; @@ -57,7 +56,6 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { private Object[] input; private Object[] result; - private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); private Text outputValue = new Text(); @Override @@ -73,7 +71,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor(); measuresDescs = cubeDesc.getMeasures(); - codec = new MeasureCodec(measuresDescs); + codec = new BufferedMeasureEncoder(measuresDescs); aggs = new MeasureAggregators(measuresDescs); input = new Object[measuresDescs.size()]; @@ -101,8 +99,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { } aggs.collectStates(result); - valueBuf.clear(); - codec.encode(result, valueBuf); + ByteBuffer valueBuf = codec.encode(result); outputValue.set(valueBuf.array(), 0, valueBuf.position()); context.write(key, outputValue); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 2688919..3be5795 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.List; import org.apache.hadoop.io.Text; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; @@ -159,7 +160,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap @Override protected void cleanup(Context context) throws IOException, InterruptedException { if (collectStatistics) { - ByteBuffer hllBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE); // output each cuboid's hll to reducer, key is 0 - cuboidId HyperLogLogPlusCounter hll; for (int i = 0; i < cuboidIds.length; i++) { http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index ec36242..673cfc0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -26,17 +26,14 @@ import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.ByteArrayWritable; import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.MeasureAggregators; -import org.apache.kylin.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +43,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class); - private MeasureCodec codec; + private BufferedMeasureEncoder codec; private MeasureAggregators aggs; private int counter; @@ -55,7 +52,6 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra private Text outputKey; private Text outputValue; - private ByteBuffer valueBuf; @Override protected void setup(Context context) throws IOException { @@ -63,22 +59,17 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME); - boolean isMerge = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_IS_MERGE)); - CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); CubeDesc cubeDesc = cube.getDescriptor(); - CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW); List<MeasureDesc> measuresDescs = cubeDesc.getMeasures(); - codec = new MeasureCodec(measuresDescs); + codec = new BufferedMeasureEncoder(measuresDescs); aggs = new MeasureAggregators(measuresDescs); input = new Object[measuresDescs.size()]; result = new Object[measuresDescs.size()]; outputKey = new Text(); outputValue = new Text(); - valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); } @Override @@ -96,8 +87,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra outputKey.set(key.array(), key.offset(), key.length()); // output value - valueBuf.clear(); - codec.encode(result, valueBuf); + ByteBuffer valueBuf = codec.encode(result); outputValue.set(valueBuf.array(), 0, valueBuf.position()); context.write(outputKey, outputValue); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java index ee65132..de8b6d4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java @@ -18,19 +18,21 @@ package org.apache.kylin.engine.mr.steps; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.inmemcubing.ICuboidWriter; import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.ByteArrayWritable; import org.apache.kylin.gridtable.GTRecord; - -import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.kylin.measure.BufferedMeasureEncoder; /** */ @@ -45,8 +47,8 @@ public abstract class KVGTRecordWriter implements ICuboidWriter { private int dimensions; private int measureCount; private byte[] keyBuf; - private int[] measureColumnsIndex; - private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + private ImmutableBitSet measureColumns; + private ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE); private ByteArrayWritable outputKey = new ByteArrayWritable(); private ByteArrayWritable outputValue = new ByteArrayWritable(); private long cuboidRowCount = 0; @@ -77,7 +79,12 @@ public abstract class KVGTRecordWriter implements ICuboidWriter { //output measures valueBuf.clear(); - record.exportColumns(measureColumnsIndex, valueBuf); + try { + record.exportColumns(measureColumns, valueBuf); + } catch (BufferOverflowException boe) { + valueBuf = ByteBuffer.allocate((int) (record.sizeOf(measureColumns) * 1.5)); + record.exportColumns(measureColumns, valueBuf); + } outputKey.set(keyBuf, 0, keyBuf.length); outputValue.set(valueBuf.array(), 0, valueBuf.position()); @@ -91,9 +98,6 @@ public abstract class KVGTRecordWriter implements ICuboidWriter { keyBuf = rowKeyEncoder.createBuf(); dimensions = Long.bitCount(cuboidId); - measureColumnsIndex = new int[measureCount]; - for (int i = 0; i < measureCount; i++) { - measureColumnsIndex[i] = dimensions + i; - } + measureColumns = new ImmutableBitSet(dimensions, dimensions + measureCount); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 6766b31..72de7dc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -47,7 +47,7 @@ import org.apache.kylin.dimension.Dictionary; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.model.MeasureDesc; @@ -85,9 +85,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private Map<TblColRef, Dictionary<String>> oldDicts; private Map<TblColRef, Dictionary<String>> newDicts; private List<MeasureDesc> measureDescs; - private MeasureCodec codec; + private BufferedMeasureEncoder codec; private Object[] measureObjs; - private ByteBuffer valueBuf; private Text outputValue; @Override @@ -116,9 +115,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment); measureDescs = cubeDesc.getMeasures(); - codec = new MeasureCodec(measureDescs); + codec = new BufferedMeasureEncoder(measureDescs); measureObjs = new Object[measureDescs.size()]; - valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); outputValue = new Text(); dictMeasures = Lists.newArrayList(); @@ -233,8 +231,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { MeasureIngester ingester = pair.getSecond(); measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts); } - valueBuf.clear(); - codec.encode(measureObjs, valueBuf); + ByteBuffer valueBuf = codec.encode(measureObjs); outputValue.set(valueBuf.array(), 0, valueBuf.position()); value = outputValue; } http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java index aa5e77e..b64afd3 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java @@ -18,8 +18,7 @@ package org.apache.kylin.engine.mr.steps; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.File; import java.lang.reflect.Field; @@ -34,11 +33,10 @@ import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; import org.apache.hadoop.mrunit.types.Pair; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.MeasureAggregator; -import org.apache.kylin.measure.MeasureCodec; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.datatype.LongMutable; @@ -50,12 +48,11 @@ import org.junit.Test; /** */ +@SuppressWarnings("rawtypes") public class CubeReducerTest extends LocalFileMetadataTestCase { ReduceDriver<Text, Text, Text, Text> reduceDriver; - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - @Before public void setUp() throws Exception { createTestMetadata(); @@ -80,7 +77,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { reduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, "test_kylin_cube_with_slr_ready"); CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor(); - MeasureCodec codec = new MeasureCodec(cubeDesc.getMeasures()); + BufferedMeasureEncoder codec = new BufferedMeasureEncoder(cubeDesc.getMeasures()); Text key1 = new Text("72010ustech"); List<Text> values1 = new ArrayList<Text>(); @@ -127,7 +124,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { MeasureType origMeasureType = functionDesc.getMeasureType(); field.set(functionDesc, new MockUpMeasureType(origMeasureType)); - MeasureCodec codec = new MeasureCodec(cubeDesc.getMeasures()); + BufferedMeasureEncoder codec = new BufferedMeasureEncoder(cubeDesc.getMeasures()); Text key1 = new Text("72010ustech"); List<Text> values1 = new ArrayList<Text>(); @@ -161,11 +158,10 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { assertTrue(result.contains(p3)); } - private Text newValueText(MeasureCodec codec, String sum, String min, String max, int count, int item_count) { + private Text newValueText(BufferedMeasureEncoder codec, String sum, String min, String max, int count, int item_count) { Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new LongMutable(count), new LongMutable(item_count) }; - buf.clear(); - codec.encode(values, buf); + ByteBuffer buf = codec.encode(values); Text t = new Text(); t.set(buf.array(), 0, buf.position()); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 2b4e497..796fb9e 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -67,7 +67,6 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; import org.apache.kylin.cube.kv.CubeDimEncMap; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.cube.model.DimensionDesc; @@ -81,8 +80,8 @@ import org.apache.kylin.engine.mr.common.CubeStatsReader; import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter; import org.apache.kylin.engine.spark.cube.DefaultTupleConverter; import org.apache.kylin.engine.spark.util.IteratorUtils; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.MeasureAggregators; -import org.apache.kylin.measure.MeasureCodec; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -110,8 +109,6 @@ import org.reflections.Reflections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; - import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -123,6 +120,8 @@ import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.primitives.UnsignedBytes; +import scala.Tuple2; + /** */ public class SparkCubing extends AbstractApplication { @@ -422,8 +421,7 @@ public class SparkCubing extends AbstractApplication { @Override public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception { return new Iterable<Tuple2<byte[], byte[]>>() { - final ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - final MeasureCodec codec = new MeasureCodec(dataTypes); + final BufferedMeasureEncoder codec = new BufferedMeasureEncoder(dataTypes); final Object[] input = new Object[measureSize]; final Object[] result = new Object[measureSize]; @@ -442,8 +440,7 @@ public class SparkCubing extends AbstractApplication { aggs.aggregate(input); } aggs.collectStates(result); - buffer.clear(); - codec.encode(result, buffer); + ByteBuffer buffer = codec.encode(result); byte[] bytes = new byte[buffer.position()]; System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position()); return bytes; http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java index 8da1519..2532679 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java @@ -17,16 +17,19 @@ */ package org.apache.kylin.engine.spark.cube; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Map; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.kv.RowKeyEncoder; import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.metadata.model.TblColRef; import scala.Tuple2; @@ -36,7 +39,6 @@ import scala.Tuple2; public final class DefaultTupleConverter implements TupleConverter { private final static ThreadLocal<ByteBuffer> valueBuf = new ThreadLocal<>(); - private final static ThreadLocal<int[]> measureColumnsIndex = new ThreadLocal<>(); private final CubeSegment segment; private final int measureCount; private final Map<TblColRef, Integer> columnLengthMap; @@ -52,16 +54,13 @@ public final class DefaultTupleConverter implements TupleConverter { private ByteBuffer getValueBuf() { if (valueBuf.get() == null) { - valueBuf.set(ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE)); + valueBuf.set(ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE)); } return valueBuf.get(); } - - private int[] getMeasureColumnsIndex() { - if (measureColumnsIndex.get() == null) { - measureColumnsIndex.set(new int[measureCount]); - } - return measureColumnsIndex.get(); + + private void setValueBuf(ByteBuffer buf) { + valueBuf.set(buf); } @Override @@ -70,10 +69,7 @@ public final class DefaultTupleConverter implements TupleConverter { RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); final int dimensions = Long.bitCount(cuboidId); - int[] measureColumnsIndex = getMeasureColumnsIndex(); - for (int i = 0; i < measureCount; i++) { - measureColumnsIndex[i] = dimensions + i; - } + final ImmutableBitSet measureColumns = new ImmutableBitSet(dimensions, dimensions + measureCount); int offSet = 0; for (int x = 0; x < dimensions; x++) { @@ -87,7 +83,13 @@ public final class DefaultTupleConverter implements TupleConverter { ByteBuffer valueBuf = getValueBuf(); valueBuf.clear(); - record.exportColumns(measureColumnsIndex, valueBuf); + try { + record.exportColumns(measureColumns, valueBuf); + } catch (BufferOverflowException boe) { + valueBuf = ByteBuffer.allocate((int) (record.sizeOf(measureColumns) * 1.5)); + record.exportColumns(measureColumns, valueBuf); + setValueBuf(valueBuf); + } byte[] value = new byte[valueBuf.position()]; System.arraycopy(valueBuf.array(), 0, value, 0, valueBuf.position()); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index 68b6ae4..e06fb68 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.kv.RowConstants; @@ -97,7 +98,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab @Override protected void cleanup(Context context) throws IOException, InterruptedException { Iterator<Integer> it = hllcMap.keySet().iterator(); - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE); while (it.hasNext()) { int key = it.next(); HyperLogLogPlusCounter hllc = hllcMap.get(key); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java index 19f5759..6596917 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java @@ -30,6 +30,7 @@ import java.util.Map; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.engine.mr.KylinReducer; @@ -79,7 +80,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri while (it.hasNext()) { int key = it.next(); HyperLogLogPlusCounter hllc = hllcMap.get(key); - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE); buf.clear(); hllc.writeRegisters(buf); buf.flip(); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java index 57721d6..b9f532d 100644 --- a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java +++ b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java @@ -33,6 +33,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; import org.apache.hadoop.mrunit.types.Pair; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.kv.RowConstants; @@ -65,7 +66,7 @@ public class ColumnCardinalityReducerTest { i++; hllc.add(Bytes.toBytes(temp)); } - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE); buf.clear(); hllc.writeRegisters(buf); buf.flip(); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java index 9e9dd6d..941c10f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java @@ -28,10 +28,9 @@ import org.apache.hadoop.hbase.Cell; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.HBaseColumnDesc; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.MeasureAggregator; -import org.apache.kylin.measure.MeasureCodec; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.datatype.DataType; @@ -228,9 +227,7 @@ public class ObserverAggregators { for (int j = 0; j < col.nMeasures; j++) col.measureValues[j] = aggrs[i++].getState(); - col.measureBuf.clear(); - col.measureCodec.encode(col.measureValues, col.measureBuf); - hColValues[ci] = col.measureBuf; + hColValues[ci] = col.measureCodec.encode(col.measureValues); } return hColValues; } @@ -244,9 +241,8 @@ public class ObserverAggregators { final String[] dataTypes; final int nMeasures; - final MeasureCodec measureCodec; + final BufferedMeasureEncoder measureCodec; final Object[] measureValues; - final ByteBuffer measureBuf; public HCol(byte[] bFamily, byte[] bQualifier, String[] funcNames, String[] dataTypes) { this.family = bFamily; @@ -256,9 +252,8 @@ public class ObserverAggregators { this.nMeasures = funcNames.length; assert funcNames.length == dataTypes.length; - this.measureCodec = new MeasureCodec(dataTypes); + this.measureCodec = new BufferedMeasureEncoder(dataTypes); this.measureValues = new Object[nMeasures]; - this.measureBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 510e735..3ccf7cf 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.InetAddress; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -48,6 +49,7 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.IRealizationConstants; @@ -279,9 +281,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), // behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()); - ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);//ByteArrayOutputStream will auto grow int finalRowCount = 0; for (GTRecord oneRecord : finalScanner) { @@ -298,10 +300,14 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } buffer.clear(); - oneRecord.exportColumns(scanReq.getColumns(), buffer); - buffer.flip(); + try { + oneRecord.exportColumns(scanReq.getColumns(), buffer); + } catch (BufferOverflowException boe) { + buffer = ByteBuffer.allocate((int) (oneRecord.sizeOf(scanReq.getColumns()) * 1.5)); + oneRecord.exportColumns(scanReq.getColumns(), buffer); + } - outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining()); + outputStream.write(buffer.array(), 0, buffer.position()); finalRowCount++; } finalScanner.close(); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java index 8205ff7..7523249 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java @@ -33,7 +33,7 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureDecoder; import com.google.common.collect.Lists; @@ -48,7 +48,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita String cubeName; CubeDesc cubeDesc; - MeasureCodec inputCodec; + MeasureDecoder inputCodec; Object[] inputMeasures; List<KeyValueCreator> keyValueCreators; @@ -62,7 +62,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita CubeManager cubeMgr = CubeManager.getInstance(config); cubeDesc = cubeMgr.getCube(cubeName).getDescriptor(); - inputCodec = new MeasureCodec(cubeDesc.getMeasures()); + inputCodec = new MeasureDecoder(cubeDesc.getMeasures()); inputMeasures = new Object[cubeDesc.getMeasures().size()]; keyValueCreators = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java index c6a233b..490031e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java @@ -24,10 +24,9 @@ import java.util.List; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.io.Text; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.metadata.model.MeasureDesc; /** @@ -41,9 +40,8 @@ public class KeyValueCreator { int[] refIndex; MeasureDesc[] refMeasures; - MeasureCodec codec; + BufferedMeasureEncoder codec; Object[] colValues; - ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); public boolean isFullCopy; @@ -56,7 +54,7 @@ public class KeyValueCreator { refIndex = colDesc.getMeasureIndex(); refMeasures = colDesc.getMeasures(); - codec = new MeasureCodec(refMeasures); + codec = new BufferedMeasureEncoder(refMeasures); colValues = new Object[refMeasures.length]; isFullCopy = true; @@ -76,8 +74,7 @@ public class KeyValueCreator { colValues[i] = measureValues[refIndex[i]]; } - valueBuf.clear(); - codec.encode(colValues, valueBuf); + ByteBuffer valueBuf = codec.encode(colValues); return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java index 7d05dc9..e1e8f8c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java @@ -25,7 +25,7 @@ import java.util.Collection; import org.apache.hadoop.hbase.client.Result; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureDecoder; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; @@ -44,7 +44,7 @@ public class RowValueDecoder implements Cloneable { private final byte[] hbaseColumnFamily; private final byte[] hbaseColumnQualifier; - private final MeasureCodec codec; + private final MeasureDecoder codec; private final BitSet projectionIndex; private final MeasureDesc[] measures; private final Object[] values; @@ -55,7 +55,7 @@ public class RowValueDecoder implements Cloneable { this.hbaseColumnQualifier = Bytes.toBytes(hbaseColumn.getQualifier()); this.projectionIndex = new BitSet(); this.measures = hbaseColumn.getMeasures(); - this.codec = new MeasureCodec(measures); + this.codec = new MeasureDecoder(measures); this.values = new Object[measures.length]; } http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java index cd4e33d..3efcf92 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java @@ -18,8 +18,7 @@ package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.IOException; import java.math.BigDecimal; @@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; @@ -55,7 +53,6 @@ import com.google.common.collect.Lists; * @author yangli9 */ public class AggregateRegionObserverTest { - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); byte[] mask = new byte[] { (byte) 0xff, (byte) 0xff, 0, 0 }; byte[] k1 = new byte[] { 0x01, 0x01, 0, 0x01 }; @@ -94,8 +91,7 @@ public class AggregateRegionObserverTest { Object[] values = number == Integer.MIN_VALUE ? // new Object[] { new BigDecimal(decimal) } // : new Object[] { new BigDecimal(decimal), new LongMutable(number) }; - buf.clear(); - col.measureCodec.encode(values, buf); + ByteBuffer buf = col.measureCodec.encode(values); Cell keyValue = new KeyValue(key, 0, key.length, // col.family, 0, col.family.length, // http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java index b715498..dbf39e7 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java @@ -35,7 +35,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureDecoder; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -49,8 +49,7 @@ public class CubeHFileMapper2Test extends LocalFileMetadataTestCase { String cubeName = "test_kylin_cube_with_slr_ready"; - MeasureCodec codec; - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + MeasureDecoder codec; Object[] outKV = new Object[2]; @Before @@ -60,7 +59,7 @@ public class CubeHFileMapper2Test extends LocalFileMetadataTestCase { FileUtils.deleteDirectory(new File("../job/meta")); FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta")); CubeDesc desc = CubeManager.getInstance(getTestConfig()).getCube(cubeName).getDescriptor(); - codec = new MeasureCodec(desc.getMeasures()); + codec = new MeasureDecoder(desc.getMeasures()); } @After http://git-wip-us.apache.org/repos/asf/kylin/blob/d56abdea/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java index 488ff59..6475bad 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java @@ -18,7 +18,7 @@ package org.apache.kylin.storage.hbase.steps; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -27,10 +27,9 @@ import java.util.Arrays; import org.apache.hadoop.io.LongWritable; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; @@ -57,14 +56,13 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase { CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor(); HBaseColumnDesc hbaseCol = cubeDesc.getHbaseMapping().getColumnFamily()[0].getColumns()[0]; - MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures()); + BufferedMeasureEncoder codec = new BufferedMeasureEncoder(hbaseCol.getMeasures()); BigDecimal sum = new BigDecimal("333.1234567"); BigDecimal min = new BigDecimal("333.1111111"); BigDecimal max = new BigDecimal("333.1999999"); LongMutable count = new LongMutable(2); LongMutable item_count = new LongMutable(100); - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - codec.encode(new Object[] { sum, min, max, count, item_count }, buf); + ByteBuffer buf = codec.encode(new Object[] { sum, min, max, count, item_count }); buf.flip(); byte[] valueBytes = new byte[buf.limit()]; @@ -88,14 +86,13 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase { CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor(); HBaseColumnDesc hbaseCol = cubeDesc.getHbaseMapping().getColumnFamily()[0].getColumns()[0]; - MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures()); + BufferedMeasureEncoder codec = new BufferedMeasureEncoder(hbaseCol.getMeasures()); BigDecimal sum = new BigDecimal("11111111111111111111333.1234567"); BigDecimal min = new BigDecimal("333.1111111"); BigDecimal max = new BigDecimal("333.1999999"); LongWritable count = new LongWritable(2); LongMutable item_count = new LongMutable(100); - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - codec.encode(new Object[] { sum, min, max, count, item_count }, buf); + codec.encode(new Object[] { sum, min, max, count, item_count }); } }