Repository: kylin Updated Branches: refs/heads/2.x-staging 9ba89b886 -> af889ca00
KYLIN-1222 fix bug & add topN to v1 query engine & restore testing v1 query engine in case need it as a fallback for v2 Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/af889ca0 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/af889ca0 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/af889ca0 Branch: refs/heads/2.x-staging Commit: af889ca002ae0d1859eb25a5a0f49ab44fa1dcf2 Parents: 9ba89b8 Author: honma <ho...@ebay.com> Authored: Wed Dec 9 16:47:22 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Fri Dec 11 18:16:02 2015 +0800 ---------------------------------------------------------------------- .../kylin/query/test/ITCombinationTest.java | 10 +++- .../kylin/query/test/ITKylinQueryTest.java | 8 +-- .../apache/kylin/query/test/KylinTestBase.java | 33 +++++++++++ query/src/test/resources/query/sql/query90.sql | 24 ++++++++ query/src/test/resources/query/sql/query91.sql | 24 ++++++++ .../kylin/storage/hbase/HBaseStorage.java | 9 ++- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 43 ++++++++++++++- .../storage/hbase/cube/v1/CubeStorageQuery.java | 14 +++-- .../hbase/cube/v1/CubeTupleConverter.java | 58 ++++++++++++++++++-- .../coprocessor/observer/ObserverEnabler.java | 12 ++-- .../storage/hbase/cube/v2/CubeStorageQuery.java | 2 + 11 files changed, 208 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java b/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java index bbff87d..cbafc75 100644 --- a/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java +++ b/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java @@ -22,6 +22,7 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.Collection; +import org.apache.kylin.storage.hbase.HBaseStorage; import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -41,6 +42,7 @@ public class ITCombinationTest extends ITKylinQueryTest { @AfterClass public static void tearDown() { clean(); + HBaseStorage.overwriteStorageQuery = null; } /** @@ -51,10 +53,10 @@ public class ITCombinationTest extends ITKylinQueryTest { @Parameterized.Parameters public static Collection<Object[]> configs() { // return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, }); - return Arrays.asList(new Object[][] { { "inner", "on" }, { "left", "on" } }); + return Arrays.asList(new Object[][] { { "inner", "on", "v2" }, { "left", "on", "v1" }, { "left", "on", "v2" } }); } - public ITCombinationTest(String joinType, String coprocessorToggle) throws Exception { + public ITCombinationTest(String joinType, String coprocessorToggle, String queryEngine) throws Exception { ITKylinQueryTest.clean(); @@ -68,5 +70,9 @@ public class ITCombinationTest extends ITKylinQueryTest { } else if (coprocessorToggle.equals("unset")) { // unset } + + if ("v1".equalsIgnoreCase(queryEngine)) { + HBaseStorage.overwriteStorageQuery = HBaseStorage.v1CubeStorageQuery; + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java index 72c366b..73e1263 100644 --- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java +++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java @@ -31,8 +31,8 @@ import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.query.enumerator.OLAPQuery; import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.query.schema.OLAPSchemaFactory; -import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; +import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; import org.dbunit.database.DatabaseConnection; import org.dbunit.database.IDatabaseConnection; import org.junit.AfterClass; @@ -95,7 +95,7 @@ public class ITKylinQueryTest extends KylinTestBase { @Test public void testSingleRunQuery() throws Exception { - String queryFileName = "src/test/resources/query/temp/query01.sql"; + String queryFileName = "src/test/resources/query/sql_tableau/query20.sql"; File sqlFile = new File(queryFileName); if (sqlFile.exists()) { @@ -107,7 +107,7 @@ public class ITKylinQueryTest extends KylinTestBase { @Test public void testSingleExecuteQuery() throws Exception { - String queryFileName = "src/test/resources/query/sql/query58.sql"; + String queryFileName = "src/test/resources/query/sql_tableau/query20.sql"; File sqlFile = new File(queryFileName); String sql = getTextFromFile(sqlFile); @@ -168,7 +168,7 @@ public class ITKylinQueryTest extends KylinTestBase { @Test public void testTableauQuery() throws Exception { - batchExecuteQuery("src/test/resources/query/sql_tableau"); + execAndCompResultSize("src/test/resources/query/sql_tableau", null, true); } @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java index 0399f8c..680acee 100644 --- a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java +++ b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java @@ -340,6 +340,39 @@ public class KylinTestBase { } } + protected void execAndCompResultSize(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception { + printInfo("---------- test folder: " + queryFolder); + Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys); + + List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql"); + for (File sqlFile : sqlFiles) { + String queryName = StringUtils.split(sqlFile.getName(), '.')[0]; + if (exclusiveSet.contains(queryName)) { + continue; + } + String sql = getTextFromFile(sqlFile); + + // execute Kylin + printInfo("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); + IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); + ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort); + + // execute H2 + printInfo("Query Result from H2 - " + queryName); + H2Connection h2Conn = new H2Connection(h2Connection, null); + h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory()); + ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort); + + // compare the result + Assert.assertEquals(h2Table.getRowCount(), kylinTable.getRowCount()); + + compQueryCount++; + if (kylinTable.getRowCount() == 0) { + zeroResultQueries.add(sql); + } + } + } + protected void execAndCompQuery(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception { printInfo("---------- test folder: " + queryFolder); Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys); http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/resources/query/sql/query90.sql ---------------------------------------------------------------------- diff --git a/query/src/test/resources/query/sql/query90.sql b/query/src/test/resources/query/sql/query90.sql new file mode 100644 index 0000000..1357af6 --- /dev/null +++ b/query/src/test/resources/query/sql/query90.sql @@ -0,0 +1,24 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + + + + + select test_kylin_fact.lstg_format_name, sum(price) as GMV, count(seller_id) as TRANS_CNT + from test_kylin_fact where test_kylin_fact.lstg_format_name > 'AB' + group by test_kylin_fact.lstg_format_name having count(seller_id) > 2 http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/resources/query/sql/query91.sql ---------------------------------------------------------------------- diff --git a/query/src/test/resources/query/sql/query91.sql b/query/src/test/resources/query/sql/query91.sql new file mode 100644 index 0000000..bdb66de --- /dev/null +++ b/query/src/test/resources/query/sql/query91.sql @@ -0,0 +1,24 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + + + + + select test_kylin_fact.lstg_format_name, sum(price) as GMV, count(seller_id) as TRANS_CNT + from test_kylin_fact where test_kylin_fact.lstg_format_name <= 'ABZ' + group by test_kylin_fact.lstg_format_name having count(seller_id) > 2 http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java index e7c8116..c61212c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java @@ -44,8 +44,9 @@ import com.google.common.base.Preconditions; //used by reflection public class HBaseStorage implements IStorage { - private final static String v2CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery"; - private final static String v1CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery"; + public final static String v2CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery"; + public final static String v1CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery"; + public static String overwriteStorageQuery = null;//for test case private final static String defaultIIStorageQuery = "org.apache.kylin.storage.hbase.ii.InvertedIndexStorageQuery"; @@ -71,7 +72,9 @@ public class HBaseStorage implements IStorage { } else if (realization.getType() == RealizationType.CUBE) { String cubeStorageQuery; - if ("v1".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryVersion())) { + if (overwriteStorageQuery != null) { + cubeStorageQuery = overwriteStorageQuery; + } else if ("v1".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryVersion())) { cubeStorageQuery = v1CubeStorageQuery; } else { cubeStorageQuery = v2CubeStorageQuery;//by default use v2 http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java index 43fb1b5..f760192 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java @@ -40,6 +40,7 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.HBaseColumnDesc; +import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.ITupleIterator; @@ -83,6 +84,10 @@ public class CubeSegmentTupleIterator implements ITupleIterator { protected Tuple next; protected final Cuboid cuboid; + private List<MeasureType.IAdvMeasureFiller> advMeasureFillers; + private int advMeasureRowsRemaining; + private int advMeasureRowIndex; + public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, // Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, // List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) { @@ -115,6 +120,17 @@ public class CubeSegmentTupleIterator implements ITupleIterator { if (next != null) return true; + // consume any left rows from advanced measure filler + if (advMeasureRowsRemaining > 0) { + for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) { + filler.fillTuplle(oneTuple, advMeasureRowIndex); + } + advMeasureRowIndex++; + advMeasureRowsRemaining--; + next = oneTuple; + return true; + } + if (resultIterator == null) { if (rangeIterator.hasNext() == false) return false; @@ -132,9 +148,30 @@ public class CubeSegmentTupleIterator implements ITupleIterator { scanCount++; if (++scanCountDelta >= 1000) flushScanCountDelta(); - tupleConverter.translateResult(result, oneTuple); - next = oneTuple; - return true; + + // translate into tuple + advMeasureFillers = tupleConverter.translateResult(result, oneTuple); + + // the simple case + if (advMeasureFillers == null) { + next = oneTuple; + return true; + } + + // advanced measure filling, like TopN, will produce multiple tuples out of one record + advMeasureRowsRemaining = -1; + for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) { + if (advMeasureRowsRemaining < 0) + advMeasureRowsRemaining = filler.getNumOfRows(); + if (advMeasureRowsRemaining != filler.getNumOfRows()) + throw new IllegalStateException(); + } + if (advMeasureRowsRemaining < 0) + throw new IllegalStateException(); + + advMeasureRowIndex = 0; + return hasNext(); + } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index 4483cb3..b340be0 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -38,6 +38,7 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -144,7 +145,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHbaseMapping(), metrics, context); // memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more - // setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory + setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial setLimit(filter, context); @@ -484,7 +485,9 @@ public class CubeStorageQuery implements ICachableStorageQuery { Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment); - result.add(andRanges); + if (andRanges != null) { + result.add(andRanges); + } } return preprocessConstantConditions(result); @@ -687,8 +690,9 @@ public class CubeStorageQuery implements ICachableStorageQuery { short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId()); short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId()); for (short i = 0; i < cuboidShardNum; ++i) { - byte[] newStartKey = duplicateKeyAndChangeShard(i, startKey); - byte[] newStopKey = duplicateKeyAndChangeShard(i, stopKey); + short newShard = ShardingHash.normalize(cuboidShardBase, i, segment.getTotalShards()); + byte[] newStartKey = duplicateKeyAndChangeShard(newShard, startKey); + byte[] newStopKey = duplicateKeyAndChangeShard(newShard, stopKey); HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, // scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate()); ret.add(newRange); @@ -756,5 +760,5 @@ public class CubeStorageQuery implements ICachableStorageQuery { measureType.adjustSqlDigest(measure, sqlDigest); } } - + } http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java index e414e08..3b90dfa 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java @@ -8,12 +8,15 @@ import java.util.Map.Entry; import org.apache.hadoop.hbase.client.Result; import org.apache.kylin.common.util.Array; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowKeyDecoder; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -22,6 +25,7 @@ import org.apache.kylin.metadata.tuple.TupleInfo; import org.apache.kylin.storage.hbase.steps.RowValueDecoder; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public class CubeTupleConverter { @@ -30,11 +34,16 @@ public class CubeTupleConverter { final TupleInfo tupleInfo; final RowKeyDecoder rowKeyDecoder; final List<RowValueDecoder> rowValueDecoders; - final List<IDerivedColumnFiller> derivedColFillers; + final List<IDerivedColumnFiller> derivedColFillers; final int[] dimensionTupleIdx; final int[][] metricsMeasureIdx; final int[][] metricsTupleIdx; + final List<MeasureType<?>> measureTypes; + + final List<MeasureType.IAdvMeasureFiller> advMeasureFillers; + final List<Pair<Integer, Integer>> advMeasureIndexInRV;//first=> which rowValueDecoders,second => metric index + public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) { this.cubeSeg = cubeSeg; this.cuboid = cuboid; @@ -42,16 +51,20 @@ public class CubeTupleConverter { this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg); this.rowValueDecoders = rowValueDecoders; this.derivedColFillers = Lists.newArrayList(); - + List<TblColRef> dimCols = cuboid.getColumns(); + measureTypes = Lists.newArrayList(); + advMeasureFillers = Lists.newArrayListWithCapacity(1); + advMeasureIndexInRV = Lists.newArrayListWithCapacity(1); + // pre-calculate dimension index mapping to tuple dimensionTupleIdx = new int[dimCols.size()]; for (int i = 0; i < dimCols.size(); i++) { TblColRef col = dimCols.get(i); dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; + measureTypes.add(null); } - // pre-calculate metrics index mapping to tuple metricsMeasureIdx = new int[rowValueDecoders.size()][]; @@ -64,7 +77,7 @@ public class CubeTupleConverter { metricsTupleIdx[i] = new int[selectedMeasures.cardinality()]; for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) { FunctionDesc aggrFunc = measures[mi].getFunction(); - + int tupleIdx; // a rewrite metrics is identified by its rewrite field name if (aggrFunc.needRewrite()) { @@ -78,6 +91,16 @@ public class CubeTupleConverter { } metricsMeasureIdx[i][j] = mi; metricsTupleIdx[i][j] = tupleIdx; + + MeasureType<?> measureType = aggrFunc.getMeasureType(); + if (measureType.needAdvancedTupleFilling()) { + Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(aggrFunc)); + advMeasureFillers.add(measureType.getAdvancedTupleFiller(aggrFunc, tupleInfo, dictionaryMap)); + advMeasureIndexInRV.add(Pair.newPair(i, mi)); + measureTypes.add(null); + } else { + measureTypes.add(measureType); + } } } @@ -94,7 +117,16 @@ public class CubeTupleConverter { } } - public void translateResult(Result hbaseRow, Tuple tuple) { + // load only needed dictionaries + private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) { + Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); + for (TblColRef col : columnsNeedDictionary) { + result.put(col, cubeSeg.getDictionary(col)); + } + return result; + } + + public List<MeasureType.IAdvMeasureFiller> translateResult(Result hbaseRow, Tuple tuple) { try { byte[] rowkey = hbaseRow.getRow(); rowKeyDecoder.decode(rowkey); @@ -125,8 +157,22 @@ public class CubeTupleConverter { int[] measureIdx = metricsMeasureIdx[i]; int[] tupleIdx = metricsTupleIdx[i]; for (int j = 0; j < measureIdx.length; j++) { - tuple.setMeasureValue(tupleIdx[j], measureValues[measureIdx[j]]); + if (measureTypes.get(dimensionValues.size() + j) != null) { + tuple.setMeasureValue(tupleIdx[j], measureValues[measureIdx[j]]); + } + } + } + + // advanced measure filling, due to possible row split, will complete at caller side + if (advMeasureFillers.isEmpty()) { + return null; + } else { + for (int i = 0; i < advMeasureFillers.size(); i++) { + Pair<Integer, Integer> metricLocation = advMeasureIndexInRV.get(i); + Object measureValue = rowValueDecoders.get(metricLocation.getFirst()).getValues()[metricLocation.getSecond()]; + advMeasureFillers.get(i).reload(measureValue); } + return advMeasureFillers; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java index 4750ea4..13dbaa9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java @@ -38,8 +38,8 @@ import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; -import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; +import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator; import org.apache.kylin.storage.hbase.cube.v1.RegionScannerAdapter; import org.apache.kylin.storage.hbase.cube.v1.ResultScannerAdapter; import org.apache.kylin.storage.hbase.steps.RowValueDecoder; @@ -62,7 +62,7 @@ public class ObserverEnabler { public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, // Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException { - if (context.isCoprocessorEnabled() == false) { + if (true) { return table.getScanner(scan); } @@ -119,10 +119,10 @@ public class ObserverEnabler { return r; } - // if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) { - // logger.info("Coprocessor is disabled because there is memory hungry count distinct"); - // return false; - // } + if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) { + logger.info("Coprocessor is disabled because there is memory hungry count distinct"); + return false; + } if (context.isExactAggregation()) { logger.info("Coprocessor is disabled because exactAggregation is true"); http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index 41950f7..3a231b5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -89,7 +89,9 @@ public class CubeStorageQuery implements ICachableStorageQuery { // replace derived columns in filter with host columns; columns on loosened condition must be added to group by TupleFilter filterD = translateDerived(filter, groupsD); + //actually even if the threshold is set, it will not be used in this query engine setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory + setLimit(filter, context); List<CubeSegmentScanner> scanners = Lists.newArrayList();