Repository: kylin Updated Branches: refs/heads/master c67891d26 -> 28e942306
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index 3c992d2..ff7fb2b 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -21,153 +21,77 @@ package org.apache.kylin.storage.gtrecord; import java.io.IOException; import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.Set; +import javax.annotation.Nullable; + +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.ITuple; import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.metadata.tuple.Tuple; import org.apache.kylin.metadata.tuple.TupleInfo; import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.exception.ScanOutOfLimitException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + public class SequentialCubeTupleIterator implements ITupleIterator { private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class); - protected final Cuboid cuboid; - protected final Set<TblColRef> selectedDimensions; - protected final Set<FunctionDesc> 
selectedMetrics; - protected final TupleInfo tupleInfo; - protected final Tuple tuple; - protected final Iterator<CubeSegmentScanner> scannerIterator; - protected final StorageContext context; - - protected CubeSegmentScanner curScanner; - protected Iterator<GTRecord> curRecordIterator; - protected CubeTupleConverter curTupleConverter; - protected Tuple next; - - private List<IAdvMeasureFiller> advMeasureFillers; - private int advMeasureRowsRemaining; - private int advMeasureRowIndex; + protected List<CubeSegmentScanner> scanners; + protected List<SegmentCubeTupleIterator> segmentCubeTupleIterators; + protected Iterator<ITuple> tupleIterator; + protected final int storagePushDownLimit; + protected StorageContext context; private int scanCount; private int scanCountDelta; public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, // Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) { - this.cuboid = cuboid; - this.selectedDimensions = selectedDimensions; - this.selectedMetrics = selectedMetrics; - this.tupleInfo = returnTupleInfo; - this.tuple = new Tuple(returnTupleInfo); - this.scannerIterator = scanners.iterator(); this.context = context; - } + this.scanners = scanners; - @Override - public boolean hasNext() { - if (next != null) - return true; - - if (hitLimitAndThreshold()) - return false; - - // consume any left rows from advanced measure filler - if (advMeasureRowsRemaining > 0) { - for (IAdvMeasureFiller filler : advMeasureFillers) { - filler.fillTuple(tuple, advMeasureRowIndex); - } - advMeasureRowIndex++; - advMeasureRowsRemaining--; - next = tuple; - return true; + segmentCubeTupleIterators = Lists.newArrayList(); + for (CubeSegmentScanner scanner : scanners) { + segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context)); } - - // get the next GTRecord - if (curScanner == null) { 
- if (scannerIterator.hasNext()) { - curScanner = scannerIterator.next(); - curRecordIterator = curScanner.iterator(); - if (curRecordIterator.hasNext()) { - //if the segment does not has any tuples, don't bother to create a converter - curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); + + this.storagePushDownLimit = context.getStoragePushDownLimit(); + if (storagePushDownLimit > KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) { + //normal case + tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator()); + } else { + //query with limit + Iterator<Iterator<ITuple>> transformed = Iterators.transform(segmentCubeTupleIterators.iterator(), new Function<SegmentCubeTupleIterator, Iterator<ITuple>>() { + @Nullable + @Override + public Iterator<ITuple> apply(@Nullable SegmentCubeTupleIterator input) { + return input; } - } else { - return false; - } - } - if (curRecordIterator.hasNext() == false) { - close(curScanner); - curScanner = null; - curRecordIterator = null; - curTupleConverter = null; - return hasNext(); + }); + tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, storagePushDownLimit, segmentCubeTupleIterators.get(0).getCubeTupleConverter().getTupleDimensionComparator()).getIterator(); } - - // now we have a GTRecord - GTRecord curRecord = curRecordIterator.next(); - - // translate into tuple - advMeasureFillers = curTupleConverter.translateResult(curRecord, tuple); - - // the simple case - if (advMeasureFillers == null) { - next = tuple; - return true; - } - - // advanced measure filling, like TopN, will produce multiple tuples out of one record - advMeasureRowsRemaining = -1; - for (IAdvMeasureFiller filler : advMeasureFillers) { - if (advMeasureRowsRemaining < 0) - advMeasureRowsRemaining = filler.getNumOfRows(); - if (advMeasureRowsRemaining != filler.getNumOfRows()) - throw new IllegalStateException(); - } - if (advMeasureRowsRemaining < 0) - 
throw new IllegalStateException(); - - advMeasureRowIndex = 0; - return hasNext(); } - private boolean hitLimitAndThreshold() { - // check limit - if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) { - return true; - } - // check threshold - if (scanCount >= context.getThreshold()) { - throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause."); - } - return false; + @Override + public boolean hasNext() { + return tupleIterator.hasNext(); } @Override public ITuple next() { - // fetch next record - if (next == null) { - hasNext(); - if (next == null) - throw new NoSuchElementException(); - } - scanCount++; if (++scanCountDelta >= 1000) flushScanCountDelta(); - ITuple result = next; - next = null; - return result; + return tupleIterator.next(); } @Override @@ -181,11 +105,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator { // close all the remaining segmentIterator flushScanCountDelta(); - if (curScanner != null) - close(curScanner); - - while (scannerIterator.hasNext()) { - close(scannerIterator.next()); + for (SegmentCubeTupleIterator iterator : segmentCubeTupleIterators) { + iterator.close(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java new file mode 100644 index 0000000..d5aa9d0 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.PriorityQueue; + +import com.google.common.base.Preconditions; + +/** + * a merger that utilizes the sorted nature of input iterators + */ +public class SortedIteratorMerger<E> { + + private Iterator<Iterator<E>> shardSubsets; + private Comparator<E> comparator; + + public SortedIteratorMerger(Iterator<Iterator<E>> shardSubsets, Comparator<E> comparator) { + this.shardSubsets = shardSubsets; + this.comparator = comparator; + } + + public Iterator<E> getIterator() { + final PriorityQueue<PeekingImpl<E>> heap = new PriorityQueue<PeekingImpl<E>>(11, new Comparator<PeekingImpl<E>>() { + @Override + public int compare(PeekingImpl<E> o1, PeekingImpl<E> o2) { + return comparator.compare(o1.peek(), o2.peek()); + } + }); + + while (shardSubsets.hasNext()) { + Iterator<E> iterator = shardSubsets.next(); + PeekingImpl<E> peekingIterator = new PeekingImpl<>(iterator); + if (peekingIterator.hasNext()) { + heap.offer(peekingIterator); + } + } + + return getIteratorInternal(heap); + } + + protected Iterator<E> getIteratorInternal(PriorityQueue<PeekingImpl<E>> heap) { + return new MergedIterator<E>(heap, comparator); + } + + private static class MergedIterator<E> 
implements Iterator<E> { + + private final PriorityQueue<PeekingImpl<E>> heap; + private final Comparator<E> comparator; + + public MergedIterator(PriorityQueue<PeekingImpl<E>> heap, Comparator<E> comparator) { + this.heap = heap; + this.comparator = comparator; + } + + @Override + public boolean hasNext() { + return !heap.isEmpty(); + } + + @Override + public E next() { + PeekingImpl<E> poll = heap.poll(); + E current = poll.next(); + if (poll.hasNext()) { + + //TODO: remove this check when validated + Preconditions.checkState(comparator.compare(current, poll.peek()) < 0, "Not sorted! current: " + current + " Next: " + poll.peek()); + + heap.offer(poll); + } + return current; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java new file mode 100644 index 0000000..0e40150 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.lang.reflect.InvocationTargetException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.PriorityQueue; + +import com.google.common.base.Preconditions; + +/** + * the limit here corresponds to the limit in SQL + * if the SQL ends with "limit N", then each shard will return N "smallest" records + * The query server side will use a heap to pick the right records. + * + * There are two usages of SortedIteratorMergerWithLimit in kylin + * One at GTRecord level and the other at Tuple Level + * The first is to deal with cuboid shards among the same segment + * and the second is to deal with multiple segments + * + * Let's use single-segment as an example: + * suppose we have a "limit 2" in SQL, and we have three shards in the segment + * the first returns (1,2), the second returns (1,3) and the third returns (2,3) + * each subset is guaranteed to be sorted. 
(that's why it's called "SortedIterator Merger") + * SortedIteratorMergerWithLimit will merge these three subsets and return (1,1,2,2) + * + */ +public class SortedIteratorMergerWithLimit<E extends Cloneable> extends SortedIteratorMerger<E> { + private int limit; + private Comparator<E> comparator; + + public SortedIteratorMergerWithLimit(Iterator<Iterator<E>> shardSubsets, int limit, Comparator<E> comparator) { + super(shardSubsets, comparator); + this.limit = limit; + this.comparator = comparator; + } + + protected Iterator<E> getIteratorInternal(PriorityQueue<PeekingImpl<E>> heap) { + return new MergedIteratorWithLimit<E>(heap, limit, comparator); + } + + static class MergedIteratorWithLimit<E extends Cloneable> implements Iterator<E> { + + private final PriorityQueue<PeekingImpl<E>> heap; + private final Comparator<E> comparator; + + private boolean nextFetched = false; + private E fetched = null; + private E last = null; + + private int limit; + private int limitProgress = 0; + + private PeekingImpl<E> lastSource = null; + + public MergedIteratorWithLimit(PriorityQueue<PeekingImpl<E>> heap, int limit, Comparator<E> comparator) { + this.heap = heap; + this.limit = limit; + this.comparator = comparator; + } + + @Override + public boolean hasNext() { + if (nextFetched) { + return true; + } + + if (lastSource != null && lastSource.hasNext()) { + if (lastSource.hasNext()) { + heap.offer(lastSource); + } else { + lastSource = null; + } + } + + if (!heap.isEmpty()) { + PeekingImpl<E> first = heap.poll(); + E current = first.next(); + try { + current = (E) current.getClass().getMethod("clone").invoke(current); + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + + lastSource = first; + + Preconditions.checkState(current != null); + + if (last == null || comparator.compare(current, last) != 0) { + if (++limitProgress > limit) { + return false; + } + } + nextFetched = true; + fetched = current; 
+ + return true; + } else { + return false; + } + } + + @Override + public E next() { + if (!nextFetched) { + throw new IllegalStateException("Should hasNext() before next()"); + } + + //TODO: remove this check when validated + if (last != null) { + Preconditions.checkState(comparator.compare(last, fetched) <= 0, "Not sorted! last: " + last + " fetched: " + fetched); + } + + last = fetched; + nextFetched = false; + + return fetched; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java ---------------------------------------------------------------------- diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java new file mode 100644 index 0000000..f09844a --- /dev/null +++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.storage.gtrecord; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class SortedIteratorMergerTest { + + private Comparator<Integer> getComp() { + return new Comparator<Integer>() { + @Override + public int compare(Integer o1, Integer o2) { + return o1 - o2; + } + }; + } + + @Test + public void basic1() { + + List<Integer> a = Lists.newArrayList(1, 2, 3); + List<Integer> b = Lists.newArrayList(1, 2, 3); + List<Integer> c = Lists.newArrayList(1, 2, 5); + List<Iterator<Integer>> input = Lists.newArrayList(); + input.add(a.iterator()); + input.add(b.iterator()); + input.add(c.iterator()); + SortedIteratorMerger<Integer> merger = new SortedIteratorMerger<Integer>(input.iterator(), getComp()); + Iterator<Integer> iterator = merger.getIterator(); + List<Integer> result = Lists.newArrayList(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + Assert.assertEquals(Lists.newArrayList(1, 1, 1, 2, 2, 2, 3, 3, 5), result); + } + + @Test + public void basic2() { + + List<Integer> a = Lists.newArrayList(2); + List<Integer> b = Lists.newArrayList(); + List<Integer> c = Lists.newArrayList(1, 2, 5); + List<Iterator<Integer>> input = Lists.newArrayList(); + input.add(a.iterator()); + input.add(b.iterator()); + input.add(c.iterator()); + SortedIteratorMerger<Integer> merger = new SortedIteratorMerger<Integer>(input.iterator(), getComp()); + Iterator<Integer> iterator = merger.getIterator(); + List<Integer> result = Lists.newArrayList(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + Assert.assertEquals(Lists.newArrayList(1, 2, 2, 5), result); + } + + @Test + public void basic3() { + + List<Integer> a = Lists.newArrayList(); + List<Integer> b = Lists.newArrayList(); + List<Integer> c = Lists.newArrayList(); + List<Iterator<Integer>> input = Lists.newArrayList(); + 
input.add(a.iterator()); + input.add(b.iterator()); + input.add(c.iterator()); + SortedIteratorMerger<Integer> merger = new SortedIteratorMerger<Integer>(input.iterator(), getComp()); + Iterator<Integer> iterator = merger.getIterator(); + List<Integer> result = Lists.newArrayList(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + Assert.assertEquals(Lists.newArrayList(), result); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java ---------------------------------------------------------------------- diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java new file mode 100644 index 0000000..1627b4f --- /dev/null +++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.storage.gtrecord; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class SortedIteratorMergerWithLimitTest { + class CloneableInteger implements Cloneable { + int value; + + public CloneableInteger(int value) { + this.value = value; + } + + @Override + public Object clone() { + return new CloneableInteger(value); + } + + @Override + public int hashCode() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + CloneableInteger that = (CloneableInteger) o; + + return value == that.value; + + } + } + + private Comparator<CloneableInteger> getComp() { + return new Comparator<CloneableInteger>() { + @Override + public int compare(CloneableInteger o1, CloneableInteger o2) { + return o1.value - o2.value; + } + }; + } + + @Test + public void basic1() { + + List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(3)); + List<CloneableInteger> b = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(3)); + List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5)); + List<Iterator<CloneableInteger>> input = Lists.newArrayList(); + input.add(a.iterator()); + input.add(b.iterator()); + input.add(c.iterator()); + SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp()); + Iterator<CloneableInteger> iterator = merger.getIterator(); + List<CloneableInteger> result = Lists.newArrayList(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(1), new 
CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(3), new CloneableInteger(3)), result); + } + + @Test + public void basic2() { + + List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2)); + List<CloneableInteger> b = Lists.newArrayList(); + List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5)); + List<Iterator<CloneableInteger>> input = Lists.newArrayList(); + input.add(a.iterator()); + input.add(b.iterator()); + input.add(c.iterator()); + SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp()); + Iterator<CloneableInteger> iterator = merger.getIterator(); + List<CloneableInteger> result = Lists.newArrayList(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(5)), result); + } + + @Test(expected = IllegalStateException.class) + public void basic3() { + + List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2), new CloneableInteger(1)); + List<CloneableInteger> b = Lists.newArrayList(); + List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5)); + List<Iterator<CloneableInteger>> input = Lists.newArrayList(); + input.add(a.iterator()); + input.add(b.iterator()); + input.add(c.iterator()); + SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp()); + Iterator<CloneableInteger> iterator = merger.getIterator(); + List<CloneableInteger> result = Lists.newArrayList(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java b/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java new file mode 100644 index 0000000..c295430 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.query; + +import org.dbunit.DatabaseUnitException; +import org.dbunit.assertion.DbUnitAssert; +import org.dbunit.assertion.FailureHandler; +import org.dbunit.dataset.Column; +import org.dbunit.dataset.Columns; +import org.dbunit.dataset.DataSetException; +import org.dbunit.dataset.ITable; +import org.dbunit.dataset.ITableMetaData; +import org.dbunit.dataset.datatype.DataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * dirty hack to support checking result of SQL with limit + */ +public class HackedDbUnitAssert extends DbUnitAssert { + private static final Logger logger = LoggerFactory.getLogger(HackedDbUnitAssert.class); + + public void assertEquals(ITable expectedTable, ITable actualTable, FailureHandler failureHandler) throws DatabaseUnitException { + logger.trace("assertEquals(expectedTable, actualTable, failureHandler) - start"); + logger.debug("assertEquals: expectedTable={}", expectedTable); + logger.debug("assertEquals: actualTable={}", actualTable); + logger.debug("assertEquals: failureHandler={}", failureHandler); + + // Do not continue if same instance + if (expectedTable == actualTable) { + logger.debug("The given tables reference the same object. Will return immediately. (Table={})", expectedTable); + return; + } + + if (failureHandler == null) { + logger.debug("FailureHandler is null. 
Using default implementation"); + failureHandler = getDefaultFailureHandler(); + } + + ITableMetaData expectedMetaData = expectedTable.getTableMetaData(); + ITableMetaData actualMetaData = actualTable.getTableMetaData(); + String expectedTableName = expectedMetaData.getTableName(); + + // // Verify row count + // int expectedRowsCount = expectedTable.getRowCount(); + // int actualRowsCount = actualTable.getRowCount(); + // if (expectedRowsCount != actualRowsCount) { + // String msg = "row count (table=" + expectedTableName + ")"; + // Error error = + // failureHandler.createFailure(msg, String + // .valueOf(expectedRowsCount), String + // .valueOf(actualRowsCount)); + // logger.error(error.toString()); + // throw error; + // } + + // if both tables are empty, it is not necessary to compare columns, as + // such + // comparison + // can fail if column metadata is different (which could occurs when + // comparing empty tables) + if (expectedTable.getRowCount() == 0 && actualTable.getRowCount() == 0) { + logger.debug("Tables are empty, hence equals."); + return; + } + + // Put the columns into the same order + Column[] expectedColumns = Columns.getSortedColumns(expectedMetaData); + Column[] actualColumns = Columns.getSortedColumns(actualMetaData); + + // Verify columns + Columns.ColumnDiff columnDiff = Columns.getColumnDiff(expectedMetaData, actualMetaData); + if (columnDiff.hasDifference()) { + String message = columnDiff.getMessage(); + Error error = failureHandler.createFailure(message, Columns.getColumnNamesAsString(expectedColumns), Columns.getColumnNamesAsString(actualColumns)); + logger.error(error.toString()); + throw error; + } + + // Get the datatypes to be used for comparing the sorted columns + ComparisonColumn[] comparisonCols = getComparisonColumns(expectedTableName, expectedColumns, actualColumns, failureHandler); + + // Finally compare the data + compareData(expectedTable, actualTable, comparisonCols, failureHandler); + } + + protected void 
compareData(ITable expectedTable, ITable actualTable, ComparisonColumn[] comparisonCols, FailureHandler failureHandler) throws DataSetException { + logger.debug("compareData(expectedTable={}, actualTable={}, " + "comparisonCols={}, failureHandler={}) - start", new Object[] { expectedTable, actualTable, comparisonCols, failureHandler }); + + if (expectedTable == null) { + throw new NullPointerException("The parameter 'expectedTable' must not be null"); + } + if (actualTable == null) { + throw new NullPointerException("The parameter 'actualTable' must not be null"); + } + if (comparisonCols == null) { + throw new NullPointerException("The parameter 'comparisonCols' must not be null"); + } + if (failureHandler == null) { + throw new NullPointerException("The parameter 'failureHandler' must not be null"); + } + + for (int index = 0; index < actualTable.getRowCount(); index++) { + if (!findRowInExpectedTable(expectedTable, actualTable, comparisonCols, failureHandler, index)) { + throw new IllegalStateException(); + } + } + + } + + private boolean findRowInExpectedTable(ITable expectedTable, ITable actualTable, ComparisonColumn[] comparisonCols, FailureHandler failureHandler, int index) throws DataSetException { + + // iterate over all rows + for (int i = 0; i < expectedTable.getRowCount(); i++) { + + // iterate over all columns of the current row + for (int j = 0; j < comparisonCols.length; j++) { + ComparisonColumn compareColumn = comparisonCols[j]; + + String columnName = compareColumn.getColumnName(); + DataType dataType = compareColumn.getDataType(); + + Object expectedValue = expectedTable.getValue(i, columnName); + Object actualValue = actualTable.getValue(index, columnName); + + // Compare the values + if (skipCompare(columnName, expectedValue, actualValue)) { + if (logger.isTraceEnabled()) { + logger.trace("ignoring comparison " + expectedValue + "=" + actualValue + " on column " + columnName); + } + continue; + } + + if (dataType.compare(expectedValue, 
actualValue) != 0) { + break; + + // Difference diff = new Difference(expectedTable, actualTable, i, columnName, expectedValue, actualValue); + // + // // Handle the difference (throw error immediately or something else) + // failureHandler.handle(diff); + } else { + if (j == comparisonCols.length - 1) { + return true; + } else { + continue; + } + } + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index a6e7956..2c428ec 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -35,6 +35,7 @@ import org.apache.kylin.query.enumerator.OLAPQuery; import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.query.routing.Candidate; import org.apache.kylin.query.schema.OLAPSchemaFactory; +import org.apache.kylin.storage.hbase.HBaseStorage; import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; import org.dbunit.database.DatabaseConnection; import org.dbunit.database.IDatabaseConnection; @@ -123,7 +124,7 @@ public class ITKylinQueryTest extends KylinTestBase { @Test public void testSingleExecuteQuery() throws Exception { - String queryFileName = getQueryFolderPrefix() + "src/test/resources/query/sql_tableau/query20.sql"; + String queryFileName = getQueryFolderPrefix() + "src/test/resources/query/temp/query01.sql"; File sqlFile = new File(queryFileName); String sql = getTextFromFile(sqlFile); @@ -187,7 +188,7 @@ public class ITKylinQueryTest extends KylinTestBase { @Test public void testPreciselyDistinctCountQuery() throws Exception { if ("left".equalsIgnoreCase(joinType)) { - 
execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_distinct_precisely", null, true); + execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/temp", null, true); } } @@ -257,6 +258,13 @@ public class ITKylinQueryTest extends KylinTestBase { } @Test + public void testLimitCorrectness() throws Exception { + if (HBaseStorage.overwriteStorageQuery == null) {//v1 query engine will not work + execLimitAndValidate(getQueryFolderPrefix() + "src/test/resources/query/sql"); + } + } + + @Test public void testTopNQuery() throws Exception { if ("left".equalsIgnoreCase(joinType)) { this.execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_topn", null, true); http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java index 0511971..4e59815 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java @@ -34,6 +34,7 @@ import java.sql.Statement; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -62,6 +63,33 @@ import com.google.common.io.Files; */ public class KylinTestBase { + class ObjectArray { + Object[] data; + + public ObjectArray(Object[] data) { + this.data = data; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ObjectArray that = (ObjectArray) o; + + // Probably incorrect - comparing Object[] arrays with Arrays.equals + return Arrays.equals(data, that.data); + + } + + @Override + public int hashCode() { + return 
Arrays.hashCode(data); + } + } + // Hack for the different constant integer type between optiq (INTEGER) and // h2 (BIGINT) public static class TestH2DataTypeFactory extends H2DataTypeFactory { @@ -357,6 +385,50 @@ public class KylinTestBase { } } + protected void execLimitAndValidate(String queryFolder) throws Exception { + printInfo("---------- test folder: " + new File(queryFolder).getAbsolutePath()); + + int appendLimitQueries = 0; + List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql"); + for (File sqlFile : sqlFiles) { + String queryName = StringUtils.split(sqlFile.getName(), '.')[0]; + String sql = getTextFromFile(sqlFile); + + String sqlWithLimit; + if (sql.toLowerCase().contains("limit ")) { + sqlWithLimit = sql; + } else { + sqlWithLimit = sql + " limit 5"; + appendLimitQueries++; + } + + // execute Kylin + printInfo("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); + IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); + ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false); + + // execute H2 + printInfo("Query Result from H2 - " + queryName); + H2Connection h2Conn = new H2Connection(h2Connection, null); + h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory()); + ITable h2Table = executeQuery(h2Conn, queryName, sql, false); + + try { + HackedDbUnitAssert hackedDbUnitAssert = new HackedDbUnitAssert(); + hackedDbUnitAssert.assertEquals(h2Table, kylinTable); + } catch (Throwable t) { + printInfo("execAndCompQuery failed on: " + sqlFile.getAbsolutePath()); + throw t; + } + + compQueryCount++; + if (kylinTable.getRowCount() == 0) { + zeroResultQueries.add(sql); + } + } + printInfo("Queries appended with limit: " + appendLimitQueries); + } + protected void execAndCompQuery(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception { printInfo("---------- test folder: " + new File(queryFolder).getAbsolutePath()); 
Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys); http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java index da8e7ce..3fdb92f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java @@ -29,7 +29,7 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTInfo.Builder; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTSampleCodeSystem; -import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTScanRequestBuilder; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTWriter; import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator; @@ -109,7 +109,7 @@ public class HBaseScannerBenchmark { private void testScanRaw(String msg) throws IOException { long t = System.currentTimeMillis(); - IGTScanner scan = simpleStore.scan(new GTScanRequest(info, null, null, null)); + IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest()); ResultScanner innerScanner = ((SimpleHBaseStore.Reader) scan).getHBaseScanner(); int count = 0; for (Result r : innerScanner) { @@ -125,7 +125,7 @@ public class HBaseScannerBenchmark { private void testScanRecords(String msg) throws IOException { long t = System.currentTimeMillis(); - IGTScanner scan = simpleStore.scan(new GTScanRequest(info, null, null, null)); + IGTScanner scan = simpleStore.scan(new 
GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest()); int count = 0; for (GTRecord rec : scan) { count++; http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 1cebdea..4c599d9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -21,20 +21,11 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; -import java.util.Iterator; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.DataFormatException; -import javax.annotation.Nullable; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -52,7 +43,6 @@ import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.GTScanRequest; import 
org.apache.kylin.gridtable.IGTScanner; @@ -67,8 +57,6 @@ import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.Cub import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.HBaseZeroCopyByteString; @@ -79,153 +67,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private static ExecutorService executorService = new LoggableCachedThreadPool(); - static class ExpectedSizeIterator implements Iterator<byte[]> { - - BlockingQueue<byte[]> queue; - - int expectedSize; - int current = 0; - long timeout; - long timeoutTS; - volatile Throwable coprocException; - - public ExpectedSizeIterator(int expectedSize) { - this.expectedSize = expectedSize; - this.queue = new ArrayBlockingQueue<byte[]>(expectedSize); - - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); - this.timeout = hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5) * hconf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60000); - this.timeout = Math.max(this.timeout, 5 * 60000); - this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes(); - - if (BackdoorToggles.getQueryTimeout() != -1) { - this.timeout = BackdoorToggles.getQueryTimeout(); - } - - this.timeout *= 1.1; // allow for some delay - - logger.info("Timeout for ExpectedSizeIterator is: " + this.timeout); - - this.timeoutTS = System.currentTimeMillis() + this.timeout; - } - - @Override - public boolean hasNext() { - return (current < expectedSize); - } - - @Override - public byte[] next() { - if (current >= expectedSize) { - throw new IllegalStateException("Won't have more data"); - } - try { - current++; - byte[] ret = null; - - while (ret == null && coprocException == null && timeoutTS - System.currentTimeMillis() > 0) { - ret = queue.poll(5000, TimeUnit.MILLISECONDS); - } - - if 
(coprocException != null) { - throw new RuntimeException("Error in coprocessor", coprocException); - } else if (ret == null) { - throw new RuntimeException("Timeout visiting cube!"); - } else { - return ret; - } - } catch (InterruptedException e) { - throw new RuntimeException("Error when waiting queue", e); - } - } - - @Override - public void remove() { - throw new NotImplementedException(); - } - - public void append(byte[] data) { - try { - queue.put(data); - } catch (InterruptedException e) { - throw new RuntimeException("error when waiting queue", e); - } - } - - public long getTimeout() { - return timeout; - } - - public void notifyCoprocException(Throwable ex) { - coprocException = ex; - } - } - - static class EndpointResultsAsGTScanner implements IGTScanner { - private GTInfo info; - private Iterator<byte[]> blocks; - private ImmutableBitSet columns; - private long totalScannedCount; - - public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount) { - this.info = info; - this.blocks = blocks; - this.columns = columns; - this.totalScannedCount = totalScannedCount; - } - - @Override - public GTInfo getInfo() { - return info; - } - - @Override - public long getScannedRowCount() { - return totalScannedCount; - } - - @Override - public void close() throws IOException { - //do nothing - } - - @Override - public Iterator<GTRecord> iterator() { - return Iterators.concat(Iterators.transform(blocks, new Function<byte[], Iterator<GTRecord>>() { - @Nullable - @Override - public Iterator<GTRecord> apply(@Nullable final byte[] input) { - - return new Iterator<GTRecord>() { - private ByteBuffer inputBuffer = null; - private GTRecord oneRecord = null; - - @Override - public boolean hasNext() { - if (inputBuffer == null) { - inputBuffer = ByteBuffer.wrap(input); - oneRecord = new GTRecord(info); - } - - return inputBuffer.position() < inputBuffer.limit(); - } - - @Override - public GTRecord next() { - 
oneRecord.loadColumns(columns, inputBuffer); - return oneRecord; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - })); - } - } - public CubeHBaseEndpointRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { super(cubeSeg, cuboid, fullGTInfo); } @@ -345,7 +186,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize()); builder.setBehavior(toggle); builder.setStartTime(System.currentTimeMillis()); - builder.setTimeout(epResultItr.getTimeout()); + builder.setTimeout(epResultItr.getRpcTimeout()); builder.setKylinProperties(kylinConfig.getConfigAsString()); for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) { @@ -407,7 +248,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } if (abnormalFinish[0]) { - Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query..."); + Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query..."); logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout epResultItr.notifyCoprocException(ex); return; @@ -416,7 +257,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { }); } - return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get()); + return new GTBlobScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit()); } private String getStatsString(byte[] region, CubeVisitResponse result) { http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java 
---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java new file mode 100644 index 0000000..4e0d15e --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.storage.hbase.cube.v2; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.storage.hbase.HBaseConnection; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +class ExpectedSizeIterator implements Iterator<byte[]> { + + BlockingQueue<byte[]> queue; + + int expectedSize; + int current = 0; + long rpcTimeout; + long timeout; + long timeoutTS; + volatile Throwable coprocException; + + public ExpectedSizeIterator(int expectedSize) { + this.expectedSize = expectedSize; + this.queue = new ArrayBlockingQueue<byte[]>(expectedSize); + + Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); + this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.timeout = this.rpcTimeout * hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + CubeHBaseEndpointRPC.logger.info("rpc timeout is {} and after multiply retry times become {}", this.rpcTimeout, this.timeout); + this.timeout = Math.max(this.timeout, 5 * 60000); + this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes(); + + if (BackdoorToggles.getQueryTimeout() != -1) { + this.timeout = BackdoorToggles.getQueryTimeout(); + } + + this.timeout *= 1.1; // allow for some delay + + CubeHBaseEndpointRPC.logger.info("Final Timeout for ExpectedSizeIterator is: " + this.timeout); + + this.timeoutTS = System.currentTimeMillis() + this.timeout; + } + + @Override + public boolean hasNext() { + return (current < expectedSize); + } + + @Override + public byte[] next() { + if (current >= expectedSize) { + throw new IllegalStateException("Won't have 
more data"); + } + try { + current++; + byte[] ret = null; + + while (ret == null && coprocException == null && timeoutTS > System.currentTimeMillis()) { + ret = queue.poll(5000, TimeUnit.MILLISECONDS); + } + + if (coprocException != null) { + throw new RuntimeException("Error in coprocessor", coprocException); + } else if (ret == null) { + throw new RuntimeException("Timeout visiting cube!"); + } else { + return ret; + } + } catch (InterruptedException e) { + throw new RuntimeException("Error when waiting queue", e); + } + } + + @Override + public void remove() { + throw new NotImplementedException(); + } + + public void append(byte[] data) { + try { + queue.put(data); + } catch (InterruptedException e) { + throw new RuntimeException("error when waiting queue", e); + } + } + + public long getRpcTimeout() { + return this.rpcTimeout; + } + + public void notifyCoprocException(Throwable ex) { + coprocException = ex; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java new file mode 100644 index 0000000..631510e --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase.cube.v2; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; + +import javax.annotation.Nullable; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.storage.gtrecord.SortedIteratorMergerWithLimit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; + +/** + * scatter the blob returned from region server to a iterable of gtrecords + */ +class GTBlobScatter implements IGTScanner { + + private static final Logger logger = LoggerFactory.getLogger(GTBlobScatter.class); + + private GTInfo info; + private Iterator<byte[]> blocks; + private ImmutableBitSet columns; + private long totalScannedCount; + private int storagePushDownLimit = -1; + + public GTBlobScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) { + this.info = info; + this.blocks = blocks; + this.columns = columns; + this.totalScannedCount = totalScannedCount; + this.storagePushDownLimit = storagePushDownLimit; + } + + @Override + public GTInfo getInfo() { + return info; + } + + @Override + public long getScannedRowCount() { + return totalScannedCount; + } + + @Override + public void close() throws IOException { + //do nothing + } + + @Override + 
public Iterator<GTRecord> iterator() { + Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new GTBlobScatterFunc()); + if (storagePushDownLimit <= KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) { + return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator(); + } else { + return Iterators.concat(shardSubsets); + } + } + + class GTBlobScatterFunc implements Function<byte[], Iterator<GTRecord>> { + @Nullable + @Override + public Iterator<GTRecord> apply(@Nullable final byte[] input) { + + return new Iterator<GTRecord>() { + private ByteBuffer inputBuffer = null; + //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord + private GTRecord firstRecord = null; + private GTRecord secondRecord = null; + private GTRecord thirdRecord = null; + private GTRecord fourthRecord = null; + private int counter = 0; + + @Override + public boolean hasNext() { + if (inputBuffer == null) { + inputBuffer = ByteBuffer.wrap(input); + firstRecord = new GTRecord(info); + secondRecord = new GTRecord(info); + thirdRecord = new GTRecord(info); + fourthRecord = new GTRecord(info); + } + + return inputBuffer.position() < inputBuffer.limit(); + } + + @Override + public GTRecord next() { + firstRecord.loadColumns(columns, inputBuffer); + //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord)); + return firstRecord; + // GTRecord temp = new GTRecord(info); + // temp.loadColumns(columns, inputBuffer); + // return temp; + + // counter++; + // int index = counter % 4; + // if (index == 1) { + // firstRecord.loadColumns(columns, inputBuffer); + // //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord)); + // return firstRecord; + // } else if (index == 2) { + // secondRecord.loadColumns(columns, 
inputBuffer); + // //logger.info("B GTRecord: " + System.identityHashCode(this) + " " + secondRecord + " " + System.identityHashCode(secondRecord)); + // return secondRecord; + // } else if (index == 3) { + // thirdRecord.loadColumns(columns, inputBuffer); + // //logger.info("C GTRecord: " + System.identityHashCode(this) + " " + thirdRecord + " " + System.identityHashCode(thirdRecord)); + // return thirdRecord; + // } else { + // fourthRecord.loadColumns(columns, inputBuffer); + // //logger.info("D GTRecord: " + System.identityHashCode(this) + " " + fourthRecord + " " + System.identityHashCode(fourthRecord)); + // return fourthRecord; + // } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 5b7a26a..cbccac6 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -48,7 +48,9 @@ import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.dimension.DimensionEncoding; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanExceedThresholdException; import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTScanTimeoutException; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; import 
org.apache.kylin.measure.BufferedMeasureEncoder; @@ -235,13 +237,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { - scanReq.setAggrCacheGB(0); // disable mem check if so told + scanReq.setAggCacheMemThreshold(0); // disable mem check if so told } final MutableBoolean scanNormalComplete = new MutableBoolean(true); - final long startTime = this.serviceStartTime; - final long timeout = request.getTimeout(); - final int rowLimit = scanReq.getRowLimit(); + final long deadline = request.getTimeout() + this.serviceStartTime; + final long storagePushDownLimit = scanReq.getStoragePushDownLimit(); final CellListIterator cellListIterator = new CellListIterator() { @@ -256,19 +257,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement @Override public boolean hasNext() { - if (rowLimit > 0 && rowLimit <= counter) - return false; - if (counter % 100000 == 1) { - if (System.currentTimeMillis() - startTime > timeout) { - scanNormalComplete.setValue(false); - logger.error("scanner aborted because timeout"); - return false; - } + if (counter > scanReq.getStorageScanRowNumThreshold()) { + throw new GTScanExceedThresholdException("Exceed scan threshold at " + counter); } if (counter % 100000 == 1) { - logger.info("Scanned " + counter + " rows."); + logger.info("Scanned " + counter + " rows from HBase."); } counter++; return allCellLists.hasNext(); @@ -290,38 +285,47 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement IGTScanner rawScanner = store.scan(scanReq); IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, // behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), // - behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()); + behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(), deadline); ByteBuffer buffer = 
ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);//ByteArrayOutputStream will auto grow int finalRowCount = 0; - for (GTRecord oneRecord : finalScanner) { - if (!scanNormalComplete.booleanValue()) { - logger.error("aggregate iterator aborted because input iterator aborts"); - break; - } + try { + for (GTRecord oneRecord : finalScanner) { - if (finalRowCount % 100000 == 1) { - if (System.currentTimeMillis() - startTime > timeout) { - logger.error("aggregate iterator aborted because timeout"); + if (finalRowCount > storagePushDownLimit) { + logger.info("The finalScanner aborted because storagePushDownLimit is satisfied"); break; } - } - buffer.clear(); - try { - oneRecord.exportColumns(scanReq.getColumns(), buffer); - } catch (BufferOverflowException boe) { - buffer = ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2); - oneRecord.exportColumns(scanReq.getColumns(), buffer); - } + if (finalRowCount % 100000 == 1) { + if (System.currentTimeMillis() > deadline) { + throw new GTScanTimeoutException("finalScanner timeouts after scanned " + finalRowCount); + } + } - outputStream.write(buffer.array(), 0, buffer.position()); - finalRowCount++; + buffer.clear(); + try { + oneRecord.exportColumns(scanReq.getColumns(), buffer); + } catch (BufferOverflowException boe) { + buffer = ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2); + oneRecord.exportColumns(scanReq.getColumns(), buffer); + } + + outputStream.write(buffer.array(), 0, buffer.position()); + finalRowCount++; + } + } catch (GTScanTimeoutException e) { + scanNormalComplete.setValue(false); + logger.info("The cube visit did not finish normally because scan timeout", e); + } catch (GTScanExceedThresholdException e) { + scanNormalComplete.setValue(false); + logger.info("The cube visit did not finish normally because scan num exceeds threshold", e); + } finally { + 
finalScanner.close(); } - finalScanner.close(); appendProfileInfo(sb, "agg done");