http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java new file mode 100644 index 0000000..0cdfa7e --- /dev/null +++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.storage.gtrecord; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.BytesSerializer; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.gridtable.CubeCodeSystem; +import org.apache.kylin.dict.NumberDictionaryBuilder; +import org.apache.kylin.dict.StringBytesConverter; +import org.apache.kylin.dict.TrieDictionaryBuilder; +import org.apache.kylin.dimension.DictionaryDimEnc; +import org.apache.kylin.dimension.DimensionEncoding; +import org.apache.kylin.gridtable.GTBuilder; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRange; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTScanRequestBuilder; +import org.apache.kylin.gridtable.GTUtil; +import org.apache.kylin.gridtable.GridTable; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.gridtable.GTFilterScanner.FilterResultCache; +import org.apache.kylin.gridtable.GTInfo.Builder; +import org.apache.kylin.gridtable.memstore.GTSimpleMemStore; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.LongMutable; +import org.apache.kylin.metadata.filter.ColumnTupleFilter; +import org.apache.kylin.metadata.filter.CompareTupleFilter; +import org.apache.kylin.metadata.filter.ConstantTupleFilter; +import org.apache.kylin.metadata.filter.ExtractTupleFilter; +import org.apache.kylin.metadata.filter.LogicalTupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter; +import 
org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class DictGridTableTest extends LocalFileMetadataTestCase { + + private GridTable table; + private GTInfo info; + private CompareTupleFilter timeComp0; + private CompareTupleFilter timeComp1; + private CompareTupleFilter timeComp2; + private CompareTupleFilter timeComp3; + private CompareTupleFilter timeComp4; + private CompareTupleFilter timeComp5; + private CompareTupleFilter timeComp6; + private CompareTupleFilter ageComp1; + private CompareTupleFilter ageComp2; + private CompareTupleFilter ageComp3; + private CompareTupleFilter ageComp4; + + @After + public void after() throws Exception { + + this.cleanupTestMetadata(); + } + + @Before + public void setup() throws IOException { + + this.createTestMetadata(); + + table = newTestTable(); + info = table.getInfo(); + + timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14")); + timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); + timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13")); + timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15")); + timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15")); + timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15")); + timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14")); + ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10")); + ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20")); + ageComp3 = compare(info.colRef(1), 
FilterOperatorEnum.EQ, enc(info, 1, "30")); + ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30")); + + } + + @Test + public void verifySegmentSkipping() { + + ByteArray segmentStart = enc(info, 0, "2015-01-14"); + ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free + ByteArray segmentEnd = enc(info, 0, "2015-01-15"); + assertEquals(segmentStart, segmentStartX); + + { + LogicalTupleFilter filter = and(timeComp0, ageComp1); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(1, r.size());//scan range are [close,close] + assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); + assertEquals(1, r.get(0).fuzzyKeys.size()); + assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString()); + } + { + LogicalTupleFilter filter = and(timeComp2, ageComp1); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(0, r.size()); + } + { + LogicalTupleFilter filter = and(timeComp4, ageComp1); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(0, r.size()); + } + { + LogicalTupleFilter filter = and(timeComp5, ageComp1); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(0, r.size()); + } + { + LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1)); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, 
segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(1, r.size()); + assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString()); + assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]", r.get(0).fuzzyKeys.toString()); + } + { + LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(1, r.size()); + assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString()); + assertEquals(0, r.get(0).fuzzyKeys.size()); + } + { + //skip FALSE filter + LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(0, r.size()); + } + { + //TRUE or FALSE filter + LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(1, r.size()); + assertEquals("[null, null]-[null, null]", r.get(0).toString()); + } + { + //TRUE or other filter + LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(1, r.size()); + assertEquals("[null, null]-[null, null]", r.get(0).toString()); + } + } + + @Test + public void verifySegmentSkipping2() { + ByteArray segmentEnd = enc(info, 0, "2015-01-15"); + + { + LogicalTupleFilter filter = and(timeComp0, ageComp1); + CubeScanRangePlanner planner = 
new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(1, r.size());//scan range are [close,close] + assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); + assertEquals(1, r.get(0).fuzzyKeys.size()); + assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString()); + } + + { + LogicalTupleFilter filter = and(timeComp5, ageComp1); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(0, r.size());//scan range are [close,close] + } + } + + @Test + public void verifyScanRangePlanner() { + + // flatten or-and & hbase fuzzy value + { + LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2)); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(1, r.size()); + assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString()); + assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString()); + } + + // pre-evaluate ever false + { + LogicalTupleFilter filter = and(timeComp1, timeComp2); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(0, r.size()); + } + + // pre-evaluate ever true + { + LogicalTupleFilter filter = or(timeComp1, ageComp4); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals("[[null, null]-[null, null]]", r.toString()); + } + + // merge overlap range + { + LogicalTupleFilter filter = or(timeComp1, timeComp3); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + List<GTScanRange> r = 
planner.planScanRanges(); + assertEquals("[[null, null]-[null, null]]", r.toString()); + } + + // merge too many ranges + { + LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3)); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + List<GTScanRange> r = planner.planScanRanges(); + assertEquals(3, r.size()); + assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString()); + assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString()); + assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString()); + planner.setMaxScanRanges(2); + List<GTScanRange> r2 = planner.planScanRanges(); + assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString()); + } + } + + @Test + public void verifyFirstRow() throws IOException { + doScanAndVerify(table, new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(), "[1421193600000, 30, Yang, 10, 10.5]", // + "[1421193600000, 30, Luke, 10, 10.5]", // + "[1421280000000, 20, Dong, 10, 10.5]", // + "[1421280000000, 20, Jason, 10, 10.5]", // + "[1421280000000, 30, Xu, 10, 10.5]", // + "[1421366400000, 20, Mahone, 10, 10.5]", // + "[1421366400000, 20, Qianhao, 10, 10.5]", // + "[1421366400000, 30, George, 10, 10.5]", // + "[1421366400000, 30, Shaofeng, 10, 10.5]", // + "[1421452800000, 10, Kejia, 10, 10.5]"); + } + + //for testing GTScanRequest serialization and deserialization + public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) { + ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); + GTScanRequest.serializer.serialize(origin, buffer); + buffer.flip(); + GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer); + + Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs()); + 
Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01); + return sGTScanRequest; + } + + @Test + public void verifyScanWithUnevaluatableFilter() throws IOException { + GTInfo info = table.getInfo(); + + CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); + ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1)); + LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1))); + LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable); + + GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" }).setFilterPushDown(filter).createGTScanRequest(); + + // note the unEvaluatable column 1 in filter is added to group by + assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); + + doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]", "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]", "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]"); + } + + @Test + public void verifyScanWithEvaluatableFilter() throws IOException { + GTInfo info = table.getInfo(); + + CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); + CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10")); + LogicalTupleFilter filter = and(fComp1, fComp2); + + GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" 
}).setFilterPushDown(filter).createGTScanRequest(); + // note the evaluatable column 1 in filter is added to returned columns but not in group by + assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); + + doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]"); + } + + @Test + public void testFilterScannerPerf() throws IOException { + GridTable table = newTestPerfTable(); + GTInfo info = table.getInfo(); + + CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); + CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10")); + LogicalTupleFilter filter = and(fComp1, fComp2); + + FilterResultCache.ENABLED = false; + testFilterScannerPerfInner(table, info, filter); + FilterResultCache.ENABLED = true; + testFilterScannerPerfInner(table, info, filter); + FilterResultCache.ENABLED = false; + testFilterScannerPerfInner(table, info, filter); + FilterResultCache.ENABLED = true; + testFilterScannerPerfInner(table, info, filter); + } + + @SuppressWarnings("unused") + private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter) throws IOException { + long start = System.currentTimeMillis(); + GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(filter).createGTScanRequest(); + IGTScanner scanner = table.scan(req); + int i = 0; + for (GTRecord r : scanner) { + i++; + } + scanner.close(); + long end = System.currentTimeMillis(); + System.out.println((end - start) + "ms with filter cache enabled=" + FilterResultCache.ENABLED + ", " + i + " rows"); + } + + @Test + public void 
verifyConvertFilterConstants1() { + GTInfo info = table.getInfo(); + + TableDesc extTable = TableDesc.mockup("ext"); + TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef(); + TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef(); + + CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); + CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10"); + LogicalTupleFilter filter = and(fComp1, fComp2); + + List<TblColRef> colMapping = Lists.newArrayList(); + colMapping.add(extColA); + colMapping.add(extColB); + + TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); + assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString()); + } + + @Test + public void verifyConvertFilterConstants2() { + GTInfo info = table.getInfo(); + + TableDesc extTable = TableDesc.mockup("ext"); + TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef(); + TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef(); + + CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); + CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LT, "9"); + LogicalTupleFilter filter = and(fComp1, fComp2); + + List<TblColRef> colMapping = Lists.newArrayList(); + colMapping.add(extColA); + colMapping.add(extColB); + + // $1<"9" round up to $1<"10" + TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); + assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString()); + } + + @Test + public void verifyConvertFilterConstants3() { + GTInfo info = table.getInfo(); + + TableDesc extTable = TableDesc.mockup("ext"); + TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef(); + TblColRef 
extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef(); + + CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); + CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LTE, "9"); + LogicalTupleFilter filter = and(fComp1, fComp2); + + List<TblColRef> colMapping = Lists.newArrayList(); + colMapping.add(extColA); + colMapping.add(extColB); + + // $1<="9" round down to FALSE + TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); + assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString()); + } + + @Test + public void verifyConvertFilterConstants4() { + GTInfo info = table.getInfo(); + + TableDesc extTable = TableDesc.mockup("ext"); + TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef(); + TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef(); + + CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); + CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15"); + LogicalTupleFilter filter = and(fComp1, fComp2); + + List<TblColRef> colMapping = Lists.newArrayList(); + colMapping.add(extColA); + colMapping.add(extColB); + + // $1 in ("9", "10", "15") has only "10" left + TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); + assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]", newFilter.toString()); + } + + private void doScanAndVerify(GridTable table, GTScanRequest req, String... 
verifyRows) throws IOException { + System.out.println(req); + IGTScanner scanner = table.scan(req); + int i = 0; + for (GTRecord r : scanner) { + System.out.println(r); + if (verifyRows == null || i >= verifyRows.length) { + Assert.fail(); + } + assertEquals(verifyRows[i], r.toString()); + i++; + } + scanner.close(); + } + + public static ByteArray enc(GTInfo info, int col, String value) { + ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength()); + info.getCodeSystem().encodeColumnValue(col, value, buf); + return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position()); + } + + public static ExtractTupleFilter unevaluatable(TblColRef col) { + ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT); + r.addChild(new ColumnTupleFilter(col)); + return r; + } + + public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) { + CompareTupleFilter result = new CompareTupleFilter(op); + result.addChild(new ColumnTupleFilter(col)); + result.addChild(new ConstantTupleFilter(Arrays.asList(value))); + return result; + } + + public static LogicalTupleFilter and(TupleFilter... children) { + return logic(FilterOperatorEnum.AND, children); + } + + public static LogicalTupleFilter or(TupleFilter... children) { + return logic(FilterOperatorEnum.OR, children); + } + + public static LogicalTupleFilter not(TupleFilter child) { + return logic(FilterOperatorEnum.NOT, child); + } + + public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... 
children) { + LogicalTupleFilter result = new LogicalTupleFilter(op); + for (TupleFilter c : children) { + result.addChild(c); + } + return result; + } + + public static GridTable newTestTable() throws IOException { + GTInfo info = newInfo(); + GTSimpleMemStore store = new GTSimpleMemStore(info); + GridTable table = new GridTable(info, store); + + GTRecord r = new GTRecord(table.getInfo()); + GTBuilder builder = table.rebuild(); + + builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5"))); + builder.close(); + + return table; + } + + static GridTable newTestPerfTable() throws IOException { + GTInfo info = newInfo(); + GTSimpleMemStore store = new GTSimpleMemStore(info); + GridTable table = new GridTable(info, store); + + GTRecord r = new GTRecord(table.getInfo()); + GTBuilder builder = table.rebuild(); + + for (int i = 0; i < 100000; i++) { + for (int j = 0; j < 10; j++) + builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5"))); + + for (int j = 0; j < 10; 
j++) + builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5"))); + + for (int j = 0; j < 10; j++) + builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5"))); + + for (int j = 0; j < 10; j++) + builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5"))); + + for (int j = 0; j < 10; j++) + builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5"))); + + for (int j = 0; j < 10; j++) + builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5"))); + + for (int j = 0; j < 10; j++) + builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5"))); + + for (int j = 0; j < 10; j++) + builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5"))); + + for (int j = 0; j < 10; j++) + builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5"))); + + for (int j = 0; j < 10; j++) + builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5"))); + } + builder.close(); + + return table; + } + + static GTInfo newInfo() { + Builder builder = GTInfo.builder(); + builder.setCodeSystem(newDictCodeSystem()); + builder.setColumns( // + DataType.getType("timestamp"), // + DataType.getType("integer"), // + DataType.getType("varchar(10)"), // + DataType.getType("bigint"), // + DataType.getType("decimal") // + ); + builder.setPrimaryKey(setOf(0, 1)); + builder.setColumnPreferIndex(setOf(0)); + builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) }); + builder.enableRowBlock(4); + GTInfo info = builder.build(); + return info; + } + + @SuppressWarnings("unchecked") + private static CubeCodeSystem newDictCodeSystem() { + DimensionEncoding[] dimEncs = new DimensionEncoding[3]; + dimEncs[1] = new 
DictionaryDimEnc(newDictionaryOfInteger()); + dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString()); + return new CubeCodeSystem(dimEncs); + } + + @SuppressWarnings("rawtypes") + private static Dictionary newDictionaryOfString() { + TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter()); + builder.addValue("Dong"); + builder.addValue("George"); + builder.addValue("Jason"); + builder.addValue("Kejia"); + builder.addValue("Luke"); + builder.addValue("Mahone"); + builder.addValue("Qianhao"); + builder.addValue("Shaofeng"); + builder.addValue("Xu"); + builder.addValue("Yang"); + return builder.build(0); + } + + @SuppressWarnings("rawtypes") + private static Dictionary newDictionaryOfInteger() { + NumberDictionaryBuilder<String> builder = new NumberDictionaryBuilder<>(new StringBytesConverter()); + builder.addValue("10"); + builder.addValue("20"); + builder.addValue("30"); + builder.addValue("40"); + builder.addValue("50"); + builder.addValue("60"); + builder.addValue("70"); + builder.addValue("80"); + builder.addValue("90"); + builder.addValue("100"); + return builder.build(0); + } + + public static ImmutableBitSet setOf(int... values) { + BitSet set = new BitSet(); + for (int i : values) + set.set(i); + return new ImmutableBitSet(set); + } +}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 375b198..fc2fd52 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -22,13 +22,16 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.sql.DriverManager; +import java.sql.SQLException; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.enumerator.OLAPQuery; @@ -37,18 +40,26 @@ import org.apache.kylin.query.routing.Candidate; import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule; import org.apache.kylin.query.schema.OLAPSchemaFactory; import org.apache.kylin.storage.hbase.HBaseStorage; +import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; import org.dbunit.database.DatabaseConnection; import org.dbunit.database.IDatabaseConnection; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import com.google.common.collect.Maps; public class 
ITKylinQueryTest extends KylinTestBase { + @Rule + public ExpectedException thrown = ExpectedException.none(); + @BeforeClass public static void setUp() throws Exception { printInfo("setUp in ITKylinQueryTest"); @@ -108,10 +119,52 @@ public class ITKylinQueryTest extends KylinTestBase { return ""; } - @Ignore("this is only for debug") + @Test + public void testTimeoutQuery() throws Exception { + + thrown.expect(SQLException.class); + + //the SQLException is expected to be caused by a GTScanSelfTerminatedException + thrown.expectCause(new BaseMatcher<Throwable>() { + @Override + public boolean matches(Object item) { + if (item instanceof GTScanSelfTerminatedException) { + return true; + } + return false; + } + + @Override + public void describeTo(Description description) { + } + }); + + Map<String, String> toggles = Maps.newHashMap(); + toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan + BackdoorToggles.setToggles(toggles); + + KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.03");//set timeout to 9s + + //these two cubes have RAW measures, which will disturb limit push down + RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]"); + RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]"); + + execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_timeout", null, true); + + //undo the blackout of the two RAW measure cubes added before the query + RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]"); + RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]"); + + KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "1");//reset the timeout factor to 1 + BackdoorToggles.cleanToggles(); + } + + //don't 
try to ignore this test, try to clean your "temp" folder @Test public void testTempQuery() throws Exception { + PRINT_RESULT = true; execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/temp", null, true); + PRINT_RESULT = false; } @Ignore http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/kylin-it/src/test/resources/query/sql_timeout/query01.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_timeout/query01.sql b/kylin-it/src/test/resources/query/sql_timeout/query01.sql new file mode 100644 index 0000000..3b9a837 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_timeout/query01.sql @@ -0,0 +1,19 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +select * from test_kylin_fact limit 1200 http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java index 75533cd..5f21351 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java @@ -26,4 +26,5 @@ public enum CoprocessorBehavior { SCAN_FILTER, //only scan+filter used,used for profiling filter speed. Will not return any result SCAN_FILTER_AGGR, //aggregate the result. Will return results SCAN_FILTER_AGGR_CHECKMEM, //default full operations. 
Will return results + SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY, // on each scan operation, delay for 10ms to simulate slow queries, for test use } http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 07a3cc3..5b48351 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -43,8 +43,8 @@ import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; @@ -106,7 +106,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? 
CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior(); - logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle); + logger.info("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle); Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard(); short shardNum = shardNumAndBaseShard.getFirst(); @@ -146,7 +146,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { rawScanBufferSize *= 4; } } - scanRequest.setGTScanRanges(Lists.<GTScanRange> newArrayList());//since raw scans are sent to coprocessor, we don't need to duplicate sending it + scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE; while (true) { @@ -248,7 +248,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } if (abnormalFinish[0]) { - Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query..."); + Throwable ex = new GTScanSelfTerminatedException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query..."); logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout epResultItr.notifyCoprocException(ex); return; http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index a359d19..f1e5dab 100644 --- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -213,7 +213,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } }; - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize()); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false); IGTScanner rawScanner = store.scan(scanRequest); final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java index 7d48c1a..442963f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java @@ -18,20 +18,22 @@ package org.apache.kylin.storage.hbase.cube.v2; +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import 
org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - class ExpectedSizeIterator implements Iterator<byte[]> { private static final Logger logger = LoggerFactory.getLogger(ExpectedSizeIterator.class); @@ -48,22 +50,24 @@ class ExpectedSizeIterator implements Iterator<byte[]> { this.expectedSize = expectedSize; this.queue = new ArrayBlockingQueue<byte[]>(expectedSize); + StringBuilder sb = new StringBuilder(); Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); + this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.timeout = this.rpcTimeout * hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - logger.info("rpc timeout is {} and after multiply retry times become {}", this.rpcTimeout, this.timeout); - this.timeout = Math.max(this.timeout, 5 * 60000); + sb.append("rpc timeout is " + this.rpcTimeout + " and after multiply retry times becomes " + this.timeout); + this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes(); + sb.append(" after multiply kylin.query.cube.visit.timeout.times becomes " + this.timeout); + + logger.info(sb.toString()); if (BackdoorToggles.getQueryTimeout() != -1) { this.timeout = BackdoorToggles.getQueryTimeout(); + logger.info("rpc timeout is overwritten to " + this.timeout); } - this.timeout *= 1.1; // allow for some delay - - logger.info("Final Timeout for ExpectedSizeIterator is: " + this.timeout); - - this.timeoutTS = System.currentTimeMillis() + this.timeout; + this.timeoutTS = System.currentTimeMillis() + 2 * this.timeout;//longer timeout than coprocessor so that query thread will not timeout faster than coprocessor } @Override @@ -85,9 +89,14 @@ class ExpectedSizeIterator implements 
Iterator<byte[]> { } if (coprocException != null) { - throw new RuntimeException("Error in coprocessor", coprocException); + if (coprocException instanceof GTScanSelfTerminatedException) + throw (GTScanSelfTerminatedException) coprocException; + else + throw new RuntimeException("Error in coprocessor",coprocException); + } else if (ret == null) { - throw new RuntimeException("Timeout visiting cube!"); + throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + // + GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + timeout + ") cannot support this many scans?"); } else { return ret; } @@ -110,7 +119,7 @@ class ExpectedSizeIterator implements Iterator<byte[]> { } public long getRpcTimeout() { - return this.rpcTimeout; + return this.timeout; } public void notifyCoprocException(Throwable ex) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java index 4b9b4fa..1d8ad79 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java @@ -43,13 +43,15 @@ public class HBaseReadonlyStore implements IGTStore { private List<Pair<byte[], byte[]>> hbaseColumns; private List<List<Integer>> hbaseColumnsToGT; private int rowkeyPreambleSize; + private boolean withDelay = false; - public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int 
rowkeyPreambleSize) { + public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay) { this.cellListIterator = cellListIterator; this.info = gtScanRequest.getInfo(); this.hbaseColumns = hbaseColumns; this.hbaseColumnsToGT = hbaseColumnsToGT; this.rowkeyPreambleSize = rowkeyPreambleSize; + this.withDelay = withDelay; } @Override @@ -95,6 +97,13 @@ public class HBaseReadonlyStore implements IGTStore { @Override public boolean hasNext() { + if (withDelay) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } return cellListIterator.hasNext(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index b29d0d1..064d100 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -170,7 +170,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement @SuppressWarnings("checkstyle:methodlength") @Override - public void visitCube(final RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { + public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { List<RegionScanner> regionScanners = 
Lists.newArrayList(); HRegion region = null; @@ -241,7 +241,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { - scanReq.setAggCacheMemThreshold(0); // disable mem check if so told + scanReq.disableAggCacheMemCheck(); // disable mem check if so told } final MutableBoolean scanNormalComplete = new MutableBoolean(true); @@ -266,7 +266,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement throw new GTScanExceedThresholdException("Exceed scan threshold at " + counter); } - if (counter % 100000 == 1) { + if (counter % (10 * GTScanRequest.terminateCheckInterval) == 1) { logger.info("Scanned " + counter + " rows from HBase."); } counter++; @@ -284,7 +284,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } }; - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize()); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, // + request.getRowkeyPreambleSize(), CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString().equals(request.getBehavior())); IGTScanner rawScanner = store.scan(scanReq); IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, // @@ -299,14 +300,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try { for (GTRecord oneRecord : finalScanner) { - if (finalRowCount > storagePushDownLimit) { - logger.info("The finalScanner aborted because storagePushDownLimit is satisfied"); - break; - } - - if (finalRowCount % 100000 == 1) { + if (finalRowCount % GTScanRequest.terminateCheckInterval == 1) { if (System.currentTimeMillis() > deadline) { - throw new GTScanTimeoutException("finalScanner timeouts after scanned " + finalRowCount); + throw new 
GTScanTimeoutException("finalScanner timeouts after contributed " + finalRowCount); } } @@ -319,7 +315,15 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } outputStream.write(buffer.array(), 0, buffer.position()); + finalRowCount++; + + //if it's doing storage aggr, then should rely on GTAggregateScanner's limit check + if (!scanReq.isDoingStorageAggregation() && finalRowCount >= storagePushDownLimit) { + //read one more record than limit + logger.info("The finalScanner aborted because storagePushDownLimit is satisfied"); + break; + } } } catch (GTScanTimeoutException e) { scanNormalComplete.setValue(false);