Repository: kylin Updated Branches: refs/heads/KYLIN-2217-2 [created] bbf4a9f44
KYLIN-2217 Reducers build dictionaries locally Signed-off-by: Li Yang <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a4e1f8dd Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a4e1f8dd Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a4e1f8dd Branch: refs/heads/KYLIN-2217-2 Commit: a4e1f8ddc53030c897e9dece481ba54c95165afe Parents: 3186d17 Author: xiefan46 <958034...@qq.com> Authored: Wed Nov 23 09:48:55 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Wed Nov 30 11:06:36 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 9 +- .../java/org/apache/kylin/cube/CubeManager.java | 3 +- .../kylin/cube/cli/DictionaryGeneratorCLI.java | 25 ++- .../apache/kylin/dict/DictionaryManager.java | 17 +- .../apache/kylin/dict/DictionaryProvider.java | 28 ++++ .../dict/DictionaryReducerLocalGenerator.java | 156 +++++++++++++++++++ .../dict/IDictionaryReducerLocalBuilder.java | 31 ++++ .../kylin/dict/DictionaryProviderTest.java | 109 +++++++++++++ .../storage/translate/ColumnValueRange.java | 2 +- .../engine/mr/steps/CreateDictionaryJob.java | 44 +++++- .../mr/steps/FactDistinctColumnsReducer.java | 123 +++++++++++++-- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 54 +++---- .../storage/hbase/cube/v1/CubeStorageQuery.java | 6 +- 13 files changed, 547 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 7dcc771..766c04d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -719,7 +719,14 @@ abstract public class KylinConfigBase implements Serializable { //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns public int getUHCReducerCount() { - return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "3")); + return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "1")); + } + + public boolean isReducerLocalBuildDict() { + if (getUHCReducerCount() != 1) { + return false; + } + return Boolean.parseBoolean(getOptional("kylin.engine.mr.reducer-local-build-dict", "true")); } public String getYarnStatusCheckUrl() { http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index b4422d2..119a21a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -213,6 +213,7 @@ public class CubeManager implements IRealizationProvider { return result; } + public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, ReadableTable inpTable) throws IOException { CubeDesc cubeDesc = cubeSeg.getCubeDesc(); if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) @@ -221,6 +222,7 @@ public class CubeManager implements IRealizationProvider { String builderClass = cubeDesc.getDictionaryBuilderClass(col); DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(cubeDesc.getModel(), col, inpTable, builderClass); + saveDictionaryInfo(cubeSeg, col, dictInfo); return dictInfo; } @@ -266,7 +268,6 @@ public class CubeManager implements IRealizationProvider { } catch (IOException e) { throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e); } - return (Dictionary<String>) info.getDictionaryObject(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index a6aeb96..a4e1df0 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -22,11 +22,13 @@ import java.io.IOException; import java.util.Set; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.dict.DictionaryManager; +import org.apache.kylin.dict.DictionaryProvider; import org.apache.kylin.dict.DistinctColumnValuesProvider; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; @@ -44,21 +46,30 @@ public class DictionaryGeneratorCLI { private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class); - public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider) throws IOException { + public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); CubeSegment segment = cube.getSegmentById(segmentID); - processSegment(config, segment, factTableValueProvider); + processSegment(config, segment, factTableValueProvider, dictProvider); } - private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException { + private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { CubeManager cubeMgr = CubeManager.getInstance(config); // dictionary for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) { logger.info("Building dictionary for " + col); ReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, factTableValueProvider); - cubeMgr.buildDictionary(cubeSeg, col, inpTable); + if (config.isReducerLocalBuildDict() && dictProvider != null) { + Dictionary<String> dict = dictProvider.getDictionary(col); + if (dict != null) { + cubeMgr.saveDictionary(cubeSeg, col, inpTable, dict); + } else { + cubeMgr.buildDictionary(cubeSeg, col, inpTable); + } + } else { + cubeMgr.buildDictionary(cubeSeg, col, inpTable); + } } // snapshot @@ -68,19 +79,19 @@ public class DictionaryGeneratorCLI { if (cubeSeg.getModel().isLookupTable(table)) toSnapshot.add(table.getTableIdentity()); } - + for (String tableIdentity : toSnapshot) { logger.info("Building snapshot of " + tableIdentity); cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity); } } - + private static ReadableTable decideInputTable(DataModelDesc model, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) { KylinConfig config = model.getConfig(); DictionaryManager dictMgr = DictionaryManager.getInstance(config); TblColRef srcCol = dictMgr.decideSourceData(model, col); String srcTable = srcCol.getTable(); - + ReadableTable inpTable; if (model.isFactTable(srcTable)) { inpTable = factTableValueProvider.getDistinctValuesFor(srcCol); http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 6178234..0caef14 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -87,6 +87,7 @@ public class DictionaryManager { private KylinConfig config; private LoadingCache<String, DictionaryInfo> dictCache; // resource + // path ==> // DictionaryInfo @@ -275,10 +276,12 @@ public class DictionaryManager { return buildDictionary(model, col, inpTable, null); } + public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, String builderClass) throws IOException { if (inpTable.exists() == false) return null; + logger.info("building dictionary for " + col); DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable); @@ -291,6 +294,12 @@ public class DictionaryManager { logger.info("Building dictionary object " + JsonUtil.writeValueAsString(dictInfo)); Dictionary<String> dictionary; + dictionary = buildDictFromReadableTable(inpTable, dictInfo, builderClass, col); + return trySaveNewDict(dictionary, dictInfo); + } + + private Dictionary<String> buildDictFromReadableTable(ReadableTable inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws IOException { + Dictionary<String> dictionary; IDictionaryValueEnumerator columnValueEnumerator = null; try { columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), dictInfo.getSourceColumnIndex()); @@ -304,7 +313,7 @@ public class DictionaryManager { if (columnValueEnumerator != null) columnValueEnumerator.close(); } - return trySaveNewDict(dictionary, dictInfo); + return dictionary; } public DictionaryInfo saveDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, Dictionary<String> dictionary) throws IOException { @@ -336,19 +345,19 @@ public class DictionaryManager { // FK on fact table and join type is inner, use PK from lookup instead if (model.isFactTable(col.getTable()) == false) return col; - + // find a lookup table that the col joins as FK for (TableRef lookup : model.getLookupTables()) { JoinDesc lookupJoin = model.getJoinByPKSide(lookup); int find = ArrayUtils.indexOf(lookupJoin.getForeignKeyColumns(), col); if (find < 0) continue; - + // make sure the joins are all inner up to the root if (isAllInnerJoinsToRoot(model, lookupJoin)) return lookupJoin.getPrimaryKeyColumns()[find]; } - + return col; } http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java new file mode 100644 index 0000000..6387535 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.dict; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.metadata.model.TblColRef; + +/** + * Created by xiefan on 16-11-23. + */ +public interface DictionaryProvider { + public Dictionary<String> getDictionary(TblColRef col); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java new file mode 100644 index 0000000..35d379a --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.dict; + +import com.google.common.base.Preconditions; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.metadata.datatype.DataType; + +import java.text.ParseException; +import java.text.SimpleDateFormat; + +/** + * Created by xiefan on 16-11-16. + * + * TODO:sample,mergeDict + */ +public class DictionaryReducerLocalGenerator { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DictionaryReducerLocalGenerator.class); + + private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" }; + + public static IDictionaryReducerLocalBuilder getBuilder(DataType dataType) { + Preconditions.checkNotNull(dataType, "dataType cannot be null"); + + IDictionaryReducerLocalBuilder builder; + if (dataType.isDateTimeFamily()) { + if (dataType.isDate()) + builder = new DateDictBuilder(); + else + builder = new TimeDictBuilder(); + } else if (dataType.isNumberFamily()) { + builder = new NumberDictBuilder(0); + } else { + builder = new StringDictBuilder(0); + } + return builder; + } + + private static class DateDictBuilder implements IDictionaryReducerLocalBuilder { + + private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" }; + + private String matchPattern = null; + + private boolean isRecognizeFormat = false; + + private SimpleDateFormat sdf; + + @Override + public Dictionary<String> build(int baseId) throws Exception { + if (isRecognizeFormat) { + return new DateStrDictionary(matchPattern, baseId); + } else { + throw new IllegalStateException("Date format not match"); + } + } + + @Override + public void addValue(String value) throws Exception { + if (matchPattern == null) { //init match pattern + for (String ptn : DATE_PATTERNS) { + matchPattern = ptn; + SimpleDateFormat sdf = new SimpleDateFormat(ptn); + try { + sdf.parse(value); + isRecognizeFormat = true; + break; + } catch (ParseException e) { + + } + } + sdf = new SimpleDateFormat(matchPattern); + } + if (!isRecognizeFormat) { + throw new IllegalStateException("Date format not match"); + } + try { + sdf.parse(value); + } catch (ParseException e) { + isRecognizeFormat = false; + logger.info("Unrecognized date value: " + value); + } + } + + } + + private static class TimeDictBuilder implements IDictionaryReducerLocalBuilder { + + @Override + public Dictionary<String> build(int baseId) { + return new TimeStrDictionary(); + } + + @Override + public void addValue(String value) { + + } + + } + + private static class StringDictBuilder implements IDictionaryReducerLocalBuilder { + + private TrieDictionaryForestBuilder<String> builder; + + public StringDictBuilder(int baseId) { + builder = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), 0); + } + + @Override + public Dictionary<String> build(int baseId) { + return builder.build(); + } + + @Override + public void addValue(String value) { + builder.addValue(value); + } + + } + + public static class NumberDictBuilder implements IDictionaryReducerLocalBuilder { + + private NumberDictionaryForestBuilder builder; + + public NumberDictBuilder(int baseId) { + builder = new NumberDictionaryForestBuilder(baseId); + } + + @Override + public Dictionary<String> build(int baseId) { + return builder.build(); + } + + @Override + public void addValue(String value) { + builder.addValue(value); + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java new file mode 100644 index 0000000..19b1d28 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict; + +import org.apache.kylin.common.util.Dictionary; + +/** + * Created by xiefan on 16-11-16. + */ +public interface IDictionaryReducerLocalBuilder { + Dictionary<String> build(int baseId) throws Exception; + + void addValue(String value) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java new file mode 100644 index 0000000..0225737 --- /dev/null +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java @@ -0,0 +1,109 @@ +package org.apache.kylin.dict; + +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.Test; + +import java.io.BufferedOutputStream; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.lang.reflect.ParameterizedType; +import java.util.Arrays; +import java.util.Iterator; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Created by xiefan on 16-11-23. + */ +public class DictionaryProviderTest { + + @Test + public void testReadWrite() throws Exception{ + //string dict + Dictionary<String> dict = getDict(DataType.getType("string"), + Arrays.asList(new String[]{"a","b"}).iterator()); + readWriteTest(dict); + //number dict + Dictionary<String> dict2 = getDict(DataType.getType("long"), + Arrays.asList(new String[]{"1","2"}).iterator()); + readWriteTest(dict2); + + //date dict + Dictionary<String> dict3 = getDict(DataType.getType("datetime"), + Arrays.asList(new String[]{"20161122","20161123"}).iterator()); + readWriteTest(dict3); + + //date dict + Dictionary<String> dict4 = getDict(DataType.getType("datetime"), + Arrays.asList(new String[]{"2016-11-22","2016-11-23"}).iterator()); + readWriteTest(dict4); + + //date dict + try { + Dictionary<String> dict5 = getDict(DataType.getType("date"), + Arrays.asList(new String[]{"2016-11-22", "20161122"}).iterator()); + readWriteTest(dict5); + fail("Date format not correct.Should throw exception"); + }catch (IllegalStateException e){ + //correct + } + } + + @Test + public void testReadWriteTime(){ + System.out.println(Long.MAX_VALUE); + System.out.println(Long.MIN_VALUE); + } + + + private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception{ + IDictionaryReducerLocalBuilder builder = DictionaryReducerLocalGenerator.getBuilder(type); + while(values.hasNext()){ + builder.addValue(values.next()); + } + return builder.build(0); + } + + private void readWriteTest(Dictionary<String> dict) throws Exception{ + final String path = "src/test/resources/dict/tmp_dict"; + File f = new File(path); + f.deleteOnExit(); + f.createNewFile(); + String dictClassName = dict.getClass().getName(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(f)); + out.writeUTF(dictClassName); + dict.write(out); + out.close(); + //read dict + DataInputStream in = null; + Dictionary<String> dict2 = null; + try { + File f2 = new File(path); + in = new DataInputStream(new FileInputStream(f2)); + String dictClassName2 = in.readUTF(); + dict2 = (Dictionary<String>) ClassUtil.newInstance(dictClassName2); + dict2.readFields(in); + }catch(IOException e){ + e.printStackTrace(); + }finally { + if(in != null){ + try { + in.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + assertTrue(dict.equals(dict2)); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java index 0dc1afa..56b1106 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java @@ -168,7 +168,7 @@ public class ColumnValueRange { // remove invalid EQ/IN values and round start/end according to dictionary public void preEvaluateWithDict(Dictionary<String> dict) { - if (dict == null) + if (dict == null || dict.getSize() == 0) return; if (equalValues != null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index 5d7cb21..63005f9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -19,15 +19,25 @@ package org.apache.kylin.engine.mr.steps; import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.cli.DictionaryGeneratorCLI; +import org.apache.kylin.dict.DictionaryProvider; import org.apache.kylin.dict.DistinctColumnValuesProvider; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.SortedColumnDFSFile; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable; +import java.io.IOException; + /** * @author ysong1 */ @@ -48,13 +58,45 @@ public class CreateDictionaryJob extends AbstractHadoopJob { final String segmentID = getOptionValue(OPTION_SEGMENT_ID); final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH); - KylinConfig config = KylinConfig.getInstanceFromEnv(); + final KylinConfig config = KylinConfig.getInstanceFromEnv(); DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() { @Override public ReadableTable getDistinctValuesFor(TblColRef col) { return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getName(), col.getType()); } + }, new DictionaryProvider() { + + @Override + public Dictionary<String> getDictionary(TblColRef col) { + if (!config.isReducerLocalBuildDict()) { + return null; + } + FSDataInputStream is = null; + try { + Path colDir = new Path(factColumnsInputPath, col.getName()); + Path outputFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); + Configuration conf = HadoopUtil.getCurrentConfiguration(); + FileSystem fs = HadoopUtil.getFileSystem(outputFile.getName()); + is = fs.open(outputFile); + String dictClassName = is.readUTF(); + Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName); + dict.readFields(is); + logger.info("DictionaryProvider read dict form file : " + outputFile.getName()); + return dict; + } catch (Exception e) { + e.printStackTrace(); + return null; + } finally { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } }); return returnCode; http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 6e24d61..5511626 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -36,14 +38,19 @@ import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.dict.DictionaryReducerLocalGenerator; +import org.apache.kylin.dict.IDictionaryReducerLocalBuilder; import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,10 +80,19 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class); + //local build dict + private boolean isReducerLocalBuildDict; + private IDictionaryReducerLocalBuilder builder; + private FastDateFormat dateFormat; + private long timeMaxValue = Long.MIN_VALUE; + private long timeMinValue = Long.MAX_VALUE; + public static final String DICT_FILE_POSTFIX = ".RLD"; + public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".PCI"; + private boolean isPartitionCol = false; + @Override protected void setup(Context context) throws IOException { super.bindCurrentConfiguration(context.getConfiguration()); - Configuration conf = context.getConfiguration(); KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); @@ -102,14 +118,36 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } else if (collectStatistics && (taskId == numberOfTasks - 2)) { // partition col isStatistics = false; + isPartitionCol = true; col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); colValues = Lists.newLinkedList(); + DataType partitionColType = col.getType(); + if (partitionColType.isDate()) { + dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN); + } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { + dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS); + } else if (partitionColType.isStringFamily()) { + String partitionDateFormat = cubeDesc.getModel().getPartitionDesc().getPartitionDateFormat(); + if (StringUtils.isEmpty(partitionDateFormat)) { + partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN; + } + dateFormat = DateFormat.getDateFormat(partitionDateFormat); + } else { + throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type"); + } } else { // col isStatistics = false; col = columnList.get(ReducerIdToColumnIndex.get(taskId)); colValues = Lists.newLinkedList(); } + + //local build dict + isReducerLocalBuildDict = config.isReducerLocalBuildDict(); + if (col != null && isReducerLocalBuildDict) { + builder = DictionaryReducerLocalGenerator.getBuilder(col.getType()); + } + } private void initReducerIdToColumnIndex(KylinConfig config) throws IOException { @@ -150,11 +188,26 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } } } else { - colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); - if (colValues.size() == 1000000) { //spill every 1 million - logger.info("spill values to disk..."); - outputDistinctValues(col, colValues, context); - colValues.clear(); + if (isReducerLocalBuildDict) { + String value = new String(key.getBytes(), 1, key.getLength() - 1); + //partition col + try { + if (isPartitionCol) { + long time = dateFormat.parse(value).getTime(); + timeMinValue = Math.min(timeMinValue, time); + timeMaxValue = Math.max(timeMaxValue, time); + } + builder.addValue(value); + } catch (Exception e) { + e.printStackTrace(); + } + } else { + colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); + if (colValues.size() == 1000000) { //spill every 1 million + logger.info("spill values to disk..."); + outputDistinctValues(col, colValues, context); + colValues.clear(); + } } } @@ -191,12 +244,64 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } } + private void outputDict(TblColRef col, Dictionary<String> dict, Context context) throws IOException { + final String fileName = col.getName() + DICT_FILE_POSTFIX; + FSDataOutputStream out = getOutputStream(context, fileName); + try { + String dictClassName = dict.getClass().getName(); + out.writeUTF(dictClassName); + dict.write(out); + logger.info("reducer id is:+" + taskId + " colName:" + col.getName() + " writing dict at file : " + fileName + " dict class:" + dictClassName); + } finally { + IOUtils.closeQuietly(out); + } + } + + private void outputPartitionInfo(Context context) throws IOException { + final String fileName = col.getName() + PARTITION_COL_INFO_FILE_POSTFIX; + FSDataOutputStream out = getOutputStream(context, fileName); + try { + out.writeLong(timeMinValue); + out.writeLong(timeMaxValue); + logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue); + } finally { + IOUtils.closeQuietly(out); + } + } + + private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException { + final Configuration conf = context.getConfiguration(); + final FileSystem fs = FileSystem.get(conf); + final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH); + final Path colDir = new Path(outputPath, col.getName()); + final Path outputFile = new Path(colDir, outputFileName); + FSDataOutputStream out = null; + if (!fs.exists(colDir)) { + fs.mkdirs(colDir); + } + fs.deleteOnExit(outputFile); + out = fs.create(outputFile); + return out; + } + @Override protected void doCleanup(Context context) throws IOException, InterruptedException { if (isStatistics == false) { - if (colValues.size() > 0) { - outputDistinctValues(col, colValues, context); - colValues.clear(); + if (isReducerLocalBuildDict) { + try { + if (isPartitionCol) { + outputPartitionInfo(context); + } + Dictionary<String> dict = builder.build(0); + outputDict(col, dict, context); + } catch (Exception e) { + e.printStackTrace(); + } + } else { + if (colValues.size() > 0) { + outputDistinctValues(col, colValues, context); + colValues.clear(); + } } } else { //output the hll info; http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index eb06f07..977196c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -19,25 +19,23 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import java.text.ParseException; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.FastDateFormat; -import org.apache.kylin.common.util.DateFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.engine.mr.SortedColumnDFSFile; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.ReadableTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,39 +81,25 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { private void updateTimeRange(CubeSegment segment) throws IOException { final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); - final DataType partitionColType = partitionCol.getType(); - final FastDateFormat dateFormat; - if (partitionColType.isDate()) { - dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN); - } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { - dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS); - } else if (partitionColType.isStringFamily()) { - String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat(); - if (StringUtils.isEmpty(partitionDateFormat)) { - partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN; - } - dateFormat = DateFormat.getDateFormat(partitionDateFormat); - } else { - throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type"); - } - - final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); - //final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1); - final ReadableTable readableTable = new SortedColumnDFSFile(factDistinctPath + "/" + partitionCol.getName(), partitionCol.getType()); - final ReadableTable.TableReader tableReader = readableTable.getReader(); + final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); + Path colDir = new Path(factColumnsInputPath, partitionCol.getName()); + Path outputFile = new Path(colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); + Configuration conf = HadoopUtil.getCurrentConfiguration(); + FileSystem fs = HadoopUtil.getFileSystem(outputFile.getName()); + FSDataInputStream is = null; long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE; try { - while (tableReader.next()) { - long time = dateFormat.parse(tableReader.getRow()[0]).getTime(); - minValue = Math.min(minValue, time); - maxValue = Math.max(maxValue, time); - } - } catch (ParseException e) { + is = fs.open(outputFile); + long min = is.readLong(); + long max = is.readLong(); + minValue = Math.min(min, minValue); + maxValue = Math.max(max, maxValue); + } catch (IOException e) { throw new IOException(e); } finally { - IOUtils.closeQuietly(tableReader); + IOUtils.closeQuietly(is); } - + logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue); segment.setDateRangeStart(minValue); segment.setDateRangeEnd(maxValue); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a4e1f8dd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index 9af0faf..02aa64a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -437,7 +437,11 @@ public class CubeStorageQuery implements IStorageQuery { // build row key range for each cube segment StringBuilder sb = new StringBuilder("hbasekeyrange trace: "); for (CubeSegment cubeSeg : segs) { - + CubeDesc cubeDesc = cubeSeg.getCubeDesc(); + if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) { + logger.info("Skip cube segment {} because its input record is 0", cubeSeg); + continue; + } // consider derived (lookup snapshot), filter on dimension may // differ per segment List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg);