Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2800 [created] ac77008ee


KYLIN-2800 All dictionaries should be built based on the flat hive table

Signed-off-by: Li Yang <liy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ac77008e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ac77008e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ac77008e

Branch: refs/heads/KYLIN-2800
Commit: ac77008ee81d4dcc2956b1a2cfd6eaa7ae9fc5d9
Parents: 520f627
Author: zhengdong <zhe...@outlook.com>
Authored: Mon Aug 21 18:39:24 2017 +0800
Committer: Li Yang <liy...@apache.org>
Committed: Wed Sep 6 11:27:56 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 34 +++---------
 .../kylin/cube/cli/DictionaryGeneratorCLI.java  | 23 +-------
 .../apache/kylin/dict/DictionaryManager.java    | 56 +++-----------------
 .../kylin/dict/DictionaryManagerTest.java       | 35 +++---------
 .../engine/mr/steps/FactDistinctColumnsJob.java |  4 +-
 .../mr/steps/FactDistinctColumnsMapper.java     |  6 +--
 .../mr/steps/FactDistinctColumnsMapperBase.java | 12 +++--
 .../mr/steps/FactDistinctColumnsReducer.java    |  4 +-
 .../engine/mr/steps/MergeCuboidMapper.java      | 20 +------
 .../engine/mr/steps/MergeDictionaryStep.java    | 27 +---------
 .../kylin/cube/ITDictionaryManagerTest.java     |  6 +--
 11 files changed, 43 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
old mode 100644
new mode 100755
index 043993c..b782a5e
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -229,7 +229,7 @@ public class CubeManager implements IRealizationProvider {
             return null;
 
         String builderClass = cubeDesc.getDictionaryBuilderClass(col);
-        DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(cubeDesc.getModel(), col, inpTable,
+        DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable,
                 builderClass);
 
         saveDictionaryInfo(cubeSeg, col, dictInfo);
@@ -242,7 +242,7 @@ public class CubeManager implements IRealizationProvider {
         if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
             return null;
 
-        DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(cubeDesc.getModel(), col, inpTable, dict);
+        DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
 
         saveDictionaryInfo(cubeSeg, col, dictInfo);
         return dictInfo;
@@ -859,24 +859,6 @@ public class CubeManager implements IRealizationProvider {
         this.listener = listener;
     }
 
-    /**
-     * Get the columns which need build the dictionary from fact table. (the column exists on fact and is not fk)
-     * @param cubeDesc
-     * @return
-     * @throws IOException
-     */
-    public List<TblColRef> getAllDictColumnsOnFact(CubeDesc cubeDesc) throws IOException {
-        List<TblColRef> factDictCols = new ArrayList<TblColRef>();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-        for (TblColRef col : cubeDesc.getAllColumnsNeedDictionaryBuilt()) {
-
-            String scanTable = dictMgr.decideSourceData(cubeDesc.getModel(), col).getTable();
-            if (cubeDesc.getModel().isFactTable(scanTable)) {
-                factDictCols.add(col);
-            }
-        }
-        return factDictCols;
-    }
 
     /**
      * Calculate the holes (gaps) in segments.
@@ -922,8 +904,8 @@ public class CubeManager implements IRealizationProvider {
 
     //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
     public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
-        List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc);
-        int[] uhcIndex = new int[factDictCols.size()];
+        List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
+        int[] uhcIndex = new int[dictCols.size()];
+        int[] uhcIndex = new int[dictCols.size()];
 
         //add GlobalDictionaryColumns
         List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
@@ -931,8 +913,8 @@ public class CubeManager implements IRealizationProvider {
             for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
                 if (dictionaryDesc.getBuilderClass() != null
                         && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
-                    for (int i = 0; i < factDictCols.size(); i++) {
-                        if (factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
+                    for (int i = 0; i < dictCols.size(); i++) {
+                        if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
                             uhcIndex[i] = 1;
                             break;
                         }
@@ -943,8 +925,8 @@ public class CubeManager implements IRealizationProvider {
 
         //add ShardByColumns
         Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
-        for (int i = 0; i < factDictCols.size(); i++) {
-            if (shardByColumns.contains(factDictCols.get(i))) {
+        for (int i = 0; i < dictCols.size(); i++) {
+            if (shardByColumns.contains(dictCols.get(i))) {
                 uhcIndex[i] = 1;
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 91deda3..f8b6d61 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -27,17 +27,12 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.DictionaryProvider;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.IReadableTable;
-import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +55,7 @@ public class DictionaryGeneratorCLI {
         // dictionary
         for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
             logger.info("Building dictionary for " + col);
-            IReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, factTableValueProvider);
+            IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col);
             if (dictProvider != null) {
                 Dictionary<String> dict = dictProvider.getDictionary(col);
                 if (dict != null) {
@@ -99,20 +94,4 @@ public class DictionaryGeneratorCLI {
         }
     }
 
-    private static IReadableTable decideInputTable(DataModelDesc model, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) {
-        KylinConfig config = model.getConfig();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-        TblColRef srcCol = dictMgr.decideSourceData(model, col);
-        String srcTable = srcCol.getTable();
-
-        IReadableTable inpTable;
-        if (model.isFactTable(srcTable)) {
-            inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);
-        } else {
-            MetadataManager metadataManager = MetadataManager.getInstance(config);
-            TableDesc tableDesc = new TableDesc(metadataManager.getTableDesc(srcTable, model.getProject()));
-            inpTable = SourceFactory.createReadableTable(tableDesc);
-        }
-        return inpTable;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
old mode 100644
new mode 100755
index 857ee30..7a253e9
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ClassUtil;
@@ -37,9 +36,6 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableSignature;
@@ -277,17 +273,17 @@ public class DictionaryManager {
         }
     }
 
-    public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, IReadableTable inpTable) throws IOException {
-        return buildDictionary(model, col, inpTable, null);
+    public DictionaryInfo buildDictionary(TblColRef col, IReadableTable inpTable) throws IOException {
+        return buildDictionary(col, inpTable, null);
     }
 
-    public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, IReadableTable inpTable, String builderClass) throws IOException {
+    public DictionaryInfo buildDictionary(TblColRef col, IReadableTable inpTable, String builderClass) throws IOException {
         if (inpTable.exists() == false)
             return null;
 
         logger.info("building dictionary for " + col);
 
-        DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable);
+        DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable);
         String dupInfo = checkDupByInfo(dictInfo);
         if (dupInfo != null) {
             logger.info("Identical dictionary input " + dictInfo.getInput() + ", reuse existing dictionary at " + dupInfo);
@@ -321,8 +317,8 @@ public class DictionaryManager {
         return dictionary;
     }
 
-    public DictionaryInfo saveDictionary(DataModelDesc model, TblColRef col, IReadableTable inpTable, Dictionary<String> dictionary) throws IOException {
-        DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable);
+    public DictionaryInfo saveDictionary(TblColRef col, IReadableTable inpTable, Dictionary<String> dictionary) throws IOException {
+        DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable);
         String dupInfo = checkDupByInfo(dictInfo);
         if (dupInfo != null) {
             logger.info("Identical dictionary input " + dictInfo.getInput() + ", reuse existing dictionary at " + dupInfo);
@@ -332,51 +328,15 @@ public class DictionaryManager {
         return trySaveNewDict(dictionary, dictInfo);
     }
 
-    private DictionaryInfo createDictionaryInfo(DataModelDesc model, TblColRef col, IReadableTable inpTable) throws IOException {
-        TblColRef srcCol = decideSourceData(model, col);
+    private DictionaryInfo createDictionaryInfo(TblColRef col, IReadableTable inpTable) throws IOException {
         TableSignature inputSig = inpTable.getSignature();
         if (inputSig == null) // table does not exists
             throw new IllegalStateException("Input table does not exist: " + inpTable);
 
-        DictionaryInfo dictInfo = new DictionaryInfo(srcCol.getColumnDesc(), col.getDatatype(), inputSig);
+        DictionaryInfo dictInfo = new DictionaryInfo(col.getColumnDesc(), col.getDatatype(), inputSig);
         return dictInfo;
     }
 
-    /**
-     * Decide a dictionary's source data, leverage PK-FK relationship.
-     */
-    public TblColRef decideSourceData(DataModelDesc model, TblColRef col) {
-        // Note FK on fact table is supported by scan the related PK on lookup table
-        // FK on fact table and join type is inner, use PK from lookup instead
-        if (model.isFactTable(col.getTable()) == false)
-            return col;
-
-        // find a lookup table that the col joins as FK
-        for (TableRef lookup : model.getLookupTables()) {
-            JoinDesc lookupJoin = model.getJoinByPKSide(lookup);
-            int find = ArrayUtils.indexOf(lookupJoin.getForeignKeyColumns(), col);
-            if (find < 0)
-                continue;
-
-            // make sure the joins are all inner up to the root
-            if (isAllInnerJoinsToRoot(model, lookupJoin))
-                return lookupJoin.getPrimaryKeyColumns()[find];
-        }
-
-        return col;
-    }
-
-    private boolean isAllInnerJoinsToRoot(DataModelDesc model, JoinDesc join) {
-        while (join != null) {
-            if (join.isInnerJoin() == false)
-                return false;
-
-            TableRef table = join.getFKSide();
-            join = model.getJoinByPKSide(table);
-        }
-        return true;
-    }
-
     private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException {
         final ResourceStore store = MetadataManager.getInstance(config).getStore();
         final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
old mode 100644
new mode 100755
index 4820318..9c126b4
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
@@ -45,29 +45,6 @@ public class DictionaryManagerTest extends LocalFileMetadataTestCase {
         cleanupTestMetadata();
     }
 
-    @Test
-    public void testDecideSourceData() {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-
-        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-        MetadataManager metaMgr = MetadataManager.getInstance(config);
-
-        {
-            DataModelDesc innerModel = metaMgr.getDataModelDesc("test_kylin_inner_join_model_desc");
-            TblColRef factDate = innerModel.findColumn("TEST_KYLIN_FACT.CAL_DT");
-            TblColRef lookupDate = innerModel.findColumn("TEST_CAL_DT.CAL_DT");
-            TblColRef formatName = innerModel.findColumn("lstg_format_name");
-            assertEquals(lookupDate, dictMgr.decideSourceData(innerModel, factDate));
-            assertEquals(lookupDate, dictMgr.decideSourceData(innerModel, lookupDate));
-            assertEquals(formatName, dictMgr.decideSourceData(innerModel, formatName));
-        }
-        
-        {
-            DataModelDesc outerModel = metaMgr.getDataModelDesc("test_kylin_left_join_model_desc");
-            TblColRef factDate = outerModel.findColumn("TEST_KYLIN_FACT.CAL_DT");
-            assertEquals(factDate, dictMgr.decideSourceData(outerModel, factDate));
-        }
-    }
     
     @Test
     public void testBuildSaveDictionary() throws IOException {
@@ -78,27 +55,27 @@ public class DictionaryManagerTest extends LocalFileMetadataTestCase {
         TblColRef col = model.findColumn("lstg_format_name");
 
         // non-exist input returns null;
-        DictionaryInfo nullInfo = dictMgr.buildDictionary(model, col, MockupReadableTable.newNonExistTable("/a/path"));
+        DictionaryInfo nullInfo = dictMgr.buildDictionary(col, MockupReadableTable.newNonExistTable("/a/path"));
         assertEquals(null, nullInfo);
         
-        DictionaryInfo info1 = dictMgr.buildDictionary(model, col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3"));
+        DictionaryInfo info1 = dictMgr.buildDictionary(col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3"));
         assertEquals(3, info1.getDictionaryObject().getSize());
 
         // same input returns same dict
-        DictionaryInfo info2 = dictMgr.buildDictionary(model, col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3"));
+        DictionaryInfo info2 = dictMgr.buildDictionary(col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3"));
         assertTrue(info1 == info2);
         
         // same input values (different path) returns same dict
-        DictionaryInfo info3 = dictMgr.buildDictionary(model, col, MockupReadableTable.newSingleColumnTable("/a/different/path", "1", "2", "3"));
+        DictionaryInfo info3 = dictMgr.buildDictionary(col, MockupReadableTable.newSingleColumnTable("/a/different/path", "1", "2", "3"));
         assertTrue(info1 == info3);
         
         // save dictionary works in spite of non-exist table
         Dictionary<String> dict = DictionaryGenerator.buildDictionary(col.getType(), new IterableDictionaryValueEnumerator("1", "2", "3"));
-        DictionaryInfo info4 = dictMgr.saveDictionary(model, col, MockupReadableTable.newNonExistTable("/a/path"), dict);
+        DictionaryInfo info4 = dictMgr.saveDictionary(col, MockupReadableTable.newNonExistTable("/a/path"), dict);
         assertTrue(info1 == info4);
         
         Dictionary<String> dict2 = DictionaryGenerator.buildDictionary(col.getType(), new IterableDictionaryValueEnumerator("1", "2", "3", "4"));
-        DictionaryInfo info5 = dictMgr.saveDictionary(model, col, MockupReadableTable.newNonExistTable("/a/path"), dict2);
+        DictionaryInfo info5 = dictMgr.saveDictionary(col, MockupReadableTable.newNonExistTable("/a/path"), dict2);
         assertTrue(info1 != info5);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
old mode 100644
new mode 100755
index ee0989a..08dadc9
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
@@ -81,7 +81,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             // add metadata to distributed cache
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
-            List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
+            Set<TblColRef> columnsNeedDict = cube.getDescriptor().getAllColumnsNeedDictionaryBuilt();
 
             int reducerCount = columnsNeedDict.size();
             int uhcReducerCount = cube.getConfig().getUHCReducerCount();

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
old mode 100644
new mode 100755
index 8281759..a9c7833
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -162,7 +162,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
         for (String[] row: rowCollection) {
             context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
-            for (int i = 0; i < factDictCols.size(); i++) {
+            for (int i = 0; i < dictCols.size(); i++) {
                 String fieldValue = row[dictionaryColumnIndex[i]];
                 if (fieldValue == null)
                     continue;
@@ -185,14 +185,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
                 tmpbuf.put(valueBytes);
                 outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                DataType type = factDictCols.get(i).getType();
+                DataType type = dictCols.get(i).getType();
                 sortableKey.init(outputKey, type);
                 //judge type
                 context.write(sortableKey, EMPTY_TEXT);
 
                 // log a few rows for troubleshooting
                 if (rowCount < 10) {
-                    logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+                    logger.info("Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
old mode 100644
new mode 100755
index 458af69..91454da
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -41,6 +41,8 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import com.google.common.collect.Lists;
+
 /**
  */
abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
@@ -50,7 +52,7 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
     protected CubeSegment cubeSeg;
     protected CubeDesc cubeDesc;
     protected long baseCuboidId;
-    protected List<TblColRef> factDictCols;
+    protected List<TblColRef> dictCols;
     protected IMRTableInputFormat flatTableInputFormat;
 
     protected Text outputKey = new Text();
@@ -76,14 +78,14 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
         cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
         cubeDesc = cube.getDescriptor();
         baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
+        dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
         intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
-        dictionaryColumnIndex = new int[factDictCols.size()];
-        for (int i = 0; i < factDictCols.size(); i++) {
-            TblColRef colRef = factDictCols.get(i);
+        dictionaryColumnIndex = new int[dictCols.size()];
+        for (int i = 0; i < dictCols.size(); i++) {
+            TblColRef colRef = dictCols.get(i);
             int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
             dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
old mode 100644
new mode 100755
index 7f01c3a..96a7d15
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
@@ -52,6 +51,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -98,7 +98,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         cubeConfig = cube.getConfig();
         cubeDesc = cube.getDescriptor();
-        columnList = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
+        columnList = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
 
         boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
         int numberOfTasks = context.getNumReduceTasks();

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
old mode 100644
new mode 100755
index a01a928..4a579b8
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -20,7 +20,6 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -51,7 +50,6 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Lists;
@@ -79,7 +77,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
 
-    private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
 
     // for re-encode measures that use dictionary
     private List<Pair<Integer, MeasureIngester>> dictMeasures;
@@ -161,7 +158,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
             int useSplit = i + bodySplitOffset;
             TblColRef col = cuboid.getColumns().get(i);
 
-            if (this.checkNeedMerging(col)) {
+            if (cubeDesc.getRowkey().isUseDictionary(col)) {
                 // if dictionary on fact table column, needs rewrite
                 DictionaryManager dictMgr = DictionaryManager.getInstance(config);
                 Dictionary<String> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
@@ -234,19 +231,4 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         context.write(outputKey, value);
     }
-
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dimensionsNeedDict.get(col);
-        if (ret != null)
-            return ret;
-        
-        ret = cubeDesc.getRowkey().isUseDictionary(col);
-        if (ret) {
-            TableRef srcTable = 
DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), 
col).getTableRef();
-            ret = cubeDesc.getModel().isFactTable(srcTable);
-        }
-        
-        dimensionsNeedDict.put(col, ret);
-        return ret;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index 4ca132c..58b2c02 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -37,7 +36,6 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,33 +91,17 @@ public class MergeDictionaryStep extends AbstractExecutable {
     }
 
     /**
-     * For the new segment, we need to create dictionaries for it, too. For
-     * those dictionaries on fact table, create it by merging underlying
-     * dictionaries. For those dictionaries on lookup table, just copy them from
-     * the latest one of the merging segments( https://issues.apache.org/jira/browse/KYLIN-2457),
-     * which is reasonable under the assumption that lookup tables would be either static or incremental.
-     *
+     * For the new segment, we need to create new dimension dictionaries by merging underlying
+     * dictionaries. (https://issues.apache.org/jira/browse/KYLIN-2457, https://issues.apache.org/jira/browse/KYLIN-2800)
      * @param cube
      * @param newSeg
      * @throws IOException
      */
     private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
-        HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>();
-        HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
         DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
-
         CubeDesc cubeDesc = cube.getDescriptor();
 
         for (TblColRef col : cubeDesc.getAllColumnsNeedDictionaryBuilt()) {
-            TableRef srcTable = dictMgr.decideSourceData(cubeDesc.getModel(), col).getTableRef();
-            if (cubeDesc.getModel().isFactTable(srcTable)) {
-                colsNeedMeringDict.add(col);
-            } else {
-                colsNeedCopyDict.add(col);
-            }
-        }
-
-        for (TblColRef col : colsNeedMeringDict) {
             logger.info("Merging fact table dictionary on : " + col);
             List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
             for (CubeSegment segment : mergingSegments) {
@@ -135,11 +117,6 @@ public class MergeDictionaryStep extends AbstractExecutable {
             }
             mergeDictionaries(dictMgr, newSeg, dictInfos, col);
         }
-        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
-        for (TblColRef col : colsNeedCopyDict) {
-            String path = lastSeg.getDictResPath(col);
-            newSeg.putDictResPath(col, path);
-        }
     }
 
     private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
old mode 100644
new mode 100755
index 4dcfdb2..dc41773
--- a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
@@ -65,10 +65,10 @@ public class ITDictionaryManagerTest extends LocalFileMetadataTestCase {
 
         MockDistinctColumnValuesProvider mockupData = new MockDistinctColumnValuesProvider("A", "B", "C");
 
-        DictionaryInfo info1 = dictMgr.buildDictionary(cubeDesc.getModel(), col, mockupData.getDistinctValuesFor(col));
+        DictionaryInfo info1 = dictMgr.buildDictionary(col, mockupData.getDistinctValuesFor(col));
         System.out.println(JsonUtil.writeValueAsIndentString(info1));
 
-        DictionaryInfo info2 = dictMgr.buildDictionary(cubeDesc.getModel(), col, mockupData.getDistinctValuesFor(col));
+        DictionaryInfo info2 = dictMgr.buildDictionary(col, mockupData.getDistinctValuesFor(col));
         System.out.println(JsonUtil.writeValueAsIndentString(info2));
 
         // test check duplicate
@@ -89,7 +89,7 @@ public class ITDictionaryManagerTest extends LocalFileMetadataTestCase {
 
         // test empty dictionary
         MockDistinctColumnValuesProvider mockupEmpty = new MockDistinctColumnValuesProvider();
-        DictionaryInfo info3 = dictMgr.buildDictionary(cubeDesc.getModel(), col, mockupEmpty.getDistinctValuesFor(col));
+        DictionaryInfo info3 = dictMgr.buildDictionary(col, mockupEmpty.getDistinctValuesFor(col));
         System.out.println(JsonUtil.writeValueAsIndentString(info3));
         assertEquals(0, info3.getCardinality());
         assertEquals(0, info3.getDictionaryObject().getSize());

Reply via email to