Repository: kylin
Updated Branches:
  refs/heads/master c67891d26 -> 28e942306


http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 3c992d2..ff7fb2b 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -21,153 +21,77 @@ package org.apache.kylin.storage.gtrecord;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
 public class SequentialCubeTupleIterator implements ITupleIterator {
 
     private static final Logger logger = 
LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
 
-    protected final Cuboid cuboid;
-    protected final Set<TblColRef> selectedDimensions;
-    protected final Set<FunctionDesc> selectedMetrics;
-    protected final TupleInfo tupleInfo;
-    protected final Tuple tuple;
-    protected final Iterator<CubeSegmentScanner> scannerIterator;
-    protected final StorageContext context;
-
-    protected CubeSegmentScanner curScanner;
-    protected Iterator<GTRecord> curRecordIterator;
-    protected CubeTupleConverter curTupleConverter;
-    protected Tuple next;
-
-    private List<IAdvMeasureFiller> advMeasureFillers;
-    private int advMeasureRowsRemaining;
-    private int advMeasureRowIndex;
+    protected List<CubeSegmentScanner> scanners;
+    protected List<SegmentCubeTupleIterator> segmentCubeTupleIterators;
+    protected Iterator<ITuple> tupleIterator;
+    protected final int storagePushDownLimit;
+    protected StorageContext context;
 
     private int scanCount;
     private int scanCountDelta;
 
     public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, 
Cuboid cuboid, Set<TblColRef> selectedDimensions, //
             Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, 
StorageContext context) {
-        this.cuboid = cuboid;
-        this.selectedDimensions = selectedDimensions;
-        this.selectedMetrics = selectedMetrics;
-        this.tupleInfo = returnTupleInfo;
-        this.tuple = new Tuple(returnTupleInfo);
-        this.scannerIterator = scanners.iterator();
         this.context = context;
-    }
+        this.scanners = scanners;
 
-    @Override
-    public boolean hasNext() {
-        if (next != null)
-            return true;
-
-        if (hitLimitAndThreshold())
-            return false;
-
-        // consume any left rows from advanced measure filler
-        if (advMeasureRowsRemaining > 0) {
-            for (IAdvMeasureFiller filler : advMeasureFillers) {
-                filler.fillTuple(tuple, advMeasureRowIndex);
-            }
-            advMeasureRowIndex++;
-            advMeasureRowsRemaining--;
-            next = tuple;
-            return true;
+        segmentCubeTupleIterators = Lists.newArrayList();
+        for (CubeSegmentScanner scanner : scanners) {
+            segmentCubeTupleIterators.add(new 
SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, 
returnTupleInfo, context));
         }
-
-        // get the next GTRecord
-        if (curScanner == null) {
-            if (scannerIterator.hasNext()) {
-                curScanner = scannerIterator.next();
-                curRecordIterator = curScanner.iterator();
-                if (curRecordIterator.hasNext()) {
-                    //if the segment does not has any tuples, don't bother to 
create a converter
-                    curTupleConverter = new 
CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, 
selectedMetrics, tupleInfo);
+        
+        this.storagePushDownLimit = context.getStoragePushDownLimit();
+        if (storagePushDownLimit > 
KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
+            //normal case
+            tupleIterator = 
Iterators.concat(segmentCubeTupleIterators.iterator());
+        } else {
+            //query with limit
+            Iterator<Iterator<ITuple>> transformed = 
Iterators.transform(segmentCubeTupleIterators.iterator(), new 
Function<SegmentCubeTupleIterator, Iterator<ITuple>>() {
+                @Nullable
+                @Override
+                public Iterator<ITuple> apply(@Nullable 
SegmentCubeTupleIterator input) {
+                    return input;
                 }
-            } else {
-                return false;
-            }
-        }
-        if (curRecordIterator.hasNext() == false) {
-            close(curScanner);
-            curScanner = null;
-            curRecordIterator = null;
-            curTupleConverter = null;
-            return hasNext();
+            });
+            tupleIterator = new 
SortedIteratorMergerWithLimit<ITuple>(transformed, storagePushDownLimit, 
segmentCubeTupleIterators.get(0).getCubeTupleConverter().getTupleDimensionComparator()).getIterator();
         }
-
-        // now we have a GTRecord
-        GTRecord curRecord = curRecordIterator.next();
-
-        // translate into tuple
-        advMeasureFillers = curTupleConverter.translateResult(curRecord, 
tuple);
-
-        // the simple case
-        if (advMeasureFillers == null) {
-            next = tuple;
-            return true;
-        }
-
-        // advanced measure filling, like TopN, will produce multiple tuples 
out of one record
-        advMeasureRowsRemaining = -1;
-        for (IAdvMeasureFiller filler : advMeasureFillers) {
-            if (advMeasureRowsRemaining < 0)
-                advMeasureRowsRemaining = filler.getNumOfRows();
-            if (advMeasureRowsRemaining != filler.getNumOfRows())
-                throw new IllegalStateException();
-        }
-        if (advMeasureRowsRemaining < 0)
-            throw new IllegalStateException();
-
-        advMeasureRowIndex = 0;
-        return hasNext();
     }
 
-    private boolean hitLimitAndThreshold() {
-        // check limit
-        if (context.isLimitEnabled() && scanCount >= context.getLimit() + 
context.getOffset()) {
-            return true;
-        }
-        // check threshold
-        if (scanCount >= context.getThreshold()) {
-            throw new ScanOutOfLimitException("Scan row count exceeded 
threshold: " + context.getThreshold() + ", please add filter condition to 
narrow down backend scan range, like where clause.");
-        }
-        return false;
+    @Override
+    public boolean hasNext() {
+        return tupleIterator.hasNext();
     }
 
     @Override
     public ITuple next() {
-        // fetch next record
-        if (next == null) {
-            hasNext();
-            if (next == null)
-                throw new NoSuchElementException();
-        }
-
         scanCount++;
         if (++scanCountDelta >= 1000)
             flushScanCountDelta();
 
-        ITuple result = next;
-        next = null;
-        return result;
+        return tupleIterator.next();
     }
 
     @Override
@@ -181,11 +105,8 @@ public class SequentialCubeTupleIterator implements 
ITupleIterator {
         // close all the remaining segmentIterator
         flushScanCountDelta();
 
-        if (curScanner != null)
-            close(curScanner);
-
-        while (scannerIterator.hasNext()) {
-            close(scannerIterator.next());
+        for (SegmentCubeTupleIterator iterator : segmentCubeTupleIterators) {
+            iterator.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
new file mode 100644
index 0000000..d5aa9d0
--- /dev/null
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * a merger that utilizes the sorted nature of input iterators
+ */
+public class SortedIteratorMerger<E> {
+
+    private Iterator<Iterator<E>> shardSubsets;
+    private Comparator<E> comparator;
+
+    public SortedIteratorMerger(Iterator<Iterator<E>> shardSubsets, 
Comparator<E> comparator) {
+        this.shardSubsets = shardSubsets;
+        this.comparator = comparator;
+    }
+
+    public Iterator<E> getIterator() {
+        final PriorityQueue<PeekingImpl<E>> heap = new 
PriorityQueue<PeekingImpl<E>>(11, new Comparator<PeekingImpl<E>>() {
+            @Override
+            public int compare(PeekingImpl<E> o1, PeekingImpl<E> o2) {
+                return comparator.compare(o1.peek(), o2.peek());
+            }
+        });
+
+        while (shardSubsets.hasNext()) {
+            Iterator<E> iterator = shardSubsets.next();
+            PeekingImpl<E> peekingIterator = new PeekingImpl<>(iterator);
+            if (peekingIterator.hasNext()) {
+                heap.offer(peekingIterator);
+            }
+        }
+
+        return getIteratorInternal(heap);
+    }
+
+    protected Iterator<E> getIteratorInternal(PriorityQueue<PeekingImpl<E>> 
heap) {
+        return new MergedIterator<E>(heap, comparator);
+    }
+
+    private static class MergedIterator<E> implements Iterator<E> {
+
+        private final PriorityQueue<PeekingImpl<E>> heap;
+        private final Comparator<E> comparator;
+
+        public MergedIterator(PriorityQueue<PeekingImpl<E>> heap, 
Comparator<E> comparator) {
+            this.heap = heap;
+            this.comparator = comparator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !heap.isEmpty();
+        }
+
+        @Override
+        public E next() {
+            PeekingImpl<E> poll = heap.poll();
+            E current = poll.next();
+            if (poll.hasNext()) {
+
+                //TODO: remove this check when validated
+                Preconditions.checkState(comparator.compare(current, 
poll.peek()) < 0, "Not sorted! current: " + current + " Next: " + poll.peek());
+
+                heap.offer(poll);
+            }
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+     
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
new file mode 100644
index 0000000..0e40150
--- /dev/null
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * the limit here corresponds to the limit in SQL
+ * if the SQL ends with "limit N", then each shard will return N "smallest" 
records
+ * The query server side will use a heap to pick the right records.
+ * 
+ * There are two usages of SortedIteratorMergerWithLimit in Kylin:
+ * one at the GTRecord level and the other at the Tuple level.
+ * The first deals with cuboid shards within the same segment,
+ * and the second deals with multiple segments.
+ * 
+ * Let's use single-segment as an example:
+ * suppose we have a "limit 2" in SQL, and we have three shards in the segment
+ * the first returns (1,2), the second returns (1,3) and the third returns 
(2,3)
+ * each subset is guaranteed to be sorted. (that's why it's called 
"SortedIterator Merger")
+ * SortedIteratorMergerWithLimit will merge these three subsets and return 
(1,1,2,2)
+ * 
+ */
+public class SortedIteratorMergerWithLimit<E extends Cloneable> extends 
SortedIteratorMerger<E> {
+    private int limit;
+    private Comparator<E> comparator;
+
+    public SortedIteratorMergerWithLimit(Iterator<Iterator<E>> shardSubsets, 
int limit, Comparator<E> comparator) {
+        super(shardSubsets, comparator);
+        this.limit = limit;
+        this.comparator = comparator;
+    }
+
+    protected Iterator<E> getIteratorInternal(PriorityQueue<PeekingImpl<E>> 
heap) {
+        return new MergedIteratorWithLimit<E>(heap, limit, comparator);
+    }
+
+    static class MergedIteratorWithLimit<E extends Cloneable> implements 
Iterator<E> {
+
+        private final PriorityQueue<PeekingImpl<E>> heap;
+        private final Comparator<E> comparator;
+
+        private boolean nextFetched = false;
+        private E fetched = null;
+        private E last = null;
+
+        private int limit;
+        private int limitProgress = 0;
+
+        private PeekingImpl<E> lastSource = null;
+
+        public MergedIteratorWithLimit(PriorityQueue<PeekingImpl<E>> heap, int 
limit, Comparator<E> comparator) {
+            this.heap = heap;
+            this.limit = limit;
+            this.comparator = comparator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextFetched) {
+                return true;
+            }
+
+            if (lastSource != null && lastSource.hasNext()) {
+                if (lastSource.hasNext()) {
+                    heap.offer(lastSource);
+                } else {
+                    lastSource = null;
+                }
+            }
+
+            if (!heap.isEmpty()) {
+                PeekingImpl<E> first = heap.poll();
+                E current = first.next();
+                try {
+                    current = (E) 
current.getClass().getMethod("clone").invoke(current);
+                } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
+                    throw new RuntimeException(e);
+                }
+
+                lastSource = first;
+
+                Preconditions.checkState(current != null);
+
+                if (last == null || comparator.compare(current, last) != 0) {
+                    if (++limitProgress > limit) {
+                        return false;
+                    }
+                }
+                nextFetched = true;
+                fetched = current;
+
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public E next() {
+            if (!nextFetched) {
+                throw new IllegalStateException("Should hasNext() before 
next()");
+            }
+
+            //TODO: remove this check when validated
+            if (last != null) {
+                Preconditions.checkState(comparator.compare(last, fetched) <= 
0, "Not sorted! last: " + last + " fetched: " + fetched);
+            }
+
+            last = fetched;
+            nextFetched = false;
+
+            return fetched;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
 
b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
new file mode 100644
index 0000000..f09844a
--- /dev/null
+++ 
b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SortedIteratorMergerTest {
+
+    private Comparator<Integer> getComp() {
+        return new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                return o1 - o2;
+            }
+        };
+    }
+
+    @Test
+    public void basic1() {
+
+        List<Integer> a = Lists.newArrayList(1, 2, 3);
+        List<Integer> b = Lists.newArrayList(1, 2, 3);
+        List<Integer> c = Lists.newArrayList(1, 2, 5);
+        List<Iterator<Integer>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMerger<Integer> merger = new 
SortedIteratorMerger<Integer>(input.iterator(), getComp());
+        Iterator<Integer> iterator = merger.getIterator();
+        List<Integer> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(1, 1, 1, 2, 2, 2, 3, 3, 5), 
result);
+    }
+
+    @Test
+    public void basic2() {
+
+        List<Integer> a = Lists.newArrayList(2);
+        List<Integer> b = Lists.newArrayList();
+        List<Integer> c = Lists.newArrayList(1, 2, 5);
+        List<Iterator<Integer>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMerger<Integer> merger = new 
SortedIteratorMerger<Integer>(input.iterator(), getComp());
+        Iterator<Integer> iterator = merger.getIterator();
+        List<Integer> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(1, 2, 2, 5), result);
+    }
+
+    @Test
+    public void basic3() {
+
+        List<Integer> a = Lists.newArrayList();
+        List<Integer> b = Lists.newArrayList();
+        List<Integer> c = Lists.newArrayList();
+        List<Iterator<Integer>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMerger<Integer> merger = new 
SortedIteratorMerger<Integer>(input.iterator(), getComp());
+        Iterator<Integer> iterator = merger.getIterator();
+        List<Integer> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(), result);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
 
b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
new file mode 100644
index 0000000..1627b4f
--- /dev/null
+++ 
b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SortedIteratorMergerWithLimitTest {
+    class CloneableInteger implements Cloneable {
+        int value;
+
+        public CloneableInteger(int value) {
+            this.value = value;
+        }
+
+        @Override
+        public Object clone() {
+            return new CloneableInteger(value);
+        }
+
+        @Override
+        public int hashCode() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            CloneableInteger that = (CloneableInteger) o;
+
+            return value == that.value;
+
+        }
+    }
+
+    private Comparator<CloneableInteger> getComp() {
+        return new Comparator<CloneableInteger>() {
+            @Override
+            public int compare(CloneableInteger o1, CloneableInteger o2) {
+                return o1.value - o2.value;
+            }
+        };
+    }
+
+    @Test
+    public void basic1() {
+
+        List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(1), 
new CloneableInteger(2), new CloneableInteger(3));
+        List<CloneableInteger> b = Lists.newArrayList(new CloneableInteger(1), 
new CloneableInteger(2), new CloneableInteger(3));
+        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), 
new CloneableInteger(2), new CloneableInteger(5));
+        List<Iterator<CloneableInteger>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMergerWithLimit<CloneableInteger> merger = new 
SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
+        Iterator<CloneableInteger> iterator = merger.getIterator();
+        List<CloneableInteger> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new 
CloneableInteger(1), new CloneableInteger(1), new CloneableInteger(2), new 
CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(3), new 
CloneableInteger(3)), result);
+    }
+
+    @Test
+    public void basic2() {
+
+        List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2));
+        List<CloneableInteger> b = Lists.newArrayList();
+        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), 
new CloneableInteger(2), new CloneableInteger(5));
+        List<Iterator<CloneableInteger>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMergerWithLimit<CloneableInteger> merger = new 
SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
+        Iterator<CloneableInteger> iterator = merger.getIterator();
+        List<CloneableInteger> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new 
CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(5)), result);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void basic3() {
+
+        List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2), 
new CloneableInteger(1));
+        List<CloneableInteger> b = Lists.newArrayList();
+        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), 
new CloneableInteger(2), new CloneableInteger(5));
+        List<Iterator<CloneableInteger>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMergerWithLimit<CloneableInteger> merger = new 
SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
+        Iterator<CloneableInteger> iterator = merger.getIterator();
+        List<CloneableInteger> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java 
b/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java
new file mode 100644
index 0000000..c295430
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query;
+
+import org.dbunit.DatabaseUnitException;
+import org.dbunit.assertion.DbUnitAssert;
+import org.dbunit.assertion.FailureHandler;
+import org.dbunit.dataset.Column;
+import org.dbunit.dataset.Columns;
+import org.dbunit.dataset.DataSetException;
+import org.dbunit.dataset.ITable;
+import org.dbunit.dataset.ITableMetaData;
+import org.dbunit.dataset.datatype.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dirty hack to support checking the result of SQL queries that use LIMIT.
+ */
+public class HackedDbUnitAssert extends DbUnitAssert {
+    private static final Logger logger = 
LoggerFactory.getLogger(HackedDbUnitAssert.class);
+
+    public void assertEquals(ITable expectedTable, ITable actualTable, 
FailureHandler failureHandler) throws DatabaseUnitException {
+        logger.trace("assertEquals(expectedTable, actualTable, failureHandler) 
- start");
+        logger.debug("assertEquals: expectedTable={}", expectedTable);
+        logger.debug("assertEquals: actualTable={}", actualTable);
+        logger.debug("assertEquals: failureHandler={}", failureHandler);
+
+        // Do not continue if same instance
+        if (expectedTable == actualTable) {
+            logger.debug("The given tables reference the same object. Will 
return immediately. (Table={})", expectedTable);
+            return;
+        }
+
+        if (failureHandler == null) {
+            logger.debug("FailureHandler is null. Using default 
implementation");
+            failureHandler = getDefaultFailureHandler();
+        }
+
+        ITableMetaData expectedMetaData = expectedTable.getTableMetaData();
+        ITableMetaData actualMetaData = actualTable.getTableMetaData();
+        String expectedTableName = expectedMetaData.getTableName();
+
+        //        // Verify row count
+        //        int expectedRowsCount = expectedTable.getRowCount();
+        //        int actualRowsCount = actualTable.getRowCount();
+        //        if (expectedRowsCount != actualRowsCount) {
+        //            String msg = "row count (table=" + expectedTableName + 
")";
+        //            Error error =
+        //                    failureHandler.createFailure(msg, String
+        //                            .valueOf(expectedRowsCount), String
+        //                            .valueOf(actualRowsCount));
+        //            logger.error(error.toString());
+        //            throw error;
+        //        }
+
+        // if both tables are empty, it is not necessary to compare columns, as
+        // such
+        // comparison
+        // can fail if column metadata is different (which could occur when
+        // comparing empty tables)
+        if (expectedTable.getRowCount() == 0 &&  actualTable.getRowCount() == 
0) {
+            logger.debug("Tables are empty, hence equals.");
+            return;
+        }
+
+        // Put the columns into the same order
+        Column[] expectedColumns = Columns.getSortedColumns(expectedMetaData);
+        Column[] actualColumns = Columns.getSortedColumns(actualMetaData);
+
+        // Verify columns
+        Columns.ColumnDiff columnDiff = 
Columns.getColumnDiff(expectedMetaData, actualMetaData);
+        if (columnDiff.hasDifference()) {
+            String message = columnDiff.getMessage();
+            Error error = failureHandler.createFailure(message, 
Columns.getColumnNamesAsString(expectedColumns), 
Columns.getColumnNamesAsString(actualColumns));
+            logger.error(error.toString());
+            throw error;
+        }
+
+        // Get the datatypes to be used for comparing the sorted columns
+        ComparisonColumn[] comparisonCols = 
getComparisonColumns(expectedTableName, expectedColumns, actualColumns, 
failureHandler);
+
+        // Finally compare the data
+        compareData(expectedTable, actualTable, comparisonCols, 
failureHandler);
+    }
+
+    protected void compareData(ITable expectedTable, ITable actualTable, 
ComparisonColumn[] comparisonCols, FailureHandler failureHandler) throws 
DataSetException {
+        logger.debug("compareData(expectedTable={}, actualTable={}, " + 
"comparisonCols={}, failureHandler={}) - start", new Object[] { expectedTable, 
actualTable, comparisonCols, failureHandler });
+
+        if (expectedTable == null) {
+            throw new NullPointerException("The parameter 'expectedTable' must 
not be null");
+        }
+        if (actualTable == null) {
+            throw new NullPointerException("The parameter 'actualTable' must 
not be null");
+        }
+        if (comparisonCols == null) {
+            throw new NullPointerException("The parameter 'comparisonCols' 
must not be null");
+        }
+        if (failureHandler == null) {
+            throw new NullPointerException("The parameter 'failureHandler' 
must not be null");
+        }
+
+        for (int index = 0; index < actualTable.getRowCount(); index++) {
+            if (!findRowInExpectedTable(expectedTable, actualTable, 
comparisonCols, failureHandler, index)) {
+                throw new IllegalStateException();
+            }
+        }
+
+    }
+
+    private boolean findRowInExpectedTable(ITable expectedTable, ITable 
actualTable, ComparisonColumn[] comparisonCols, FailureHandler failureHandler, 
int index) throws DataSetException {
+
+        // iterate over all rows
+        for (int i = 0; i < expectedTable.getRowCount(); i++) {
+
+            // iterate over all columns of the current row
+            for (int j = 0; j < comparisonCols.length; j++) {
+                ComparisonColumn compareColumn = comparisonCols[j];
+
+                String columnName = compareColumn.getColumnName();
+                DataType dataType = compareColumn.getDataType();
+
+                Object expectedValue = expectedTable.getValue(i, columnName);
+                Object actualValue = actualTable.getValue(index, columnName);
+
+                // Compare the values
+                if (skipCompare(columnName, expectedValue, actualValue)) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("ignoring comparison " + expectedValue + 
"=" + actualValue + " on column " + columnName);
+                    }
+                    continue;
+                }
+
+                if (dataType.compare(expectedValue, actualValue) != 0) {
+                    break;
+
+                    //                    Difference diff = new 
Difference(expectedTable, actualTable, i, columnName, expectedValue, 
actualValue);
+                    //
+                    //                    // Handle the difference (throw 
error immediately or something else)
+                    //                    failureHandler.handle(diff);
+                } else {
+                    if (j == comparisonCols.length - 1) {
+                        return true;
+                    } else {
+                        continue;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java 
b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index a6e7956..2c428ec 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -35,6 +35,7 @@ import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.routing.Candidate;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.storage.hbase.HBaseStorage;
 import 
org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.dbunit.database.DatabaseConnection;
 import org.dbunit.database.IDatabaseConnection;
@@ -123,7 +124,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleExecuteQuery() throws Exception {
 
-        String queryFileName = getQueryFolderPrefix() + 
"src/test/resources/query/sql_tableau/query20.sql";
+        String queryFileName = getQueryFolderPrefix() + 
"src/test/resources/query/temp/query01.sql";
 
         File sqlFile = new File(queryFileName);
         String sql = getTextFromFile(sqlFile);
@@ -187,7 +188,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testPreciselyDistinctCountQuery() throws Exception {
         if ("left".equalsIgnoreCase(joinType)) {
-            execAndCompQuery(getQueryFolderPrefix() + 
"src/test/resources/query/sql_distinct_precisely", null, true);
+            execAndCompQuery(getQueryFolderPrefix() + 
"src/test/resources/query/temp", null, true);
         }
     }
 
@@ -257,6 +258,13 @@ public class ITKylinQueryTest extends KylinTestBase {
     }
 
     @Test
+    public void testLimitCorrectness() throws Exception {
+        if (HBaseStorage.overwriteStorageQuery == null) {//v1 query engine 
will not work
+            execLimitAndValidate(getQueryFolderPrefix() + 
"src/test/resources/query/sql");
+        }
+    }
+
+    @Test
     public void testTopNQuery() throws Exception {
         if ("left".equalsIgnoreCase(joinType)) {
             this.execAndCompQuery(getQueryFolderPrefix() + 
"src/test/resources/query/sql_topn", null, true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java 
b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 0511971..4e59815 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -34,6 +34,7 @@ import java.sql.Statement;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -62,6 +63,33 @@ import com.google.common.io.Files;
  */
 public class KylinTestBase {
 
+    class ObjectArray {
+        Object[] data;
+
+        public ObjectArray(Object[] data) {
+            this.data = data;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ObjectArray that = (ObjectArray) o;
+
+            // Shallow element-wise comparison; nested arrays need deepEquals
+            return Arrays.equals(data, that.data);
+
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(data);
+        }
+    }
+
     // Hack for the different constant integer type between optiq (INTEGER) and
     // h2 (BIGINT)
     public static class TestH2DataTypeFactory extends H2DataTypeFactory {
@@ -357,6 +385,50 @@ public class KylinTestBase {
         }
     }
 
+    protected void execLimitAndValidate(String queryFolder) throws Exception {
+        printInfo("---------- test folder: " + new 
File(queryFolder).getAbsolutePath());
+
+        int appendLimitQueries = 0;
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), 
".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            String sql = getTextFromFile(sqlFile);
+
+            String sqlWithLimit;
+            if (sql.toLowerCase().contains("limit ")) {
+                sqlWithLimit = sql;
+            } else {
+                sqlWithLimit = sql + " limit 5";
+                appendLimitQueries++;
+            }
+
+            // execute Kylin
+            printInfo("Query Result from Kylin - " + queryName + "  (" + 
queryFolder + ")");
+            IDatabaseConnection kylinConn = new 
DatabaseConnection(cubeConnection);
+            ITable kylinTable = executeQuery(kylinConn, queryName, 
sqlWithLimit, false);
+
+            // execute H2
+            printInfo("Query Result from H2 - " + queryName);
+            H2Connection h2Conn = new H2Connection(h2Connection, null);
+            
h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new 
TestH2DataTypeFactory());
+            ITable h2Table = executeQuery(h2Conn, queryName, sql, false);
+
+            try {
+                HackedDbUnitAssert hackedDbUnitAssert = new 
HackedDbUnitAssert();
+                hackedDbUnitAssert.assertEquals(h2Table, kylinTable);
+            } catch (Throwable t) {
+                printInfo("execAndCompQuery failed on: " + 
sqlFile.getAbsolutePath());
+                throw t;
+            }
+
+            compQueryCount++;
+            if (kylinTable.getRowCount() == 0) {
+                zeroResultQueries.add(sql);
+            }
+        }
+        printInfo("Queries appended with limit: " + appendLimitQueries);
+    }
+
     protected void execAndCompQuery(String queryFolder, String[] 
exclusiveQuerys, boolean needSort) throws Exception {
         printInfo("---------- test folder: " + new 
File(queryFolder).getAbsolutePath());
         Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
index da8e7ce..3fdb92f 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
@@ -29,7 +29,7 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTInfo.Builder;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTSampleCodeSystem;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTWriter;
 import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator;
@@ -109,7 +109,7 @@ public class HBaseScannerBenchmark {
     private void testScanRaw(String msg) throws IOException {
         long t = System.currentTimeMillis();
 
-        IGTScanner scan = simpleStore.scan(new GTScanRequest(info, null, null, 
null));
+        IGTScanner scan = simpleStore.scan(new 
GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
         ResultScanner innerScanner = ((SimpleHBaseStore.Reader) 
scan).getHBaseScanner();
         int count = 0;
         for (Result r : innerScanner) {
@@ -125,7 +125,7 @@ public class HBaseScannerBenchmark {
     private void testScanRecords(String msg) throws IOException {
         long t = System.currentTimeMillis();
 
-        IGTScanner scan = simpleStore.scan(new GTScanRequest(info, null, null, 
null));
+        IGTScanner scan = simpleStore.scan(new 
GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
         int count = 0;
         for (GTRecord rec : scan) {
             count++;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 1cebdea..4c599d9 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -21,20 +21,11 @@ package org.apache.kylin.storage.hbase.cube.v2;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
-import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.DataFormatException;
 
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -52,7 +43,6 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
@@ -67,8 +57,6 @@ import 
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.Cub
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.HBaseZeroCopyByteString;
@@ -79,153 +67,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private static ExecutorService executorService = new 
LoggableCachedThreadPool();
 
-    static class ExpectedSizeIterator implements Iterator<byte[]> {
-
-        BlockingQueue<byte[]> queue;
-
-        int expectedSize;
-        int current = 0;
-        long timeout;
-        long timeoutTS;
-        volatile Throwable coprocException;
-
-        public ExpectedSizeIterator(int expectedSize) {
-            this.expectedSize = expectedSize;
-            this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
-
-            Configuration hconf = 
HBaseConnection.getCurrentHBaseConfiguration();
-            this.timeout = 
hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5) * 
hconf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60000);
-            this.timeout = Math.max(this.timeout, 5 * 60000);
-            this.timeout *= 
KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
-
-            if (BackdoorToggles.getQueryTimeout() != -1) {
-                this.timeout = BackdoorToggles.getQueryTimeout();
-            }
-
-            this.timeout *= 1.1; // allow for some delay
-
-            logger.info("Timeout for ExpectedSizeIterator is: " + 
this.timeout);
-
-            this.timeoutTS = System.currentTimeMillis() + this.timeout;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return (current < expectedSize);
-        }
-
-        @Override
-        public byte[] next() {
-            if (current >= expectedSize) {
-                throw new IllegalStateException("Won't have more data");
-            }
-            try {
-                current++;
-                byte[] ret = null;
-
-                while (ret == null && coprocException == null && timeoutTS - 
System.currentTimeMillis() > 0) {
-                    ret = queue.poll(5000, TimeUnit.MILLISECONDS);
-                }
-
-                if (coprocException != null) {
-                    throw new RuntimeException("Error in coprocessor", 
coprocException);
-                } else if (ret == null) {
-                    throw new RuntimeException("Timeout visiting cube!");
-                } else {
-                    return ret;
-                }
-            } catch (InterruptedException e) {
-                throw new RuntimeException("Error when waiting queue", e);
-            }
-        }
-
-        @Override
-        public void remove() {
-            throw new NotImplementedException();
-        }
-
-        public void append(byte[] data) {
-            try {
-                queue.put(data);
-            } catch (InterruptedException e) {
-                throw new RuntimeException("error when waiting queue", e);
-            }
-        }
-
-        public long getTimeout() {
-            return timeout;
-        }
-
-        public void notifyCoprocException(Throwable ex) {
-            coprocException = ex;
-        }
-    }
-
-    static class EndpointResultsAsGTScanner implements IGTScanner {
-        private GTInfo info;
-        private Iterator<byte[]> blocks;
-        private ImmutableBitSet columns;
-        private long totalScannedCount;
-
-        public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> 
blocks, ImmutableBitSet columns, long totalScannedCount) {
-            this.info = info;
-            this.blocks = blocks;
-            this.columns = columns;
-            this.totalScannedCount = totalScannedCount;
-        }
-
-        @Override
-        public GTInfo getInfo() {
-            return info;
-        }
-
-        @Override
-        public long getScannedRowCount() {
-            return totalScannedCount;
-        }
-
-        @Override
-        public void close() throws IOException {
-            //do nothing
-        }
-
-        @Override
-        public Iterator<GTRecord> iterator() {
-            return Iterators.concat(Iterators.transform(blocks, new 
Function<byte[], Iterator<GTRecord>>() {
-                @Nullable
-                @Override
-                public Iterator<GTRecord> apply(@Nullable final byte[] input) {
-
-                    return new Iterator<GTRecord>() {
-                        private ByteBuffer inputBuffer = null;
-                        private GTRecord oneRecord = null;
-
-                        @Override
-                        public boolean hasNext() {
-                            if (inputBuffer == null) {
-                                inputBuffer = ByteBuffer.wrap(input);
-                                oneRecord = new GTRecord(info);
-                            }
-
-                            return inputBuffer.position() < 
inputBuffer.limit();
-                        }
-
-                        @Override
-                        public GTRecord next() {
-                            oneRecord.loadColumns(columns, inputBuffer);
-                            return oneRecord;
-                        }
-
-                        @Override
-                        public void remove() {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-            }));
-        }
-    }
-
     public CubeHBaseEndpointRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo 
fullGTInfo) {
         super(cubeSeg, cuboid, fullGTInfo);
     }
@@ -345,7 +186,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
         builder.setBehavior(toggle);
         builder.setStartTime(System.currentTimeMillis());
-        builder.setTimeout(epResultItr.getTimeout());
+        builder.setTimeout(epResultItr.getRpcTimeout());
         builder.setKylinProperties(kylinConfig.getConfigAsString());
 
         for (final Pair<byte[], byte[]> epRange : 
getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
@@ -407,7 +248,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                     }
 
                     if (abnormalFinish[0]) {
-                        Throwable ex = new RuntimeException(logHeader + "The 
coprocessor thread stopped itself due to scan timeout, failing current 
query...");
+                        Throwable ex = new RuntimeException(logHeader + "The 
coprocessor thread stopped itself due to scan timeout or scan threshold(check 
region server log), failing current query...");
                         logger.error(logHeader + "Error when visiting cubes by 
endpoint", ex); // double log coz the query thread may already timeout
                         epResultItr.notifyCoprocException(ex);
                         return;
@@ -416,7 +257,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             });
         }
 
-        return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, 
scanRequest.getColumns(), totalScannedCount.get());
+        return new GTBlobScatter(fullGTInfo, epResultItr, 
scanRequest.getColumns(), totalScannedCount.get(), 
scanRequest.getStoragePushDownLimit());
     }
 
     private String getStatsString(byte[] region, CubeVisitResponse result) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
new file mode 100644
index 0000000..4e0d15e
--- /dev/null
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+class ExpectedSizeIterator implements Iterator<byte[]> {
+
+    BlockingQueue<byte[]> queue;
+
+    int expectedSize;
+    int current = 0;
+    long rpcTimeout;
+    long timeout;
+    long timeoutTS;
+    volatile Throwable coprocException;
+
+    public ExpectedSizeIterator(int expectedSize) {
+        this.expectedSize = expectedSize;
+        this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
+
+        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+        this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+        this.timeout = this.rpcTimeout * 
hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+        CubeHBaseEndpointRPC.logger.info("rpc timeout is {} and after multiply 
retry times become {}", this.rpcTimeout, this.timeout);
+        this.timeout = Math.max(this.timeout, 5 * 60000);
+        this.timeout *= 
KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
+
+        if (BackdoorToggles.getQueryTimeout() != -1) {
+            this.timeout = BackdoorToggles.getQueryTimeout();
+        }
+
+        this.timeout *= 1.1; // allow for some delay
+
+        CubeHBaseEndpointRPC.logger.info("Final Timeout for 
ExpectedSizeIterator is: " + this.timeout);
+
+        this.timeoutTS = System.currentTimeMillis() + this.timeout;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return (current < expectedSize);
+    }
+
+    @Override
+    public byte[] next() {
+        if (current >= expectedSize) {
+            throw new IllegalStateException("Won't have more data");
+        }
+        try {
+            current++;
+            byte[] ret = null;
+
+            while (ret == null && coprocException == null && timeoutTS > 
System.currentTimeMillis()) {
+                ret = queue.poll(5000, TimeUnit.MILLISECONDS);
+            }
+
+            if (coprocException != null) {
+                throw new RuntimeException("Error in coprocessor", 
coprocException);
+            } else if (ret == null) {
+                throw new RuntimeException("Timeout visiting cube!");
+            } else {
+                return ret;
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Error when waiting queue", e);
+        }
+    }
+
+    @Override
+    public void remove() {
+        throw new NotImplementedException();
+    }
+
+    public void append(byte[] data) {
+        try {
+            queue.put(data);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("error when waiting queue", e);
+        }
+    }
+
+    public long getRpcTimeout() {
+        return this.rpcTimeout;
+    }
+
+    public void notifyCoprocException(Throwable ex) {
+        coprocException = ex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
new file mode 100644
index 0000000..631510e
--- /dev/null
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.gtrecord.SortedIteratorMergerWithLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * Scatters the blobs returned from region servers into an iterable of {@link GTRecord}s.
+ *
+ * Each blob is decoded lazily, one record at a time, as the resulting iterator is consumed.
+ */
+class GTBlobScatter implements IGTScanner {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTBlobScatter.class);
+
+    private final GTInfo info;
+    private final Iterator<byte[]> blocks;
+    private final ImmutableBitSet columns;
+    private final long totalScannedCount;
+    private final int storagePushDownLimit;
+
+    /**
+     * @param info                 table info used to create the records that blobs are decoded into
+     * @param blocks               the raw blobs to decode, one per element
+     * @param columns              the columns to load from each blob
+     * @param totalScannedCount    value reported by {@link #getScannedRowCount()}
+     * @param storagePushDownLimit limit passed to the sorted merger in {@link #iterator()}
+     */
+    public GTBlobScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) {
+        this.info = info;
+        this.blocks = blocks;
+        this.columns = columns;
+        this.totalScannedCount = totalScannedCount;
+        this.storagePushDownLimit = storagePushDownLimit;
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public long getScannedRowCount() {
+        return totalScannedCount;
+    }
+
+    @Override
+    public void close() throws IOException {
+        //do nothing
+    }
+
+    /**
+     * Returns an iterator over the records decoded from all blobs.
+     *
+     * When the push-down limit does not exceed the configured maximum, the per-blob
+     * iterators are merged by primary key through {@link SortedIteratorMergerWithLimit};
+     * otherwise they are simply concatenated in blob order.
+     */
+    @Override
+    public Iterator<GTRecord> iterator() {
+        Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new GTBlobScatterFunc());
+        if (storagePushDownLimit <= KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
+            return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
+        } else {
+            return Iterators.concat(shardSubsets);
+        }
+    }
+
+    /**
+     * Turns one blob into an iterator over the records it contains.
+     */
+    class GTBlobScatterFunc implements Function<byte[], Iterator<GTRecord>> {
+        @Nullable
+        @Override
+        public Iterator<GTRecord> apply(@Nullable final byte[] input) {
+
+            return new Iterator<GTRecord>() {
+                private ByteBuffer inputBuffer = null;
+                // A single GTRecord is reused for every row of this blob: each next() overwrites
+                // the record returned by the previous call.
+                // NOTE(review): an earlier comment described rotating between several records to
+                // support SortedIteratorMergerWithLimit peeking one record ahead, but only one
+                // record was ever used - confirm the merger copies any record it holds on to.
+                private GTRecord reusedRecord = null;
+
+                // Wraps the blob and allocates the record on first use, so nothing is done
+                // until the iterator is actually consumed.
+                private void ensureInitialized() {
+                    if (inputBuffer == null) {
+                        inputBuffer = ByteBuffer.wrap(input);
+                        reusedRecord = new GTRecord(info);
+                    }
+                }
+
+                @Override
+                public boolean hasNext() {
+                    ensureInitialized();
+                    return inputBuffer.position() < inputBuffer.limit();
+                }
+
+                @Override
+                public GTRecord next() {
+                    // Honour the Iterator contract even when hasNext() was not called first
+                    if (!hasNext()) {
+                        throw new java.util.NoSuchElementException();
+                    }
+                    reusedRecord.loadColumns(columns, inputBuffer);
+                    return reusedRecord;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 5b7a26a..cbccac6 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -48,7 +48,9 @@ import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanExceedThresholdException;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanTimeoutException;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
 import org.apache.kylin.measure.BufferedMeasureEncoder;
@@ -235,13 +237,12 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             }
 
             if (behavior.ordinal() < 
CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
-                scanReq.setAggrCacheGB(0); // disable mem check if so told
+                scanReq.setAggCacheMemThreshold(0); // disable mem check if so 
told
             }
 
             final MutableBoolean scanNormalComplete = new MutableBoolean(true);
-            final long startTime = this.serviceStartTime;
-            final long timeout = request.getTimeout();
-            final int rowLimit = scanReq.getRowLimit();
+            final long deadline = request.getTimeout() + this.serviceStartTime;
+            final long storagePushDownLimit = 
scanReq.getStoragePushDownLimit();
 
             final CellListIterator cellListIterator = new CellListIterator() {
 
@@ -256,19 +257,13 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
 
                 @Override
                 public boolean hasNext() {
-                    if (rowLimit > 0 && rowLimit <= counter)
-                        return false;
 
-                    if (counter % 100000 == 1) {
-                        if (System.currentTimeMillis() - startTime > timeout) {
-                            scanNormalComplete.setValue(false);
-                            logger.error("scanner aborted because timeout");
-                            return false;
-                        }
+                    if (counter > scanReq.getStorageScanRowNumThreshold()) {
+                        throw new GTScanExceedThresholdException("Exceed scan 
threshold at " + counter);
                     }
 
                     if (counter % 100000 == 1) {
-                        logger.info("Scanned " + counter + " rows.");
+                        logger.info("Scanned " + counter + " rows from 
HBase.");
                     }
                     counter++;
                     return allCellLists.hasNext();
@@ -290,38 +285,47 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
                     behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER.ordinal(), //
-                    behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
+                    behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(), deadline);
 
             ByteBuffer buffer = 
ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
 
             ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);//ByteArrayOutputStream
 will auto grow
             int finalRowCount = 0;
-            for (GTRecord oneRecord : finalScanner) {
 
-                if (!scanNormalComplete.booleanValue()) {
-                    logger.error("aggregate iterator aborted because input 
iterator aborts");
-                    break;
-                }
+            try {
+                for (GTRecord oneRecord : finalScanner) {
 
-                if (finalRowCount % 100000 == 1) {
-                    if (System.currentTimeMillis() - startTime > timeout) {
-                        logger.error("aggregate iterator aborted because 
timeout");
+                    if (finalRowCount > storagePushDownLimit) {
+                        logger.info("The finalScanner aborted because 
storagePushDownLimit is satisfied");
                         break;
                     }
-                }
 
-                buffer.clear();
-                try {
-                    oneRecord.exportColumns(scanReq.getColumns(), buffer);
-                } catch (BufferOverflowException boe) {
-                    buffer = 
ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2);
-                    oneRecord.exportColumns(scanReq.getColumns(), buffer);
-                }
+                    if (finalRowCount % 100000 == 1) {
+                        if (System.currentTimeMillis() > deadline) {
+                            throw new GTScanTimeoutException("finalScanner 
timeouts after scanned " + finalRowCount);
+                        }
+                    }
 
-                outputStream.write(buffer.array(), 0, buffer.position());
-                finalRowCount++;
+                    buffer.clear();
+                    try {
+                        oneRecord.exportColumns(scanReq.getColumns(), buffer);
+                    } catch (BufferOverflowException boe) {
+                        buffer = 
ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2);
+                        oneRecord.exportColumns(scanReq.getColumns(), buffer);
+                    }
+
+                    outputStream.write(buffer.array(), 0, buffer.position());
+                    finalRowCount++;
+                }
+            } catch (GTScanTimeoutException e) {
+                scanNormalComplete.setValue(false);
+                logger.info("The cube visit did not finish normally because 
scan timeout", e);
+            } catch (GTScanExceedThresholdException e) {
+                scanNormalComplete.setValue(false);
+                logger.info("The cube visit did not finish normally because 
scan num exceeds threshold", e);
+            } finally {
+                finalScanner.close();
             }
-            finalScanner.close();
 
             appendProfileInfo(sb, "agg done");
 

Reply via email to