KYLIN-1922 optimize needStorageAggregation check logic and make sure self-termination in coprocessor works
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e38557b4 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e38557b4 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e38557b4 Branch: refs/heads/KYLIN-1726 Commit: e38557b4d1cd1d42fe042e5500020cbfaba2d80b Parents: e87c816 Author: Hongbin Ma <mahong...@apache.org> Authored: Fri Sep 9 15:57:25 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Fri Sep 9 16:42:33 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 15 +- .../apache/kylin/cube/RawQueryLastHacker.java | 7 +- .../cube/gridtable/CubeScanRangePlanner.java | 340 ---------- .../kylin/gridtable/GTAggregateScanner.java | 10 +- .../apache/kylin/gridtable/GTFilterScanner.java | 6 +- .../GTScanExceedThresholdException.java | 2 +- .../apache/kylin/gridtable/GTScanRequest.java | 34 +- .../GTScanSelfTerminatedException.java | 26 + .../kylin/gridtable/GTScanTimeoutException.java | 2 +- .../gridtable/AggregationCacheSpillTest.java | 6 +- .../kylin/gridtable/DictGridTableTest.java | 617 ------------------ .../storage/gtrecord/CubeScanRangePlanner.java | 357 +++++++++++ .../storage/gtrecord/CubeSegmentScanner.java | 14 +- .../gtrecord/GTCubeStorageQueryBase.java | 36 +- .../storage/gtrecord/DictGridTableTest.java | 626 +++++++++++++++++++ .../apache/kylin/query/ITKylinQueryTest.java | 55 +- .../resources/query/sql_timeout/query01.sql | 19 + .../common/coprocessor/CoprocessorBehavior.java | 1 + .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 8 +- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +- .../hbase/cube/v2/ExpectedSizeIterator.java | 39 +- .../hbase/cube/v2/HBaseReadonlyStore.java | 11 +- .../coprocessor/endpoint/CubeVisitService.java | 26 +- 23 files changed, 1197 insertions(+), 1062 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index f0c91da..2ac9d48 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -481,8 +481,8 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000")); } - public int getCubeVisitTimeoutTimes() { - return Integer.parseInt(getOptional("kylin.query.cube.visit.timeout.times", "1")); + public float getCubeVisitTimeoutTimes() { + return Float.parseFloat(getOptional("kylin.query.cube.visit.timeout.times", "1")); } public int getBadQueryStackTraceDepth() { @@ -545,15 +545,6 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.parseBoolean(this.getOptional("kylin.query.ignore_unknown_function", "false")); } - public String getQueryStorageVisitPlanner() { - return this.getOptional("kylin.query.storage.visit.planner", "org.apache.kylin.cube.gridtable.CubeScanRangePlanner"); - } - - // for test only - public void setQueryStorageVisitPlanner(String v) { - setProperty("kylin.query.storage.visit.planner", v); - } - public int getQueryScanFuzzyKeyMax() { return Integer.parseInt(this.getOptional("kylin.query.scan.fuzzykey.max", "200")); } @@ -573,7 +564,7 @@ abstract public class KylinConfigBase implements Serializable { public boolean getQueryMetricsEnabled() { return Boolean.parseBoolean(getOptional("kylin.query.metrics.enabled", "false")); } - + public int[] getQueryMetricsPercentilesIntervals() { String[] dft = { "60", "300", "3600" }; return 
getOptionalIntArray("kylin.query.metrics.percentiles.intervals", dft); http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java b/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java index 63ddac5..50c644e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java @@ -44,13 +44,14 @@ public class RawQueryLastHacker { // We need to retrieve cube to manually add columns into sqlDigest, so that we have full-columns results as output. boolean isSelectAll = sqlDigest.allColumns.isEmpty() || sqlDigest.allColumns.equals(sqlDigest.filterColumns); for (TblColRef col : cubeDesc.listAllColumns()) { - if (col.getTable().equals(sqlDigest.factTable) && (cubeDesc.listDimensionColumnsIncludingDerived().contains(col) || isSelectAll)) { - sqlDigest.allColumns.add(col); + if (cubeDesc.listDimensionColumnsIncludingDerived().contains(col) || isSelectAll) { + if (col.getTable().equals(sqlDigest.factTable)) + sqlDigest.allColumns.add(col); } } for (TblColRef col : sqlDigest.allColumns) { - if (cubeDesc.listDimensionColumnsIncludingDerived().contains(col)) { + if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) { // For dimension columns, take them as group by columns. 
sqlDigest.groupbyColumns.add(col); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeScanRangePlanner.java deleted file mode 100644 index a937045..0000000 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeScanRangePlanner.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.cube.gridtable; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.debug.BackdoorToggles; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.common.FuzzyValueCombination; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.gridtable.GTScanRange; -import org.apache.kylin.gridtable.GTScanRequest; -import org.apache.kylin.gridtable.GTScanRequestBuilder; -import org.apache.kylin.gridtable.GTUtil; -import org.apache.kylin.gridtable.IGTComparator; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public class CubeScanRangePlanner extends ScanRangePlannerBase { - - private static final Logger logger = LoggerFactory.getLogger(CubeScanRangePlanner.class); - - protected int maxScanRanges; - protected int maxFuzzyKeys; - - //non-GT - protected CubeSegment cubeSegment; - protected CubeDesc cubeDesc; - protected Cuboid cuboid; - - public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, Set<TblColRef> groupbyDims, // - Collection<FunctionDesc> metrics) { - - this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax(); - this.maxFuzzyKeys = 
KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax(); - - this.cubeSegment = cubeSegment; - this.cubeDesc = cubeSegment.getCubeDesc(); - this.cuboid = cuboid; - - Set<TblColRef> filterDims = Sets.newHashSet(); - TupleFilter.collectColumns(filter, filterDims); - - this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId()); - CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); - - IGTComparator comp = gtInfo.getCodeSystem().getComparator(); - //start key GTRecord compare to start key GTRecord - this.rangeStartComparator = RecordComparators.getRangeStartComparator(comp); - //stop key GTRecord compare to stop key GTRecord - this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp); - //start key GTRecord compare to stop key GTRecord - this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp); - - //replace the constant values in filter to dictionary codes - this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getCuboidDimensionsInGTOrder(), groupbyDims); - - this.gtDimensions = mapping.makeGridTableColumns(dimensions); - this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupbyDims, cubeSegment.getCubeDesc())); - this.gtAggrMetrics = mapping.makeGridTableColumns(metrics); - this.gtAggrFuncs = mapping.makeAggrFuncs(metrics); - - if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) { - int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef()); - if (index >= 0) { - SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo); - this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index); - this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index); - this.gtPartitionCol = gtInfo.colRef(index); - } - } - - } - - /** - * constrcut GTScanRangePlanner with incomplete information. 
only be used for UT - * @param info - * @param gtStartAndEnd - * @param gtPartitionCol - * @param gtFilter - */ - public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) { - - this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax(); - this.maxFuzzyKeys = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax(); - - this.gtInfo = info; - - IGTComparator comp = gtInfo.getCodeSystem().getComparator(); - //start key GTRecord compare to start key GTRecord - this.rangeStartComparator = RecordComparators.getRangeStartComparator(comp); - //stop key GTRecord compare to stop key GTRecord - this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp); - //start key GTRecord compare to stop key GTRecord - this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp); - - this.gtFilter = gtFilter; - this.gtStartAndEnd = gtStartAndEnd; - this.gtPartitionCol = gtPartitionCol; - } - - public GTScanRequest planScanRequest() { - GTScanRequest scanRequest; - List<GTScanRange> scanRanges = this.planScanRanges(); - if (scanRanges != null && scanRanges.size() != 0) { - scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).createGTScanRequest(); - } else { - scanRequest = null; - } - return scanRequest; - } - - /** - * Overwrite this method to provide smarter storage visit plans - * @return - */ - public List<GTScanRange> planScanRanges() { - TupleFilter flatFilter = flattenToOrAndFilter(gtFilter); - - List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter); - - List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size()); - for (Collection<ColumnRange> andDimRanges : orAndDimRanges) { - GTScanRange scanRange = 
newScanRange(andDimRanges); - if (scanRange != null) - scanRanges.add(scanRange); - } - - List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges); - mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges); - - return mergedRanges; - } - - private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) { - Set<TblColRef> ret = Sets.newHashSet(); - for (TblColRef col : input) { - if (cubeDesc.hasHostColumn(col)) { - for (TblColRef host : cubeDesc.getHostInfo(col).columns) { - ret.add(host); - } - } else { - ret.add(col); - } - } - return ret; - } - - protected GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) { - GTRecord pkStart = new GTRecord(gtInfo); - GTRecord pkEnd = new GTRecord(gtInfo); - Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap(); - - List<GTRecord> fuzzyKeys; - - for (ColumnRange range : andDimRanges) { - if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) { - int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond()); - int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end); - - if ((isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare < 0) || (!isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare <= 0)) { - //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when using dict encoding, so use <= when has equals in condition. 
- } else { - logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", // - gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end)); - return null; - } - } - - int col = range.column.getColumnDesc().getZeroBasedIndex(); - if (!gtInfo.getPrimaryKey().get(col)) - continue; - - pkStart.set(col, range.begin); - pkEnd.set(col, range.end); - - if (range.valueSet != null && !range.valueSet.isEmpty()) { - fuzzyValues.put(col, range.valueSet); - } - } - - fuzzyKeys = - - buildFuzzyKeys(fuzzyValues); - return new GTScanRange(pkStart, pkEnd, fuzzyKeys); - } - - private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) { - ArrayList<GTRecord> result = Lists.newArrayList(); - - if (fuzzyValueSet.isEmpty()) - return result; - - // debug/profiling purpose - if (BackdoorToggles.getDisableFuzzyKey()) { - logger.info("The execution of this query will not use fuzzy key"); - return result; - } - - List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, maxFuzzyKeys); - - for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) { - - // BitSet bitSet = new BitSet(gtInfo.getColumnCount()); - // for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { - // bitSet.set(entry.getKey()); - // } - GTRecord fuzzy = new GTRecord(gtInfo); - for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { - fuzzy.set(entry.getKey(), entry.getValue()); - } - - result.add(fuzzy); - } - return result; - } - - protected List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) { - if (ranges.size() <= 1) { - return ranges; - } - - // sort ranges by start key - Collections.sort(ranges, new Comparator<GTScanRange>() { - @Override - public int compare(GTScanRange a, GTScanRange b) { - return rangeStartComparator.compare(a.pkStart, 
b.pkStart); - } - }); - - // merge the overlap range - List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>(); - int mergeBeginIndex = 0; - GTRecord mergeEnd = ranges.get(0).pkEnd; - for (int index = 1; index < ranges.size(); index++) { - GTScanRange range = ranges.get(index); - - // if overlap, swallow it - if (rangeStartEndComparator.compare(range.pkStart, mergeEnd) <= 0) { - mergeEnd = rangeEndComparator.max(mergeEnd, range.pkEnd); - continue; - } - - // not overlap, split here - GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, index)); - mergedRanges.add(mergedRange); - - // start new split - mergeBeginIndex = index; - mergeEnd = range.pkEnd; - } - - // don't miss the last range - GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, ranges.size())); - mergedRanges.add(mergedRange); - - return mergedRanges; - } - - private GTScanRange mergeKeyRange(List<GTScanRange> ranges) { - GTScanRange first = ranges.get(0); - if (ranges.size() == 1) - return first; - - GTRecord start = first.pkStart; - GTRecord end = first.pkEnd; - List<GTRecord> newFuzzyKeys = new ArrayList<GTRecord>(); - - boolean hasNonFuzzyRange = false; - for (GTScanRange range : ranges) { - hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty(); - newFuzzyKeys.addAll(range.fuzzyKeys); - end = rangeEndComparator.max(end, range.pkEnd); - } - - // if any range is non-fuzzy, then all fuzzy keys must be cleared - // also too many fuzzy keys will slow down HBase scan - if (hasNonFuzzyRange || newFuzzyKeys.size() > maxFuzzyKeys) { - newFuzzyKeys.clear(); - } - - return new GTScanRange(start, end, newFuzzyKeys); - } - - protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) { - if (ranges.size() <= maxRanges) { - return ranges; - } - - // TODO: check the distance between range and merge the large distance range - List<GTScanRange> result = new ArrayList<GTScanRange>(1); - GTScanRange mergedRange = 
mergeKeyRange(ranges); - result.add(mergedRange); - return result; - } - - public int getMaxScanRanges() { - return maxScanRanges; - } - - public void setMaxScanRanges(int maxScanRanges) { - this.maxScanRanges = maxScanRanges; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index ccf4895..db38484 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -138,7 +138,10 @@ public class GTAggregateScanner implements IGTScanner { long count = 0; for (GTRecord r : inputScanner) { - count++; + //check deadline + if (count % GTScanRequest.terminateCheckInterval == 1 && System.currentTimeMillis() > deadline) { + throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count); + } if (getNumOfSpills() == 0) { //check limit @@ -152,10 +155,7 @@ public class GTAggregateScanner implements IGTScanner { aggrCache.aggregate(r, Integer.MAX_VALUE); } - //check deadline - if (count % 10000 == 1 && System.currentTimeMillis() > deadline) { - throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count); - } + count++; } logger.info("GTAggregateScanner input rows: " + count); return aggrCache.iterator(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java index 31a9599..f1f84af 100644 
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java @@ -132,12 +132,12 @@ public class GTFilterScanner implements IGTScanner { } // cache the last one input and result, can reuse because rowkey are ordered, and same input could come in small group - static class FilterResultCache { + public static class FilterResultCache { static final int CHECKPOINT = 10000; static final double HIT_RATE_THRESHOLD = 0.5; - static boolean ENABLED = true; // enable cache by default + public static boolean ENABLED = true; // enable cache by default - boolean enabled = ENABLED; + public boolean enabled = ENABLED; ImmutableBitSet colsInFilter; int count; int hit; http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java index dd57e90..ba75962 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java @@ -18,7 +18,7 @@ package org.apache.kylin.gridtable; -public class GTScanExceedThresholdException extends RuntimeException { +public class GTScanExceedThresholdException extends GTScanSelfTerminatedException { public GTScanExceedThresholdException(String message) { super(message); http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java 
b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 4cfba1b..5d27028 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -42,6 +42,8 @@ import com.google.common.collect.Sets; public class GTScanRequest { private static final Logger logger = LoggerFactory.getLogger(GTScanRequest.class); + //it's not necessary to increase the checkInterval to very large because the check cost is not high + public static final int terminateCheckInterval = 1000; private GTInfo info; private List<GTScanRange> ranges; @@ -55,13 +57,16 @@ public class GTScanRequest { private ImmutableBitSet aggrGroupBy; private ImmutableBitSet aggrMetrics; private String[] aggrMetricsFuncs;// - + // hint to storage behavior private boolean allowStorageAggregation; private double aggCacheMemThreshold; private int storageScanRowNumThreshold; private int storagePushDownLimit; + // runtime computed fields + private transient boolean doingStorageAggregation = false; + GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, // ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, // double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit) { @@ -169,6 +174,7 @@ public class GTScanRequest { logger.info("pre aggregation is not beneficial, skip it"); } else if (this.hasAggregation()) { logger.info("pre aggregating results before returning"); + this.doingStorageAggregation = true; result = new GTAggregateScanner(result, this, deadline); } else { logger.info("has no aggregation, skip it"); @@ -178,6 +184,10 @@ public class GTScanRequest { } + public boolean isDoingStorageAggregation() { + return doingStorageAggregation; + } + //touch every byte of the cell so that the cost of scanning will be truly reflected private int 
lookAndForget(IGTScanner scanner) { byte meaninglessByte = 0; @@ -215,8 +225,8 @@ public class GTScanRequest { return ranges; } - public void setGTScanRanges(List<GTScanRange> ranges) { - this.ranges = ranges; + public void clearScanRanges() { + this.ranges = Lists.newArrayList(); } public ImmutableBitSet getSelectedColBlocks() { @@ -251,10 +261,6 @@ public class GTScanRequest { return allowStorageAggregation; } - public void setAllowStorageAggregation(boolean allowStorageAggregation) { - this.allowStorageAggregation = allowStorageAggregation; - } - public double getAggCacheMemThreshold() { if (aggCacheMemThreshold < 0) return 0; @@ -262,28 +268,18 @@ public class GTScanRequest { return aggCacheMemThreshold; } - public void setAggCacheMemThreshold(double gb) { - this.aggCacheMemThreshold = gb; + public void disableAggCacheMemCheck() { + this.aggCacheMemThreshold = 0; } public int getStorageScanRowNumThreshold() { return storageScanRowNumThreshold; } - public void setStorageScanRowNumThreshold(int storageScanRowNumThreshold) { - logger.info("storageScanRowNumThreshold is set to " + storageScanRowNumThreshold); - this.storageScanRowNumThreshold = storageScanRowNumThreshold; - } - public int getStoragePushDownLimit() { return this.storagePushDownLimit; } - public void setStoragePushDownLimit(int limit) { - logger.info("storagePushDownLimit is set to " + storagePushDownLimit); - this.storagePushDownLimit = limit; - } - @Override public String toString() { return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]"; http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java ---------------------------------------------------------------------- diff --git 
a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java new file mode 100644 index 0000000..4775ac6 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.gridtable; + +public class GTScanSelfTerminatedException extends RuntimeException { + + public GTScanSelfTerminatedException(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java index e92dae3..17a8d02 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java @@ -18,7 +18,7 @@ package org.apache.kylin.gridtable; -public class GTScanTimeoutException extends RuntimeException { +public class GTScanTimeoutException extends GTScanSelfTerminatedException { public GTScanTimeoutException(String message) { super(message); http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java index b5f6de7..38b8c90 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java @@ -84,8 +84,7 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase { } }; - GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(0, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", 
"COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest(); - scanRequest.setAggCacheMemThreshold(0.5); + GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(0, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).setAggCacheMemThreshold(0.5).createGTScanRequest(); GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE); @@ -127,8 +126,7 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase { }; // all-in-mem testcase - GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(1, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest(); - scanRequest.setAggCacheMemThreshold(0.5); + GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(1, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).setAggCacheMemThreshold(0.5).createGTScanRequest(); GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java deleted file mode 100644 index 7b6d3fa..0000000 --- 
a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java +++ /dev/null @@ -1,617 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.gridtable; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.BitSet; -import java.util.List; - -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.BytesSerializer; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.gridtable.CubeCodeSystem; -import org.apache.kylin.cube.gridtable.CubeScanRangePlanner; -import org.apache.kylin.dict.NumberDictionaryBuilder; -import org.apache.kylin.dict.StringBytesConverter; -import org.apache.kylin.dict.TrieDictionaryBuilder; -import org.apache.kylin.dimension.DictionaryDimEnc; -import org.apache.kylin.dimension.DimensionEncoding; -import org.apache.kylin.gridtable.GTFilterScanner.FilterResultCache; -import org.apache.kylin.gridtable.GTInfo.Builder; 
-import org.apache.kylin.gridtable.memstore.GTSimpleMemStore; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.datatype.LongMutable; -import org.apache.kylin.metadata.filter.ColumnTupleFilter; -import org.apache.kylin.metadata.filter.CompareTupleFilter; -import org.apache.kylin.metadata.filter.ConstantTupleFilter; -import org.apache.kylin.metadata.filter.ExtractTupleFilter; -import org.apache.kylin.metadata.filter.LogicalTupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.Lists; - -public class DictGridTableTest extends LocalFileMetadataTestCase { - - private GridTable table; - private GTInfo info; - private CompareTupleFilter timeComp0; - private CompareTupleFilter timeComp1; - private CompareTupleFilter timeComp2; - private CompareTupleFilter timeComp3; - private CompareTupleFilter timeComp4; - private CompareTupleFilter timeComp5; - private CompareTupleFilter timeComp6; - private CompareTupleFilter ageComp1; - private CompareTupleFilter ageComp2; - private CompareTupleFilter ageComp3; - private CompareTupleFilter ageComp4; - - @After - public void after() throws Exception { - - this.cleanupTestMetadata(); - } - - @Before - public void setup() throws IOException { - - this.createTestMetadata(); - - table = newTestTable(); - info = table.getInfo(); - - timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14")); - timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); - timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13")); - timeComp3 = compare(info.colRef(0), 
FilterOperatorEnum.LT, enc(info, 0, "2015-01-15")); - timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15")); - timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15")); - timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14")); - ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10")); - ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20")); - ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30")); - ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30")); - - } - - @Test - public void verifySegmentSkipping() { - - ByteArray segmentStart = enc(info, 0, "2015-01-14"); - ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free - ByteArray segmentEnd = enc(info, 0, "2015-01-15"); - assertEquals(segmentStart, segmentStartX); - - { - LogicalTupleFilter filter = and(timeComp0, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(1, r.size());//scan range are [close,close] - assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); - assertEquals(1, r.get(0).fuzzyKeys.size()); - assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString()); - } - { - LogicalTupleFilter filter = and(timeComp2, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(0, r.size()); - } - { - LogicalTupleFilter filter = and(timeComp4, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - 
assertEquals(0, r.size()); - } - { - LogicalTupleFilter filter = and(timeComp5, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(0, r.size()); - } - { - LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1)); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(1, r.size()); - assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString()); - assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]", r.get(0).fuzzyKeys.toString()); - } - { - LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(1, r.size()); - assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString()); - assertEquals(0, r.get(0).fuzzyKeys.size()); - } - { - //skip FALSE filter - LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(0, r.size()); - } - { - //TRUE or FALSE filter - LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(1, r.size()); - assertEquals("[null, null]-[null, null]", r.get(0).toString()); - } - { - //TRUE or other filter - LogicalTupleFilter filter = 
or(ageComp1, ConstantTupleFilter.TRUE); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(1, r.size()); - assertEquals("[null, null]-[null, null]", r.get(0).toString()); - } - } - - @Test - public void verifySegmentSkipping2() { - ByteArray segmentEnd = enc(info, 0, "2015-01-15"); - - { - LogicalTupleFilter filter = and(timeComp0, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(1, r.size());//scan range are [close,close] - assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); - assertEquals(1, r.get(0).fuzzyKeys.size()); - assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString()); - } - - { - LogicalTupleFilter filter = and(timeComp5, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(0, r.size());//scan range are [close,close] - } - } - - @Test - public void verifyScanRangePlanner() { - - // flatten or-and & hbase fuzzy value - { - LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2)); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(1, r.size()); - assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString()); - assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString()); - } - - // pre-evaluate ever false - { - LogicalTupleFilter filter = and(timeComp1, timeComp2); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); - List<GTScanRange> r = planner.planScanRanges(); - 
assertEquals(0, r.size()); - } - - // pre-evaluate ever true - { - LogicalTupleFilter filter = or(timeComp1, ageComp4); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals("[[null, null]-[null, null]]", r.toString()); - } - - // merge overlap range - { - LogicalTupleFilter filter = or(timeComp1, timeComp3); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals("[[null, null]-[null, null]]", r.toString()); - } - - // merge too many ranges - { - LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3)); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); - List<GTScanRange> r = planner.planScanRanges(); - assertEquals(3, r.size()); - assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString()); - assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString()); - assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString()); - planner.setMaxScanRanges(2); - List<GTScanRange> r2 = planner.planScanRanges(); - assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString()); - } - } - - @Test - public void verifyFirstRow() throws IOException { - doScanAndVerify(table, new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(), "[1421193600000, 30, Yang, 10, 10.5]", // - "[1421193600000, 30, Luke, 10, 10.5]", // - "[1421280000000, 20, Dong, 10, 10.5]", // - "[1421280000000, 20, Jason, 10, 10.5]", // - "[1421280000000, 30, Xu, 10, 10.5]", // - "[1421366400000, 20, Mahone, 10, 10.5]", // - "[1421366400000, 20, Qianhao, 10, 10.5]", // - "[1421366400000, 30, George, 10, 10.5]", // - "[1421366400000, 30, Shaofeng, 10, 10.5]", // - "[1421452800000, 10, Kejia, 10, 10.5]"); - } - - 
//for testing GTScanRequest serialization and deserialization - public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) { - ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); - GTScanRequest.serializer.serialize(origin, buffer); - buffer.flip(); - GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer); - - Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs()); - Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01); - return sGTScanRequest; - } - - @Test - public void verifyScanWithUnevaluatableFilter() throws IOException { - GTInfo info = table.getInfo(); - - CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); - ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1)); - LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1))); - LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable); - - GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest(); - - // note the unEvaluatable column 1 in filter is added to group by - assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); - - doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]", "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]", "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]"); - } - - @Test - public void verifyScanWithEvaluatableFilter() throws IOException { - GTInfo info = 
table.getInfo(); - - CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); - CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10")); - LogicalTupleFilter filter = and(fComp1, fComp2); - - GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest(); - // note the evaluatable column 1 in filter is added to returned columns but not in group by - assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); - - doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]"); - } - - @Test - public void testFilterScannerPerf() throws IOException { - GridTable table = newTestPerfTable(); - GTInfo info = table.getInfo(); - - CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); - CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10")); - LogicalTupleFilter filter = and(fComp1, fComp2); - - FilterResultCache.ENABLED = false; - testFilterScannerPerfInner(table, info, filter); - FilterResultCache.ENABLED = true; - testFilterScannerPerfInner(table, info, filter); - FilterResultCache.ENABLED = false; - testFilterScannerPerfInner(table, info, filter); - FilterResultCache.ENABLED = true; - testFilterScannerPerfInner(table, info, filter); - } - - @SuppressWarnings("unused") - private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter) throws IOException { - long start = System.currentTimeMillis(); - 
GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(filter).createGTScanRequest(); - IGTScanner scanner = table.scan(req); - int i = 0; - for (GTRecord r : scanner) { - i++; - } - scanner.close(); - long end = System.currentTimeMillis(); - System.out.println((end - start) + "ms with filter cache enabled=" + FilterResultCache.ENABLED + ", " + i + " rows"); - } - - @Test - public void verifyConvertFilterConstants1() { - GTInfo info = table.getInfo(); - - TableDesc extTable = TableDesc.mockup("ext"); - TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef(); - TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef(); - - CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); - CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10"); - LogicalTupleFilter filter = and(fComp1, fComp2); - - List<TblColRef> colMapping = Lists.newArrayList(); - colMapping.add(extColA); - colMapping.add(extColB); - - TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); - assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString()); - } - - @Test - public void verifyConvertFilterConstants2() { - GTInfo info = table.getInfo(); - - TableDesc extTable = TableDesc.mockup("ext"); - TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef(); - TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef(); - - CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); - CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LT, "9"); - LogicalTupleFilter filter = and(fComp1, fComp2); - - List<TblColRef> colMapping = Lists.newArrayList(); - colMapping.add(extColA); - colMapping.add(extColB); - - // $1<"9" round up to $1<"10" - TupleFilter newFilter = 
GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); - assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString()); - } - - @Test - public void verifyConvertFilterConstants3() { - GTInfo info = table.getInfo(); - - TableDesc extTable = TableDesc.mockup("ext"); - TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef(); - TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef(); - - CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); - CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LTE, "9"); - LogicalTupleFilter filter = and(fComp1, fComp2); - - List<TblColRef> colMapping = Lists.newArrayList(); - colMapping.add(extColA); - colMapping.add(extColB); - - // $1<="9" round down to FALSE - TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); - assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString()); - } - - @Test - public void verifyConvertFilterConstants4() { - GTInfo info = table.getInfo(); - - TableDesc extTable = TableDesc.mockup("ext"); - TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef(); - TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef(); - - CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); - CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15"); - LogicalTupleFilter filter = and(fComp1, fComp2); - - List<TblColRef> colMapping = Lists.newArrayList(); - colMapping.add(extColA); - colMapping.add(extColB); - - // $1 in ("9", "10", "15") has only "10" left - TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); - assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 
IN [\\x00]]", newFilter.toString()); - } - - private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException { - System.out.println(req); - IGTScanner scanner = table.scan(req); - int i = 0; - for (GTRecord r : scanner) { - System.out.println(r); - if (verifyRows == null || i >= verifyRows.length) { - Assert.fail(); - } - assertEquals(verifyRows[i], r.toString()); - i++; - } - scanner.close(); - } - - public static ByteArray enc(GTInfo info, int col, String value) { - ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength()); - info.codeSystem.encodeColumnValue(col, value, buf); - return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position()); - } - - public static ExtractTupleFilter unevaluatable(TblColRef col) { - ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT); - r.addChild(new ColumnTupleFilter(col)); - return r; - } - - public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) { - CompareTupleFilter result = new CompareTupleFilter(op); - result.addChild(new ColumnTupleFilter(col)); - result.addChild(new ConstantTupleFilter(Arrays.asList(value))); - return result; - } - - public static LogicalTupleFilter and(TupleFilter... children) { - return logic(FilterOperatorEnum.AND, children); - } - - public static LogicalTupleFilter or(TupleFilter... children) { - return logic(FilterOperatorEnum.OR, children); - } - - public static LogicalTupleFilter not(TupleFilter child) { - return logic(FilterOperatorEnum.NOT, child); - } - - public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... 
children) { - LogicalTupleFilter result = new LogicalTupleFilter(op); - for (TupleFilter c : children) { - result.addChild(c); - } - return result; - } - - public static GridTable newTestTable() throws IOException { - GTInfo info = newInfo(); - GTSimpleMemStore store = new GTSimpleMemStore(info); - GridTable table = new GridTable(info, store); - - GTRecord r = new GTRecord(table.getInfo()); - GTBuilder builder = table.rebuild(); - - builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5"))); - builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5"))); - builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5"))); - builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5"))); - builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5"))); - builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5"))); - builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5"))); - builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5"))); - builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5"))); - builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5"))); - builder.close(); - - return table; - } - - static GridTable newTestPerfTable() throws IOException { - GTInfo info = newInfo(); - GTSimpleMemStore store = new GTSimpleMemStore(info); - GridTable table = new GridTable(info, store); - - GTRecord r = new GTRecord(table.getInfo()); - GTBuilder builder = table.rebuild(); - - for (int i = 0; i < 100000; i++) { - for (int j = 0; j < 10; j++) - builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5"))); - - for (int j = 0; j < 10; 
j++) - builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5"))); - - for (int j = 0; j < 10; j++) - builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5"))); - - for (int j = 0; j < 10; j++) - builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5"))); - - for (int j = 0; j < 10; j++) - builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5"))); - - for (int j = 0; j < 10; j++) - builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5"))); - - for (int j = 0; j < 10; j++) - builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5"))); - - for (int j = 0; j < 10; j++) - builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5"))); - - for (int j = 0; j < 10; j++) - builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5"))); - - for (int j = 0; j < 10; j++) - builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5"))); - } - builder.close(); - - return table; - } - - static GTInfo newInfo() { - Builder builder = GTInfo.builder(); - builder.setCodeSystem(newDictCodeSystem()); - builder.setColumns( // - DataType.getType("timestamp"), // - DataType.getType("integer"), // - DataType.getType("varchar(10)"), // - DataType.getType("bigint"), // - DataType.getType("decimal") // - ); - builder.setPrimaryKey(setOf(0, 1)); - builder.setColumnPreferIndex(setOf(0)); - builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) }); - builder.enableRowBlock(4); - GTInfo info = builder.build(); - return info; - } - - @SuppressWarnings("unchecked") - private static CubeCodeSystem newDictCodeSystem() { - DimensionEncoding[] dimEncs = new DimensionEncoding[3]; - dimEncs[1] = new 
DictionaryDimEnc(newDictionaryOfInteger()); - dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString()); - return new CubeCodeSystem(dimEncs); - } - - @SuppressWarnings("rawtypes") - private static Dictionary newDictionaryOfString() { - TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter()); - builder.addValue("Dong"); - builder.addValue("George"); - builder.addValue("Jason"); - builder.addValue("Kejia"); - builder.addValue("Luke"); - builder.addValue("Mahone"); - builder.addValue("Qianhao"); - builder.addValue("Shaofeng"); - builder.addValue("Xu"); - builder.addValue("Yang"); - return builder.build(0); - } - - @SuppressWarnings("rawtypes") - private static Dictionary newDictionaryOfInteger() { - NumberDictionaryBuilder<String> builder = new NumberDictionaryBuilder<>(new StringBytesConverter()); - builder.addValue("10"); - builder.addValue("20"); - builder.addValue("30"); - builder.addValue("40"); - builder.addValue("50"); - builder.addValue("60"); - builder.addValue("70"); - builder.addValue("80"); - builder.addValue("90"); - builder.addValue("100"); - return builder.build(0); - } - - public static ImmutableBitSet setOf(int... values) { - BitSet set = new BitSet(); - for (int i : values) - set.set(i); - return new ImmutableBitSet(set); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java new file mode 100644 index 0000000..9f505f3 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.common.FuzzyValueCombination; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.gridtable.CubeGridTable; +import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; +import org.apache.kylin.cube.gridtable.RecordComparators; +import org.apache.kylin.cube.gridtable.ScanRangePlannerBase; +import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRange; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTScanRequestBuilder; +import org.apache.kylin.gridtable.GTUtil; +import org.apache.kylin.gridtable.IGTComparator; +import 
org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.storage.StorageContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class CubeScanRangePlanner extends ScanRangePlannerBase { + + private static final Logger logger = LoggerFactory.getLogger(CubeScanRangePlanner.class); + + protected int maxScanRanges; + protected int maxFuzzyKeys; + + //non-GT + protected CubeSegment cubeSegment; + protected CubeDesc cubeDesc; + protected Cuboid cuboid; + + protected StorageContext context; + + public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, Set<TblColRef> groupbyDims, // + Collection<FunctionDesc> metrics, StorageContext context) { + this.context = context; + + this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax(); + this.maxFuzzyKeys = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax(); + + this.cubeSegment = cubeSegment; + this.cubeDesc = cubeSegment.getCubeDesc(); + this.cuboid = cuboid; + + Set<TblColRef> filterDims = Sets.newHashSet(); + TupleFilter.collectColumns(filter, filterDims); + + this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId()); + CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); + + IGTComparator comp = gtInfo.getCodeSystem().getComparator(); + //start key GTRecord compare to start key GTRecord + this.rangeStartComparator = RecordComparators.getRangeStartComparator(comp); + //stop key GTRecord compare to stop key GTRecord + this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp); + //start key GTRecord compare to stop key GTRecord + this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp); + + //replace the 
constant values in filter to dictionary codes + this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getCuboidDimensionsInGTOrder(), groupbyDims); + + this.gtDimensions = mapping.makeGridTableColumns(dimensions); + this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupbyDims, cubeSegment.getCubeDesc())); + this.gtAggrMetrics = mapping.makeGridTableColumns(metrics); + this.gtAggrFuncs = mapping.makeAggrFuncs(metrics); + + if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) { + int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef()); + if (index >= 0) { + SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo); + this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index); + this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index); + this.gtPartitionCol = gtInfo.colRef(index); + } + } + + } + + /** + * construct GTScanRangePlanner with incomplete information.
only be used for UT + * @param info + * @param gtStartAndEnd + * @param gtPartitionCol + * @param gtFilter + */ + public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) { + + this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax(); + this.maxFuzzyKeys = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax(); + + this.gtInfo = info; + + IGTComparator comp = gtInfo.getCodeSystem().getComparator(); + //start key GTRecord compare to start key GTRecord + this.rangeStartComparator = RecordComparators.getRangeStartComparator(comp); + //stop key GTRecord compare to stop key GTRecord + this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp); + //start key GTRecord compare to stop key GTRecord + this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp); + + this.gtFilter = gtFilter; + this.gtStartAndEnd = gtStartAndEnd; + this.gtPartitionCol = gtPartitionCol; + } + + public GTScanRequest planScanRequest() { + GTScanRequest scanRequest; + List<GTScanRange> scanRanges = this.planScanRanges(); + if (scanRanges != null && scanRanges.size() != 0) { + GTScanRequestBuilder builder = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).// + setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).// + setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB()).// + setStorageScanRowNumThreshold(context.getThreshold()); + + if (cubeDesc.supportsLimitPushDown()) { + builder.setStoragePushDownLimit(context.getStoragePushDownLimit()); + } + scanRequest = builder.createGTScanRequest(); + } else { + scanRequest = null; + } + return scanRequest; + } + + /** + * Overwrite this method to provide smarter storage visit plans + * @return + */ + 
public List<GTScanRange> planScanRanges() { + TupleFilter flatFilter = flattenToOrAndFilter(gtFilter); + + List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter); + + List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size()); + for (Collection<ColumnRange> andDimRanges : orAndDimRanges) { + GTScanRange scanRange = newScanRange(andDimRanges); + if (scanRange != null) + scanRanges.add(scanRange); + } + + List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges); + mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges); + + return mergedRanges; + } + + private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) { + Set<TblColRef> ret = Sets.newHashSet(); + for (TblColRef col : input) { + if (cubeDesc.hasHostColumn(col)) { + for (TblColRef host : cubeDesc.getHostInfo(col).columns) { + ret.add(host); + } + } else { + ret.add(col); + } + } + return ret; + } + + protected GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) { + GTRecord pkStart = new GTRecord(gtInfo); + GTRecord pkEnd = new GTRecord(gtInfo); + Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap(); + + List<GTRecord> fuzzyKeys; + + for (ColumnRange range : andDimRanges) { + if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) { + int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond()); + int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end); + + if ((isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare < 0) || (!isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare <= 0)) { + //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when using dict encoding, so use <= when has equals in condition. 
+ } else { + logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", // + gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end)); + return null; + } + } + + int col = range.column.getColumnDesc().getZeroBasedIndex(); + if (!gtInfo.getPrimaryKey().get(col)) + continue; + + pkStart.set(col, range.begin); + pkEnd.set(col, range.end); + + if (range.valueSet != null && !range.valueSet.isEmpty()) { + fuzzyValues.put(col, range.valueSet); + } + } + + fuzzyKeys = + + buildFuzzyKeys(fuzzyValues); + return new GTScanRange(pkStart, pkEnd, fuzzyKeys); + } + + private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) { + ArrayList<GTRecord> result = Lists.newArrayList(); + + if (fuzzyValueSet.isEmpty()) + return result; + + // debug/profiling purpose + if (BackdoorToggles.getDisableFuzzyKey()) { + logger.info("The execution of this query will not use fuzzy key"); + return result; + } + + List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, maxFuzzyKeys); + + for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) { + + // BitSet bitSet = new BitSet(gtInfo.getColumnCount()); + // for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { + // bitSet.set(entry.getKey()); + // } + GTRecord fuzzy = new GTRecord(gtInfo); + for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { + fuzzy.set(entry.getKey(), entry.getValue()); + } + + result.add(fuzzy); + } + return result; + } + + protected List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) { + if (ranges.size() <= 1) { + return ranges; + } + + // sort ranges by start key + Collections.sort(ranges, new Comparator<GTScanRange>() { + @Override + public int compare(GTScanRange a, GTScanRange b) { + return rangeStartComparator.compare(a.pkStart, 
b.pkStart); + } + }); + + // merge the overlap range + List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>(); + int mergeBeginIndex = 0; + GTRecord mergeEnd = ranges.get(0).pkEnd; + for (int index = 1; index < ranges.size(); index++) { + GTScanRange range = ranges.get(index); + + // if overlap, swallow it + if (rangeStartEndComparator.compare(range.pkStart, mergeEnd) <= 0) { + mergeEnd = rangeEndComparator.max(mergeEnd, range.pkEnd); + continue; + } + + // not overlap, split here + GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, index)); + mergedRanges.add(mergedRange); + + // start new split + mergeBeginIndex = index; + mergeEnd = range.pkEnd; + } + + // don't miss the last range + GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, ranges.size())); + mergedRanges.add(mergedRange); + + return mergedRanges; + } + + private GTScanRange mergeKeyRange(List<GTScanRange> ranges) { + GTScanRange first = ranges.get(0); + if (ranges.size() == 1) + return first; + + GTRecord start = first.pkStart; + GTRecord end = first.pkEnd; + List<GTRecord> newFuzzyKeys = new ArrayList<GTRecord>(); + + boolean hasNonFuzzyRange = false; + for (GTScanRange range : ranges) { + hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty(); + newFuzzyKeys.addAll(range.fuzzyKeys); + end = rangeEndComparator.max(end, range.pkEnd); + } + + // if any range is non-fuzzy, then all fuzzy keys must be cleared + // also too many fuzzy keys will slow down HBase scan + if (hasNonFuzzyRange || newFuzzyKeys.size() > maxFuzzyKeys) { + newFuzzyKeys.clear(); + } + + return new GTScanRange(start, end, newFuzzyKeys); + } + + protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) { + if (ranges.size() <= maxRanges) { + return ranges; + } + + // TODO: check the distance between range and merge the large distance range + List<GTScanRange> result = new ArrayList<GTScanRange>(1); + GTScanRange mergedRange = 
mergeKeyRange(ranges); + result.add(mergedRange); + return result; + } + + public int getMaxScanRanges() { + return maxScanRanges; + } + + public void setMaxScanRanges(int maxScanRanges) { + this.maxScanRanges = maxScanRanges; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java index 3b9d9c6..f32831a 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -23,10 +23,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.Set; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.gridtable.CubeScanRangePlanner; import org.apache.kylin.dict.BuiltInFunctionTransformer; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; @@ -67,23 +65,13 @@ public class CubeSegmentScanner implements IGTScanner { ITupleFilterTransformer translator = new BuiltInFunctionTransformer(cubeSeg.getDimensionEncodingMap()); filter = translator.transform(filter); - String plannerName = KylinConfig.getInstanceFromEnv().getQueryStorageVisitPlanner(); CubeScanRangePlanner scanRangePlanner; try { - scanRangePlanner = (CubeScanRangePlanner) Class.forName(plannerName).getConstructor(CubeSegment.class, Cuboid.class, TupleFilter.class, Set.class, Set.class, Collection.class).newInstance(cubeSeg, cuboid, filter, dimensions, groups, metrics); + scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics, context); } catch 
(Exception e) { throw new RuntimeException(e); } scanRequest = scanRangePlanner.planScanRequest(); - if (scanRequest != null) { - scanRequest.setAllowStorageAggregation(context.isNeedStorageAggregation()); - scanRequest.setAggCacheMemThreshold(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB()); - scanRequest.setStorageScanRowNumThreshold(context.getThreshold());//TODO: devide by shard number? - - if (cubeSeg.getCubeDesc().supportsLimitPushDown()) { - scanRequest.setStoragePushDownLimit(context.getStoragePushDownLimit()); - } - } scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 86346f8..f0c2494 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -20,6 +20,7 @@ package org.apache.kylin.storage.gtrecord; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -72,10 +73,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { @Override public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { - + //cope with queries with no aggregations RawQueryLastHacker.hackNoAggregations(sqlDigest, cubeDesc); - + // Customized measure taking effect: e.g. 
allow custom measures to help raw queries notifyBeforeStorageQuery(sqlDigest); @@ -112,9 +113,9 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { // replace derived columns in filter with host columns; columns on loosened condition must be added to group by TupleFilter filterD = translateDerived(filter, groupsD); - context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation)); - enableStoragePushDownLimit(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context); - setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory + context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD)); + enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context); + setThresholdIfNecessary(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory List<CubeSegmentScanner> scanners = Lists.newArrayList(); for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { @@ -229,9 +230,22 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { return resultD; } - public boolean isNeedStorageAggregation(Cuboid cuboid, Collection<TblColRef> groupD, Collection<TblColRef> singleValueD, boolean isExactAggregation) { - logger.info("Set isNeedStorageAggregation to " + !isExactAggregation); - return !isExactAggregation; + public boolean isNeedStorageAggregation(Cuboid cuboid, Collection<TblColRef> groupD, Collection<TblColRef> singleValueD) { + + logger.info("GroupD :" + groupD); + logger.info("SingleValueD :" + singleValueD); + logger.info("Cuboid columns :" + cuboid.getColumns()); + + HashSet<TblColRef> temp = Sets.newHashSet(); + temp.addAll(groupD); + temp.addAll(singleValueD); + if (cuboid.getColumns().size() == temp.size()) { + logger.info("Does not need storage aggregation"); + return false; + } else 
{ + logger.info("Need storage aggregation"); + return true; + } } //exact aggregation was introduced back when we had some measures (like holistic distinct count) that is sensitive @@ -268,7 +282,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } if (exact) { - logger.info("exactAggregation is true"); + logger.info("exactAggregation is true, cuboid id is " + cuboid.getId()); } return exact; } @@ -355,7 +369,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } } - private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) { + private void setThresholdIfNecessary(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) { boolean hasMemHungryMeasure = false; for (FunctionDesc func : metrics) { hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry(); @@ -381,7 +395,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } } - private void enableStoragePushDownLimit(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) { + private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) { boolean possible = true; boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());