Repository: kylin Updated Branches: refs/heads/KYLIN-2800 [created] ac77008ee
KYLIN-2800 All dictionaries should be built based on the flat hive table Signed-off-by: Li Yang <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ac77008e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ac77008e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ac77008e Branch: refs/heads/KYLIN-2800 Commit: ac77008ee81d4dcc2956b1a2cfd6eaa7ae9fc5d9 Parents: 520f627 Author: zhengdong <zhe...@outlook.com> Authored: Mon Aug 21 18:39:24 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Wed Sep 6 11:27:56 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 34 +++--------- .../kylin/cube/cli/DictionaryGeneratorCLI.java | 23 +------- .../apache/kylin/dict/DictionaryManager.java | 56 +++----------------- .../kylin/dict/DictionaryManagerTest.java | 35 +++--------- .../engine/mr/steps/FactDistinctColumnsJob.java | 4 +- .../mr/steps/FactDistinctColumnsMapper.java | 6 +-- .../mr/steps/FactDistinctColumnsMapperBase.java | 12 +++-- .../mr/steps/FactDistinctColumnsReducer.java | 4 +- .../engine/mr/steps/MergeCuboidMapper.java | 20 +------ .../engine/mr/steps/MergeDictionaryStep.java | 27 +--------- .../kylin/cube/ITDictionaryManagerTest.java | 6 +-- 11 files changed, 43 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java old mode 100644 new mode 100755 index 043993c..b782a5e --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -229,7 +229,7 @@ public class CubeManager implements IRealizationProvider { return null; String builderClass = cubeDesc.getDictionaryBuilderClass(col); - DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(cubeDesc.getModel(), col, inpTable, + DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass); saveDictionaryInfo(cubeSeg, col, dictInfo); @@ -242,7 +242,7 @@ public class CubeManager implements IRealizationProvider { if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) return null; - DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(cubeDesc.getModel(), col, inpTable, dict); + DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict); saveDictionaryInfo(cubeSeg, col, dictInfo); return dictInfo; @@ -859,24 +859,6 @@ public class CubeManager implements IRealizationProvider { this.listener = listener; } - /** - * Get the columns which need build the dictionary from fact table. (the column exists on fact and is not fk) - * @param cubeDesc - * @return - * @throws IOException - */ - public List<TblColRef> getAllDictColumnsOnFact(CubeDesc cubeDesc) throws IOException { - List<TblColRef> factDictCols = new ArrayList<TblColRef>(); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - for (TblColRef col : cubeDesc.getAllColumnsNeedDictionaryBuilt()) { - - String scanTable = dictMgr.decideSourceData(cubeDesc.getModel(), col).getTable(); - if (cubeDesc.getModel().isFactTable(scanTable)) { - factDictCols.add(col); - } - } - return factDictCols; - } /** * Calculate the holes (gaps) in segments. @@ -922,8 +904,8 @@ public class CubeManager implements IRealizationProvider { //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { - List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc); - int[] uhcIndex = new int[factDictCols.size()]; + List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt()); + int[] uhcIndex = new int[dictCols.size()]; //add GlobalDictionaryColumns List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries(); @@ -931,8 +913,8 @@ public class CubeManager implements IRealizationProvider { for (DictionaryDesc dictionaryDesc : dictionaryDescList) { if (dictionaryDesc.getBuilderClass() != null && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) { - for (int i = 0; i < factDictCols.size(); i++) { - if (factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) { + for (int i = 0; i < dictCols.size(); i++) { + if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) { uhcIndex[i] = 1; break; } @@ -943,8 +925,8 @@ public class CubeManager implements IRealizationProvider { //add ShardByColumns Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns(); - for (int i = 0; i < factDictCols.size(); i++) { - if (shardByColumns.contains(factDictCols.get(i))) { + for (int i = 0; i < dictCols.size(); i++) { + if (shardByColumns.contains(dictCols.get(i))) { uhcIndex[i] = 1; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 91deda3..f8b6d61 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -27,17 +27,12 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.DimensionDesc; -import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.dict.DictionaryProvider; import org.apache.kylin.dict.DistinctColumnValuesProvider; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.JoinDesc; -import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.IReadableTable; -import org.apache.kylin.source.SourceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +55,7 @@ public class DictionaryGeneratorCLI { // dictionary for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) { logger.info("Building dictionary for " + col); - IReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, factTableValueProvider); + IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col); if (dictProvider != null) { Dictionary<String> dict = dictProvider.getDictionary(col); if (dict != null) { @@ -99,20 +94,4 @@ public class DictionaryGeneratorCLI { } } - private static IReadableTable decideInputTable(DataModelDesc model, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) { - KylinConfig config = model.getConfig(); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - TblColRef srcCol = dictMgr.decideSourceData(model, col); - String srcTable = srcCol.getTable(); - - IReadableTable inpTable; - if (model.isFactTable(srcTable)) { - inpTable = factTableValueProvider.getDistinctValuesFor(srcCol); - } else { - MetadataManager metadataManager = MetadataManager.getInstance(config); - TableDesc tableDesc = new TableDesc(metadataManager.getTableDesc(srcTable, model.getProject())); - inpTable = SourceFactory.createReadableTable(tableDesc); - } - return inpTable; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java old mode 100644 new mode 100755 index 857ee30..7a253e9 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.ArrayUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.ClassUtil; @@ -37,9 +36,6 @@ import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.JoinDesc; -import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.IReadableTable.TableSignature; @@ -277,17 +273,17 @@ public class DictionaryManager { } } - public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, IReadableTable inpTable) throws IOException { - return buildDictionary(model, col, inpTable, null); + public DictionaryInfo buildDictionary(TblColRef col, IReadableTable inpTable) throws IOException { + return buildDictionary(col, inpTable, null); } - public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, IReadableTable inpTable, String builderClass) throws IOException { + public DictionaryInfo buildDictionary(TblColRef col, IReadableTable inpTable, String builderClass) throws IOException { if (inpTable.exists() == false) return null; logger.info("building dictionary for " + col); - DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable); + DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable); String dupInfo = checkDupByInfo(dictInfo); if (dupInfo != null) { logger.info("Identical dictionary input " + dictInfo.getInput() + ", reuse existing dictionary at " + dupInfo); @@ -321,8 +317,8 @@ public class DictionaryManager { return dictionary; } - public DictionaryInfo saveDictionary(DataModelDesc model, TblColRef col, IReadableTable inpTable, Dictionary<String> dictionary) throws IOException { - DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable); + public DictionaryInfo saveDictionary(TblColRef col, IReadableTable inpTable, Dictionary<String> dictionary) throws IOException { + DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable); String dupInfo = checkDupByInfo(dictInfo); if (dupInfo != null) { logger.info("Identical dictionary input " + dictInfo.getInput() + ", reuse existing dictionary at " + dupInfo); @@ -332,51 +328,15 @@ public class DictionaryManager { return trySaveNewDict(dictionary, dictInfo); } - private DictionaryInfo createDictionaryInfo(DataModelDesc model, TblColRef col, IReadableTable inpTable) throws IOException { - TblColRef srcCol = decideSourceData(model, col); + private DictionaryInfo createDictionaryInfo(TblColRef col, IReadableTable inpTable) throws IOException { TableSignature inputSig = inpTable.getSignature(); if (inputSig == null) // table does not exists throw new IllegalStateException("Input table does not exist: " + inpTable); - DictionaryInfo dictInfo = new DictionaryInfo(srcCol.getColumnDesc(), col.getDatatype(), inputSig); + DictionaryInfo dictInfo = new DictionaryInfo(col.getColumnDesc(), col.getDatatype(), inputSig); return dictInfo; } - /** - * Decide a dictionary's source data, leverage PK-FK relationship. - */ - public TblColRef decideSourceData(DataModelDesc model, TblColRef col) { - // Note FK on fact table is supported by scan the related PK on lookup table - // FK on fact table and join type is inner, use PK from lookup instead - if (model.isFactTable(col.getTable()) == false) - return col; - - // find a lookup table that the col joins as FK - for (TableRef lookup : model.getLookupTables()) { - JoinDesc lookupJoin = model.getJoinByPKSide(lookup); - int find = ArrayUtils.indexOf(lookupJoin.getForeignKeyColumns(), col); - if (find < 0) - continue; - - // make sure the joins are all inner up to the root - if (isAllInnerJoinsToRoot(model, lookupJoin)) - return lookupJoin.getPrimaryKeyColumns()[find]; - } - - return col; - } - - private boolean isAllInnerJoinsToRoot(DataModelDesc model, JoinDesc join) { - while (join != null) { - if (join.isInnerJoin() == false) - return false; - - TableRef table = join.getFKSide(); - join = model.getJoinByPKSide(table); - } - return true; - } - private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException { final ResourceStore store = MetadataManager.getInstance(config).getStore(); final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java old mode 100644 new mode 100755 index 4820318..9c126b4 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java @@ -45,29 +45,6 @@ public class DictionaryManagerTest extends LocalFileMetadataTestCase { cleanupTestMetadata(); } - @Test - public void testDecideSourceData() { - KylinConfig config = KylinConfig.getInstanceFromEnv(); - - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - MetadataManager metaMgr = MetadataManager.getInstance(config); - - { - DataModelDesc innerModel = metaMgr.getDataModelDesc("test_kylin_inner_join_model_desc"); - TblColRef factDate = innerModel.findColumn("TEST_KYLIN_FACT.CAL_DT"); - TblColRef lookupDate = innerModel.findColumn("TEST_CAL_DT.CAL_DT"); - TblColRef formatName = innerModel.findColumn("lstg_format_name"); - assertEquals(lookupDate, dictMgr.decideSourceData(innerModel, factDate)); - assertEquals(lookupDate, dictMgr.decideSourceData(innerModel, lookupDate)); - assertEquals(formatName, dictMgr.decideSourceData(innerModel, formatName)); - } - - { - DataModelDesc outerModel = metaMgr.getDataModelDesc("test_kylin_left_join_model_desc"); - TblColRef factDate = outerModel.findColumn("TEST_KYLIN_FACT.CAL_DT"); - assertEquals(factDate, dictMgr.decideSourceData(outerModel, factDate)); - } - } @Test public void testBuildSaveDictionary() throws IOException { @@ -78,27 +55,27 @@ public class DictionaryManagerTest extends LocalFileMetadataTestCase { TblColRef col = model.findColumn("lstg_format_name"); // non-exist input returns null; - DictionaryInfo nullInfo = dictMgr.buildDictionary(model, col, MockupReadableTable.newNonExistTable("/a/path")); + DictionaryInfo nullInfo = dictMgr.buildDictionary(col, MockupReadableTable.newNonExistTable("/a/path")); assertEquals(null, nullInfo); - DictionaryInfo info1 = dictMgr.buildDictionary(model, col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3")); + DictionaryInfo info1 = dictMgr.buildDictionary(col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3")); assertEquals(3, info1.getDictionaryObject().getSize()); // same input returns same dict - DictionaryInfo info2 = dictMgr.buildDictionary(model, col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3")); + DictionaryInfo info2 = dictMgr.buildDictionary(col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3")); assertTrue(info1 == info2); // same input values (different path) returns same dict - DictionaryInfo info3 = dictMgr.buildDictionary(model, col, MockupReadableTable.newSingleColumnTable("/a/different/path", "1", "2", "3")); + DictionaryInfo info3 = dictMgr.buildDictionary(col, MockupReadableTable.newSingleColumnTable("/a/different/path", "1", "2", "3")); assertTrue(info1 == info3); // save dictionary works in spite of non-exist table Dictionary<String> dict = DictionaryGenerator.buildDictionary(col.getType(), new IterableDictionaryValueEnumerator("1", "2", "3")); - DictionaryInfo info4 = dictMgr.saveDictionary(model, col, MockupReadableTable.newNonExistTable("/a/path"), dict); + DictionaryInfo info4 = dictMgr.saveDictionary(col, MockupReadableTable.newNonExistTable("/a/path"), dict); assertTrue(info1 == info4); Dictionary<String> dict2 = DictionaryGenerator.buildDictionary(col.getType(), new IterableDictionaryValueEnumerator("1", "2", "3", "4")); - DictionaryInfo info5 = dictMgr.saveDictionary(model, col, MockupReadableTable.newNonExistTable("/a/path"), dict2); + DictionaryInfo info5 = dictMgr.saveDictionary(col, MockupReadableTable.newNonExistTable("/a/path"), dict2); assertTrue(info1 != info5); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java old mode 100644 new mode 100755 index ee0989a..08dadc9 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -19,7 +19,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import java.util.List; +import java.util.Set; import org.apache.commons.cli.Options; import org.apache.hadoop.fs.Path; @@ -81,7 +81,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { // add metadata to distributed cache CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(cubeName); - List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor()); + Set<TblColRef> columnsNeedDict = cube.getDescriptor().getAllColumnsNeedDictionaryBuilt(); int reducerCount = columnsNeedDict.size(); int uhcReducerCount = cube.getConfig().getUHCReducerCount(); http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java old mode 100644 new mode 100755 index 8281759..a9c7833 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -162,7 +162,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB for (String[] row: rowCollection) { context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row)); - for (int i = 0; i < factDictCols.size(); i++) { + for (int i = 0; i < dictCols.size(); i++) { String fieldValue = row[dictionaryColumnIndex[i]]; if (fieldValue == null) continue; @@ -185,14 +185,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB tmpbuf.put(Bytes.toBytes(reducerIndex)[3]); tmpbuf.put(valueBytes); outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); - DataType type = factDictCols.get(i).getType(); + DataType type = dictCols.get(i).getType(); sortableKey.init(outputKey, type); //judge type context.write(sortableKey, EMPTY_TEXT); // log a few rows for troubleshooting if (rowCount < 10) { - logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); + logger.info("Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java old mode 100644 new mode 100755 index 458af69..91454da --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -41,6 +41,8 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.metadata.model.TblColRef; +import com.google.common.collect.Lists; + /** */ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> { @@ -50,7 +52,7 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli protected CubeSegment cubeSeg; protected CubeDesc cubeDesc; protected long baseCuboidId; - protected List<TblColRef> factDictCols; + protected List<TblColRef> dictCols; protected IMRTableInputFormat flatTableInputFormat; protected Text outputKey = new Text(); @@ -76,14 +78,14 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); cubeDesc = cube.getDescriptor(); baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc); + dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt()); flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc); - dictionaryColumnIndex = new int[factDictCols.size()]; - for (int i = 0; i < factDictCols.size(); i++) { - TblColRef colRef = factDictCols.get(i); + dictionaryColumnIndex = new int[dictCols.size()]; + for (int i = 0; i < dictCols.size(); i++) { + TblColRef colRef = dictCols.get(i); int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef); dictionaryColumnIndex[i] = columnIndexOnFlatTbl; } http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java old mode 100644 new mode 100755 index 7f01c3a..96a7d15 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.common.base.Preconditions; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; @@ -52,6 +51,7 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -98,7 +98,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); cubeConfig = cube.getConfig(); cubeDesc = cube.getDescriptor(); - columnList = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc); + columnList = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt()); boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED)); int numberOfTasks = context.getNumReduceTasks(); http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java old mode 100644 new mode 100755 index a01a928..4a579b8 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -20,7 +20,6 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,7 +50,6 @@ import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import com.google.common.collect.Lists; @@ -79,7 +77,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private RowKeySplitter rowKeySplitter; private RowKeyEncoderProvider rowKeyEncoderProvider; - private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>(); // for re-encode measures that use dictionary private List<Pair<Integer, MeasureIngester>> dictMeasures; @@ -161,7 +158,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { int useSplit = i + bodySplitOffset; TblColRef col = cuboid.getColumns().get(i); - if (this.checkNeedMerging(col)) { + if (cubeDesc.getRowkey().isUseDictionary(col)) { // if dictionary on fact table column, needs rewrite DictionaryManager dictMgr = DictionaryManager.getInstance(config); Dictionary<String> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col)); @@ -234,19 +231,4 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { context.write(outputKey, value); } - - private Boolean checkNeedMerging(TblColRef col) throws IOException { - Boolean ret = dimensionsNeedDict.get(col); - if (ret != null) - return ret; - - ret = cubeDesc.getRowkey().isUseDictionary(col); - if (ret) { - TableRef srcTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), col).getTableRef(); - ret = cubeDesc.getModel().isFactTable(srcTable); - } - - dimensionsNeedDict.put(col, ret); - return ret; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java index 4ca132c..58b2c02 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java @@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; @@ -37,7 +36,6 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,33 +91,17 @@ public class MergeDictionaryStep extends AbstractExecutable { } /** - * For the new segment, we need to create dictionaries for it, too. For - * those dictionaries on fact table, create it by merging underlying - * dictionaries. For those dictionaries on lookup table, just copy them from - * the latest one of the merging segments( https://issues.apache.org/jira/browse/KYLIN-2457), - * which is reasonable under the assumption that lookup tables would be either static or incremental. - * + * For the new segment, we need to create new dimension dictionaries by merging underlying + * dictionaries. (https://issues.apache.org/jira/browse/KYLIN-2457, https://issues.apache.org/jira/browse/KYLIN-2800) * @param cube * @param newSeg * @throws IOException */ private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException { - HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>(); - HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>(); DictionaryManager dictMgr = DictionaryManager.getInstance(conf); - CubeDesc cubeDesc = cube.getDescriptor(); for (TblColRef col : cubeDesc.getAllColumnsNeedDictionaryBuilt()) { - TableRef srcTable = dictMgr.decideSourceData(cubeDesc.getModel(), col).getTableRef(); - if (cubeDesc.getModel().isFactTable(srcTable)) { - colsNeedMeringDict.add(col); - } else { - colsNeedCopyDict.add(col); - } - } - - for (TblColRef col : colsNeedMeringDict) { logger.info("Merging fact table dictionary on : " + col); List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>(); for (CubeSegment segment : mergingSegments) { @@ -135,11 +117,6 @@ public class MergeDictionaryStep extends AbstractExecutable { } mergeDictionaries(dictMgr, newSeg, dictInfos, col); } - CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1); - for (TblColRef col : colsNeedCopyDict) { - String path = lastSeg.getDictResPath(col); - newSeg.putDictResPath(col, path); - } } private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/ac77008e/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java old mode 100644 new mode 100755 index 4dcfdb2..dc41773 --- a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java @@ -65,10 +65,10 @@ public class ITDictionaryManagerTest extends LocalFileMetadataTestCase { MockDistinctColumnValuesProvider mockupData = new MockDistinctColumnValuesProvider("A", "B", "C"); - DictionaryInfo info1 = dictMgr.buildDictionary(cubeDesc.getModel(), col, mockupData.getDistinctValuesFor(col)); + DictionaryInfo info1 = dictMgr.buildDictionary(col, mockupData.getDistinctValuesFor(col)); System.out.println(JsonUtil.writeValueAsIndentString(info1)); - DictionaryInfo info2 = dictMgr.buildDictionary(cubeDesc.getModel(), col, mockupData.getDistinctValuesFor(col)); + DictionaryInfo info2 = dictMgr.buildDictionary(col, mockupData.getDistinctValuesFor(col)); System.out.println(JsonUtil.writeValueAsIndentString(info2)); // test check duplicate @@ -89,7 +89,7 @@ public class ITDictionaryManagerTest extends LocalFileMetadataTestCase { // test empty dictionary MockDistinctColumnValuesProvider mockupEmpty = new MockDistinctColumnValuesProvider(); - DictionaryInfo info3 = dictMgr.buildDictionary(cubeDesc.getModel(), col, mockupEmpty.getDistinctValuesFor(col)); + DictionaryInfo info3 = dictMgr.buildDictionary(col, mockupEmpty.getDistinctValuesFor(col)); System.out.println(JsonUtil.writeValueAsIndentString(info3)); assertEquals(0, info3.getCardinality()); assertEquals(0, info3.getDictionaryObject().getSize());