KYLIN-2603, push down having clause when possible
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e1bc72e3 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e1bc72e3 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e1bc72e3 Branch: refs/heads/KYLIN-2606 Commit: e1bc72e36d83a42d364e6d9b0ee893fbc018d498 Parents: 83fb144 Author: Li Yang <liy...@apache.org> Authored: Wed May 10 21:01:53 2017 +0800 Committer: hongbin ma <m...@kyligence.io> Committed: Tue May 23 20:18:17 2017 +0800 ---------------------------------------------------------------------- .../cube/gridtable/ScanRangePlannerBase.java | 3 +- .../org/apache/kylin/cube/model/RowKeyDesc.java | 16 +- .../kylin/gridtable/GTAggregateScanner.java | 223 ++++++++++++++----- .../apache/kylin/gridtable/GTScanRequest.java | 19 +- .../kylin/gridtable/GTScanRequestBuilder.java | 8 +- .../java/org/apache/kylin/gridtable/GTUtil.java | 33 ++- .../apache/kylin/metadata/model/ColumnDesc.java | 2 + .../apache/kylin/metadata/model/TblColRef.java | 2 + .../kylin/metadata/realization/SQLDigest.java | 36 ++- .../storage/gtrecord/CubeScanRangePlanner.java | 12 +- .../storage/gtrecord/CubeSegmentScanner.java | 10 +- .../gtrecord/GTCubeStorageQueryBase.java | 51 ++++- .../gtrecord/GTCubeStorageQueryRequest.java | 14 +- .../kylin/storage/gtrecord/ScannerWorker.java | 4 + .../storage/gtrecord/DictGridTableTest.java | 14 ++ .../kylin/storage/hbase/ITStorageTest.java | 6 +- .../kylin/query/relnode/OLAPAggregateRel.java | 6 +- .../apache/kylin/query/relnode/OLAPContext.java | 9 +- .../kylin/query/relnode/OLAPFilterRel.java | 9 +- 19 files changed, 370 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java index d938f2b..ed0a77a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java @@ -44,14 +44,15 @@ import com.google.common.collect.Sets; public abstract class ScanRangePlannerBase { //GT - protected TupleFilter gtFilter; protected GTInfo gtInfo; + protected TupleFilter gtFilter; protected Pair<ByteArray, ByteArray> gtStartAndEnd; protected TblColRef gtPartitionCol; protected ImmutableBitSet gtDimensions; protected ImmutableBitSet gtAggrGroups; protected ImmutableBitSet gtAggrMetrics; protected String[] gtAggrFuncs; + protected TupleFilter havingFilter; protected boolean isPartitionColUsingDatetimeEncoding = true; protected RecordComparator rangeStartComparator; http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java index 00557c5..124f126 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java @@ -18,21 +18,23 @@ package org.apache.kylin.cube.model; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; -import org.apache.commons.lang.ArrayUtils; -import org.apache.kylin.metadata.model.TblColRef; - import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.commons.lang.ArrayUtils; +import org.apache.kylin.metadata.model.TblColRef; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Objects; + /** */ +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class RowKeyDesc implements java.io.Serializable { http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 0dd6fa9..45a9148 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -30,9 +30,9 @@ import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import java.util.PriorityQueue; import java.util.SortedMap; -import java.util.Map.Entry; import org.apache.commons.io.IOUtils; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; @@ -40,11 +40,15 @@ import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureAggregators; +import org.apache.kylin.metadata.filter.IFilterCodeSystem; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.tuple.ITuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +71,7 @@ public class GTAggregateScanner implements IGTScanner { final long spillThreshold; // 0 means no memory control && no spill final int storagePushDownLimit;//default to be Int.MAX final boolean spillEnabled; + final TupleFilter havingFilter; private int aggregatedRowCount = 0; private MemoryWaterLevel memTracker; @@ -92,6 +97,7 @@ public class GTAggregateScanner implements IGTScanner { this.aggrMask = new boolean[metricsAggrFuncs.length]; this.storagePushDownLimit = req.getStoragePushDownLimit(); this.spillEnabled = spillEnabled; + this.havingFilter = req.getHavingFilterPushDown(); Arrays.fill(aggrMask, true); } @@ -327,62 +333,143 @@ public class GTAggregateScanner implements IGTScanner { } public Iterator<GTRecord> iterator() { + Iterator<Entry<byte[], MeasureAggregator[]>> it = null; + if (dumps.isEmpty()) { // the all-in-mem case + it = aggBufMap.entrySet().iterator(); + } else { + // the spill case + if (!aggBufMap.isEmpty()) { + spillBuffMap(getEstimateSizeOfAggrCache()); // TODO allow merge in-mem map with spilled dumps + } + DumpMerger merger = new DumpMerger(dumps); + it = merger.iterator(); + } - return new Iterator<GTRecord>() { + final Iterator<Entry<byte[], MeasureAggregator[]>> input = it; - final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator(); - final ReturningRecord returningRecord = new ReturningRecord(); + return new Iterator<GTRecord>() { - @Override - public boolean hasNext() { - return it.hasNext(); - } + final ReturningRecord returningRecord = new ReturningRecord(); + Entry<byte[], MeasureAggregator[]> returningEntry = null; + final HavingFilterChecker havingFilterChecker = (havingFilter == null) ? null : new HavingFilterChecker(); - @Override - public GTRecord next() { - Entry<byte[], MeasureAggregator[]> entry = it.next(); - returningRecord.load(entry.getKey(), entry.getValue()); - return returningRecord.record; + @Override + public boolean hasNext() { + while (returningEntry == null && input.hasNext()) { + returningEntry = input.next(); + if (havingFilterChecker != null) + returningEntry = havingFilterChecker.check(returningEntry); } + return returningEntry != null; + } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } else { - // the spill case - if (!aggBufMap.isEmpty()) { - this.spillBuffMap(getEstimateSizeOfAggrCache()); // TODO allow merge in-mem map with spilled dumps + @Override + public GTRecord next() { + returningRecord.load(returningEntry.getKey(), returningEntry.getValue()); + returningEntry = null; + return returningRecord.record; } - return new Iterator<GTRecord>() { - final DumpMerger merger = new DumpMerger(dumps); - final Iterator<Pair<byte[], MeasureAggregator[]>> it = merger.iterator(); - final ReturningRecord returningRecord = new ReturningRecord(); + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } - @Override - public boolean hasNext() { - return it.hasNext(); - } + class HavingFilterChecker { - @Override - public GTRecord next() { - Pair<byte[], MeasureAggregator[]> entry = it.next(); - returningRecord.load(entry.getKey(), entry.getValue()); - return returningRecord.record; - } + final HavingFilterTuple tuple = new HavingFilterTuple(); + final IFilterCodeSystem cs = new HavingFilterCodeSys(); - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; + HavingFilterChecker() { + logger.info("Evaluating 'having' filter -- " + havingFilter); + } + + public Entry<byte[], MeasureAggregator[]> check(Entry<byte[], MeasureAggregator[]> returningEntry) { + tuple.aggrValues = returningEntry.getValue(); + boolean pass = havingFilter.evaluate(tuple, cs); + return pass ? returningEntry : null; + } + } + + private class HavingFilterCodeSys implements IFilterCodeSystem { + + Object o2Cache; + double n2Cache; + + @Override + public int compare(Object o1, Object o2) { + if (o1 == null && o2 == null) + return 0; + + if (o1 == null) // null is bigger to align with CubeCodeSystem + return 1; + + if (o2 == null) // null is bigger to align with CubeCodeSystem + return -1; + + // for the 'having clause', we only concern numbers and BigDecimal + // we try to cache the o2, which should be a constant according to CompareTupleFilter.evaluate() + + double n1 = ((Number) o1).doubleValue(); + double n2 = (o2Cache == o2) ? n2Cache : Double.parseDouble((String) o2); + + if (o2Cache == null) { + o2Cache = o2; + n2Cache = n2; + } + + return Double.compare(n1, n2); + } + + @Override + public boolean isNull(Object code) { + return code == null; + } + + @Override + public void serialize(Object code, ByteBuffer buf) { + throw new UnsupportedOperationException(); + } + + @Override + public Object deserialize(ByteBuffer buf) { + throw new UnsupportedOperationException(); } } + private class HavingFilterTuple implements ITuple { + MeasureAggregator[] aggrValues; + + @Override + public Object getValue(TblColRef col) { + return aggrValues[col.getColumnDesc().getZeroBasedIndex()].getState(); + } + + @Override + public List<String> getAllFields() { + throw new UnsupportedOperationException(); + } + + @Override + public List<TblColRef> getAllColumns() { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] getAllValues() { + throw new UnsupportedOperationException(); + } + + @Override + public ITuple makeCopy() { + throw new UnsupportedOperationException(); + } + }; + class ReturningRecord { final GTRecord record = new GTRecord(info); final Object[] tmpValues = new Object[metrics.trueBitCount()]; @@ -468,8 +555,7 @@ public class GTAggregateScanner implements IGTScanner { } public void flush() throws IOException { - logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", - buffMap.size(), estMemSize, spillThreshold, dumpedFile.getAbsolutePath()); + logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(), estMemSize, spillThreshold, dumpedFile.getAbsolutePath()); if (buffMap != null) { DataOutputStream dos = null; @@ -503,18 +589,18 @@ public class GTAggregateScanner implements IGTScanner { } } - class DumpMerger implements Iterable<Pair<byte[], MeasureAggregator[]>> { - final PriorityQueue<Pair<byte[], Integer>> minHeap; + class DumpMerger implements Iterable<Entry<byte[], MeasureAggregator[]>> { + final PriorityQueue<Entry<byte[], Integer>> minHeap; final List<Iterator<Pair<byte[], byte[]>>> dumpIterators; final List<Object[]> dumpCurrentValues; final MeasureAggregator[] resultMeasureAggregators = newAggregators(); final MeasureAggregators resultAggrs = new MeasureAggregators(resultMeasureAggregators); public DumpMerger(List<Dump> dumps) { - minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Pair<byte[], Integer>>() { + minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Entry<byte[], Integer>>() { @Override - public int compare(Pair<byte[], Integer> o1, Pair<byte[], Integer> o2) { - return bytesComparator.compare(o1.getFirst(), o2.getFirst()); + public int compare(Entry<byte[], Integer> o1, Entry<byte[], Integer> o2) { + return bytesComparator.compare(o1.getKey(), o2.getKey()); } }); dumpIterators = Lists.newArrayListWithCapacity(dumps.size()); @@ -536,7 +622,7 @@ public class GTAggregateScanner implements IGTScanner { private void enqueueFromDump(int index) { if (dumpIterators.get(index) != null && dumpIterators.get(index).hasNext()) { Pair<byte[], byte[]> pair = dumpIterators.get(index).next(); - minHeap.offer(new Pair(pair.getKey(), index)); + minHeap.offer(new SimpleEntry(pair.getKey(), index)); Object[] metricValues = new Object[metrics.trueBitCount()]; measureCodec.decode(ByteBuffer.wrap(pair.getValue()), metricValues); dumpCurrentValues.set(index, metricValues); @@ -544,21 +630,21 @@ public class GTAggregateScanner implements IGTScanner { } @Override - public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() { - return new Iterator<Pair<byte[], MeasureAggregator[]>>() { + public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() { + return new Iterator<Entry<byte[], MeasureAggregator[]>>() { @Override public boolean hasNext() { return !minHeap.isEmpty(); } private void internalAggregate() { - Pair<byte[], Integer> peekEntry = minHeap.poll(); + Entry<byte[], Integer> peekEntry = minHeap.poll(); resultAggrs.aggregate(dumpCurrentValues.get(peekEntry.getValue())); enqueueFromDump(peekEntry.getValue()); } @Override - public Pair<byte[], MeasureAggregator[]> next() { + public Entry<byte[], MeasureAggregator[]> next() { // Use minimum heap to merge sort the keys, // also do aggregation for measures with same keys in different dumps resultAggrs.reset(); @@ -570,7 +656,7 @@ public class GTAggregateScanner implements IGTScanner { internalAggregate(); } - return new Pair(peekKey, resultMeasureAggregators); + return new SimpleEntry(peekKey, resultMeasureAggregators); } @Override @@ -581,4 +667,31 @@ public class GTAggregateScanner implements IGTScanner { } } } + + private static class SimpleEntry<K, V> implements Entry<K, V> { + K k; + V v; + + SimpleEntry(K k, V v) { + this.k = k; + this.v = v; + } + + @Override + public K getKey() { + return k; + } + + @Override + public V getValue() { + return v; + } + + @Override + public V setValue(V value) { + V oldV = v; + this.v = value; + return oldV; + } + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index ae35d2b..9b6b2a6 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -33,7 +33,9 @@ import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.SerializeToByteBuffer; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.filter.StringCodeSystem; import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.filter.TupleFilterSerializer; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +58,7 @@ public class GTScanRequest { // optional filtering private TupleFilter filterPushDown; + private TupleFilter havingFilterPushDown; // optional aggregation private ImmutableBitSet aggrGroupBy; @@ -75,8 +78,9 @@ public class GTScanRequest { private transient boolean doingStorageAggregation = false; GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, // - ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, // - double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit, String storageBehavior, long startTime, long timeout) { + ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, TupleFilter havingFilterPushDown, // + boolean allowStorageAggregation, double aggCacheMemThreshold, int storageScanRowNumThreshold, // + int storagePushDownLimit, String storageBehavior, long startTime, long timeout) { this.info = info; if (ranges == null) { this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info))); @@ -85,6 +89,7 @@ public class GTScanRequest { } this.columns = dimensions; this.filterPushDown = filterPushDown; + this.havingFilterPushDown = havingFilterPushDown; this.aggrGroupBy = aggrGroupBy; this.aggrMetrics = aggrMetrics; @@ -272,10 +277,10 @@ public class GTScanRequest { return filterPushDown; } - public void setFilterPushDown(TupleFilter filter) { - filterPushDown = filter; + public TupleFilter getHavingFilterPushDown() { + return havingFilterPushDown; } - + public ImmutableBitSet getDimensions() { return this.getColumns().andNot(this.getAggrMetrics()); } @@ -359,6 +364,7 @@ public class GTScanRequest { ImmutableBitSet.serializer.serialize(value.columns, out); BytesUtil.writeByteArray(GTUtil.serializeGTFilter(value.filterPushDown, value.info), out); + BytesUtil.writeByteArray(TupleFilterSerializer.serialize(value.havingFilterPushDown, StringCodeSystem.INSTANCE), out); ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out); ImmutableBitSet.serializer.serialize(value.aggrMetrics, out); @@ -392,6 +398,7 @@ public class GTScanRequest { ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in); TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo); + TupleFilter sGTHavingFilter = TupleFilterSerializer.deserialize(BytesUtil.readByteArray(in), StringCodeSystem.INSTANCE); ImmutableBitSet sAggGroupBy = ImmutableBitSet.serializer.deserialize(in); ImmutableBitSet sAggrMetrics = ImmutableBitSet.serializer.deserialize(in); @@ -406,7 +413,7 @@ public class GTScanRequest { return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).// setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).// - setFilterPushDown(sGTFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).// + setFilterPushDown(sGTFilter).setHavingFilterPushDown(sGTHavingFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).// setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).// setStartTime(startTime).setTimeout(timeout).setStorageBehavior(storageBehavior).createGTScanRequest(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java index bcec1f4..ba1fdbc 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java @@ -29,6 +29,7 @@ public class GTScanRequestBuilder { private GTInfo info; private List<GTScanRange> ranges; private TupleFilter filterPushDown; + private TupleFilter havingFilterPushDown; private ImmutableBitSet dimensions; private ImmutableBitSet aggrGroupBy = null; private ImmutableBitSet aggrMetrics = null; @@ -56,6 +57,11 @@ public class GTScanRequestBuilder { return this; } + public GTScanRequestBuilder setHavingFilterPushDown(TupleFilter havingFilterPushDown) { + this.havingFilterPushDown = havingFilterPushDown; + return this; + } + public GTScanRequestBuilder setDimensions(ImmutableBitSet dimensions) { this.dimensions = dimensions; return this; @@ -131,6 +137,6 @@ public class GTScanRequestBuilder { this.startTime = startTime == -1 ? System.currentTimeMillis() : startTime; this.timeout = timeout == -1 ? 300000 : timeout; - return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageBehavior, startTime, timeout); + return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, havingFilterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageBehavior, startTime, timeout); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java index 7496778..7a7e4e6 100755 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java @@ -20,7 +20,9 @@ package org.apache.kylin.gridtable; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.kylin.common.util.ByteArray; @@ -61,18 +63,28 @@ public class GTUtil { public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, // List<TblColRef> colMapping, Set<TblColRef> unevaluatableColumnCollector) { - return convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector); + Map<TblColRef, Integer> map = colListToMap(colMapping); + return convertFilter(rootFilter, info, map, true, unevaluatableColumnCollector); + } + + protected static Map<TblColRef, Integer> colListToMap(List<TblColRef> colMapping) { + Map<TblColRef, Integer> map = new HashMap<>(); + for (int i = 0; i < colMapping.size(); i++) { + map.put(colMapping.get(i), i); + } + return map; } // converts TblColRef to GridTable column, encode constants, drop unEvaluatable parts private static TupleFilter convertFilter(TupleFilter rootFilter, final GTInfo info, // - final List<TblColRef> colMapping, final boolean encodeConstants, // + final Map<TblColRef, Integer> colMapping, final boolean encodeConstants, // final Set<TblColRef> unevaluatableColumnCollector) { IFilterCodeSystem<ByteArray> filterCodeSystem = wrap(info.codeSystem.getComparator()); + + GTConvertDecorator decorator = new GTConvertDecorator(unevaluatableColumnCollector, colMapping, info, encodeConstants); - byte[] bytes = TupleFilterSerializer.serialize(rootFilter, new GTConvertDecorator(unevaluatableColumnCollector, colMapping, info, encodeConstants), filterCodeSystem); - + byte[] bytes = TupleFilterSerializer.serialize(rootFilter, decorator, filterCodeSystem); return TupleFilterSerializer.deserialize(bytes, filterCodeSystem); } @@ -106,17 +118,22 @@ public class GTUtil { protected static class GTConvertDecorator implements TupleFilterSerializer.Decorator { protected final Set<TblColRef> unevaluatableColumnCollector; - protected final List<TblColRef> colMapping; + protected final Map<TblColRef, Integer> colMapping; protected final GTInfo info; protected final boolean encodeConstants; - public GTConvertDecorator(Set<TblColRef> unevaluatableColumnCollector, List<TblColRef> colMapping, GTInfo info, boolean encodeConstants) { + public GTConvertDecorator(Set<TblColRef> unevaluatableColumnCollector, Map<TblColRef, Integer> colMapping, GTInfo info, boolean encodeConstants) { this.unevaluatableColumnCollector = unevaluatableColumnCollector; this.colMapping = colMapping; this.info = info; this.encodeConstants = encodeConstants; buf = ByteBuffer.allocate(info.getMaxColumnLength()); } + + protected int mapCol(TblColRef col) { + Integer i = colMapping.get(col); + return i == null ? -1 : i; + } @Override public TupleFilter onSerialize(TupleFilter filter) { @@ -140,7 +157,7 @@ public class GTUtil { // map to column onto grid table if (colMapping != null && filter instanceof ColumnTupleFilter) { ColumnTupleFilter colFilter = (ColumnTupleFilter) filter; - int gtColIdx = colMapping.indexOf(colFilter.getColumn()); + int gtColIdx = mapCol(colFilter.getColumn()); return new ColumnTupleFilter(info.colRef(gtColIdx)); } @@ -174,7 +191,7 @@ public class GTUtil { //with normal ConstantTupleFilter Object firstValue = constValues.iterator().next(); - int col = colMapping == null ? externalCol.getColumnDesc().getZeroBasedIndex() : colMapping.indexOf(externalCol); + int col = colMapping == null ? externalCol.getColumnDesc().getZeroBasedIndex() : mapCol(externalCol); TupleFilter result; ByteArray code; http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java index 2cd4964..5d15d56 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java @@ -133,6 +133,8 @@ public class ColumnDesc implements Serializable { public void setId(String id) { this.id = id; + if (id != null) + zeroBasedIndex = Integer.parseInt(id) - 1; } public String getName() { http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java index da62a75..aa4c056 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java @@ -202,6 +202,8 @@ public class TblColRef implements Serializable { return false; if ((table == null ? other.table == null : table.equals(other.table)) == false) return false; + if (this.isInnerColumn() != other.isInnerColumn()) + return false; return true; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java index 36f303b..03ff3ff 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java @@ -46,34 +46,52 @@ public class SQLDigest { } } + // model public String factTable; - public TupleFilter filter; - public List<JoinDesc> joinDescs; public Set<TblColRef> allColumns; + public List<JoinDesc> joinDescs; + + // group by public List<TblColRef> groupbyColumns; public Set<TblColRef> subqueryJoinParticipants; - public Set<TblColRef> filterColumns; + + // aggregation public Set<TblColRef> metricColumns; public List<FunctionDesc> aggregations; // storage level measure type, on top of which various sql aggr function may apply public List<SQLCall> aggrSqlCalls; // sql level aggregation function call + + // filter + public Set<TblColRef> filterColumns; + public TupleFilter filter; + public TupleFilter havingFilter; + + // sort & limit public List<TblColRef> sortColumns; public List<OrderEnum> sortOrders; public boolean isRawQuery; public boolean limitPrecedesAggr; - public SQLDigest(String factTable, TupleFilter filter, List<JoinDesc> joinDescs, Set<TblColRef> allColumns, // - List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants, Set<TblColRef> filterColumns, Set<TblColRef> metricColumns, // - List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, List<TblColRef> sortColumns, List<OrderEnum> sortOrders, boolean limitPrecedesAggr) { + public SQLDigest(String factTable, Set<TblColRef> allColumns, List<JoinDesc> joinDescs, // model + List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants, // group by + Set<TblColRef> metricColumns, List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, // aggregation + Set<TblColRef> filterColumns, TupleFilter filter, TupleFilter havingFilter, // filter + List<TblColRef> sortColumns, List<OrderEnum> sortOrders, boolean limitPrecedesAggr // sort & limit + ) { this.factTable = factTable; - this.filter = filter; - this.joinDescs = joinDescs; this.allColumns = allColumns; + this.joinDescs = joinDescs; + this.groupbyColumns = groupbyColumns; this.subqueryJoinParticipants = subqueryJoinParticipants; - this.filterColumns = filterColumns; + this.metricColumns = metricColumns; this.aggregations = aggregations; this.aggrSqlCalls = aggrSqlCalls; + + this.filterColumns = filterColumns; + this.filter = filter; + this.havingFilter = havingFilter; + this.sortColumns = sortColumns; this.sortOrders = sortOrders; this.isRawQuery = isRawQuery(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java index c3cc858..cecea85 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java @@ -73,7 +73,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { protected StorageContext context; public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, Set<TblColRef> groupByDims, // - Collection<FunctionDesc> metrics, StorageContext context) { + Collection<FunctionDesc> metrics, TupleFilter havingFilter, StorageContext context) { this.context = context; this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax(); @@ -100,6 +100,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { //replace the constant values in filter to dictionary codes Set<TblColRef> groupByPushDown = Sets.newHashSet(groupByDims); this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getCuboidDimensionsInGTOrder(), groupByPushDown); + this.havingFilter = havingFilter; this.gtDimensions = mapping.makeGridTableColumns(dimensions); this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc())); @@ -115,15 +116,10 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { this.gtPartitionCol = gtInfo.colRef(index); } } - } /** - * constrcut GTScanRangePlanner with incomplete information. only be used for UT - * @param info - * @param gtStartAndEnd - * @param gtPartitionCol - * @param gtFilter + * Construct GTScanRangePlanner with incomplete information. For UT only. */ public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) { @@ -152,7 +148,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).// setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).// setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB()).// - setStoragePushDownLimit(context.getFinalPushDownLimit()).createGTScanRequest(); + setStoragePushDownLimit(context.getFinalPushDownLimit()).setHavingFilterPushDown(havingFilter).createGTScanRequest(); } else { scanRequest = null; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java index 31a9f99..ee12743 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -51,7 +51,7 @@ public class CubeSegmentScanner implements IGTScanner { final GTScanRequest scanRequest; public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // - Collection<FunctionDesc> metrics, TupleFilter originalfilter, StorageContext context) { + Collection<FunctionDesc> metrics, TupleFilter originalfilter, TupleFilter havingFilter, StorageContext context) { logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName()); @@ -70,16 +70,22 @@ public class CubeSegmentScanner implements IGTScanner { CubeScanRangePlanner scanRangePlanner; try { - scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics, context); + scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics, havingFilter, context); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } + scanRequest = scanRangePlanner.planScanRequest(); + String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage(); scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context); } + + public boolean isSegmentSkipped() { + return scanner.isSegmentSkipped(); + } @Override public Iterator<GTRecord> iterator() { http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 0f942f0..5faa098 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -44,6 +44,7 @@ import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.metadata.tuple.ITupleIterator; @@ -83,8 +84,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { continue; } - scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(), request.getMetrics(), request.getFilter(), request.getContext()); - scanners.add(scanner); + scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(), request.getMetrics(), request.getFilter(), request.getHavingFilter(), request.getContext()); + + if (!scanner.isSegmentSkipped()) + scanners.add(scanner); } if (scanners.isEmpty()) @@ -147,10 +150,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { enableStreamAggregateIfBeneficial(cuboid, groupsD, context); // set query deadline context.setDeadline(cubeInstance); - + + // push down having clause filter if possible + TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations, metrics); + logger.info("Cuboid identified: cube={}, cuboidId={}, groupsD={}, filterD={}, limitPushdown={}, storageAggr={}", cubeInstance.getName(), cuboid.getId(), groupsD, filterColumnD, context.getFinalPushDownLimit(), context.isNeedStorageAggregation()); - return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, filterColumnD, metrics, filterD, context); + return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, filterColumnD, metrics, filterD, havingFilter, context); } protected abstract String getGTStorage(); @@ -416,4 +422,41 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } } + private TupleFilter checkHavingCanPushDown(TupleFilter havingFilter, Set<TblColRef> groupsD, List<FunctionDesc> aggregations, Set<FunctionDesc> metrics) { + // must have only one segment + Segments<CubeSegment> readySegs = cubeInstance.getSegments(SegmentStatusEnum.READY); + if (readySegs.size() != 1) + return null; + + // sharded-by column must on group by + CubeDesc desc = cubeInstance.getDescriptor(); + Set<TblColRef> shardBy = desc.getShardByColumns(); + if (groupsD == null || shardBy.isEmpty() || !groupsD.containsAll(shardBy)) + return null; + + // OK, push down + logger.info("Push down having filter " + havingFilter); + + // convert columns in the filter + Set<TblColRef> aggrOutCols = new HashSet<>(); + TupleFilter.collectColumns(havingFilter, aggrOutCols); + + for (TblColRef aggrOutCol : aggrOutCols) { + int aggrIdxOnSql = aggrOutCol.getColumnDesc().getZeroBasedIndex(); // aggr index marked in OLAPAggregateRel + FunctionDesc aggrFunc = aggregations.get(aggrIdxOnSql); + + // calculate the index of this aggr among all the metrics that is sending to storage + int aggrIdxAmongMetrics = 0; + for (MeasureDesc m : cubeDesc.getMeasures()) { + if (aggrFunc.equals(m.getFunction())) + break; + if (metrics.contains(m.getFunction())) + aggrIdxAmongMetrics++; + } + aggrOutCol.getColumnDesc().setId("" + (aggrIdxAmongMetrics + 1)); + } + + return havingFilter; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java index 68f755c..7793515 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java @@ -27,6 +27,7 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.StorageContext; +@SuppressWarnings("serial") public class GTCubeStorageQueryRequest implements Serializable { private Cuboid cuboid; private Set<TblColRef> dimensions; @@ -34,15 +35,18 @@ public class GTCubeStorageQueryRequest implements Serializable { private Set<TblColRef> filterCols; private Set<FunctionDesc> metrics; private TupleFilter filter; + private TupleFilter havingFilter; private StorageContext context; - public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, Set<TblColRef> filterCols, Set<FunctionDesc> metrics, TupleFilter filter, StorageContext context) { + public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // + Set<TblColRef> filterCols, Set<FunctionDesc> metrics, TupleFilter filter, TupleFilter havingFilter, StorageContext context) { this.cuboid = cuboid; this.dimensions = dimensions; this.groups = groups; this.filterCols = filterCols; this.metrics = metrics; this.filter = filter; + this.havingFilter = havingFilter; this.context = context; } @@ -86,6 +90,14 @@ public class GTCubeStorageQueryRequest implements Serializable { this.filter = filter; } + public TupleFilter getHavingFilter() { + return havingFilter; + } + + public void setHavingFilter(TupleFilter havingFilter) { + this.havingFilter = havingFilter; + } + public StorageContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java index fe22e9c..8f64bd1 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java @@ -55,6 +55,10 @@ public class ScannerWorker { throw new RuntimeException(e); } } + + public boolean isSegmentSkipped() { + return internal instanceof EmptyGTScanner; + } public Iterator<GTRecord> iterator() { return internal.iterator(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java index 7500b00..672f3e0 100644 --- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java +++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java @@ -61,6 +61,7 @@ import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -321,6 +322,19 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { } @Test + public void verifyAggregateAndHavingFilter() throws IOException { + GTInfo info = table.getInfo(); + + TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL); + havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure + CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20"); + + GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" }).setHavingFilterPushDown(havingFilter).createGTScanRequest(); + + doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]", "[null, 30, null, null, 52.5]"); + } + + @Test public void testFilterScannerPerf() throws IOException { GridTable table = newTestPerfTable(); GTInfo info = table.getInfo(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java index 847baf8..24589a8 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java @@ -135,7 +135,11 @@ public class ITStorageTest extends HBaseMetadataTestCase { int count = 0; ITupleIterator iterator = null; try { - SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Sets.<TblColRef> newHashSet(), Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, Collections.<SQLCall> emptyList(), new ArrayList<TblColRef>(), new ArrayList<SQLDigest.OrderEnum>(), false); + SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", /*allCol*/ Collections.<TblColRef> emptySet(), /*join*/ null, // + groups, /*subqueryJoinParticipants*/ Sets.<TblColRef> newHashSet(), // + /*metricCol*/ Collections.<TblColRef> emptySet(), aggregations, /*aggrSqlCalls*/ Collections.<SQLCall> emptyList(), // + /*filter col*/ Collections.<TblColRef> emptySet(), filter, null, // + /*sortCol*/ new ArrayList<TblColRef>(), new ArrayList<SQLDigest.OrderEnum>(), false); iterator = storageEngine.search(context, sqlDigest, mockup.newTupleInfo(groups, aggregations)); while (iterator.hasNext()) { ITuple tuple = iterator.next(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index 24711d3..adb145a 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -167,6 +167,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { if (!this.afterAggregate) { addToContextGroupBy(this.groups); this.context.aggregations.addAll(this.aggregations); + this.context.aggrOutCols.addAll(columnRowType.getAllColumns().subList(groups.size(), columnRowType.getAllColumns().size())); this.context.afterAggregate = true; if (this.context.afterLimit) { @@ -209,14 +210,15 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { for (int i = 0; i < this.aggregations.size(); i++) { FunctionDesc aggFunc = this.aggregations.get(i); String aggOutName; - if (aggFunc != null && aggFunc.needRewriteField()) { + if (aggFunc != null) { aggOutName = aggFunc.getRewriteFieldName(); } else { AggregateCall aggCall = this.rewriteAggCalls.get(i); int index = aggCall.getArgList().get(0); - aggOutName = getSqlFuncName(aggCall) + "_" + inputColumnRowType.getColumnByIndex(index).getIdentity() + "_"; + aggOutName = getSqlFuncName(aggCall) + "_" + inputColumnRowType.getColumnByIndex(index).getIdentity().replace('.', '_') + "_"; } TblColRef aggOutCol = TblColRef.newInnerColumn(aggOutName, TblColRef.InnerDataTypeEnum.LITERAL); + aggOutCol.getColumnDesc().setId("" + (i + 1)); // mark the index of aggregation columns.add(aggOutCol); } return new ColumnRowType(columns); http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java index db5f2eb..31ed075 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java @@ -126,9 +126,11 @@ public class OLAPContext { public Set<TblColRef> subqueryJoinParticipants = new HashSet<TblColRef>();//subqueryJoinParticipants will be added to groupByColumns(only when other group by co-exists) and allColumns public Set<TblColRef> metricsColumns = new HashSet<>(); public List<FunctionDesc> aggregations = new ArrayList<>(); // storage level measure type, on top of which various sql aggr function may apply + public List<TblColRef> aggrOutCols = new ArrayList<>(); // aggregation output (inner) columns public List<SQLCall> aggrSqlCalls = new ArrayList<>(); // sql level aggregation function call public Set<TblColRef> filterColumns = new HashSet<>(); public TupleFilter filter; + public TupleFilter havingFilter; public List<JoinDesc> joins = new LinkedList<>(); public JoinsTree joinsTree; private List<TblColRef> sortColumns; @@ -150,7 +152,12 @@ public class OLAPContext { public SQLDigest getSQLDigest() { if (sqlDigest == null) - sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, subqueryJoinParticipants, filterColumns, metricsColumns, aggregations, aggrSqlCalls, sortColumns, sortOrders, limitPrecedesAggr); + sqlDigest = new SQLDigest(firstTableScan.getTableName(), allColumns, joins, // model + groupByColumns, subqueryJoinParticipants, // group by + metricsColumns, aggregations, aggrSqlCalls, // aggregation + filterColumns, filter, havingFilter, // filter + sortColumns, sortOrders, limitPrecedesAggr // sort & limit + ); return sqlDigest; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1bc72e3/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java index 882c959..0833a92 100755 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java @@ -355,7 +355,12 @@ public class OLAPFilterRel extends Filter implements OLAPRel { if (!context.afterAggregate) { translateFilter(context); } else { - context.afterHavingClauseFilter = true;//having clause is skipped + context.afterHavingClauseFilter = true; + + TupleFilterVisitor visitor = new TupleFilterVisitor(this.columnRowType); + TupleFilter havingFilter = this.condition.accept(visitor); + if (context.havingFilter == null) + context.havingFilter = havingFilter; } } @@ -372,8 +377,10 @@ public class OLAPFilterRel extends Filter implements OLAPRel { TupleFilterVisitor visitor = new TupleFilterVisitor(this.columnRowType); TupleFilter filter = this.condition.accept(visitor); + // optimize the filter, the optimization has to be segment-irrelevant new FilterOptimizeTransformer().transform(filter); + Set<TblColRef> filterColumns = Sets.newHashSet(); TupleFilter.collectColumns(filter, filterColumns); for (TblColRef tblColRef : filterColumns) {