Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 9ba89b886 -> af889ca00


KYLIN-1222 Fix bug, add TopN support to the v1 query engine, and restore v1 query engine testing in case it is needed as a fallback for v2


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/af889ca0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/af889ca0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/af889ca0

Branch: refs/heads/2.x-staging
Commit: af889ca002ae0d1859eb25a5a0f49ab44fa1dcf2
Parents: 9ba89b8
Author: honma <ho...@ebay.com>
Authored: Wed Dec 9 16:47:22 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Dec 11 18:16:02 2015 +0800

----------------------------------------------------------------------
 .../kylin/query/test/ITCombinationTest.java     | 10 +++-
 .../kylin/query/test/ITKylinQueryTest.java      |  8 +--
 .../apache/kylin/query/test/KylinTestBase.java  | 33 +++++++++++
 query/src/test/resources/query/sql/query90.sql  | 24 ++++++++
 query/src/test/resources/query/sql/query91.sql  | 24 ++++++++
 .../kylin/storage/hbase/HBaseStorage.java       |  9 ++-
 .../hbase/cube/v1/CubeSegmentTupleIterator.java | 43 ++++++++++++++-
 .../storage/hbase/cube/v1/CubeStorageQuery.java | 14 +++--
 .../hbase/cube/v1/CubeTupleConverter.java       | 58 ++++++++++++++++++--
 .../coprocessor/observer/ObserverEnabler.java   | 12 ++--
 .../storage/hbase/cube/v2/CubeStorageQuery.java |  2 +
 11 files changed, 208 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java b/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java
index bbff87d..cbafc75 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.apache.kylin.storage.hbase.HBaseStorage;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -41,6 +42,7 @@ public class ITCombinationTest extends ITKylinQueryTest {
     @AfterClass
     public static void tearDown() {
         clean();
+        HBaseStorage.overwriteStorageQuery = null;
     }
 
     /**
@@ -51,10 +53,10 @@ public class ITCombinationTest extends ITKylinQueryTest {
     @Parameterized.Parameters
     public static Collection<Object[]> configs() {
         //       return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
-        return Arrays.asList(new Object[][] { { "inner", "on" }, { "left", "on" } });
+        return Arrays.asList(new Object[][] { { "inner", "on", "v2" }, { "left", "on", "v1" }, { "left", "on", "v2" } });
     }
 
-    public ITCombinationTest(String joinType, String coprocessorToggle) throws Exception {
+    public ITCombinationTest(String joinType, String coprocessorToggle, String queryEngine) throws Exception {
 
         ITKylinQueryTest.clean();
 
@@ -68,5 +70,9 @@ public class ITCombinationTest extends ITKylinQueryTest {
         } else if (coprocessorToggle.equals("unset")) {
             // unset
         }
+
+        if ("v1".equalsIgnoreCase(queryEngine)) {
+            HBaseStorage.overwriteStorageQuery = HBaseStorage.v1CubeStorageQuery;
+        }
     }
 }
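
For context, a minimal stand-alone sketch of how the JUnit 4 Parameterized runner turns the three-element rows above into separate test runs over the v1/v2 query engines. Only the runner, the annotations, and the row shape come from the diff; the class and test method below are illustrative, not Kylin code:

    import java.util.Arrays;
    import java.util.Collection;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class EngineMatrixExample {

        // each Object[] row becomes one constructor invocation, i.e. one full run
        @Parameterized.Parameters
        public static Collection<Object[]> configs() {
            return Arrays.asList(new Object[][] { { "inner", "on", "v2" }, { "left", "on", "v1" } });
        }

        private final String joinType;
        private final String coprocessorToggle;
        private final String queryEngine;

        public EngineMatrixExample(String joinType, String coprocessorToggle, String queryEngine) {
            this.joinType = joinType;
            this.coprocessorToggle = coprocessorToggle;
            this.queryEngine = queryEngine;
        }

        @Test
        public void printsOneRunPerRow() {
            System.out.println(joinType + "/" + coprocessorToggle + "/" + queryEngine);
        }
    }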

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
index 72c366b..73e1263 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -31,8 +31,8 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
 import org.dbunit.database.DatabaseConnection;
 import org.dbunit.database.IDatabaseConnection;
 import org.junit.AfterClass;
@@ -95,7 +95,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleRunQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/temp/query01.sql";
+        String queryFileName = "src/test/resources/query/sql_tableau/query20.sql";
 
         File sqlFile = new File(queryFileName);
         if (sqlFile.exists()) {
@@ -107,7 +107,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleExecuteQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/sql/query58.sql";
+        String queryFileName = "src/test/resources/query/sql_tableau/query20.sql";
 
         File sqlFile = new File(queryFileName);
         String sql = getTextFromFile(sqlFile);
@@ -168,7 +168,7 @@ public class ITKylinQueryTest extends KylinTestBase {
 
     @Test
     public void testTableauQuery() throws Exception {
-        batchExecuteQuery("src/test/resources/query/sql_tableau");
+        execAndCompResultSize("src/test/resources/query/sql_tableau", null, true);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
index 0399f8c..680acee 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
@@ -340,6 +340,39 @@ public class KylinTestBase {
         }
     }
 
+    protected void execAndCompResultSize(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception {
+        printInfo("---------- test folder: " + queryFolder);
+        Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys);
+
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            if (exclusiveSet.contains(queryName)) {
+                continue;
+            }
+            String sql = getTextFromFile(sqlFile);
+
+            // execute Kylin
+            printInfo("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
+            IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+
+            // execute H2
+            printInfo("Query Result from H2 - " + queryName);
+            H2Connection h2Conn = new H2Connection(h2Connection, null);
+            h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
+            ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort);
+
+            // compare the result
+            Assert.assertEquals(h2Table.getRowCount(), kylinTable.getRowCount());
+
+            compQueryCount++;
+            if (kylinTable.getRowCount() == 0) {
+                zeroResultQueries.add(sql);
+            }
+        }
+    }
+
     protected void execAndCompQuery(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception {
         printInfo("---------- test folder: " + queryFolder);
         Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys);
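
A minimal usage sketch of the new helper, mirroring the testTableauQuery change in ITKylinQueryTest above; the folder path is the one used in this commit, the enclosing test method is illustrative:

    @Test
    public void testTableauQuery() throws Exception {
        // compares only row counts (not cell values) between Kylin and H2
        execAndCompResultSize("src/test/resources/query/sql_tableau", null, true);
    }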

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/resources/query/sql/query90.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query90.sql b/query/src/test/resources/query/sql/query90.sql
new file mode 100644
index 0000000..1357af6
--- /dev/null
+++ b/query/src/test/resources/query/sql/query90.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+
+ 
+ 
+  select test_kylin_fact.lstg_format_name, sum(price) as GMV, count(seller_id) as TRANS_CNT 
+ from test_kylin_fact where test_kylin_fact.lstg_format_name > 'AB' 
+ group by test_kylin_fact.lstg_format_name having count(seller_id) > 2 

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/query/src/test/resources/query/sql/query91.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query91.sql b/query/src/test/resources/query/sql/query91.sql
new file mode 100644
index 0000000..bdb66de
--- /dev/null
+++ b/query/src/test/resources/query/sql/query91.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+
+ 
+ 
+  select test_kylin_fact.lstg_format_name, sum(price) as GMV, count(seller_id) as TRANS_CNT 
+ from test_kylin_fact where test_kylin_fact.lstg_format_name <= 'ABZ' 
+ group by test_kylin_fact.lstg_format_name having count(seller_id) > 2 

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index e7c8116..c61212c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -44,8 +44,9 @@ import com.google.common.base.Preconditions;
 //used by reflection
 public class HBaseStorage implements IStorage {
 
-    private final static String v2CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery";
-    private final static String v1CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery";
+    public final static String v2CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery";
+    public final static String v1CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery";
+    public static String overwriteStorageQuery = null;//for test case
 
     private final static String defaultIIStorageQuery = "org.apache.kylin.storage.hbase.ii.InvertedIndexStorageQuery";
 
@@ -71,7 +72,9 @@ public class HBaseStorage implements IStorage {
         } else if (realization.getType() == RealizationType.CUBE) {
 
             String cubeStorageQuery;
-            if ("v1".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryVersion())) {
+            if (overwriteStorageQuery != null) {
+                cubeStorageQuery = overwriteStorageQuery;
+            } else if ("v1".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryVersion())) {
                 cubeStorageQuery = v1CubeStorageQuery;
             } else {
                 cubeStorageQuery = v2CubeStorageQuery;//by default use v2
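
A minimal sketch of how test code can pin the storage engine through the new public hook; only the HBaseStorage members are from this commit, the try/finally wrapper is illustrative:

    // force the v1 cube storage query engine, then restore the default (v2)
    HBaseStorage.overwriteStorageQuery = HBaseStorage.v1CubeStorageQuery;
    try {
        // run queries against the v1 engine here
    } finally {
        HBaseStorage.overwriteStorageQuery = null;
    }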

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 43fb1b5..f760192 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -40,6 +40,7 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
@@ -83,6 +84,10 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     protected Tuple next;
     protected final Cuboid cuboid;
 
+    private List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
+    private int advMeasureRowsRemaining;
+    private int advMeasureRowIndex;
+
     public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
             Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
             List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
@@ -115,6 +120,17 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         if (next != null)
             return true;
 
+        // consume any left rows from advanced measure filler
+        if (advMeasureRowsRemaining > 0) {
+            for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) {
+                filler.fillTuplle(oneTuple, advMeasureRowIndex);
+            }
+            advMeasureRowIndex++;
+            advMeasureRowsRemaining--;
+            next = oneTuple;
+            return true;
+        }
+
         if (resultIterator == null) {
             if (rangeIterator.hasNext() == false)
                 return false;
@@ -132,9 +148,30 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         scanCount++;
         if (++scanCountDelta >= 1000)
             flushScanCountDelta();
-        tupleConverter.translateResult(result, oneTuple);
-        next = oneTuple;
-        return true;
+
+        // translate into tuple
+        advMeasureFillers = tupleConverter.translateResult(result, oneTuple);
+
+        // the simple case
+        if (advMeasureFillers == null) {
+            next = oneTuple;
+            return true;
+        }
+
+        // advanced measure filling, like TopN, will produce multiple tuples out of one record
+        advMeasureRowsRemaining = -1;
+        for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) {
+            if (advMeasureRowsRemaining < 0)
+                advMeasureRowsRemaining = filler.getNumOfRows();
+            if (advMeasureRowsRemaining != filler.getNumOfRows())
+                throw new IllegalStateException();
+        }
+        if (advMeasureRowsRemaining < 0)
+            throw new IllegalStateException();
+
+        advMeasureRowIndex = 0;
+        return hasNext();
+
     }
 
     @Override
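
The hasNext() change above unfolds one HBase record carrying an advanced measure (e.g. TopN) into several tuples. Below is a self-contained sketch of the same drain-then-fetch pattern; it is not Kylin code, and all names in it are illustrative:

    import java.util.Iterator;
    import java.util.List;

    // Each source record expands into several output rows; hasNext() drains the
    // pending rows of the current record before fetching the next one, mirroring
    // the advMeasureRowsRemaining / advMeasureRowIndex bookkeeping above.
    class ExpandingIterator implements Iterator<String> {
        private final Iterator<List<String>> source;
        private List<String> pending;
        private int pendingIdx;

        ExpandingIterator(Iterator<List<String>> source) {
            this.source = source;
        }

        @Override
        public boolean hasNext() {
            if (pending != null && pendingIdx < pending.size())
                return true;                // still rows left from the last record
            if (!source.hasNext())
                return false;
            pending = source.next();        // "translate" the next record
            pendingIdx = 0;
            return hasNext();               // re-check: the record may be empty
        }

        @Override
        public String next() {
            return pending.get(pendingIdx++);
        }
    }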

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 4483cb3..b340be0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -38,6 +38,7 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -144,7 +145,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHbaseMapping(), metrics, context);
 
         // memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
-        // setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
+        setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
         setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
         setLimit(filter, context);
 
@@ -484,7 +485,9 @@ public class CubeStorageQuery implements ICachableStorageQuery {
 
             Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment);
 
-            result.add(andRanges);
+            if (andRanges != null) {
+                result.add(andRanges);
+            }
         }
 
         return preprocessConstantConditions(result);
@@ -687,8 +690,9 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId());
             short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId());
             for (short i = 0; i < cuboidShardNum; ++i) {
-                byte[] newStartKey = duplicateKeyAndChangeShard(i, startKey);
-                byte[] newStopKey = duplicateKeyAndChangeShard(i, stopKey);
+                short newShard = ShardingHash.normalize(cuboidShardBase, i, segment.getTotalShards());
+                byte[] newStartKey = duplicateKeyAndChangeShard(newShard, startKey);
+                byte[] newStopKey = duplicateKeyAndChangeShard(newShard, stopKey);
                 HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, //
                         scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate());
                 ret.add(newRange);
@@ -756,5 +760,5 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             measureType.adjustSqlDigest(measure, sqlDigest);
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index e414e08..3b90dfa 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -8,12 +8,15 @@ import java.util.Map.Entry;
 
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowKeyDecoder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -22,6 +25,7 @@ import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class CubeTupleConverter {
 
@@ -30,11 +34,16 @@ public class CubeTupleConverter {
     final TupleInfo tupleInfo;
     final RowKeyDecoder rowKeyDecoder;
     final List<RowValueDecoder> rowValueDecoders;
-    final List<IDerivedColumnFiller> derivedColFillers; 
+    final List<IDerivedColumnFiller> derivedColFillers;
     final int[] dimensionTupleIdx;
     final int[][] metricsMeasureIdx;
     final int[][] metricsTupleIdx;
 
+    final List<MeasureType<?>> measureTypes;
+
+    final List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
+    final List<Pair<Integer, Integer>> advMeasureIndexInRV;//first=> which rowValueDecoders,second => metric index
+
     public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
@@ -42,16 +51,20 @@ public class CubeTupleConverter {
         this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
         this.rowValueDecoders = rowValueDecoders;
         this.derivedColFillers = Lists.newArrayList();
-        
+
         List<TblColRef> dimCols = cuboid.getColumns();
 
+        measureTypes = Lists.newArrayList();
+        advMeasureFillers = Lists.newArrayListWithCapacity(1);
+        advMeasureIndexInRV = Lists.newArrayListWithCapacity(1);
+
         // pre-calculate dimension index mapping to tuple
         dimensionTupleIdx = new int[dimCols.size()];
         for (int i = 0; i < dimCols.size(); i++) {
             TblColRef col = dimCols.get(i);
             dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
+            measureTypes.add(null);
         }
-        
 
         // pre-calculate metrics index mapping to tuple
         metricsMeasureIdx = new int[rowValueDecoders.size()][];
@@ -64,7 +77,7 @@ public class CubeTupleConverter {
             metricsTupleIdx[i] = new int[selectedMeasures.cardinality()];
             for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) {
                 FunctionDesc aggrFunc = measures[mi].getFunction();
-                
+
                 int tupleIdx;
                 // a rewrite metrics is identified by its rewrite field name
                 if (aggrFunc.needRewrite()) {
@@ -78,6 +91,16 @@ public class CubeTupleConverter {
                 }
                 metricsMeasureIdx[i][j] = mi;
                 metricsTupleIdx[i][j] = tupleIdx;
+
+                MeasureType<?> measureType = aggrFunc.getMeasureType();
+                if (measureType.needAdvancedTupleFilling()) {
+                    Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(aggrFunc));
+                    advMeasureFillers.add(measureType.getAdvancedTupleFiller(aggrFunc, tupleInfo, dictionaryMap));
+                    advMeasureIndexInRV.add(Pair.newPair(i, mi));
+                    measureTypes.add(null);
+                } else {
+                    measureTypes.add(measureType);
+                }
             }
         }
 
@@ -94,7 +117,16 @@ public class CubeTupleConverter {
         }
     }
 
-    public void translateResult(Result hbaseRow, Tuple tuple) {
+    // load only needed dictionaries
+    private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+        for (TblColRef col : columnsNeedDictionary) {
+            result.put(col, cubeSeg.getDictionary(col));
+        }
+        return result;
+    }
+
+    public List<MeasureType.IAdvMeasureFiller> translateResult(Result hbaseRow, Tuple tuple) {
         try {
             byte[] rowkey = hbaseRow.getRow();
             rowKeyDecoder.decode(rowkey);
@@ -125,8 +157,22 @@ public class CubeTupleConverter {
             int[] measureIdx = metricsMeasureIdx[i];
             int[] tupleIdx = metricsTupleIdx[i];
             for (int j = 0; j < measureIdx.length; j++) {
-                tuple.setMeasureValue(tupleIdx[j], measureValues[measureIdx[j]]);
+                if (measureTypes.get(dimensionValues.size() + j) != null) {
+                    tuple.setMeasureValue(tupleIdx[j], measureValues[measureIdx[j]]);
+                }
+            }
+        }
+
+        // advanced measure filling, due to possible row split, will complete at caller side
+        if (advMeasureFillers.isEmpty()) {
+            return null;
+        } else {
+            for (int i = 0; i < advMeasureFillers.size(); i++) {
+                Pair<Integer, Integer> metricLocation = advMeasureIndexInRV.get(i);
+                Object measureValue = rowValueDecoders.get(metricLocation.getFirst()).getValues()[metricLocation.getSecond()];
+                advMeasureFillers.get(i).reload(measureValue);
             }
+            return advMeasureFillers;
         }
     }
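
A rough caller-side sketch of the new translateResult() contract: a null return means the tuple is already complete, otherwise the returned fillers must be unfolded by the caller, as CubeSegmentTupleIterator does above. emit() and the local variable names are illustrative, not Kylin code:

    List<MeasureType.IAdvMeasureFiller> fillers = tupleConverter.translateResult(result, oneTuple);
    if (fillers == null) {
        emit(oneTuple);                              // simple case
    } else {
        int rows = fillers.get(0).getNumOfRows();    // all fillers agree on the row count
        for (int row = 0; row < rows; row++) {
            for (MeasureType.IAdvMeasureFiller filler : fillers)
                filler.fillTuplle(oneTuple, row);    // method name as it appears in this diff
            emit(oneTuple);
        }
    }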
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
index 4750ea4..13dbaa9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
@@ -38,8 +38,8 @@ import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
 import org.apache.kylin.storage.hbase.cube.v1.RegionScannerAdapter;
 import org.apache.kylin.storage.hbase.cube.v1.ResultScannerAdapter;
 import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
@@ -62,7 +62,7 @@ public class ObserverEnabler {
     public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, //
             Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException {
 
-        if (context.isCoprocessorEnabled() == false) {
+        if (true) {
             return table.getScanner(scan);
         }
 
@@ -119,10 +119,10 @@ public class ObserverEnabler {
             return r;
         }
 
-        //        if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) {
-        //            logger.info("Coprocessor is disabled because there is memory hungry count distinct");
-        //            return false;
-        //        }
+        if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
+            logger.info("Coprocessor is disabled because there is memory hungry count distinct");
+            return false;
+        }
 
         if (context.isExactAggregation()) {
             logger.info("Coprocessor is disabled because exactAggregation is true");

http://git-wip-us.apache.org/repos/asf/kylin/blob/af889ca0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 41950f7..3a231b5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -89,7 +89,9 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
         TupleFilter filterD = translateDerived(filter, groupsD);
 
+        //actually even if the threshold is set, it will not be used in this query engine
         setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
+        
         setLimit(filter, context);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
