Repository: kylin
Updated Branches:
  refs/heads/master 7cb88f5f9 -> 6f9bce275

KYLIN-2501 pipeline partition results if possible

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6f9bce27
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6f9bce27
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6f9bce27

Branch: refs/heads/master
Commit: 6f9bce27545ef185a64c24aff67790150fb469b9
Parents: 7cb88f5
Author: gaodayue <gaoda...@meituan.com>
Authored: Wed Apr 5 13:47:34 2017 +0800
Committer: gaodayue <gaoda...@meituan.com>
Committed: Wed Apr 5 13:56:03 2017 +0800

----------------------------------------------------------------------
 .../gtrecord/StorageResponseGTScatter.java | 22 +++++++++++---------
 1 file changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/6f9bce27/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index f1ab20c..ef12ff0 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -69,21 +70,22 @@ public class StorageResponseGTScatter implements IGTScanner {
 
     @Override
     public Iterator<GTRecord> iterator() {
-        List<PartitionResultIterator> partitionResults = Lists.newArrayList();
-        while (blocks.hasNext()) {
-            partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
+        Iterator<PartitionResultIterator> iterators = Iterators.transform(blocks, new Function<byte[], PartitionResultIterator>() {
+            public PartitionResultIterator apply(byte[] input) {
+                return new PartitionResultIterator(input, info, columns);
+            }
+        });
+
+        if (!needSorted) {
+            logger.debug("Using Iterators.concat to pipeline partition results");
+            return Iterators.concat(iterators);
         }
 
+        List<PartitionResultIterator> partitionResults = Lists.newArrayList(iterators);
         if (partitionResults.size() == 1) {
             return partitionResults.get(0);
         }
-
-        if (!needSorted) {
-            logger.debug("Using Iterators.concat to merge partition results");
-            return Iterators.concat(partitionResults.iterator());
-        }
-
-        logger.debug("Using SortMergedPartitionResultIterator to merge partition results");
+        logger.debug("Using SortMergedPartitionResultIterator to merge {} partition results", partitionResults.size());
         return new SortMergedPartitionResultIterator(partitionResults, info, GTRecord.getComparator(groupByDims));
     }
 }