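KYLIN-2501 bugfix: map returned metrics to their aggregate-state indices in stream aggregation and derive the group-by comparator from the scan request; integration tests (IT) pass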
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d045a045 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d045a045 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d045a045 Branch: refs/heads/KYLIN-2501 Commit: d045a045a51f587afb35a997a87e249ad1da4adb Parents: 7e3c423 Author: gaodayue <gaoda...@meituan.com> Authored: Wed Mar 22 16:31:45 2017 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Fri Mar 31 14:59:51 2017 +0800 ---------------------------------------------------------------------- .../gridtable/GTStreamAggregateScanner.java | 24 +++-- .../apache/kylin/storage/StorageContext.java | 12 --- .../gtrecord/GTCubeStorageQueryBase.java | 7 -- .../storage/gtrecord/PartitionResultMerger.java | 100 ------------------- .../gtrecord/SegmentCubeTupleIterator.java | 7 +- .../SortMergedPartitionResultIterator.java | 81 +++++++++++++++ .../gtrecord/StorageResponseGTScatter.java | 13 ++- 7 files changed, 108 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java index 1fde423..4eb5791 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java @@ -18,6 +18,7 @@ package org.apache.kylin.gridtable; +import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; import org.apache.kylin.GTForwardingScanner; @@ -38,11 +39,10 @@ public class 
GTStreamAggregateScanner extends GTForwardingScanner { private final GTScanRequest req; private final Comparator<GTRecord> keyComparator; - public GTStreamAggregateScanner(IGTScanner delegated, - GTScanRequest req, Comparator<GTRecord> keyComparator) { + public GTStreamAggregateScanner(IGTScanner delegated, GTScanRequest scanRequest) { super(delegated); - this.req = req; - this.keyComparator = keyComparator; + this.req = Preconditions.checkNotNull(scanRequest, "scanRequest"); + this.keyComparator = GTRecord.getComparator(scanRequest.getAggrGroupBy()); } @Override @@ -172,14 +172,22 @@ public class GTStreamAggregateScanner extends GTForwardingScanner { private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> { private int[] gtDimsIdx; - private int[] gtMetricsIdx; + private int[] gtMetricsIdx; // specify which metric to return and their order + private int[] aggIdx; // specify the ith returning metric's aggStates index + private Object[] result; // avoid object creation StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) { super(input); this.gtDimsIdx = gtDimsIdx; this.gtMetricsIdx = gtMetricsIdx; - result = new Object[gtDimsIdx.length + gtMetricsIdx.length]; + this.aggIdx = new int[gtMetricsIdx.length]; + for (int i = 0; i < aggIdx.length; i++) { + int metricIdx = gtMetricsIdx[i]; + aggIdx[i] = metrics.trueBitIndexOf(metricIdx); + } + + this.result = new Object[gtDimsIdx.length + gtMetricsIdx.length]; } private void decodeAndSetDimensions(GTRecord record) { @@ -202,8 +210,8 @@ public class GTStreamAggregateScanner extends GTForwardingScanner { protected Object[] finalizeResult(GTRecord record, Object[] aggStates) { decodeAndSetDimensions(record); // set metrics - for (int i = 0; i < gtMetricsIdx.length; i++) { - result[gtDimsIdx.length + i] = aggStates[i]; + for (int i = 0; i < aggIdx.length; i++) { + result[gtDimsIdx.length + i] = aggStates[aggIdx[i]]; } return result; } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index bb17054..4522261 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -18,12 +18,10 @@ package org.apache.kylin.storage; -import java.util.Comparator; import java.util.concurrent.atomic.AtomicLong; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase; import org.slf4j.Logger; @@ -49,9 +47,7 @@ public class StorageContext { private boolean exactAggregation = false; private boolean needStorageAggregation = false; private boolean enableCoprocessor = false; - private boolean enableStreamAggregate = false; - private Comparator<GTRecord> groupKeyComparator; private IStorageQuery storageQuery; private AtomicLong processedRowCount = new AtomicLong(); @@ -242,12 +238,4 @@ public class StorageContext { public void enableStreamAggregate() { this.enableStreamAggregate = true; } - - public Comparator<GTRecord> getGroupKeyComparator() { - return groupKeyComparator; - } - - public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) { - this.groupKeyComparator = groupKeyComparator; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 82590a2..d91a0b4 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -26,18 +26,15 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.RawQueryLastHacker; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; @@ -392,11 +389,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } if (enabled) { - CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); - ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD); - context.enableStreamAggregate(); - context.setGroupKeyComparator(GTRecord.getComparator(cols)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java 
deleted file mode 100644 index 52029d3..0000000 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.gtrecord; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; -import com.google.common.collect.PeekingIterator; -import com.google.common.collect.UnmodifiableIterator; -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; - -import java.util.Comparator; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.PriorityQueue; - -/** - * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements. 
- */ -public class PartitionResultMerger implements Iterable<GTRecord> { - private final ImmutableList<PartitionResultIterator> partitionResults; - private final GTInfo info; - private final Comparator<GTRecord> comparator; - - public PartitionResultMerger( - Iterable<PartitionResultIterator> partitionResults, - GTInfo info, Comparator<GTRecord> comparator) { - this.partitionResults = ImmutableList.copyOf(partitionResults); - this.info = info; - this.comparator = comparator; - } - - @Override - public Iterator<GTRecord> iterator() { - if (partitionResults.size() == 1) { - return partitionResults.get(0); - } - return new MergingResultsIterator(); - } - - private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> { - final GTRecord record = new GTRecord(info); // reuse to avoid object creation - - PriorityQueue<PeekingIterator<GTRecord>> heap; - - MergingResultsIterator() { - Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() { - public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) { - return comparator.compare(o1.peek(), o2.peek()); - } - }; - this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator); - - for (PartitionResultIterator it : partitionResults) { - if (it.hasNext()) { - heap.offer(Iterators.peekingIterator(it)); - } - } - } - - @Override - public boolean hasNext() { - return !heap.isEmpty(); - } - - @Override - public GTRecord next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - // get smallest record - PeekingIterator<GTRecord> it = heap.poll(); - // WATCH OUT! record got from PartitionResultIterator.next() may changed later, - // so we must make a shallow copy of it. 
- record.shallowCopyFrom(it.next()); - - if (it.hasNext()) { - heap.offer(it); - } - - return record; - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java index 11f766c..3bac5ec 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java @@ -90,8 +90,8 @@ public class SegmentCubeTupleIterator implements ITupleIterator { final Iterator<GTRecord> records, final GTScanRequest scanRequest, final int[] gtDimsIdx, final int[] gtMetricsIdx) { - boolean singlePartitionResult = records instanceof PartitionResultIterator; - if (context.isStreamAggregateEnabled() && !singlePartitionResult) { + boolean hasMultiplePartitions = records instanceof SortMergedPartitionResultIterator; + if (hasMultiplePartitions && context.isStreamAggregateEnabled()) { // input records are ordered, leverage stream aggregator to produce possibly fewer records IGTScanner inputScanner = new IGTScanner() { public GTInfo getInfo() { @@ -104,8 +104,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator { return records; } }; - GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner( - inputScanner, scanRequest, context.getGroupKeyComparator()); + GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(inputScanner, scanRequest); return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java 
---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java new file mode 100644 index 0000000..21e61e3 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.collect.UnmodifiableIterator; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; + +import java.util.Comparator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.PriorityQueue; + +/** + * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements. 
+ */ +public class SortMergedPartitionResultIterator extends UnmodifiableIterator<GTRecord> { + + final GTRecord record ; // reuse to avoid object creation + PriorityQueue<PeekingIterator<GTRecord>> heap; + + SortMergedPartitionResultIterator( + List<PartitionResultIterator> partitionResults, + GTInfo info, final Comparator<GTRecord> comparator) { + + this.record = new GTRecord(info); + Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() { + public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) { + return comparator.compare(o1.peek(), o2.peek()); + } + }; + this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator); + + for (PartitionResultIterator it : partitionResults) { + if (it.hasNext()) { + heap.offer(Iterators.peekingIterator(it)); + } + } + } + + @Override + public boolean hasNext() { + return !heap.isEmpty(); + } + + @Override + public GTRecord next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + // get smallest record + PeekingIterator<GTRecord> it = heap.poll(); + // WATCH OUT! record got from PartitionResultIterator.next() may changed later, + // so we must make a shallow copy of it. 
+ record.shallowCopyFrom(it.next()); + + if (it.hasNext()) { + heap.offer(it); + } + + return record; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java index 0f1e191..f1ab20c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java @@ -44,7 +44,7 @@ public class StorageResponseGTScatter implements IGTScanner { private IPartitionStreamer partitionStreamer; private final Iterator<byte[]> blocks; private final ImmutableBitSet columns; - private final StorageContext context; + private final ImmutableBitSet groupByDims; private final boolean needSorted; // whether scanner should return sorted records public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) { @@ -52,7 +52,7 @@ public class StorageResponseGTScatter implements IGTScanner { this.partitionStreamer = partitionStreamer; this.blocks = partitionStreamer.asByteArrayIterator(); this.columns = scanRequest.getColumns(); - this.context = context; + this.groupByDims = scanRequest.getAggrGroupBy(); this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled(); } @@ -74,13 +74,16 @@ public class StorageResponseGTScatter implements IGTScanner { partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns)); } + if (partitionResults.size() == 1) { + return partitionResults.get(0); + } + if (!needSorted) { logger.debug("Using Iterators.concat to merge partition results"); return 
Iterators.concat(partitionResults.iterator()); } - logger.debug("Using PartitionResultMerger to merge partition results"); - PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator()); - return merger.iterator(); + logger.debug("Using SortMergedPartitionResultIterator to merge partition results"); + return new SortMergedPartitionResultIterator(partitionResults, info, GTRecord.getComparator(groupByDims)); } }