KYLIN-2501 Stream Aggregate GTRecords at Query Server
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0fa57248 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0fa57248 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0fa57248 Branch: refs/heads/KYLIN-2501 Commit: 0fa572482f209d25cca6968812d293c08d077210 Parents: fa3ee3f Author: gaodayue <gaoda...@meituan.com> Authored: Wed Mar 15 22:45:02 2017 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Fri Mar 31 16:39:35 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../kylin/common/util/ImmutableBitSet.java | 29 ++- .../org/apache/kylin/GTForwardingScanner.java | 56 +++++ .../kylin/cube/gridtable/CubeGridTable.java | 18 -- .../gridtable/CuboidToGridTableMapping.java | 18 ++ .../cube/inmemcubing/InMemCubeBuilder.java | 6 +- .../kylin/gridtable/GTAggregateScanner.java | 16 +- .../apache/kylin/gridtable/GTFilterScanner.java | 22 +- .../org/apache/kylin/gridtable/GTRecord.java | 80 +++---- .../apache/kylin/gridtable/GTScanRequest.java | 13 ++ .../gridtable/GTStreamAggregateScanner.java | 211 +++++++++++++++++++ .../kylin/gridtable/GTScanReqSerDerTest.java | 4 +- .../apache/kylin/storage/StorageContext.java | 20 ++ .../storage/gtrecord/CubeScanRangePlanner.java | 3 +- .../storage/gtrecord/CubeSegmentScanner.java | 7 +- .../storage/gtrecord/CubeTupleConverter.java | 31 +-- .../gtrecord/GTCubeStorageQueryBase.java | 38 +++- .../kylin/storage/gtrecord/ITupleConverter.java | 3 +- .../gtrecord/PartitionResultIterator.java | 59 ++++++ .../storage/gtrecord/PartitionResultMerger.java | 100 +++++++++ .../kylin/storage/gtrecord/ScannerWorker.java | 5 +- .../gtrecord/SegmentCubeTupleIterator.java | 72 ++++++- .../gtrecord/StorageResponseGTScatter.java | 82 +++---- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 7 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 5 +- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 5 +- 26 files changed, 704 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 02349ad..9cd35c8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -802,6 +802,10 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.valueOf(getOptional("kylin.query.skip-empty-segments", "true")); } + public boolean isStreamAggregateEnabled() { + return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true")); + } + @Deprecated //Limit is good even it's large. This config is meaning less since we already have scan threshold public int getStoragePushDownLimitMax() { return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000")); http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java index b417877..5cdf08c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java @@ -19,8 +19,9 @@ package org.apache.kylin.common.util; import java.nio.ByteBuffer; import java.util.BitSet; +import java.util.Iterator; -public class ImmutableBitSet { +public class ImmutableBitSet implements Iterable<Integer> { public static final ImmutableBitSet EMPTY = new ImmutableBitSet(new BitSet()); @@ -168,4 +169,30 @@ public class ImmutableBitSet { return new ImmutableBitSet(bitSet); } }; + + /** + * Iterate over the positions of true value. + * @return the iterator + */ + @Override + public Iterator<Integer> iterator() { + return new Iterator<Integer>() { + int index = 0; + + @Override + public boolean hasNext() { + return index < arr.length; + } + + @Override + public Integer next() { + return arr[index++]; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java new file mode 100644 index 0000000..de8c88d --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin; + +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.IGTScanner; + +import java.io.IOException; +import java.util.Iterator; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A {@link IGTScanner} which forwards all its method calls to another scanner. + * + * @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>. + */ +public class GTForwardingScanner implements IGTScanner { + protected IGTScanner delegated; + + protected GTForwardingScanner(IGTScanner delegated) { + this.delegated = checkNotNull(delegated, "delegated"); + } + + @Override + public GTInfo getInfo() { + return delegated.getInfo(); + } + + @Override + public void close() throws IOException { + delegated.close(); + } + + @Override + public Iterator<GTRecord> iterator() { + return delegated.iterator(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java index 563cf43..5cee9df 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java @@ -18,29 +18,11 @@ package org.apache.kylin.cube.gridtable; -import java.util.Map; - -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.CubeDimEncMap; -import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dimension.IDimensionEncodingMap; import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.metadata.model.TblColRef; public class CubeGridTable { - - public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) { - Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId); - return newGTInfo(cuboid, new CubeDimEncMap(cubeSeg)); - } - - public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<String>> dictionaryMap) { - Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId); - return newGTInfo(cuboid, new CubeDimEncMap(cubeDesc, dictionaryMap)); - } - public static GTInfo newGTInfo(Cuboid cuboid, IDimensionEncodingMap dimEncMap) { CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid); http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java index 2e5dd12..6879687 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java @@ -140,11 +140,29 @@ public class CuboidToGridTableMapping { return i == null ? -1 : i.intValue(); } + public int[] getDimIndexes(Collection<TblColRef> dims) { + int[] result = new int[dims.size()]; + int i = 0; + for (TblColRef dim : dims) { + result[i++] = getIndexOf(dim); + } + return result; + } + public int getIndexOf(FunctionDesc metric) { Integer r = metrics2gt.get(metric); return r == null ? -1 : r; } + public int[] getMetricsIndexes(Collection<FunctionDesc> metrics) { + int[] result = new int[metrics.size()]; + int i = 0; + for (FunctionDesc metric : metrics) { + result[i++] = getIndexOf(metric); + } + return result; + } + public List<TblColRef> getCuboidDimensionsInGTOrder() { return cuboid.getColumns(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index e08844e..a26e948 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -38,6 +38,7 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.gridtable.CubeGridTable; +import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.GTAggregateScanner; import org.apache.kylin.gridtable.GTBuilder; @@ -108,7 +109,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } private GridTable newGridTableByCuboidID(long cuboidID) throws IOException { - GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap); + GTInfo info = CubeGridTable.newGTInfo( + Cuboid.findById(cubeDesc, cuboidID), + new CubeDimEncMap(cubeDesc, dictionaryMap) + ); // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest. // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget); http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 7cdd4f5..0dd6fa9 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -45,7 +45,6 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureAggregators; -import org.apache.kylin.metadata.datatype.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +62,7 @@ public class GTAggregateScanner implements IGTScanner { final ImmutableBitSet metrics; final String[] metricsAggrFuncs; final IGTScanner inputScanner; + final BufferedMeasureCodec measureCodec; final AggregationCache aggrCache; final long spillThreshold; // 0 means no memory control && no spill final int storagePushDownLimit;//default to be Int.MAX @@ -86,6 +86,7 @@ public class GTAggregateScanner implements IGTScanner { this.metrics = req.getAggrMetrics(); this.metricsAggrFuncs = req.getAggrMetricsFuncs(); this.inputScanner = inputScanner; + this.measureCodec = req.createMeasureCodec(); this.aggrCache = new AggregationCache(); this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB); this.aggrMask = new boolean[metricsAggrFuncs.length]; @@ -175,7 +176,6 @@ public class GTAggregateScanner implements IGTScanner { final int keyLength; final boolean[] compareMask; boolean compareAll = true; - final BufferedMeasureCodec measureCodec; final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() { @Override @@ -213,18 +213,6 @@ public class GTAggregateScanner implements IGTScanner { keyLength = compareMask.length; dumps = Lists.newArrayList(); aggBufMap = createBuffMap(); - measureCodec = createMeasureCodec(); - } - - private BufferedMeasureCodec createMeasureCodec() { - DataType[] types = new DataType[metrics.trueBitCount()]; - for (int i = 0; i < types.length; i++) { - types[i] = info.getColumnType(metrics.trueBitAt(i)); - } - - BufferedMeasureCodec result = new BufferedMeasureCodec(types); - result.setBufferSize(info.getMaxColumnLength(metrics)); - return result; } private boolean[] createCompareMask() { http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java index 717f89c..cad0a04 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; +import org.apache.kylin.GTForwardingScanner; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; @@ -33,17 +34,16 @@ import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.IEvaluatableTuple; -public class GTFilterScanner implements IGTScanner { +public class GTFilterScanner extends GTForwardingScanner { - final private IGTScanner inputScanner; final private TupleFilter filter; final private IFilterCodeSystem<ByteArray> filterCodeSystem; final private IEvaluatableTuple oneTuple; // avoid instance creation private GTRecord next = null; - public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException { - this.inputScanner = inputScanner; + public GTFilterScanner(IGTScanner delegated, GTScanRequest req) throws IOException { + super(delegated); this.filter = req.getFilterPushDown(); this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator()); this.oneTuple = new IEvaluatableTuple() { @@ -53,25 +53,15 @@ public class GTFilterScanner implements IGTScanner { } }; - if (TupleFilter.isEvaluableRecursively(filter) == false) + if (!TupleFilter.isEvaluableRecursively(filter)) throw new IllegalArgumentException(); } @Override - public GTInfo getInfo() { - return inputScanner.getInfo(); - } - - @Override - public void close() throws IOException { - inputScanner.close(); - } - - @Override public Iterator<GTRecord> iterator() { return new Iterator<GTRecord>() { - private Iterator<GTRecord> inputIterator = inputScanner.iterator(); + private Iterator<GTRecord> inputIterator = delegated.iterator(); private FilterResultCache resultCache = new FilterResultCache(getInfo(), filter); @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index f4480c8..3397adc 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@ -21,7 +21,6 @@ package org.apache.kylin.gridtable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; -import java.util.List; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; @@ -46,18 +45,21 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable { } this.info = info; } - - public GTRecord(GTRecord other) { - this.info = other.info; - this.cols = new ByteArray[info.getColumnCount()]; - for (int i = 0; i < other.cols.length; i++) { - this.cols[i] = other.cols[i].copy(); + + @Override + public GTRecord clone() { // deep copy + ByteArray[] cols = new ByteArray[this.cols.length]; + for (int i = 0; i < cols.length; i++) { + cols[i] = this.cols[i].copy(); } + return new GTRecord(this.info, cols); } - @Override - public Object clone() { - return new GTRecord(this); + public void shallowCopyFrom(GTRecord source) { + assert info == source.info; + for (int i = 0; i < cols.length; i++) { + cols[i].set(source.cols[i]); + } } public GTInfo getInfo() { @@ -106,30 +108,18 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable { /** decode and return the values of this record */ public Object[] getValues(ImmutableBitSet selectedCols, Object[] result) { assert selectedCols.cardinality() == result.length; - for (int i = 0; i < selectedCols.trueBitCount(); i++) { - int c = selectedCols.trueBitAt(i); - if (cols[c] == null || cols[c].array() == null) { - result[i] = null; - } else { - result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer()); - } + result[i] = decodeValue(selectedCols.trueBitAt(i)); } return result; } - /** decode and return the values of this record */ - public Object[] getValues(int[] selectedColumns, Object[] result) { - assert selectedColumns.length <= result.length; - for (int i = 0; i < selectedColumns.length; i++) { - int c = selectedColumns[i]; - if (cols[c].array() == null) { - result[i] = null; - } else { - result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer()); - } + public Object decodeValue(int c) { + ByteArray col = cols[c]; + if (col != null && col.array() != null) { + return info.codeSystem.decodeColumnValue(c, col.asBuffer()); } - return result; + return null; } public int sizeOf(ImmutableBitSet selectedCols) { @@ -198,19 +188,13 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable { return compareToInternal(o, info.colAll); } - public int compareToOnPrimaryKey(GTRecord o) { - return compareToInternal(o, info.primaryKey); - } - - public static Comparator<GTRecord> getPrimaryKeyComparator() { + public static Comparator<GTRecord> getComparator(final ImmutableBitSet participateCols) { return new Comparator<GTRecord>() { - @Override public int compare(GTRecord o1, GTRecord o2) { if (o1 == null || o2 == null) { throw new IllegalStateException("Cannot handle null"); } - - return o1.compareToOnPrimaryKey(o2); + return o1.compareToInternal(o2, participateCols); } }; } @@ -287,26 +271,14 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable { loadColumns(info.colBlocks[c], buf); } - /** change pointers to point to data in given buffer, UNLIKE deserialize */ - public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) { - int pos = buf.position(); - for (int i = 0; i < selectedCols.trueBitCount(); i++) { - int c = selectedCols.trueBitAt(i); - int len = info.codeSystem.codeLength(c, buf); - cols[c].set(buf.array(), buf.arrayOffset() + pos, len); - pos += len; - buf.position(pos); - } - } - - /** change pointers to point to data in given buffer, UNLIKE deserialize - * unlike loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf), this - * method allows to defined specific columns(in order) to load + /** + * Change pointers to point to data in given buffer, UNLIKE deserialize + * @param selectedCols positions of column to load + * @param buf buffer containing continuous data of selected columns */ - public void loadColumns(List<Integer> selectedCols, ByteBuffer buf) { + public void loadColumns(Iterable<Integer> selectedCols, ByteBuffer buf) { int pos = buf.position(); - for (int i = 0; i < selectedCols.size(); i++) { - int c = selectedCols.get(i); + for (int c : selectedCols) { int len = info.codeSystem.codeLength(c, buf); cols[c].set(buf.array(), buf.arrayOffset() + pos, len); pos += len; http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 4629c8e..ae35d2b 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -31,6 +31,8 @@ import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.SerializeToByteBuffer; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -202,6 +204,17 @@ public class GTScanRequest { } + public BufferedMeasureCodec createMeasureCodec() { + DataType[] metricTypes = new DataType[aggrMetrics.trueBitCount()]; + for (int i = 0; i < metricTypes.length; i++) { + metricTypes[i] = info.getColumnType(aggrMetrics.trueBitAt(i)); + } + + BufferedMeasureCodec codec = new BufferedMeasureCodec(metricTypes); + codec.setBufferSize(info.getMaxColumnLength(aggrMetrics)); + return codec; + } + public boolean isDoingStorageAggregation() { return doingStorageAggregation; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java new file mode 100644 index 0000000..1fde423 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.gridtable; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import org.apache.kylin.GTForwardingScanner; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.MeasureAggregator; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * GTStreamAggregateScanner requires input records to be sorted on group fields. + * In such cases, it's superior to hash/sort based aggregator because it can produce + * ordered outputs on the fly and the memory consumption is very low. + */ +public class GTStreamAggregateScanner extends GTForwardingScanner { + private final GTScanRequest req; + private final Comparator<GTRecord> keyComparator; + + public GTStreamAggregateScanner(IGTScanner delegated, + GTScanRequest req, Comparator<GTRecord> keyComparator) { + super(delegated); + this.req = req; + this.keyComparator = keyComparator; + } + + @Override + public Iterator<GTRecord> iterator() { + return new StreamMergeGTRecordIterator(delegated.iterator()); + } + + public Iterator<Object[]> valuesIterator(int[] gtDimsIdx, int[] gtMetricsIdx) { + return new StreamMergeValuesIterator(delegated.iterator(), gtDimsIdx, gtMetricsIdx); + } + + private abstract class AbstractStreamMergeIterator<E> implements Iterator<E> { + final PeekingIterator<GTRecord> input; + final IGTCodeSystem codeSystem; + final ImmutableBitSet dimensions; + final ImmutableBitSet metrics; + final String[] metricFuncs; + final BufferedMeasureCodec measureCodec; + + private final GTRecord first; // reuse to avoid object creation + + AbstractStreamMergeIterator(Iterator<GTRecord> input) { + this.input = Iterators.peekingIterator(input); + this.codeSystem = req.getInfo().getCodeSystem(); + this.dimensions = req.getDimensions(); + this.metrics = req.getAggrMetrics(); + this.metricFuncs = req.getAggrMetricsFuncs(); + this.measureCodec = req.createMeasureCodec(); + + this.first = new GTRecord(req.getInfo()); + } + + @Override + public boolean hasNext() { + return input.hasNext(); + } + + private boolean isSameKey(GTRecord o1, GTRecord o2) { + return keyComparator.compare(o1, o2) == 0; + } + + private boolean shouldMergeNext(GTRecord current) { + return input.hasNext() && isSameKey(current, input.peek()); + } + + protected abstract E finalizeResult(GTRecord record); + + protected abstract E finalizeResult(GTRecord record, Object[] aggStates); + + @Override + public E next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + // WATCH OUT! record returned by "input" scanner could be changed later, + // so we must make a shallow copy of it. + first.shallowCopyFrom(input.next()); + + // shortcut to avoid extra deserialize/serialize cost + if (!shouldMergeNext(first)) { + return finalizeResult(first); + } + // merge records with the same key + MeasureAggregator[] aggrs = codeSystem.newMetricsAggregators(metrics, metricFuncs); + aggregate(aggrs, first); + aggregate(aggrs, input.next()); // no need to copy record because it's not referred to later + while (shouldMergeNext(first)) { + aggregate(aggrs, input.next()); + } + + Object[] aggStates = new Object[aggrs.length]; + for (int i = 0; i < aggStates.length; i++) { + aggStates[i] = aggrs[i].getState(); + } + return finalizeResult(first, aggStates); + } + + @SuppressWarnings("unchecked") + protected void aggregate(MeasureAggregator[] aggregators, GTRecord record) { + for (int i = 0; i < aggregators.length; i++) { + int c = metrics.trueBitAt(i); + Object metric = codeSystem.decodeColumnValue(c, record.cols[c].asBuffer()); + aggregators[i].aggregate(metric); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + } + + private class StreamMergeGTRecordIterator extends AbstractStreamMergeIterator<GTRecord> { + + private GTRecord returnRecord; // avoid object creation + + StreamMergeGTRecordIterator(Iterator<GTRecord> input) { + super(input); + this.returnRecord = new GTRecord(req.getInfo()); + } + + @Override + protected GTRecord finalizeResult(GTRecord record) { + return record; + } + + @Override + protected GTRecord finalizeResult(GTRecord record, Object[] aggStates) { + // 1. load dimensions + for (int c : dimensions) { + returnRecord.cols[c] = record.cols[c]; + } + // 2. serialize metrics + byte[] bytes = measureCodec.encode(aggStates).array(); + int[] sizes = measureCodec.getMeasureSizes(); + // 3. load metrics + int offset = 0; + for (int i = 0; i < metrics.trueBitCount(); i++) { + int c = metrics.trueBitAt(i); + returnRecord.cols[c].set(bytes, offset, sizes[i]); + offset += sizes[i]; + } + return returnRecord; + } + } + + private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> { + + private int[] gtDimsIdx; + private int[] gtMetricsIdx; + private Object[] result; // avoid object creation + + StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) { + super(input); + this.gtDimsIdx = gtDimsIdx; + this.gtMetricsIdx = gtMetricsIdx; + result = new Object[gtDimsIdx.length + gtMetricsIdx.length]; + } + + private void decodeAndSetDimensions(GTRecord record) { + for (int i = 0; i < gtDimsIdx.length; i++) { + result[i] = record.decodeValue(gtDimsIdx[i]); + } + } + + @Override + protected Object[] finalizeResult(GTRecord record) { + decodeAndSetDimensions(record); + // decode metrics + for (int i = 0; i < gtMetricsIdx.length; i++) { + result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]); + } + return result; + } + + @Override + protected Object[] finalizeResult(GTRecord record, Object[] aggStates) { + decodeAndSetDimensions(record); + // set metrics + for (int i = 0; i < gtMetricsIdx.length; i++) { + result[gtDimsIdx.length + i] = aggStates[i]; + } + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java index 77cc2d8..1ae229a 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java @@ -29,6 +29,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.gridtable.CubeGridTable; +import org.apache.kylin.cube.kv.CubeDimEncMap; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -90,7 +91,8 @@ public class GTScanReqSerDerTest extends LocalFileMetadataTestCase { CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("test_kylin_cube_with_slr_ready"); CubeSegment segment = cube.getFirstSegment(); - GTInfo info = CubeGridTable.newGTInfo(segment, Cuboid.getBaseCuboidId(cube.getDescriptor())); + Cuboid baseCuboid = Cuboid.getBaseCuboid(cube.getDescriptor()); + GTInfo info = CubeGridTable.newGTInfo(baseCuboid, new CubeDimEncMap(segment)); GTInfo.serializer.serialize(info, buffer); buffer.flip(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 998f1db..bb17054 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -18,10 +18,12 @@ package org.apache.kylin.storage; +import java.util.Comparator; import java.util.concurrent.atomic.AtomicLong; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase; import org.slf4j.Logger; @@ -48,6 +50,9 @@ public class StorageContext { private boolean needStorageAggregation = false; private boolean enableCoprocessor = false; + private boolean enableStreamAggregate = false; + private Comparator<GTRecord> groupKeyComparator; + private IStorageQuery storageQuery; private AtomicLong processedRowCount = new AtomicLong(); private Cuboid cuboid; @@ -230,4 +235,19 @@ public class StorageContext { this.storageQuery = storageQuery; } + public boolean isStreamAggregateEnabled() { + return enableStreamAggregate; + } + + public void enableStreamAggregate() { + this.enableStreamAggregate = true; + } + + public Comparator<GTRecord> getGroupKeyComparator() { + return groupKeyComparator; + } + + public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) { + this.groupKeyComparator = groupKeyComparator; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java index 6911827..c3cc858 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java @@ -38,6 +38,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.gridtable.RecordComparators; import org.apache.kylin.cube.gridtable.ScanRangePlannerBase; import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd; +import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; @@ -85,7 +86,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { Set<TblColRef> filterDims = Sets.newHashSet(); TupleFilter.collectColumns(filter, filterDims); - this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId()); + this.gtInfo = CubeGridTable.newGTInfo(cuboid, new CubeDimEncMap(cubeSegment)); CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); IGTComparator comp = gtInfo.getCodeSystem().getComparator(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java index 4f206d4..31a9f99 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -78,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner { } scanRequest = scanRangePlanner.planScanRequest(); String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage(); - scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage); + scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context); } @Override @@ -96,8 +96,7 @@ public class CubeSegmentScanner implements IGTScanner { return scanRequest == null ? null : scanRequest.getInfo(); } - public CubeSegment getSegment() { - return this.cubeSeg; + public GTScanRequest getScanRequest() { + return scanRequest; } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java index 280718f..b762e5c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java @@ -28,10 +28,8 @@ import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.model.FunctionDesc; @@ -43,7 +41,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** - * convert GTRecord to tuple + * Convert Object[] (decoded GTRecord) to tuple */ public class CubeTupleConverter implements ITupleConverter { @@ -54,7 +52,6 @@ public class CubeTupleConverter implements ITupleConverter { private final int[] gtColIdx; private final int[] tupleIdx; - private final Object[] gtValues; private final MeasureType<?>[] measureTypes; private final List<IAdvMeasureFiller> advMeasureFillers; @@ -63,19 +60,16 @@ public class CubeTupleConverter implements ITupleConverter { private final int nSelectedDims; public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, // - Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) { + Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx, + TupleInfo returnTupleInfo) { this.cubeSeg = cubeSeg; this.cuboid = cuboid; + this.gtColIdx = gtColIdx; this.tupleInfo = returnTupleInfo; this.derivedColFillers = Lists.newArrayList(); - List<TblColRef> cuboidDims = cuboid.getColumns(); - CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); - nSelectedDims = selectedDimensions.size(); - gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()]; tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()]; - gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()]; // measure types don't have this many, but aligned length make programming easier measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()]; @@ -89,21 +83,11 @@ public class CubeTupleConverter implements ITupleConverter { // pre-calculate dimension index mapping to tuple for (TblColRef dim : selectedDimensions) { - int dimIndex = mapping.getIndexOf(dim); - gtColIdx[i] = dimIndex; tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1; - - // if (tupleIdx[iii] == -1) { - // throw new IllegalStateException("dim not used in tuple:" + dim); - // } - i++; } for (FunctionDesc metric : selectedMetrics) { - int metricIndex = mapping.getIndexOf(metric); - gtColIdx[i] = metricIndex; - if (metric.needRewrite()) { String rewriteFieldName = metric.getRewriteFieldName(); tupleIdx[i] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1; @@ -126,7 +110,7 @@ public class CubeTupleConverter implements ITupleConverter { } // prepare derived columns and filler - Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null); + Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboid.getColumns(), null); for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) { TblColRef[] hostCols = entry.getKey().data; for (DeriveInfo deriveInfo : entry.getValue()) { @@ -148,9 +132,8 @@ public class CubeTupleConverter implements ITupleConverter { } @Override - public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) { - - record.getValues(gtColIdx, gtValues); + public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple) { + assert gtValues.length == gtColIdx.length; // dimensions for (int i = 0; i < nSelectedDims; i++) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index ecf1ad3..82590a2 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -26,15 +26,18 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.RawQueryLastHacker; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; @@ -120,6 +123,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { // set limit push down enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context); + // set whether to aggregate results from multiple partitions + enableStreamAggregateIfBeneficial(cuboid, groupsD, context); // set query deadline context.setDeadline(cubeInstance); @@ -144,8 +149,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { protected abstract String getGTStorage(); - protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo tupleInfo) { - return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); + protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx, TupleInfo tupleInfo) { + return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo); } protected void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) { @@ -366,6 +371,35 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } } + private void enableStreamAggregateIfBeneficial(Cuboid cuboid, Set<TblColRef> groupsD, StorageContext context) { + CubeDesc cubeDesc = cuboid.getCubeDesc(); + boolean enabled = cubeDesc.getConfig().isStreamAggregateEnabled(); + + Set<TblColRef> shardByInGroups = Sets.newHashSet(); + for (TblColRef col : cubeDesc.getShardByColumns()) { + if (groupsD.contains(col)) { + shardByInGroups.add(col); + } + } + if (!shardByInGroups.isEmpty()) { + enabled = false; + logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: " + shardByInGroups); + } + + if (!context.isNeedStorageAggregation()) { + enabled = false; + logger.debug("Aggregate partition results is not beneficial because no storage aggregation"); + } + + if (enabled) { + CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); + ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD); + + context.enableStreamAggregate(); + context.setGroupKeyComparator(GTRecord.getComparator(cols)); + } + } + protected void notifyBeforeStorageQuery(SQLDigest sqlDigest) { Map<String, List<MeasureDesc>> map = Maps.newHashMap(); for (MeasureDesc measure : cubeDesc.getMeasures()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java index 9c50d0c..dd48e4d 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java @@ -20,11 +20,10 @@ package org.apache.kylin.storage.gtrecord; import java.util.List; -import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.tuple.Tuple; public interface ITupleConverter { - public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple); + public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java new file mode 100644 index 0000000..474e1e0 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import com.google.common.collect.UnmodifiableIterator; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; + +import java.nio.ByteBuffer; +import java.util.NoSuchElementException; + +/** + * Support iterate over {@code GTRecord}s in storage partition result. + * + * <p>Note that the implementation returns the same object for next(). + * Client needs to copy the returned record when needed. + */ +public class PartitionResultIterator extends UnmodifiableIterator<GTRecord> { + private final ByteBuffer buffer; + private final ImmutableBitSet cols; + private final GTRecord record; // reuse to avoid object creation + + public PartitionResultIterator(byte[] data, GTInfo info, ImmutableBitSet cols) { + this.buffer = ByteBuffer.wrap(data); + this.cols = cols; + this.record = new GTRecord(info); + } + + @Override + public boolean hasNext() { + return buffer.hasRemaining(); + } + + @Override + public GTRecord next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + record.loadColumns(cols, buffer); + return record; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java new file mode 100644 index 0000000..52029d3 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.collect.UnmodifiableIterator; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.PriorityQueue; + +/** + * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements. + */ +public class PartitionResultMerger implements Iterable<GTRecord> { + private final ImmutableList<PartitionResultIterator> partitionResults; + private final GTInfo info; + private final Comparator<GTRecord> comparator; + + public PartitionResultMerger( + Iterable<PartitionResultIterator> partitionResults, + GTInfo info, Comparator<GTRecord> comparator) { + this.partitionResults = ImmutableList.copyOf(partitionResults); + this.info = info; + this.comparator = comparator; + } + + @Override + public Iterator<GTRecord> iterator() { + if (partitionResults.size() == 1) { + return partitionResults.get(0); + } + return new MergingResultsIterator(); + } + + private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> { + final GTRecord record = new GTRecord(info); // reuse to avoid object creation + + PriorityQueue<PeekingIterator<GTRecord>> heap; + + MergingResultsIterator() { + Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() { + public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) { + return comparator.compare(o1.peek(), o2.peek()); + } + }; + this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator); + + for (PartitionResultIterator it : partitionResults) { + if (it.hasNext()) { + heap.offer(Iterators.peekingIterator(it)); + } + } + } + + @Override + public boolean hasNext() { + return !heap.isEmpty(); + } + + @Override + public GTRecord next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + // get smallest record + PeekingIterator<GTRecord> it = heap.poll(); + // WATCH OUT! record got from PartitionResultIterator.next() may changed later, + // so we must make a shallow copy of it. + record.shallowCopyFrom(it.next()); + + if (it.hasNext()) { + heap.offer(it); + } + + return record; + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java index 9e89227..fe22e9c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java @@ -30,6 +30,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStorage; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ public class ScannerWorker { private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class); private IGTScanner internal = null; - public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) { + public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) { if (scanRequest == null) { logger.info("Segment {} will be skipped", segment); internal = new EmptyGTScanner(); @@ -48,7 +49,7 @@ public class ScannerWorker { final GTInfo info = scanRequest.getInfo(); try { - IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior + IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior internal = rpc.getGTScanner(scanRequest); } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java index 37699a3..11f766c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java @@ -24,8 +24,14 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Set; +import com.google.common.collect.UnmodifiableIterator; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; +import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTStreamAggregateScanner; +import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -49,7 +55,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator { protected final Tuple tuple; protected final StorageContext context; - protected Iterator<GTRecord> gtItr; + protected Iterator<Object[]> gtValues; protected ITupleConverter cubeTupleConverter; protected Tuple next; @@ -66,12 +72,62 @@ public class SegmentCubeTupleIterator implements ITupleIterator { this.tupleInfo = returnTupleInfo; this.tuple = new Tuple(returnTupleInfo); this.context = context; - this.gtItr = getGTItr(scanner); - this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); + + CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); + int[] gtDimsIdx = mapping.getDimIndexes(selectedDimensions); + int[] gtMetricsIdx = mapping.getMetricsIndexes(selectedMetrics); + // gtColIdx = gtDimsIdx + gtMetricsIdx + int[] gtColIdx = new int[gtDimsIdx.length + gtMetricsIdx.length]; + System.arraycopy(gtDimsIdx, 0, gtColIdx, 0, gtDimsIdx.length); + System.arraycopy(gtMetricsIdx, 0, gtColIdx, gtDimsIdx.length, gtMetricsIdx.length); + + this.gtValues = getGTValuesIterator(scanner.iterator(), scanner.getScanRequest(), gtDimsIdx, gtMetricsIdx); + this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter( + scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo); } - private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) { - return scanner.iterator(); + private Iterator<Object[]> getGTValuesIterator( + final Iterator<GTRecord> records, final GTScanRequest scanRequest, + final int[] gtDimsIdx, final int[] gtMetricsIdx) { + + boolean singlePartitionResult = records instanceof PartitionResultIterator; + if (context.isStreamAggregateEnabled() && !singlePartitionResult) { + // input records are ordered, leverage stream aggregator to produce possibly fewer records + IGTScanner inputScanner = new IGTScanner() { + public GTInfo getInfo() { + return scanRequest.getInfo(); + } + + public void close() throws IOException {} + + public Iterator<GTRecord> iterator() { + return records; + } + }; + GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner( + inputScanner, scanRequest, context.getGroupKeyComparator()); + return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx); + } + + // simply decode records + return new UnmodifiableIterator<Object[]>() { + Object[] result = new Object[gtDimsIdx.length + gtMetricsIdx.length]; + + public boolean hasNext() { + return records.hasNext(); + } + + public Object[] next() { + GTRecord record = records.next(); + for (int i = 0; i < gtDimsIdx.length; i++) { + result[i] = record.decodeValue(gtDimsIdx[i]); + } + for (int i = 0; i < gtMetricsIdx.length; i++) { + result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]); + } + return result; + } + }; } @Override @@ -91,13 +147,13 @@ public class SegmentCubeTupleIterator implements ITupleIterator { } // now we have a GTRecord - if (!gtItr.hasNext()) { + if (!gtValues.hasNext()) { return false; } - GTRecord curRecord = gtItr.next(); + Object[] gtValues = this.gtValues.next(); // translate into tuple - advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple); + advMeasureFillers = cubeTupleConverter.translateResult(gtValues, tuple); // the simple case if (advMeasureFillers == null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java index 1a80bbf..0f1e191 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java @@ -18,22 +18,20 @@ package org.apache.kylin.storage.gtrecord; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Iterator; - -import javax.annotation.Nullable; - +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.storage.StorageContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.Iterators; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; /** * scatter the blob returned from region server to a iterable of gtrecords @@ -42,18 +40,20 @@ public class StorageResponseGTScatter implements IGTScanner { private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class); - private GTInfo info; + private final GTInfo info; private IPartitionStreamer partitionStreamer; - private Iterator<byte[]> blocks; - private ImmutableBitSet columns; - private int storagePushDownLimit = -1; + private final Iterator<byte[]> blocks; + private final ImmutableBitSet columns; + private final StorageContext context; + private final boolean needSorted; // whether scanner should return sorted records - public StorageResponseGTScatter(GTInfo info, IPartitionStreamer partitionStreamer, ImmutableBitSet columns, int storagePushDownLimit) { - this.info = info; + public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) { + this.info = scanRequest.getInfo(); this.partitionStreamer = partitionStreamer; this.blocks = partitionStreamer.asByteArrayIterator(); - this.columns = columns; - this.storagePushDownLimit = storagePushDownLimit; + this.columns = scanRequest.getColumns(); + this.context = context; + this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled(); } @Override @@ -69,48 +69,18 @@ public class StorageResponseGTScatter implements IGTScanner { @Override public Iterator<GTRecord> iterator() { - Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc()); - if (StorageContext.mergeSortPartitionResults(storagePushDownLimit)) { - logger.info("Using SortedIteratorMergerWithLimit to merge partition results"); - return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator(); - } else { - logger.info("Using Iterators.concat to merge partition results"); - return Iterators.concat(shardSubsets); + List<PartitionResultIterator> partitionResults = Lists.newArrayList(); + while (blocks.hasNext()) { + partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns)); } - } - - class EndpointResponseGTScatterFunc implements Function<byte[], Iterator<GTRecord>> { - @Nullable - @Override - public Iterator<GTRecord> apply(@Nullable final byte[] input) { - - return new Iterator<GTRecord>() { - private ByteBuffer inputBuffer = null; - //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord - private GTRecord firstRecord = null; - - @Override - public boolean hasNext() { - if (inputBuffer == null) { - inputBuffer = ByteBuffer.wrap(input); - firstRecord = new GTRecord(info); - } - return inputBuffer.position() < inputBuffer.limit(); - } - - @Override - public GTRecord next() { - firstRecord.loadColumns(columns, inputBuffer); - return firstRecord; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; + if (!needSorted) { + logger.debug("Using Iterators.concat to merge partition results"); + return Iterators.concat(partitionResults.iterator()); } - } + logger.debug("Using PartitionResultMerger to merge partition results"); + PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator()); + return merger.iterator(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 82b67b6..e822ada 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -47,6 +47,7 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer; import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -69,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private static ExecutorService executorService = new LoggableCachedThreadPool(); - public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { - super(segment, cuboid, fullGTInfo); + public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { + super(segment, cuboid, fullGTInfo, context); } private byte[] getByteArrayForShort(short v) { @@ -245,7 +246,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { }); } - return new StorageResponseGTScatter(fullGTInfo, new DummyPartitionStreamer(epResultItr), scanRequest.getColumns(), scanRequest.getStoragePushDownLimit()); + return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext); } private ByteString serializeGTScanReq(GTScanRequest scanRequest) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 88e7176..db81646 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -49,6 +49,7 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.IGTStorage; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,17 +65,19 @@ public abstract class CubeHBaseRPC implements IGTStorage { final protected Cuboid cuboid; final protected GTInfo fullGTInfo; final protected QueryContext queryContext; + final protected StorageContext storageContext; final private RowKeyEncoder fuzzyKeyEncoder; final private RowKeyEncoder fuzzyMaskEncoder; - public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { + public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment"); this.cubeSeg = (CubeSegment) segment; this.cuboid = cuboid; this.fullGTInfo = fullGTInfo; this.queryContext = QueryContext.current(); + this.storageContext = context; this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid); this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid); http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index 33f8d90..951e2ef 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,8 +88,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } } - public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) { - super(segment, cuboid, fullGTInfo); + public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) { + super(segment, cuboid, fullGTInfo, context); } @Override