Repository: kylin Updated Branches: refs/heads/master 97bfd5747 -> 48e164924
KYLIN-1787 Enable limit and threshold for v2 storage Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/48e16492 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/48e16492 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/48e16492 Branch: refs/heads/master Commit: 48e164924944a6f945a9d1e20828967146201695 Parents: 97bfd57 Author: Li Yang <liy...@apache.org> Authored: Wed Jun 22 16:57:38 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Wed Jun 22 16:58:07 2016 +0800 ---------------------------------------------------------------------- .../cube/inmemcubing/InMemCubeBuilder.java | 4 +- .../kylin/gridtable/GTAggregateScanner.java | 3 -- .../kylin/gridtable/GTScanRangePlanner.java | 6 +-- .../apache/kylin/gridtable/GTScanRequest.java | 41 ++++++++++++++------ .../gridtable/benchmark/GTScannerBenchmark.java | 2 +- .../benchmark/GTScannerBenchmark2.java | 2 +- .../gridtable/AggregationCacheSpillTest.java | 6 ++- .../kylin/gridtable/DictGridTableTest.java | 4 +- .../kylin/gridtable/SimpleGridTableTest.java | 2 +- .../hbase/cube/v2/CubeSegmentScanner.java | 11 +++++- .../storage/hbase/cube/v2/CubeStorageQuery.java | 25 +++++++----- .../cube/v2/SequentialCubeTupleIterator.java | 16 ++++++++ .../coprocessor/endpoint/CubeVisitService.java | 4 ++ 13 files changed, 88 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index c270d3f..fe83ccf 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -329,7 +329,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input); Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount); - GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null, true, 0); + GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null); GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req); aggregationScanner.trackMemoryLevel(baseCuboidMemTracker); @@ -397,7 +397,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException { GTInfo info = gridTable.getInfo(); - GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null, true, 0); + GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null); GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req); // for child cuboid, some measures don't need aggregation. http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index d748297..53cc387 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -25,8 +25,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; @@ -38,7 +36,6 @@ import java.util.SortedMap; import org.apache.commons.io.IOUtils; import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java index 4f641e9..7173815 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java @@ -162,11 +162,11 @@ public class GTScanRangePlanner { } - public GTScanRequest planScanRequest(boolean allowPreAggregate) { + public GTScanRequest planScanRequest() { GTScanRequest scanRequest; List<GTScanRange> scanRanges = this.planScanRanges(); if (scanRanges != null && scanRanges.size() != 0) { - scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB()); + scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter); } else { scanRequest = null; } @@ -177,7 +177,7 @@ public class GTScanRangePlanner { * Overwrite this method to provide smarter storage visit plans * @return */ - public List<GTScanRange> planScanRanges() { + List<GTScanRange> planScanRanges() { TupleFilter flatFilter = flattenToOrAndFilter(gtFilter); List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter); http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 55d84e6..593469a 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -52,21 +52,18 @@ public class GTScanRequest { // hint to storage behavior private boolean allowPreAggregation = true; - private double aggrCacheGB = 0; // no limit + private double aggrCacheGB = 0; // 0 means no row/memory limit; positive means memory limit in GB; negative means row limit public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) { - this.info = info; - if (ranges == null) { - this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info))); - } else { - this.ranges = ranges; - } - this.columns = columns; - this.filterPushDown = filterPushDown; - validate(info); + this(info, ranges, columns, null, null, null, filterPushDown, true, 0); } public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, // + ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) { + this(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, true, 0); + } + + private GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, // ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) { this.info = info; if (ranges == null) { @@ -238,14 +235,36 @@ public class GTScanRequest { return aggrMetricsFuncs; } + public boolean isAllowPreAggregation() { + return allowPreAggregation; + } + + public void setAllowPreAggregation(boolean allowPreAggregation) { + this.allowPreAggregation = allowPreAggregation; + } + public double getAggrCacheGB() { - return aggrCacheGB; + if (aggrCacheGB < 0) + return 0; + else + return aggrCacheGB; } public void setAggrCacheGB(double gb) { this.aggrCacheGB = gb; } + public int getRowLimit() { + if (aggrCacheGB < 0) + return (int) -aggrCacheGB; + else + return 0; + } + + public void setRowLimit(int limit) { + aggrCacheGB = -limit; + } + @Override public String toString() { return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]"; http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java index 19c4bea..0d96ac0 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java @@ -110,7 +110,7 @@ public class GTScannerBenchmark { @SuppressWarnings("unused") private void testAggregate(ImmutableBitSet groupBy) throws IOException { long t = System.currentTimeMillis(); - GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null, true, 10); + GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null); IGTScanner scanner = req.decorateScanner(gen.generate(N)); long count = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java index e1e3881..8a376ba 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java @@ -132,7 +132,7 @@ public class GTScannerBenchmark2 { @SuppressWarnings("unused") private void testAggregate(ImmutableBitSet groupBy) throws IOException { long t = System.currentTimeMillis(); - GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null, true, 10); + GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null); IGTScanner scanner = req.decorateScanner(gen.generate(N)); long count = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java index ff67c9c..fd891f5 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java @@ -83,7 +83,8 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase { } }; - GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true, 0.5); + GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null); + scanRequest.setAggrCacheGB(0.5); GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest); @@ -125,7 +126,8 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase { }; // all-in-mem testcase - GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true, 0.5); + GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null); + scanRequest.setAggrCacheGB(0.5); GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest); http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java index a328216..af39e21 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java @@ -289,7 +289,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1))); LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable); - GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0); + GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter); // note the unEvaluatable column 1 in filter is added to group by assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); @@ -305,7 +305,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10")); LogicalTupleFilter filter = and(fComp1, fComp2); - GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0); + GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter); // note the evaluatable column 1 in filter is added to returned columns but not in group by assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java index 1b8529f..d300787 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java @@ -104,7 +104,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase { } private IGTScanner scanAndAggregate(GridTable table) throws IOException { - GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null, true, 0); + GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null); IGTScanner scanner = table.scan(req); int i = 0; for (GTRecord r : scanner) { http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java index 2b55ace..9890ae9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java @@ -37,6 +37,7 @@ import org.apache.kylin.metadata.filter.ITupleFilterTransformer; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.storage.StorageContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +52,7 @@ public class CubeSegmentScanner implements IGTScanner { final GTScanRequest scanRequest; public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // - Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) { + Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context) { this.cuboid = cuboid; this.cubeSeg = cubeSeg; @@ -66,7 +67,13 @@ public class CubeSegmentScanner implements IGTScanner { } catch (Exception e) { throw new RuntimeException(e); } - scanRequest = scanRangePlanner.planScanRequest(allowPreAggregate); + scanRequest = scanRangePlanner.planScanRequest(); + if (scanRequest != null) { + scanRequest.setAllowPreAggregation(!context.isExactAggregation()); + scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB()); + if (context.isLimitEnabled()) + scanRequest.setRowLimit(context.getLimit()); + } scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest); } http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index c8a5412..cec4e2f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -40,6 +40,7 @@ import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.PartitionDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.SQLDigest; @@ -93,7 +94,7 @@ public class CubeStorageQuery implements IStorageQuery { Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>(); dimensionsD.addAll(groupsD); dimensionsD.addAll(otherDimsD); - Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc,dimensionsD, metrics); + Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics); context.setCuboid(cuboid); // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine @@ -104,20 +105,16 @@ public class CubeStorageQuery implements IStorageQuery { // replace derived columns in filter with host columns; columns on loosened condition must be added to group by TupleFilter filterD = translateDerived(filter, groupsD); - //actually even if the threshold is set, it will not be used in this query engine setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory - setLimit(filter, context); List<CubeSegmentScanner> scanners = Lists.newArrayList(); for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { CubeSegmentScanner scanner; if (cubeSeg.getInputRecords() == 0) { - logger.warn("cube segment {} input record is 0, " + - "it may caused by kylin failed to the job counter " + - "as the hadoop history server wasn't running", cubeSeg); + logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg); } - scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation); + scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context); scanners.add(scanner); } @@ -170,8 +167,6 @@ public class CubeStorageQuery implements IStorageQuery { return expanded; } - - @SuppressWarnings("unchecked") private Set<TblColRef> findSingleValueColumns(TupleFilter filter) { Collection<? extends TupleFilter> toCheck; @@ -236,6 +231,16 @@ public class CubeStorageQuery implements IStorageQuery { + " (single value column: " + singleValuesD + ")"); } + // for partitioned cube, the partition column must belong to group by or has single value + PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc(); + if (partDesc.isPartitioned()) { + TblColRef col = partDesc.getPartitionDateColumnRef(); + if (!groups.contains(col) && !singleValuesD.contains(col)) { + exact = false; + logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by"); + } + } + if (exact) { logger.info("exactAggregation is true"); } @@ -343,7 +348,7 @@ public class CubeStorageQuery implements IStorageQuery { long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst; if (rowEst > 0) { - logger.info("Memory budget is set to: " + rowEst); + logger.info("Memory budget is set to " + rowEst + " rows"); context.setThreshold((int) rowEst); } else { logger.info("Memory budget is not set."); http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java index bbc7370..f8b055c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java @@ -34,6 +34,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator; import org.apache.kylin.metadata.tuple.Tuple; import org.apache.kylin.metadata.tuple.TupleInfo; import org.apache.kylin.storage.StorageContext; +import org.apache.kylin.storage.exception.ScanOutOfLimitException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +78,9 @@ public class SequentialCubeTupleIterator implements ITupleIterator { if (next != null) return true; + if (hitLimitAndThreshold()) + return false; + // consume any left rows from advanced measure filler if (advMeasureRowsRemaining > 0) { for (IAdvMeasureFiller filler : advMeasureFillers) { @@ -137,6 +141,18 @@ public class SequentialCubeTupleIterator implements ITupleIterator { } + private boolean hitLimitAndThreshold() { + // check limit + if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) { + return true; + } + // check threshold + if (scanCount >= context.getThreshold()) { + throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause."); + } + return false; + } + @Override public ITuple next() { // fetch next record http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 3827aa9..d320dc5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -243,6 +243,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement final MutableBoolean scanNormalComplete = new MutableBoolean(true); final long startTime = this.serviceStartTime; final long timeout = request.getTimeout(); + final int rowLimit = scanReq.getRowLimit(); final CellListIterator cellListIterator = new CellListIterator() { @@ -257,6 +258,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement @Override public boolean hasNext() { + if (rowLimit > 0 && rowLimit <= counter) + return false; + if (counter % 1000 == 1) { if (System.currentTimeMillis() - startTime > timeout) { scanNormalComplete.setValue(false);