Repository: kylin Updated Branches: refs/heads/master ef148601d -> 5b942267f
KYLIN-2606 Only return counter for precise count_distinct if query is exactAggregate Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5b942267 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5b942267 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5b942267 Branch: refs/heads/master Commit: 5b942267fc26175199b4b26229e54def6728ce87 Parents: ef14860 Author: kangkaisen <kangkai...@163.com> Authored: Wed Feb 15 19:53:17 2017 +0800 Committer: kangkaisen <kangkai...@meituan.com> Committed: Sat Aug 19 13:30:36 2017 +0800 ---------------------------------------------------------------------- .../kylin/cube/gridtable/CubeCodeSystem.java | 4 + .../org/apache/kylin/gridtable/GTRecord.java | 8 + .../kylin/gridtable/GTSampleCodeSystem.java | 4 + .../apache/kylin/gridtable/IGTCodeSystem.java | 3 + .../measure/bitmap/BitmapCounterFactory.java | 2 + .../kylin/measure/bitmap/BitmapSerializer.java | 38 ++++- .../measure/bitmap/RoaringBitmapCounter.java | 10 ++ .../bitmap/RoaringBitmapCounterFactory.java | 5 + .../metadata/datatype/DataTypeSerializer.java | 13 ++ .../gtrecord/GTCubeStorageQueryBase.java | 53 ++++++- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 1 + .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +- .../hbase/cube/v2/HBaseReadonlyStore.java | 33 +++- .../coprocessor/endpoint/CubeVisitService.java | 2 +- .../endpoint/generated/CubeVisitProtos.java | 151 +++++++++++++++---- .../endpoint/protobuf/CubeVisit.proto | 1 + 16 files changed, 292 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java index aaa12a7..9eae6f3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java @@ -177,4 +177,8 @@ public class CubeCodeSystem implements IGTCodeSystem { return result; } + @Override + public DataTypeSerializer<?> getSerializer(int col) { + return serializers[col]; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index 3e62ea7..f65e4b5 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@ -296,4 +296,12 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable { } } + /** change pointers to point to data in given buffer, this + * method allows to defined specific column to load */ + public void loadColumns(int selectedCol, ByteBuffer buf) { + int pos = buf.position(); + int len = info.codeSystem.codeLength(selectedCol, buf); + cols[selectedCol].set(buf.array(), buf.arrayOffset() + pos, len); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java index 3f3c844..2a5e791 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java @@ -118,4 +118,8 @@ public class GTSampleCodeSystem implements IGTCodeSystem { } }; + @Override + public DataTypeSerializer<?> getSerializer(int col) { + return serializers[col]; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java index 89dfc99..9c8ad6b 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.dimension.DimensionEncoding; import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; public interface IGTCodeSystem { @@ -62,4 +63,6 @@ public interface IGTCodeSystem { /** Return aggregators for metrics */ MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions); + /** Return specific DataTypeSerializer */ + DataTypeSerializer<?> getSerializer(int col); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java index da7748e..39aa1be 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java @@ -26,5 +26,7 @@ public interface BitmapCounterFactory { BitmapCounter newBitmap(int... values); + BitmapCounter newBitmap(long counter); + BitmapCounter newBitmap(ByteBuffer in) throws IOException; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java index c1b260d..1c13876 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java @@ -28,6 +28,9 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> { private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE; private static final BitmapCounter DELEGATE = factory.newBitmap(); + private static final int IS_RESULT_FLAG = 1; + private static final int RESULT_SIZE = 12; + // called by reflection public BitmapSerializer(DataType type) { } @@ -44,8 +47,13 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> { @Override public BitmapCounter deserialize(ByteBuffer in) { try { - return factory.newBitmap(in); - + //The length of RoaringBitmap is larger than 12 + if (peekLength(in) == RESULT_SIZE) { + int flag = in.getInt(); + return factory.newBitmap(in.getLong()); + } else { + return factory.newBitmap(in); + } } catch (IOException e) { throw new RuntimeException(e); } @@ -53,7 +61,12 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> { @Override public int peekLength(ByteBuffer in) { - return DELEGATE.peekLength(in); + ByteBuffer buffer = in.slice(); + if (buffer.getInt(0) == IS_RESULT_FLAG) { + return RESULT_SIZE; + } else { + return DELEGATE.peekLength(in); + } } @Override @@ -71,4 +84,23 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> { // It's difficult to decide the size before data was ingested, comparing with HLLCounter(16) as 64KB, here is assumption return 8 * 1024; } + + @Override + public boolean supportDirectReturnResult() { + return true; + } + + @Override + public ByteBuffer getFinalResult(ByteBuffer in) { + ByteBuffer out = ByteBuffer.allocate(RESULT_SIZE); + try { + BitmapCounter counter = factory.newBitmap(in); + out.putInt(IS_RESULT_FLAG); + out.putLong(counter.getCount()); + } catch (IOException e) { + throw new RuntimeException(e); + } + out.flip(); + return out; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java index eec45f2..9929e24 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java @@ -35,6 +35,7 @@ import java.util.Iterator; public class RoaringBitmapCounter implements BitmapCounter, Serializable { private ImmutableRoaringBitmap bitmap; + private Long counter; RoaringBitmapCounter() { bitmap = new MutableRoaringBitmap(); @@ -44,6 +45,11 @@ public class RoaringBitmapCounter implements BitmapCounter, Serializable { this.bitmap = bitmap; } + RoaringBitmapCounter(long counter) { + this.counter = counter; + } + + private MutableRoaringBitmap getMutableBitmap() { if (bitmap instanceof MutableRoaringBitmap) { return (MutableRoaringBitmap) bitmap; @@ -86,6 +92,10 @@ public class RoaringBitmapCounter implements BitmapCounter, Serializable { @Override public long getCount() { + if (counter != null) { + return counter; + } + return bitmap.getCardinality(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java index 822afa2..8ab908a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java @@ -40,6 +40,11 @@ public class RoaringBitmapCounterFactory implements BitmapCounterFactory, Serial } @Override + public BitmapCounter newBitmap(long counter) { + return new RoaringBitmapCounter(counter); + } + + @Override public BitmapCounter newBitmap(ByteBuffer in) throws IOException { RoaringBitmapCounter counter = new RoaringBitmapCounter(); counter.readFields(in); http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java index a4a35a4..2de38c0 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java @@ -89,6 +89,19 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java. throw new UnsupportedOperationException(); } + /** If the query is exactAggregation and has some memory hungry measures, + * we could directly return final result to speed up the query. + * If the DataTypeSerializer support this, + * which should override the getFinalResult method, besides that, the deserialize and peekLength method should also support it, like {@link org.apache.kylin.measure.bitmap.BitmapSerializer} */ + public boolean supportDirectReturnResult() { + return false; + } + + /** An optional method that converts a expensive buffer to lightweight buffer containing final result (for memory hungry measures) */ + public ByteBuffer getFinalResult(ByteBuffer in) { + throw new UnsupportedOperationException(); + } + /** Convert from obj to string */ public String toString(T value) { if (value == null) http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 3543c75..7b7b9ca 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -44,6 +44,7 @@ import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.PartitionDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TblColRef; @@ -135,6 +136,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { Set<TblColRef> singleValuesD = findSingleValueColumns(filter); context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD)); + // exactAggregation mean: needn't aggregation at storage and query engine both. + boolean exactAggregation = isExactAggregation(context, cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation, sqlDigest.aggregations); + context.setExactAggregation(exactAggregation); + // replace derived columns in filter with host columns; columns on loosened condition must be added to group by Set<TblColRef> loosenedColumnD = Sets.newHashSet(); Set<TblColRef> filterColumnD = Sets.newHashSet(); @@ -428,7 +433,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { sublist.get(0).getFunction().getMeasureType().adjustSqlDigest(sublist, sqlDigest); } } - + private TupleFilter checkHavingCanPushDown(TupleFilter havingFilter, Set<TblColRef> groupsD, List<FunctionDesc> aggregations, Set<FunctionDesc> metrics) { // must have only one segment Segments<CubeSegment> readySegs = cubeInstance.getSegments(SegmentStatusEnum.READY); @@ -465,4 +470,50 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { return havingFilter; } + private boolean isExactAggregation(StorageContext context, Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation, Collection<FunctionDesc> functionDescs) { + if (context.isNeedStorageAggregation()) { + logger.info("exactAggregation is false because need storage aggregation"); + return false; + } + + if (cuboid.requirePostAggregation()) { + logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId()); + return false; + } + + // derived aggregation is bad, unless expanded columns are already in group by + if (groups.containsAll(derivedPostAggregation) == false) { + logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation); + return false; + } + + // other columns (from filter) is bad, unless they are ensured to have single value + if (singleValuesD.containsAll(othersD) == false) { + logger.info("exactAggregation is false because some column not on group by: " + othersD // + + " (single value column: " + singleValuesD + ")"); + return false; + } + + //for DimensionAsMetric like max(cal_dt), the dimension column maybe not in real group by + for (FunctionDesc functionDesc : functionDescs) { + if (functionDesc.isDimensionAsMetric()) { + logger.info("exactAggregation is false because has DimensionAsMetric"); + return false; + } + } + + // for partitioned cube, the partition column must belong to group by or has single value + PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc(); + if (partDesc.isPartitioned()) { + TblColRef col = partDesc.getPartitionDateColumnRef(); + if (!groups.contains(col) && !singleValuesD.contains(col)) { + logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by"); + return false; + } + } + + logger.info("exactAggregation is true, cuboid id is " + cuboid.getId()); + return true; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index e822ada..af8754d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -163,6 +163,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } builder.setSpillEnabled(cubeSeg.getConfig().getQueryCoprocessorSpillEnabled()); builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes()); + builder.setIsExactAggregate(storageContext.isExactAggregation()); for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) { executorService.submit(new Runnable() { http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index 951e2ef..a8f4fd8 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -193,7 +193,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } }; - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false, storageContext.isExactAggregation()); IGTScanner rawScanner = store.scan(scanRequest); final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java index 631e8e8..4ec0c9d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java @@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -44,18 +45,19 @@ public class HBaseReadonlyStore implements IGTStore { private List<List<Integer>> hbaseColumnsToGT; private int rowkeyPreambleSize; private boolean withDelay = false; - + private boolean isExactAggregation; /** * @param withDelay is for test use */ - public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay) { + public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay, boolean isExactAggregation) { this.cellListIterator = cellListIterator; this.info = gtScanRequest.getInfo(); this.hbaseColumns = hbaseColumns; this.hbaseColumnsToGT = hbaseColumnsToGT; this.rowkeyPreambleSize = rowkeyPreambleSize; this.withDelay = withDelay; + this.isExactAggregation = isExactAggregation; } @Override @@ -132,6 +134,12 @@ public class HBaseReadonlyStore implements IGTStore { buf = byteBuffer(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); oneRecord.loadColumns(hbaseColumnsToGT.get(i), buf); } + + + if (isExactAggregation && getDirectReturnResultColumns().size() > 0) { + trimGTRecord(oneRecord); + } + return oneRecord; } @@ -145,6 +153,27 @@ public class HBaseReadonlyStore implements IGTStore { return ByteBuffer.wrap(array, offset, length); } + private List<Integer> getDirectReturnResultColumns() { + List<Integer> columns = new ArrayList<>(); + for (int i = 0; i < info.getColumnCount(); i++) { + if (info.getCodeSystem().getSerializer(i).supportDirectReturnResult()) { + columns.add(i); + } + } + return columns; + } + + private void trimGTRecord(GTRecord record) { + List<Integer> directReturnResultColumns = getDirectReturnResultColumns(); + for (Integer i : directReturnResultColumns) { + ByteBuffer recordBuffer = record.get(i).asBuffer(); + if (recordBuffer!= null) { + ByteBuffer trimmedBuffer = info.getCodeSystem().getSerializer(i).getFinalResult(recordBuffer); + record.loadColumns(i, trimmedBuffer); + } + } + } + }; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 1877064..3791e63 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -293,7 +293,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement !request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client scanReq.getTimeout()); - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn()); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(), request.getIsExactAggregate()); IGTScanner rawScanner = store.scan(scanReq); IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, request.getSpillEnabled()); http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java index b9f2771..4c662c9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java @@ -123,7 +123,7 @@ public final class CubeVisitProtos { * <code>optional int64 maxScanBytes = 8;</code> * * <pre> - * 0 means no limit + * must be positive * </pre> */ boolean hasMaxScanBytes(); @@ -131,10 +131,20 @@ public final class CubeVisitProtos { * <code>optional int64 maxScanBytes = 8;</code> * * <pre> - * 0 means no limit + * must be positive * </pre> */ long getMaxScanBytes(); + + // optional bool isExactAggregate = 9 [default = false]; + /** + * <code>optional bool isExactAggregate = 9 [default = false];</code> + */ + boolean hasIsExactAggregate(); + /** + * <code>optional bool isExactAggregate = 9 [default = false];</code> + */ + boolean getIsExactAggregate(); } /** * Protobuf type {@code CubeVisitRequest} @@ -230,6 +240,11 @@ public final class CubeVisitProtos { maxScanBytes_ = input.readInt64(); break; } + case 72: { + bitField0_ |= 0x00000080; + isExactAggregate_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -982,7 +997,7 @@ public final class CubeVisitProtos { * <code>optional int64 maxScanBytes = 8;</code> * * <pre> - * 0 means no limit + * must be positive * </pre> */ public boolean hasMaxScanBytes() { @@ -992,13 +1007,29 @@ public final class CubeVisitProtos { * <code>optional int64 maxScanBytes = 8;</code> * * <pre> - * 0 means no limit + * must be positive * </pre> */ public long getMaxScanBytes() { return maxScanBytes_; } + // optional bool isExactAggregate = 9 [default = false]; + public static final int ISEXACTAGGREGATE_FIELD_NUMBER = 9; + private boolean isExactAggregate_; + /** + * <code>optional bool isExactAggregate = 9 [default = false];</code> + */ + public boolean hasIsExactAggregate() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * <code>optional bool isExactAggregate = 9 [default = false];</code> + */ + public boolean getIsExactAggregate() { + return isExactAggregate_; + } + private void initFields() { gtScanRequest_ = com.google.protobuf.ByteString.EMPTY; hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY; @@ -1008,6 +1039,7 @@ public final class CubeVisitProtos { queryId_ = ""; spillEnabled_ = true; maxScanBytes_ = 0L; + isExactAggregate_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1061,6 +1093,9 @@ public final class CubeVisitProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeInt64(8, maxScanBytes_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBool(9, isExactAggregate_); + } getUnknownFields().writeTo(output); } @@ -1102,6 +1137,10 @@ public final class CubeVisitProtos { size += com.google.protobuf.CodedOutputStream .computeInt64Size(8, maxScanBytes_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(9, isExactAggregate_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1162,6 +1201,11 @@ public final class CubeVisitProtos { result = result && (getMaxScanBytes() == other.getMaxScanBytes()); } + result = result && (hasIsExactAggregate() == other.hasIsExactAggregate()); + if (hasIsExactAggregate()) { + result = result && (getIsExactAggregate() + == other.getIsExactAggregate()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1207,6 +1251,10 @@ public final class CubeVisitProtos { hash = (37 * hash) + MAXSCANBYTES_FIELD_NUMBER; hash = (53 * hash) + hashLong(getMaxScanBytes()); } + if (hasIsExactAggregate()) { + hash = (37 * hash) + ISEXACTAGGREGATE_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getIsExactAggregate()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1337,6 +1385,8 @@ public final class CubeVisitProtos { bitField0_ = (bitField0_ & ~0x00000040); maxScanBytes_ = 0L; bitField0_ = (bitField0_ & ~0x00000080); + isExactAggregate_ = false; + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -1402,6 +1452,10 @@ public final class CubeVisitProtos { to_bitField0_ |= 0x00000040; } result.maxScanBytes_ = maxScanBytes_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000080; + } + result.isExactAggregate_ = isExactAggregate_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1469,6 +1523,9 @@ public final class CubeVisitProtos { if (other.hasMaxScanBytes()) { setMaxScanBytes(other.getMaxScanBytes()); } + if (other.hasIsExactAggregate()) { + setIsExactAggregate(other.getIsExactAggregate()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -2068,7 +2125,7 @@ public final class CubeVisitProtos { * <code>optional int64 maxScanBytes = 8;</code> * * <pre> - * 0 means no limit + * must be positive * </pre> */ public boolean hasMaxScanBytes() { @@ -2078,7 +2135,7 @@ public final class CubeVisitProtos { * <code>optional int64 maxScanBytes = 8;</code> * * <pre> - * 0 means no limit + * must be positive * </pre> */ public long getMaxScanBytes() { @@ -2088,7 +2145,7 @@ public final class CubeVisitProtos { * <code>optional int64 maxScanBytes = 8;</code> * * <pre> - * 0 means no limit + * must be positive * </pre> */ public Builder setMaxScanBytes(long value) { @@ -2101,7 +2158,7 @@ public final class CubeVisitProtos { * <code>optional int64 maxScanBytes = 8;</code> * * <pre> - * 0 means no limit + * must be positive * </pre> */ public Builder clearMaxScanBytes() { @@ -2111,6 +2168,39 @@ public final class CubeVisitProtos { return this; } + // optional bool isExactAggregate = 9 [default = false]; + private boolean isExactAggregate_ ; + /** + * <code>optional bool isExactAggregate = 9 [default = false];</code> + */ + public boolean hasIsExactAggregate() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * <code>optional bool isExactAggregate = 9 [default = false];</code> + */ + public boolean getIsExactAggregate() { + return isExactAggregate_; + } + /** + * <code>optional bool isExactAggregate = 9 [default = false];</code> + */ + public Builder setIsExactAggregate(boolean value) { + bitField0_ |= 0x00000100; + isExactAggregate_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool isExactAggregate = 9 [default = false];</code> + */ + public Builder clearIsExactAggregate() { + bitField0_ = (bitField0_ & ~0x00000100); + isExactAggregate_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:CubeVisitRequest) } @@ -5516,32 +5606,33 @@ public final class CubeVisitProtos { java.lang.String[] descriptorData = { "\npstorage-hbase/src/main/java/org/apache" + "/kylin/storage/hbase/cube/v2/coprocessor" + - "/endpoint/protobuf/CubeVisit.proto\"\205\002\n\020C" + + "/endpoint/protobuf/CubeVisit.proto\"\246\002\n\020C" + "ubeVisitRequest\022\025\n\rgtScanRequest\030\001 \002(\014\022\024" + "\n\014hbaseRawScan\030\002 \002(\014\022\032\n\022rowkeyPreambleSi" + "ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 \003(\0132\031.Cub" + "eVisitRequest.IntList\022\027\n\017kylinProperties" + "\030\005 \002(\t\022\017\n\007queryId\030\006 \001(\t\022\032\n\014spillEnabled\030" + - "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\032\027\n\007Int" + - "List\022\014\n\004ints\030\001 \003(\005\"\253\004\n\021CubeVisitResponse", - "\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132" + - "\030.CubeVisitResponse.Stats\022/\n\terrorInfo\030\003" + - " \001(\0132\034.CubeVisitResponse.ErrorInfo\032\220\002\n\005S" + - "tats\022\030\n\020serviceStartTime\030\001 \001(\003\022\026\n\016servic" + - "eEndTime\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022" + - "\032\n\022aggregatedRowCount\030\004 \001(\003\022\025\n\rsystemCpu" + - "Load\030\005 \001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001" + - "(\001\022\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostnam" + - "e\030\010 \001(\t\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplet" + - "e\030\n \001(\005\022\024\n\014scannedBytes\030\013 \001(\003\032H\n\tErrorIn", - "fo\022*\n\004type\030\001 \002(\0162\034.CubeVisitResponse.Err" + - "orType\022\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022\020\n\014" + - "UNKNOWN_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_" + - "LIMIT_EXCEEDED\020\0022F\n\020CubeVisitService\0222\n\t" + - "visitCube\022\021.CubeVisitRequest\032\022.CubeVisit" + - "ResponseB`\nEorg.apache.kylin.storage.hba" + - "se.cube.v2.coprocessor.endpoint.generate" + - "dB\017CubeVisitProtosH\001\210\001\001\240\001\001" + "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\022\037\n\020isE" + + "xactAggregate\030\t \001(\010:\005false\032\027\n\007IntList\022\014\n", + "\004ints\030\001 \003(\005\"\253\004\n\021CubeVisitResponse\022\026\n\016com" + + "pressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeV" + + "isitResponse.Stats\022/\n\terrorInfo\030\003 \001(\0132\034." + + "CubeVisitResponse.ErrorInfo\032\220\002\n\005Stats\022\030\n" + + "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" + + "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022\032\n\022aggr" + + "egatedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 " + + "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" + + "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" + + "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005", + "\022\024\n\014scannedBytes\030\013 \001(\003\032H\n\tErrorInfo\022*\n\004t" + + "ype\030\001 \002(\0162\034.CubeVisitResponse.ErrorType\022" + + "\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022\020\n\014UNKNOWN" + + "_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_LIMIT_E" + + "XCEEDED\020\0022F\n\020CubeVisitService\0222\n\tvisitCu" + + "be\022\021.CubeVisitRequest\032\022.CubeVisitRespons" + + "eB`\nEorg.apache.kylin.storage.hbase.cube" + + ".v2.coprocessor.endpoint.generatedB\017Cube" + + "VisitProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5553,7 +5644,7 @@ public final class CubeVisitProtos { internal_static_CubeVisitRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CubeVisitRequest_descriptor, - new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", }); + new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", "IsExactAggregate", }); internal_static_CubeVisitRequest_IntList_descriptor = internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0); internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/kylin/blob/5b942267/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto index aa83595..8ca8756 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto @@ -38,6 +38,7 @@ message CubeVisitRequest { optional string queryId = 6; optional bool spillEnabled = 7 [default = true]; optional int64 maxScanBytes = 8; // must be positive + optional bool isExactAggregate = 9 [default = false]; message IntList { repeated int32 ints = 1; }