Optimize SortedIteratorMergerWithLimit when there is only 1 shard (temp)
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/85fcf019 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/85fcf019 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/85fcf019 Branch: refs/heads/yang22-cdh5.7 Commit: 85fcf019205c3e5e84d1224d1f19e166d3641b0e Parents: 5c70af2 Author: Hongbin Ma <mahong...@apache.org> Authored: Fri Dec 30 17:13:14 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Fri Dec 30 18:09:51 2016 +0800 ---------------------------------------------------------------------- .../storage/gtrecord/CubeSegmentScanner.java | 6 ++++-- .../gtrecord/SequentialCubeTupleIterator.java | 10 +++++++--- .../storage/gtrecord/SortedIteratorMerger.java | 19 ++++++++++++++++++- 3 files changed, 29 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/85fcf019/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java index 9d6f946..6fd88b7 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -50,12 +50,13 @@ public class CubeSegmentScanner implements IGTScanner { final Cuboid cuboid; final GTScanRequest scanRequest; + final boolean isEmpty; public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // Collection<FunctionDesc> metrics, TupleFilter originalfilter, StorageContext context) { - + logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName()); - + this.cuboid = cuboid; this.cubeSeg = 
cubeSeg; @@ -80,6 +81,7 @@ public class CubeSegmentScanner implements IGTScanner { scanRequest = scanRangePlanner.planScanRequest(); String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage(); scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage); + isEmpty = scanRequest == null; } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/85fcf019/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index ee868c7..49080d6 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -60,9 +60,13 @@ public class SequentialCubeTupleIterator implements ITupleIterator { segmentCubeTupleIterators = Lists.newArrayList(); for (CubeSegmentScanner scanner : scanners) { - segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context)); + if (!scanner.isEmpty) { + segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context)); + } } + logger.info("Number of segmentCubeTupleIterators: {}", segmentCubeTupleIterators.size()); + if (!context.isLimitEnabled()) { //normal case tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator()); @@ -78,7 +82,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, context.getFinalPushDownLimit(), getTupleDimensionComparator(cuboid, returnTupleInfo)).getIterator(); } } - + public 
Comparator<ITuple> getTupleDimensionComparator(Cuboid cuboid, TupleInfo returnTupleInfo) { // dimensionIndexOnTuple is for SQL with limit List<Integer> temp = Lists.newArrayList(); @@ -92,7 +96,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { for (int i = 0; i < temp.size(); i++) { dimensionIndexOnTuple[i] = temp.get(i); } - + return new Comparator<ITuple>() { @Override public int compare(ITuple o1, ITuple o2) { http://git-wip-us.apache.org/repos/asf/kylin/blob/85fcf019/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java index d5aa9d0..bca8068 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java @@ -22,6 +22,9 @@ import java.util.Comparator; import java.util.Iterator; import java.util.PriorityQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Preconditions; /** @@ -29,9 +32,14 @@ import com.google.common.base.Preconditions; */ public class SortedIteratorMerger<E> { + private static final Logger logger = LoggerFactory.getLogger(SortedIteratorMerger.class); + private Iterator<Iterator<E>> shardSubsets; private Comparator<E> comparator; + private int shardCount = 0; + private Iterator<E> firstItr; + public SortedIteratorMerger(Iterator<Iterator<E>> shardSubsets, Comparator<E> comparator) { this.shardSubsets = shardSubsets; this.comparator = comparator; @@ -47,12 +55,22 @@ public class SortedIteratorMerger<E> { while (shardSubsets.hasNext()) { Iterator<E> iterator = shardSubsets.next(); + if (shardCount++ == 0) { + firstItr = iterator; + } + PeekingImpl<E> peekingIterator 
= new PeekingImpl<>(iterator); + if (peekingIterator.hasNext()) { heap.offer(peekingIterator); } } + if (shardCount == 1) { + logger.info("SortedIteratorMerger will downgrade to a single normal iterator"); + return firstItr; + } + return getIteratorInternal(heap); } @@ -94,7 +112,6 @@ public class SortedIteratorMerger<E> { throw new UnsupportedOperationException(); } - } }