Repository: kylin Updated Branches: refs/heads/v1.6.0-rc2 9cdf8e7fe -> 03305fa89
KYLIN-2173 push down limit leads to wrong answer when filter is loosened Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/03305fa8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/03305fa8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/03305fa8 Branch: refs/heads/v1.6.0-rc2 Commit: 03305fa892c917311b0953798788e8a2bbc1c4da Parents: 9cdf8e7 Author: gaodayue <gaoda...@meituan.com> Authored: Mon Nov 14 21:17:44 2016 +0800 Committer: gaodayue <gaoda...@meituan.com> Committed: Mon Nov 14 21:19:42 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/storage/StorageContext.java | 1 + .../gtrecord/GTCubeStorageQueryBase.java | 97 ++++---------------- 2 files changed, 18 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/03305fa8/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index cc39918..b338b3c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -135,6 +135,7 @@ public class StorageContext { tempPushDownLimit, pushDownLimitMax); } else { this.finalPushDownLimit = tempPushDownLimit; + logger.info("Enable limit: " + tempPushDownLimit); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/03305fa8/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index b96e952..434ff43 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -106,33 +106,23 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { logger.info("Cuboid identified: cube={}, cuboidId={}, groupsD={}, otherDimsD={}", cubeInstance.getName(), cuboid.getId(), groupsD, otherDimsD); context.setCuboid(cuboid); - // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine + // set whether to aggr at storage Set<TblColRef> singleValuesD = findSingleValueColumns(filter); - boolean exactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation); - context.setExactAggregation(exactAggregation); + context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD)); // replace derived columns in filter with host columns; columns on loosened condition must be added to group by - TupleFilter filterD = translateDerived(filter, groupsD); + Set<TblColRef> loosenedColumnD = Sets.newHashSet(); + TupleFilter filterD = translateDerived(filter, loosenedColumnD); + groupsD.addAll(loosenedColumnD); - //set whether to aggr at storage - context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD)); // set limit push down - enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context); - context.setFinalPushDownLimit(cubeInstance); + enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context); // set cautious threshold to prevent out of memory setThresholdIfNecessary(dimensionsD, metrics, context); List<CubeSegmentScanner> scanners = Lists.newArrayList(); for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { CubeSegmentScanner scanner; - if (cubeSeg.getInputRecords() == 0) { - if (!skipZeroInputSegment(cubeSeg)) { - logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg); - } else { - logger.warn("cube segment {} input record is 0, skip it ", cubeSeg); - continue; - } - } try { scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context, getGTStorage()); } catch (IllegalArgumentException ex) { @@ -152,10 +142,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context); } - protected boolean skipZeroInputSegment(CubeSegment cubeSegment) { - return false; - } - protected abstract String getGTStorage(); private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) { @@ -262,45 +248,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } } - //exact aggregation was introduced back when we had some measures (like holistic distinct count) that is sensitive - //to post aggregation. Now that we don't have such measure any more, isExactAggregation should be useless (at least in v2 storage and above) - public boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) { - boolean exact = true; - - if (cuboid.requirePostAggregation()) { - exact = false; - logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId()); - } - - // derived aggregation is bad, unless expanded columns are already in group by - if (groups.containsAll(derivedPostAggregation) == false) { - exact = false; - logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation); - } - - // other columns (from filter) is bad, unless they are ensured to have single value - if (singleValuesD.containsAll(othersD) == false) { - exact = false; - logger.info("exactAggregation is false because some column not on group by: " + othersD // - + " (single value column: " + singleValuesD + ")"); - } - - // for partitioned cube, the partition column must belong to group by or has single value - PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc(); - if (partDesc.isPartitioned()) { - TblColRef col = partDesc.getPartitionDateColumnRef(); - if (!groups.contains(col) && !singleValuesD.contains(col)) { - exact = false; - logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by"); - } - } - - if (exact) { - logger.info("exactAggregation is true, cuboid id is " + cuboid.getId()); - } - return exact; - } - @SuppressWarnings("unchecked") private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) { if (filter == null) @@ -363,26 +310,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { return; if (filter instanceof ColumnTupleFilter) { - collectColumns(((ColumnTupleFilter) filter).getColumn(), collector); + collector.add(((ColumnTupleFilter) filter).getColumn()); } for (TupleFilter child : filter.getChildren()) { collectColumnsRecursively(child, collector); } } - private void collectColumns(TblColRef col, Set<TblColRef> collector) { - if (cubeDesc.isExtendedColumn(col)) { - throw new CubeDesc.CannotFilterExtendedColumnException(col); - } - if (cubeDesc.isDerived(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - for (TblColRef h : hostInfo.columns) - collector.add(h); - } else { - collector.add(col); - } - } - private void setThresholdIfNecessary(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) { boolean hasMemHungryMeasure = false; for (FunctionDesc func : metrics) { @@ -409,17 +343,20 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } } - private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) { + private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) { boolean possible = true; - boolean goodFilter = filter == null || TupleFilter.isEvaluableRecursively(filter); - if (!goodFilter) { + if (!TupleFilter.isEvaluableRecursively(filter)) { + possible = false; + logger.info("Storage limit push down is impossible because the filter isn't evaluable"); + } + + if (!loosenedColumnD.isEmpty()) { // KYLIN-2173 possible = false; - logger.info("Storage limit push down is impossible because the filter is unevaluatable"); + logger.info("Storage limit push down is impossible because filter is loosened: " + loosenedColumnD); } - boolean goodSort = !context.hasSort(); - if (!goodSort) { + if (context.hasSort()) { possible = false; logger.info("Storage limit push down is impossible because the query has order by"); } @@ -447,8 +384,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } if (possible) { - logger.info("Enable limit " + context.getLimit()); context.enableLimit(); + context.setFinalPushDownLimit(cubeInstance); } }